# -*- coding: utf-8 -*-
# Copyright (c) 2006-2011 Mitch Garnaat http://garnaat.org/
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
22 23""" 24Some unit tests for the S3Connection 25""" 26import unittest 27import time 28import os 29import socket 30 31from boto.s3.connection import S3Connection 32from boto.s3.bucket import Bucket 33from boto.exception import S3PermissionsError, S3ResponseError 34from boto.compat import http_client, six, urlopen, urlsplit 35 36 37class S3ConnectionTest (unittest.TestCase): 38 s3 = True 39 40 def test_1_basic(self): 41 print('--- running S3Connection tests ---') 42 c = S3Connection() 43 # create a new, empty bucket 44 bucket_name = 'test-%d' % int(time.time()) 45 bucket = c.create_bucket(bucket_name) 46 # now try a get_bucket call and see if it's really there 47 bucket = c.get_bucket(bucket_name) 48 # test logging 49 logging_bucket = c.create_bucket(bucket_name + '-log') 50 logging_bucket.set_as_logging_target() 51 bucket.enable_logging(target_bucket=logging_bucket, target_prefix=bucket.name) 52 bucket.disable_logging() 53 c.delete_bucket(logging_bucket) 54 k = bucket.new_key('foobar') 55 s1 = 'This is a test of file upload and download' 56 s2 = 'This is a second string to test file upload and download' 57 k.set_contents_from_string(s1) 58 fp = open('foobar', 'wb') 59 # now get the contents from s3 to a local file 60 k.get_contents_to_file(fp) 61 fp.close() 62 fp = open('foobar') 63 # check to make sure content read from s3 is identical to original 64 assert s1 == fp.read(), 'corrupted file' 65 fp.close() 66 # test generated URLs 67 url = k.generate_url(3600) 68 file = urlopen(url) 69 assert s1 == file.read().decode('utf-8'), 'invalid URL %s' % url 70 url = k.generate_url(3600, force_http=True) 71 file = urlopen(url) 72 assert s1 == file.read().decode('utf-8'), 'invalid URL %s' % url 73 url = k.generate_url(3600, force_http=True, headers={'x-amz-x-token' : 'XYZ'}) 74 file = urlopen(url) 75 assert s1 == file.read().decode('utf-8'), 'invalid URL %s' % url 76 rh = {'response-content-disposition': 'attachment; filename="foo.txt"'} 77 url = k.generate_url(60, 
response_headers=rh) 78 file = urlopen(url) 79 assert s1 == file.read().decode('utf-8'), 'invalid URL %s' % url 80 #test whether amperands and to-be-escaped characters work in header filename 81 rh = {'response-content-disposition': 'attachment; filename="foo&z%20ar&ar&zar&bar.txt"'} 82 url = k.generate_url(60, response_headers=rh, force_http=True) 83 file = urlopen(url) 84 assert s1 == file.read().decode('utf-8'), 'invalid URL %s' % url 85 # overwrite foobar contents with a PUT 86 url = k.generate_url(3600, 'PUT', force_http=True, policy='private', reduced_redundancy=True) 87 up = urlsplit(url) 88 con = http_client.HTTPConnection(up.hostname, up.port) 89 con.request("PUT", up.path + '?' + up.query, body="hello there") 90 resp = con.getresponse() 91 assert 200 == resp.status 92 assert b"hello there" == k.get_contents_as_string() 93 bucket.delete_key(k) 94 # test a few variations on get_all_keys - first load some data 95 # for the first one, let's override the content type 96 phony_mimetype = 'application/x-boto-test' 97 headers = {'Content-Type': phony_mimetype} 98 k.name = 'foo/bar' 99 k.set_contents_from_string(s1, headers) 100 k.name = 'foo/bas' 101 size = k.set_contents_from_filename('foobar') 102 assert size == 42 103 k.name = 'foo/bat' 104 k.set_contents_from_string(s1) 105 k.name = 'fie/bar' 106 k.set_contents_from_string(s1) 107 k.name = 'fie/bas' 108 k.set_contents_from_string(s1) 109 k.name = 'fie/bat' 110 k.set_contents_from_string(s1) 111 # try resetting the contents to another value 112 md5 = k.md5 113 k.set_contents_from_string(s2) 114 assert k.md5 != md5 115 os.unlink('foobar') 116 all = bucket.get_all_keys() 117 assert len(all) == 6 118 rs = bucket.get_all_keys(prefix='foo') 119 assert len(rs) == 3 120 rs = bucket.get_all_keys(prefix='', delimiter='/') 121 assert len(rs) == 2 122 rs = bucket.get_all_keys(maxkeys=5) 123 assert len(rs) == 5 124 # test the lookup method 125 k = bucket.lookup('foo/bar') 126 assert isinstance(k, bucket.key_class) 127 
assert k.content_type == phony_mimetype 128 k = bucket.lookup('notthere') 129 assert k == None 130 # try some metadata stuff 131 k = bucket.new_key('has_metadata') 132 mdkey1 = 'meta1' 133 mdval1 = 'This is the first metadata value' 134 k.set_metadata(mdkey1, mdval1) 135 mdkey2 = 'meta2' 136 mdval2 = 'This is the second metadata value' 137 k.set_metadata(mdkey2, mdval2) 138 # try a unicode metadata value 139 mdval3 = u'föö' 140 mdkey3 = 'meta3' 141 k.set_metadata(mdkey3, mdval3) 142 k.set_contents_from_string(s1) 143 k = bucket.lookup('has_metadata') 144 assert k.get_metadata(mdkey1) == mdval1 145 assert k.get_metadata(mdkey2) == mdval2 146 assert k.get_metadata(mdkey3) == mdval3 147 k = bucket.new_key('has_metadata') 148 k.get_contents_as_string() 149 assert k.get_metadata(mdkey1) == mdval1 150 assert k.get_metadata(mdkey2) == mdval2 151 assert k.get_metadata(mdkey3) == mdval3 152 bucket.delete_key(k) 153 # test list and iterator 154 rs1 = bucket.list() 155 num_iter = 0 156 for r in rs1: 157 num_iter = num_iter + 1 158 rs = bucket.get_all_keys() 159 num_keys = len(rs) 160 assert num_iter == num_keys 161 # try a key with a funny character 162 k = bucket.new_key('testnewline\n') 163 k.set_contents_from_string('This is a test') 164 rs = bucket.get_all_keys() 165 assert len(rs) == num_keys + 1 166 bucket.delete_key(k) 167 rs = bucket.get_all_keys() 168 assert len(rs) == num_keys 169 # try some acl stuff 170 bucket.set_acl('public-read') 171 policy = bucket.get_acl() 172 assert len(policy.acl.grants) == 2 173 bucket.set_acl('private') 174 policy = bucket.get_acl() 175 assert len(policy.acl.grants) == 1 176 k = bucket.lookup('foo/bar') 177 k.set_acl('public-read') 178 policy = k.get_acl() 179 assert len(policy.acl.grants) == 2 180 k.set_acl('private') 181 policy = k.get_acl() 182 assert len(policy.acl.grants) == 1 183 # try the convenience methods for grants 184 bucket.add_user_grant('FULL_CONTROL', 185 'c1e724fbfa0979a4448393c59a8c055011f739b6d102fb37a65f26414653cd67') 
186 try: 187 bucket.add_email_grant('foobar', 'foo@bar.com') 188 except S3PermissionsError: 189 pass 190 # now try to create an RRS key 191 k = bucket.new_key('reduced_redundancy') 192 k.set_contents_from_string('This key has reduced redundancy', 193 reduced_redundancy=True) 194 195 # now try to inject a response header 196 data = k.get_contents_as_string(response_headers={'response-content-type' : 'foo/bar'}) 197 assert k.content_type == 'foo/bar' 198 199 # now delete all keys in bucket 200 for k in bucket: 201 if k.name == 'reduced_redundancy': 202 assert k.storage_class == 'REDUCED_REDUNDANCY' 203 bucket.delete_key(k) 204 # now delete bucket 205 time.sleep(5) 206 c.delete_bucket(bucket) 207 print('--- tests completed ---') 208 209 def test_basic_anon(self): 210 auth_con = S3Connection() 211 # create a new, empty bucket 212 bucket_name = 'test-%d' % int(time.time()) 213 auth_bucket = auth_con.create_bucket(bucket_name) 214 215 # try read the bucket anonymously 216 anon_con = S3Connection(anon=True) 217 anon_bucket = Bucket(anon_con, bucket_name) 218 try: 219 next(iter(anon_bucket.list())) 220 self.fail("anon bucket list should fail") 221 except S3ResponseError: 222 pass 223 224 # give bucket anon user access and anon read again 225 auth_bucket.set_acl('public-read') 226 time.sleep(10) # Was 5 secondes, turns out not enough 227 try: 228 next(iter(anon_bucket.list())) 229 self.fail("not expecting contents") 230 except S3ResponseError as e: 231 self.fail("We should have public-read access, but received " 232 "an error: %s" % e) 233 except StopIteration: 234 pass 235 236 # cleanup 237 auth_con.delete_bucket(auth_bucket) 238 239 def test_error_code_populated(self): 240 c = S3Connection() 241 try: 242 c.create_bucket('bad$bucket$name') 243 except S3ResponseError as e: 244 self.assertEqual(e.error_code, 'InvalidBucketName') 245 except socket.gaierror: 246 pass # This is also a possible result for an invalid bucket name 247 else: 248 self.fail("S3ResponseError not 
raised.") 249