1import logging 2import random 3import string 4from copy import deepcopy 5 6import salt.loader 7import salt.modules.boto_s3_bucket as boto_s3_bucket 8from salt.utils.versions import LooseVersion 9from tests.support.mixins import LoaderModuleMockMixin 10from tests.support.mock import MagicMock, patch 11from tests.support.unit import TestCase, skipIf 12 13# pylint: disable=import-error,no-name-in-module,unused-import 14try: 15 import boto 16 import boto3 17 from botocore.exceptions import ClientError 18 19 HAS_BOTO = True 20except ImportError: 21 HAS_BOTO = False 22 23# pylint: enable=import-error,no-name-in-module,unused-import 24 25# the boto_s3_bucket module relies on the connect_to_region() method 26# which was added in boto 2.8.0 27# https://github.com/boto/boto/commit/33ac26b416fbb48a60602542b4ce15dcc7029f12 28required_boto3_version = "1.2.1" 29 30log = logging.getLogger(__name__) 31 32 33def _has_required_boto(): 34 """ 35 Returns True/False boolean depending on if Boto is installed and correct 36 version. 37 """ 38 if not HAS_BOTO: 39 return False 40 elif LooseVersion(boto3.__version__) < LooseVersion(required_boto3_version): 41 return False 42 else: 43 return True 44 45 46if _has_required_boto(): 47 region = "us-east-1" 48 access_key = "GKTADJGHEIQSXMKKRBJ08H" 49 secret_key = "askdjghsdfjkghWupUjasdflkdfklgjsdfjajkghs" 50 conn_parameters = { 51 "region": region, 52 "key": access_key, 53 "keyid": secret_key, 54 "profile": {}, 55 } 56 error_message = ( 57 "An error occurred (101) when calling the {0} operation: Test-defined error" 58 ) 59 e404_error = ClientError( 60 {"Error": {"Code": "404", "Message": "Test-defined error"}}, "msg" 61 ) 62 not_found_error = ClientError( 63 {"Error": {"Code": "NoSuchBucket", "Message": "Test-defined error"}}, "msg" 64 ) 65 error_content = {"Error": {"Code": 101, "Message": "Test-defined error"}} 66 create_ret = { 67 "Location": "nowhere", 68 } 69 list_ret = { 70 "Buckets": [{"Name": "mybucket", "CreationDate": None}], 71 "Owner": {"DisplayName": "testuser", "ID": "12341234123"}, 72 "ResponseMetadata": {"Key": "Value"}, 73 } 74 config_ret = { 75 "get_bucket_acl": { 76 "Grants": [ 77 { 78 "Grantee": { 79 "DisplayName": "testowner", 80 "ID": "sdfghjklqwertyuiopzxcvbnm", 81 }, 82 "Permission": "FULL_CONTROL", 83 }, 84 { 85 "Grantee": { 86 "URI": "http://acs.amazonaws.com/groups/global/AllUsers" 87 }, 88 "Permission": "READ", 89 }, 90 ], 91 "Owner": {"DisplayName": "testowner", "ID": "sdfghjklqwertyuiopzxcvbnm"}, 92 }, 93 "get_bucket_cors": { 94 "CORSRules": [{"AllowedMethods": ["GET"], "AllowedOrigins": ["*"]}] 95 }, 96 "get_bucket_lifecycle_configuration": { 97 "Rules": [ 98 { 99 "Expiration": {"Days": 1}, 100 "Prefix": "prefix", 101 "Status": "Enabled", 102 "ID": "asdfghjklpoiuytrewq", 103 } 104 ] 105 }, 106 "get_bucket_location": {"LocationConstraint": "EU"}, 107 "get_bucket_logging": { 108 "LoggingEnabled": {"TargetBucket": "my-bucket", "TargetPrefix": "prefix"} 109 }, 110 "get_bucket_notification_configuration": { 111 "LambdaFunctionConfigurations": [ 112 { 113 "LambdaFunctionArn": ( 114 "arn:aws:lambda:us-east-1:111111222222:function:my-function" 115 ), 116 "Id": "zxcvbnmlkjhgfdsa", 117 "Events": ["s3:ObjectCreated:*"], 118 "Filter": { 119 "Key": {"FilterRules": [{"Name": "prefix", "Value": "string"}]} 120 }, 121 } 122 ] 123 }, 124 "get_bucket_policy": { 125 "Policy": '{"Version":"2012-10-17","Statement":[{"Sid":"","Effect":"Allow","Principal":{"AWS":"arn:aws:iam::111111222222:root"},"Action":"s3:PutObject","Resource":"arn:aws:s3:::my-bucket/*"}]}' 126 }, 127 "get_bucket_replication": { 128 "ReplicationConfiguration": { 129 "Role": "arn:aws:iam::11111222222:my-role", 130 "Rules": [ 131 { 132 "ID": "r1", 133 "Prefix": "prefix", 134 "Status": "Enabled", 135 "Destination": {"Bucket": "arn:aws:s3:::my-bucket"}, 136 } 137 ], 138 } 139 }, 140 "get_bucket_request_payment": {"Payer": "Requester"}, 141 "get_bucket_tagging": { 142 "TagSet": [{"Key": "c", "Value": "d"}, {"Key": "a", "Value": "b"}] 143 }, 144 "get_bucket_versioning": {"Status": "Enabled"}, 145 "get_bucket_website": { 146 "ErrorDocument": {"Key": "error.html"}, 147 "IndexDocument": {"Suffix": "index.html"}, 148 }, 149 } 150 151 152@skipIf(HAS_BOTO is False, "The boto module must be installed.") 153@skipIf( 154 _has_required_boto() is False, 155 "The boto3 module must be greater than or equal to version {}".format( 156 required_boto3_version 157 ), 158) 159class BotoS3BucketTestCaseBase(TestCase, LoaderModuleMockMixin): 160 conn = None 161 162 def setup_loader_modules(self): 163 self.opts = opts = salt.config.DEFAULT_MINION_OPTS.copy() 164 utils = salt.loader.utils( 165 opts, whitelist=["boto3", "args", "systemd", "path", "platform"], context={} 166 ) 167 return {boto_s3_bucket: {"__utils__": utils}} 168 169 def setUp(self): 170 super().setUp() 171 boto_s3_bucket.__init__(self.opts) 172 del self.opts 173 # Set up MagicMock to replace the boto3 session 174 # connections keep getting cached from prior tests, can't find the 175 # correct context object to clear it. So randomize the cache key, to prevent any 176 # cache hits 177 conn_parameters["key"] = "".join( 178 random.choice(string.ascii_lowercase + string.digits) for _ in range(50) 179 ) 180 181 self.patcher = patch("boto3.session.Session") 182 self.addCleanup(self.patcher.stop) 183 self.addCleanup(delattr, self, "patcher") 184 mock_session = self.patcher.start() 185 186 session_instance = mock_session.return_value 187 self.conn = MagicMock() 188 self.addCleanup(delattr, self, "conn") 189 session_instance.client.return_value = self.conn 190 191 192class BotoS3BucketTestCaseMixin: 193 pass 194 195 196class BotoS3BucketTestCase(BotoS3BucketTestCaseBase, BotoS3BucketTestCaseMixin): 197 """ 198 TestCase for salt.modules.boto_s3_bucket module 199 """ 200 201 def test_that_when_checking_if_a_bucket_exists_and_a_bucket_exists_the_bucket_exists_method_returns_true( 202 self, 203 ): 204 """ 205 Tests checking s3 bucket existence when the s3 bucket already exists 206 """ 207 self.conn.head_bucket.return_value = None 208 result = boto_s3_bucket.exists(Bucket="mybucket", **conn_parameters) 209 210 self.assertTrue(result["exists"]) 211 212 def test_that_when_checking_if_a_bucket_exists_and_a_bucket_does_not_exist_the_bucket_exists_method_returns_false( 213 self, 214 ): 215 """ 216 Tests checking s3 bucket existence when the s3 bucket does not exist 217 """ 218 self.conn.head_bucket.side_effect = e404_error 219 result = boto_s3_bucket.exists(Bucket="mybucket", **conn_parameters) 220 221 self.assertFalse(result["exists"]) 222 223 def test_that_when_checking_if_a_bucket_exists_and_boto3_returns_an_error_the_bucket_exists_method_returns_error( 224 self, 225 ): 226 """ 227 Tests checking s3 bucket existence when boto returns an error 228 """ 229 self.conn.head_bucket.side_effect = ClientError(error_content, "head_bucket") 230 result = boto_s3_bucket.exists(Bucket="mybucket", **conn_parameters) 231 232 self.assertEqual( 233 result.get("error", {}).get("message"), error_message.format("head_bucket") 234 ) 235 236 def test_that_when_creating_a_bucket_succeeds_the_create_bucket_method_returns_true( 237 self, 238 ): 239 """ 240 tests True bucket created. 241 """ 242 self.conn.create_bucket.return_value = create_ret 243 result = boto_s3_bucket.create( 244 Bucket="mybucket", LocationConstraint="nowhere", **conn_parameters 245 ) 246 247 self.assertTrue(result["created"]) 248 249 def test_that_when_creating_a_bucket_fails_the_create_bucket_method_returns_error( 250 self, 251 ): 252 """ 253 tests False bucket not created. 254 """ 255 self.conn.create_bucket.side_effect = ClientError( 256 error_content, "create_bucket" 257 ) 258 result = boto_s3_bucket.create( 259 Bucket="mybucket", LocationConstraint="nowhere", **conn_parameters 260 ) 261 self.assertEqual( 262 result.get("error", {}).get("message"), 263 error_message.format("create_bucket"), 264 ) 265 266 def test_that_when_deleting_a_bucket_succeeds_the_delete_bucket_method_returns_true( 267 self, 268 ): 269 """ 270 tests True bucket deleted. 271 """ 272 result = boto_s3_bucket.delete(Bucket="mybucket", **conn_parameters) 273 274 self.assertTrue(result["deleted"]) 275 276 def test_that_when_deleting_a_bucket_fails_the_delete_bucket_method_returns_false( 277 self, 278 ): 279 """ 280 tests False bucket not deleted. 281 """ 282 self.conn.delete_bucket.side_effect = ClientError( 283 error_content, "delete_bucket" 284 ) 285 result = boto_s3_bucket.delete(Bucket="mybucket", **conn_parameters) 286 self.assertFalse(result["deleted"]) 287 288 def test_that_when_describing_bucket_it_returns_the_dict_of_properties_returns_true( 289 self, 290 ): 291 """ 292 Tests describing parameters if bucket exists 293 """ 294 for key, value in config_ret.items(): 295 getattr(self.conn, key).return_value = deepcopy(value) 296 297 result = boto_s3_bucket.describe(Bucket="mybucket", **conn_parameters) 298 299 self.assertTrue(result["bucket"]) 300 301 def test_that_when_describing_bucket_it_returns_the_dict_of_properties_returns_false( 302 self, 303 ): 304 """ 305 Tests describing parameters if bucket does not exist 306 """ 307 self.conn.get_bucket_acl.side_effect = not_found_error 308 result = boto_s3_bucket.describe(Bucket="mybucket", **conn_parameters) 309 310 self.assertFalse(result["bucket"]) 311 312 def test_that_when_describing_bucket_on_client_error_it_returns_error(self): 313 """ 314 Tests describing parameters failure 315 """ 316 self.conn.get_bucket_acl.side_effect = ClientError( 317 error_content, "get_bucket_acl" 318 ) 319 result = boto_s3_bucket.describe(Bucket="mybucket", **conn_parameters) 320 self.assertTrue("error" in result) 321 322 def test_that_when_listing_buckets_succeeds_the_list_buckets_method_returns_true( 323 self, 324 ): 325 """ 326 tests True buckets listed. 327 """ 328 self.conn.list_buckets.return_value = deepcopy(list_ret) 329 result = boto_s3_bucket.list(**conn_parameters) 330 331 self.assertTrue(result["Buckets"]) 332 333 def test_that_when_listing_bucket_fails_the_list_bucket_method_returns_false(self): 334 """ 335 tests False no bucket listed. 336 """ 337 ret = deepcopy(list_ret) 338 log.info(ret) 339 ret["Buckets"] = list() 340 self.conn.list_buckets.return_value = ret 341 result = boto_s3_bucket.list(**conn_parameters) 342 self.assertFalse(result["Buckets"]) 343 344 def test_that_when_listing_bucket_fails_the_list_bucket_method_returns_error(self): 345 """ 346 tests False bucket error. 347 """ 348 self.conn.list_buckets.side_effect = ClientError(error_content, "list_buckets") 349 result = boto_s3_bucket.list(**conn_parameters) 350 self.assertEqual( 351 result.get("error", {}).get("message"), error_message.format("list_buckets") 352 ) 353 354 def test_that_when_putting_acl_succeeds_the_put_acl_method_returns_true(self): 355 """ 356 tests True bucket updated. 357 """ 358 result = boto_s3_bucket.put_acl(Bucket="mybucket", **conn_parameters) 359 360 self.assertTrue(result["updated"]) 361 362 def test_that_when_putting_acl_fails_the_put_acl_method_returns_error(self): 363 """ 364 tests False bucket not updated. 365 """ 366 self.conn.put_bucket_acl.side_effect = ClientError( 367 error_content, "put_bucket_acl" 368 ) 369 result = boto_s3_bucket.put_acl(Bucket="mybucket", **conn_parameters) 370 self.assertEqual( 371 result.get("error", {}).get("message"), 372 error_message.format("put_bucket_acl"), 373 ) 374 375 def test_that_when_putting_cors_succeeds_the_put_cors_method_returns_true(self): 376 """ 377 tests True bucket updated. 378 """ 379 result = boto_s3_bucket.put_cors( 380 Bucket="mybucket", CORSRules="[]", **conn_parameters 381 ) 382 383 self.assertTrue(result["updated"]) 384 385 def test_that_when_putting_cors_fails_the_put_cors_method_returns_error(self): 386 """ 387 tests False bucket not updated. 388 """ 389 self.conn.put_bucket_cors.side_effect = ClientError( 390 error_content, "put_bucket_cors" 391 ) 392 result = boto_s3_bucket.put_cors( 393 Bucket="mybucket", CORSRules="[]", **conn_parameters 394 ) 395 self.assertEqual( 396 result.get("error", {}).get("message"), 397 error_message.format("put_bucket_cors"), 398 ) 399 400 def test_that_when_putting_lifecycle_configuration_succeeds_the_put_lifecycle_configuration_method_returns_true( 401 self, 402 ): 403 """ 404 tests True bucket updated. 405 """ 406 result = boto_s3_bucket.put_lifecycle_configuration( 407 Bucket="mybucket", Rules="[]", **conn_parameters 408 ) 409 410 self.assertTrue(result["updated"]) 411 412 def test_that_when_putting_lifecycle_configuration_fails_the_put_lifecycle_configuration_method_returns_error( 413 self, 414 ): 415 """ 416 tests False bucket not updated. 417 """ 418 self.conn.put_bucket_lifecycle_configuration.side_effect = ClientError( 419 error_content, "put_bucket_lifecycle_configuration" 420 ) 421 result = boto_s3_bucket.put_lifecycle_configuration( 422 Bucket="mybucket", Rules="[]", **conn_parameters 423 ) 424 self.assertEqual( 425 result.get("error", {}).get("message"), 426 error_message.format("put_bucket_lifecycle_configuration"), 427 ) 428 429 def test_that_when_putting_logging_succeeds_the_put_logging_method_returns_true( 430 self, 431 ): 432 """ 433 tests True bucket updated. 434 """ 435 result = boto_s3_bucket.put_logging( 436 Bucket="mybucket", 437 TargetBucket="arn:::::", 438 TargetPrefix="asdf", 439 TargetGrants="[]", 440 **conn_parameters 441 ) 442 443 self.assertTrue(result["updated"]) 444 445 def test_that_when_putting_logging_fails_the_put_logging_method_returns_error(self): 446 """ 447 tests False bucket not updated. 448 """ 449 self.conn.put_bucket_logging.side_effect = ClientError( 450 error_content, "put_bucket_logging" 451 ) 452 result = boto_s3_bucket.put_logging( 453 Bucket="mybucket", 454 TargetBucket="arn:::::", 455 TargetPrefix="asdf", 456 TargetGrants="[]", 457 **conn_parameters 458 ) 459 self.assertEqual( 460 result.get("error", {}).get("message"), 461 error_message.format("put_bucket_logging"), 462 ) 463 464 def test_that_when_putting_notification_configuration_succeeds_the_put_notification_configuration_method_returns_true( 465 self, 466 ): 467 """ 468 tests True bucket updated. 469 """ 470 result = boto_s3_bucket.put_notification_configuration( 471 Bucket="mybucket", **conn_parameters 472 ) 473 474 self.assertTrue(result["updated"]) 475 476 def test_that_when_putting_notification_configuration_fails_the_put_notification_configuration_method_returns_error( 477 self, 478 ): 479 """ 480 tests False bucket not updated. 481 """ 482 self.conn.put_bucket_notification_configuration.side_effect = ClientError( 483 error_content, "put_bucket_notification_configuration" 484 ) 485 result = boto_s3_bucket.put_notification_configuration( 486 Bucket="mybucket", **conn_parameters 487 ) 488 self.assertEqual( 489 result.get("error", {}).get("message"), 490 error_message.format("put_bucket_notification_configuration"), 491 ) 492 493 def test_that_when_putting_policy_succeeds_the_put_policy_method_returns_true(self): 494 """ 495 tests True bucket updated. 496 """ 497 result = boto_s3_bucket.put_policy( 498 Bucket="mybucket", Policy="{}", **conn_parameters 499 ) 500 501 self.assertTrue(result["updated"]) 502 503 def test_that_when_putting_policy_fails_the_put_policy_method_returns_error(self): 504 """ 505 tests False bucket not updated. 506 """ 507 self.conn.put_bucket_policy.side_effect = ClientError( 508 error_content, "put_bucket_policy" 509 ) 510 result = boto_s3_bucket.put_policy( 511 Bucket="mybucket", Policy="{}", **conn_parameters 512 ) 513 self.assertEqual( 514 result.get("error", {}).get("message"), 515 error_message.format("put_bucket_policy"), 516 ) 517 518 def test_that_when_putting_replication_succeeds_the_put_replication_method_returns_true( 519 self, 520 ): 521 """ 522 tests True bucket updated. 523 """ 524 result = boto_s3_bucket.put_replication( 525 Bucket="mybucket", Role="arn:aws:iam:::", Rules="[]", **conn_parameters 526 ) 527 528 self.assertTrue(result["updated"]) 529 530 def test_that_when_putting_replication_fails_the_put_replication_method_returns_error( 531 self, 532 ): 533 """ 534 tests False bucket not updated. 535 """ 536 self.conn.put_bucket_replication.side_effect = ClientError( 537 error_content, "put_bucket_replication" 538 ) 539 result = boto_s3_bucket.put_replication( 540 Bucket="mybucket", Role="arn:aws:iam:::", Rules="[]", **conn_parameters 541 ) 542 self.assertEqual( 543 result.get("error", {}).get("message"), 544 error_message.format("put_bucket_replication"), 545 ) 546 547 def test_that_when_putting_request_payment_succeeds_the_put_request_payment_method_returns_true( 548 self, 549 ): 550 """ 551 tests True bucket updated. 552 """ 553 result = boto_s3_bucket.put_request_payment( 554 Bucket="mybucket", Payer="Requester", **conn_parameters 555 ) 556 557 self.assertTrue(result["updated"]) 558 559 def test_that_when_putting_request_payment_fails_the_put_request_payment_method_returns_error( 560 self, 561 ): 562 """ 563 tests False bucket not updated. 564 """ 565 self.conn.put_bucket_request_payment.side_effect = ClientError( 566 error_content, "put_bucket_request_payment" 567 ) 568 result = boto_s3_bucket.put_request_payment( 569 Bucket="mybucket", Payer="Requester", **conn_parameters 570 ) 571 self.assertEqual( 572 result.get("error", {}).get("message"), 573 error_message.format("put_bucket_request_payment"), 574 ) 575 576 def test_that_when_putting_tagging_succeeds_the_put_tagging_method_returns_true( 577 self, 578 ): 579 """ 580 tests True bucket updated. 581 """ 582 result = boto_s3_bucket.put_tagging(Bucket="mybucket", **conn_parameters) 583 584 self.assertTrue(result["updated"]) 585 586 def test_that_when_putting_tagging_fails_the_put_tagging_method_returns_error(self): 587 """ 588 tests False bucket not updated. 589 """ 590 self.conn.put_bucket_tagging.side_effect = ClientError( 591 error_content, "put_bucket_tagging" 592 ) 593 result = boto_s3_bucket.put_tagging(Bucket="mybucket", **conn_parameters) 594 self.assertEqual( 595 result.get("error", {}).get("message"), 596 error_message.format("put_bucket_tagging"), 597 ) 598 599 def test_that_when_putting_versioning_succeeds_the_put_versioning_method_returns_true( 600 self, 601 ): 602 """ 603 tests True bucket updated. 604 """ 605 result = boto_s3_bucket.put_versioning( 606 Bucket="mybucket", Status="Enabled", **conn_parameters 607 ) 608 609 self.assertTrue(result["updated"]) 610 611 def test_that_when_putting_versioning_fails_the_put_versioning_method_returns_error( 612 self, 613 ): 614 """ 615 tests False bucket not updated. 616 """ 617 self.conn.put_bucket_versioning.side_effect = ClientError( 618 error_content, "put_bucket_versioning" 619 ) 620 result = boto_s3_bucket.put_versioning( 621 Bucket="mybucket", Status="Enabled", **conn_parameters 622 ) 623 self.assertEqual( 624 result.get("error", {}).get("message"), 625 error_message.format("put_bucket_versioning"), 626 ) 627 628 def test_that_when_putting_website_succeeds_the_put_website_method_returns_true( 629 self, 630 ): 631 """ 632 tests True bucket updated. 633 """ 634 result = boto_s3_bucket.put_website(Bucket="mybucket", **conn_parameters) 635 636 self.assertTrue(result["updated"]) 637 638 def test_that_when_putting_website_fails_the_put_website_method_returns_error(self): 639 """ 640 tests False bucket not updated. 641 """ 642 self.conn.put_bucket_website.side_effect = ClientError( 643 error_content, "put_bucket_website" 644 ) 645 result = boto_s3_bucket.put_website(Bucket="mybucket", **conn_parameters) 646 self.assertEqual( 647 result.get("error", {}).get("message"), 648 error_message.format("put_bucket_website"), 649 ) 650 651 def test_that_when_deleting_cors_succeeds_the_delete_cors_method_returns_true(self): 652 """ 653 tests True bucket attribute deleted. 654 """ 655 result = boto_s3_bucket.delete_cors(Bucket="mybucket", **conn_parameters) 656 657 self.assertTrue(result["deleted"]) 658 659 def test_that_when_deleting_cors_fails_the_delete_cors_method_returns_error(self): 660 """ 661 tests False bucket attribute not deleted. 662 """ 663 self.conn.delete_bucket_cors.side_effect = ClientError( 664 error_content, "delete_bucket_cors" 665 ) 666 result = boto_s3_bucket.delete_cors(Bucket="mybucket", **conn_parameters) 667 self.assertEqual( 668 result.get("error", {}).get("message"), 669 error_message.format("delete_bucket_cors"), 670 ) 671 672 def test_that_when_deleting_lifecycle_configuration_succeeds_the_delete_lifecycle_configuration_method_returns_true( 673 self, 674 ): 675 """ 676 tests True bucket attribute deleted. 677 """ 678 result = boto_s3_bucket.delete_lifecycle_configuration( 679 Bucket="mybucket", **conn_parameters 680 ) 681 682 self.assertTrue(result["deleted"]) 683 684 def test_that_when_deleting_lifecycle_configuration_fails_the_delete_lifecycle_configuration_method_returns_error( 685 self, 686 ): 687 """ 688 tests False bucket attribute not deleted. 689 """ 690 self.conn.delete_bucket_lifecycle.side_effect = ClientError( 691 error_content, "delete_bucket_lifecycle_configuration" 692 ) 693 result = boto_s3_bucket.delete_lifecycle_configuration( 694 Bucket="mybucket", **conn_parameters 695 ) 696 self.assertEqual( 697 result.get("error", {}).get("message"), 698 error_message.format("delete_bucket_lifecycle_configuration"), 699 ) 700 701 def test_that_when_deleting_policy_succeeds_the_delete_policy_method_returns_true( 702 self, 703 ): 704 """ 705 tests True bucket attribute deleted. 706 """ 707 result = boto_s3_bucket.delete_policy(Bucket="mybucket", **conn_parameters) 708 709 self.assertTrue(result["deleted"]) 710 711 def test_that_when_deleting_policy_fails_the_delete_policy_method_returns_error( 712 self, 713 ): 714 """ 715 tests False bucket attribute not deleted. 716 """ 717 self.conn.delete_bucket_policy.side_effect = ClientError( 718 error_content, "delete_bucket_policy" 719 ) 720 result = boto_s3_bucket.delete_policy(Bucket="mybucket", **conn_parameters) 721 self.assertEqual( 722 result.get("error", {}).get("message"), 723 error_message.format("delete_bucket_policy"), 724 ) 725 726 def test_that_when_deleting_replication_succeeds_the_delete_replication_method_returns_true( 727 self, 728 ): 729 """ 730 tests True bucket attribute deleted. 731 """ 732 result = boto_s3_bucket.delete_replication(Bucket="mybucket", **conn_parameters) 733 734 self.assertTrue(result["deleted"]) 735 736 def test_that_when_deleting_replication_fails_the_delete_replication_method_returns_error( 737 self, 738 ): 739 """ 740 tests False bucket attribute not deleted. 741 """ 742 self.conn.delete_bucket_replication.side_effect = ClientError( 743 error_content, "delete_bucket_replication" 744 ) 745 result = boto_s3_bucket.delete_replication(Bucket="mybucket", **conn_parameters) 746 self.assertEqual( 747 result.get("error", {}).get("message"), 748 error_message.format("delete_bucket_replication"), 749 ) 750 751 def test_that_when_deleting_tagging_succeeds_the_delete_tagging_method_returns_true( 752 self, 753 ): 754 """ 755 tests True bucket attribute deleted. 756 """ 757 result = boto_s3_bucket.delete_tagging(Bucket="mybucket", **conn_parameters) 758 759 self.assertTrue(result["deleted"]) 760 761 def test_that_when_deleting_tagging_fails_the_delete_tagging_method_returns_error( 762 self, 763 ): 764 """ 765 tests False bucket attribute not deleted. 766 """ 767 self.conn.delete_bucket_tagging.side_effect = ClientError( 768 error_content, "delete_bucket_tagging" 769 ) 770 result = boto_s3_bucket.delete_tagging(Bucket="mybucket", **conn_parameters) 771 self.assertEqual( 772 result.get("error", {}).get("message"), 773 error_message.format("delete_bucket_tagging"), 774 ) 775 776 def test_that_when_deleting_website_succeeds_the_delete_website_method_returns_true( 777 self, 778 ): 779 """ 780 tests True bucket attribute deleted. 781 """ 782 result = boto_s3_bucket.delete_website(Bucket="mybucket", **conn_parameters) 783 784 self.assertTrue(result["deleted"]) 785 786 def test_that_when_deleting_website_fails_the_delete_website_method_returns_error( 787 self, 788 ): 789 """ 790 tests False bucket attribute not deleted. 791 """ 792 self.conn.delete_bucket_website.side_effect = ClientError( 793 error_content, "delete_bucket_website" 794 ) 795 result = boto_s3_bucket.delete_website(Bucket="mybucket", **conn_parameters) 796 self.assertEqual( 797 result.get("error", {}).get("message"), 798 error_message.format("delete_bucket_website"), 799 ) 800