1from sqlalchemy.testing import eq_
2from sqlalchemy.util import pickle
3import sqlalchemy as sa
4from sqlalchemy import testing
5from sqlalchemy.testing.util import picklers
6from sqlalchemy.testing import assert_raises_message
7from sqlalchemy import Integer, String, ForeignKey, exc, MetaData
8from sqlalchemy.testing.schema import Table, Column
9from sqlalchemy.orm import mapper, relationship, create_session, \
10                            sessionmaker, attributes, interfaces,\
11                            clear_mappers, exc as orm_exc,\
12                            configure_mappers, Session, lazyload_all,\
13                            lazyload, aliased
14from sqlalchemy.orm import state as sa_state
15from sqlalchemy.orm import instrumentation
16from sqlalchemy.orm.collections import attribute_mapped_collection, \
17    column_mapped_collection
18from sqlalchemy.testing import fixtures
19from test.orm import _fixtures
20from sqlalchemy.testing.pickleable import User, Address, Dingaling, Order, \
21    Child1, Child2, Parent, Screen, EmailUser
22
23
class PickleTest(fixtures.MappedTest):
    """Pickle round-trip tests for mapped instances, loader options,
    ORM exceptions and mapped collections."""

    @classmethod
    def define_tables(cls, metadata):
        """Declare the users, addresses, orders and dingalings tables."""
        Table(
            'users', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('name', String(30), nullable=False),
            test_needs_acid=True,
            test_needs_fk=True
        )

        Table(
            'addresses', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('user_id', None, ForeignKey('users.id')),
            Column('email_address', String(50), nullable=False),
            test_needs_acid=True,
            test_needs_fk=True
        )

        Table(
            'orders', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('user_id', None, ForeignKey('users.id')),
            Column('address_id', None, ForeignKey('addresses.id')),
            Column('description', String(30)),
            Column('isopen', Integer),
            test_needs_acid=True,
            test_needs_fk=True
        )

        Table(
            "dingalings", metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('address_id', None, ForeignKey('addresses.id')),
            Column('data', String(30)),
            test_needs_acid=True,
            test_needs_fk=True
        )

    def test_transient(self):
        """An unpickled copy of a never-persisted object graph can be
        added to a session and flushed."""
        users = self.tables.users
        addresses = self.tables.addresses

        mapper(User, users, properties={
            'addresses': relationship(Address, backref="user")
        })
        mapper(Address, addresses)

        session = create_session()
        original = User(name='ed')
        original.addresses.append(Address(email_address='ed@bar.com'))

        restored = pickle.loads(pickle.dumps(original))
        session.add(restored)
        session.flush()

        session.expunge_all()

        # the row written for the copy must compare equal to the original
        eq_(original, session.query(User).get(restored.id))

    def test_no_mappers(self):
        """Unpickling after clear_mappers() raises UnmappedInstanceError."""
        users = self.tables.users

        mapper(User, users)
        user = User(name='ed')
        payload = pickle.dumps(user, -1)

        clear_mappers()

        assert_raises_message(
            orm_exc.UnmappedInstanceError,
            "Cannot deserialize object of type "
            "<class 'sqlalchemy.testing.pickleable.User'> - no mapper()",
            pickle.loads, payload)

    def test_no_instrumentation(self):
        """Unpickling works against a mapper that was re-created after
        clear_mappers()."""
        users = self.tables.users

        mapper(User, users)
        user = User(name='ed')
        payload = pickle.dumps(user, -1)

        clear_mappers()

        # map the class again; the pickle was produced under the old mapper
        mapper(User, users)

        restored = pickle.loads(payload)
        # this fails unless the InstanceState
        # compiles the mapper
        eq_(str(restored), "User(name='ed')")

    def test_class_deferred_cols(self):
        """Columns deferred at the mapper level still load after a pickle
        round trip, via both Session.add() and merge(load=False)."""
        users = self.tables.users
        addresses = self.tables.addresses

        mapper(User, users, properties={
            'name': sa.orm.deferred(users.c.name),
            'addresses': relationship(Address, backref="user")
        })
        mapper(Address, addresses, properties={
            'email_address': sa.orm.deferred(addresses.c.email_address)
        })

        session = create_session()
        user = User(name='ed')
        user.addresses.append(Address(email_address='ed@bar.com'))
        session.add(user)
        session.flush()
        session.expunge_all()

        user = session.query(User).get(user.id)
        assert 'name' not in user.__dict__
        assert 'addresses' not in user.__dict__

        # attach the unpickled copy with add()
        restored = pickle.loads(pickle.dumps(user))
        other_session = create_session()
        other_session.add(restored)
        eq_(restored.name, 'ed')
        eq_(
            restored,
            User(name='ed',
                 addresses=[Address(email_address='ed@bar.com')]))

        # attach a second unpickled copy with merge(load=False)
        restored = pickle.loads(pickle.dumps(user))
        other_session = create_session()
        restored = other_session.merge(restored, load=False)
        eq_(restored.name, 'ed')
        eq_(
            restored,
            User(name='ed',
                 addresses=[Address(email_address='ed@bar.com')]))

    def test_instance_lazy_relation_loaders(self):
        """A per-query lazyload() option on a lazy='noload' relationship
        survives pickling: the copy loads its addresses once it is
        attached to a new Session."""
        users = self.tables.users
        addresses = self.tables.addresses

        mapper(User, users, properties={
            'addresses': relationship(Address, lazy='noload')
        })
        mapper(Address, addresses)

        sess = Session()
        user = User(
            name='ed',
            addresses=[Address(email_address='ed@bar.com')])

        sess.add(user)
        sess.commit()
        sess.close()

        user = sess.query(User).options(lazyload(User.addresses)).first()
        restored = pickle.loads(pickle.dumps(user))

        sess = Session()
        sess.add(restored)
        assert restored.addresses

    @testing.requires.non_broken_pickle
    def test_instance_deferred_cols(self):
        """defer() options given to a query keep applying to the
        unpickled instance, via both Session.add() and merge(load=False)."""
        users = self.tables.users
        addresses = self.tables.addresses

        mapper(User, users, properties={
            'addresses': relationship(Address, backref="user")
        })
        mapper(Address, addresses)

        session = create_session()
        user = User(name='ed')
        user.addresses.append(Address(email_address='ed@bar.com'))
        session.add(user)
        session.flush()
        session.expunge_all()

        deferring_query = session.query(User).options(
            sa.orm.defer('name'),
            sa.orm.defer('addresses.email_address'))
        user = deferring_query.get(user.id)
        assert 'name' not in user.__dict__
        assert 'addresses' not in user.__dict__

        # attach the unpickled copy with add()
        restored = pickle.loads(pickle.dumps(user))
        other_session = create_session()
        other_session.add(restored)
        eq_(restored.name, 'ed')
        assert 'addresses' not in restored.__dict__
        address = restored.addresses[0]
        assert 'email_address' not in address.__dict__
        eq_(address.email_address, 'ed@bar.com')
        eq_(
            restored,
            User(name='ed',
                 addresses=[Address(email_address='ed@bar.com')]))

        # attach a second unpickled copy with merge(load=False)
        restored = pickle.loads(pickle.dumps(user))
        other_session = create_session()
        restored = other_session.merge(restored, load=False)
        eq_(restored.name, 'ed')
        assert 'addresses' not in restored.__dict__
        address = restored.addresses[0]

        # mapper options now transmit over merge(),
        # new as of 0.6, so email_address is deferred.
        assert 'email_address' not in address.__dict__

        eq_(address.email_address, 'ed@bar.com')
        eq_(
            restored,
            User(name='ed',
                 addresses=[Address(email_address='ed@bar.com')]))

    def test_pickle_protocols(self):
        """A loaded User equals its copy under every loads/dumps pair
        returned by picklers()."""
        users = self.tables.users
        addresses = self.tables.addresses

        mapper(User, users, properties={
            'addresses': relationship(Address, backref="user")
        })
        mapper(Address, addresses)

        session = sessionmaker()()
        user = User(name='ed')
        user.addresses.append(Address(email_address='ed@bar.com'))
        session.add(user)
        session.commit()

        user = session.query(User).first()
        # access the collection before pickling (presumably to trigger
        # the lazy load so it is part of the pickled state)
        user.addresses

        for loads, dumps in picklers():
            restored = loads(dumps(user))
            eq_(user, restored)

    def test_09_pickle(self):
        """InstanceState.__setstate__ accepts a hand-built state dict in
        which the InstanceState itself is the per-attribute callable.

        NOTE(review): going by the test name, the dict presumably mirrors
        what 0.9-series pickles contained -- confirm.
        """
        users = self.tables.users
        mapper(User, users)
        sess = Session()
        sess.add(User(id=1, name='ed'))
        sess.commit()
        sess.close()

        # an instance with its instrumentation state stripped off
        inst = User(id=1, name='ed')
        del inst._sa_instance_state

        # build the state and manager without running their __init__
        state = sa_state.InstanceState.__new__(sa_state.InstanceState)
        manager = instrumentation._SerializeManager.__new__(
            instrumentation._SerializeManager)
        manager.class_ = User

        state.__setstate__({
            'class_': User,
            'modified': False,
            'committed_state': {},
            'instance': inst,
            'callables': {'name': state, 'id': state},
            'key': (User, (1,)),
            'expired': True,
            'manager': manager,
        })

        sess = Session()
        sess.add(inst)
        eq_(inst.name, 'ed')

    @testing.requires.non_broken_pickle
    def test_options_with_descriptors(self):
        """Loader options keep their ``path`` across pickling, and an
        instance loaded with such an option can itself be pickled."""
        users = self.tables.users
        addresses = self.tables.addresses
        dingalings = self.tables.dingalings

        mapper(User, users, properties={
            'addresses': relationship(Address, backref="user")
        })
        mapper(Address, addresses, properties={
            'dingaling': relationship(Dingaling)
        })
        mapper(Dingaling, dingalings)

        session = create_session()
        user = User(name='ed')
        user.addresses.append(Address(email_address='ed@bar.com'))
        session.add(user)
        session.flush()
        session.expunge_all()

        options = [
            sa.orm.joinedload(User.addresses),
            sa.orm.joinedload("addresses"),
            sa.orm.defer("name"),
            sa.orm.defer(User.name),
            sa.orm.joinedload("addresses", Address.dingaling),
        ]
        for opt in options:
            restored_opt = pickle.loads(pickle.dumps(opt))
            eq_(opt.path, restored_opt.path)

        # NOTE(review): only the final option is applied to a query, as
        # before (previously via the leaked loop variable); confirm
        # whether every option was meant to be exercised here.
        user = session.query(User).options(options[-1]).first()
        pickle.loads(pickle.dumps(user))

    def test_collection_setstate(self):
        """test a particular cycle that requires CollectionAdapter
        to not rely upon InstanceState to deserialize."""

        meta = MetaData()
        child1_table = Table(
            'c1', meta,
            Column('parent_id', String, ForeignKey('p.id'),
                   primary_key=True)
        )
        child2_table = Table(
            'c2', meta,
            Column('parent_id', String, ForeignKey('p.id'),
                   primary_key=True)
        )
        parent_table = Table(
            'p', meta,
            Column('id', String, primary_key=True)
        )

        mapper(Parent, parent_table, properties={
            'children1': relationship(Child1),
            'children2': relationship(Child2)
        })
        mapper(Child1, child1_table)
        mapper(Child2, child2_table)

        parent = Parent()
        inner_screen = Screen(parent)
        inner_screen.errors = [parent.children1, parent.children2]
        outer_screen = Screen(Child2(), inner_screen)
        pickle.loads(pickle.dumps(outer_screen))

    def test_exceptions(self):
        """ORM exceptions keep their first argument under every pickler."""
        class Foo(object):
            pass

        users = self.tables.users
        mapper(User, users)

        exceptions = (
            orm_exc.UnmappedInstanceError(Foo()),
            orm_exc.UnmappedClassError(Foo),
            orm_exc.ObjectDeletedError(attributes.instance_state(User())),
        )
        for sa_exc in exceptions:
            for loads, dumps in picklers():
                repickled = loads(dumps(sa_exc))
                eq_(repickled.args[0], sa_exc.args[0])

    def test_attribute_mapped_collection(self):
        """A dict collection keyed by an attribute name survives every
        pickler."""
        users = self.tables.users
        addresses = self.tables.addresses

        keyed_by_email = attribute_mapped_collection('email_address')
        mapper(User, users, properties={
            'addresses': relationship(
                Address, collection_class=keyed_by_email)
        })
        mapper(Address, addresses)

        user = User()
        user.addresses = {"email1": Address(email_address="email1")}
        for loads, dumps in picklers():
            repickled = loads(dumps(user))
            eq_(user.addresses, repickled.addresses)
            eq_(repickled.addresses['email1'],
                Address(email_address="email1"))

    def test_column_mapped_collection(self):
        """A dict collection keyed by a single column survives every
        pickler."""
        users = self.tables.users
        addresses = self.tables.addresses

        keyed_by_email = column_mapped_collection(
            addresses.c.email_address)
        mapper(User, users, properties={
            'addresses': relationship(
                Address, collection_class=keyed_by_email)
        })
        mapper(Address, addresses)

        user = User()
        user.addresses = {
            "email1": Address(email_address="email1"),
            "email2": Address(email_address="email2")
        }
        for loads, dumps in picklers():
            repickled = loads(dumps(user))
            eq_(user.addresses, repickled.addresses)
            eq_(repickled.addresses['email1'],
                Address(email_address="email1"))

    def test_composite_column_mapped_collection(self):
        """A dict collection keyed by an (id, email_address) column pair
        survives every pickler."""
        users = self.tables.users
        addresses = self.tables.addresses

        keyed_by_id_and_email = column_mapped_collection([
            addresses.c.id,
            addresses.c.email_address])
        mapper(User, users, properties={
            'addresses': relationship(
                Address, collection_class=keyed_by_id_and_email)
        })
        mapper(Address, addresses)

        user = User()
        user.addresses = {
            (1, "email1"): Address(id=1, email_address="email1"),
            (2, "email2"): Address(id=2, email_address="email2")
        }
        for loads, dumps in picklers():
            repickled = loads(dumps(user))
            eq_(user.addresses, repickled.addresses)
            eq_(repickled.addresses[(1, 'email1')],
                Address(id=1, email_address="email1"))
