1import logging
2import random
3import string
4
5import pytest
6import salt.config
7import salt.loader
8import salt.states.boto_cognitoidentity as boto_cognitoidentity
9from salt.utils.versions import LooseVersion
10from tests.support.mixins import LoaderModuleMockMixin
11from tests.support.mock import MagicMock, patch
12from tests.support.unit import TestCase, skipIf
13
14# pylint: disable=import-error,no-name-in-module
15from tests.unit.modules.test_boto_cognitoidentity import (
16    BotoCognitoIdentityTestCaseMixin,
17)
18
19try:
20    import boto3
21    from botocore.exceptions import ClientError
22
23    HAS_BOTO = True
24except ImportError:
25    HAS_BOTO = False
26
27
28# pylint: enable=import-error,no-name-in-module
29
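# Minimum boto3 version these tests require (enforced by _has_required_boto()).
# NOTE(review): this comment previously cited boto's connect_to_region() method,
# added in boto 2.8.0
# (https://github.com/boto/boto/commit/33ac26b416fbb48a60602542b4ce15dcc7029f12);
# that describes boto, not boto3 -- confirm the real reason for this minimum.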
required_boto3_version = "1.2.1"

# Connection arguments splatted into every state call in this module.
# Salt's boto modules use ``keyid`` for the access key ID and ``key`` for the
# secret access key, so the fixtures are mapped accordingly.
region = "us-east-1"
access_key = "GKTADJGHEIQSXMKKRBJ08H"
secret_key = "askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs"
conn_parameters = {
    "region": region,
    "key": secret_key,
    "keyid": access_key,
    "profile": {},
}

# Error payload passed as the first argument to botocore's ClientError when a
# test needs a mocked boto3 call to fail.
error_message = (
    "An error occurred (101) when calling the {0} operation: Test-defined error"
)
error_content = {"Error": {"Code": 101, "Message": "Test-defined error"}}

# Pool identifiers and names.  The third pool deliberately reuses the first
# pool's name so that a lookup by name matches more than one pool.
first_pool_id = "first_pool_id"
first_pool_name = "first_pool"
second_pool_id = "second_pool_id"
second_pool_name = "second_pool"
second_pool_name_updated = "second_pool_updated"
third_pool_id = "third_pool_id"
third_pool_name = first_pool_name
default_pool_name = "default_pool_name"
default_pool_id = "default_pool_id"
default_dev_provider = "test_provider_default"

# Canned return value for the mocked ``list_identity_pools`` call.
identity_pools_ret = dict(
    IdentityPools=[
        dict(IdentityPoolId=first_pool_id, IdentityPoolName=first_pool_name),
        dict(IdentityPoolId=second_pool_id, IdentityPoolName=second_pool_name),
        dict(IdentityPoolId=third_pool_id, IdentityPoolName=third_pool_name),
    ]
)

# Canned per-pool return values for the mocked ``describe_identity_pool`` and
# ``get_identity_pool_roles`` calls.
first_pool_ret = dict(
    IdentityPoolId=first_pool_id,
    IdentityPoolName=first_pool_name,
    AllowUnauthenticatedIdentities=False,
    SupportedLoginProviders={
        "accounts.google.com": "testing123",
        "api.twitter.com": "testing123",
        "graph.facebook.com": "testing123",
        "www.amazon.com": "testing123",
    },
    DeveloperProviderName="test_provider",
    OpenIdConnectProviderARNs=["some_provider_arn", "another_provider_arn"],
)

first_pool_role_ret = dict(
    IdentityPoolId=first_pool_id,
    Roles=dict(
        authenticated="first_pool_auth_role", unauthenticated="first_pool_unauth_role"
    ),
)

second_pool_ret = dict(
    IdentityPoolId=second_pool_id,
    IdentityPoolName=second_pool_name,
    AllowUnauthenticatedIdentities=False,
)

second_pool_role_ret = dict(
    IdentityPoolId=second_pool_id, Roles=dict(authenticated="second_pool_auth_role")
)

# Canned return value for the mocked ``update_identity_pool`` call on the
# second pool (unauthenticated identities switched on).
second_pool_update_ret = dict(
    IdentityPoolId=second_pool_id,
    IdentityPoolName=second_pool_name,
    AllowUnauthenticatedIdentities=True,
)

third_pool_ret = dict(
    IdentityPoolId=third_pool_id,
    IdentityPoolName=third_pool_name,
    AllowUnauthenticatedIdentities=False,
    DeveloperProviderName="test_provider2",
)

third_pool_role_ret = dict(IdentityPoolId=third_pool_id)

# Fallback values returned by the side-effect helpers for any pool id that is
# not one of the three pools above.
default_pool_ret = dict(
    IdentityPoolId=default_pool_id,
    IdentityPoolName=default_pool_name,
    AllowUnauthenticatedIdentities=False,
    DeveloperProviderName=default_dev_provider,
)

default_pool_role_ret = dict(IdentityPoolId=default_pool_id)


log = logging.getLogger(__name__)
125
126
def _has_required_boto():
    """
    Report whether boto3 is importable and at least as new as
    ``required_boto3_version``.
    """
    # Guard first: ``boto3`` is not bound at all when the import failed.
    if not HAS_BOTO:
        return False
    installed = LooseVersion(boto3.__version__)
    minimum = LooseVersion(required_boto3_version)
    return not installed < minimum
138
139
class BotoCognitoIdentityStateTestCaseBase(TestCase, LoaderModuleMockMixin):
    """
    Shared fixture for the boto_cognitoidentity state tests: loads the Salt
    utils/execution/state modules and swaps the boto3 session for a mock.
    """

    # Mocked boto3 client; replaced with a fresh MagicMock in setUp().
    conn = None

    def setup_loader_modules(self):
        """
        Load the Salt modules the state depends on and return the dunder
        globals to inject into ``boto_cognitoidentity``.
        """
        shared_context = {}
        loaded_utils = salt.loader.utils(
            self.opts,
            whitelist=["boto", "boto3", "args", "systemd", "path", "platform", "reg"],
            context=shared_context,
        )
        loaded_serializers = salt.loader.serializers(self.opts)
        execution_modules = salt.loader.minion_mods(
            self.opts,
            context=shared_context,
            utils=loaded_utils,
            whitelist=["boto_cognitoidentity"],
        )
        # Kept on the instance so tests can patch individual functions.
        self.funcs = execution_modules
        self.salt_states = salt.loader.states(
            opts=self.opts,
            functions=execution_modules,
            utils=loaded_utils,
            whitelist=["boto_cognitoidentity"],
            serializers=loaded_serializers,
        )
        module_globals = {
            "__opts__": self.opts,
            "__salt__": execution_modules,
            "__utils__": loaded_utils,
            "__states__": self.salt_states,
            "__serializers__": loaded_serializers,
        }
        return {boto_cognitoidentity: module_globals}

    @classmethod
    def setUpClass(cls):
        """
        Build the minion opts (including grains) once for the whole class.
        """
        minion_opts = salt.config.DEFAULT_MINION_OPTS.copy()
        minion_opts["grains"] = salt.loader.grains(minion_opts)
        cls.opts = minion_opts

    @classmethod
    def tearDownClass(cls):
        """
        Drop the class-level opts created in setUpClass().
        """
        delattr(cls, "opts")

    def setUp(self):
        """
        Register attribute cleanups and replace ``boto3.session.Session``
        with a mock whose ``client()`` returns ``self.conn``.
        """
        for loaded_attr in ("funcs", "salt_states"):
            self.addCleanup(delattr, self, loaded_attr)

        # Set up MagicMock to replace the boto3 session
        # connections keep getting cached from prior tests, can't find the
        # correct context object to clear it. So randomize the cache key, to prevent any
        # cache hits
        alphabet = string.ascii_lowercase + string.digits
        conn_parameters["key"] = "".join(random.choice(alphabet) for _ in range(50))

        self.patcher = patch("boto3.session.Session")
        self.addCleanup(self.patcher.stop)
        self.addCleanup(delattr, self, "patcher")
        session_class_mock = self.patcher.start()

        self.conn = MagicMock()
        self.addCleanup(delattr, self, "conn")
        session_class_mock.return_value.client.return_value = self.conn
200
201
@skipIf(HAS_BOTO is False, "The boto module must be installed.")
@skipIf(
    _has_required_boto() is False,
    "The boto3 module must be greater than or equal to version {}".format(
        required_boto3_version
    ),
)
class BotoCognitoIdentityTestCase(
    BotoCognitoIdentityStateTestCaseBase, BotoCognitoIdentityTestCaseMixin
):
    """
    TestCase for salt.states.boto_cognitoidentity state.module
    """

    def _describe_identity_pool_side_effect(self, *args, **kwargs):
        """
        Side effect for mocked pool calls: return the canned pool description
        matching the ``IdentityPoolId`` keyword, or the default pool when the
        id is missing or unknown.
        """
        pools_by_id = {
            first_pool_id: first_pool_ret,
            second_pool_id: second_pool_ret,
            third_pool_id: third_pool_ret,
        }
        return pools_by_id.get(kwargs.get("IdentityPoolId"), default_pool_ret)

    def _get_identity_pool_roles_side_effect(self, *args, **kwargs):
        """
        Side effect for mocked role lookups: return the canned roles matching
        the ``IdentityPoolId`` keyword, or the default pool's roles when the
        id is missing or unknown.
        """
        roles_by_id = {
            first_pool_id: first_pool_role_ret,
            second_pool_id: second_pool_role_ret,
            third_pool_id: third_pool_role_ret,
        }
        return roles_by_id.get(kwargs.get("IdentityPoolId"), default_pool_role_ret)

    def _patch_describe_role(self):
        """
        Return a ``patch.dict`` context manager that stubs
        ``boto_iam.describe_role`` in the loaded execution modules with a mock
        returning a fixed ARN (``my_auth_role_arn``).
        """
        return patch.dict(
            self.funcs,
            {
                "boto_iam.describe_role": MagicMock(
                    return_value={"arn": "my_auth_role_arn"}
                )
            },
        )

    def test_present_when_failing_to_describe_identity_pools(self):
        """
        Tests exceptions when describing identity pools
        """
        self.conn.list_identity_pools.return_value = identity_pools_ret
        self.conn.describe_identity_pool.side_effect = ClientError(
            error_content, "error on describe identity pool"
        )
        result = self.salt_states["boto_cognitoidentity.pool_present"](
            name="test pool present",
            IdentityPoolName=first_pool_name,
            AuthenticatedRole="my_auth_role",
            **conn_parameters
        )
        self.assertEqual(result.get("result"), False)
        # Default to "" (not a dict) so a missing comment fails on the
        # substring check rather than on an unrelated type mismatch.
        self.assertIn("error on describe identity pool", result.get("comment", ""))

    def test_present_when_multiple_pools_with_same_name_exist(self):
        """
        Tests present on an identity pool name where it matched
        multiple pools.  The result should fail.
        """
        self.conn.list_identity_pools.return_value = identity_pools_ret
        self.conn.describe_identity_pool.side_effect = (
            self._describe_identity_pool_side_effect
        )
        result = self.salt_states["boto_cognitoidentity.pool_present"](
            name="test pool present",
            IdentityPoolName=first_pool_name,
            AuthenticatedRole="my_auth_role",
            **conn_parameters
        )
        self.assertEqual(result.get("result"), False)
        self.assertIn(
            "{}".format([first_pool_ret, third_pool_ret]), result.get("comment", "")
        )

    def test_present_when_failing_to_create_a_new_identity_pool(self):
        """
        Tests present on an identity pool name that doesn't exist and
        an error is thrown on creation.
        """
        self.conn.list_identity_pools.return_value = identity_pools_ret
        self.conn.describe_identity_pool.side_effect = (
            self._describe_identity_pool_side_effect
        )
        self.conn.create_identity_pool.side_effect = ClientError(
            error_content, "error on create_identity_pool"
        )
        result = self.salt_states["boto_cognitoidentity.pool_present"](
            name="test pool present",
            IdentityPoolName=default_pool_name,
            AuthenticatedRole="my_auth_role",
            **conn_parameters
        )
        self.assertEqual(result.get("result"), False)
        self.assertIn("error on create_identity_pool", result.get("comment", ""))
        self.conn.update_identity_pool.assert_not_called()

    def test_present_when_failing_to_update_an_existing_identity_pool(self):
        """
        Tests present on a unique instance of identity pool having the matching
        IdentityPoolName, and an error is thrown on updating the pool properties.
        """
        self.conn.list_identity_pools.return_value = identity_pools_ret
        self.conn.describe_identity_pool.side_effect = (
            self._describe_identity_pool_side_effect
        )
        self.conn.update_identity_pool.side_effect = ClientError(
            error_content, "error on update_identity_pool"
        )
        result = self.salt_states["boto_cognitoidentity.pool_present"](
            name="test pool present",
            IdentityPoolName=second_pool_name,
            AuthenticatedRole="my_auth_role",
            AllowUnauthenticatedIdentities=True,
            **conn_parameters
        )
        self.assertEqual(result.get("result"), False)
        self.assertIn("error on update_identity_pool", result.get("comment", ""))
        self.conn.create_identity_pool.assert_not_called()

    def test_present_when_failing_to_get_identity_pool_roles(self):
        """
        Tests present on a unique instance of identity pool having the matching
        IdentityPoolName, where update_identity_pool succeeded, but an error
        is thrown on getting the identity pool role prior to setting the roles.
        """
        self.conn.list_identity_pools.return_value = identity_pools_ret
        self.conn.describe_identity_pool.side_effect = (
            self._describe_identity_pool_side_effect
        )
        self.conn.update_identity_pool.return_value = second_pool_update_ret
        self.conn.get_identity_pool_roles.side_effect = ClientError(
            error_content, "error on get_identity_pool_roles"
        )
        result = self.salt_states["boto_cognitoidentity.pool_present"](
            name="test pool present",
            IdentityPoolName=second_pool_name,
            AuthenticatedRole="my_auth_role",
            AllowUnauthenticatedIdentities=True,
            **conn_parameters
        )
        self.assertEqual(result.get("result"), False)
        self.assertIn("error on get_identity_pool_roles", result.get("comment", ""))
        self.conn.create_identity_pool.assert_not_called()
        self.conn.set_identity_pool_roles.assert_not_called()

    @pytest.mark.slow_test
    def test_present_when_failing_to_set_identity_pool_roles(self):
        """
        Tests present on a unique instance of identity pool having the matching
        IdentityPoolName, where update_identity_pool succeeded, but an error
        is thrown on setting the identity pool role.
        """
        self.conn.list_identity_pools.return_value = identity_pools_ret
        self.conn.describe_identity_pool.side_effect = (
            self._describe_identity_pool_side_effect
        )
        self.conn.update_identity_pool.return_value = second_pool_update_ret
        self.conn.get_identity_pool_roles.return_value = second_pool_role_ret
        self.conn.set_identity_pool_roles.side_effect = ClientError(
            error_content, "error on set_identity_pool_roles"
        )
        with self._patch_describe_role():
            result = self.salt_states["boto_cognitoidentity.pool_present"](
                name="test pool present",
                IdentityPoolName=second_pool_name,
                AuthenticatedRole="my_auth_role",
                AllowUnauthenticatedIdentities=True,
                **conn_parameters
            )
        self.assertEqual(result.get("result"), False)
        self.assertIn("error on set_identity_pool_roles", result.get("comment", ""))
        # The mock records the call before raising its side effect, so the
        # arguments of the failed call can still be checked.
        self.conn.set_identity_pool_roles.assert_called_with(
            IdentityPoolId=second_pool_id,
            Roles={"authenticated": "my_auth_role_arn"},
        )

    @pytest.mark.slow_test
    def test_present_when_pool_name_does_not_exist(self):
        """
        Tests the successful case of creating a new instance, and updating its
        roles
        """
        self.conn.list_identity_pools.return_value = identity_pools_ret
        # create_identity_pool is called without an IdentityPoolId, so the
        # side effect falls through to the default pool description.
        self.conn.create_identity_pool.side_effect = (
            self._describe_identity_pool_side_effect
        )
        self.conn.get_identity_pool_roles.return_value = default_pool_role_ret
        self.conn.set_identity_pool_roles.return_value = None
        with self._patch_describe_role():
            result = self.salt_states["boto_cognitoidentity.pool_present"](
                name="test pool present",
                IdentityPoolName=default_pool_name,
                AuthenticatedRole="my_auth_role",
                AllowUnauthenticatedIdentities=True,
                DeveloperProviderName=default_dev_provider,
                **conn_parameters
            )
        self.assertEqual(result.get("result"), True)
        self.conn.create_identity_pool.assert_called_with(
            AllowUnauthenticatedIdentities=True,
            IdentityPoolName=default_pool_name,
            DeveloperProviderName=default_dev_provider,
            SupportedLoginProviders={},
            OpenIdConnectProviderARNs=[],
        )
        self.conn.set_identity_pool_roles.assert_called_with(
            IdentityPoolId=default_pool_id,
            Roles={"authenticated": "my_auth_role_arn"},
        )
        self.conn.update_identity_pool.assert_not_called()

    @pytest.mark.slow_test
    def test_present_when_pool_name_exists(self):
        """
        Tests the successful case of updating a single instance with matching
        IdentityPoolName and its roles.
        """
        self.conn.list_identity_pools.return_value = identity_pools_ret
        self.conn.describe_identity_pool.side_effect = (
            self._describe_identity_pool_side_effect
        )
        self.conn.update_identity_pool.return_value = second_pool_update_ret
        self.conn.get_identity_pool_roles.return_value = second_pool_role_ret
        self.conn.set_identity_pool_roles.return_value = None
        with self._patch_describe_role():
            result = self.salt_states["boto_cognitoidentity.pool_present"](
                name="test pool present",
                IdentityPoolName=second_pool_name,
                AuthenticatedRole="my_auth_role",
                AllowUnauthenticatedIdentities=True,
                **conn_parameters
            )
        self.assertEqual(result.get("result"), True)
        self.conn.update_identity_pool.assert_called_with(
            AllowUnauthenticatedIdentities=True,
            IdentityPoolId=second_pool_id,
            IdentityPoolName=second_pool_name,
        )
        self.conn.set_identity_pool_roles.assert_called_with(
            IdentityPoolId=second_pool_id,
            Roles={"authenticated": "my_auth_role_arn"},
        )
        self.conn.create_identity_pool.assert_not_called()

    def test_absent_when_pool_does_not_exist(self):
        """
        Tests absent on an identity pool that does not exist.
        """
        self.conn.list_identity_pools.return_value = identity_pools_ret
        result = self.salt_states["boto_cognitoidentity.pool_absent"](
            name="test pool absent",
            IdentityPoolName="no_such_pool_name",
            RemoveAllMatched=False,
            **conn_parameters
        )
        self.assertEqual(result.get("result"), True)
        self.assertEqual(result["changes"], {})

    def test_absent_when_removeallmatched_is_false_and_multiple_pools_matched(self):
        """
        Tests absent on when RemoveAllMatched flag is false and there are multiple matches
        for the given pool name
        first_pool_name is matched to first and third pool with different id's
        """
        self.conn.list_identity_pools.return_value = identity_pools_ret
        self.conn.describe_identity_pool.side_effect = (
            self._describe_identity_pool_side_effect
        )
        result = self.salt_states["boto_cognitoidentity.pool_absent"](
            name="test pool absent",
            IdentityPoolName=first_pool_name,
            RemoveAllMatched=False,
            **conn_parameters
        )
        self.assertEqual(result.get("result"), False)
        self.assertEqual(result["changes"], {})
        self.assertIn(
            "{}".format([first_pool_ret, third_pool_ret]), result.get("comment", "")
        )

    def test_absent_when_failing_to_describe_identity_pools(self):
        """
        Tests exceptions when describing identity pools
        """
        self.conn.list_identity_pools.return_value = identity_pools_ret
        self.conn.describe_identity_pool.side_effect = ClientError(
            error_content, "error on describe identity pool"
        )
        result = self.salt_states["boto_cognitoidentity.pool_absent"](
            name="test pool absent",
            IdentityPoolName=first_pool_name,
            RemoveAllMatched=False,
            **conn_parameters
        )
        self.assertEqual(result.get("result"), False)
        # Default to "" (not a dict) so a missing comment fails on the
        # substring check rather than on an unrelated type mismatch.
        self.assertIn("error on describe identity pool", result.get("comment", ""))

    def test_absent_when_erroring_on_delete_identity_pool(self):
        """
        Tests error due to delete_identity_pools
        """
        self.conn.list_identity_pools.return_value = identity_pools_ret
        self.conn.describe_identity_pool.side_effect = (
            self._describe_identity_pool_side_effect
        )
        self.conn.delete_identity_pool.side_effect = ClientError(
            error_content, "error on delete identity pool"
        )
        result = self.salt_states["boto_cognitoidentity.pool_absent"](
            name="test pool absent",
            IdentityPoolName=first_pool_name,
            RemoveAllMatched=True,
            **conn_parameters
        )
        self.assertEqual(result.get("result"), False)
        self.assertEqual(result["changes"], {})
        self.assertIn("error on delete identity pool", result.get("comment", ""))

    def test_absent_when_a_single_pool_exists(self):
        """
        Tests absent succeeds on delete when a single pool matched and
        RemoveAllMatched is False
        """
        self.conn.list_identity_pools.return_value = identity_pools_ret
        self.conn.describe_identity_pool.return_value = second_pool_ret
        self.conn.delete_identity_pool.return_value = None
        result = self.salt_states["boto_cognitoidentity.pool_absent"](
            name="test pool absent",
            IdentityPoolName=second_pool_name,
            RemoveAllMatched=False,
            **conn_parameters
        )
        self.assertEqual(result.get("result"), True)
        expected_changes = {
            "new": {"Identity Pool Id {}".format(second_pool_id): None},
            "old": {"Identity Pool Id {}".format(second_pool_id): second_pool_name},
        }
        self.assertEqual(result["changes"], expected_changes)

    def test_absent_when_multiple_pool_exists_and_removeallmatched_flag_is_true(self):
        """
        Tests absent succeeds on delete when a multiple pools matched and
        RemoveAllMatched is True

        first_pool_name should match to first_pool_id and third_pool_id
        """
        self.conn.list_identity_pools.return_value = identity_pools_ret
        self.conn.describe_identity_pool.side_effect = (
            self._describe_identity_pool_side_effect
        )
        self.conn.delete_identity_pool.return_value = None
        result = self.salt_states["boto_cognitoidentity.pool_absent"](
            name="test pool absent",
            IdentityPoolName=first_pool_name,
            RemoveAllMatched=True,
            **conn_parameters
        )
        self.assertEqual(result.get("result"), True)
        expected_changes = {
            "new": {
                "Identity Pool Id {}".format(first_pool_id): None,
                "Identity Pool Id {}".format(third_pool_id): None,
            },
            "old": {
                "Identity Pool Id {}".format(first_pool_id): first_pool_name,
                "Identity Pool Id {}".format(third_pool_id): third_pool_name,
            },
        }
        self.assertEqual(result["changes"], expected_changes)
616