1# -*- coding: utf-8 -*- 2 3# Copyright (c) 2011 Mitch Garnaat http://garnaat.org/ 4# All rights reserved. 5# 6# Permission is hereby granted, free of charge, to any person obtaining a 7# copy of this software and associated documentation files (the 8# "Software"), to deal in the Software without restriction, including 9# without limitation the rights to use, copy, modify, merge, publish, dis- 10# tribute, sublicense, and/or sell copies of the Software, and to permit 11# persons to whom the Software is furnished to do so, subject to the fol- 12# lowing conditions: 13# 14# The above copyright notice and this permission notice shall be included 15# in all copies or substantial portions of the Software. 16# 17# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 18# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- 19# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT 20# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, 21# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 22# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 23# IN THE SOFTWARE. 24 25""" 26Some unit tests for the S3 Bucket 27""" 28 29from mock import patch, Mock 30import unittest 31import time 32 33from boto.exception import S3ResponseError 34from boto.s3.connection import S3Connection 35from boto.s3.bucketlogging import BucketLogging 36from boto.s3.lifecycle import Lifecycle 37from boto.s3.lifecycle import Transition 38from boto.s3.lifecycle import Expiration 39from boto.s3.lifecycle import Rule 40from boto.s3.acl import Grant 41from boto.s3.tagging import Tags, TagSet 42from boto.s3.website import RedirectLocation 43from boto.compat import unquote_str 44 45 46class S3BucketTest (unittest.TestCase): 47 s3 = True 48 49 def setUp(self): 50 self.conn = S3Connection() 51 self.bucket_name = 'bucket-%d' % int(time.time()) 52 self.bucket = self.conn.create_bucket(self.bucket_name) 53 54 def tearDown(self): 55 for key in self.bucket: 56 key.delete() 57 self.bucket.delete() 58 59 def test_next_marker(self): 60 expected = ["a/", "b", "c"] 61 for key_name in expected: 62 key = self.bucket.new_key(key_name) 63 key.set_contents_from_string(key_name) 64 65 # Normal list of first 2 keys will have 66 # no NextMarker set, so we use last key to iterate 67 # last element will be "b" so no issue. 68 rs = self.bucket.get_all_keys(max_keys=2) 69 for element in rs: 70 pass 71 self.assertEqual(element.name, "b") 72 self.assertEqual(rs.next_marker, None) 73 74 # list using delimiter of first 2 keys will have 75 # a NextMarker set (when truncated). As prefixes 76 # are grouped together at the end, we get "a/" as 77 # last element, but luckily we have next_marker. 78 rs = self.bucket.get_all_keys(max_keys=2, delimiter="/") 79 for element in rs: 80 pass 81 self.assertEqual(element.name, "a/") 82 self.assertEqual(rs.next_marker, "b") 83 84 # ensure bucket.list() still works by just 85 # popping elements off the front of expected. 86 rs = self.bucket.list() 87 for element in rs: 88 self.assertEqual(element.name, expected.pop(0)) 89 self.assertEqual(expected, []) 90 91 92 def test_list_with_url_encoding(self): 93 expected = [u"α", u"β", u"γ"] 94 for key_name in expected: 95 key = self.bucket.new_key(key_name) 96 key.set_contents_from_string(key_name) 97 98 # ensure bucket.list() still works by just 99 # popping elements off the front of expected. 100 orig_getall = self.bucket._get_all 101 getall = lambda *a, **k: orig_getall(*a, max_keys=2, **k) 102 with patch.object(self.bucket, '_get_all', getall): 103 rs = self.bucket.list(encoding_type="url") 104 for element in rs: 105 name = unquote_str(element.name) 106 self.assertEqual(name, expected.pop(0)) 107 self.assertEqual(expected, []) 108 109 def test_logging(self): 110 # use self.bucket as the target bucket so that teardown 111 # will delete any log files that make it into the bucket 112 # automatically and all we have to do is delete the 113 # source bucket. 114 sb_name = "src-" + self.bucket_name 115 sb = self.conn.create_bucket(sb_name) 116 # grant log write perms to target bucket using canned-acl 117 self.bucket.set_acl("log-delivery-write") 118 target_bucket = self.bucket_name 119 target_prefix = u"jp/ログ/" 120 # Check existing status is disabled 121 bls = sb.get_logging_status() 122 self.assertEqual(bls.target, None) 123 # Create a logging status and grant auth users READ PERM 124 authuri = "http://acs.amazonaws.com/groups/global/AuthenticatedUsers" 125 authr = Grant(permission="READ", type="Group", uri=authuri) 126 sb.enable_logging(target_bucket, target_prefix=target_prefix, grants=[authr]) 127 # Check the status and confirm its set. 128 bls = sb.get_logging_status() 129 self.assertEqual(bls.target, target_bucket) 130 self.assertEqual(bls.prefix, target_prefix) 131 self.assertEqual(len(bls.grants), 1) 132 self.assertEqual(bls.grants[0].type, "Group") 133 self.assertEqual(bls.grants[0].uri, authuri) 134 # finally delete the src bucket 135 sb.delete() 136 137 def test_tagging(self): 138 tagging = """ 139 <Tagging> 140 <TagSet> 141 <Tag> 142 <Key>tagkey</Key> 143 <Value>tagvalue</Value> 144 </Tag> 145 </TagSet> 146 </Tagging> 147 """ 148 self.bucket.set_xml_tags(tagging) 149 response = self.bucket.get_tags() 150 self.assertEqual(response[0][0].key, 'tagkey') 151 self.assertEqual(response[0][0].value, 'tagvalue') 152 self.bucket.delete_tags() 153 try: 154 self.bucket.get_tags() 155 except S3ResponseError as e: 156 self.assertEqual(e.code, 'NoSuchTagSet') 157 except Exception as e: 158 self.fail("Wrong exception raised (expected S3ResponseError): %s" 159 % e) 160 else: 161 self.fail("Expected S3ResponseError, but no exception raised.") 162 163 def test_tagging_from_objects(self): 164 """Create tags from python objects rather than raw xml.""" 165 t = Tags() 166 tag_set = TagSet() 167 tag_set.add_tag('akey', 'avalue') 168 tag_set.add_tag('anotherkey', 'anothervalue') 169 t.add_tag_set(tag_set) 170 self.bucket.set_tags(t) 171 response = self.bucket.get_tags() 172 tags = sorted(response[0], key=lambda tag: tag.key) 173 self.assertEqual(tags[0].key, 'akey') 174 self.assertEqual(tags[0].value, 'avalue') 175 self.assertEqual(tags[1].key, 'anotherkey') 176 self.assertEqual(tags[1].value, 'anothervalue') 177 178 def test_website_configuration(self): 179 response = self.bucket.configure_website('index.html') 180 self.assertTrue(response) 181 config = self.bucket.get_website_configuration() 182 self.assertEqual(config, {'WebsiteConfiguration': 183 {'IndexDocument': {'Suffix': 'index.html'}}}) 184 config2, xml = self.bucket.get_website_configuration_with_xml() 185 self.assertEqual(config, config2) 186 self.assertTrue('<Suffix>index.html</Suffix>' in xml, xml) 187 188 def test_website_redirect_all_requests(self): 189 response = self.bucket.configure_website( 190 redirect_all_requests_to=RedirectLocation('example.com')) 191 config = self.bucket.get_website_configuration() 192 self.assertEqual(config, { 193 'WebsiteConfiguration': { 194 'RedirectAllRequestsTo': { 195 'HostName': 'example.com'}}}) 196 197 # Can configure the protocol as well. 198 response = self.bucket.configure_website( 199 redirect_all_requests_to=RedirectLocation('example.com', 'https')) 200 config = self.bucket.get_website_configuration() 201 self.assertEqual(config, { 202 'WebsiteConfiguration': {'RedirectAllRequestsTo': { 203 'HostName': 'example.com', 204 'Protocol': 'https', 205 }}} 206 ) 207 208 def test_lifecycle(self): 209 lifecycle = Lifecycle() 210 lifecycle.add_rule('myid', '', 'Enabled', 30) 211 self.assertTrue(self.bucket.configure_lifecycle(lifecycle)) 212 response = self.bucket.get_lifecycle_config() 213 self.assertEqual(len(response), 1) 214 actual_lifecycle = response[0] 215 self.assertEqual(actual_lifecycle.id, 'myid') 216 self.assertEqual(actual_lifecycle.prefix, '') 217 self.assertEqual(actual_lifecycle.status, 'Enabled') 218 self.assertEqual(actual_lifecycle.transition, []) 219 220 def test_lifecycle_with_glacier_transition(self): 221 lifecycle = Lifecycle() 222 transition = Transition(days=30, storage_class='GLACIER') 223 rule = Rule('myid', prefix='', status='Enabled', expiration=None, 224 transition=transition) 225 lifecycle.append(rule) 226 self.assertTrue(self.bucket.configure_lifecycle(lifecycle)) 227 response = self.bucket.get_lifecycle_config() 228 transition = response[0].transition 229 self.assertEqual(transition.days, 30) 230 self.assertEqual(transition.storage_class, 'GLACIER') 231 self.assertEqual(transition.date, None) 232 233 def test_lifecycle_multi(self): 234 date = '2022-10-12T00:00:00.000Z' 235 sc = 'GLACIER' 236 lifecycle = Lifecycle() 237 lifecycle.add_rule("1", "1/", "Enabled", 1) 238 lifecycle.add_rule("2", "2/", "Enabled", Expiration(days=2)) 239 lifecycle.add_rule("3", "3/", "Enabled", Expiration(date=date)) 240 lifecycle.add_rule("4", "4/", "Enabled", None, 241 Transition(days=4, storage_class=sc)) 242 lifecycle.add_rule("5", "5/", "Enabled", None, 243 Transition(date=date, storage_class=sc)) 244 # set the lifecycle 245 self.bucket.configure_lifecycle(lifecycle) 246 # read the lifecycle back 247 readlifecycle = self.bucket.get_lifecycle_config(); 248 for rule in readlifecycle: 249 if rule.id == "1": 250 self.assertEqual(rule.prefix, "1/") 251 self.assertEqual(rule.expiration.days, 1) 252 elif rule.id == "2": 253 self.assertEqual(rule.prefix, "2/") 254 self.assertEqual(rule.expiration.days, 2) 255 elif rule.id == "3": 256 self.assertEqual(rule.prefix, "3/") 257 self.assertEqual(rule.expiration.date, date) 258 elif rule.id == "4": 259 self.assertEqual(rule.prefix, "4/") 260 self.assertEqual(rule.transition.days, 4) 261 self.assertEqual(rule.transition.storage_class, sc) 262 elif rule.id == "5": 263 self.assertEqual(rule.prefix, "5/") 264 self.assertEqual(rule.transition.date, date) 265 self.assertEqual(rule.transition.storage_class, sc) 266 else: 267 self.fail("unexpected id %s" % rule.id) 268 269 def test_lifecycle_jp(self): 270 # test lifecycle with Japanese prefix 271 name = "Japanese files" 272 prefix = "日本語/" 273 days = 30 274 lifecycle = Lifecycle() 275 lifecycle.add_rule(name, prefix, "Enabled", days) 276 # set the lifecycle 277 self.bucket.configure_lifecycle(lifecycle) 278 # read the lifecycle back 279 readlifecycle = self.bucket.get_lifecycle_config(); 280 for rule in readlifecycle: 281 self.assertEqual(rule.id, name) 282 self.assertEqual(rule.expiration.days, days) 283 #Note: Boto seems correct? AWS seems broken? 284 #self.assertEqual(rule.prefix, prefix) 285 286 def test_lifecycle_with_defaults(self): 287 lifecycle = Lifecycle() 288 lifecycle.add_rule(expiration=30) 289 self.assertTrue(self.bucket.configure_lifecycle(lifecycle)) 290 response = self.bucket.get_lifecycle_config() 291 self.assertEqual(len(response), 1) 292 actual_lifecycle = response[0] 293 self.assertNotEqual(len(actual_lifecycle.id), 0) 294 self.assertEqual(actual_lifecycle.prefix, '') 295 296 def test_lifecycle_rule_xml(self): 297 # create a rule directly with id, prefix defaults 298 rule = Rule(status='Enabled', expiration=30) 299 s = rule.to_xml() 300 # Confirm no ID is set in the rule. 301 self.assertEqual(s.find("<ID>"), -1) 302 # Confirm Prefix is '' and not set to 'None' 303 self.assertNotEqual(s.find("<Prefix></Prefix>"), -1) 304