1import json 2import os 3import time 4from datetime import datetime, timedelta 5 6import boto3 7from botocore.exceptions import ClientError 8from unittest import SkipTest 9import pytest 10 11from moto import mock_s3 12from moto.config import mock_config 13from moto.core import ACCOUNT_ID 14 15import sure # noqa # pylint: disable=unused-import 16 17 18@mock_config 19def test_put_configuration_recorder(): 20 client = boto3.client("config", region_name="us-west-2") 21 22 # Try without a name supplied: 23 with pytest.raises(ClientError) as ce: 24 client.put_configuration_recorder(ConfigurationRecorder={"roleARN": "somearn"}) 25 assert ( 26 ce.value.response["Error"]["Code"] 27 == "InvalidConfigurationRecorderNameException" 28 ) 29 assert "is not valid, blank string." in ce.value.response["Error"]["Message"] 30 31 # Try with a really long name: 32 with pytest.raises(ClientError) as ce: 33 client.put_configuration_recorder( 34 ConfigurationRecorder={"name": "a" * 257, "roleARN": "somearn"} 35 ) 36 assert ce.value.response["Error"]["Code"] == "ValidationException" 37 assert ( 38 "Member must have length less than or equal to 256" 39 in ce.value.response["Error"]["Message"] 40 ) 41 42 # With resource types and flags set to True: 43 bad_groups = [ 44 { 45 "allSupported": True, 46 "includeGlobalResourceTypes": True, 47 "resourceTypes": ["item"], 48 }, 49 { 50 "allSupported": False, 51 "includeGlobalResourceTypes": True, 52 "resourceTypes": ["item"], 53 }, 54 { 55 "allSupported": True, 56 "includeGlobalResourceTypes": False, 57 "resourceTypes": ["item"], 58 }, 59 { 60 "allSupported": False, 61 "includeGlobalResourceTypes": False, 62 "resourceTypes": [], 63 }, 64 {"includeGlobalResourceTypes": False, "resourceTypes": []}, 65 {"includeGlobalResourceTypes": True}, 66 {"resourceTypes": []}, 67 {}, 68 ] 69 70 for bg in bad_groups: 71 with pytest.raises(ClientError) as ce: 72 client.put_configuration_recorder( 73 ConfigurationRecorder={ 74 "name": "default", 75 "roleARN": "somearn", 76 "recordingGroup": bg, 77 } 78 ) 79 assert ce.value.response["Error"]["Code"] == "InvalidRecordingGroupException" 80 assert ( 81 ce.value.response["Error"]["Message"] 82 == "The recording group provided is not valid" 83 ) 84 85 # With an invalid Resource Type: 86 with pytest.raises(ClientError) as ce: 87 client.put_configuration_recorder( 88 ConfigurationRecorder={ 89 "name": "default", 90 "roleARN": "somearn", 91 "recordingGroup": { 92 "allSupported": False, 93 "includeGlobalResourceTypes": False, 94 # 2 good, and 2 bad: 95 "resourceTypes": [ 96 "AWS::EC2::Volume", 97 "LOLNO", 98 "AWS::EC2::VPC", 99 "LOLSTILLNO", 100 ], 101 }, 102 } 103 ) 104 assert ce.value.response["Error"]["Code"] == "ValidationException" 105 assert "2 validation error detected: Value '['LOLNO', 'LOLSTILLNO']" in str( 106 ce.value.response["Error"]["Message"] 107 ) 108 assert "AWS::EC2::Instance" in ce.value.response["Error"]["Message"] 109 110 # Create a proper one: 111 client.put_configuration_recorder( 112 ConfigurationRecorder={ 113 "name": "testrecorder", 114 "roleARN": "somearn", 115 "recordingGroup": { 116 "allSupported": False, 117 "includeGlobalResourceTypes": False, 118 "resourceTypes": ["AWS::EC2::Volume", "AWS::EC2::VPC"], 119 }, 120 } 121 ) 122 123 result = client.describe_configuration_recorders()["ConfigurationRecorders"] 124 assert len(result) == 1 125 assert result[0]["name"] == "testrecorder" 126 assert result[0]["roleARN"] == "somearn" 127 assert not result[0]["recordingGroup"]["allSupported"] 128 assert not result[0]["recordingGroup"]["includeGlobalResourceTypes"] 129 assert len(result[0]["recordingGroup"]["resourceTypes"]) == 2 130 assert ( 131 "AWS::EC2::Volume" in result[0]["recordingGroup"]["resourceTypes"] 132 and "AWS::EC2::VPC" in result[0]["recordingGroup"]["resourceTypes"] 133 ) 134 135 # Now update the configuration recorder: 136 client.put_configuration_recorder( 137 ConfigurationRecorder={ 138 "name": "testrecorder", 139 "roleARN": "somearn", 140 "recordingGroup": { 141 "allSupported": True, 142 "includeGlobalResourceTypes": True, 143 }, 144 } 145 ) 146 result = client.describe_configuration_recorders()["ConfigurationRecorders"] 147 assert len(result) == 1 148 assert result[0]["name"] == "testrecorder" 149 assert result[0]["roleARN"] == "somearn" 150 assert result[0]["recordingGroup"]["allSupported"] 151 assert result[0]["recordingGroup"]["includeGlobalResourceTypes"] 152 assert len(result[0]["recordingGroup"]["resourceTypes"]) == 0 153 154 # With a default recording group (i.e. lacking one) 155 client.put_configuration_recorder( 156 ConfigurationRecorder={"name": "testrecorder", "roleARN": "somearn"} 157 ) 158 result = client.describe_configuration_recorders()["ConfigurationRecorders"] 159 assert len(result) == 1 160 assert result[0]["name"] == "testrecorder" 161 assert result[0]["roleARN"] == "somearn" 162 assert result[0]["recordingGroup"]["allSupported"] 163 assert not result[0]["recordingGroup"]["includeGlobalResourceTypes"] 164 assert not result[0]["recordingGroup"].get("resourceTypes") 165 166 # Can currently only have exactly 1 Config Recorder in an account/region: 167 with pytest.raises(ClientError) as ce: 168 client.put_configuration_recorder( 169 ConfigurationRecorder={ 170 "name": "someotherrecorder", 171 "roleARN": "somearn", 172 "recordingGroup": { 173 "allSupported": False, 174 "includeGlobalResourceTypes": False, 175 }, 176 } 177 ) 178 assert ( 179 ce.value.response["Error"]["Code"] 180 == "MaxNumberOfConfigurationRecordersExceededException" 181 ) 182 assert ( 183 "maximum number of configuration recorders: 1 is reached." 184 in ce.value.response["Error"]["Message"] 185 ) 186 187 188@mock_config 189def test_put_configuration_aggregator(): 190 client = boto3.client("config", region_name="us-west-2") 191 192 # With too many aggregation sources: 193 with pytest.raises(ClientError) as ce: 194 client.put_configuration_aggregator( 195 ConfigurationAggregatorName="testing", 196 AccountAggregationSources=[ 197 { 198 "AccountIds": ["012345678910", "111111111111", "222222222222"], 199 "AwsRegions": ["us-east-1", "us-west-2"], 200 }, 201 { 202 "AccountIds": ["012345678910", "111111111111", "222222222222"], 203 "AwsRegions": ["us-east-1", "us-west-2"], 204 }, 205 ], 206 ) 207 assert ( 208 "Member must have length less than or equal to 1" 209 in ce.value.response["Error"]["Message"] 210 ) 211 assert ce.value.response["Error"]["Code"] == "ValidationException" 212 213 # With an invalid region config (no regions defined): 214 with pytest.raises(ClientError) as ce: 215 client.put_configuration_aggregator( 216 ConfigurationAggregatorName="testing", 217 AccountAggregationSources=[ 218 { 219 "AccountIds": ["012345678910", "111111111111", "222222222222"], 220 "AllAwsRegions": False, 221 } 222 ], 223 ) 224 assert ( 225 "Your request does not specify any regions" 226 in ce.value.response["Error"]["Message"] 227 ) 228 assert ce.value.response["Error"]["Code"] == "InvalidParameterValueException" 229 230 with pytest.raises(ClientError) as ce: 231 client.put_configuration_aggregator( 232 ConfigurationAggregatorName="testing", 233 OrganizationAggregationSource={ 234 "RoleArn": "arn:aws:iam::012345678910:role/SomeRole" 235 }, 236 ) 237 assert ( 238 "Your request does not specify any regions" 239 in ce.value.response["Error"]["Message"] 240 ) 241 assert ce.value.response["Error"]["Code"] == "InvalidParameterValueException" 242 243 # With both region flags defined: 244 with pytest.raises(ClientError) as ce: 245 client.put_configuration_aggregator( 246 ConfigurationAggregatorName="testing", 247 AccountAggregationSources=[ 248 { 249 "AccountIds": ["012345678910", "111111111111", "222222222222"], 250 "AwsRegions": ["us-east-1", "us-west-2"], 251 "AllAwsRegions": True, 252 } 253 ], 254 ) 255 assert ( 256 "You must choose one of these options" in ce.value.response["Error"]["Message"] 257 ) 258 assert ce.value.response["Error"]["Code"] == "InvalidParameterValueException" 259 260 with pytest.raises(ClientError) as ce: 261 client.put_configuration_aggregator( 262 ConfigurationAggregatorName="testing", 263 OrganizationAggregationSource={ 264 "RoleArn": "arn:aws:iam::012345678910:role/SomeRole", 265 "AwsRegions": ["us-east-1", "us-west-2"], 266 "AllAwsRegions": True, 267 }, 268 ) 269 assert ( 270 "You must choose one of these options" in ce.value.response["Error"]["Message"] 271 ) 272 assert ce.value.response["Error"]["Code"] == "InvalidParameterValueException" 273 274 # Name too long: 275 with pytest.raises(ClientError) as ce: 276 client.put_configuration_aggregator( 277 ConfigurationAggregatorName="a" * 257, 278 AccountAggregationSources=[ 279 {"AccountIds": ["012345678910"], "AllAwsRegions": True} 280 ], 281 ) 282 assert "configurationAggregatorName" in ce.value.response["Error"]["Message"] 283 assert ce.value.response["Error"]["Code"] == "ValidationException" 284 285 # Too many tags (>50): 286 with pytest.raises(ClientError) as ce: 287 client.put_configuration_aggregator( 288 ConfigurationAggregatorName="testing", 289 AccountAggregationSources=[ 290 {"AccountIds": ["012345678910"], "AllAwsRegions": True} 291 ], 292 Tags=[ 293 {"Key": "{}".format(x), "Value": "{}".format(x)} for x in range(0, 51) 294 ], 295 ) 296 assert ( 297 "Member must have length less than or equal to 50" 298 in ce.value.response["Error"]["Message"] 299 ) 300 assert ce.value.response["Error"]["Code"] == "ValidationException" 301 302 # Tag key is too big (>128 chars): 303 with pytest.raises(ClientError) as ce: 304 client.put_configuration_aggregator( 305 ConfigurationAggregatorName="testing", 306 AccountAggregationSources=[ 307 {"AccountIds": ["012345678910"], "AllAwsRegions": True} 308 ], 309 Tags=[{"Key": "a" * 129, "Value": "a"}], 310 ) 311 assert ( 312 "Member must have length less than or equal to 128" 313 in ce.value.response["Error"]["Message"] 314 ) 315 assert ce.value.response["Error"]["Code"] == "ValidationException" 316 317 # Tag value is too big (>256 chars): 318 with pytest.raises(ClientError) as ce: 319 client.put_configuration_aggregator( 320 ConfigurationAggregatorName="testing", 321 AccountAggregationSources=[ 322 {"AccountIds": ["012345678910"], "AllAwsRegions": True} 323 ], 324 Tags=[{"Key": "tag", "Value": "a" * 257}], 325 ) 326 assert ( 327 "Member must have length less than or equal to 256" 328 in ce.value.response["Error"]["Message"] 329 ) 330 assert ce.value.response["Error"]["Code"] == "ValidationException" 331 332 # Duplicate Tags: 333 with pytest.raises(ClientError) as ce: 334 client.put_configuration_aggregator( 335 ConfigurationAggregatorName="testing", 336 AccountAggregationSources=[ 337 {"AccountIds": ["012345678910"], "AllAwsRegions": True} 338 ], 339 Tags=[{"Key": "a", "Value": "a"}, {"Key": "a", "Value": "a"}], 340 ) 341 assert "Duplicate tag keys found." in ce.value.response["Error"]["Message"] 342 assert ce.value.response["Error"]["Code"] == "InvalidInput" 343 344 # Invalid characters in the tag key: 345 with pytest.raises(ClientError) as ce: 346 client.put_configuration_aggregator( 347 ConfigurationAggregatorName="testing", 348 AccountAggregationSources=[ 349 {"AccountIds": ["012345678910"], "AllAwsRegions": True} 350 ], 351 Tags=[{"Key": "!", "Value": "a"}], 352 ) 353 assert ( 354 "Member must satisfy regular expression pattern:" 355 in ce.value.response["Error"]["Message"] 356 ) 357 assert ce.value.response["Error"]["Code"] == "ValidationException" 358 359 # If it contains both the AccountAggregationSources and the OrganizationAggregationSource 360 with pytest.raises(ClientError) as ce: 361 client.put_configuration_aggregator( 362 ConfigurationAggregatorName="testing", 363 AccountAggregationSources=[ 364 {"AccountIds": ["012345678910"], "AllAwsRegions": False} 365 ], 366 OrganizationAggregationSource={ 367 "RoleArn": "arn:aws:iam::012345678910:role/SomeRole", 368 "AllAwsRegions": False, 369 }, 370 ) 371 assert ( 372 "AccountAggregationSource and the OrganizationAggregationSource" 373 in ce.value.response["Error"]["Message"] 374 ) 375 assert ce.value.response["Error"]["Code"] == "InvalidParameterValueException" 376 377 # If it contains neither: 378 with pytest.raises(ClientError) as ce: 379 client.put_configuration_aggregator(ConfigurationAggregatorName="testing") 380 assert ( 381 "AccountAggregationSource or the OrganizationAggregationSource" 382 in ce.value.response["Error"]["Message"] 383 ) 384 assert ce.value.response["Error"]["Code"] == "InvalidParameterValueException" 385 386 # Just make one: 387 account_aggregation_source = { 388 "AccountIds": ["012345678910", "111111111111", "222222222222"], 389 "AwsRegions": ["us-east-1", "us-west-2"], 390 "AllAwsRegions": False, 391 } 392 393 result = client.put_configuration_aggregator( 394 ConfigurationAggregatorName="testing", 395 AccountAggregationSources=[account_aggregation_source], 396 ) 397 assert result["ConfigurationAggregator"]["ConfigurationAggregatorName"] == "testing" 398 assert result["ConfigurationAggregator"]["AccountAggregationSources"] == [ 399 account_aggregation_source 400 ] 401 assert ( 402 "arn:aws:config:us-west-2:{}:config-aggregator/config-aggregator-".format( 403 ACCOUNT_ID 404 ) 405 in result["ConfigurationAggregator"]["ConfigurationAggregatorArn"] 406 ) 407 assert ( 408 result["ConfigurationAggregator"]["CreationTime"] 409 == result["ConfigurationAggregator"]["LastUpdatedTime"] 410 ) 411 412 # Update the existing one: 413 original_arn = result["ConfigurationAggregator"]["ConfigurationAggregatorArn"] 414 account_aggregation_source.pop("AwsRegions") 415 account_aggregation_source["AllAwsRegions"] = True 416 result = client.put_configuration_aggregator( 417 ConfigurationAggregatorName="testing", 418 AccountAggregationSources=[account_aggregation_source], 419 ) 420 421 assert result["ConfigurationAggregator"]["ConfigurationAggregatorName"] == "testing" 422 assert result["ConfigurationAggregator"]["AccountAggregationSources"] == [ 423 account_aggregation_source 424 ] 425 assert ( 426 result["ConfigurationAggregator"]["ConfigurationAggregatorArn"] == original_arn 427 ) 428 429 # Make an org one: 430 result = client.put_configuration_aggregator( 431 ConfigurationAggregatorName="testingOrg", 432 OrganizationAggregationSource={ 433 "RoleArn": "arn:aws:iam::012345678910:role/SomeRole", 434 "AwsRegions": ["us-east-1", "us-west-2"], 435 }, 436 ) 437 438 assert ( 439 result["ConfigurationAggregator"]["ConfigurationAggregatorName"] == "testingOrg" 440 ) 441 assert result["ConfigurationAggregator"]["OrganizationAggregationSource"] == { 442 "RoleArn": "arn:aws:iam::012345678910:role/SomeRole", 443 "AwsRegions": ["us-east-1", "us-west-2"], 444 "AllAwsRegions": False, 445 } 446 447 448@mock_config 449def test_describe_configuration_aggregators(): 450 client = boto3.client("config", region_name="us-west-2") 451 452 # Without any config aggregators: 453 assert not client.describe_configuration_aggregators()["ConfigurationAggregators"] 454 455 # Make 10 config aggregators: 456 for x in range(0, 10): 457 client.put_configuration_aggregator( 458 ConfigurationAggregatorName="testing{}".format(x), 459 AccountAggregationSources=[ 460 {"AccountIds": ["012345678910"], "AllAwsRegions": True} 461 ], 462 ) 463 464 # Describe with an incorrect name: 465 with pytest.raises(ClientError) as ce: 466 client.describe_configuration_aggregators( 467 ConfigurationAggregatorNames=["DoesNotExist"] 468 ) 469 assert ( 470 "The configuration aggregator does not exist." 471 in ce.value.response["Error"]["Message"] 472 ) 473 assert ( 474 ce.value.response["Error"]["Code"] == "NoSuchConfigurationAggregatorException" 475 ) 476 477 # Error describe with more than 1 item in the list: 478 with pytest.raises(ClientError) as ce: 479 client.describe_configuration_aggregators( 480 ConfigurationAggregatorNames=["testing0", "DoesNotExist"] 481 ) 482 assert ( 483 "At least one of the configuration aggregators does not exist." 484 in ce.value.response["Error"]["Message"] 485 ) 486 assert ( 487 ce.value.response["Error"]["Code"] == "NoSuchConfigurationAggregatorException" 488 ) 489 490 # Get the normal list: 491 result = client.describe_configuration_aggregators() 492 assert not result.get("NextToken") 493 assert len(result["ConfigurationAggregators"]) == 10 494 495 # Test filtered list: 496 agg_names = ["testing0", "testing1", "testing2"] 497 result = client.describe_configuration_aggregators( 498 ConfigurationAggregatorNames=agg_names 499 ) 500 assert not result.get("NextToken") 501 assert len(result["ConfigurationAggregators"]) == 3 502 assert [ 503 agg["ConfigurationAggregatorName"] for agg in result["ConfigurationAggregators"] 504 ] == agg_names 505 506 # Test Pagination: 507 result = client.describe_configuration_aggregators(Limit=4) 508 assert len(result["ConfigurationAggregators"]) == 4 509 assert result["NextToken"] == "testing4" 510 assert [ 511 agg["ConfigurationAggregatorName"] for agg in result["ConfigurationAggregators"] 512 ] == ["testing{}".format(x) for x in range(0, 4)] 513 result = client.describe_configuration_aggregators(Limit=4, NextToken="testing4") 514 assert len(result["ConfigurationAggregators"]) == 4 515 assert result["NextToken"] == "testing8" 516 assert [ 517 agg["ConfigurationAggregatorName"] for agg in result["ConfigurationAggregators"] 518 ] == ["testing{}".format(x) for x in range(4, 8)] 519 result = client.describe_configuration_aggregators(Limit=4, NextToken="testing8") 520 assert len(result["ConfigurationAggregators"]) == 2 521 assert not result.get("NextToken") 522 assert [ 523 agg["ConfigurationAggregatorName"] for agg in result["ConfigurationAggregators"] 524 ] == ["testing{}".format(x) for x in range(8, 10)] 525 526 # Test Pagination with Filtering: 527 result = client.describe_configuration_aggregators( 528 ConfigurationAggregatorNames=["testing2", "testing4"], Limit=1 529 ) 530 assert len(result["ConfigurationAggregators"]) == 1 531 assert result["NextToken"] == "testing4" 532 assert ( 533 result["ConfigurationAggregators"][0]["ConfigurationAggregatorName"] 534 == "testing2" 535 ) 536 result = client.describe_configuration_aggregators( 537 ConfigurationAggregatorNames=["testing2", "testing4"], 538 Limit=1, 539 NextToken="testing4", 540 ) 541 assert not result.get("NextToken") 542 assert ( 543 result["ConfigurationAggregators"][0]["ConfigurationAggregatorName"] 544 == "testing4" 545 ) 546 547 # Test with an invalid filter: 548 with pytest.raises(ClientError) as ce: 549 client.describe_configuration_aggregators(NextToken="WRONG") 550 assert "The nextToken provided is invalid" == ce.value.response["Error"]["Message"] 551 assert ce.value.response["Error"]["Code"] == "InvalidNextTokenException" 552 553 554@mock_config 555def test_put_aggregation_authorization(): 556 client = boto3.client("config", region_name="us-west-2") 557 558 # Too many tags (>50): 559 with pytest.raises(ClientError) as ce: 560 client.put_aggregation_authorization( 561 AuthorizedAccountId="012345678910", 562 AuthorizedAwsRegion="us-west-2", 563 Tags=[ 564 {"Key": "{}".format(x), "Value": "{}".format(x)} for x in range(0, 51) 565 ], 566 ) 567 assert ( 568 "Member must have length less than or equal to 50" 569 in ce.value.response["Error"]["Message"] 570 ) 571 assert ce.value.response["Error"]["Code"] == "ValidationException" 572 573 # Tag key is too big (>128 chars): 574 with pytest.raises(ClientError) as ce: 575 client.put_aggregation_authorization( 576 AuthorizedAccountId="012345678910", 577 AuthorizedAwsRegion="us-west-2", 578 Tags=[{"Key": "a" * 129, "Value": "a"}], 579 ) 580 assert ( 581 "Member must have length less than or equal to 128" 582 in ce.value.response["Error"]["Message"] 583 ) 584 assert ce.value.response["Error"]["Code"] == "ValidationException" 585 586 # Tag value is too big (>256 chars): 587 with pytest.raises(ClientError) as ce: 588 client.put_aggregation_authorization( 589 AuthorizedAccountId="012345678910", 590 AuthorizedAwsRegion="us-west-2", 591 Tags=[{"Key": "tag", "Value": "a" * 257}], 592 ) 593 assert ( 594 "Member must have length less than or equal to 256" 595 in ce.value.response["Error"]["Message"] 596 ) 597 assert ce.value.response["Error"]["Code"] == "ValidationException" 598 599 # Duplicate Tags: 600 with pytest.raises(ClientError) as ce: 601 client.put_aggregation_authorization( 602 AuthorizedAccountId="012345678910", 603 AuthorizedAwsRegion="us-west-2", 604 Tags=[{"Key": "a", "Value": "a"}, {"Key": "a", "Value": "a"}], 605 ) 606 assert "Duplicate tag keys found." in ce.value.response["Error"]["Message"] 607 assert ce.value.response["Error"]["Code"] == "InvalidInput" 608 609 # Invalid characters in the tag key: 610 with pytest.raises(ClientError) as ce: 611 client.put_aggregation_authorization( 612 AuthorizedAccountId="012345678910", 613 AuthorizedAwsRegion="us-west-2", 614 Tags=[{"Key": "!", "Value": "a"}], 615 ) 616 assert ( 617 "Member must satisfy regular expression pattern:" 618 in ce.value.response["Error"]["Message"] 619 ) 620 assert ce.value.response["Error"]["Code"] == "ValidationException" 621 622 # Put a normal one there: 623 result = client.put_aggregation_authorization( 624 AuthorizedAccountId="012345678910", 625 AuthorizedAwsRegion="us-east-1", 626 Tags=[{"Key": "tag", "Value": "a"}], 627 ) 628 629 assert result["AggregationAuthorization"][ 630 "AggregationAuthorizationArn" 631 ] == "arn:aws:config:us-west-2:{}:aggregation-authorization/012345678910/us-east-1".format( 632 ACCOUNT_ID 633 ) 634 assert result["AggregationAuthorization"]["AuthorizedAccountId"] == "012345678910" 635 assert result["AggregationAuthorization"]["AuthorizedAwsRegion"] == "us-east-1" 636 assert isinstance(result["AggregationAuthorization"]["CreationTime"], datetime) 637 638 creation_date = result["AggregationAuthorization"]["CreationTime"] 639 640 # And again: 641 result = client.put_aggregation_authorization( 642 AuthorizedAccountId="012345678910", AuthorizedAwsRegion="us-east-1" 643 ) 644 assert result["AggregationAuthorization"][ 645 "AggregationAuthorizationArn" 646 ] == "arn:aws:config:us-west-2:{}:aggregation-authorization/012345678910/us-east-1".format( 647 ACCOUNT_ID 648 ) 649 assert result["AggregationAuthorization"]["AuthorizedAccountId"] == "012345678910" 650 assert result["AggregationAuthorization"]["AuthorizedAwsRegion"] == "us-east-1" 651 assert result["AggregationAuthorization"]["CreationTime"] == creation_date 652 653 654@mock_config 655def test_describe_aggregation_authorizations(): 656 client = boto3.client("config", region_name="us-west-2") 657 658 # With no aggregation authorizations: 659 assert not client.describe_aggregation_authorizations()["AggregationAuthorizations"] 660 661 # Make 10 account authorizations: 662 for i in range(0, 10): 663 client.put_aggregation_authorization( 664 AuthorizedAccountId="{}".format(str(i) * 12), 665 AuthorizedAwsRegion="us-west-2", 666 ) 667 668 result = client.describe_aggregation_authorizations() 669 assert len(result["AggregationAuthorizations"]) == 10 670 assert not result.get("NextToken") 671 for i in range(0, 10): 672 assert ( 673 result["AggregationAuthorizations"][i]["AuthorizedAccountId"] == str(i) * 12 674 ) 675 676 # Test Pagination: 677 result = client.describe_aggregation_authorizations(Limit=4) 678 assert len(result["AggregationAuthorizations"]) == 4 679 assert result["NextToken"] == ("4" * 12) + "/us-west-2" 680 assert [ 681 auth["AuthorizedAccountId"] for auth in result["AggregationAuthorizations"] 682 ] == ["{}".format(str(x) * 12) for x in range(0, 4)] 683 684 result = client.describe_aggregation_authorizations( 685 Limit=4, NextToken=("4" * 12) + "/us-west-2" 686 ) 687 assert len(result["AggregationAuthorizations"]) == 4 688 assert result["NextToken"] == ("8" * 12) + "/us-west-2" 689 assert [ 690 auth["AuthorizedAccountId"] for auth in result["AggregationAuthorizations"] 691 ] == ["{}".format(str(x) * 12) for x in range(4, 8)] 692 693 result = client.describe_aggregation_authorizations( 694 Limit=4, NextToken=("8" * 12) + "/us-west-2" 695 ) 696 assert len(result["AggregationAuthorizations"]) == 2 697 assert not result.get("NextToken") 698 assert [ 699 auth["AuthorizedAccountId"] for auth in result["AggregationAuthorizations"] 700 ] == ["{}".format(str(x) * 12) for x in range(8, 10)] 701 702 # Test with an invalid filter: 703 with pytest.raises(ClientError) as ce: 704 client.describe_aggregation_authorizations(NextToken="WRONG") 705 assert "The nextToken provided is invalid" == ce.value.response["Error"]["Message"] 706 assert ce.value.response["Error"]["Code"] == "InvalidNextTokenException" 707 708 709@mock_config 710def test_delete_aggregation_authorization(): 711 client = boto3.client("config", region_name="us-west-2") 712 713 client.put_aggregation_authorization( 714 AuthorizedAccountId="012345678910", AuthorizedAwsRegion="us-west-2" 715 ) 716 717 # Delete it: 718 client.delete_aggregation_authorization( 719 AuthorizedAccountId="012345678910", AuthorizedAwsRegion="us-west-2" 720 ) 721 722 # Verify that none are there: 723 assert not client.describe_aggregation_authorizations()["AggregationAuthorizations"] 724 725 # Try it again -- nothing should happen: 726 client.delete_aggregation_authorization( 727 AuthorizedAccountId="012345678910", AuthorizedAwsRegion="us-west-2" 728 ) 729 730 731@mock_config 732def test_delete_configuration_aggregator(): 733 client = boto3.client("config", region_name="us-west-2") 734 client.put_configuration_aggregator( 735 ConfigurationAggregatorName="testing", 736 AccountAggregationSources=[ 737 {"AccountIds": ["012345678910"], "AllAwsRegions": True} 738 ], 739 ) 740 741 client.delete_configuration_aggregator(ConfigurationAggregatorName="testing") 742 743 # And again to confirm that it's deleted: 744 with pytest.raises(ClientError) as ce: 745 client.delete_configuration_aggregator(ConfigurationAggregatorName="testing") 746 assert ( 747 "The configuration aggregator does not exist." 748 in ce.value.response["Error"]["Message"] 749 ) 750 assert ( 751 ce.value.response["Error"]["Code"] == "NoSuchConfigurationAggregatorException" 752 ) 753 754 755@mock_config 756def test_describe_configurations(): 757 client = boto3.client("config", region_name="us-west-2") 758 759 # Without any configurations: 760 result = client.describe_configuration_recorders() 761 assert not result["ConfigurationRecorders"] 762 763 client.put_configuration_recorder( 764 ConfigurationRecorder={ 765 "name": "testrecorder", 766 "roleARN": "somearn", 767 "recordingGroup": { 768 "allSupported": False, 769 "includeGlobalResourceTypes": False, 770 "resourceTypes": ["AWS::EC2::Volume", "AWS::EC2::VPC"], 771 }, 772 } 773 ) 774 775 result = client.describe_configuration_recorders()["ConfigurationRecorders"] 776 assert len(result) == 1 777 assert result[0]["name"] == "testrecorder" 778 assert result[0]["roleARN"] == "somearn" 779 assert not result[0]["recordingGroup"]["allSupported"] 780 assert not result[0]["recordingGroup"]["includeGlobalResourceTypes"] 781 assert len(result[0]["recordingGroup"]["resourceTypes"]) == 2 782 assert ( 783 "AWS::EC2::Volume" in result[0]["recordingGroup"]["resourceTypes"] 784 and "AWS::EC2::VPC" in result[0]["recordingGroup"]["resourceTypes"] 785 ) 786 787 # Specify an incorrect name: 788 with pytest.raises(ClientError) as ce: 789 client.describe_configuration_recorders(ConfigurationRecorderNames=["wrong"]) 790 assert ce.value.response["Error"]["Code"] == "NoSuchConfigurationRecorderException" 791 assert "wrong" in ce.value.response["Error"]["Message"] 792 793 # And with both a good and wrong name: 794 with pytest.raises(ClientError) as ce: 795 client.describe_configuration_recorders( 796 ConfigurationRecorderNames=["testrecorder", "wrong"] 797 ) 798 assert ce.value.response["Error"]["Code"] == "NoSuchConfigurationRecorderException" 799 assert "wrong" in ce.value.response["Error"]["Message"] 800 801 802@mock_config 803def test_delivery_channels(): 804 client = boto3.client("config", region_name="us-west-2") 805 806 # Try without a config recorder: 807 with pytest.raises(ClientError) as ce: 808 client.put_delivery_channel(DeliveryChannel={}) 809 assert ( 810 ce.value.response["Error"]["Code"] 811 == "NoAvailableConfigurationRecorderException" 812 ) 813 assert ( 814 ce.value.response["Error"]["Message"] 815 == "Configuration recorder is not available to " 816 "put delivery channel." 817 ) 818 819 # Create a config recorder to continue testing: 820 client.put_configuration_recorder( 821 ConfigurationRecorder={ 822 "name": "testrecorder", 823 "roleARN": "somearn", 824 "recordingGroup": { 825 "allSupported": False, 826 "includeGlobalResourceTypes": False, 827 "resourceTypes": ["AWS::EC2::Volume", "AWS::EC2::VPC"], 828 }, 829 } 830 ) 831 832 # Try without a name supplied: 833 with pytest.raises(ClientError) as ce: 834 client.put_delivery_channel(DeliveryChannel={}) 835 assert ce.value.response["Error"]["Code"] == "InvalidDeliveryChannelNameException" 836 assert "is not valid, blank string." in ce.value.response["Error"]["Message"] 837 838 # Try with a really long name: 839 with pytest.raises(ClientError) as ce: 840 client.put_delivery_channel(DeliveryChannel={"name": "a" * 257}) 841 assert ce.value.response["Error"]["Code"] == "ValidationException" 842 assert ( 843 "Member must have length less than or equal to 256" 844 in ce.value.response["Error"]["Message"] 845 ) 846 847 # Without specifying a bucket name: 848 with pytest.raises(ClientError) as ce: 849 client.put_delivery_channel(DeliveryChannel={"name": "testchannel"}) 850 assert ce.value.response["Error"]["Code"] == "NoSuchBucketException" 851 assert ( 852 ce.value.response["Error"]["Message"] 853 == "Cannot find a S3 bucket with an empty bucket name." 854 ) 855 856 with pytest.raises(ClientError) as ce: 857 client.put_delivery_channel( 858 DeliveryChannel={"name": "testchannel", "s3BucketName": ""} 859 ) 860 assert ce.value.response["Error"]["Code"] == "NoSuchBucketException" 861 assert ( 862 ce.value.response["Error"]["Message"] 863 == "Cannot find a S3 bucket with an empty bucket name." 864 ) 865 866 # With an empty string for the S3 key prefix: 867 with pytest.raises(ClientError) as ce: 868 client.put_delivery_channel( 869 DeliveryChannel={ 870 "name": "testchannel", 871 "s3BucketName": "somebucket", 872 "s3KeyPrefix": "", 873 } 874 ) 875 assert ce.value.response["Error"]["Code"] == "InvalidS3KeyPrefixException" 876 assert "empty s3 key prefix." in ce.value.response["Error"]["Message"] 877 878 # With an empty string for the SNS ARN: 879 with pytest.raises(ClientError) as ce: 880 client.put_delivery_channel( 881 DeliveryChannel={ 882 "name": "testchannel", 883 "s3BucketName": "somebucket", 884 "snsTopicARN": "", 885 } 886 ) 887 assert ce.value.response["Error"]["Code"] == "InvalidSNSTopicARNException" 888 assert "The sns topic arn" in ce.value.response["Error"]["Message"] 889 890 # With an invalid delivery frequency: 891 with pytest.raises(ClientError) as ce: 892 client.put_delivery_channel( 893 DeliveryChannel={ 894 "name": "testchannel", 895 "s3BucketName": "somebucket", 896 "configSnapshotDeliveryProperties": {"deliveryFrequency": "WRONG"}, 897 } 898 ) 899 assert ce.value.response["Error"]["Code"] == "InvalidDeliveryFrequency" 900 assert "WRONG" in ce.value.response["Error"]["Message"] 901 assert "TwentyFour_Hours" in ce.value.response["Error"]["Message"] 902 903 # Create a proper one: 904 client.put_delivery_channel( 905 DeliveryChannel={"name": "testchannel", "s3BucketName": "somebucket"} 906 ) 907 result = client.describe_delivery_channels()["DeliveryChannels"] 908 assert len(result) == 1 909 assert len(result[0].keys()) == 2 910 assert result[0]["name"] == "testchannel" 911 assert result[0]["s3BucketName"] == "somebucket" 912 913 # Overwrite it with another proper configuration: 914 client.put_delivery_channel( 915 DeliveryChannel={ 916 "name": "testchannel", 917 "s3BucketName": "somebucket", 918 "snsTopicARN": "sometopicarn", 919 "configSnapshotDeliveryProperties": { 920 "deliveryFrequency": "TwentyFour_Hours" 921 }, 922 } 923 ) 924 result = client.describe_delivery_channels()["DeliveryChannels"] 925 assert len(result) == 1 926 assert len(result[0].keys()) == 4 927 assert result[0]["name"] == "testchannel" 928 assert result[0]["s3BucketName"] == "somebucket" 929 assert result[0]["snsTopicARN"] == "sometopicarn" 930 assert ( 931 result[0]["configSnapshotDeliveryProperties"]["deliveryFrequency"] 932 == "TwentyFour_Hours" 933 ) 934 935 # Can only have 1: 936 with pytest.raises(ClientError) as ce: 937 client.put_delivery_channel( 938 DeliveryChannel={"name": "testchannel2", "s3BucketName": "somebucket"} 939 ) 940 assert ( 941 ce.value.response["Error"]["Code"] 942 == "MaxNumberOfDeliveryChannelsExceededException" 943 ) 944 assert ( 945 "because the maximum number of delivery channels: 1 is reached." 946 in ce.value.response["Error"]["Message"] 947 ) 948 949 950@mock_config 951def test_describe_delivery_channels(): 952 client = boto3.client("config", region_name="us-west-2") 953 client.put_configuration_recorder( 954 ConfigurationRecorder={ 955 "name": "testrecorder", 956 "roleARN": "somearn", 957 "recordingGroup": { 958 "allSupported": False, 959 "includeGlobalResourceTypes": False, 960 "resourceTypes": ["AWS::EC2::Volume", "AWS::EC2::VPC"], 961 }, 962 } 963 ) 964 965 # Without any channels: 966 result = client.describe_delivery_channels() 967 assert not result["DeliveryChannels"] 968 969 client.put_delivery_channel( 970 DeliveryChannel={"name": "testchannel", "s3BucketName": "somebucket"} 971 ) 972 result = client.describe_delivery_channels()["DeliveryChannels"] 973 assert len(result) == 1 974 assert len(result[0].keys()) == 2 975 assert result[0]["name"] == "testchannel" 976 assert result[0]["s3BucketName"] == "somebucket" 977 978 # Overwrite it with another proper configuration: 979 client.put_delivery_channel( 980 DeliveryChannel={ 981 "name": "testchannel", 982 "s3BucketName": "somebucket", 983 "snsTopicARN": "sometopicarn", 984 "configSnapshotDeliveryProperties": { 985 "deliveryFrequency": "TwentyFour_Hours" 986 }, 987 } 988 ) 989 result = client.describe_delivery_channels()["DeliveryChannels"] 990 assert len(result) == 1 991 assert len(result[0].keys()) == 4 992 assert result[0]["name"] == "testchannel" 993 assert result[0]["s3BucketName"] == "somebucket" 994 assert result[0]["snsTopicARN"] == "sometopicarn" 995 assert ( 996 result[0]["configSnapshotDeliveryProperties"]["deliveryFrequency"] 997 == "TwentyFour_Hours" 998 ) 999 1000 # Specify an incorrect name: 1001 with pytest.raises(ClientError) as ce: 1002 client.describe_delivery_channels(DeliveryChannelNames=["wrong"]) 1003 assert ce.value.response["Error"]["Code"] == "NoSuchDeliveryChannelException" 1004 assert "wrong" in ce.value.response["Error"]["Message"] 1005 1006 # And with both a good and wrong name: 1007 with pytest.raises(ClientError) as ce: 1008 client.describe_delivery_channels(DeliveryChannelNames=["testchannel", "wrong"]) 1009 assert ce.value.response["Error"]["Code"] == "NoSuchDeliveryChannelException" 1010 assert "wrong" in ce.value.response["Error"]["Message"] 1011 1012 1013@mock_config 1014def test_start_configuration_recorder(): 1015 client = boto3.client("config", region_name="us-west-2") 1016 1017 # Without a config recorder: 1018 with pytest.raises(ClientError) as ce: 1019 client.start_configuration_recorder(ConfigurationRecorderName="testrecorder") 1020 assert ce.value.response["Error"]["Code"] == "NoSuchConfigurationRecorderException" 1021 1022 # Make the config recorder; 1023 client.put_configuration_recorder( 1024 ConfigurationRecorder={ 1025 "name": "testrecorder", 1026 "roleARN": "somearn", 1027 "recordingGroup": { 1028 "allSupported": False, 1029 "includeGlobalResourceTypes": False, 1030 "resourceTypes": ["AWS::EC2::Volume", "AWS::EC2::VPC"], 1031 }, 1032 } 1033 ) 1034 1035 # Without a delivery channel: 1036 with pytest.raises(ClientError) as ce: 1037 client.start_configuration_recorder(ConfigurationRecorderName="testrecorder") 1038 assert ce.value.response["Error"]["Code"] == "NoAvailableDeliveryChannelException" 1039 1040 # Make the delivery channel: 1041 client.put_delivery_channel( 1042 DeliveryChannel={"name": "testchannel", "s3BucketName": "somebucket"} 1043 ) 1044 1045 # Start it: 1046 client.start_configuration_recorder(ConfigurationRecorderName="testrecorder") 1047 1048 # Verify it's enabled: 1049 result = client.describe_configuration_recorder_status()[ 1050 "ConfigurationRecordersStatus" 1051 ] 1052 lower_bound = datetime.utcnow() - timedelta(minutes=5) 1053 assert result[0]["recording"] 1054 assert result[0]["lastStatus"] == "PENDING" 1055 assert ( 1056 lower_bound 1057 < result[0]["lastStartTime"].replace(tzinfo=None) 1058 <= datetime.utcnow() 1059 ) 1060 assert ( 1061 lower_bound 1062 < result[0]["lastStatusChangeTime"].replace(tzinfo=None) 1063 <= datetime.utcnow() 1064 ) 1065 1066 1067@mock_config 1068def test_stop_configuration_recorder(): 1069 client = boto3.client("config", region_name="us-west-2") 1070 1071 # Without a config recorder: 1072 with pytest.raises(ClientError) as ce: 1073 client.stop_configuration_recorder(ConfigurationRecorderName="testrecorder") 1074 assert ce.value.response["Error"]["Code"] == "NoSuchConfigurationRecorderException" 1075 1076 # Make the config recorder; 1077 client.put_configuration_recorder( 1078 ConfigurationRecorder={ 1079 "name": "testrecorder", 1080 "roleARN": "somearn", 1081 "recordingGroup": { 1082 "allSupported": False, 1083 "includeGlobalResourceTypes": False, 1084 "resourceTypes": ["AWS::EC2::Volume", "AWS::EC2::VPC"], 1085 }, 1086 } 1087 ) 1088 1089 # Make the delivery channel for creation: 1090 client.put_delivery_channel( 1091 DeliveryChannel={"name": "testchannel", "s3BucketName": "somebucket"} 1092 ) 1093 1094 # Start it: 1095 client.start_configuration_recorder(ConfigurationRecorderName="testrecorder") 1096 client.stop_configuration_recorder(ConfigurationRecorderName="testrecorder") 1097 1098 # Verify it's disabled: 1099 result = client.describe_configuration_recorder_status()[ 1100 "ConfigurationRecordersStatus" 1101 ] 1102 lower_bound = datetime.utcnow() - timedelta(minutes=5) 1103 assert not result[0]["recording"] 1104 assert result[0]["lastStatus"] == "PENDING" 1105 assert ( 1106 lower_bound 1107 < result[0]["lastStartTime"].replace(tzinfo=None) 1108 <= datetime.utcnow() 1109 ) 1110 assert ( 1111 lower_bound 1112 < result[0]["lastStopTime"].replace(tzinfo=None) 1113 <= datetime.utcnow() 1114 ) 1115 assert ( 1116 lower_bound 1117 < result[0]["lastStatusChangeTime"].replace(tzinfo=None) 1118 <= datetime.utcnow() 1119 ) 1120 1121 1122@mock_config 1123def test_describe_configuration_recorder_status(): 1124 client = boto3.client("config", region_name="us-west-2") 1125 1126 # Without any: 1127 result = client.describe_configuration_recorder_status() 1128 assert not result["ConfigurationRecordersStatus"] 1129 1130 # Make the config recorder; 1131 client.put_configuration_recorder( 1132 ConfigurationRecorder={ 1133 "name": "testrecorder", 1134 "roleARN": "somearn", 1135 "recordingGroup": { 1136 "allSupported": False, 1137 "includeGlobalResourceTypes": False, 1138 "resourceTypes": ["AWS::EC2::Volume", "AWS::EC2::VPC"], 1139 }, 1140 } 1141 ) 1142 1143 # Without specifying a config recorder: 1144 result = client.describe_configuration_recorder_status()[ 1145 "ConfigurationRecordersStatus" 1146 ] 1147 assert len(result) == 1 1148 assert result[0]["name"] == "testrecorder" 1149 assert not result[0]["recording"] 1150 1151 # With a proper name: 1152 result = client.describe_configuration_recorder_status( 1153 ConfigurationRecorderNames=["testrecorder"] 1154 )["ConfigurationRecordersStatus"] 1155 assert len(result) == 1 1156 assert result[0]["name"] == "testrecorder" 1157 assert not result[0]["recording"] 1158 1159 # Invalid name: 1160 with pytest.raises(ClientError) as ce: 1161 client.describe_configuration_recorder_status( 1162 ConfigurationRecorderNames=["testrecorder", "wrong"] 1163 ) 1164 assert ce.value.response["Error"]["Code"] == "NoSuchConfigurationRecorderException" 1165 assert "wrong" in ce.value.response["Error"]["Message"] 1166 1167 1168@mock_config 1169def test_delete_configuration_recorder(): 1170 client = boto3.client("config", region_name="us-west-2") 1171 1172 # Make the config recorder; 1173 client.put_configuration_recorder( 1174 ConfigurationRecorder={ 1175 "name": "testrecorder", 1176 "roleARN": "somearn", 1177 "recordingGroup": { 1178 "allSupported": False, 1179 "includeGlobalResourceTypes": False, 1180 "resourceTypes": ["AWS::EC2::Volume", "AWS::EC2::VPC"], 1181 }, 1182 } 1183 ) 1184 1185 # Delete it: 1186 client.delete_configuration_recorder(ConfigurationRecorderName="testrecorder") 1187 1188 # Try again -- it should be deleted: 1189 with pytest.raises(ClientError) as ce: 1190 client.delete_configuration_recorder(ConfigurationRecorderName="testrecorder") 1191 assert ce.value.response["Error"]["Code"] == "NoSuchConfigurationRecorderException" 1192 1193 1194@mock_config 1195def test_delete_delivery_channel(): 1196 client = boto3.client("config", region_name="us-west-2") 1197 1198 # Need a recorder to test the constraint on recording being enabled: 1199 client.put_configuration_recorder( 1200 ConfigurationRecorder={ 1201 "name": "testrecorder", 1202 "roleARN": "somearn", 1203 "recordingGroup": { 1204 "allSupported": False, 1205 "includeGlobalResourceTypes": False, 1206 "resourceTypes": ["AWS::EC2::Volume", "AWS::EC2::VPC"], 1207 }, 1208 } 1209 ) 1210 client.put_delivery_channel( 1211 DeliveryChannel={"name": "testchannel", "s3BucketName": "somebucket"} 1212 ) 1213 client.start_configuration_recorder(ConfigurationRecorderName="testrecorder") 1214 1215 # With the recorder enabled: 1216 with pytest.raises(ClientError) as ce: 1217 client.delete_delivery_channel(DeliveryChannelName="testchannel") 1218 assert ( 1219 ce.value.response["Error"]["Code"] == "LastDeliveryChannelDeleteFailedException" 1220 ) 1221 assert ( 1222 "because there is a running configuration recorder." 1223 in ce.value.response["Error"]["Message"] 1224 ) 1225 1226 # Stop recording: 1227 client.stop_configuration_recorder(ConfigurationRecorderName="testrecorder") 1228 1229 # Try again: 1230 client.delete_delivery_channel(DeliveryChannelName="testchannel") 1231 1232 # Verify: 1233 with pytest.raises(ClientError) as ce: 1234 client.delete_delivery_channel(DeliveryChannelName="testchannel") 1235 assert ce.value.response["Error"]["Code"] == "NoSuchDeliveryChannelException" 1236 1237 1238@mock_config 1239@mock_s3 1240def test_list_discovered_resource(): 1241 """NOTE: We are only really testing the Config part. For each individual service, please add tests 1242 for that individual service's "list_config_service_resources" function. 1243 """ 1244 client = boto3.client("config", region_name="us-west-2") 1245 1246 # With nothing created yet: 1247 assert not client.list_discovered_resources(resourceType="AWS::S3::Bucket")[ 1248 "resourceIdentifiers" 1249 ] 1250 1251 # Create some S3 buckets: 1252 s3_client = boto3.client("s3", region_name="us-west-2") 1253 for x in range(0, 10): 1254 s3_client.create_bucket( 1255 Bucket="bucket{}".format(x), 1256 CreateBucketConfiguration={"LocationConstraint": "us-west-2"}, 1257 ) 1258 1259 # And with an EU bucket -- this should not show up for the us-west-2 config backend: 1260 eu_client = boto3.client("s3", region_name="eu-west-1") 1261 eu_client.create_bucket( 1262 Bucket="eu-bucket", 1263 CreateBucketConfiguration={"LocationConstraint": "eu-west-1"}, 1264 ) 1265 1266 # Now try: 1267 result = client.list_discovered_resources(resourceType="AWS::S3::Bucket") 1268 assert len(result["resourceIdentifiers"]) == 10 1269 for x in range(0, 10): 1270 assert result["resourceIdentifiers"][x] == { 1271 "resourceType": "AWS::S3::Bucket", 1272 "resourceId": "bucket{}".format(x), 1273 "resourceName": "bucket{}".format(x), 1274 } 1275 assert not result.get("nextToken") 1276 1277 result = client.list_discovered_resources( 1278 resourceType="AWS::S3::Bucket", resourceName="eu-bucket" 1279 ) 1280 assert not result["resourceIdentifiers"] 1281 1282 # Test that pagination places a proper nextToken in the response and also that the limit works: 1283 result = client.list_discovered_resources( 1284 resourceType="AWS::S3::Bucket", limit=1, nextToken="bucket1" 1285 ) 1286 assert len(result["resourceIdentifiers"]) == 1 1287 assert result["nextToken"] == "bucket2" 1288 1289 # Try with a resource name: 1290 result = client.list_discovered_resources( 1291 resourceType="AWS::S3::Bucket", limit=1, resourceName="bucket1" 1292 ) 1293 assert len(result["resourceIdentifiers"]) == 1 1294 assert not result.get("nextToken") 1295 1296 # Try with a resource ID: 1297 result = client.list_discovered_resources( 1298 resourceType="AWS::S3::Bucket", limit=1, resourceIds=["bucket1"] 1299 ) 1300 assert len(result["resourceIdentifiers"]) == 1 1301 assert not result.get("nextToken") 1302 1303 # Try with duplicated resource IDs: 1304 result = client.list_discovered_resources( 1305 resourceType="AWS::S3::Bucket", limit=1, resourceIds=["bucket1", "bucket1"] 1306 ) 1307 assert len(result["resourceIdentifiers"]) == 1 1308 assert not result.get("nextToken") 1309 1310 # Test with an invalid resource type: 1311 assert not client.list_discovered_resources( 1312 resourceType="LOL::NOT::A::RESOURCE::TYPE" 1313 )["resourceIdentifiers"] 1314 1315 # Test with an invalid page num > 100: 1316 with pytest.raises(ClientError) as ce: 1317 client.list_discovered_resources(resourceType="AWS::S3::Bucket", limit=101) 1318 assert "101" in ce.value.response["Error"]["Message"] 1319 1320 # Test by supplying both resourceName and also resourceIds: 1321 with pytest.raises(ClientError) as ce: 1322 client.list_discovered_resources( 1323 resourceType="AWS::S3::Bucket", 1324 resourceName="whats", 1325 resourceIds=["up", "doc"], 1326 ) 1327 assert ( 1328 "Both Resource ID and Resource Name cannot be specified in the request" 1329 in ce.value.response["Error"]["Message"] 1330 ) 1331 1332 # More than 20 resourceIds: 1333 resource_ids = ["{}".format(x) for x in range(0, 21)] 1334 with pytest.raises(ClientError) as ce: 1335 client.list_discovered_resources( 1336 resourceType="AWS::S3::Bucket", resourceIds=resource_ids 1337 ) 1338 assert ( 1339 "The specified list had more than 20 resource ID's." 1340 in ce.value.response["Error"]["Message"] 1341 ) 1342 1343 1344@mock_config 1345@mock_s3 1346def test_list_aggregate_discovered_resource(): 1347 """NOTE: We are only really testing the Config part. For each individual service, please add tests 1348 for that individual service's "list_config_service_resources" function. 1349 """ 1350 client = boto3.client("config", region_name="us-west-2") 1351 1352 # Without an aggregator: 1353 with pytest.raises(ClientError) as ce: 1354 client.list_aggregate_discovered_resources( 1355 ConfigurationAggregatorName="lolno", ResourceType="AWS::S3::Bucket" 1356 ) 1357 assert ( 1358 "The configuration aggregator does not exist" 1359 in ce.value.response["Error"]["Message"] 1360 ) 1361 1362 # Create the aggregator: 1363 account_aggregation_source = { 1364 "AccountIds": ["012345678910", "111111111111", "222222222222"], 1365 "AllAwsRegions": True, 1366 } 1367 client.put_configuration_aggregator( 1368 ConfigurationAggregatorName="testing", 1369 AccountAggregationSources=[account_aggregation_source], 1370 ) 1371 1372 # With nothing created yet: 1373 assert not client.list_aggregate_discovered_resources( 1374 ConfigurationAggregatorName="testing", ResourceType="AWS::S3::Bucket" 1375 )["ResourceIdentifiers"] 1376 1377 # Create some S3 buckets: 1378 s3_client = boto3.client("s3", region_name="us-west-2") 1379 for x in range(0, 10): 1380 s3_client.create_bucket( 1381 Bucket="bucket{}".format(x), 1382 CreateBucketConfiguration={"LocationConstraint": "us-west-2"}, 1383 ) 1384 1385 s3_client_eu = boto3.client("s3", region_name="eu-west-1") 1386 for x in range(10, 12): 1387 s3_client_eu.create_bucket( 1388 Bucket="eu-bucket{}".format(x), 1389 CreateBucketConfiguration={"LocationConstraint": "eu-west-1"}, 1390 ) 1391 1392 # Now try: 1393 result = client.list_aggregate_discovered_resources( 1394 ConfigurationAggregatorName="testing", ResourceType="AWS::S3::Bucket" 1395 ) 1396 assert len(result["ResourceIdentifiers"]) == 12 1397 for x in range(0, 10): 1398 assert result["ResourceIdentifiers"][x] == { 1399 "SourceAccountId": ACCOUNT_ID, 1400 "ResourceType": "AWS::S3::Bucket", 1401 "ResourceId": "bucket{}".format(x), 1402 "ResourceName": "bucket{}".format(x), 1403 "SourceRegion": "us-west-2", 1404 } 1405 for x in range(11, 12): 1406 assert result["ResourceIdentifiers"][x] == { 1407 "SourceAccountId": ACCOUNT_ID, 1408 "ResourceType": "AWS::S3::Bucket", 1409 "ResourceId": "eu-bucket{}".format(x), 1410 "ResourceName": "eu-bucket{}".format(x), 1411 "SourceRegion": "eu-west-1", 1412 } 1413 1414 assert not result.get("NextToken") 1415 1416 # Test that pagination places a proper nextToken in the response and also that the limit works: 1417 result = client.list_aggregate_discovered_resources( 1418 ConfigurationAggregatorName="testing", 1419 ResourceType="AWS::S3::Bucket", 1420 Limit=1, 1421 NextToken="bucket1", 1422 ) 1423 assert len(result["ResourceIdentifiers"]) == 1 1424 assert result["NextToken"] == "bucket2" 1425 1426 # Try with a resource name: 1427 result = client.list_aggregate_discovered_resources( 1428 ConfigurationAggregatorName="testing", 1429 ResourceType="AWS::S3::Bucket", 1430 Limit=1, 1431 NextToken="bucket1", 1432 Filters={"ResourceName": "bucket1"}, 1433 ) 1434 assert len(result["ResourceIdentifiers"]) == 1 1435 assert not result.get("NextToken") 1436 1437 # Try with a resource ID: 1438 result = client.list_aggregate_discovered_resources( 1439 ConfigurationAggregatorName="testing", 1440 ResourceType="AWS::S3::Bucket", 1441 Limit=1, 1442 NextToken="bucket1", 1443 Filters={"ResourceId": "bucket1"}, 1444 ) 1445 assert len(result["ResourceIdentifiers"]) == 1 1446 assert not result.get("NextToken") 1447 1448 # Try with a region specified: 1449 result = client.list_aggregate_discovered_resources( 1450 ConfigurationAggregatorName="testing", 1451 ResourceType="AWS::S3::Bucket", 1452 Filters={"Region": "eu-west-1"}, 1453 ) 1454 assert len(result["ResourceIdentifiers"]) == 2 1455 assert result["ResourceIdentifiers"][0]["SourceRegion"] == "eu-west-1" 1456 assert not result.get("NextToken") 1457 1458 # Try with both name and id set to the incorrect values: 1459 assert not client.list_aggregate_discovered_resources( 1460 ConfigurationAggregatorName="testing", 1461 ResourceType="AWS::S3::Bucket", 1462 Filters={"ResourceId": "bucket1", "ResourceName": "bucket2"}, 1463 )["ResourceIdentifiers"] 1464 1465 # Test with an invalid resource type: 1466 assert not client.list_aggregate_discovered_resources( 1467 ConfigurationAggregatorName="testing", 1468 ResourceType="LOL::NOT::A::RESOURCE::TYPE", 1469 )["ResourceIdentifiers"] 1470 1471 # Try with correct name but incorrect region: 1472 assert not client.list_aggregate_discovered_resources( 1473 ConfigurationAggregatorName="testing", 1474 ResourceType="AWS::S3::Bucket", 1475 Filters={"ResourceId": "bucket1", "Region": "us-west-1"}, 1476 )["ResourceIdentifiers"] 1477 1478 # Test with an invalid page num > 100: 1479 with pytest.raises(ClientError) as ce: 1480 client.list_aggregate_discovered_resources( 1481 ConfigurationAggregatorName="testing", 1482 ResourceType="AWS::S3::Bucket", 1483 Limit=101, 1484 ) 1485 assert "101" in ce.value.response["Error"]["Message"] 1486 1487 1488@mock_config 1489@mock_s3 1490def test_get_resource_config_history(): 1491 """NOTE: We are only really testing the Config part. For each individual service, please add tests 1492 for that individual service's "get_config_resource" function. 1493 """ 1494 client = boto3.client("config", region_name="us-west-2") 1495 1496 # With an invalid resource type: 1497 with pytest.raises(ClientError) as ce: 1498 client.get_resource_config_history( 1499 resourceType="NOT::A::RESOURCE", resourceId="notcreatedyet" 1500 ) 1501 assert ce.value.response["Error"] == { 1502 "Message": "Resource notcreatedyet of resourceType:NOT::A::RESOURCE is unknown or has " 1503 "not been discovered", 1504 "Code": "ResourceNotDiscoveredException", 1505 } 1506 1507 # With nothing created yet: 1508 with pytest.raises(ClientError) as ce: 1509 client.get_resource_config_history( 1510 resourceType="AWS::S3::Bucket", resourceId="notcreatedyet" 1511 ) 1512 assert ce.value.response["Error"] == { 1513 "Message": "Resource notcreatedyet of resourceType:AWS::S3::Bucket is unknown or has " 1514 "not been discovered", 1515 "Code": "ResourceNotDiscoveredException", 1516 } 1517 1518 # Create an S3 bucket: 1519 s3_client = boto3.client("s3", region_name="us-west-2") 1520 for x in range(0, 10): 1521 s3_client.create_bucket( 1522 Bucket="bucket{}".format(x), 1523 CreateBucketConfiguration={"LocationConstraint": "us-west-2"}, 1524 ) 1525 1526 # Now try: 1527 result = client.get_resource_config_history( 1528 resourceType="AWS::S3::Bucket", resourceId="bucket1" 1529 )["configurationItems"] 1530 assert len(result) == 1 1531 assert result[0]["resourceName"] == result[0]["resourceId"] == "bucket1" 1532 assert result[0]["arn"] == "arn:aws:s3:::bucket1" 1533 1534 # Make a bucket in a different region and verify that it does not show up in the config backend: 1535 s3_client = boto3.client("s3", region_name="eu-west-1") 1536 s3_client.create_bucket( 1537 Bucket="eu-bucket", 1538 CreateBucketConfiguration={"LocationConstraint": "eu-west-1"}, 1539 ) 1540 with pytest.raises(ClientError) as ce: 1541 client.get_resource_config_history( 1542 resourceType="AWS::S3::Bucket", resourceId="eu-bucket" 1543 ) 1544 assert ce.value.response["Error"]["Code"] == "ResourceNotDiscoveredException" 1545 1546 1547@mock_config 1548@mock_s3 1549def test_batch_get_resource_config(): 1550 """NOTE: We are only really testing the Config part. For each individual service, please add tests 1551 for that individual service's "get_config_resource" function. 1552 """ 1553 client = boto3.client("config", region_name="us-west-2") 1554 1555 # With more than 100 resourceKeys: 1556 with pytest.raises(ClientError) as ce: 1557 client.batch_get_resource_config( 1558 resourceKeys=[ 1559 {"resourceType": "AWS::S3::Bucket", "resourceId": "someBucket"} 1560 ] 1561 * 101 1562 ) 1563 assert ( 1564 "Member must have length less than or equal to 100" 1565 in ce.value.response["Error"]["Message"] 1566 ) 1567 1568 # With invalid resource types and resources that don't exist: 1569 result = client.batch_get_resource_config( 1570 resourceKeys=[ 1571 {"resourceType": "NOT::A::RESOURCE", "resourceId": "NotAThing"}, 1572 {"resourceType": "AWS::S3::Bucket", "resourceId": "NotAThing"}, 1573 ] 1574 ) 1575 1576 assert not result["baseConfigurationItems"] 1577 assert not result["unprocessedResourceKeys"] 1578 1579 # Create some S3 buckets: 1580 s3_client = boto3.client("s3", region_name="us-west-2") 1581 for x in range(0, 10): 1582 s3_client.create_bucket( 1583 Bucket="bucket{}".format(x), 1584 CreateBucketConfiguration={"LocationConstraint": "us-west-2"}, 1585 ) 1586 1587 # Get them all: 1588 keys = [ 1589 {"resourceType": "AWS::S3::Bucket", "resourceId": "bucket{}".format(x)} 1590 for x in range(0, 10) 1591 ] 1592 result = client.batch_get_resource_config(resourceKeys=keys) 1593 assert len(result["baseConfigurationItems"]) == 10 1594 buckets_missing = ["bucket{}".format(x) for x in range(0, 10)] 1595 for r in result["baseConfigurationItems"]: 1596 buckets_missing.remove(r["resourceName"]) 1597 1598 assert not buckets_missing 1599 1600 # Make a bucket in a different region and verify that it does not show up in the config backend: 1601 s3_client = boto3.client("s3", region_name="eu-west-1") 1602 s3_client.create_bucket( 1603 Bucket="eu-bucket", 1604 CreateBucketConfiguration={"LocationConstraint": "eu-west-1"}, 1605 ) 1606 keys = [{"resourceType": "AWS::S3::Bucket", "resourceId": "eu-bucket"}] 1607 result = client.batch_get_resource_config(resourceKeys=keys) 1608 assert not result["baseConfigurationItems"] 1609 1610 1611@mock_config 1612@mock_s3 1613def test_batch_get_aggregate_resource_config(): 1614 """NOTE: We are only really testing the Config part. For each individual service, please add tests 1615 for that individual service's "get_config_resource" function. 1616 """ 1617 from moto.config.models import DEFAULT_ACCOUNT_ID 1618 1619 client = boto3.client("config", region_name="us-west-2") 1620 1621 # Without an aggregator: 1622 bad_ri = { 1623 "SourceAccountId": "000000000000", 1624 "SourceRegion": "not-a-region", 1625 "ResourceType": "NOT::A::RESOURCE", 1626 "ResourceId": "nope", 1627 } 1628 with pytest.raises(ClientError) as ce: 1629 client.batch_get_aggregate_resource_config( 1630 ConfigurationAggregatorName="lolno", ResourceIdentifiers=[bad_ri] 1631 ) 1632 assert ( 1633 "The configuration aggregator does not exist" 1634 in ce.value.response["Error"]["Message"] 1635 ) 1636 1637 # Create the aggregator: 1638 account_aggregation_source = { 1639 "AccountIds": ["012345678910", "111111111111", "222222222222"], 1640 "AllAwsRegions": True, 1641 } 1642 client.put_configuration_aggregator( 1643 ConfigurationAggregatorName="testing", 1644 AccountAggregationSources=[account_aggregation_source], 1645 ) 1646 1647 # With more than 100 items: 1648 with pytest.raises(ClientError) as ce: 1649 client.batch_get_aggregate_resource_config( 1650 ConfigurationAggregatorName="testing", ResourceIdentifiers=[bad_ri] * 101 1651 ) 1652 assert ( 1653 "Member must have length less than or equal to 100" 1654 in ce.value.response["Error"]["Message"] 1655 ) 1656 1657 # Create some S3 buckets: 1658 s3_client = boto3.client("s3", region_name="us-west-2") 1659 for x in range(0, 10): 1660 s3_client.create_bucket( 1661 Bucket="bucket{}".format(x), 1662 CreateBucketConfiguration={"LocationConstraint": "us-west-2"}, 1663 ) 1664 s3_client.put_bucket_tagging( 1665 Bucket="bucket{}".format(x), 1666 Tagging={"TagSet": [{"Key": "Some", "Value": "Tag"}]}, 1667 ) 1668 1669 s3_client_eu = boto3.client("s3", region_name="eu-west-1") 1670 for x in range(10, 12): 1671 s3_client_eu.create_bucket( 1672 Bucket="eu-bucket{}".format(x), 1673 CreateBucketConfiguration={"LocationConstraint": "eu-west-1"}, 1674 ) 1675 s3_client.put_bucket_tagging( 1676 Bucket="eu-bucket{}".format(x), 1677 Tagging={"TagSet": [{"Key": "Some", "Value": "Tag"}]}, 1678 ) 1679 1680 # Now try with resources that exist and ones that don't: 1681 identifiers = [ 1682 { 1683 "SourceAccountId": DEFAULT_ACCOUNT_ID, 1684 "SourceRegion": "us-west-2", 1685 "ResourceType": "AWS::S3::Bucket", 1686 "ResourceId": "bucket{}".format(x), 1687 } 1688 for x in range(0, 10) 1689 ] 1690 identifiers += [ 1691 { 1692 "SourceAccountId": DEFAULT_ACCOUNT_ID, 1693 "SourceRegion": "eu-west-1", 1694 "ResourceType": "AWS::S3::Bucket", 1695 "ResourceId": "eu-bucket{}".format(x), 1696 } 1697 for x in range(10, 12) 1698 ] 1699 identifiers += [bad_ri] 1700 1701 result = client.batch_get_aggregate_resource_config( 1702 ConfigurationAggregatorName="testing", ResourceIdentifiers=identifiers 1703 ) 1704 assert len(result["UnprocessedResourceIdentifiers"]) == 1 1705 assert result["UnprocessedResourceIdentifiers"][0] == bad_ri 1706 1707 # Verify all the buckets are there: 1708 assert len(result["BaseConfigurationItems"]) == 12 1709 missing_buckets = ["bucket{}".format(x) for x in range(0, 10)] + [ 1710 "eu-bucket{}".format(x) for x in range(10, 12) 1711 ] 1712 1713 for r in result["BaseConfigurationItems"]: 1714 missing_buckets.remove(r["resourceName"]) 1715 1716 assert not missing_buckets 1717 1718 # Verify that 'tags' is not in the result set: 1719 for b in result["BaseConfigurationItems"]: 1720 assert not b.get("tags") 1721 assert json.loads( 1722 b["supplementaryConfiguration"]["BucketTaggingConfiguration"] 1723 ) == {"tagSets": [{"tags": {"Some": "Tag"}}]} 1724 1725 # Verify that if the resource name and ID are correct that things are good: 1726 identifiers = [ 1727 { 1728 "SourceAccountId": DEFAULT_ACCOUNT_ID, 1729 "SourceRegion": "us-west-2", 1730 "ResourceType": "AWS::S3::Bucket", 1731 "ResourceId": "bucket1", 1732 "ResourceName": "bucket1", 1733 } 1734 ] 1735 result = client.batch_get_aggregate_resource_config( 1736 ConfigurationAggregatorName="testing", ResourceIdentifiers=identifiers 1737 ) 1738 assert not result["UnprocessedResourceIdentifiers"] 1739 assert ( 1740 len(result["BaseConfigurationItems"]) == 1 1741 and result["BaseConfigurationItems"][0]["resourceName"] == "bucket1" 1742 ) 1743 1744 # Verify that if the resource name and ID mismatch that we don't get a result: 1745 identifiers = [ 1746 { 1747 "SourceAccountId": DEFAULT_ACCOUNT_ID, 1748 "SourceRegion": "us-west-2", 1749 "ResourceType": "AWS::S3::Bucket", 1750 "ResourceId": "bucket1", 1751 "ResourceName": "bucket2", 1752 } 1753 ] 1754 result = client.batch_get_aggregate_resource_config( 1755 ConfigurationAggregatorName="testing", ResourceIdentifiers=identifiers 1756 ) 1757 assert not result["BaseConfigurationItems"] 1758 assert len(result["UnprocessedResourceIdentifiers"]) == 1 1759 assert ( 1760 len(result["UnprocessedResourceIdentifiers"]) == 1 1761 and result["UnprocessedResourceIdentifiers"][0]["ResourceName"] == "bucket2" 1762 ) 1763 1764 # Verify that if the region is incorrect that we don't get a result: 1765 identifiers = [ 1766 { 1767 "SourceAccountId": DEFAULT_ACCOUNT_ID, 1768 "SourceRegion": "eu-west-1", 1769 "ResourceType": "AWS::S3::Bucket", 1770 "ResourceId": "bucket1", 1771 } 1772 ] 1773 result = client.batch_get_aggregate_resource_config( 1774 ConfigurationAggregatorName="testing", ResourceIdentifiers=identifiers 1775 ) 1776 assert not result["BaseConfigurationItems"] 1777 assert len(result["UnprocessedResourceIdentifiers"]) == 1 1778 assert ( 1779 len(result["UnprocessedResourceIdentifiers"]) == 1 1780 and result["UnprocessedResourceIdentifiers"][0]["SourceRegion"] == "eu-west-1" 1781 ) 1782 1783 1784@mock_config 1785def test_put_evaluations(): 1786 client = boto3.client("config", region_name="us-west-2") 1787 1788 # Try without Evaluations supplied: 1789 with pytest.raises(ClientError) as ce: 1790 client.put_evaluations(Evaluations=[], ResultToken="test", TestMode=True) 1791 assert ce.value.response["Error"]["Code"] == "InvalidParameterValueException" 1792 assert ( 1793 "The Evaluations object in your request cannot be null" 1794 in ce.value.response["Error"]["Message"] 1795 ) 1796 1797 # Try without a ResultToken supplied: 1798 with pytest.raises(ClientError) as ce: 1799 client.put_evaluations( 1800 Evaluations=[ 1801 { 1802 "ComplianceResourceType": "AWS::ApiGateway::RestApi", 1803 "ComplianceResourceId": "test-api", 1804 "ComplianceType": "INSUFFICIENT_DATA", 1805 "OrderingTimestamp": datetime(2015, 1, 1), 1806 } 1807 ], 1808 ResultToken="", 1809 TestMode=True, 1810 ) 1811 assert ce.value.response["Error"]["Code"] == "InvalidResultTokenException" 1812 1813 if os.environ.get("TEST_SERVER_MODE", "false").lower() == "true": 1814 raise SkipTest("Does not work in server mode due to error in Workzeug") 1815 else: 1816 # Try without TestMode supplied: 1817 with pytest.raises(NotImplementedError): 1818 client.put_evaluations( 1819 Evaluations=[ 1820 { 1821 "ComplianceResourceType": "AWS::ApiGateway::RestApi", 1822 "ComplianceResourceId": "test-api", 1823 "ComplianceType": "INSUFFICIENT_DATA", 1824 "OrderingTimestamp": datetime(2015, 1, 1), 1825 } 1826 ], 1827 ResultToken="test", 1828 ) 1829 1830 # Now with proper params: 1831 response = client.put_evaluations( 1832 Evaluations=[ 1833 { 1834 "ComplianceResourceType": "AWS::ApiGateway::RestApi", 1835 "ComplianceResourceId": "test-api", 1836 "ComplianceType": "INSUFFICIENT_DATA", 1837 "OrderingTimestamp": datetime(2015, 1, 1), 1838 } 1839 ], 1840 TestMode=True, 1841 ResultToken="test", 1842 ) 1843 1844 # this is hard to match against, so remove it 1845 response["ResponseMetadata"].pop("HTTPHeaders", None) 1846 response["ResponseMetadata"].pop("RetryAttempts", None) 1847 response.should.equal( 1848 {"FailedEvaluations": [], "ResponseMetadata": {"HTTPStatusCode": 200,},} 1849 ) 1850 1851 1852@mock_config 1853def test_put_organization_conformance_pack(): 1854 # given 1855 client = boto3.client("config", region_name="us-east-1") 1856 1857 # when 1858 response = client.put_organization_conformance_pack( 1859 DeliveryS3Bucket="awsconfigconforms-test-bucket", 1860 OrganizationConformancePackName="test-pack", 1861 TemplateS3Uri="s3://test-bucket/test-pack.yaml", 1862 ) 1863 1864 # then 1865 arn = response["OrganizationConformancePackArn"] 1866 arn.should.match( 1867 r"arn:aws:config:us-east-1:\d{12}:organization-conformance-pack/test-pack-\w{8}" 1868 ) 1869 1870 # putting an organization conformance pack with the same name should result in an update 1871 # when 1872 response = client.put_organization_conformance_pack( 1873 DeliveryS3Bucket="awsconfigconforms-test-bucket", 1874 OrganizationConformancePackName="test-pack", 1875 TemplateS3Uri="s3://test-bucket/test-pack-2.yaml", 1876 ) 1877 1878 # then 1879 response["OrganizationConformancePackArn"].should.equal(arn) 1880 1881 1882@mock_config 1883def test_put_organization_conformance_pack_errors(): 1884 # given 1885 client = boto3.client("config", region_name="us-east-1") 1886 1887 # when 1888 with pytest.raises(ClientError) as e: 1889 client.put_organization_conformance_pack( 1890 DeliveryS3Bucket="awsconfigconforms-test-bucket", 1891 OrganizationConformancePackName="test-pack", 1892 ) 1893 1894 # then 1895 ex = e.value 1896 ex.operation_name.should.equal("PutOrganizationConformancePack") 1897 ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) 1898 ex.response["Error"]["Code"].should.contain("ValidationException") 1899 ex.response["Error"]["Message"].should.equal("Template body is invalid") 1900 1901 # when 1902 with pytest.raises(ClientError) as e: 1903 client.put_organization_conformance_pack( 1904 DeliveryS3Bucket="awsconfigconforms-test-bucket", 1905 OrganizationConformancePackName="test-pack", 1906 TemplateS3Uri="invalid-s3-uri", 1907 ) 1908 1909 # then 1910 ex = e.value 1911 ex.operation_name.should.equal("PutOrganizationConformancePack") 1912 ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) 1913 ex.response["Error"]["Code"].should.contain("ValidationException") 1914 ex.response["Error"]["Message"].should.equal( 1915 "1 validation error detected: " 1916 "Value 'invalid-s3-uri' at 'templateS3Uri' failed to satisfy constraint: " 1917 "Member must satisfy regular expression pattern: " 1918 "s3://.*" 1919 ) 1920 1921 1922@mock_config 1923def test_describe_organization_conformance_packs(): 1924 # given 1925 client = boto3.client("config", region_name="us-east-1") 1926 arn = client.put_organization_conformance_pack( 1927 DeliveryS3Bucket="awsconfigconforms-test-bucket", 1928 OrganizationConformancePackName="test-pack", 1929 TemplateS3Uri="s3://test-bucket/test-pack.yaml", 1930 )["OrganizationConformancePackArn"] 1931 1932 # when 1933 response = client.describe_organization_conformance_packs( 1934 OrganizationConformancePackNames=["test-pack"] 1935 ) 1936 1937 # then 1938 response["OrganizationConformancePacks"].should.have.length_of(1) 1939 pack = response["OrganizationConformancePacks"][0] 1940 pack["OrganizationConformancePackName"].should.equal("test-pack") 1941 pack["OrganizationConformancePackArn"].should.equal(arn) 1942 pack["DeliveryS3Bucket"].should.equal("awsconfigconforms-test-bucket") 1943 pack["ConformancePackInputParameters"].should.have.length_of(0) 1944 pack["ExcludedAccounts"].should.have.length_of(0) 1945 pack["LastUpdateTime"].should.be.a("datetime.datetime") 1946 1947 1948@mock_config 1949def test_describe_organization_conformance_packs_errors(): 1950 # given 1951 client = boto3.client("config", region_name="us-east-1") 1952 1953 # when 1954 with pytest.raises(ClientError) as e: 1955 client.describe_organization_conformance_packs( 1956 OrganizationConformancePackNames=["not-existing"] 1957 ) 1958 1959 # then 1960 ex = e.value 1961 ex.operation_name.should.equal("DescribeOrganizationConformancePacks") 1962 ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) 1963 ex.response["Error"]["Code"].should.contain( 1964 "NoSuchOrganizationConformancePackException" 1965 ) 1966 ex.response["Error"]["Message"].should.equal( 1967 "One or more organization conformance packs with specified names are not present. " 1968 "Ensure your names are correct and try your request again later." 1969 ) 1970 1971 1972@mock_config 1973def test_describe_organization_conformance_pack_statuses(): 1974 # given 1975 client = boto3.client("config", region_name="us-east-1") 1976 client.put_organization_conformance_pack( 1977 DeliveryS3Bucket="awsconfigconforms-test-bucket", 1978 OrganizationConformancePackName="test-pack", 1979 TemplateS3Uri="s3://test-bucket/test-pack.yaml", 1980 ) 1981 1982 # when 1983 response = client.describe_organization_conformance_pack_statuses( 1984 OrganizationConformancePackNames=["test-pack"] 1985 ) 1986 1987 # then 1988 response["OrganizationConformancePackStatuses"].should.have.length_of(1) 1989 status = response["OrganizationConformancePackStatuses"][0] 1990 status["OrganizationConformancePackName"].should.equal("test-pack") 1991 status["Status"].should.equal("CREATE_SUCCESSFUL") 1992 update_time = status["LastUpdateTime"] 1993 update_time.should.be.a("datetime.datetime") 1994 1995 # when 1996 response = client.describe_organization_conformance_pack_statuses() 1997 1998 # then 1999 response["OrganizationConformancePackStatuses"].should.have.length_of(1) 2000 status = response["OrganizationConformancePackStatuses"][0] 2001 status["OrganizationConformancePackName"].should.equal("test-pack") 2002 status["Status"].should.equal("CREATE_SUCCESSFUL") 2003 status["LastUpdateTime"].should.equal(update_time) 2004 2005 # when 2006 time.sleep(1) 2007 client.put_organization_conformance_pack( 2008 DeliveryS3Bucket="awsconfigconforms-test-bucket", 2009 OrganizationConformancePackName="test-pack", 2010 TemplateS3Uri="s3://test-bucket/test-pack-2.yaml", 2011 ) 2012 2013 # then 2014 response = client.describe_organization_conformance_pack_statuses( 2015 OrganizationConformancePackNames=["test-pack"] 2016 ) 2017 response["OrganizationConformancePackStatuses"].should.have.length_of(1) 2018 status = response["OrganizationConformancePackStatuses"][0] 2019 status["OrganizationConformancePackName"].should.equal("test-pack") 2020 status["Status"].should.equal("UPDATE_SUCCESSFUL") 2021 status["LastUpdateTime"].should.be.greater_than(update_time) 2022 2023 2024@mock_config 2025def test_describe_organization_conformance_pack_statuses_errors(): 2026 # given 2027 client = boto3.client("config", region_name="us-east-1") 2028 2029 # when 2030 with pytest.raises(ClientError) as e: 2031 client.describe_organization_conformance_pack_statuses( 2032 OrganizationConformancePackNames=["not-existing"] 2033 ) 2034 2035 # then 2036 ex = e.value 2037 ex.operation_name.should.equal("DescribeOrganizationConformancePackStatuses") 2038 ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) 2039 ex.response["Error"]["Code"].should.contain( 2040 "NoSuchOrganizationConformancePackException" 2041 ) 2042 ex.response["Error"]["Message"].should.equal( 2043 "One or more organization conformance packs with specified names are not present. " 2044 "Ensure your names are correct and try your request again later." 2045 ) 2046 2047 2048@mock_config 2049def test_get_organization_conformance_pack_detailed_status(): 2050 # given 2051 client = boto3.client("config", region_name="us-east-1") 2052 arn = client.put_organization_conformance_pack( 2053 DeliveryS3Bucket="awsconfigconforms-test-bucket", 2054 OrganizationConformancePackName="test-pack", 2055 TemplateS3Uri="s3://test-bucket/test-pack.yaml", 2056 )["OrganizationConformancePackArn"] 2057 2058 # when 2059 response = client.get_organization_conformance_pack_detailed_status( 2060 OrganizationConformancePackName="test-pack" 2061 ) 2062 2063 # then 2064 response["OrganizationConformancePackDetailedStatuses"].should.have.length_of(1) 2065 status = response["OrganizationConformancePackDetailedStatuses"][0] 2066 status["AccountId"].should.equal(ACCOUNT_ID) 2067 status["ConformancePackName"].should.equal( 2068 "OrgConformsPack-{}".format(arn[arn.rfind("/") + 1 :]) 2069 ) 2070 status["Status"].should.equal("CREATE_SUCCESSFUL") 2071 update_time = status["LastUpdateTime"] 2072 update_time.should.be.a("datetime.datetime") 2073 2074 # when 2075 time.sleep(1) 2076 client.put_organization_conformance_pack( 2077 DeliveryS3Bucket="awsconfigconforms-test-bucket", 2078 OrganizationConformancePackName="test-pack", 2079 TemplateS3Uri="s3://test-bucket/test-pack-2.yaml", 2080 ) 2081 2082 # then 2083 response = client.get_organization_conformance_pack_detailed_status( 2084 OrganizationConformancePackName="test-pack" 2085 ) 2086 response["OrganizationConformancePackDetailedStatuses"].should.have.length_of(1) 2087 status = response["OrganizationConformancePackDetailedStatuses"][0] 2088 status["AccountId"].should.equal(ACCOUNT_ID) 2089 status["ConformancePackName"].should.equal( 2090 "OrgConformsPack-{}".format(arn[arn.rfind("/") + 1 :]) 2091 ) 2092 status["Status"].should.equal("UPDATE_SUCCESSFUL") 2093 status["LastUpdateTime"].should.be.greater_than(update_time) 2094 2095 2096@mock_config 2097def test_get_organization_conformance_pack_detailed_status_errors(): 2098 # given 2099 client = boto3.client("config", region_name="us-east-1") 2100 2101 # when 2102 with pytest.raises(ClientError) as e: 2103 client.get_organization_conformance_pack_detailed_status( 2104 OrganizationConformancePackName="not-existing" 2105 ) 2106 2107 # then 2108 ex = e.value 2109 ex.operation_name.should.equal("GetOrganizationConformancePackDetailedStatus") 2110 ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) 2111 ex.response["Error"]["Code"].should.contain( 2112 "NoSuchOrganizationConformancePackException" 2113 ) 2114 ex.response["Error"]["Message"].should.equal( 2115 "One or more organization conformance packs with specified names are not present. " 2116 "Ensure your names are correct and try your request again later." 2117 ) 2118 2119 2120@mock_config 2121def test_delete_organization_conformance_pack(): 2122 # given 2123 client = boto3.client("config", region_name="us-east-1") 2124 client.put_organization_conformance_pack( 2125 DeliveryS3Bucket="awsconfigconforms-test-bucket", 2126 OrganizationConformancePackName="test-pack", 2127 TemplateS3Uri="s3://test-bucket/test-pack.yaml", 2128 ) 2129 2130 # when 2131 client.delete_organization_conformance_pack( 2132 OrganizationConformancePackName="test-pack" 2133 ) 2134 2135 # then 2136 response = client.describe_organization_conformance_pack_statuses() 2137 response["OrganizationConformancePackStatuses"].should.have.length_of(0) 2138 2139 2140@mock_config 2141def test_delete_organization_conformance_pack_errors(): 2142 # given 2143 client = boto3.client("config", region_name="us-east-1") 2144 2145 # when 2146 with pytest.raises(ClientError) as e: 2147 client.delete_organization_conformance_pack( 2148 OrganizationConformancePackName="not-existing" 2149 ) 2150 2151 # then 2152 ex = e.value 2153 ex.operation_name.should.equal("DeleteOrganizationConformancePack") 2154 ex.response["ResponseMetadata"]["HTTPStatusCode"].should.equal(400) 2155 ex.response["Error"]["Code"].should.contain( 2156 "NoSuchOrganizationConformancePackException" 2157 ) 2158 ex.response["Error"]["Message"].should.equal( 2159 "Could not find an OrganizationConformancePack for given request with resourceName not-existing" 2160 ) 2161