1# -*- coding: utf-8 -*-
2
3# Copyright (c) 2011 Mitch Garnaat http://garnaat.org/
4# All rights reserved.
5#
6# Permission is hereby granted, free of charge, to any person obtaining a
7# copy of this software and associated documentation files (the
8# "Software"), to deal in the Software without restriction, including
9# without limitation the rights to use, copy, modify, merge, publish, dis-
10# tribute, sublicense, and/or sell copies of the Software, and to permit
11# persons to whom the Software is furnished to do so, subject to the fol-
12# lowing conditions:
13#
14# The above copyright notice and this permission notice shall be included
15# in all copies or substantial portions of the Software.
16#
17# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
18# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
19# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
20# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
21# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
23# IN THE SOFTWARE.
24
25"""
26Some unit tests for the S3 Bucket
27"""
28
29from mock import patch, Mock
30import unittest
31import time
32
33from boto.exception import S3ResponseError
34from boto.s3.connection import S3Connection
35from boto.s3.bucketlogging import BucketLogging
36from boto.s3.lifecycle import Lifecycle
37from boto.s3.lifecycle import Transition
38from boto.s3.lifecycle import Expiration
39from boto.s3.lifecycle import Rule
40from boto.s3.acl import Grant
41from boto.s3.tagging import Tags, TagSet
42from boto.s3.website import RedirectLocation
43from boto.compat import unquote_str
44
45
46class S3BucketTest (unittest.TestCase):
47    s3 = True
48
49    def setUp(self):
50        self.conn = S3Connection()
51        self.bucket_name = 'bucket-%d' % int(time.time())
52        self.bucket = self.conn.create_bucket(self.bucket_name)
53
54    def tearDown(self):
55        for key in self.bucket:
56            key.delete()
57        self.bucket.delete()
58
59    def test_next_marker(self):
60        expected = ["a/", "b", "c"]
61        for key_name in expected:
62            key = self.bucket.new_key(key_name)
63            key.set_contents_from_string(key_name)
64
65        # Normal list of first 2 keys will have
66        # no NextMarker set, so we use last key to iterate
67        # last element will be "b" so no issue.
68        rs = self.bucket.get_all_keys(max_keys=2)
69        for element in rs:
70            pass
71        self.assertEqual(element.name, "b")
72        self.assertEqual(rs.next_marker, None)
73
74        # list using delimiter of first 2 keys will have
75        # a NextMarker set (when truncated). As prefixes
76        # are grouped together at the end, we get "a/" as
77        # last element, but luckily we have next_marker.
78        rs = self.bucket.get_all_keys(max_keys=2, delimiter="/")
79        for element in rs:
80            pass
81        self.assertEqual(element.name, "a/")
82        self.assertEqual(rs.next_marker, "b")
83
84        # ensure bucket.list() still works by just
85        # popping elements off the front of expected.
86        rs = self.bucket.list()
87        for element in rs:
88            self.assertEqual(element.name, expected.pop(0))
89        self.assertEqual(expected, [])
90
91
92    def test_list_with_url_encoding(self):
93        expected = [u"α", u"β", u"γ"]
94        for key_name in expected:
95            key = self.bucket.new_key(key_name)
96            key.set_contents_from_string(key_name)
97
98        # ensure bucket.list() still works by just
99        # popping elements off the front of expected.
100        orig_getall = self.bucket._get_all
101        getall = lambda *a, **k: orig_getall(*a, max_keys=2, **k)
102        with patch.object(self.bucket, '_get_all', getall):
103            rs = self.bucket.list(encoding_type="url")
104            for element in rs:
105                name = unquote_str(element.name)
106                self.assertEqual(name, expected.pop(0))
107            self.assertEqual(expected, [])
108
109    def test_logging(self):
110        # use self.bucket as the target bucket so that teardown
111        # will delete any log files that make it into the bucket
112        # automatically and all we have to do is delete the
113        # source bucket.
114        sb_name = "src-" + self.bucket_name
115        sb = self.conn.create_bucket(sb_name)
116        # grant log write perms to target bucket using canned-acl
117        self.bucket.set_acl("log-delivery-write")
118        target_bucket = self.bucket_name
119        target_prefix = u"jp/ログ/"
120        # Check existing status is disabled
121        bls = sb.get_logging_status()
122        self.assertEqual(bls.target, None)
123        # Create a logging status and grant auth users READ PERM
124        authuri = "http://acs.amazonaws.com/groups/global/AuthenticatedUsers"
125        authr = Grant(permission="READ", type="Group", uri=authuri)
126        sb.enable_logging(target_bucket, target_prefix=target_prefix, grants=[authr])
127        # Check the status and confirm its set.
128        bls = sb.get_logging_status()
129        self.assertEqual(bls.target, target_bucket)
130        self.assertEqual(bls.prefix, target_prefix)
131        self.assertEqual(len(bls.grants), 1)
132        self.assertEqual(bls.grants[0].type, "Group")
133        self.assertEqual(bls.grants[0].uri, authuri)
134        # finally delete the src bucket
135        sb.delete()
136
137    def test_tagging(self):
138        tagging = """
139            <Tagging>
140              <TagSet>
141                 <Tag>
142                   <Key>tagkey</Key>
143                   <Value>tagvalue</Value>
144                 </Tag>
145              </TagSet>
146            </Tagging>
147        """
148        self.bucket.set_xml_tags(tagging)
149        response = self.bucket.get_tags()
150        self.assertEqual(response[0][0].key, 'tagkey')
151        self.assertEqual(response[0][0].value, 'tagvalue')
152        self.bucket.delete_tags()
153        try:
154            self.bucket.get_tags()
155        except S3ResponseError as e:
156            self.assertEqual(e.code, 'NoSuchTagSet')
157        except Exception as e:
158            self.fail("Wrong exception raised (expected S3ResponseError): %s"
159                      % e)
160        else:
161            self.fail("Expected S3ResponseError, but no exception raised.")
162
163    def test_tagging_from_objects(self):
164        """Create tags from python objects rather than raw xml."""
165        t = Tags()
166        tag_set = TagSet()
167        tag_set.add_tag('akey', 'avalue')
168        tag_set.add_tag('anotherkey', 'anothervalue')
169        t.add_tag_set(tag_set)
170        self.bucket.set_tags(t)
171        response = self.bucket.get_tags()
172        tags = sorted(response[0], key=lambda tag: tag.key)
173        self.assertEqual(tags[0].key, 'akey')
174        self.assertEqual(tags[0].value, 'avalue')
175        self.assertEqual(tags[1].key, 'anotherkey')
176        self.assertEqual(tags[1].value, 'anothervalue')
177
178    def test_website_configuration(self):
179        response = self.bucket.configure_website('index.html')
180        self.assertTrue(response)
181        config = self.bucket.get_website_configuration()
182        self.assertEqual(config, {'WebsiteConfiguration':
183                                  {'IndexDocument': {'Suffix': 'index.html'}}})
184        config2, xml = self.bucket.get_website_configuration_with_xml()
185        self.assertEqual(config, config2)
186        self.assertTrue('<Suffix>index.html</Suffix>' in xml, xml)
187
188    def test_website_redirect_all_requests(self):
189        response = self.bucket.configure_website(
190            redirect_all_requests_to=RedirectLocation('example.com'))
191        config = self.bucket.get_website_configuration()
192        self.assertEqual(config, {
193            'WebsiteConfiguration': {
194                'RedirectAllRequestsTo': {
195                    'HostName': 'example.com'}}})
196
197        # Can configure the protocol as well.
198        response = self.bucket.configure_website(
199            redirect_all_requests_to=RedirectLocation('example.com', 'https'))
200        config = self.bucket.get_website_configuration()
201        self.assertEqual(config, {
202            'WebsiteConfiguration': {'RedirectAllRequestsTo': {
203                'HostName': 'example.com',
204                'Protocol': 'https',
205            }}}
206        )
207
208    def test_lifecycle(self):
209        lifecycle = Lifecycle()
210        lifecycle.add_rule('myid', '', 'Enabled', 30)
211        self.assertTrue(self.bucket.configure_lifecycle(lifecycle))
212        response = self.bucket.get_lifecycle_config()
213        self.assertEqual(len(response), 1)
214        actual_lifecycle = response[0]
215        self.assertEqual(actual_lifecycle.id, 'myid')
216        self.assertEqual(actual_lifecycle.prefix, '')
217        self.assertEqual(actual_lifecycle.status, 'Enabled')
218        self.assertEqual(actual_lifecycle.transition, [])
219
220    def test_lifecycle_with_glacier_transition(self):
221        lifecycle = Lifecycle()
222        transition = Transition(days=30, storage_class='GLACIER')
223        rule = Rule('myid', prefix='', status='Enabled', expiration=None,
224                    transition=transition)
225        lifecycle.append(rule)
226        self.assertTrue(self.bucket.configure_lifecycle(lifecycle))
227        response = self.bucket.get_lifecycle_config()
228        transition = response[0].transition
229        self.assertEqual(transition.days, 30)
230        self.assertEqual(transition.storage_class, 'GLACIER')
231        self.assertEqual(transition.date, None)
232
233    def test_lifecycle_multi(self):
234        date = '2022-10-12T00:00:00.000Z'
235        sc = 'GLACIER'
236        lifecycle = Lifecycle()
237        lifecycle.add_rule("1", "1/", "Enabled", 1)
238        lifecycle.add_rule("2", "2/", "Enabled", Expiration(days=2))
239        lifecycle.add_rule("3", "3/", "Enabled", Expiration(date=date))
240        lifecycle.add_rule("4", "4/", "Enabled", None,
241            Transition(days=4, storage_class=sc))
242        lifecycle.add_rule("5", "5/", "Enabled", None,
243            Transition(date=date, storage_class=sc))
244        # set the lifecycle
245        self.bucket.configure_lifecycle(lifecycle)
246        # read the lifecycle back
247        readlifecycle = self.bucket.get_lifecycle_config();
248        for rule in readlifecycle:
249            if rule.id == "1":
250                self.assertEqual(rule.prefix, "1/")
251                self.assertEqual(rule.expiration.days, 1)
252            elif rule.id == "2":
253                self.assertEqual(rule.prefix, "2/")
254                self.assertEqual(rule.expiration.days, 2)
255            elif rule.id == "3":
256                self.assertEqual(rule.prefix, "3/")
257                self.assertEqual(rule.expiration.date, date)
258            elif rule.id == "4":
259                self.assertEqual(rule.prefix, "4/")
260                self.assertEqual(rule.transition.days, 4)
261                self.assertEqual(rule.transition.storage_class, sc)
262            elif rule.id == "5":
263                self.assertEqual(rule.prefix, "5/")
264                self.assertEqual(rule.transition.date, date)
265                self.assertEqual(rule.transition.storage_class, sc)
266            else:
267                self.fail("unexpected id %s" % rule.id)
268
269    def test_lifecycle_jp(self):
270        # test lifecycle with Japanese prefix
271        name = "Japanese files"
272        prefix = "日本語/"
273        days = 30
274        lifecycle = Lifecycle()
275        lifecycle.add_rule(name, prefix, "Enabled", days)
276        # set the lifecycle
277        self.bucket.configure_lifecycle(lifecycle)
278        # read the lifecycle back
279        readlifecycle = self.bucket.get_lifecycle_config();
280        for rule in readlifecycle:
281            self.assertEqual(rule.id, name)
282            self.assertEqual(rule.expiration.days, days)
283            #Note: Boto seems correct? AWS seems broken?
284            #self.assertEqual(rule.prefix, prefix)
285
286    def test_lifecycle_with_defaults(self):
287        lifecycle = Lifecycle()
288        lifecycle.add_rule(expiration=30)
289        self.assertTrue(self.bucket.configure_lifecycle(lifecycle))
290        response = self.bucket.get_lifecycle_config()
291        self.assertEqual(len(response), 1)
292        actual_lifecycle = response[0]
293        self.assertNotEqual(len(actual_lifecycle.id), 0)
294        self.assertEqual(actual_lifecycle.prefix, '')
295
296    def test_lifecycle_rule_xml(self):
297        # create a rule directly with id, prefix defaults
298        rule = Rule(status='Enabled', expiration=30)
299        s = rule.to_xml()
300        # Confirm no ID is set in the rule.
301        self.assertEqual(s.find("<ID>"), -1)
302        # Confirm Prefix is '' and not set to 'None'
303        self.assertNotEqual(s.find("<Prefix></Prefix>"), -1)
304