1import logging 2import random 3import string 4 5import salt.config 6import salt.loader 7import salt.modules.boto_cloudtrail as boto_cloudtrail 8from salt.utils.versions import LooseVersion 9from tests.support.mixins import LoaderModuleMockMixin 10from tests.support.mock import MagicMock, patch 11from tests.support.unit import TestCase, skipIf 12 13# pylint: disable=import-error,no-name-in-module,unused-import 14try: 15 import boto 16 import boto3 17 from botocore.exceptions import ClientError 18 19 HAS_BOTO = True 20except ImportError: 21 HAS_BOTO = False 22 23# pylint: enable=import-error,no-name-in-module,unused-import 24 25# the boto_cloudtrail module relies on the connect_to_region() method 26# which was added in boto 2.8.0 27# https://github.com/boto/boto/commit/33ac26b416fbb48a60602542b4ce15dcc7029f12 28required_boto3_version = "1.2.1" 29 30log = logging.getLogger(__name__) 31 32 33def _has_required_boto(): 34 """ 35 Returns True/False boolean depending on if Boto is installed and correct 36 version. 37 """ 38 if not HAS_BOTO: 39 return False 40 elif LooseVersion(boto3.__version__) < LooseVersion(required_boto3_version): 41 return False 42 else: 43 return True 44 45 46if _has_required_boto(): 47 region = "us-east-1" 48 access_key = "GKTADJGHEIQSXMKKRBJ08H" 49 secret_key = "askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs" 50 conn_parameters = { 51 "region": region, 52 "key": access_key, 53 "keyid": secret_key, 54 "profile": {}, 55 } 56 error_message = ( 57 "An error occurred (101) when calling the {0} operation: Test-defined error" 58 ) 59 not_found_error = ClientError( 60 {"Error": {"Code": "TrailNotFoundException", "Message": "Test-defined error"}}, 61 "msg", 62 ) 63 error_content = {"Error": {"Code": 101, "Message": "Test-defined error"}} 64 trail_ret = dict( 65 Name="testtrail", 66 IncludeGlobalServiceEvents=True, 67 KmsKeyId=None, 68 LogFileValidationEnabled=False, 69 S3BucketName="auditinfo", 70 TrailARN="arn:aws:cloudtrail:us-east-1:214351231622:trail/testtrail", 71 ) 72 status_ret = dict( 73 IsLogging=False, 74 LatestCloudWatchLogsDeliveryError=None, 75 LatestCloudWatchLogsDeliveryTime=None, 76 LatestDeliveryError=None, 77 LatestDeliveryTime=None, 78 LatestDigestDeliveryError=None, 79 LatestDigestDeliveryTime=None, 80 LatestNotificationError=None, 81 LatestNotificationTime=None, 82 StartLoggingTime=None, 83 StopLoggingTime=None, 84 ) 85 86 87@skipIf(HAS_BOTO is False, "The boto module must be installed.") 88@skipIf( 89 _has_required_boto() is False, 90 "The boto3 module must be greater than or equal to version {}".format( 91 required_boto3_version 92 ), 93) 94class BotoCloudTrailTestCaseBase(TestCase, LoaderModuleMockMixin): 95 conn = None 96 97 def setup_loader_modules(self): 98 self.opts = opts = salt.config.DEFAULT_MINION_OPTS.copy() 99 utils = salt.loader.utils( 100 opts, whitelist=["boto3", "args", "systemd", "path", "platform"], context={} 101 ) 102 return {boto_cloudtrail: {"__utils__": utils}} 103 104 def setUp(self): 105 super().setUp() 106 boto_cloudtrail.__init__(self.opts) 107 del self.opts 108 109 # Set up MagicMock to replace the boto3 session 110 # connections keep getting cached from prior tests, can't find the 111 # correct context object to clear it. So randomize the cache key, to prevent any 112 # cache hits 113 conn_parameters["key"] = "".join( 114 random.choice(string.ascii_lowercase + string.digits) for _ in range(50) 115 ) 116 117 self.patcher = patch("boto3.session.Session") 118 self.addCleanup(self.patcher.stop) 119 self.addCleanup(delattr, self, "patcher") 120 mock_session = self.patcher.start() 121 122 session_instance = mock_session.return_value 123 self.conn = MagicMock() 124 self.addCleanup(delattr, self, "conn") 125 session_instance.client.return_value = self.conn 126 127 128class BotoCloudTrailTestCaseMixin: 129 pass 130 131 132class BotoCloudTrailTestCase(BotoCloudTrailTestCaseBase, BotoCloudTrailTestCaseMixin): 133 """ 134 TestCase for salt.modules.boto_cloudtrail module 135 """ 136 137 def test_that_when_checking_if_a_trail_exists_and_a_trail_exists_the_trail_exists_method_returns_true( 138 self, 139 ): 140 """ 141 Tests checking cloudtrail trail existence when the cloudtrail trail already exists 142 """ 143 self.conn.get_trail_status.return_value = trail_ret 144 result = boto_cloudtrail.exists(Name=trail_ret["Name"], **conn_parameters) 145 146 self.assertTrue(result["exists"]) 147 148 def test_that_when_checking_if_a_trail_exists_and_a_trail_does_not_exist_the_trail_exists_method_returns_false( 149 self, 150 ): 151 """ 152 Tests checking cloudtrail trail existence when the cloudtrail trail does not exist 153 """ 154 self.conn.get_trail_status.side_effect = not_found_error 155 result = boto_cloudtrail.exists(Name="mytrail", **conn_parameters) 156 157 self.assertFalse(result["exists"]) 158 159 def test_that_when_checking_if_a_trail_exists_and_boto3_returns_an_error_the_trail_exists_method_returns_error( 160 self, 161 ): 162 """ 163 Tests checking cloudtrail trail existence when boto returns an error 164 """ 165 self.conn.get_trail_status.side_effect = ClientError( 166 error_content, "get_trail_status" 167 ) 168 result = boto_cloudtrail.exists(Name="mytrail", **conn_parameters) 169 170 self.assertEqual( 171 result.get("error", {}).get("message"), 172 error_message.format("get_trail_status"), 173 ) 174 175 def test_that_when_creating_a_trail_succeeds_the_create_trail_method_returns_true( 176 self, 177 ): 178 """ 179 tests True trail created. 180 """ 181 self.conn.create_trail.return_value = trail_ret 182 result = boto_cloudtrail.create( 183 Name=trail_ret["Name"], 184 S3BucketName=trail_ret["S3BucketName"], 185 **conn_parameters 186 ) 187 188 self.assertTrue(result["created"]) 189 190 def test_that_when_creating_a_trail_fails_the_create_trail_method_returns_error( 191 self, 192 ): 193 """ 194 tests False trail not created. 195 """ 196 self.conn.create_trail.side_effect = ClientError(error_content, "create_trail") 197 result = boto_cloudtrail.create( 198 Name=trail_ret["Name"], 199 S3BucketName=trail_ret["S3BucketName"], 200 **conn_parameters 201 ) 202 self.assertEqual( 203 result.get("error", {}).get("message"), error_message.format("create_trail") 204 ) 205 206 def test_that_when_deleting_a_trail_succeeds_the_delete_trail_method_returns_true( 207 self, 208 ): 209 """ 210 tests True trail deleted. 211 """ 212 result = boto_cloudtrail.delete(Name="testtrail", **conn_parameters) 213 214 self.assertTrue(result["deleted"]) 215 216 def test_that_when_deleting_a_trail_fails_the_delete_trail_method_returns_false( 217 self, 218 ): 219 """ 220 tests False trail not deleted. 221 """ 222 self.conn.delete_trail.side_effect = ClientError(error_content, "delete_trail") 223 result = boto_cloudtrail.delete(Name="testtrail", **conn_parameters) 224 self.assertFalse(result["deleted"]) 225 226 def test_that_when_describing_trail_it_returns_the_dict_of_properties_returns_true( 227 self, 228 ): 229 """ 230 Tests describing parameters if trail exists 231 """ 232 self.conn.describe_trails.return_value = {"trailList": [trail_ret]} 233 234 result = boto_cloudtrail.describe(Name=trail_ret["Name"], **conn_parameters) 235 236 self.assertTrue(result["trail"]) 237 238 def test_that_when_describing_trail_it_returns_the_dict_of_properties_returns_false( 239 self, 240 ): 241 """ 242 Tests describing parameters if trail does not exist 243 """ 244 self.conn.describe_trails.side_effect = not_found_error 245 result = boto_cloudtrail.describe(Name="testtrail", **conn_parameters) 246 247 self.assertFalse(result["trail"]) 248 249 def test_that_when_describing_trail_on_client_error_it_returns_error(self): 250 """ 251 Tests describing parameters failure 252 """ 253 self.conn.describe_trails.side_effect = ClientError(error_content, "get_trail") 254 result = boto_cloudtrail.describe(Name="testtrail", **conn_parameters) 255 self.assertTrue("error" in result) 256 257 def test_that_when_getting_status_it_returns_the_dict_of_properties_returns_true( 258 self, 259 ): 260 """ 261 Tests getting status if trail exists 262 """ 263 self.conn.get_trail_status.return_value = status_ret 264 265 result = boto_cloudtrail.status(Name=trail_ret["Name"], **conn_parameters) 266 267 self.assertTrue(result["trail"]) 268 269 def test_that_when_getting_status_it_returns_the_dict_of_properties_returns_false( 270 self, 271 ): 272 """ 273 Tests getting status if trail does not exist 274 """ 275 self.conn.get_trail_status.side_effect = not_found_error 276 result = boto_cloudtrail.status(Name="testtrail", **conn_parameters) 277 278 self.assertFalse(result["trail"]) 279 280 def test_that_when_getting_status_on_client_error_it_returns_error(self): 281 """ 282 Tests getting status failure 283 """ 284 self.conn.get_trail_status.side_effect = ClientError( 285 error_content, "get_trail_status" 286 ) 287 result = boto_cloudtrail.status(Name="testtrail", **conn_parameters) 288 self.assertTrue("error" in result) 289 290 def test_that_when_listing_trails_succeeds_the_list_trails_method_returns_true( 291 self, 292 ): 293 """ 294 tests True trails listed. 295 """ 296 self.conn.describe_trails.return_value = {"trailList": [trail_ret]} 297 result = boto_cloudtrail.list(**conn_parameters) 298 299 self.assertTrue(result["trails"]) 300 301 def test_that_when_listing_trail_fails_the_list_trail_method_returns_false(self): 302 """ 303 tests False no trail listed. 304 """ 305 self.conn.describe_trails.return_value = {"trailList": []} 306 result = boto_cloudtrail.list(**conn_parameters) 307 self.assertFalse(result["trails"]) 308 309 def test_that_when_listing_trail_fails_the_list_trail_method_returns_error(self): 310 """ 311 tests False trail error. 312 """ 313 self.conn.describe_trails.side_effect = ClientError( 314 error_content, "list_trails" 315 ) 316 result = boto_cloudtrail.list(**conn_parameters) 317 self.assertEqual( 318 result.get("error", {}).get("message"), error_message.format("list_trails") 319 ) 320 321 def test_that_when_updating_a_trail_succeeds_the_update_trail_method_returns_true( 322 self, 323 ): 324 """ 325 tests True trail updated. 326 """ 327 self.conn.update_trail.return_value = trail_ret 328 result = boto_cloudtrail.update( 329 Name=trail_ret["Name"], 330 S3BucketName=trail_ret["S3BucketName"], 331 **conn_parameters 332 ) 333 334 self.assertTrue(result["updated"]) 335 336 def test_that_when_updating_a_trail_fails_the_update_trail_method_returns_error( 337 self, 338 ): 339 """ 340 tests False trail not updated. 341 """ 342 self.conn.update_trail.side_effect = ClientError(error_content, "update_trail") 343 result = boto_cloudtrail.update( 344 Name=trail_ret["Name"], 345 S3BucketName=trail_ret["S3BucketName"], 346 **conn_parameters 347 ) 348 self.assertEqual( 349 result.get("error", {}).get("message"), error_message.format("update_trail") 350 ) 351 352 def test_that_when_starting_logging_succeeds_the_start_logging_method_returns_true( 353 self, 354 ): 355 """ 356 tests True logging started. 357 """ 358 result = boto_cloudtrail.start_logging( 359 Name=trail_ret["Name"], **conn_parameters 360 ) 361 362 self.assertTrue(result["started"]) 363 364 def test_that_when_start_logging_fails_the_start_logging_method_returns_false(self): 365 """ 366 tests False logging not started. 367 """ 368 self.conn.describe_trails.return_value = {"trailList": []} 369 self.conn.start_logging.side_effect = ClientError( 370 error_content, "start_logging" 371 ) 372 result = boto_cloudtrail.start_logging( 373 Name=trail_ret["Name"], **conn_parameters 374 ) 375 self.assertFalse(result["started"]) 376 377 def test_that_when_stopping_logging_succeeds_the_stop_logging_method_returns_true( 378 self, 379 ): 380 """ 381 tests True logging stopped. 382 """ 383 result = boto_cloudtrail.stop_logging(Name=trail_ret["Name"], **conn_parameters) 384 385 self.assertTrue(result["stopped"]) 386 387 def test_that_when_stop_logging_fails_the_stop_logging_method_returns_false(self): 388 """ 389 tests False logging not stopped. 390 """ 391 self.conn.describe_trails.return_value = {"trailList": []} 392 self.conn.stop_logging.side_effect = ClientError(error_content, "stop_logging") 393 result = boto_cloudtrail.stop_logging(Name=trail_ret["Name"], **conn_parameters) 394 self.assertFalse(result["stopped"]) 395 396 def test_that_when_adding_tags_succeeds_the_add_tags_method_returns_true(self): 397 """ 398 tests True tags added. 399 """ 400 with patch.dict( 401 boto_cloudtrail.__salt__, 402 {"boto_iam.get_account_id": MagicMock(return_value="1234")}, 403 ): 404 result = boto_cloudtrail.add_tags( 405 Name=trail_ret["Name"], a="b", **conn_parameters 406 ) 407 408 self.assertTrue(result["tagged"]) 409 410 def test_that_when_adding_tags_fails_the_add_tags_method_returns_false(self): 411 """ 412 tests False tags not added. 413 """ 414 self.conn.add_tags.side_effect = ClientError(error_content, "add_tags") 415 with patch.dict( 416 boto_cloudtrail.__salt__, 417 {"boto_iam.get_account_id": MagicMock(return_value="1234")}, 418 ): 419 result = boto_cloudtrail.add_tags( 420 Name=trail_ret["Name"], a="b", **conn_parameters 421 ) 422 self.assertFalse(result["tagged"]) 423 424 def test_that_when_removing_tags_succeeds_the_remove_tags_method_returns_true(self): 425 """ 426 tests True tags removed. 427 """ 428 with patch.dict( 429 boto_cloudtrail.__salt__, 430 {"boto_iam.get_account_id": MagicMock(return_value="1234")}, 431 ): 432 result = boto_cloudtrail.remove_tags( 433 Name=trail_ret["Name"], a="b", **conn_parameters 434 ) 435 436 self.assertTrue(result["tagged"]) 437 438 def test_that_when_removing_tags_fails_the_remove_tags_method_returns_false(self): 439 """ 440 tests False tags not removed. 441 """ 442 self.conn.remove_tags.side_effect = ClientError(error_content, "remove_tags") 443 with patch.dict( 444 boto_cloudtrail.__salt__, 445 {"boto_iam.get_account_id": MagicMock(return_value="1234")}, 446 ): 447 result = boto_cloudtrail.remove_tags( 448 Name=trail_ret["Name"], a="b", **conn_parameters 449 ) 450 self.assertFalse(result["tagged"]) 451 452 def test_that_when_listing_tags_succeeds_the_list_tags_method_returns_true(self): 453 """ 454 tests True tags listed. 455 """ 456 with patch.dict( 457 boto_cloudtrail.__salt__, 458 {"boto_iam.get_account_id": MagicMock(return_value="1234")}, 459 ): 460 result = boto_cloudtrail.list_tags( 461 Name=trail_ret["Name"], **conn_parameters 462 ) 463 464 self.assertEqual(result["tags"], {}) 465 466 def test_that_when_listing_tags_fails_the_list_tags_method_returns_false(self): 467 """ 468 tests False tags not listed. 469 """ 470 self.conn.list_tags.side_effect = ClientError(error_content, "list_tags") 471 with patch.dict( 472 boto_cloudtrail.__salt__, 473 {"boto_iam.get_account_id": MagicMock(return_value="1234")}, 474 ): 475 result = boto_cloudtrail.list_tags( 476 Name=trail_ret["Name"], **conn_parameters 477 ) 478 self.assertTrue(result["error"]) 479