1# -*- coding: utf-8 -*-
2# Part of Odoo. See LICENSE file for full copyright and licensing details.
3
4from collections import defaultdict
5
6import psycopg2
7
8from odoo.exceptions import AccessError, MissingError
9from odoo.tests.common import TransactionCase
10from odoo.tools import mute_logger
11
12
class TestORM(TransactionCase):
    """ test special behaviors of ORM CRUD functions """

    @mute_logger('odoo.models')
    def test_access_deleted_records(self):
        """ Verify that accessing deleted records works as expected """
        p1 = self.env['res.partner'].create({'name': 'W'})
        p2 = self.env['res.partner'].create({'name': 'Y'})
        p1.unlink()

        # read() is expected to skip deleted records because our API is not
        # transactional for a sequence of search()->read() performed from the
        # client-side... a concurrent deletion could therefore cause spurious
        # exceptions even when simply opening a list view!
        # /!\ Using unprivileged user to detect former side effects of ir.rules!
        user = self.env['res.users'].create({
            'name': 'test user',
            'login': 'test2',
            'groups_id': [(6, 0, [self.ref('base.group_user')])],
        })
        ps = (p1 + p2).with_user(user)
        self.assertEqual([{'id': p2.id, 'name': 'Y'}], ps.read(['name']), "read() should skip deleted records")
        # ps[0] is the deleted record p1 alone: nothing is left to read
        self.assertEqual([], ps[0].read(['name']), "read() should skip deleted records")

        # Deleting an already deleted record should be simply ignored
        self.assertTrue(p1.unlink(), "Re-deleting should be a no-op")

    @mute_logger('odoo.models')
    def test_access_partial_deletion(self):
        """ Check accessing a record from a recordset where another record has been deleted. """
        Model = self.env['res.country']
        self.assertTrue(type(Model).display_name.automatic, "test assumption not satisfied")

        # access regular field when another record from the same prefetch set has been deleted
        records = Model.create([{'name': name} for name in ('Foo', 'Bar', 'Baz')])
        for record in records:
            # the bare attribute access is the test: it must not raise
            record.name
            record.unlink()

        # access computed field when another record from the same prefetch set has been deleted
        records = Model.create([{'name': name} for name in ('Foo', 'Bar', 'Baz')])
        for record in records:
            # the bare attribute access is the test: it must not raise
            record.display_name
            record.unlink()

    @mute_logger('odoo.models', 'odoo.addons.base.models.ir_rule')
    def test_access_filtered_records(self):
        """ Verify that accessing filtered records works as expected for non-admin user """
        p1 = self.env['res.partner'].create({'name': 'W'})
        p2 = self.env['res.partner'].create({'name': 'Y'})
        user = self.env['res.users'].create({
            'name': 'test user',
            'login': 'test2',
            'groups_id': [(6, 0, [self.ref('base.group_user')])],
        })

        partner_model = self.env['ir.model'].search([('model','=','res.partner')])
        # NOTE(review): despite its name, this rule's domain filters out p1
        # (the partner named 'W'), not 'Y'.
        self.env['ir.rule'].create({
            'name': 'Y is invisible',
            'domain_force': [('id', '!=', p1.id)],
            'model_id': partner_model.id,
        })

        # search as unprivileged user
        partners = self.env['res.partner'].with_user(user).search([])
        self.assertNotIn(p1, partners, "W should not be visible...")
        self.assertIn(p2, partners, "... but Y should be visible")

        # read as unprivileged user
        with self.assertRaises(AccessError):
            p1.with_user(user).read(['name'])
        # write as unprivileged user
        with self.assertRaises(AccessError):
            p1.with_user(user).write({'name': 'foo'})
        # unlink as unprivileged user
        with self.assertRaises(AccessError):
            p1.with_user(user).unlink()

        # Prepare mixed case
        p2.unlink()
        # read mixed records: some deleted and some filtered
        with self.assertRaises(AccessError):
            (p1 + p2).with_user(user).read(['name'])
        # delete mixed records: some deleted and some filtered
        with self.assertRaises(AccessError):
            (p1 + p2).with_user(user).unlink()

    def test_read(self):
        """ read() without arguments should return a list """
        partner = self.env['res.partner'].create({'name': 'MyPartner1'})
        result = partner.read()
        self.assertIsInstance(result, list)

    @mute_logger('odoo.models')
    def test_search_read(self):
        """ Check search_read() results, ordering, and behavior without explicit fields """
        partner = self.env['res.partner']

        # simple search_read
        partner.create({'name': 'MyPartner1'})
        found = partner.search_read([('name', '=', 'MyPartner1')], ['name'])
        self.assertEqual(len(found), 1)
        self.assertEqual(found[0]['name'], 'MyPartner1')
        self.assertIn('id', found[0])

        # search_read correct order
        partner.create({'name': 'MyPartner2'})
        found = partner.search_read([('name', 'like', 'MyPartner')], ['name'], order="name")
        self.assertEqual(len(found), 2)
        self.assertEqual(found[0]['name'], 'MyPartner1')
        self.assertEqual(found[1]['name'], 'MyPartner2')
        found = partner.search_read([('name', 'like', 'MyPartner')], ['name'], order="name desc")
        self.assertEqual(len(found), 2)
        self.assertEqual(found[0]['name'], 'MyPartner2')
        self.assertEqual(found[1]['name'], 'MyPartner1')

        # search_read that finds nothing
        found = partner.search_read([('name', '=', 'Does not exists')], ['name'])
        self.assertEqual(len(found), 0)

        # Fields that must be present when no explicit field list is given.
        # Each one is asserted individually: handing a bare generator
        # expression to assertTrue() would always succeed, since a generator
        # object is truthy whatever it yields.
        expected_fields = ('id', 'name', 'display_name', 'email')

        # search_read with an empty array of fields
        found = partner.search_read([], [], limit=1)
        self.assertEqual(len(found), 1)
        for field in expected_fields:
            self.assertIn(field, found[0])

        # search_read without fields
        found = partner.search_read([], False, limit=1)
        self.assertEqual(len(found), 1)
        for field in expected_fields:
            self.assertIn(field, found[0])

    @mute_logger('odoo.sql_db')
    def test_exists(self):
        """ Check exists() on searched records, new records, id 0 and a string id """
        partner = self.env['res.partner']

        # check that records obtained from search exist
        recs = partner.search([])
        self.assertTrue(recs)
        self.assertEqual(recs.exists(), recs)

        # check that new records exist by convention
        recs = partner.new({})
        self.assertTrue(recs.exists())

        # check that there is no record with id 0
        recs = partner.browse([0])
        self.assertFalse(recs.exists())

        # check that there is no record with string id
        recs = partner.browse('xxx')
        with self.assertRaises(psycopg2.DataError):
            recs.exists()

    def test_groupby_date(self):
        """ Check read_group() on a date field by day, month and year, with ordering """
        # partner name -> value of its 'date' field; all dates are distinct
        partners_data = dict(
            A='2012-11-19',
            B='2012-12-17',
            C='2012-12-31',
            D='2013-01-07',
            E='2013-01-14',
            F='2013-01-28',
            G='2013-02-11',
        )

        partner_ids = []
        partner_ids_by_day = defaultdict(list)
        partner_ids_by_month = defaultdict(list)
        partner_ids_by_year = defaultdict(list)

        partners = self.env['res.partner']
        for name, date in partners_data.items():
            p = partners.create(dict(name=name, date=date))
            partner_ids.append(p.id)
            partner_ids_by_day[date].append(p.id)
            # 'YYYY-MM-DD' -> 'YYYY-MM'
            partner_ids_by_month[date.rsplit('-', 1)[0]].append(p.id)
            # 'YYYY-MM-DD' -> 'YYYY'
            partner_ids_by_year[date.split('-', 1)[0]].append(p.id)

        def read_group(interval):
            """ Map each 'date:<interval>' group value to the records matching the group's domain. """
            domain = [('id', 'in', partner_ids)]
            result = {}
            for grp in partners.read_group(domain, ['date'], ['date:' + interval]):
                result[grp['date:' + interval]] = partners.search(grp['__domain'])
            return result

        # the number of groups must match the number of distinct days/months/years
        self.assertEqual(len(read_group('day')), len(partner_ids_by_day))
        self.assertEqual(len(read_group('month')), len(partner_ids_by_month))
        self.assertEqual(len(read_group('year')), len(partner_ids_by_year))

        # non-lazy grouping by month then day: one group is expected per partner
        res = partners.read_group([('id', 'in', partner_ids)], ['date'],
                                  ['date:month', 'date:day'], lazy=False)
        self.assertEqual(len(res), len(partner_ids))

        # combine groupby and orderby
        months = ['February 2013', 'January 2013', 'December 2012', 'November 2012']
        res = partners.read_group([('id', 'in', partner_ids)], ['date'],
                                  groupby=['date:month'], orderby='date:month DESC')
        self.assertEqual([item['date:month'] for item in res], months)

        # order by date should reorder by date:month
        res = partners.read_group([('id', 'in', partner_ids)], ['date'],
                                  groupby=['date:month'], orderby='date DESC')
        self.assertEqual([item['date:month'] for item in res], months)

        # order by date should reorder by date:day
        days = ['11 Feb 2013', '28 Jan 2013', '14 Jan 2013', '07 Jan 2013',
                '31 Dec 2012', '17 Dec 2012', '19 Nov 2012']
        res = partners.read_group([('id', 'in', partner_ids)], ['date'],
                                  groupby=['date:month', 'date:day'],
                                  orderby='date DESC', lazy=False)
        self.assertEqual([item['date:day'] for item in res], days)

    def test_write_duplicate(self):
        """ write() on a recordset holding the same record twice should not fail """
        p1 = self.env['res.partner'].create({'name': 'W'})
        (p1 + p1).write({'name': 'X'})

    def test_m2m_store_trigger(self):
        """ The `share` flag of a user should follow changes written on the group's `users` """
        group_user = self.env.ref('base.group_user')

        # user created with an empty set of groups
        user = self.env['res.users'].create({
            'name': 'test',
            'login': 'test_m2m_store_trigger',
            'groups_id': [(6, 0, [])],
        })
        self.assertTrue(user.share)

        # command 4: link the user to the group
        group_user.write({'users': [(4, user.id)]})
        self.assertFalse(user.share)

        # command 3: unlink the user from the group
        group_user.write({'users': [(3, user.id)]})
        self.assertTrue(user.share)

    @mute_logger('odoo.models')
    def test_unlink_with_property(self):
        """ Verify that unlink removes the related ir.property as unprivileged user """
        user = self.env['res.users'].create({
            'name': 'Justine Bridou',
            'login': 'saucisson',
            'groups_id': [(6, 0, [self.ref('base.group_partner_manager')])],
        })
        p1 = self.env['res.partner'].with_user(user).create({'name': 'Zorro'})
        self.env['ir.property'].with_user(user)._set_multi("ref", "res.partner", {p1.id: "Nain poilu"})
        p1_prop = self.env['ir.property'].with_user(user)._get("ref", "res.partner", res_id=p1.id)
        self.assertEqual(
            p1_prop, "Nain poilu", 'p1_prop should have been created')

        # Unlink with unprivileged user (p1 was created through with_user(user))
        p1.unlink()

        # ir.property is deleted
        p1_prop = self.env['ir.property'].with_user(user)._get("ref", "res.partner", res_id=p1.id)
        self.assertEqual(
            p1_prop, False, 'p1_prop should have been deleted')

    def test_create_multi(self):
        """ create for multiple records """
        # assumption: 'res.bank' does not override 'create'
        vals_list = [{'name': name} for name in ('Foo', 'Bar', 'Baz')]
        vals_list[0]['email'] = 'foo@example.com'

        # create the records one at a time
        for vals in vals_list:
            record = self.env['res.bank'].create(vals)
            self.assertEqual(len(record), 1)
            self.assertEqual(record.name, vals['name'])
            self.assertEqual(record.email, vals.get('email', False))

        # create with an empty list of values gives an empty recordset
        records = self.env['res.bank'].create([])
        self.assertFalse(records)

        # create all records in one call; results come back in the order of vals_list
        records = self.env['res.bank'].create(vals_list)
        self.assertEqual(len(records), len(vals_list))
        for record, vals in zip(records, vals_list):
            self.assertEqual(record.name, vals['name'])
            self.assertEqual(record.email, vals.get('email', False))

        # create countries and states
        vals_list = [{
            'name': 'Foo',
            'state_ids': [
                (0, 0, {'name': 'North Foo', 'code': 'NF'}),
                (0, 0, {'name': 'South Foo', 'code': 'SF'}),
                (0, 0, {'name': 'West Foo', 'code': 'WF'}),
                (0, 0, {'name': 'East Foo', 'code': 'EF'}),
            ],
        }, {
            'name': 'Bar',
            'state_ids': [
                (0, 0, {'name': 'North Bar', 'code': 'NB'}),
                (0, 0, {'name': 'South Bar', 'code': 'SB'}),
            ],
        }]
        foo, bar = self.env['res.country'].create(vals_list)
        self.assertEqual(foo.name, 'Foo')
        self.assertCountEqual(foo.mapped('state_ids.code'), ['NF', 'SF', 'WF', 'EF'])
        self.assertEqual(bar.name, 'Bar')
        self.assertCountEqual(bar.mapped('state_ids.code'), ['NB', 'SB'])
