"""
Unit tests for the ``salt.modules.boto_cognitoidentity`` execution module.

The boto3 session is replaced by a ``MagicMock`` in ``setUp`` so no AWS calls
are made; each test programs the mocked client (``self.conn``) and inspects the
dictionary returned by the module function under test.
"""

import logging
import random
import string

import salt.config
import salt.loader
import salt.modules.boto_cognitoidentity as boto_cognitoidentity
from salt.utils.versions import LooseVersion
from tests.support.mixins import LoaderModuleMockMixin
from tests.support.mock import MagicMock, patch
from tests.support.unit import TestCase, skipIf

# pylint: disable=import-error,no-name-in-module
try:
    import boto3
    from botocore.exceptions import ClientError

    HAS_BOTO = True
except ImportError:
    HAS_BOTO = False


# pylint: enable=import-error,no-name-in-module

# Minimum boto3 version required to run these tests; see _has_required_boto().
# NOTE(review): the comment previously here described boto_lambda and boto
# 2.8.0's connect_to_region(), which looks copy-pasted from another test module.
required_boto3_version = "1.2.1"

region = "us-east-1"
access_key = "GKTADJGHEIQSXMKKRBJ08H"
secret_key = "askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs"
# Salt's boto connection arguments use ``keyid`` for the access key id and
# ``key`` for the secret key.  ``key`` is overwritten with a random value in
# BotoCognitoIdentityTestCaseBase.setUp() to defeat connection caching.
conn_parameters = {
    "region": region,
    "keyid": access_key,
    "key": secret_key,
    "profile": {},
}
error_message = (
    "An error occurred (101) when calling the {0} operation: Test-defined error"
)
error_content = {"Error": {"Code": 101, "Message": "Test-defined error"}}

first_pool_id = "first_pool_id"
first_pool_name = "first_pool"
second_pool_id = "second_pool_id"
second_pool_name = "second_pool"
second_pool_name_updated = "second_pool_updated"
third_pool_id = "third_pool_id"
# The third pool deliberately shares the first pool's name so that name-based
# lookups match two pools.
third_pool_name = first_pool_name

identity_pools_ret = dict(
    IdentityPools=[
        dict(IdentityPoolId=first_pool_id, IdentityPoolName=first_pool_name),
        dict(IdentityPoolId=second_pool_id, IdentityPoolName=second_pool_name),
        dict(IdentityPoolId=third_pool_id, IdentityPoolName=third_pool_name),
    ]
)

first_pool_ret = dict(
    IdentityPoolId=first_pool_id,
    IdentityPoolName=first_pool_name,
    AllowUnauthenticatedIdentities=False,
    SupportedLoginProviders={
        "accounts.google.com": "testing123",
        "api.twitter.com": "testing123",
        "graph.facebook.com": "testing123",
        "www.amazon.com": "testing123",
    },
    DeveloperProviderName="test_provider",
    OpenIdConnectProviderARNs=["some_provider_arn", "another_provider_arn"],
)

first_pool_role_ret = dict(
    IdentityPoolId=first_pool_id,
    Roles=dict(
        authenticated="first_pool_auth_role", unauthenticated="first_pool_unauth_role"
    ),
)

second_pool_ret = dict(
    IdentityPoolId=second_pool_id,
    IdentityPoolName=second_pool_name,
    AllowUnauthenticatedIdentities=False,
)

third_pool_ret = dict(
    IdentityPoolId=third_pool_id,
    IdentityPoolName=third_pool_name,
    AllowUnauthenticatedIdentities=False,
    DeveloperProviderName="test_provider2",
)

third_pool_role_ret = dict(IdentityPoolId=third_pool_id)

default_pool_ret = dict(
    IdentityPoolId="default_pool_id",
    IdentityPoolName="default_pool_name",
    AllowUnauthenticatedIdentities=False,
    DeveloperProviderName="test_provider_default",
)

default_pool_role_ret = dict(IdentityPoolId="default_pool_id")

log = logging.getLogger(__name__)


def _has_required_boto():
    """
    Returns True/False boolean depending on if Boto is installed and correct
    version.
    """
    if not HAS_BOTO:
        # boto3 could not be imported, so there is no version to compare.
        return False
    return LooseVersion(boto3.__version__) >= LooseVersion(required_boto3_version)


