1from urllib.request import urlopen
2from urllib.error import HTTPError
3
4import boto
5from boto.exception import S3ResponseError
6from boto.s3.key import Key
7from boto.s3.connection import OrdinaryCallingFormat
8
9from freezegun import freeze_time
10import requests
11
12import sure  # noqa # pylint: disable=unused-import
13
14from moto import mock_s3, mock_s3_deprecated
15
16
def create_connection(key=None, secret=None):
    """Return a boto S3 connection configured with ``OrdinaryCallingFormat``.

    ``key`` and ``secret`` are forwarded to ``boto.connect_s3`` as its first
    two positional arguments; both default to ``None``.
    """
    calling_format = OrdinaryCallingFormat()
    return boto.connect_s3(key, secret, calling_format=calling_format)
19
20
class MyModel:
    """Name/value pair that can write itself into the ``mybucket`` S3 bucket."""

    def __init__(self, name, value):
        self.name, self.value = name, value

    def save(self):
        """Store ``self.value`` in ``mybucket`` under the key ``self.name``."""
        connection = create_connection("the_key", "the_secret")
        # Building the Key with its name replaces the separate ``k.key = ...``
        # assignment.
        target = Key(connection.get_bucket("mybucket"), self.name)
        target.set_contents_from_string(self.value)
32
33
@mock_s3_deprecated
def test_my_model_save():
    """Saving a MyModel stores its value under its name in ``mybucket``."""
    conn = create_connection("the_key", "the_secret")
    # MyModel.save() looks the bucket up, so it has to exist beforehand.
    conn.create_bucket("mybucket")

    MyModel("steve", "is awesome").save()

    stored = conn.get_bucket("mybucket").get_key("steve")
    stored.get_contents_as_string().should.equal(b"is awesome")
47
48
@mock_s3_deprecated
def test_missing_key():
    """Looking up a key that was never written yields ``None``."""
    connection = create_connection("the_key", "the_secret")
    empty_bucket = connection.create_bucket("foobar")

    lookup = empty_bucket.get_key("the-key")
    lookup.should.equal(None)
54
55
@mock_s3_deprecated
def test_missing_key_urllib2():
    """Fetching an absent key with ``urlopen`` raises ``HTTPError``."""
    connection = create_connection("the_key", "the_secret")
    connection.create_bucket("foobar")

    missing_url = "http://s3.amazonaws.com/foobar/the-key"
    urlopen.when.called_with(missing_url).should.throw(HTTPError)
64
65
@mock_s3_deprecated
def test_empty_key():
    """An empty string can be stored and reads back as empty bytes."""
    connection = create_connection("the_key", "the_secret")
    bucket = connection.create_bucket("foobar")
    Key(bucket, "the-key").set_contents_from_string("")

    fetched = bucket.get_key("the-key")
    fetched.get_contents_as_string().should.equal(b"")
75
76
@mock_s3_deprecated
def test_empty_key_set_on_existing_key():
    """Overwriting a non-empty key with an empty string leaves it empty."""
    connection = create_connection("the_key", "the_secret")
    bucket = connection.create_bucket("foobar")
    key = Key(bucket, "the-key")

    # Write through the same Key object twice: first real content, then "".
    for payload, expected in (("foobar", b"foobar"), ("", b"")):
        key.set_contents_from_string(payload)
        bucket.get_key("the-key").get_contents_as_string().should.equal(expected)
89
90
@mock_s3_deprecated
def test_large_key_save():
    """A 600,000-character value is stored and read back unchanged."""
    repeats = 100000
    connection = create_connection("the_key", "the_secret")
    bucket = connection.create_bucket("foobar")
    Key(bucket, "the-key").set_contents_from_string("foobar" * repeats)

    fetched = bucket.get_key("the-key")
    fetched.get_contents_as_string().should.equal(b"foobar" * repeats)