419
class PolymorphicDeferredTest(fixtures.MappedTest):
    """A subclass instance loaded through a base-class query keeps its
    subclass column unloaded across pickling and loads it on access."""

    @classmethod
    def define_tables(cls, metadata):
        """Declare the base ``users`` table and the joined
        ``email_users`` table."""
        Table(
            'users', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('name', String(30)),
            Column('type', String(30))
        )
        Table(
            'email_users', metadata,
            Column('id', Integer, ForeignKey('users.id'),
                   primary_key=True),
            Column('email_address', String(30))
        )

    def test_polymorphic_deferred(self):
        """``email_address`` is absent from the unpickled instance's
        ``__dict__`` and is fetched when first read."""
        users = self.tables.users
        email_users = self.tables.email_users

        mapper(
            User, users,
            polymorphic_identity='user',
            polymorphic_on=users.c.type)
        mapper(
            EmailUser, email_users,
            inherits=User,
            polymorphic_identity='emailuser')

        email_user = EmailUser(name="user1", email_address='foo@bar.com')
        session = create_session()
        session.add(email_user)
        session.flush()
        session.expunge_all()

        # query against the base class, not EmailUser
        email_user = session.query(User).first()
        restored = pickle.loads(pickle.dumps(email_user))

        other_session = create_session()
        other_session.add(restored)
        assert 'email_address' not in restored.__dict__
        eq_(restored.email_address, 'foo@bar.com')
