1# -*- coding: utf-8 -*-
2# Part of Odoo. See LICENSE file for full copyright and licensing details.
3import time
4
5from odoo.exceptions import AccessDenied, AccessError
6from odoo.http import _request_stack
7
8import odoo.tools
9from odoo.tests import common
10from odoo.service import common as auth, model
11from odoo.tools import DotDict
12
13
@common.tagged('post_install', '-at_install')
class TestXMLRPC(common.HttpCase):
    """Smoke tests calling the RPC endpoints with the admin credentials.

    The XML-RPC tests go through ``self.xmlrpc_common`` / ``self.xmlrpc_object``;
    the JSON-RPC tests post to ``/jsonrpc`` through ``self.opener``.
    """

    def setUp(self):
        super().setUp()
        # uid of the admin user, passed as the authenticating uid in RPC calls
        self.admin_uid = self.env.ref('base.user_admin').id

    def test_01_xmlrpc_login(self):
        """ Try to login on the common service. """
        db_name = common.get_db_name()
        uid = self.xmlrpc_common.login(db_name, 'admin', 'admin')
        self.assertEqual(uid, self.admin_uid)

    def test_xmlrpc_ir_model_search(self):
        """ Try a search on the object service. """
        o = self.xmlrpc_object
        db_name = common.get_db_name()
        ids = o.execute(db_name, self.admin_uid, 'admin', 'ir.model', 'search', [])
        self.assertIsInstance(ids, list)
        # same call with an extra trailing (empty) dict argument
        ids = o.execute(db_name, self.admin_uid, 'admin', 'ir.model', 'search', [], {})
        self.assertIsInstance(ids, list)

    def test_xmlrpc_read_group(self):
        """ Try a read_group on the object service. """
        groups = self.xmlrpc_object.execute(
            common.get_db_name(), self.admin_uid, 'admin',
            'res.partner', 'read_group', [], ['is_company', 'color'], ['parent_id']
        )
        # the result used to be bound and never checked
        self.assertIsInstance(groups, list)

    def test_xmlrpc_name_search(self):
        """ Try a name_search on the object service (only checks the call does not fail). """
        self.xmlrpc_object.execute(
            common.get_db_name(), self.admin_uid, 'admin',
            'res.partner', 'name_search', "admin"
        )

    def test_xmlrpc_frozendict_marshalling(self):
        """ Test that the marshalling of a frozendict object works properly over XMLRPC """
        ctx = self.xmlrpc_object.execute(
            common.get_db_name(), self.admin_uid, 'admin',
            'res.users', 'context_get',
        )
        self.assertEqual(ctx['lang'], 'en_US')
        self.assertEqual(ctx['tz'], 'Europe/Brussels')

    def test_jsonrpc_read_group(self):
        """ Post a read_group call to the JSON-RPC endpoint. """
        self._json_call(
            common.get_db_name(), self.admin_uid, 'admin',
            'res.partner', 'read_group', [], ['is_company', 'color'], ['parent_id']
        )

    def test_jsonrpc_name_search(self):
        """ Post a name_search call to the JSON-RPC endpoint. """
        self._json_call(
            common.get_db_name(),
            self.admin_uid, 'admin',
            'res.partner', 'name_search', 'admin'
        )

    def _json_call(self, *args):
        """ Post an ``object``/``execute`` call to the ``/jsonrpc`` endpoint.

        :param args: positional arguments forwarded as the ``args`` of the
                     JSON-RPC ``params`` (database, uid, password, model,
                     method, then the method's own arguments)
        :return: whatever ``self.opener.post`` returns; the payload is not
                 inspected here, so callers wanting to check for a JSON-RPC
                 error have to look at the returned response themselves
        """
        url = "http://%s:%s/jsonrpc" % (common.HOST, odoo.tools.config['http_port'])
        return self.opener.post(url, json={
            'jsonrpc': '2.0',
            'id': None,
            'method': 'call',
            'params': {
                'service': 'object',
                'method': 'execute',
                'args': args
            }
        })