100
101
@mock_s3_deprecated
def test_copy_key():
    """Copying a key within a bucket leaves source and copy with equal content."""
    connection = create_connection("the_key", "the_secret")
    bucket = connection.create_bucket("foobar")
    Key(bucket, "the-key").set_contents_from_string("some value")

    bucket.copy_key("new-key", "foobar", "the-key")

    # Check the original first, then the copy.
    for name in ("the-key", "new-key"):
        bucket.get_key(name).get_contents_as_string().should.equal(b"some value")
114
115
@mock_s3_deprecated
def test_set_metadata():
    """Metadata set on a key before upload can be read from the stored key."""
    connection = create_connection("the_key", "the_secret")
    bucket = connection.create_bucket("foobar")

    uploaded = Key(bucket, "the-key")
    uploaded.set_metadata("md", "Metadatastring")
    uploaded.set_contents_from_string("Testval")

    stored = bucket.get_key("the-key")
    stored.get_metadata("md").should.equal("Metadatastring")
126
127
@freeze_time("2012-01-01 12:00:00")
@mock_s3_deprecated
def test_last_modified():
    """``last_modified`` matches the frozen clock via listing and via ``get_key``.

    The two access paths are asserted against differently formatted strings.
    See https://github.com/boto/boto/issues/466
    """
    connection = create_connection()
    bucket = connection.create_bucket("foobar")
    Key(bucket, "the-key").set_contents_from_string("some value")

    first_listed = bucket.get_all_keys()[0]
    first_listed.last_modified.should.equal("2012-01-01T12:00:00.000Z")

    fetched = bucket.get_key("the-key")
    fetched.last_modified.should.equal("Sun, 01 Jan 2012 12:00:00 GMT")
144
145
@mock_s3_deprecated
def test_missing_bucket():
    """Fetching a bucket that was never created raises ``S3ResponseError``."""
    connection = create_connection("the_key", "the_secret")
    attempt = connection.get_bucket.when.called_with("mybucket")
    attempt.should.throw(S3ResponseError)
150
151
@mock_s3_deprecated
def test_bucket_with_dash():
    """Fetching a missing bucket whose name contains a dash also raises."""
    connection = create_connection("the_key", "the_secret")
    attempt = connection.get_bucket.when.called_with("mybucket-test")
    attempt.should.throw(S3ResponseError)
156
157
@mock_s3_deprecated
def test_bucket_deletion():
    """A bucket can be deleted only once it is empty, and is gone afterwards."""
    conn = create_connection("the_key", "the_secret")
    bucket = conn.create_bucket("foobar")
    Key(bucket, "the-key").set_contents_from_string("some value")

    # Deletion is refused while the bucket still holds a key.
    conn.delete_bucket.when.called_with("foobar").should.throw(S3ResponseError)

    # Empty the bucket, after which deletion goes through.
    bucket.delete_key("the-key")
    conn.delete_bucket("foobar")

    # Once removed, the bucket can neither be fetched nor deleted again.
    for operation in (conn.get_bucket, conn.delete_bucket):
        operation.when.called_with("foobar").should.throw(S3ResponseError)
178
179
@mock_s3_deprecated
def test_get_all_buckets():
    """Listing buckets after creating two of them returns two entries."""
    connection = create_connection("the_key", "the_secret")
    for bucket_name in ("foobar", "foobar2"):
        connection.create_bucket(bucket_name)

    connection.get_all_buckets().should.have.length_of(2)
188
189
@mock_s3
@mock_s3_deprecated
def test_post_to_bucket():
    """A form POST to the bucket URL creates the key named in the form."""
    connection = create_connection("the_key", "the_secret")
    bucket = connection.create_bucket("foobar")

    form_fields = {"key": "the-key", "file": "nothing"}
    requests.post("https://s3.amazonaws.com/foobar", form_fields)

    posted = bucket.get_key("the-key")
    posted.get_contents_as_string().should.equal(b"nothing")
201
202
@mock_s3
@mock_s3_deprecated
def test_post_with_metadata_to_bucket():
    """An ``x-amz-meta-*`` form field in a POST becomes metadata on the key."""
    connection = create_connection("the_key", "the_secret")
    bucket = connection.create_bucket("foobar")

    form_fields = {"key": "the-key", "file": "nothing", "x-amz-meta-test": "metadata"}
    requests.post("https://s3.amazonaws.com/foobar", form_fields)

    posted = bucket.get_key("the-key")
    posted.get_metadata("test").should.equal("metadata")