304
305
class TestInherits(TransactionCase):
    """ exercise the ORM on models relying on _inherits;
        the case at hand is res.users, which inherits from res.partner
    """

    def _read_without(self, record, *ignored_keys):
        """ Read the single ``record`` and return its values with every key
            of ``ignored_keys`` deleted (a missing key raises KeyError).
        """
        [values] = record.read()
        for key in ignored_keys:
            del values[key]
        return values

    def test_default(self):
        """ `default_get` must give neither a dictionary nor a new id for partner_id """
        defaults = self.env['res.users'].default_get(['partner_id'])
        if 'partner_id' not in defaults:
            return
        self.assertIsInstance(defaults['partner_id'], (bool, int))

    def test_create(self):
        """ a freshly created user should come with a freshly created partner """
        known_partners = self.env['res.partner'].search([])
        new_user = self.env['res.users'].create({'name': 'Foo', 'login': 'foo'})

        self.assertNotIn(new_user.partner_id, known_partners)

    def test_create_with_ancestor(self):
        """ a user created with an explicit 'partner_id' should reuse that partner """
        Partner = self.env['res.partner']
        given_partner = Partner.create({'name': 'Foo'})

        # snapshot all partners around the user creation
        partners_before = Partner.search([])
        new_user = self.env['res.users'].create({
            'partner_id': given_partner.id,
            'login': 'foo',
        })
        partners_after = Partner.search([])

        self.assertEqual(partners_before, partners_after)
        self.assertEqual(new_user.name, 'Foo')
        self.assertEqual(new_user.partner_id, given_partner)

    @mute_logger('odoo.models')
    def test_read(self):
        """ fields inherited from the partner are readable directly on the user """
        new_user = self.env['res.users'].create({'name': 'Foo', 'login': 'foo'})
        [user_data] = new_user.read()
        [partner_data] = new_user.partner_id.read()

        self.assertEqual(user_data['name'], partner_data['name'])
        self.assertEqual(new_user.name, new_user.partner_id.name)

    @mute_logger('odoo.models')
    def test_copy(self):
        """ duplicating a user should duplicate its partner as well """
        # keys dropped from the read() results before comparing them
        ignored = ('__last_update', 'create_date', 'write_date')

        source_user = self.env['res.users'].create({
            'name': 'Foo',
            'login': 'foo',
            'employee': True,
        })
        values_before = self._read_without(source_user, *ignored)
        copied_user = source_user.copy({'login': 'bar'})
        values_after = self._read_without(source_user, *ignored)

        # the source user is left untouched by the copy
        self.assertEqual(values_before, values_after)

        self.assertEqual(copied_user.name, 'Foo (copy)')
        self.assertEqual(copied_user.login, 'bar')
        self.assertEqual(source_user.employee, copied_user.employee)
        self.assertNotEqual(source_user.id, copied_user.id)
        self.assertNotEqual(source_user.partner_id.id, copied_user.partner_id.id)

    @mute_logger('odoo.models')
    def test_copy_with_ancestor(self):
        """ copying a user with 'partner_id' in defaults should not duplicate the partner """
        # keys dropped from the read() results before comparing them
        ignored = ('__last_update', 'create_date', 'write_date', 'login_date')

        Partner = self.env['res.partner']
        source_user = self.env['res.users'].create({
            'login': 'foo',
            'name': 'Foo',
            'signature': 'Foo',
        })
        target_partner = Partner.create({'name': 'Bar'})

        values_before = self._read_without(source_user, *ignored)
        partners_before = Partner.search([])
        copied_user = source_user.copy({
            'partner_id': target_partner.id,
            'login': 'bar',
        })
        values_after = self._read_without(source_user, *ignored)
        partners_after = Partner.search([])

        # neither the source user nor the set of partners has changed
        self.assertEqual(values_before, values_after)
        self.assertEqual(partners_before, partners_after)

        self.assertNotEqual(source_user.id, copied_user.id)
        self.assertEqual(copied_user.partner_id.id, target_partner.id)
        self.assertEqual(copied_user.login, 'bar', "login is given from copy parameters")
        self.assertFalse(copied_user.password, "password should not be copied from original record")
        self.assertEqual(copied_user.name, 'Bar', "name is given from specific partner")
        self.assertEqual(copied_user.signature, source_user.signature, "signature should be copied")

    @mute_logger('odoo.models')
    def test_write_date(self):
        """ writing on an inherited field must change write_date """
        current_user = self.env.user
        date_before = current_user.write_date

        # write base64 image
        current_user.write({
            'image_1920': 'R0lGODlhAQABAIAAAP///////yH5BAEKAAEALAAAAAABAAEAAAICTAEAOw==',
        })
        date_after = current_user.write_date
        self.assertNotEqual(date_before, date_after)
410