"""Tests for the S3 bucket replication calls, executed under ``mock_s3``."""
import boto3
import pytest
import sure  # noqa # pylint: disable=unused-import

from botocore.exceptions import ClientError
from moto import mock_s3
from uuid import uuid4

DEFAULT_REGION_NAME = "us-east-1"


def _client_and_bucket_name():
    """Return an S3 client for DEFAULT_REGION_NAME and a random bucket name."""
    client = boto3.client("s3", region_name=DEFAULT_REGION_NAME)
    return client, str(uuid4())


def _create_versioned_bucket(client, bucket_name):
    """Create ``bucket_name`` and switch its versioning status to Enabled."""
    client.create_bucket(Bucket=bucket_name)
    client.put_bucket_versioning(
        Bucket=bucket_name, VersioningConfiguration={"Status": "Enabled"}
    )


def _check_error(exc, code, message, bucket_name):
    """Check the Code, Message and BucketName of a captured ClientError.

    ``exc`` is the object bound by ``pytest.raises(ClientError) as exc``.
    """
    details = exc.value.response["Error"]
    details["Code"].should.equal(code)
    details["Message"].should.equal(message)
    details["BucketName"].should.equal(bucket_name)


@mock_s3
def test_get_bucket_replication_for_unexisting_bucket():
    """Reading replication of a bucket that was never created -> NoSuchBucket."""
    s3, bucket_name = _client_and_bucket_name()

    with pytest.raises(ClientError) as exc:
        s3.get_bucket_replication(Bucket=bucket_name)

    _check_error(
        exc,
        code="NoSuchBucket",
        message="The specified bucket does not exist",
        bucket_name=bucket_name,
    )


@mock_s3
def test_get_bucket_replication_bucket_without_replication():
    """Reading replication of a bucket without any configuration fails."""
    s3, bucket_name = _client_and_bucket_name()
    s3.create_bucket(Bucket=bucket_name)

    with pytest.raises(ClientError) as exc:
        s3.get_bucket_replication(Bucket=bucket_name)

    _check_error(
        exc,
        code="ReplicationConfigurationNotFoundError",
        message="The replication configuration was not found",
        bucket_name=bucket_name,
    )


@mock_s3
def test_delete_bucket_replication_unknown_bucket():
    """Deleting replication of a bucket that was never created -> NoSuchBucket."""
    s3, bucket_name = _client_and_bucket_name()

    with pytest.raises(ClientError) as exc:
        s3.delete_bucket_replication(Bucket=bucket_name)

    _check_error(
        exc,
        code="NoSuchBucket",
        message="The specified bucket does not exist",
        bucket_name=bucket_name,
    )


@mock_s3
def test_delete_bucket_replication_bucket_without_replication():
    """Deleting replication of a bucket that has none must not raise."""
    s3, bucket_name = _client_and_bucket_name()
    s3.create_bucket(Bucket=bucket_name)

    # No-op
    s3.delete_bucket_replication(Bucket=bucket_name)


@mock_s3
def test_create_replication_without_versioning():
    """Putting a replication config on an unversioned bucket is rejected."""
    s3, bucket_name = _client_and_bucket_name()
    s3.create_bucket(Bucket=bucket_name)

    replication = {
        "Role": "myrole",
        "Rules": [{"Destination": {"Bucket": "secondbucket"}, "Status": "Enabled"}],
    }
    with pytest.raises(ClientError) as exc:
        s3.put_bucket_replication(
            Bucket=bucket_name, ReplicationConfiguration=replication
        )

    _check_error(
        exc,
        code="InvalidRequest",
        message="Versioning must be 'Enabled' on the bucket to apply a replication configuration",
        bucket_name=bucket_name,
    )


@mock_s3
def test_create_and_retrieve_replication_with_single_rules():
    """Round-trip a single-rule configuration, then delete it again."""
    s3, bucket_name = _client_and_bucket_name()
    _create_versioned_bucket(s3, bucket_name)

    submitted_rule = {
        "ID": "firstrule",
        "Priority": 2,
        "Destination": {"Bucket": "secondbucket"},
        "Status": "Enabled",
    }
    s3.put_bucket_replication(
        Bucket=bucket_name,
        ReplicationConfiguration={"Role": "myrole", "Rules": [submitted_rule]},
    )

    # The stored rule is expected to come back with DeleteMarkerReplication
    # and Filter entries that were not part of the submitted rule.
    expected_rule = {
        "DeleteMarkerReplication": {"Status": "Disabled"},
        "Destination": {"Bucket": "secondbucket"},
        "Filter": {"Prefix": ""},
        "ID": "firstrule",
        "Priority": 2,
        "Status": "Enabled",
    }
    response = s3.get_bucket_replication(Bucket=bucket_name)
    config = response["ReplicationConfiguration"]
    config.should.equal({"Role": "myrole", "Rules": [expected_rule]})

    s3.delete_bucket_replication(Bucket=bucket_name)

    # Can't retrieve replication that has been deleted
    with pytest.raises(ClientError) as exc:
        s3.get_bucket_replication(Bucket=bucket_name)

    _check_error(
        exc,
        code="ReplicationConfigurationNotFoundError",
        message="The replication configuration was not found",
        bucket_name=bucket_name,
    )


@mock_s3
def test_create_and_retrieve_replication_with_multiple_rules():
    """Round-trip a two-rule configuration where the first rule has no ID/Priority."""
    s3, bucket_name = _client_and_bucket_name()
    _create_versioned_bucket(s3, bucket_name)

    anonymous_rule = {"Destination": {"Bucket": "secondbucket"}, "Status": "Enabled"}
    named_rule = {
        "ID": "secondrule",
        "Priority": 2,
        "Destination": {"Bucket": "thirdbucket"},
        "Status": "Disabled",
    }
    s3.put_bucket_replication(
        Bucket=bucket_name,
        ReplicationConfiguration={
            "Role": "myrole",
            "Rules": [anonymous_rule, named_rule],
        },
    )

    response = s3.get_bucket_replication(Bucket=bucket_name)
    config = response["ReplicationConfiguration"]
    config.should.have.key("Role").equal("myrole")

    rules = config["Rules"]
    rules.should.have.length_of(2)
    first_rule, second_rule = rules

    # The first rule was submitted without ID or Priority; the response is
    # expected to carry an ID key and Priority 1 for it.
    first_rule.should.have.key("ID")
    first_rule.should.have.key("Priority").equal(1)
    first_rule.should.have.key("Status").equal("Enabled")
    first_rule.should.have.key("Destination").equal({"Bucket": "secondbucket"})

    second_rule.should.have.key("ID").equal("secondrule")
    second_rule.should.have.key("Priority").equal(2)
    second_rule.should.have.key("Status").equal("Disabled")
    second_rule.should.have.key("Destination").equal({"Bucket": "thirdbucket"})