1# -*- coding: utf-8 -*- 2 3# Copyright: (c) 2021, Ansible Project 4# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) 5 6from __future__ import absolute_import, division, print_function 7__metaclass__ = type 8 9from contextlib import contextmanager 10 11from ansible_collections.community.general.tests.unit.compat import unittest 12from ansible_collections.community.general.tests.unit.compat.mock import call, patch 13from ansible_collections.community.general.tests.unit.plugins.modules.utils import AnsibleExitJson, AnsibleFailJson, ModuleTestCase, set_module_args 14 15from ansible_collections.community.general.plugins.modules.identity.keycloak import keycloak_realm 16 17from itertools import count 18 19from ansible.module_utils.six import StringIO 20 21 22@contextmanager 23def patch_keycloak_api(get_realm_by_id, create_realm=None, update_realm=None, delete_realm=None): 24 """Mock context manager for patching the methods in PwPolicyIPAClient that contact the IPA server 25 26 Patches the `login` and `_post_json` methods 27 28 Keyword arguments are passed to the mock object that patches `_post_json` 29 30 No arguments are passed to the mock object that patches `login` because no tests require it 31 32 Example:: 33 34 with patch_ipa(return_value={}) as (mock_login, mock_post): 35 ... 36 """ 37 38 obj = keycloak_realm.KeycloakAPI 39 with patch.object(obj, 'get_realm_by_id', side_effect=get_realm_by_id) as mock_get_realm_by_id: 40 with patch.object(obj, 'create_realm', side_effect=create_realm) as mock_create_realm: 41 with patch.object(obj, 'update_realm', side_effect=update_realm) as mock_update_realm: 42 with patch.object(obj, 'delete_realm', side_effect=delete_realm) as mock_delete_realm: 43 yield mock_get_realm_by_id, mock_create_realm, mock_update_realm, mock_delete_realm 44 45 46def get_response(object_with_future_response, method, get_id_call_count): 47 if callable(object_with_future_response): 48 return object_with_future_response() 49 if isinstance(object_with_future_response, dict): 50 return get_response( 51 object_with_future_response[method], method, get_id_call_count) 52 if isinstance(object_with_future_response, list): 53 call_number = next(get_id_call_count) 54 return get_response( 55 object_with_future_response[call_number], method, get_id_call_count) 56 return object_with_future_response 57 58 59def build_mocked_request(get_id_user_count, response_dict): 60 def _mocked_requests(*args, **kwargs): 61 url = args[0] 62 method = kwargs['method'] 63 future_response = response_dict.get(url, None) 64 return get_response(future_response, method, get_id_user_count) 65 return _mocked_requests 66 67 68def create_wrapper(text_as_string): 69 """Allow to mock many times a call to one address. 70 Without this function, the StringIO is empty for the second call. 71 """ 72 def _create_wrapper(): 73 return StringIO(text_as_string) 74 return _create_wrapper 75 76 77def mock_good_connection(): 78 token_response = { 79 'http://keycloak.url/auth/realms/master/protocol/openid-connect/token': create_wrapper('{"access_token": "alongtoken"}'), } 80 return patch( 81 'ansible_collections.community.general.plugins.module_utils.identity.keycloak.keycloak.open_url', 82 side_effect=build_mocked_request(count(), token_response), 83 autospec=True 84 ) 85 86 87class TestKeycloakRealm(ModuleTestCase): 88 def setUp(self): 89 super(TestKeycloakRealm, self).setUp() 90 self.module = keycloak_realm 91 92 def test_create_when_absent(self): 93 """Add a new realm""" 94 95 module_args = { 96 'auth_keycloak_url': 'http://keycloak.url/auth', 97 'auth_password': 'admin', 98 'auth_realm': 'master', 99 'auth_username': 'admin', 100 'auth_client_id': 'admin-cli', 101 'validate_certs': True, 102 'id': 'realm-name', 103 'realm': 'realm-name', 104 'enabled': True 105 } 106 return_value_absent = [None, {'id': 'realm-name', 'realm': 'realm-name', 'enabled': True}] 107 return_value_created = [{ 108 'code': 201, 109 'id': 'realm-name', 110 'realm': 'realm-name', 111 'enabled': True 112 }] 113 changed = True 114 115 set_module_args(module_args) 116 117 # Run the module 118 119 with mock_good_connection(): 120 with patch_keycloak_api(get_realm_by_id=return_value_absent, create_realm=return_value_created) \ 121 as (mock_get_realm_by_id, mock_create_realm, mock_update_realm, mock_delete_realm): 122 with self.assertRaises(AnsibleExitJson) as exec_info: 123 self.module.main() 124 125 self.assertEqual(len(mock_get_realm_by_id.mock_calls), 2) 126 self.assertEqual(len(mock_create_realm.mock_calls), 1) 127 self.assertEqual(len(mock_update_realm.mock_calls), 0) 128 129 # Verify that the module's changed status matches what is expected 130 self.assertIs(exec_info.exception.args[0]['changed'], changed) 131 132 def test_create_when_present_with_change(self): 133 """Update with change a realm""" 134 135 module_args = { 136 'auth_keycloak_url': 'http://keycloak.url/auth', 137 'auth_password': 'admin', 138 'auth_realm': 'master', 139 'auth_username': 'admin', 140 'auth_client_id': 'admin-cli', 141 'validate_certs': True, 142 'id': 'realm-name', 143 'realm': 'realm-name', 144 'enabled': False 145 } 146 return_value_absent = [ 147 { 148 'id': 'realm-name', 149 'realm': 'realm-name', 150 'enabled': True 151 }, 152 { 153 'id': 'realm-name', 154 'realm': 'realm-name', 155 'enabled': False 156 } 157 ] 158 return_value_updated = [{ 159 'code': 201, 160 'id': 'realm-name', 161 'realm': 'realm-name', 162 'enabled': False 163 }] 164 changed = True 165 166 set_module_args(module_args) 167 168 # Run the module 169 170 with mock_good_connection(): 171 with patch_keycloak_api(get_realm_by_id=return_value_absent, update_realm=return_value_updated) \ 172 as (mock_get_realm_by_id, mock_create_realm, mock_update_realm, mock_delete_realm): 173 with self.assertRaises(AnsibleExitJson) as exec_info: 174 self.module.main() 175 176 self.assertEqual(len(mock_get_realm_by_id.mock_calls), 2) 177 self.assertEqual(len(mock_create_realm.mock_calls), 0) 178 self.assertEqual(len(mock_update_realm.mock_calls), 1) 179 180 # Verify that the module's changed status matches what is expected 181 self.assertIs(exec_info.exception.args[0]['changed'], changed) 182 183 def test_create_when_present_no_change(self): 184 """Update without change a realm""" 185 186 module_args = { 187 'auth_keycloak_url': 'http://keycloak.url/auth', 188 'auth_password': 'admin', 189 'auth_realm': 'master', 190 'auth_username': 'admin', 191 'auth_client_id': 'admin-cli', 192 'validate_certs': True, 193 'id': 'realm-name', 194 'realm': 'realm-name', 195 'enabled': True 196 } 197 return_value_absent = [ 198 { 199 'id': 'realm-name', 200 'realm': 'realm-name', 201 'enabled': True 202 }, 203 { 204 'id': 'realm-name', 205 'realm': 'realm-name', 206 'enabled': True 207 } 208 ] 209 return_value_updated = [{ 210 'code': 201, 211 'id': 'realm-name', 212 'realm': 'realm-name', 213 'enabled': True 214 }] 215 changed = False 216 217 set_module_args(module_args) 218 219 # Run the module 220 221 with mock_good_connection(): 222 with patch_keycloak_api(get_realm_by_id=return_value_absent, update_realm=return_value_updated) \ 223 as (mock_get_realm_by_id, mock_create_realm, mock_update_realm, mock_delete_realm): 224 with self.assertRaises(AnsibleExitJson) as exec_info: 225 self.module.main() 226 227 self.assertEqual(len(mock_get_realm_by_id.mock_calls), 2) 228 self.assertEqual(len(mock_create_realm.mock_calls), 0) 229 self.assertEqual(len(mock_update_realm.mock_calls), 1) 230 231 # Verify that the module's changed status matches what is expected 232 self.assertIs(exec_info.exception.args[0]['changed'], changed) 233 234 def test_delete_when_absent(self): 235 """Remove an absent realm""" 236 237 module_args = { 238 'auth_keycloak_url': 'http://keycloak.url/auth', 239 'auth_password': 'admin', 240 'auth_realm': 'master', 241 'auth_username': 'admin', 242 'auth_client_id': 'admin-cli', 243 'validate_certs': True, 244 'id': 'realm-name', 245 'realm': 'realm-name', 246 'enabled': True, 247 'state': 'absent' 248 } 249 return_value_absent = [None] 250 return_value_deleted = [None] 251 changed = False 252 253 set_module_args(module_args) 254 255 # Run the module 256 257 with mock_good_connection(): 258 with patch_keycloak_api(get_realm_by_id=return_value_absent, delete_realm=return_value_deleted) \ 259 as (mock_get_realm_by_id, mock_create_realm, mock_update_realm, mock_delete_realm): 260 with self.assertRaises(AnsibleExitJson) as exec_info: 261 self.module.main() 262 263 self.assertEqual(len(mock_get_realm_by_id.mock_calls), 1) 264 self.assertEqual(len(mock_delete_realm.mock_calls), 0) 265 266 # Verify that the module's changed status matches what is expected 267 self.assertIs(exec_info.exception.args[0]['changed'], changed) 268 269 def test_delete_when_present(self): 270 """Remove a present realm""" 271 272 module_args = { 273 'auth_keycloak_url': 'http://keycloak.url/auth', 274 'auth_password': 'admin', 275 'auth_realm': 'master', 276 'auth_username': 'admin', 277 'auth_client_id': 'admin-cli', 278 'validate_certs': True, 279 'id': 'realm-name', 280 'realm': 'realm-name', 281 'enabled': True, 282 'state': 'absent' 283 } 284 return_value_absent = [ 285 { 286 'id': 'realm-name', 287 'realm': 'realm-name' 288 }] 289 return_value_deleted = [None] 290 changed = True 291 292 set_module_args(module_args) 293 294 # Run the module 295 296 with mock_good_connection(): 297 with patch_keycloak_api(get_realm_by_id=return_value_absent, delete_realm=return_value_deleted) \ 298 as (mock_get_realm_by_id, mock_create_realm, mock_update_realm, mock_delete_realm): 299 with self.assertRaises(AnsibleExitJson) as exec_info: 300 self.module.main() 301 302 self.assertEqual(len(mock_get_realm_by_id.mock_calls), 1) 303 self.assertEqual(len(mock_delete_realm.mock_calls), 1) 304 305 # Verify that the module's changed status matches what is expected 306 self.assertIs(exec_info.exception.args[0]['changed'], changed) 307 308 309if __name__ == '__main__': 310 unittest.main() 311