452
class TupleLabelTest(_fixtures.FixtureTest):
    """Check the keys and attribute names of Query result rows, both
    directly and after pickling with each protocol."""

    @classmethod
    def setup_classes(cls):
        """No-op override.

        NOTE(review): presumably present so the fixture does not build
        its own classes, since the pickleable ones are imported at
        module level -- confirm against FixtureTest.
        """
        pass

    @classmethod
    def setup_mappers(cls):
        """Map User, Address and Order onto the fixture tables."""
        users = cls.tables.users
        addresses = cls.tables.addresses
        orders = cls.tables.orders

        mapper(User, users, properties={
            'addresses': relationship(
                Address, backref='user', order_by=addresses.c.id),
            # o2m, m2o
            'orders': relationship(
                Order, backref='user', order_by=orders.c.id),
        })
        mapper(Address, addresses)
        mapper(Order, orders, properties={
            'address': relationship(Address),  # m2o
        })

    def test_tuple_labeling(self):
        """Row keys and named attributes match for entity, column,
        labeled and aliased queries, unpickled and under each protocol."""
        sess = create_session()

        def roundtrip(obj, protocol):
            """Return ``obj`` unchanged when ``protocol`` is False, else
            its pickle round trip under that protocol."""
            # Identity test on purpose: protocol 0 compares equal to
            # False, but only the literal False means "do not pickle".
            if protocol is False:
                return obj
            return pickle.loads(pickle.dumps(obj, protocol))

        # test pickle + all the protocols !
        for protocol in False, -1, 0, 1, 2:
            for row in sess.query(User, Address).join(User.addresses).all():
                row = roundtrip(row, protocol)
                eq_(list(row.keys()), ['User', 'Address'])
                eq_(row.User, row[0])
                eq_(row.Address, row[1])

            for row in sess.query(User.name, User.id.label('foobar')):
                row = roundtrip(row, protocol)
                eq_(list(row.keys()), ['name', 'foobar'])
                eq_(row.name, row[0])
                eq_(row.foobar, row[1])

            for row in sess.query(User).values(
                    User.name, User.id.label('foobar')):
                row = roundtrip(row, protocol)
                eq_(list(row.keys()), ['name', 'foobar'])
                eq_(row.name, row[0])
                eq_(row.foobar, row[1])

            # an unnamed alias contributes no key
            oalias = aliased(Order)
            for row in sess.query(User, oalias).join(User.orders).all():
                row = roundtrip(row, protocol)
                eq_(list(row.keys()), ['User'])
                eq_(row.User, row[0])

            # a named alias is keyed by its name
            oalias = aliased(Order, name='orders')
            for row in sess.query(User, oalias).join(
                    oalias, User.orders).all():
                row = roundtrip(row, protocol)
                eq_(list(row.keys()), ['User', 'orders'])
                eq_(row.User, row[0])
                eq_(row.orders, row[1])

            # test here that first col is not labeled, only
            # one name in keys, matches correctly
            for row in sess.query(User.name + 'hoho', User.name):
                eq_(list(row.keys()), ['name'])
                eq_(row[0], row.name + 'hoho')

            # finally pickle a whole result list at once
            if protocol is not False:
                ret = sess.query(User, Address).join(User.addresses).all()
                roundtrip(ret, protocol)
522
class CustomSetupTeardownTest(fixtures.MappedTest):
    """Shows removing and restoring per-instance instrumentation state
    around a pickle round trip."""

    @classmethod
    def define_tables(cls, metadata):
        """Declare the users and addresses tables."""
        Table(
            'users', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('name', String(30), nullable=False),
            test_needs_acid=True,
            test_needs_fk=True
        )

        Table(
            'addresses', metadata,
            Column('id', Integer, primary_key=True,
                   test_needs_autoincrement=True),
            Column('user_id', None, ForeignKey('users.id')),
            Column('email_address', String(50), nullable=False),
            test_needs_acid=True,
            test_needs_fk=True
        )

    def test_rebuild_state(self):
        """not much of a 'test', but illustrate how to
        remove instance-level state before pickling.

        """
        mapper(User, self.tables.users)

        user = User()
        manager = attributes.manager_of_class(User)

        # after teardown the instance carries no attributes at all
        manager.teardown_instance(user)
        assert not user.__dict__

        restored = pickle.loads(pickle.dumps(user))

        # re-attach instrumentation state to the unpickled copy
        manager.setup_instance(restored)
        assert attributes.instance_state(restored)
556
557