215
216
@mock_s3_deprecated
def test_bucket_name_with_dot():
    """Writing a key into a bucket whose name contains a dot does not raise.

    There is no explicit assertion; the test passes if no call errors out.
    """
    connection = create_connection()
    dotted_bucket = connection.create_bucket("firstname.lastname")

    Key(dotted_bucket, "somekey").set_contents_from_string("somedata")
224
225
@mock_s3_deprecated
def test_key_with_special_characters():
    """A key name full of punctuation comes back unchanged from a listing."""
    special_name = "test_list_keys_2/*x+?^@~!y"
    connection = create_connection()
    bucket = connection.create_bucket("test_bucket_name")
    Key(bucket, special_name).set_contents_from_string("value1")

    listed = list(bucket.list("test_list_keys_2/", "/"))
    listed[0].name.should.equal(special_name)
237
238
@mock_s3_deprecated
def test_bucket_key_listing_order():
    """Check the names and order returned by ``bucket.list``.

    Covers listings with and without a prefix and with and without a ``/``
    delimiter.
    """
    connection = create_connection()
    bucket = connection.create_bucket("test_bucket")
    prefix = "toplevel/"

    for suffix in ("x/key", "y.key1", "y.key2", "y.key3", "x/y/key", "x/y/z/key"):
        Key(bucket, prefix + suffix).set_contents_from_string("somedata")

    def listed_names(list_prefix, delimiter):
        # Names in the order ``bucket.list`` yields them.
        return [entry.name for entry in bucket.list(list_prefix, delimiter)]

    # No delimiter: every key under the prefix, flat.
    listed_names(prefix, None).should.equal(
        [
            "toplevel/x/key",
            "toplevel/x/y/key",
            "toplevel/x/y/z/key",
            "toplevel/y.key1",
            "toplevel/y.key2",
            "toplevel/y.key3",
        ]
    )

    # With a delimiter, everything below "x/" shows up as one "toplevel/x/" entry.
    listed_names(prefix, "/").should.equal(
        ["toplevel/y.key1", "toplevel/y.key2", "toplevel/y.key3", "toplevel/x/"]
    )

    # Test delimiter with no prefix
    listed_names(None, "/").should.equal(["toplevel/"])

    # The prefix does not have to end on the delimiter.
    listed_names(prefix + "x", None).should.equal(
        ["toplevel/x/key", "toplevel/x/y/key", "toplevel/x/y/z/key"]
    )
    listed_names(prefix + "x", "/").should.equal(["toplevel/x/"])
285
286
@mock_s3_deprecated
def test_delete_keys():
    """Deleting two of four keys in one call leaves the other two behind."""
    connection = create_connection()
    bucket = connection.create_bucket("foobar")
    for name in ("file1", "file2", "file3", "file4"):
        Key(bucket=bucket, name=name).set_contents_from_string("abc")

    outcome = bucket.delete_keys(["file2", "file3"])
    outcome.deleted.should.have.length_of(2)
    outcome.errors.should.have.length_of(0)

    remaining = bucket.get_all_keys()
    remaining.should.have.length_of(2)
    remaining[0].name.should.equal("file1")
303
304
@mock_s3_deprecated
def test_delete_keys_with_invalid():
    """A nonexistent name in a multi-delete is reported as deleted, not as an error."""
    connection = create_connection()
    bucket = connection.create_bucket("foobar")
    for name in ("file1", "file2", "file3", "file4"):
        Key(bucket=bucket, name=name).set_contents_from_string("abc")

    # "abc" was never stored as a key; only "file3" actually exists.
    outcome = bucket.delete_keys(["abc", "file3"])
    outcome.deleted.should.have.length_of(2)
    outcome.errors.should.have.length_of(0)

    remaining = bucket.get_all_keys()
    remaining.should.have.length_of(3)
    remaining[0].name.should.equal("file1")
322