1import logging
2import random
3import string
4
5import salt.config
6import salt.loader
7import salt.modules.boto_cloudtrail as boto_cloudtrail
8from salt.utils.versions import LooseVersion
9from tests.support.mixins import LoaderModuleMockMixin
10from tests.support.mock import MagicMock, patch
11from tests.support.unit import TestCase, skipIf
12
13# pylint: disable=import-error,no-name-in-module,unused-import
14try:
15    import boto
16    import boto3
17    from botocore.exceptions import ClientError
18
19    HAS_BOTO = True
20except ImportError:
21    HAS_BOTO = False
22
23# pylint: enable=import-error,no-name-in-module,unused-import
24
# Minimum boto3 version these tests need; _has_required_boto() compares
# boto3.__version__ against this value.
# NOTE(review): the remark below refers to boto 2.8.0 and connect_to_region(),
# not to boto3 1.2.1 -- it looks like a leftover from a boto-based test;
# confirm whether it still applies.
# the boto_cloudtrail module relies on the connect_to_region() method
# which was added in boto 2.8.0
# https://github.com/boto/boto/commit/33ac26b416fbb48a60602542b4ce15dcc7029f12
required_boto3_version = "1.2.1"

# Module-level logger named after this test module.
log = logging.getLogger(__name__)
31
32
def _has_required_boto():
    """
    Tell whether the boto libraries imported successfully and the installed
    boto3 is not older than ``required_boto3_version``.

    Returns a plain boolean.
    """
    # Short-circuit on HAS_BOTO so boto3 is never touched when its import
    # failed; "not older than" is spelled as the negation of "<" to keep the
    # exact comparison the version objects are asked to perform.
    return HAS_BOTO and not (
        LooseVersion(boto3.__version__) < LooseVersion(required_boto3_version)
    )
44
45
if _has_required_boto():
    # Shared fixtures for the test cases below. They are only built when a
    # suitable boto3 is available because ClientError is imported from botocore.
    region = "us-east-1"
    access_key = "GKTADJGHEIQSXMKKRBJ08H"
    secret_key = "askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs"
    # Salt's boto modules expect the access key ID under ``keyid`` and the
    # secret access key under ``key``. The session is mocked, so the values
    # never reach AWS; setUp() additionally overwrites ``key`` per test.
    conn_parameters = {
        "region": region,
        "key": secret_key,
        "keyid": access_key,
        "profile": {},
    }
    # Text of str(ClientError(error_content, <operation>)); ``{0}`` is the
    # operation name.
    error_message = (
        "An error occurred (101) when calling the {0} operation: Test-defined error"
    )
    # Error raised by the mocked client to simulate a missing trail.
    not_found_error = ClientError(
        {"Error": {"Code": "TrailNotFoundException", "Message": "Test-defined error"}},
        "msg",
    )
    # Generic error payload used to build ClientError instances in the tests.
    error_content = {"Error": {"Code": 101, "Message": "Test-defined error"}}
    # Canned trail description returned by the mocked client.
    trail_ret = {
        "Name": "testtrail",
        "IncludeGlobalServiceEvents": True,
        "KmsKeyId": None,
        "LogFileValidationEnabled": False,
        "S3BucketName": "auditinfo",
        "TrailARN": "arn:aws:cloudtrail:us-east-1:214351231622:trail/testtrail",
    }
    # Canned trail status returned by the mocked client.
    status_ret = {
        "IsLogging": False,
        "LatestCloudWatchLogsDeliveryError": None,
        "LatestCloudWatchLogsDeliveryTime": None,
        "LatestDeliveryError": None,
        "LatestDeliveryTime": None,
        "LatestDigestDeliveryError": None,
        "LatestDigestDeliveryTime": None,
        "LatestNotificationError": None,
        "LatestNotificationTime": None,
        "StartLoggingTime": None,
        "StopLoggingTime": None,
    }
