1# -*- coding: utf-8 -*-
2
3# Copyright: (c) 2021, Ansible Project
4# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
5
6from __future__ import absolute_import, division, print_function
7__metaclass__ = type
8
9from contextlib import contextmanager
10
11from ansible_collections.community.general.tests.unit.compat import unittest
12from ansible_collections.community.general.tests.unit.compat.mock import call, patch
13from ansible_collections.community.general.tests.unit.plugins.modules.utils import AnsibleExitJson, AnsibleFailJson, ModuleTestCase, set_module_args
14
15from ansible_collections.community.general.plugins.modules.identity.keycloak import keycloak_realm
16
17from itertools import count
18
19from ansible.module_utils.six import StringIO
20
21
22@contextmanager
23def patch_keycloak_api(get_realm_by_id, create_realm=None, update_realm=None, delete_realm=None):
24    """Mock context manager for patching the methods in PwPolicyIPAClient that contact the IPA server
25
26    Patches the `login` and `_post_json` methods
27
28    Keyword arguments are passed to the mock object that patches `_post_json`
29
30    No arguments are passed to the mock object that patches `login` because no tests require it
31
32    Example::
33
34        with patch_ipa(return_value={}) as (mock_login, mock_post):
35            ...
36    """
37
38    obj = keycloak_realm.KeycloakAPI
39    with patch.object(obj, 'get_realm_by_id', side_effect=get_realm_by_id) as mock_get_realm_by_id:
40        with patch.object(obj, 'create_realm', side_effect=create_realm) as mock_create_realm:
41            with patch.object(obj, 'update_realm', side_effect=update_realm) as mock_update_realm:
42                with patch.object(obj, 'delete_realm', side_effect=delete_realm) as mock_delete_realm:
43                    yield mock_get_realm_by_id, mock_create_realm, mock_update_realm, mock_delete_realm
44
45
46def get_response(object_with_future_response, method, get_id_call_count):
47    if callable(object_with_future_response):
48        return object_with_future_response()
49    if isinstance(object_with_future_response, dict):
50        return get_response(
51            object_with_future_response[method], method, get_id_call_count)
52    if isinstance(object_with_future_response, list):
53        call_number = next(get_id_call_count)
54        return get_response(
55            object_with_future_response[call_number], method, get_id_call_count)
56    return object_with_future_response
57
58
59def build_mocked_request(get_id_user_count, response_dict):
60    def _mocked_requests(*args, **kwargs):
61        url = args[0]
62        method = kwargs['method']
63        future_response = response_dict.get(url, None)
64        return get_response(future_response, method, get_id_user_count)
65    return _mocked_requests
66
67
68def create_wrapper(text_as_string):
69    """Allow to mock many times a call to one address.
70    Without this function, the StringIO is empty for the second call.
71    """
72    def _create_wrapper():
73        return StringIO(text_as_string)
74    return _create_wrapper
75
76
77def mock_good_connection():
78    token_response = {
79        'http://keycloak.url/auth/realms/master/protocol/openid-connect/token': create_wrapper('{"access_token": "alongtoken"}'), }
80    return patch(
81        'ansible_collections.community.general.plugins.module_utils.identity.keycloak.keycloak.open_url',
82        side_effect=build_mocked_request(count(), token_response),
83        autospec=True
84    )
85
86
87class TestKeycloakRealm(ModuleTestCase):
88    def setUp(self):
89        super(TestKeycloakRealm, self).setUp()
90        self.module = keycloak_realm
91
92    def test_create_when_absent(self):
93        """Add a new realm"""
94
95        module_args = {
96            'auth_keycloak_url': 'http://keycloak.url/auth',
97            'auth_password': 'admin',
98            'auth_realm': 'master',
99            'auth_username': 'admin',
100            'auth_client_id': 'admin-cli',
101            'validate_certs': True,
102            'id': 'realm-name',
103            'realm': 'realm-name',
104            'enabled': True
105        }
106        return_value_absent = [None, {'id': 'realm-name', 'realm': 'realm-name', 'enabled': True}]
107        return_value_created = [{
108            'code': 201,
109            'id': 'realm-name',
110            'realm': 'realm-name',
111            'enabled': True
112        }]
113        changed = True
114
115        set_module_args(module_args)
116
117        # Run the module
118
119        with mock_good_connection():
120            with patch_keycloak_api(get_realm_by_id=return_value_absent, create_realm=return_value_created) \
121                    as (mock_get_realm_by_id, mock_create_realm, mock_update_realm, mock_delete_realm):
122                with self.assertRaises(AnsibleExitJson) as exec_info:
123                    self.module.main()
124
125        self.assertEqual(len(mock_get_realm_by_id.mock_calls), 2)
126        self.assertEqual(len(mock_create_realm.mock_calls), 1)
127        self.assertEqual(len(mock_update_realm.mock_calls), 0)
128
129        # Verify that the module's changed status matches what is expected
130        self.assertIs(exec_info.exception.args[0]['changed'], changed)
131
132    def test_create_when_present_with_change(self):
133        """Update with change a realm"""
134
135        module_args = {
136            'auth_keycloak_url': 'http://keycloak.url/auth',
137            'auth_password': 'admin',
138            'auth_realm': 'master',
139            'auth_username': 'admin',
140            'auth_client_id': 'admin-cli',
141            'validate_certs': True,
142            'id': 'realm-name',
143            'realm': 'realm-name',
144            'enabled': False
145        }
146        return_value_absent = [
147            {
148                'id': 'realm-name',
149                'realm': 'realm-name',
150                'enabled': True
151            },
152            {
153                'id': 'realm-name',
154                'realm': 'realm-name',
155                'enabled': False
156            }
157        ]
158        return_value_updated = [{
159            'code': 201,
160            'id': 'realm-name',
161            'realm': 'realm-name',
162            'enabled': False
163        }]
164        changed = True
165
166        set_module_args(module_args)
167
168        # Run the module
169
170        with mock_good_connection():
171            with patch_keycloak_api(get_realm_by_id=return_value_absent, update_realm=return_value_updated) \
172                    as (mock_get_realm_by_id, mock_create_realm, mock_update_realm, mock_delete_realm):
173                with self.assertRaises(AnsibleExitJson) as exec_info:
174                    self.module.main()
175
176        self.assertEqual(len(mock_get_realm_by_id.mock_calls), 2)
177        self.assertEqual(len(mock_create_realm.mock_calls), 0)
178        self.assertEqual(len(mock_update_realm.mock_calls), 1)
179
180        # Verify that the module's changed status matches what is expected
181        self.assertIs(exec_info.exception.args[0]['changed'], changed)
182
183    def test_create_when_present_no_change(self):
184        """Update without change a realm"""
185
186        module_args = {
187            'auth_keycloak_url': 'http://keycloak.url/auth',
188            'auth_password': 'admin',
189            'auth_realm': 'master',
190            'auth_username': 'admin',
191            'auth_client_id': 'admin-cli',
192            'validate_certs': True,
193            'id': 'realm-name',
194            'realm': 'realm-name',
195            'enabled': True
196        }
197        return_value_absent = [
198            {
199                'id': 'realm-name',
200                'realm': 'realm-name',
201                'enabled': True
202            },
203            {
204                'id': 'realm-name',
205                'realm': 'realm-name',
206                'enabled': True
207            }
208        ]
209        return_value_updated = [{
210            'code': 201,
211            'id': 'realm-name',
212            'realm': 'realm-name',
213            'enabled': True
214        }]
215        changed = False
216
217        set_module_args(module_args)
218
219        # Run the module
220
221        with mock_good_connection():
222            with patch_keycloak_api(get_realm_by_id=return_value_absent, update_realm=return_value_updated) \
223                    as (mock_get_realm_by_id, mock_create_realm, mock_update_realm, mock_delete_realm):
224                with self.assertRaises(AnsibleExitJson) as exec_info:
225                    self.module.main()
226
227        self.assertEqual(len(mock_get_realm_by_id.mock_calls), 2)
228        self.assertEqual(len(mock_create_realm.mock_calls), 0)
229        self.assertEqual(len(mock_update_realm.mock_calls), 1)
230
231        # Verify that the module's changed status matches what is expected
232        self.assertIs(exec_info.exception.args[0]['changed'], changed)
233
234    def test_delete_when_absent(self):
235        """Remove an absent realm"""
236
237        module_args = {
238            'auth_keycloak_url': 'http://keycloak.url/auth',
239            'auth_password': 'admin',
240            'auth_realm': 'master',
241            'auth_username': 'admin',
242            'auth_client_id': 'admin-cli',
243            'validate_certs': True,
244            'id': 'realm-name',
245            'realm': 'realm-name',
246            'enabled': True,
247            'state': 'absent'
248        }
249        return_value_absent = [None]
250        return_value_deleted = [None]
251        changed = False
252
253        set_module_args(module_args)
254
255        # Run the module
256
257        with mock_good_connection():
258            with patch_keycloak_api(get_realm_by_id=return_value_absent, delete_realm=return_value_deleted) \
259                    as (mock_get_realm_by_id, mock_create_realm, mock_update_realm, mock_delete_realm):
260                with self.assertRaises(AnsibleExitJson) as exec_info:
261                    self.module.main()
262
263        self.assertEqual(len(mock_get_realm_by_id.mock_calls), 1)
264        self.assertEqual(len(mock_delete_realm.mock_calls), 0)
265
266        # Verify that the module's changed status matches what is expected
267        self.assertIs(exec_info.exception.args[0]['changed'], changed)
268
269    def test_delete_when_present(self):
270        """Remove a present realm"""
271
272        module_args = {
273            'auth_keycloak_url': 'http://keycloak.url/auth',
274            'auth_password': 'admin',
275            'auth_realm': 'master',
276            'auth_username': 'admin',
277            'auth_client_id': 'admin-cli',
278            'validate_certs': True,
279            'id': 'realm-name',
280            'realm': 'realm-name',
281            'enabled': True,
282            'state': 'absent'
283        }
284        return_value_absent = [
285            {
286                'id': 'realm-name',
287                'realm': 'realm-name'
288            }]
289        return_value_deleted = [None]
290        changed = True
291
292        set_module_args(module_args)
293
294        # Run the module
295
296        with mock_good_connection():
297            with patch_keycloak_api(get_realm_by_id=return_value_absent, delete_realm=return_value_deleted) \
298                    as (mock_get_realm_by_id, mock_create_realm, mock_update_realm, mock_delete_realm):
299                with self.assertRaises(AnsibleExitJson) as exec_info:
300                    self.module.main()
301
302        self.assertEqual(len(mock_get_realm_by_id.mock_calls), 1)
303        self.assertEqual(len(mock_delete_realm.mock_calls), 1)
304
305        # Verify that the module's changed status matches what is expected
306        self.assertIs(exec_info.exception.args[0]['changed'], changed)
307
308
309if __name__ == '__main__':
310    unittest.main()
311