class BotoCognitoIdentityTestCaseBase(TestCase, LoaderModuleMockMixin):
    """
    Shared fixture: loads the module's ``__utils__`` and replaces the boto3
    session with a mock whose client is exposed as ``self.conn``.
    """

    conn = None

    def setup_loader_modules(self):
        """
        Build the loader globals for ``boto_cognitoidentity``.

        The options are also stashed on ``self.opts`` so ``setUp`` can pass
        them to the module's ``__init__``.
        """
        self.opts = opts = salt.config.DEFAULT_MINION_OPTS.copy()
        utils = salt.loader.utils(
            opts, whitelist=["boto3", "args", "systemd", "path", "platform"], context={}
        )
        return {boto_cognitoidentity: {"__utils__": utils}}

    def setUp(self):
        """
        Initialise the module and patch ``boto3.session.Session`` with a mock.
        """
        super().setUp()
        boto_cognitoidentity.__init__(self.opts)
        del self.opts

        # Set up MagicMock to replace the boto3 session
        # connections keep getting cached from prior tests, can't find the
        # correct context object to clear it. So randomize the cache key, to prevent any
        # cache hits
        conn_parameters["key"] = "".join(
            random.choice(string.ascii_lowercase + string.digits) for _ in range(50)
        )

        self.patcher = patch("boto3.session.Session")
        self.addCleanup(self.patcher.stop)
        self.addCleanup(delattr, self, "patcher")
        mock_session = self.patcher.start()

        # Every client created from the mocked session is the same MagicMock,
        # which the tests program and inspect through ``self.conn``.
        session_instance = mock_session.return_value
        self.conn = MagicMock()
        self.addCleanup(delattr, self, "conn")
        session_instance.client.return_value = self.conn


class BotoCognitoIdentityTestCaseMixin:
    """
    Empty mixin; it adds no behaviour to the test case that inherits from it.
    """

    pass