85
86
@skipIf(not HAS_BOTO, "The boto module must be installed.")
@skipIf(
    not _has_required_boto(),
    "The boto3 module must be greater than or equal to version {}".format(
        required_boto3_version
    ),
)
class BotoCloudTrailTestCaseBase(TestCase, LoaderModuleMockMixin):
    """
    Shared fixture for the boto_cloudtrail tests.

    Loads the module under test with ``__utils__`` from the salt loader and
    replaces ``boto3.session.Session`` with a mock whose client is exposed to
    the tests as ``self.conn``.
    """

    conn = None

    def setup_loader_modules(self):
        """
        Return the loader dunders for boto_cloudtrail.

        The minion opts are also kept on ``self.opts`` so that setUp() can
        pass them to the module's ``__init__``.
        """
        minion_opts = salt.config.DEFAULT_MINION_OPTS.copy()
        self.opts = minion_opts
        loaded_utils = salt.loader.utils(
            minion_opts,
            whitelist=["boto3", "args", "systemd", "path", "platform"],
            context={},
        )
        return {boto_cloudtrail: {"__utils__": loaded_utils}}

    def setUp(self):
        """
        Initialise the module, randomise the connection cache key and install
        the mocked boto3 session for the duration of one test.
        """
        super().setUp()
        boto_cloudtrail.__init__(self.opts)
        del self.opts

        # Set up MagicMock to replace the boto3 session.
        # Connections keep getting cached from prior tests and the context
        # object that would clear them could not be found, so the cache key is
        # randomised to prevent any cache hits.
        key_alphabet = string.ascii_lowercase + string.digits
        conn_parameters["key"] = "".join(
            random.choice(key_alphabet) for _ in range(50)
        )

        session_patcher = patch("boto3.session.Session")
        self.patcher = session_patcher
        self.addCleanup(session_patcher.stop)
        self.addCleanup(delattr, self, "patcher")
        session_cls = session_patcher.start()

        # Every client handed out by the mocked session is this one mock.
        client_mock = MagicMock()
        self.conn = client_mock
        self.addCleanup(delattr, self, "conn")
        session_cls.return_value.client.return_value = client_mock
126
127
class BotoCloudTrailTestCaseMixin:
    """
    Empty placeholder mixin; it contributes no attributes or methods.
    """