82
83# really just for the test cursor
@common.tagged('post_install', '-at_install')
class TestAPIKeys(common.HttpCase):
    """Password and API-key authentication checks through the RPC dispatchers."""

    def setUp(self):
        super().setUp()
        user_values = {
            'name': "Bylan",
            'login': 'byl',
            'password': 'ananananan',
            'tz': 'Australia/Eucla',
        }
        self._user = self.env['res.users'].create(user_values)

        # Methods protected with check_identity need a request to exist, so a
        # stand-in is pushed on the request stack and popped on cleanup.
        # Various things go and read items off the request.
        http_request = DotDict({
            'environ': {'REMOTE_ADDR': 'localhost'},
            'cookies': {},
        })
        # A fresh timestamp here bypasses the check_identity flow.
        session = {'identity-check-last': time.time()}
        _request_stack.push(DotDict({
            'httprequest': http_request,
            'session': session,
        }))
        self.addCleanup(_request_stack.pop)

    def test_trivial(self):
        """Login/password authenticates and can then be used for execute_kw."""
        dbname = self.env.cr.dbname

        uid = auth.dispatch('authenticate', [dbname, 'byl', 'ananananan', {}])
        self.assertEqual(uid, self._user.id)

        context = model.dispatch(
            'execute_kw',
            [dbname, uid, 'ananananan', 'res.users', 'context_get', []],
        )
        self.assertEqual(context['tz'], 'Australia/Eucla')

    def test_wrongpw(self):
        """A wrong password gives a falsy uid on authenticate and AccessDenied on execute_kw."""
        dbname = self.env.cr.dbname

        # User.authenticate raises but RPC.authenticate returns False
        uid = auth.dispatch('authenticate', [dbname, 'byl', 'aws', {}])
        self.assertFalse(uid)

        with self.assertRaises(AccessDenied):
            model.dispatch(
                'execute_kw',
                [dbname, self._user.id, 'aws', 'res.users', 'context_get', []],
            )

    def test_key(self):
        """A generated key is accepted wherever the password is, which still works too."""
        dbname = self.env.cr.dbname
        user_env = self.env(user=self._user)

        description = user_env['res.users.apikeys.description'].create({'name': 'a'})
        result = description.make_key()
        api_key = result['context']['default_key']

        # the regular password keeps working once a key exists
        uid = auth.dispatch('authenticate', [dbname, 'byl', 'ananananan', {}])
        self.assertEqual(uid, self._user.id)

        # and the key itself authenticates the same user
        uid = auth.dispatch('authenticate', [dbname, 'byl', api_key, {}])
        self.assertEqual(uid, self._user.id)

        context = model.dispatch(
            'execute_kw',
            [dbname, uid, api_key, 'res.users', 'context_get', []],
        )
        self.assertEqual(context['tz'], 'Australia/Eucla')

    def test_delete(self):
        """Keys are removable by their owner and by admin, not by another user."""
        user_env = self.env(user=self._user)
        for _ in range(3):
            user_env['res.users.apikeys.description'].create({'name': 'b'}).make_key()
        own_key, admin_key, other_key = user_env['res.users.apikeys'].search([])

        # user can remove their own keys
        own_key.remove()
        self.assertFalse(own_key.exists())

        # admin can remove user keys
        admin = self.env.ref('base.user_admin')
        admin_key.with_user(admin).remove()
        self.assertFalse(admin_key.exists())

        # other user can't remove user keys
        other_user = self.env['res.users'].create({
            'name': 'a',
            'login': 'a',
            'groups_id': self.env.ref('base.group_user').ids,
        })
        with self.assertRaises(AccessError):
            other_key.with_user(other_user).remove()

    def test_disabled(self):
        """Once the user is deactivated, neither the password nor a key is accepted."""
        dbname = self.env.cr.dbname
        user_env = self.env(user=self._user)

        description = user_env['res.users.apikeys.description'].create({'name': 'b'})
        api_key = description.make_key()['context']['default_key']

        self._user.active = False

        # password first, then the API key: both must be denied
        with self.assertRaises(AccessDenied):
            model.dispatch(
                'execute_kw',
                [dbname, self._user.id, 'ananananan', 'res.users', 'context_get', []],
            )

        with self.assertRaises(AccessDenied):
            model.dispatch(
                'execute_kw',
                [dbname, self._user.id, api_key, 'res.users', 'context_get', []],
            )
188