@skipIf(HAS_BOTO is False, "The boto module must be installed.")
@skipIf(
    _has_required_boto() is False,
    "The boto3 module must be greater than or equal to version {}".format(
        required_boto3_version
    ),
)
class BotoCognitoIdentityTestCase(
    BotoCognitoIdentityTestCaseBase, BotoCognitoIdentityTestCaseMixin
):
    """
    TestCase for salt.modules.boto_cognitoidentity module
    """

    def _describe_identity_pool_side_effect(self, *args, **kwargs):
        """
        Mock side effect for ``describe_identity_pool``: return the canned pool
        description matching the ``IdentityPoolId`` keyword, or the default one.
        """
        if kwargs.get("IdentityPoolId") == first_pool_id:
            return first_pool_ret
        elif kwargs.get("IdentityPoolId") == third_pool_id:
            return third_pool_ret
        else:
            return default_pool_ret

    def test_that_when_describing_a_named_identity_pool_and_pool_exists_the_describe_identity_pool_method_returns_pools_properties(
        self,
    ):
        """
        Tests describing identity pool when the pool's name exists
        """
        self.conn.list_identity_pools.return_value = identity_pools_ret
        self.conn.describe_identity_pool.side_effect = (
            self._describe_identity_pool_side_effect
        )
        result = boto_cognitoidentity.describe_identity_pools(
            IdentityPoolName=first_pool_name, **conn_parameters
        )
        # first and third pools share the same name, so both are described
        self.assertEqual(result.get("identity_pools"), [first_pool_ret, third_pool_ret])

    def test_that_when_describing_a_identity_pool_by_its_id_and_pool_exists_the_desribe_identity_pool_method_returns_pools_properties(
        self,
    ):
        """
        Tests describing identity pool when the given pool's id exists
        """
        self.conn.describe_identity_pool.return_value = third_pool_ret
        result = boto_cognitoidentity.describe_identity_pools(
            IdentityPoolName="", IdentityPoolId=third_pool_id, **conn_parameters
        )
        self.assertEqual(result.get("identity_pools"), [third_pool_ret])
        # an explicit pool id must not trigger a name lookup
        self.assertEqual(self.conn.list_identity_pools.call_count, 0)

    def test_that_when_describing_a_named_identity_pool_and_pool_does_not_exist_the_describe_identity_pool_method_returns_none(
        self,
    ):
        """
        Tests describing identity pool when the pool's name doesn't exist
        """
        self.conn.list_identity_pools.return_value = identity_pools_ret
        self.conn.describe_identity_pool.return_value = first_pool_ret
        result = boto_cognitoidentity.describe_identity_pools(
            IdentityPoolName="no_such_pool", **conn_parameters
        )
        # the key must be present (not the sentinel default) and set to None
        self.assertIsNone(result.get("identity_pools", "no such key"))

    def test_that_when_describing_a_named_identity_pool_and_error_thrown_the_describe_identity_pool_method_returns_error(
        self,
    ):
        """
        Tests describing identity pool returns error when there is an exception to boto3 calls
        """
        self.conn.list_identity_pools.return_value = identity_pools_ret
        self.conn.describe_identity_pool.side_effect = ClientError(
            error_content, "error on describe identity pool"
        )
        result = boto_cognitoidentity.describe_identity_pools(
            IdentityPoolName=first_pool_name, **conn_parameters
        )
        self.assertEqual(
            result.get("error", {}).get("message"),
            error_message.format("error on describe identity pool"),
        )

    def test_that_when_create_identity_pool_the_create_identity_pool_method_returns_created_identity_pool(
        self,
    ):
        """
        Tests the positive case where create identity pool succeeds
        """
        return_val = default_pool_ret.copy()
        return_val.pop("DeveloperProviderName", None)
        self.conn.create_identity_pool.return_value = return_val
        result = boto_cognitoidentity.create_identity_pool(
            IdentityPoolName="default_pool_name", **conn_parameters
        )
        # each mock_calls entry unpacks as (name, args, kwargs)
        mock_calls = self.conn.mock_calls
        self.assertTrue(result.get("created"))
        self.assertEqual(len(mock_calls), 1)
        self.assertEqual(mock_calls[0][0], "create_identity_pool")
        self.assertNotIn("DeveloperProviderName", mock_calls[0][2])

    def test_that_when_create_identity_pool_and_error_thrown_the_create_identity_pool_method_returns_error(
        self,
    ):
        """
        Tests the negative case where create identity pool has a boto3 client error
        """
        self.conn.create_identity_pool.side_effect = ClientError(
            error_content, "create_identity_pool"
        )
        result = boto_cognitoidentity.create_identity_pool(
            IdentityPoolName="default_pool_name", **conn_parameters
        )
        self.assertIs(result.get("created"), False)
        self.assertEqual(
            result.get("error", {}).get("message"),
            error_message.format("create_identity_pool"),
        )

    def test_that_when_delete_identity_pools_with_multiple_matching_pool_names_the_delete_identity_pools_methos_returns_true_and_deleted_count(
        self,
    ):
        """
        Tests that given 2 matching pool ids, the operation returns deleted status of true and
        count 2
        """
        self.conn.list_identity_pools.return_value = identity_pools_ret
        self.conn.delete_identity_pool.return_value = None
        result = boto_cognitoidentity.delete_identity_pools(
            IdentityPoolName=first_pool_name, **conn_parameters
        )
        mock_calls = self.conn.mock_calls
        self.assertTrue(result.get("deleted"))
        self.assertEqual(result.get("count"), 2)
        # one list call followed by one delete per matching pool
        self.assertEqual(len(mock_calls), 3)
        self.assertEqual(mock_calls[1][0], "delete_identity_pool")
        self.assertEqual(mock_calls[2][0], "delete_identity_pool")
        self.assertEqual(mock_calls[1][2].get("IdentityPoolId"), first_pool_id)
        self.assertEqual(mock_calls[2][2].get("IdentityPoolId"), third_pool_id)

    def test_that_when_delete_identity_pools_with_no_matching_pool_names_the_delete_identity_pools_method_returns_false(
        self,
    ):
        """
        Tests that the given pool name does not exist, the operation returns deleted status of false
        and count 0
        """
        self.conn.list_identity_pools.return_value = identity_pools_ret
        result = boto_cognitoidentity.delete_identity_pools(
            IdentityPoolName="no_such_pool_name", **conn_parameters
        )
        mock_calls = self.conn.mock_calls
        self.assertIs(result.get("deleted"), False)
        self.assertEqual(result.get("count"), 0)
        self.assertEqual(len(mock_calls), 1)

    def test_that_when_delete_identity_pools_and_error_thrown_the_delete_identity_pools_method_returns_false_and_the_error(
        self,
    ):
        """
        Tests that the delete_identity_pool method throws an exception
        """
        self.conn.delete_identity_pool.side_effect = ClientError(
            error_content, "delete_identity_pool"
        )
        # try to delete a nonexistent pool id "no_such_pool_id"
        result = boto_cognitoidentity.delete_identity_pools(
            IdentityPoolName=first_pool_name,
            IdentityPoolId="no_such_pool_id",
            **conn_parameters
        )
        mock_calls = self.conn.mock_calls
        self.assertIs(result.get("deleted"), False)
        self.assertIsNone(result.get("count"))
        self.assertEqual(
            result.get("error", {}).get("message"),
            error_message.format("delete_identity_pool"),
        )
        self.assertEqual(len(mock_calls), 1)

    def _get_identity_pool_roles_side_effect(self, *args, **kwargs):
        """
        Mock side effect for ``get_identity_pool_roles``: return the canned
        roles matching the ``IdentityPoolId`` keyword, or the default ones.
        """
        if kwargs.get("IdentityPoolId") == first_pool_id:
            return first_pool_role_ret
        elif kwargs.get("IdentityPoolId") == third_pool_id:
            return third_pool_role_ret
        else:
            return default_pool_role_ret

    def test_that_when_get_identity_pool_roles_with_matching_pool_names_the_get_identity_pool_roles_method_returns_pools_role_properties(
        self,
    ):
        """
        Tests that the given 2 pool id's matching the given pool name, the results are
        passed through as a list of identity_pool_roles
        """
        self.conn.list_identity_pools.return_value = identity_pools_ret
        self.conn.get_identity_pool_roles.side_effect = (
            self._get_identity_pool_roles_side_effect
        )
        result = boto_cognitoidentity.get_identity_pool_roles(
            IdentityPoolName=first_pool_name, **conn_parameters
        )
        id_pool_roles = result.get("identity_pool_roles")
        self.assertIsNotNone(id_pool_roles)
        self.assertEqual(
            result.get("error", "key_should_not_be_there"), "key_should_not_be_there"
        )
        self.assertEqual(len(id_pool_roles), 2)
        self.assertEqual(id_pool_roles[0], first_pool_role_ret)
        self.assertEqual(id_pool_roles[1], third_pool_role_ret)

    def test_that_when_get_identity_pool_roles_with_no_matching_pool_names_the_get_identity_pool_roles_method_returns_none(
        self,
    ):
        """
        Tests that the given no pool id's matching the given pool name, the results returned is
        None and get_identity_pool_roles should never be called
        """
        self.conn.list_identity_pools.return_value = identity_pools_ret
        result = boto_cognitoidentity.get_identity_pool_roles(
            IdentityPoolName="no_such_pool_name", **conn_parameters
        )
        mock_calls = self.conn.mock_calls
        self.assertIsNone(result.get("identity_pool_roles", "key_should_be_there"))
        # only the list call was made
        self.assertEqual(len(mock_calls), 1)
        self.assertEqual(
            result.get("error", "key_should_not_be_there"), "key_should_not_be_there"
        )

    def test_that_when_get_identity_pool_roles_and_error_thrown_due_to_invalid_pool_id_the_get_identity_pool_roles_method_returns_error(
        self,
    ):
        """
        Tests that given an invalid pool id, we properly handle error generated from get_identity_pool_roles
        """
        self.conn.get_identity_pool_roles.side_effect = ClientError(
            error_content, "get_identity_pool_roles"
        )
        # try to get the roles of a nonexistent pool id "no_such_pool_id"
        result = boto_cognitoidentity.get_identity_pool_roles(
            IdentityPoolName="", IdentityPoolId="no_such_pool_id", **conn_parameters
        )
        mock_calls = self.conn.mock_calls
        self.assertEqual(
            result.get("error", {}).get("message"),
            error_message.format("get_identity_pool_roles"),
        )
        self.assertEqual(len(mock_calls), 1)

    def test_that_when_set_identity_pool_roles_with_invalid_pool_id_the_set_identity_pool_roles_method_returns_set_false_and_error(
        self,
    ):
        """
        Tests that given an invalid pool id, we properly handle error generated from set_identity_pool_roles
        """
        self.conn.set_identity_pool_roles.side_effect = ClientError(
            error_content, "set_identity_pool_roles"
        )
        result = boto_cognitoidentity.set_identity_pool_roles(
            IdentityPoolId="no_such_pool_id", **conn_parameters
        )
        mock_calls = self.conn.mock_calls
        self.assertIs(result.get("set"), False)
        self.assertEqual(
            result.get("error", {}).get("message"),
            error_message.format("set_identity_pool_roles"),
        )
        self.assertEqual(len(mock_calls), 1)

    def test_that_when_set_identity_pool_roles_with_no_roles_specified_the_set_identity_pool_roles_method_unset_the_roles(
        self,
    ):
        """
        Tests that given a valid pool id, and no other roles given, the role for the pool is cleared.
        """
        self.conn.set_identity_pool_roles.return_value = None
        result = boto_cognitoidentity.set_identity_pool_roles(
            IdentityPoolId="some_id", **conn_parameters
        )
        mock_calls = self.conn.mock_calls
        self.assertTrue(result.get("set"))
        self.assertEqual(len(mock_calls), 1)
        self.assertEqual(mock_calls[0][2].get("Roles"), {})

    def test_that_when_set_identity_pool_roles_with_only_auth_role_specified_the_set_identity_pool_roles_method_only_set_the_auth_role(
        self,
    ):
        """
        Tests that given a valid pool id, and only other given role is the AuthenticatedRole, the auth role for the
        pool is set and unauth role is cleared.
        """
        self.conn.set_identity_pool_roles.return_value = None
        with patch.dict(
            boto_cognitoidentity.__salt__,
            {
                "boto_iam.describe_role": MagicMock(
                    return_value={"arn": "my_auth_role_arn"}
                )
            },
        ):
            expected_roles = dict(authenticated="my_auth_role_arn")
            result = boto_cognitoidentity.set_identity_pool_roles(
                IdentityPoolId="some_id",
                AuthenticatedRole="my_auth_role",
                **conn_parameters
            )
            mock_calls = self.conn.mock_calls
            self.assertTrue(result.get("set"))
            self.assertEqual(len(mock_calls), 1)
            self.assertEqual(mock_calls[0][2].get("Roles"), expected_roles)

    def test_that_when_set_identity_pool_roles_with_only_unauth_role_specified_the_set_identity_pool_roles_method_only_set_the_unauth_role(
        self,
    ):
        """
        Tests that given a valid pool id, and only other given role is the UnauthenticatedRole, the unauth role for the
        pool is set and the auth role is cleared.
        """
        self.conn.set_identity_pool_roles.return_value = None
        with patch.dict(
            boto_cognitoidentity.__salt__,
            {
                "boto_iam.describe_role": MagicMock(
                    return_value={"arn": "my_unauth_role_arn"}
                )
            },
        ):
            expected_roles = dict(unauthenticated="my_unauth_role_arn")
            result = boto_cognitoidentity.set_identity_pool_roles(
                IdentityPoolId="some_id",
                UnauthenticatedRole="my_unauth_role",
                **conn_parameters
            )
            mock_calls = self.conn.mock_calls
            self.assertTrue(result.get("set"))
            self.assertEqual(len(mock_calls), 1)
            self.assertEqual(mock_calls[0][2].get("Roles"), expected_roles)

    def test_that_when_set_identity_pool_roles_with_both_roles_specified_the_set_identity_pool_role_method_set_both_roles(
        self,
    ):
        """
        Tests setting of both roles to valid given roles
        """
        self.conn.set_identity_pool_roles.return_value = None
        with patch.dict(
            boto_cognitoidentity.__salt__,
            {
                "boto_iam.describe_role": MagicMock(
                    return_value={"arn": "my_unauth_role_arn"}
                )
            },
        ):
            # the authenticated role is given in "arn:aws:iam:" form and is
            # expected unchanged; the unauthenticated role name is expected to
            # be replaced by the arn from the mocked boto_iam.describe_role
            expected_roles = dict(
                authenticated="arn:aws:iam:my_auth_role",
                unauthenticated="my_unauth_role_arn",
            )
            result = boto_cognitoidentity.set_identity_pool_roles(
                IdentityPoolId="some_id",
                AuthenticatedRole="arn:aws:iam:my_auth_role",
                UnauthenticatedRole="my_unauth_role",
                **conn_parameters
            )
            mock_calls = self.conn.mock_calls
            self.assertTrue(result.get("set"))
            self.assertEqual(len(mock_calls), 1)
            self.assertEqual(mock_calls[0][2].get("Roles"), expected_roles)

    def test_that_when_set_identity_pool_roles_given_invalid_auth_role_the_set_identity_pool_method_returns_set_false_and_error(
        self,
    ):
        """
        Tests error handling for invalid auth role
        """
        with patch.dict(
            boto_cognitoidentity.__salt__,
            {"boto_iam.describe_role": MagicMock(return_value=False)},
        ):
            result = boto_cognitoidentity.set_identity_pool_roles(
                IdentityPoolId="some_id",
                AuthenticatedRole="no_such_auth_role",
                **conn_parameters
            )
            mock_calls = self.conn.mock_calls
            self.assertIs(result.get("set"), False)
            self.assertIn("no_such_auth_role", result.get("error", ""))
            # no boto3 client call is expected when the role cannot be resolved
            self.assertEqual(len(mock_calls), 0)

    def test_that_when_set_identity_pool_roles_given_invalid_unauth_role_the_set_identity_pool_method_returns_set_false_and_error(
        self,
    ):
        """
        Tests error handling for invalid unauth role
        """
        with patch.dict(
            boto_cognitoidentity.__salt__,
            {"boto_iam.describe_role": MagicMock(return_value=False)},
        ):
            result = boto_cognitoidentity.set_identity_pool_roles(
                IdentityPoolId="some_id",
                AuthenticatedRole="arn:aws:iam:my_auth_role",
                UnauthenticatedRole="no_such_unauth_role",
                **conn_parameters
            )
            mock_calls = self.conn.mock_calls
            self.assertIs(result.get("set"), False)
            self.assertIn("no_such_unauth_role", result.get("error", ""))
            # no boto3 client call is expected when the role cannot be resolved
            self.assertEqual(len(mock_calls), 0)

    def test_that_when_update_identity_pool_given_invalid_pool_id_the_update_identity_pool_method_returns_updated_false_and_error(
        self,
    ):
        """
        Tests error handling for invalid pool id
        """
        self.conn.describe_identity_pool.side_effect = ClientError(
            error_content, "error on describe identity pool with pool id"
        )
        result = boto_cognitoidentity.update_identity_pool(
            IdentityPoolId="no_such_pool_id", **conn_parameters
        )
        self.assertIs(result.get("updated"), False)
        self.assertEqual(
            result.get("error", {}).get("message"),
            error_message.format("error on describe identity pool with pool id"),
        )

    def test_that_when_update_identity_pool_given_only_valid_pool_id_the_update_identity_pool_method_returns_udpated_identity(
        self,
    ):
        """
        Tests the base case of calling update_identity_pool with only the pool id, verify
        that the passed parameters into boto3 update_identity_pool has at least all the
        expected required parameters in IdentityPoolId, IdentityPoolName and AllowUnauthenticatedIdentities
        """
        self.conn.describe_identity_pool.return_value = second_pool_ret
        self.conn.update_identity_pool.return_value = second_pool_ret
        expected_params = second_pool_ret
        result = boto_cognitoidentity.update_identity_pool(
            IdentityPoolId=second_pool_id, **conn_parameters
        )
        self.assertTrue(result.get("updated"))
        self.assertEqual(result.get("identity_pool"), second_pool_ret)
        self.conn.update_identity_pool.assert_called_with(**expected_params)

    def test_that_when_update_identity_pool_given_valid_pool_id_and_pool_name_the_update_identity_pool_method_returns_updated_identity_pool(
        self,
    ):
        """
        Tests successful update of a pool's name
        """
        self.conn.describe_identity_pool.return_value = second_pool_ret
        second_pool_updated_ret = second_pool_ret.copy()
        second_pool_updated_ret["IdentityPoolName"] = second_pool_name_updated
        self.conn.update_identity_pool.return_value = second_pool_updated_ret
        expected_params = second_pool_updated_ret
        result = boto_cognitoidentity.update_identity_pool(
            IdentityPoolId=second_pool_id,
            IdentityPoolName=second_pool_name_updated,
            **conn_parameters
        )
        self.assertTrue(result.get("updated"))
        self.assertEqual(result.get("identity_pool"), second_pool_updated_ret)
        self.conn.update_identity_pool.assert_called_with(**expected_params)

    def test_that_when_update_identity_pool_given_empty_dictionary_for_supported_login_providers_the_update_identity_pool_method_is_called_with_proper_request_params(
        self,
    ):
        """
        Tests the request parameters to boto3 update_identity_pool's SupportedLoginProviders is {}
        """
        self.conn.describe_identity_pool.return_value = first_pool_ret
        first_pool_updated_ret = first_pool_ret.copy()
        first_pool_updated_ret.pop("SupportedLoginProviders")
        self.conn.update_identity_pool.return_value = first_pool_updated_ret
        expected_params = first_pool_ret.copy()
        expected_params["SupportedLoginProviders"] = {}
        expected_params.pop("DeveloperProviderName")
        expected_params.pop("OpenIdConnectProviderARNs")
        result = boto_cognitoidentity.update_identity_pool(
            IdentityPoolId=first_pool_id, SupportedLoginProviders={}, **conn_parameters
        )
        self.assertTrue(result.get("updated"))
        self.assertEqual(result.get("identity_pool"), first_pool_updated_ret)
        self.conn.update_identity_pool.assert_called_with(**expected_params)

    def test_that_when_update_identity_pool_given_empty_list_for_openid_connect_provider_arns_the_update_identity_pool_method_is_called_with_proper_request_params(
        self,
    ):
        """
        Tests the request parameters to boto3 update_identity_pool's OpenIdConnectProviderARNs is []
        """
        self.conn.describe_identity_pool.return_value = first_pool_ret
        first_pool_updated_ret = first_pool_ret.copy()
        first_pool_updated_ret.pop("OpenIdConnectProviderARNs")
        self.conn.update_identity_pool.return_value = first_pool_updated_ret
        expected_params = first_pool_ret.copy()
        expected_params.pop("SupportedLoginProviders")
        expected_params.pop("DeveloperProviderName")
        expected_params["OpenIdConnectProviderARNs"] = []
        result = boto_cognitoidentity.update_identity_pool(
            IdentityPoolId=first_pool_id,
            OpenIdConnectProviderARNs=[],
            **conn_parameters
        )
        self.assertTrue(result.get("updated"))
        self.assertEqual(result.get("identity_pool"), first_pool_updated_ret)
        self.conn.update_identity_pool.assert_called_with(**expected_params)

    def test_that_when_update_identity_pool_given_developer_provider_name_when_developer_provider_name_was_set_previously_the_udpate_identity_pool_method_is_called_without_developer_provider_name_param(
        self,
    ):
        """
        Tests the request parameters do not include 'DeveloperProviderName' if this was previously set
        for the given pool id
        """
        self.conn.describe_identity_pool.return_value = first_pool_ret
        self.conn.update_identity_pool.return_value = first_pool_ret
        expected_params = first_pool_ret.copy()
        expected_params.pop("SupportedLoginProviders")
        expected_params.pop("DeveloperProviderName")
        expected_params.pop("OpenIdConnectProviderARNs")
        result = boto_cognitoidentity.update_identity_pool(
            IdentityPoolId=first_pool_id,
            DeveloperProviderName="this should not change",
            **conn_parameters
        )
        self.assertTrue(result.get("updated"))
        self.assertEqual(result.get("identity_pool"), first_pool_ret)
        self.conn.update_identity_pool.assert_called_with(**expected_params)

    def test_that_when_update_identity_pool_given_developer_provider_name_is_included_in_the_params_when_associated_for_the_first_time(
        self,
    ):
        """
        Tests the request parameters include 'DeveloperProviderName' when the pool did not have this
        property set previously.
        """
        self.conn.describe_identity_pool.return_value = second_pool_ret
        second_pool_updated_ret = second_pool_ret.copy()
        second_pool_updated_ret["DeveloperProviderName"] = "added_developer_provider"
        self.conn.update_identity_pool.return_value = second_pool_updated_ret
        expected_params = second_pool_updated_ret.copy()
        result = boto_cognitoidentity.update_identity_pool(
            IdentityPoolId=second_pool_id,
            DeveloperProviderName="added_developer_provider",
            **conn_parameters
        )
        self.assertTrue(result.get("updated"))
        self.assertEqual(result.get("identity_pool"), second_pool_updated_ret)
        self.conn.update_identity_pool.assert_called_with(**expected_params)

    def test_that_the_update_identity_pool_method_handles_exception_from_boto3(self):
        """
        Tests the error handling of exception generated by boto3 update_identity_pool
        """
        self.conn.describe_identity_pool.return_value = second_pool_ret
        self.conn.update_identity_pool.side_effect = ClientError(
            error_content, "update_identity_pool"
        )
        result = boto_cognitoidentity.update_identity_pool(
            IdentityPoolId=second_pool_id,
            DeveloperProviderName="added_developer_provider",
            **conn_parameters
        )
        self.assertIs(result.get("updated"), False)
        self.assertEqual(
            result.get("error", {}).get("message"),
            error_message.format("update_identity_pool"),
        )