130
131
class BotoCloudTrailTestCase(BotoCloudTrailTestCaseBase, BotoCloudTrailTestCaseMixin):
    """
    TestCase for salt.modules.boto_cloudtrail module

    Every test drives one module function against the mocked client in
    ``self.conn`` and checks the dictionary the function returns.
    """

    @staticmethod
    def _stub_account_id():
        """
        Return a ``patch.dict`` context manager that installs a mock
        ``boto_iam.get_account_id`` (returning ``"1234"``) into
        ``boto_cloudtrail.__salt__``. All tag tests run under this stub.
        """
        return patch.dict(
            boto_cloudtrail.__salt__,
            {"boto_iam.get_account_id": MagicMock(return_value="1234")},
        )

    def test_that_when_checking_if_a_trail_exists_and_a_trail_exists_the_trail_exists_method_returns_true(
        self,
    ):
        """
        exists() reports the trail as present when get_trail_status succeeds
        """
        self.conn.get_trail_status.return_value = trail_ret
        result = boto_cloudtrail.exists(Name=trail_ret["Name"], **conn_parameters)

        self.assertTrue(result["exists"])

    def test_that_when_checking_if_a_trail_exists_and_a_trail_does_not_exist_the_trail_exists_method_returns_false(
        self,
    ):
        """
        exists() reports the trail as absent when get_trail_status raises
        TrailNotFoundException
        """
        self.conn.get_trail_status.side_effect = not_found_error
        result = boto_cloudtrail.exists(Name="mytrail", **conn_parameters)

        self.assertFalse(result["exists"])

    def test_that_when_checking_if_a_trail_exists_and_boto3_returns_an_error_the_trail_exists_method_returns_error(
        self,
    ):
        """
        exists() returns the error message when get_trail_status raises a
        generic ClientError
        """
        self.conn.get_trail_status.side_effect = ClientError(
            error_content, "get_trail_status"
        )
        result = boto_cloudtrail.exists(Name="mytrail", **conn_parameters)

        self.assertEqual(
            result.get("error", {}).get("message"),
            error_message.format("get_trail_status"),
        )

    def test_that_when_creating_a_trail_succeeds_the_create_trail_method_returns_true(
        self,
    ):
        """
        create() reports created when create_trail succeeds
        """
        self.conn.create_trail.return_value = trail_ret
        result = boto_cloudtrail.create(
            Name=trail_ret["Name"],
            S3BucketName=trail_ret["S3BucketName"],
            **conn_parameters
        )

        self.assertTrue(result["created"])

    def test_that_when_creating_a_trail_fails_the_create_trail_method_returns_error(
        self,
    ):
        """
        create() returns the error message when create_trail raises
        ClientError
        """
        self.conn.create_trail.side_effect = ClientError(error_content, "create_trail")
        result = boto_cloudtrail.create(
            Name=trail_ret["Name"],
            S3BucketName=trail_ret["S3BucketName"],
            **conn_parameters
        )
        self.assertEqual(
            result.get("error", {}).get("message"), error_message.format("create_trail")
        )

    def test_that_when_deleting_a_trail_succeeds_the_delete_trail_method_returns_true(
        self,
    ):
        """
        delete() reports deleted when the mocked client raises nothing
        """
        result = boto_cloudtrail.delete(Name="testtrail", **conn_parameters)

        self.assertTrue(result["deleted"])

    def test_that_when_deleting_a_trail_fails_the_delete_trail_method_returns_false(
        self,
    ):
        """
        delete() reports not deleted when delete_trail raises ClientError
        """
        self.conn.delete_trail.side_effect = ClientError(error_content, "delete_trail")
        result = boto_cloudtrail.delete(Name="testtrail", **conn_parameters)
        self.assertFalse(result["deleted"])

    def test_that_when_describing_trail_it_returns_the_dict_of_properties_returns_true(
        self,
    ):
        """
        describe() returns a non-empty trail when describe_trails lists one
        """
        self.conn.describe_trails.return_value = {"trailList": [trail_ret]}

        result = boto_cloudtrail.describe(Name=trail_ret["Name"], **conn_parameters)

        self.assertTrue(result["trail"])

    def test_that_when_describing_trail_it_returns_the_dict_of_properties_returns_false(
        self,
    ):
        """
        describe() returns an empty trail when describe_trails raises
        TrailNotFoundException
        """
        self.conn.describe_trails.side_effect = not_found_error
        result = boto_cloudtrail.describe(Name="testtrail", **conn_parameters)

        self.assertFalse(result["trail"])

    def test_that_when_describing_trail_on_client_error_it_returns_error(self):
        """
        describe() returns an ``error`` key when describe_trails raises a
        generic ClientError
        """
        self.conn.describe_trails.side_effect = ClientError(error_content, "get_trail")
        result = boto_cloudtrail.describe(Name="testtrail", **conn_parameters)
        # assertIn reports the actual dictionary on failure, unlike
        # assertTrue("error" in result).
        self.assertIn("error", result)

    def test_that_when_getting_status_it_returns_the_dict_of_properties_returns_true(
        self,
    ):
        """
        status() returns a non-empty trail status when get_trail_status
        succeeds
        """
        self.conn.get_trail_status.return_value = status_ret

        result = boto_cloudtrail.status(Name=trail_ret["Name"], **conn_parameters)

        self.assertTrue(result["trail"])

    def test_that_when_getting_status_it_returns_the_dict_of_properties_returns_false(
        self,
    ):
        """
        status() returns an empty trail status when get_trail_status raises
        TrailNotFoundException
        """
        self.conn.get_trail_status.side_effect = not_found_error
        result = boto_cloudtrail.status(Name="testtrail", **conn_parameters)

        self.assertFalse(result["trail"])

    def test_that_when_getting_status_on_client_error_it_returns_error(self):
        """
        status() returns an ``error`` key when get_trail_status raises a
        generic ClientError
        """
        self.conn.get_trail_status.side_effect = ClientError(
            error_content, "get_trail_status"
        )
        result = boto_cloudtrail.status(Name="testtrail", **conn_parameters)
        # assertIn reports the actual dictionary on failure, unlike
        # assertTrue("error" in result).
        self.assertIn("error", result)

    def test_that_when_listing_trails_succeeds_the_list_trails_method_returns_true(
        self,
    ):
        """
        list() returns a non-empty ``trails`` value when describe_trails lists
        one trail
        """
        self.conn.describe_trails.return_value = {"trailList": [trail_ret]}
        result = boto_cloudtrail.list(**conn_parameters)

        self.assertTrue(result["trails"])

    def test_that_when_listing_trail_fails_the_list_trail_method_returns_false(self):
        """
        list() returns an empty ``trails`` value when describe_trails lists
        nothing
        """
        self.conn.describe_trails.return_value = {"trailList": []}
        result = boto_cloudtrail.list(**conn_parameters)
        self.assertFalse(result["trails"])

    def test_that_when_listing_trail_fails_the_list_trail_method_returns_error(self):
        """
        list() returns the error message when describe_trails raises
        ClientError
        """
        self.conn.describe_trails.side_effect = ClientError(
            error_content, "list_trails"
        )
        result = boto_cloudtrail.list(**conn_parameters)
        self.assertEqual(
            result.get("error", {}).get("message"), error_message.format("list_trails")
        )

    def test_that_when_updating_a_trail_succeeds_the_update_trail_method_returns_true(
        self,
    ):
        """
        update() reports updated when update_trail succeeds
        """
        self.conn.update_trail.return_value = trail_ret
        result = boto_cloudtrail.update(
            Name=trail_ret["Name"],
            S3BucketName=trail_ret["S3BucketName"],
            **conn_parameters
        )

        self.assertTrue(result["updated"])

    def test_that_when_updating_a_trail_fails_the_update_trail_method_returns_error(
        self,
    ):
        """
        update() returns the error message when update_trail raises
        ClientError
        """
        self.conn.update_trail.side_effect = ClientError(error_content, "update_trail")
        result = boto_cloudtrail.update(
            Name=trail_ret["Name"],
            S3BucketName=trail_ret["S3BucketName"],
            **conn_parameters
        )
        self.assertEqual(
            result.get("error", {}).get("message"), error_message.format("update_trail")
        )

    def test_that_when_starting_logging_succeeds_the_start_logging_method_returns_true(
        self,
    ):
        """
        start_logging() reports started when the mocked client raises nothing
        """
        result = boto_cloudtrail.start_logging(
            Name=trail_ret["Name"], **conn_parameters
        )

        self.assertTrue(result["started"])

    def test_that_when_start_logging_fails_the_start_logging_method_returns_false(self):
        """
        start_logging() reports not started when the client's start_logging
        raises ClientError
        """
        self.conn.describe_trails.return_value = {"trailList": []}
        self.conn.start_logging.side_effect = ClientError(
            error_content, "start_logging"
        )
        result = boto_cloudtrail.start_logging(
            Name=trail_ret["Name"], **conn_parameters
        )
        self.assertFalse(result["started"])

    def test_that_when_stopping_logging_succeeds_the_stop_logging_method_returns_true(
        self,
    ):
        """
        stop_logging() reports stopped when the mocked client raises nothing
        """
        result = boto_cloudtrail.stop_logging(Name=trail_ret["Name"], **conn_parameters)

        self.assertTrue(result["stopped"])

    def test_that_when_stop_logging_fails_the_stop_logging_method_returns_false(self):
        """
        stop_logging() reports not stopped when the client's stop_logging
        raises ClientError
        """
        self.conn.describe_trails.return_value = {"trailList": []}
        self.conn.stop_logging.side_effect = ClientError(error_content, "stop_logging")
        result = boto_cloudtrail.stop_logging(Name=trail_ret["Name"], **conn_parameters)
        self.assertFalse(result["stopped"])

    def test_that_when_adding_tags_succeeds_the_add_tags_method_returns_true(self):
        """
        add_tags() reports tagged when the mocked client raises nothing
        """
        with self._stub_account_id():
            result = boto_cloudtrail.add_tags(
                Name=trail_ret["Name"], a="b", **conn_parameters
            )

        self.assertTrue(result["tagged"])

    def test_that_when_adding_tags_fails_the_add_tags_method_returns_false(self):
        """
        add_tags() reports not tagged when the client's add_tags raises
        ClientError
        """
        self.conn.add_tags.side_effect = ClientError(error_content, "add_tags")
        with self._stub_account_id():
            result = boto_cloudtrail.add_tags(
                Name=trail_ret["Name"], a="b", **conn_parameters
            )
        self.assertFalse(result["tagged"])

    def test_that_when_removing_tags_succeeds_the_remove_tags_method_returns_true(self):
        """
        remove_tags() returns a true ``tagged`` value when the mocked client
        raises nothing
        """
        with self._stub_account_id():
            result = boto_cloudtrail.remove_tags(
                Name=trail_ret["Name"], a="b", **conn_parameters
            )

        self.assertTrue(result["tagged"])

    def test_that_when_removing_tags_fails_the_remove_tags_method_returns_false(self):
        """
        remove_tags() returns a false ``tagged`` value when the client's
        remove_tags raises ClientError
        """
        self.conn.remove_tags.side_effect = ClientError(error_content, "remove_tags")
        with self._stub_account_id():
            result = boto_cloudtrail.remove_tags(
                Name=trail_ret["Name"], a="b", **conn_parameters
            )
        self.assertFalse(result["tagged"])

    def test_that_when_listing_tags_succeeds_the_list_tags_method_returns_true(self):
        """
        list_tags() returns an empty ``tags`` dict when the mocked client
        supplies no tag data
        """
        with self._stub_account_id():
            result = boto_cloudtrail.list_tags(
                Name=trail_ret["Name"], **conn_parameters
            )

        self.assertEqual(result["tags"], {})

    def test_that_when_listing_tags_fails_the_list_tags_method_returns_false(self):
        """
        list_tags() returns a truthy ``error`` value when the client's
        list_tags raises ClientError
        """
        self.conn.list_tags.side_effect = ClientError(error_content, "list_tags")
        with self._stub_account_id():
            result = boto_cloudtrail.list_tags(
                Name=trail_ret["Name"], **conn_parameters
            )
        self.assertTrue(result["error"])
479