# -*- coding: utf-8 -*-
# Part of Odoo. See LICENSE file for full copyright and licensing details.

from collections import defaultdict

import psycopg2

from odoo.exceptions import AccessError, MissingError
from odoo.tests.common import TransactionCase
from odoo.tools import mute_logger


class TestORM(TransactionCase):
    """ test special behaviors of ORM CRUD functions """

    @mute_logger('odoo.models')
    def test_access_deleted_records(self):
        """ Verify that accessing deleted records works as expected """
        p1 = self.env['res.partner'].create({'name': 'W'})
        p2 = self.env['res.partner'].create({'name': 'Y'})
        p1.unlink()

        # read() is expected to skip deleted records because our API is not
        # transactional for a sequence of search()->read() performed from the
        # client-side... a concurrent deletion could therefore cause spurious
        # exceptions even when simply opening a list view!
        # /!\ Using unprivileged user to detect former side effects of ir.rules!
        user = self.env['res.users'].create({
            'name': 'test user',
            'login': 'test2',
            'groups_id': [(6, 0, [self.ref('base.group_user')])],
        })
        ps = (p1 + p2).with_user(user)
        self.assertEqual([{'id': p2.id, 'name': 'Y'}], ps.read(['name']), "read() should skip deleted records")
        self.assertEqual([], ps[0].read(['name']), "read() should skip deleted records")

        # Deleting an already deleted record should be simply ignored
        self.assertTrue(p1.unlink(), "Re-deleting should be a no-op")

    @mute_logger('odoo.models')
    def test_access_partial_deletion(self):
        """ Check accessing a record from a recordset where another record has been deleted. """
        Model = self.env['res.country']
        self.assertTrue(type(Model).display_name.automatic, "test assumption not satisfied")

        # access regular field when another record from the same prefetch set has been deleted
        records = Model.create([{'name': name} for name in ('Foo', 'Bar', 'Baz')])
        for record in records:
            # the attribute access itself is the check: it must not raise
            record.name
            record.unlink()

        # access computed field when another record from the same prefetch set has been deleted
        records = Model.create([{'name': name} for name in ('Foo', 'Bar', 'Baz')])
        for record in records:
            # the attribute access itself is the check: it must not raise
            record.display_name
            record.unlink()

    @mute_logger('odoo.models', 'odoo.addons.base.models.ir_rule')
    def test_access_filtered_records(self):
        """ Verify that accessing filtered records works as expected for non-admin user """
        p1 = self.env['res.partner'].create({'name': 'W'})
        p2 = self.env['res.partner'].create({'name': 'Y'})
        user = self.env['res.users'].create({
            'name': 'test user',
            'login': 'test2',
            'groups_id': [(6, 0, [self.ref('base.group_user')])],
        })

        partner_model = self.env['ir.model'].search([('model', '=', 'res.partner')])
        # NOTE: despite its name, this rule filters out p1 (the partner named
        # 'W'); p2 (named 'Y') stays visible, as asserted below.
        self.env['ir.rule'].create({
            'name': 'Y is invisible',
            'domain_force': [('id', '!=', p1.id)],
            'model_id': partner_model.id,
        })

        # search as unprivileged user
        partners = self.env['res.partner'].with_user(user).search([])
        self.assertNotIn(p1, partners, "W should not be visible...")
        self.assertIn(p2, partners, "... but Y should be visible")

        # read as unprivileged user
        with self.assertRaises(AccessError):
            p1.with_user(user).read(['name'])
        # write as unprivileged user
        with self.assertRaises(AccessError):
            p1.with_user(user).write({'name': 'foo'})
        # unlink as unprivileged user
        with self.assertRaises(AccessError):
            p1.with_user(user).unlink()

        # Prepare mixed case
        p2.unlink()
        # read mixed records: some deleted and some filtered
        with self.assertRaises(AccessError):
            (p1 + p2).with_user(user).read(['name'])
        # delete mixed records: some deleted and some filtered
        with self.assertRaises(AccessError):
            (p1 + p2).with_user(user).unlink()

    def test_read(self):
        """ read() without arguments on a single record should return a list """
        partner = self.env['res.partner'].create({'name': 'MyPartner1'})
        result = partner.read()
        self.assertIsInstance(result, list)

    @mute_logger('odoo.models')
    def test_search_read(self):
        """ Check search_read(): results, ordering, empty result and field selection """
        partner = self.env['res.partner']
        expected_fields = ['id', 'name', 'display_name', 'email']

        # simple search_read
        partner.create({'name': 'MyPartner1'})
        found = partner.search_read([('name', '=', 'MyPartner1')], ['name'])
        self.assertEqual(len(found), 1)
        self.assertEqual(found[0]['name'], 'MyPartner1')
        self.assertIn('id', found[0])

        # search_read correct order
        partner.create({'name': 'MyPartner2'})
        found = partner.search_read([('name', 'like', 'MyPartner')], ['name'], order="name")
        self.assertEqual(len(found), 2)
        self.assertEqual(found[0]['name'], 'MyPartner1')
        self.assertEqual(found[1]['name'], 'MyPartner2')
        found = partner.search_read([('name', 'like', 'MyPartner')], ['name'], order="name desc")
        self.assertEqual(len(found), 2)
        self.assertEqual(found[0]['name'], 'MyPartner2')
        self.assertEqual(found[1]['name'], 'MyPartner1')

        # search_read that finds nothing
        found = partner.search_read([('name', '=', 'Does not exists')], ['name'])
        self.assertEqual(len(found), 0)

        # search_read with an empty array of fields
        found = partner.search_read([], [], limit=1)
        self.assertEqual(len(found), 1)
        # Check every field individually: passing a generator expression to
        # assertTrue() would always succeed, since a generator object is truthy.
        for field in expected_fields:
            self.assertIn(field, found[0])

        # search_read without fields
        found = partner.search_read([], False, limit=1)
        self.assertEqual(len(found), 1)
        for field in expected_fields:
            self.assertIn(field, found[0])

    @mute_logger('odoo.sql_db')
    def test_exists(self):
        """ Check exists() on searched, new, zero-id and string-id records """
        partner = self.env['res.partner']

        # check that records obtained from search exist
        recs = partner.search([])
        self.assertTrue(recs)
        self.assertEqual(recs.exists(), recs)

        # check that new records exist by convention
        recs = partner.new({})
        self.assertTrue(recs.exists())

        # check that there is no record with id 0
        recs = partner.browse([0])
        self.assertFalse(recs.exists())

        # check that there is no record with string id
        recs = partner.browse('xxx')
        with self.assertRaises(psycopg2.DataError):
            recs.exists()

    def test_groupby_date(self):
        """ Check read_group() on a date field grouped by day, month and year,
            alone and combined with an explicit ordering.
        """
        partners_data = dict(
            A='2012-11-19',
            B='2012-12-17',
            C='2012-12-31',
            D='2013-01-07',
            E='2013-01-14',
            F='2013-01-28',
            G='2013-02-11',
        )

        partner_ids = []
        partner_ids_by_day = defaultdict(list)
        partner_ids_by_month = defaultdict(list)
        partner_ids_by_year = defaultdict(list)

        partners = self.env['res.partner']
        for name, date in partners_data.items():
            p = partners.create(dict(name=name, date=date))
            partner_ids.append(p.id)
            # dates are 'YYYY-MM-DD' strings: the month key is 'YYYY-MM' and
            # the year key is 'YYYY'
            partner_ids_by_day[date].append(p.id)
            partner_ids_by_month[date.rsplit('-', 1)[0]].append(p.id)
            partner_ids_by_year[date.split('-', 1)[0]].append(p.id)

        def read_group(interval):
            """ Return a dict mapping each 'date:<interval>' group label to the
                partners found by searching on that group's domain.
            """
            domain = [('id', 'in', partner_ids)]
            result = {}
            for grp in partners.read_group(domain, ['date'], ['date:' + interval]):
                result[grp['date:' + interval]] = partners.search(grp['__domain'])
            return result

        self.assertEqual(len(read_group('day')), len(partner_ids_by_day))
        self.assertEqual(len(read_group('month')), len(partner_ids_by_month))
        self.assertEqual(len(read_group('year')), len(partner_ids_by_year))

        res = partners.read_group([('id', 'in', partner_ids)], ['date'],
                                  ['date:month', 'date:day'], lazy=False)
        self.assertEqual(len(res), len(partner_ids))

        # combine groupby and orderby
        months = ['February 2013', 'January 2013', 'December 2012', 'November 2012']
        res = partners.read_group([('id', 'in', partner_ids)], ['date'],
                                  groupby=['date:month'], orderby='date:month DESC')
        self.assertEqual([item['date:month'] for item in res], months)

        # order by date should reorder by date:month
        res = partners.read_group([('id', 'in', partner_ids)], ['date'],
                                  groupby=['date:month'], orderby='date DESC')
        self.assertEqual([item['date:month'] for item in res], months)

        # order by date should reorder by date:day
        days = ['11 Feb 2013', '28 Jan 2013', '14 Jan 2013', '07 Jan 2013',
                '31 Dec 2012', '17 Dec 2012', '19 Nov 2012']
        res = partners.read_group([('id', 'in', partner_ids)], ['date'],
                                  groupby=['date:month', 'date:day'],
                                  orderby='date DESC', lazy=False)
        self.assertEqual([item['date:day'] for item in res], days)

    def test_write_duplicate(self):
        """ write() on a recordset containing the same record twice should not fail """
        p1 = self.env['res.partner'].create({'name': 'W'})
        (p1 + p1).write({'name': 'X'})

    def test_m2m_store_trigger(self):
        """ `share` on a user should follow the user being added to, then
            removed from, base.group_user through the group's `users` field
        """
        group_user = self.env.ref('base.group_user')

        user = self.env['res.users'].create({
            'name': 'test',
            'login': 'test_m2m_store_trigger',
            'groups_id': [(6, 0, [])],
        })
        self.assertTrue(user.share)

        group_user.write({'users': [(4, user.id)]})
        self.assertFalse(user.share)

        group_user.write({'users': [(3, user.id)]})
        self.assertTrue(user.share)

    @mute_logger('odoo.models')
    def test_unlink_with_property(self):
        """ Verify that unlink removes the related ir.property as unprivileged user """
        user = self.env['res.users'].create({
            'name': 'Justine Bridou',
            'login': 'saucisson',
            'groups_id': [(6, 0, [self.ref('base.group_partner_manager')])],
        })
        p1 = self.env['res.partner'].with_user(user).create({'name': 'Zorro'})
        self.env['ir.property'].with_user(user)._set_multi("ref", "res.partner", {p1.id: "Nain poilu"})
        p1_prop = self.env['ir.property'].with_user(user)._get("ref", "res.partner", res_id=p1.id)
        self.assertEqual(
            p1_prop, "Nain poilu", 'p1_prop should have been created')

        # Unlink with unprivileged user
        p1.unlink()

        # ir.property is deleted
        p1_prop = self.env['ir.property'].with_user(user)._get("ref", "res.partner", res_id=p1.id)
        self.assertEqual(
            p1_prop, False, 'p1_prop should have been deleted')

    def test_create_multi(self):
        """ create for multiple records """
        # assumption: 'res.bank' does not override 'create'
        vals_list = [{'name': name} for name in ('Foo', 'Bar', 'Baz')]
        vals_list[0]['email'] = 'foo@example.com'
        for vals in vals_list:
            record = self.env['res.bank'].create(vals)
            self.assertEqual(len(record), 1)
            self.assertEqual(record.name, vals['name'])
            self.assertEqual(record.email, vals.get('email', False))

        records = self.env['res.bank'].create([])
        self.assertFalse(records)

        records = self.env['res.bank'].create(vals_list)
        self.assertEqual(len(records), len(vals_list))
        for record, vals in zip(records, vals_list):
            self.assertEqual(record.name, vals['name'])
            self.assertEqual(record.email, vals.get('email', False))

        # create countries and states
        vals_list = [{
            'name': 'Foo',
            'state_ids': [
                (0, 0, {'name': 'North Foo', 'code': 'NF'}),
                (0, 0, {'name': 'South Foo', 'code': 'SF'}),
                (0, 0, {'name': 'West Foo', 'code': 'WF'}),
                (0, 0, {'name': 'East Foo', 'code': 'EF'}),
            ],
        }, {
            'name': 'Bar',
            'state_ids': [
                (0, 0, {'name': 'North Bar', 'code': 'NB'}),
                (0, 0, {'name': 'South Bar', 'code': 'SB'}),
            ],
        }]
        foo, bar = self.env['res.country'].create(vals_list)
        self.assertEqual(foo.name, 'Foo')
        self.assertCountEqual(foo.mapped('state_ids.code'), ['NF', 'SF', 'WF', 'EF'])
        self.assertEqual(bar.name, 'Bar')
        self.assertCountEqual(bar.mapped('state_ids.code'), ['NB', 'SB'])


