1# -*- coding: utf-8 -*-
2# Copyright (c) 2006-2011 Mitch Garnaat http://garnaat.org/
3#
4# Permission is hereby granted, free of charge, to any person obtaining a
5# copy of this software and associated documentation files (the
6# "Software"), to deal in the Software without restriction, including
7# without limitation the rights to use, copy, modify, merge, publish, dis-
8# tribute, sublicense, and/or sell copies of the Software, and to permit
9# persons to whom the Software is furnished to do so, subject to the fol-
10# lowing conditions:
11#
12# The above copyright notice and this permission notice shall be included
13# in all copies or substantial portions of the Software.
14#
15# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
16# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
17# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
18# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
19# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21# IN THE SOFTWARE.
22
23"""
24Some unit tests for the S3Connection
25"""
26import unittest
27import time
28import os
29import socket
30
31from boto.s3.connection import S3Connection
32from boto.s3.bucket import Bucket
33from boto.exception import S3PermissionsError, S3ResponseError
34from boto.compat import http_client, six, urlopen, urlsplit
35
36
37class S3ConnectionTest (unittest.TestCase):
38    s3 = True
39
40    def test_1_basic(self):
41        print('--- running S3Connection tests ---')
42        c = S3Connection()
43        # create a new, empty bucket
44        bucket_name = 'test-%d' % int(time.time())
45        bucket = c.create_bucket(bucket_name)
46        # now try a get_bucket call and see if it's really there
47        bucket = c.get_bucket(bucket_name)
48        # test logging
49        logging_bucket = c.create_bucket(bucket_name + '-log')
50        logging_bucket.set_as_logging_target()
51        bucket.enable_logging(target_bucket=logging_bucket, target_prefix=bucket.name)
52        bucket.disable_logging()
53        c.delete_bucket(logging_bucket)
54        k = bucket.new_key('foobar')
55        s1 = 'This is a test of file upload and download'
56        s2 = 'This is a second string to test file upload and download'
57        k.set_contents_from_string(s1)
58        fp = open('foobar', 'wb')
59        # now get the contents from s3 to a local file
60        k.get_contents_to_file(fp)
61        fp.close()
62        fp = open('foobar')
63        # check to make sure content read from s3 is identical to original
64        assert s1 == fp.read(), 'corrupted file'
65        fp.close()
66        # test generated URLs
67        url = k.generate_url(3600)
68        file = urlopen(url)
69        assert s1 == file.read().decode('utf-8'), 'invalid URL %s' % url
70        url = k.generate_url(3600, force_http=True)
71        file = urlopen(url)
72        assert s1 == file.read().decode('utf-8'), 'invalid URL %s' % url
73        url = k.generate_url(3600, force_http=True, headers={'x-amz-x-token' : 'XYZ'})
74        file = urlopen(url)
75        assert s1 == file.read().decode('utf-8'), 'invalid URL %s' % url
76        rh = {'response-content-disposition': 'attachment; filename="foo.txt"'}
77        url = k.generate_url(60, response_headers=rh)
78        file = urlopen(url)
79        assert s1 == file.read().decode('utf-8'), 'invalid URL %s' % url
80        #test whether amperands and to-be-escaped characters work in header filename
81        rh = {'response-content-disposition': 'attachment; filename="foo&z%20ar&ar&zar&bar.txt"'}
82        url = k.generate_url(60, response_headers=rh, force_http=True)
83        file = urlopen(url)
84        assert s1 == file.read().decode('utf-8'), 'invalid URL %s' % url
85        # overwrite foobar contents with a PUT
86        url = k.generate_url(3600, 'PUT', force_http=True, policy='private', reduced_redundancy=True)
87        up = urlsplit(url)
88        con = http_client.HTTPConnection(up.hostname, up.port)
89        con.request("PUT", up.path + '?' + up.query, body="hello there")
90        resp = con.getresponse()
91        assert 200 == resp.status
92        assert b"hello there" == k.get_contents_as_string()
93        bucket.delete_key(k)
94        # test a few variations on get_all_keys - first load some data
95        # for the first one, let's override the content type
96        phony_mimetype = 'application/x-boto-test'
97        headers = {'Content-Type': phony_mimetype}
98        k.name = 'foo/bar'
99        k.set_contents_from_string(s1, headers)
100        k.name = 'foo/bas'
101        size = k.set_contents_from_filename('foobar')
102        assert size == 42
103        k.name = 'foo/bat'
104        k.set_contents_from_string(s1)
105        k.name = 'fie/bar'
106        k.set_contents_from_string(s1)
107        k.name = 'fie/bas'
108        k.set_contents_from_string(s1)
109        k.name = 'fie/bat'
110        k.set_contents_from_string(s1)
111        # try resetting the contents to another value
112        md5 = k.md5
113        k.set_contents_from_string(s2)
114        assert k.md5 != md5
115        os.unlink('foobar')
116        all = bucket.get_all_keys()
117        assert len(all) == 6
118        rs = bucket.get_all_keys(prefix='foo')
119        assert len(rs) == 3
120        rs = bucket.get_all_keys(prefix='', delimiter='/')
121        assert len(rs) == 2
122        rs = bucket.get_all_keys(maxkeys=5)
123        assert len(rs) == 5
124        # test the lookup method
125        k = bucket.lookup('foo/bar')
126        assert isinstance(k, bucket.key_class)
127        assert k.content_type == phony_mimetype
128        k = bucket.lookup('notthere')
129        assert k == None
130        # try some metadata stuff
131        k = bucket.new_key('has_metadata')
132        mdkey1 = 'meta1'
133        mdval1 = 'This is the first metadata value'
134        k.set_metadata(mdkey1, mdval1)
135        mdkey2 = 'meta2'
136        mdval2 = 'This is the second metadata value'
137        k.set_metadata(mdkey2, mdval2)
138        # try a unicode metadata value
139        mdval3 = u'föö'
140        mdkey3 = 'meta3'
141        k.set_metadata(mdkey3, mdval3)
142        k.set_contents_from_string(s1)
143        k = bucket.lookup('has_metadata')
144        assert k.get_metadata(mdkey1) == mdval1
145        assert k.get_metadata(mdkey2) == mdval2
146        assert k.get_metadata(mdkey3) == mdval3
147        k = bucket.new_key('has_metadata')
148        k.get_contents_as_string()
149        assert k.get_metadata(mdkey1) == mdval1
150        assert k.get_metadata(mdkey2) == mdval2
151        assert k.get_metadata(mdkey3) == mdval3
152        bucket.delete_key(k)
153        # test list and iterator
154        rs1 = bucket.list()
155        num_iter = 0
156        for r in rs1:
157            num_iter = num_iter + 1
158        rs = bucket.get_all_keys()
159        num_keys = len(rs)
160        assert num_iter == num_keys
161        # try a key with a funny character
162        k = bucket.new_key('testnewline\n')
163        k.set_contents_from_string('This is a test')
164        rs = bucket.get_all_keys()
165        assert len(rs) == num_keys + 1
166        bucket.delete_key(k)
167        rs = bucket.get_all_keys()
168        assert len(rs) == num_keys
169        # try some acl stuff
170        bucket.set_acl('public-read')
171        policy = bucket.get_acl()
172        assert len(policy.acl.grants) == 2
173        bucket.set_acl('private')
174        policy = bucket.get_acl()
175        assert len(policy.acl.grants) == 1
176        k = bucket.lookup('foo/bar')
177        k.set_acl('public-read')
178        policy = k.get_acl()
179        assert len(policy.acl.grants) == 2
180        k.set_acl('private')
181        policy = k.get_acl()
182        assert len(policy.acl.grants) == 1
183        # try the convenience methods for grants
184        bucket.add_user_grant('FULL_CONTROL',
185                              'c1e724fbfa0979a4448393c59a8c055011f739b6d102fb37a65f26414653cd67')
186        try:
187            bucket.add_email_grant('foobar', 'foo@bar.com')
188        except S3PermissionsError:
189            pass
190        # now try to create an RRS key
191        k = bucket.new_key('reduced_redundancy')
192        k.set_contents_from_string('This key has reduced redundancy',
193                                   reduced_redundancy=True)
194
195        # now try to inject a response header
196        data = k.get_contents_as_string(response_headers={'response-content-type' : 'foo/bar'})
197        assert k.content_type == 'foo/bar'
198
199        # now delete all keys in bucket
200        for k in bucket:
201            if k.name == 'reduced_redundancy':
202                assert k.storage_class == 'REDUCED_REDUNDANCY'
203            bucket.delete_key(k)
204        # now delete bucket
205        time.sleep(5)
206        c.delete_bucket(bucket)
207        print('--- tests completed ---')
208
209    def test_basic_anon(self):
210        auth_con = S3Connection()
211        # create a new, empty bucket
212        bucket_name = 'test-%d' % int(time.time())
213        auth_bucket = auth_con.create_bucket(bucket_name)
214
215        # try read the bucket anonymously
216        anon_con = S3Connection(anon=True)
217        anon_bucket = Bucket(anon_con, bucket_name)
218        try:
219            next(iter(anon_bucket.list()))
220            self.fail("anon bucket list should fail")
221        except S3ResponseError:
222            pass
223
224        # give bucket anon user access and anon read again
225        auth_bucket.set_acl('public-read')
226        time.sleep(10)  # Was 5 secondes, turns out not enough
227        try:
228            next(iter(anon_bucket.list()))
229            self.fail("not expecting contents")
230        except S3ResponseError as e:
231            self.fail("We should have public-read access, but received "
232                      "an error: %s" % e)
233        except StopIteration:
234            pass
235
236        # cleanup
237        auth_con.delete_bucket(auth_bucket)
238
239    def test_error_code_populated(self):
240        c = S3Connection()
241        try:
242            c.create_bucket('bad$bucket$name')
243        except S3ResponseError as e:
244            self.assertEqual(e.error_code, 'InvalidBucketName')
245        except socket.gaierror:
246            pass  # This is also a possible result for an invalid bucket name
247        else:
248            self.fail("S3ResponseError not raised.")
249