class TestInherits(TransactionCase):
    """ test the behavior of the orm for models that use _inherits;
        specifically: res.users, that inherits from res.partner
    """

    def test_default(self):
        """ `default_get` cannot return a dictionary or a new id """
        defaults = self.env['res.users'].default_get(['partner_id'])
        if 'partner_id' in defaults:
            self.assertIsInstance(defaults['partner_id'], (bool, int))

    def test_create(self):
        """ creating a user should automatically create a new partner """
        partners_before = self.env['res.partner'].search([])
        user_foo = self.env['res.users'].create({'name': 'Foo', 'login': 'foo'})

        self.assertNotIn(user_foo.partner_id, partners_before)

    def test_create_with_ancestor(self):
        """ creating a user with a specific 'partner_id' should not create a new partner """
        partner_foo = self.env['res.partner'].create({'name': 'Foo'})
        partners_before = self.env['res.partner'].search([])
        user_foo = self.env['res.users'].create({'partner_id': partner_foo.id, 'login': 'foo'})
        partners_after = self.env['res.partner'].search([])

        self.assertEqual(partners_before, partners_after)
        self.assertEqual(user_foo.name, 'Foo')
        self.assertEqual(user_foo.partner_id, partner_foo)

    @mute_logger('odoo.models')
    def test_read(self):
        """ inherited fields should be read without any indirection """
        user_foo = self.env['res.users'].create({'name': 'Foo', 'login': 'foo'})
        user_values, = user_foo.read()
        partner_values, = user_foo.partner_id.read()

        self.assertEqual(user_values['name'], partner_values['name'])
        self.assertEqual(user_foo.name, user_foo.partner_id.name)

    @mute_logger('odoo.models')
    def test_copy(self):
        """ copying a user should automatically copy its partner, too """
        user_foo = self.env['res.users'].create({
            'name': 'Foo',
            'login': 'foo',
            'employee': True,
        })
        # these keys are dropped from both snapshots before comparing them
        foo_before, = user_foo.read()
        del foo_before['__last_update']
        del foo_before['create_date']
        del foo_before['write_date']
        user_bar = user_foo.copy({'login': 'bar'})
        foo_after, = user_foo.read()
        del foo_after['__last_update']
        del foo_after['create_date']
        del foo_after['write_date']
        # copying must leave the original user untouched
        self.assertEqual(foo_before, foo_after)

        self.assertEqual(user_bar.name, 'Foo (copy)')
        self.assertEqual(user_bar.login, 'bar')
        self.assertEqual(user_foo.employee, user_bar.employee)
        self.assertNotEqual(user_foo.id, user_bar.id)
        self.assertNotEqual(user_foo.partner_id.id, user_bar.partner_id.id)

    @mute_logger('odoo.models')
    def test_copy_with_ancestor(self):
        """ copying a user with 'partner_id' in defaults should not duplicate the partner """
        user_foo = self.env['res.users'].create({'login': 'foo', 'name': 'Foo', 'signature': 'Foo'})
        partner_bar = self.env['res.partner'].create({'name': 'Bar'})

        # these keys are dropped from both snapshots before comparing them
        foo_before, = user_foo.read()
        del foo_before['__last_update']
        del foo_before['create_date']
        del foo_before['write_date']
        del foo_before['login_date']
        partners_before = self.env['res.partner'].search([])
        user_bar = user_foo.copy({'partner_id': partner_bar.id, 'login': 'bar'})
        foo_after, = user_foo.read()
        del foo_after['__last_update']
        del foo_after['create_date']
        del foo_after['write_date']
        del foo_after['login_date']
        partners_after = self.env['res.partner'].search([])

        # neither the original user nor the set of partners may change
        self.assertEqual(foo_before, foo_after)
        self.assertEqual(partners_before, partners_after)

        self.assertNotEqual(user_foo.id, user_bar.id)
        self.assertEqual(user_bar.partner_id.id, partner_bar.id)
        self.assertEqual(user_bar.login, 'bar', "login is given from copy parameters")
        self.assertFalse(user_bar.password, "password should not be copied from original record")
        self.assertEqual(user_bar.name, 'Bar', "name is given from specific partner")
        self.assertEqual(user_bar.signature, user_foo.signature, "signature should be copied")

    @mute_logger('odoo.models')
    def test_write_date(self):
        """ modifying inherited fields must update write_date """
        user = self.env.user
        write_date_before = user.write_date

        # write base64 image
        user.write({'image_1920': 'R0lGODlhAQABAIAAAP///////yH5BAEKAAEALAAAAAABAAEAAAICTAEAOw=='})
        write_date_after = user.write_date
        self.assertNotEqual(write_date_before, write_date_after)