1#!/usr/bin/env
2# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/
3# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"). You
6# may not use this file except in compliance with the License. A copy of
7# the License is located at
8#
9# http://aws.amazon.com/apache2.0/
10#
11# or in the "license" file accompanying this file. This file is
12# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
13# ANY KIND, either express or implied. See the License for the specific
14# language governing permissions and limitations under the License.
15from tests import mock
16from tests import unittest
17import datetime
18import time
19import base64
20import json
21
22import botocore.auth
23import botocore.credentials
24from botocore.compat import HTTPHeaders, urlsplit, parse_qs, six
25from botocore.awsrequest import AWSRequest
26
27
28class BaseTestWithFixedDate(unittest.TestCase):
29    def setUp(self):
30        self.fixed_date = datetime.datetime(2014, 3, 10, 17, 2, 55, 0)
31        self.datetime_patch = mock.patch('botocore.auth.datetime.datetime')
32        self.datetime_mock = self.datetime_patch.start()
33        self.datetime_mock.utcnow.return_value = self.fixed_date
34        self.datetime_mock.strptime.return_value = self.fixed_date
35
36    def tearDown(self):
37        self.datetime_patch.stop()
38
39
40class TestHMACV1(unittest.TestCase):
41
42    maxDiff = None
43
44    def setUp(self):
45        access_key = '44CF9590006BF252F707'
46        secret_key = 'OtxrzxIsfpFjA7SwPzILwy8Bw21TLhquhboDYROV'
47        self.credentials = botocore.credentials.Credentials(access_key,
48                                                            secret_key)
49        self.hmacv1 = botocore.auth.HmacV1Auth(self.credentials, None, None)
50        self.date_mock = mock.patch('botocore.auth.formatdate')
51        self.formatdate = self.date_mock.start()
52        self.formatdate.return_value = 'Thu, 17 Nov 2005 18:49:58 GMT'
53
54    def tearDown(self):
55        self.date_mock.stop()
56
57    def test_put(self):
58        headers = {'Date': 'Thu, 17 Nov 2005 18:49:58 GMT',
59                   'Content-Md5': 'c8fdb181845a4ca6b8fec737b3581d76',
60                   'Content-Type': 'text/html',
61                   'X-Amz-Meta-Author': 'foo@bar.com',
62                   'X-Amz-Magic': 'abracadabra'}
63        http_headers = HTTPHeaders.from_dict(headers)
64        split = urlsplit('/quotes/nelson')
65        cs = self.hmacv1.canonical_string('PUT', split, http_headers)
66        expected_canonical = (
67            "PUT\nc8fdb181845a4ca6b8fec737b3581d76\ntext/html\n"
68            "Thu, 17 Nov 2005 18:49:58 GMT\nx-amz-magic:abracadabra\n"
69            "x-amz-meta-author:foo@bar.com\n/quotes/nelson")
70        expected_signature = 'jZNOcbfWmD/A/f3hSvVzXZjM2HU='
71        self.assertEqual(cs, expected_canonical)
72        sig = self.hmacv1.get_signature('PUT', split, http_headers)
73        self.assertEqual(sig, expected_signature)
74
75    def test_duplicate_headers(self):
76        pairs = [('Date', 'Thu, 17 Nov 2005 18:49:58 GMT'),
77                 ('Content-Md5', 'c8fdb181845a4ca6b8fec737b3581d76'),
78                 ('Content-Type', 'text/html'),
79                 ('X-Amz-Meta-Author', 'bar@baz.com'),
80                 ('X-Amz-Meta-Author', 'foo@bar.com'),
81                 ('X-Amz-Magic', 'abracadabra')]
82
83        http_headers = HTTPHeaders.from_pairs(pairs)
84        split = urlsplit('/quotes/nelson')
85        sig = self.hmacv1.get_signature('PUT', split, http_headers)
86        self.assertEqual(sig, 'kIdMxyiYB+F+83zYGR6sSb3ICcE=')
87
88    def test_query_string(self):
89        split = urlsplit('/quotes/nelson?uploads')
90        pairs = [('Date', 'Thu, 17 Nov 2005 18:49:58 GMT')]
91        sig = self.hmacv1.get_signature('PUT', split,
92                                        HTTPHeaders.from_pairs(pairs))
93        self.assertEqual(sig, 'P7pBz3Z4p3GxysRSJ/gR8nk7D4o=')
94
95    def test_bucket_operations(self):
96        # Check that the standard operations on buckets that are
97        # specified as query strings end up in the canonical resource.
98        operations = ('acl', 'cors', 'lifecycle', 'policy',
99                      'notification', 'logging', 'tagging',
100                      'requestPayment', 'versioning', 'website',
101                      'object-lock')
102        for operation in operations:
103            url = '/quotes?%s' % operation
104            split = urlsplit(url)
105            cr = self.hmacv1.canonical_resource(split)
106            self.assertEqual(cr, '/quotes?%s' % operation)
107
108    def test_sign_with_token(self):
109        credentials = botocore.credentials.Credentials(
110            access_key='foo', secret_key='bar', token='baz')
111        auth = botocore.auth.HmacV1Auth(credentials)
112        request = AWSRequest()
113        request.headers['Date'] = 'Thu, 17 Nov 2005 18:49:58 GMT'
114        request.headers['Content-Type'] = 'text/html'
115        request.method = 'PUT'
116        request.url = 'https://s3.amazonaws.com/bucket/key'
117        auth.add_auth(request)
118        self.assertIn('Authorization', request.headers)
119        # We're not actually checking the signature here, we're
120        # just making sure the auth header has the right format.
121        self.assertTrue(request.headers['Authorization'].startswith('AWS '))
122
123    def test_resign_with_token(self):
124        credentials = botocore.credentials.Credentials(
125            access_key='foo', secret_key='bar', token='baz')
126        auth = botocore.auth.HmacV1Auth(credentials)
127        request = AWSRequest()
128        request.headers['Date'] = 'Thu, 17 Nov 2005 18:49:58 GMT'
129        request.headers['Content-Type'] = 'text/html'
130        request.method = 'PUT'
131        request.url = 'https://s3.amazonaws.com/bucket/key'
132
133        auth.add_auth(request)
134        original_auth = request.headers['Authorization']
135        # Resigning the request shouldn't change the authorization
136        # header.  We are also ensuring that the date stays the same
137        # because we're mocking out the formatdate() call.  There's
138        # another unit test that verifies we use the latest time
139        # when we sign the request.
140        auth.add_auth(request)
141        self.assertEqual(request.headers.get_all('Authorization'),
142                         [original_auth])
143
144    def test_resign_uses_most_recent_date(self):
145        dates = [
146            'Thu, 17 Nov 2005 18:49:58 GMT',
147            'Thu, 17 Nov 2014 20:00:00 GMT',
148        ]
149        self.formatdate.side_effect = dates
150
151        request = AWSRequest()
152        request.headers['Content-Type'] = 'text/html'
153        request.method = 'PUT'
154        request.url = 'https://s3.amazonaws.com/bucket/key'
155
156        self.hmacv1.add_auth(request)
157        original_date = request.headers['Date']
158
159        self.hmacv1.add_auth(request)
160        modified_date = request.headers['Date']
161
162        # Each time we sign a request, we make another call to formatdate()
163        # so we should have a different date header each time.
164        self.assertEqual(original_date, dates[0])
165        self.assertEqual(modified_date, dates[1])
166
167
168class TestSigV2(unittest.TestCase):
169
170    maxDiff = None
171
172    def setUp(self):
173        access_key = 'foo'
174        secret_key = 'bar'
175        self.credentials = botocore.credentials.Credentials(access_key,
176                                                            secret_key)
177        self.signer = botocore.auth.SigV2Auth(self.credentials)
178        self.time_patcher = mock.patch.object(
179            botocore.auth.time, 'gmtime',
180            mock.Mock(wraps=time.gmtime)
181        )
182        mocked_time = self.time_patcher.start()
183        mocked_time.return_value = time.struct_time(
184            [2014, 6, 20, 8, 40, 23, 4, 171, 0])
185
186    def tearDown(self):
187        self.time_patcher.stop()
188
189    def test_put(self):
190        request = mock.Mock()
191        request.url = '/'
192        request.method = 'POST'
193        params = {'Foo': u'\u2713'}
194        result = self.signer.calc_signature(request, params)
195        self.assertEqual(
196            result, ('Foo=%E2%9C%93',
197                     u'VCtWuwaOL0yMffAT8W4y0AFW3W4KUykBqah9S40rB+Q='))
198
199    def test_fields(self):
200        request = AWSRequest()
201        request.url = '/'
202        request.method = 'POST'
203        request.data = {'Foo': u'\u2713'}
204        self.signer.add_auth(request)
205        self.assertEqual(request.data['AWSAccessKeyId'], 'foo')
206        self.assertEqual(request.data['Foo'], u'\u2713')
207        self.assertEqual(request.data['Timestamp'], '2014-06-20T08:40:23Z')
208        self.assertEqual(request.data['Signature'],
209                         u'Tiecw+t51tok4dTT8B4bg47zxHEM/KcD55f2/x6K22o=')
210        self.assertEqual(request.data['SignatureMethod'], 'HmacSHA256')
211        self.assertEqual(request.data['SignatureVersion'], '2')
212
213    def test_resign(self):
214        # Make sure that resigning after e.g. retries works
215        request = AWSRequest()
216        request.url = '/'
217        request.method = 'POST'
218        params = {
219            'Foo': u'\u2713',
220            'Signature': u'VCtWuwaOL0yMffAT8W4y0AFW3W4KUykBqah9S40rB+Q='
221        }
222        result = self.signer.calc_signature(request, params)
223        self.assertEqual(
224            result, ('Foo=%E2%9C%93',
225                     u'VCtWuwaOL0yMffAT8W4y0AFW3W4KUykBqah9S40rB+Q='))
226
227    def test_get(self):
228        request = AWSRequest()
229        request.url = '/'
230        request.method = 'GET'
231        request.params = {'Foo': u'\u2713'}
232        self.signer.add_auth(request)
233        self.assertEqual(request.params['AWSAccessKeyId'], 'foo')
234        self.assertEqual(request.params['Foo'], u'\u2713')
235        self.assertEqual(request.params['Timestamp'], '2014-06-20T08:40:23Z')
236        self.assertEqual(request.params['Signature'],
237                         u'Un97klqZCONP65bA1+Iv4H3AcB2I40I4DBvw5ZERFPw=')
238        self.assertEqual(request.params['SignatureMethod'], 'HmacSHA256')
239        self.assertEqual(request.params['SignatureVersion'], '2')
240
241
242class TestSigV3(unittest.TestCase):
243
244    maxDiff = None
245
246    def setUp(self):
247        self.access_key = 'access_key'
248        self.secret_key = 'secret_key'
249        self.credentials = botocore.credentials.Credentials(self.access_key,
250                                                            self.secret_key)
251        self.auth = botocore.auth.SigV3Auth(self.credentials)
252        self.date_mock = mock.patch('botocore.auth.formatdate')
253        self.formatdate = self.date_mock.start()
254        self.formatdate.return_value = 'Thu, 17 Nov 2005 18:49:58 GMT'
255
256    def tearDown(self):
257        self.date_mock.stop()
258
259    def test_signature_with_date_headers(self):
260        request = AWSRequest()
261        request.headers = {'Date': 'Thu, 17 Nov 2005 18:49:58 GMT'}
262        request.url = 'https://route53.amazonaws.com'
263        self.auth.add_auth(request)
264        self.assertEqual(
265            request.headers['X-Amzn-Authorization'],
266            ('AWS3-HTTPS AWSAccessKeyId=access_key,Algorithm=HmacSHA256,'
267             'Signature=M245fo86nVKI8rLpH4HgWs841sBTUKuwciiTpjMDgPs='))
268
269    def test_resign_with_token(self):
270        credentials = botocore.credentials.Credentials(
271            access_key='foo', secret_key='bar', token='baz')
272        auth = botocore.auth.SigV3Auth(credentials)
273        request = AWSRequest()
274        request.headers['Date'] = 'Thu, 17 Nov 2005 18:49:58 GMT'
275        request.method = 'PUT'
276        request.url = 'https://route53.amazonaws.com/'
277        auth.add_auth(request)
278        original_auth = request.headers['X-Amzn-Authorization']
279        # Resigning the request shouldn't change the authorization
280        # header.
281        auth.add_auth(request)
282        self.assertEqual(request.headers.get_all('X-Amzn-Authorization'),
283                         [original_auth])
284
285
286class TestS3SigV4Auth(BaseTestWithFixedDate):
287
288    AuthClass = botocore.auth.S3SigV4Auth
289    maxDiff = None
290
291    def setUp(self):
292        super(TestS3SigV4Auth, self).setUp()
293        self.credentials = botocore.credentials.Credentials(
294            access_key='foo', secret_key='bar', token='baz')
295        self.auth = self.AuthClass(
296            self.credentials, 'ec2', 'eu-central-1')
297        self.request = AWSRequest(data=six.BytesIO(b"foo bar baz"))
298        self.request.method = 'PUT'
299        self.request.url = 'https://s3.eu-central-1.amazonaws.com/'
300
301        self.client_config = mock.Mock()
302        self.s3_config = {}
303        self.client_config.s3 = self.s3_config
304
305        self.request.context = {
306            'client_config': self.client_config
307        }
308
309    def test_resign_with_content_hash(self):
310        self.auth.add_auth(self.request)
311        original_auth = self.request.headers['Authorization']
312
313        self.auth.add_auth(self.request)
314        self.assertEqual(self.request.headers.get_all('Authorization'),
315                         [original_auth])
316
317    def test_signature_is_not_normalized(self):
318        request = AWSRequest()
319        request.url = 'https://s3.amazonaws.com/bucket/foo/./bar/../bar'
320        request.method = 'GET'
321        credentials = botocore.credentials.Credentials('access_key',
322                                                       'secret_key')
323        auth = self.AuthClass(credentials, 's3', 'us-east-1')
324        auth.add_auth(request)
325        self.assertTrue(
326            request.headers['Authorization'].startswith('AWS4-HMAC-SHA256'))
327
328    def test_query_string_params_in_urls(self):
329        if not hasattr(self.AuthClass, 'canonical_query_string'):
330            raise unittest.SkipTest('%s does not expose interim steps' %
331                                    self.AuthClass.__name__)
332
333        request = AWSRequest()
334        request.url = (
335            'https://s3.amazonaws.com/bucket?'
336            'marker=%C3%A4%C3%B6%C3%BC-01.txt&prefix'
337        )
338        request.data = {'Action': 'MyOperation'}
339        request.method = 'GET'
340
341        # Check that the canonical query string is correct formatting
342        # by ensuring that query string paramters that are added to the
343        # canonical query string are correctly formatted.
344        cqs = self.auth.canonical_query_string(request)
345        self.assertEqual('marker=%C3%A4%C3%B6%C3%BC-01.txt&prefix=', cqs)
346
347    def _test_blacklist_header(self, header, value):
348        request = AWSRequest()
349        request.url = 'https://s3.amazonaws.com/bucket/foo'
350        request.method = 'PUT'
351        request.headers[header] = value
352        credentials = botocore.credentials.Credentials('access_key',
353                                                       'secret_key')
354        auth = self.AuthClass(credentials, 's3', 'us-east-1')
355        auth.add_auth(request)
356        self.assertNotIn(header, request.headers['Authorization'])
357
358    def test_blacklist_expect_headers(self):
359        self._test_blacklist_header('expect', '100-continue')
360
361    def test_blacklist_trace_id(self):
362        self._test_blacklist_header('x-amzn-trace-id',
363                                    'Root=foo;Parent=bar;Sampleid=1')
364
365    def test_blacklist_headers(self):
366        self._test_blacklist_header('user-agent', 'botocore/1.4.11')
367
368    def test_uses_sha256_if_config_value_is_true(self):
369        self.client_config.s3['payload_signing_enabled'] = True
370        self.auth.add_auth(self.request)
371        sha_header = self.request.headers['X-Amz-Content-SHA256']
372        self.assertNotEqual(sha_header, 'UNSIGNED-PAYLOAD')
373
374    def test_does_not_use_sha256_if_config_value_is_false(self):
375        self.client_config.s3['payload_signing_enabled'] = False
376        self.auth.add_auth(self.request)
377        sha_header = self.request.headers['X-Amz-Content-SHA256']
378        self.assertEqual(sha_header, 'UNSIGNED-PAYLOAD')
379
380    def test_uses_sha256_if_md5_unset(self):
381        self.request.context['has_streaming_input'] = True
382        self.auth.add_auth(self.request)
383        sha_header = self.request.headers['X-Amz-Content-SHA256']
384        self.assertNotEqual(sha_header, 'UNSIGNED-PAYLOAD')
385
386    def test_uses_sha256_if_not_https(self):
387        self.request.context['has_streaming_input'] = True
388        self.request.headers.add_header('Content-MD5', 'foo')
389        self.request.url = 'http://s3.amazonaws.com/bucket'
390        self.auth.add_auth(self.request)
391        sha_header = self.request.headers['X-Amz-Content-SHA256']
392        self.assertNotEqual(sha_header, 'UNSIGNED-PAYLOAD')
393
394    def test_uses_sha256_if_not_streaming_upload(self):
395        self.request.context['has_streaming_input'] = False
396        self.request.headers.add_header('Content-MD5', 'foo')
397        self.request.url = 'https://s3.amazonaws.com/bucket'
398        self.auth.add_auth(self.request)
399        sha_header = self.request.headers['X-Amz-Content-SHA256']
400        self.assertNotEqual(sha_header, 'UNSIGNED-PAYLOAD')
401
402    def test_does_not_use_sha256_if_md5_set(self):
403        self.request.context['has_streaming_input'] = True
404        self.request.headers.add_header('Content-MD5', 'foo')
405        self.auth.add_auth(self.request)
406        sha_header = self.request.headers['X-Amz-Content-SHA256']
407        self.assertEqual(sha_header, 'UNSIGNED-PAYLOAD')
408
409    def test_does_not_use_sha256_if_context_config_set(self):
410        self.request.context['payload_signing_enabled'] = False
411        self.request.headers.add_header('Content-MD5', 'foo')
412        self.auth.add_auth(self.request)
413        sha_header = self.request.headers['X-Amz-Content-SHA256']
414        self.assertEqual(sha_header, 'UNSIGNED-PAYLOAD')
415
416    def test_sha256_if_context_set_on_http(self):
417        self.request.context['payload_signing_enabled'] = False
418        self.request.headers.add_header('Content-MD5', 'foo')
419        self.request.url = 'http://s3.amazonaws.com/bucket'
420        self.auth.add_auth(self.request)
421        sha_header = self.request.headers['X-Amz-Content-SHA256']
422        self.assertNotEqual(sha_header, 'UNSIGNED-PAYLOAD')
423
424    def test_sha256_if_context_set_without_md5(self):
425        self.request.context['payload_signing_enabled'] = False
426        self.request.url = 'https://s3.amazonaws.com/bucket'
427        self.auth.add_auth(self.request)
428        sha_header = self.request.headers['X-Amz-Content-SHA256']
429        self.assertNotEqual(sha_header, 'UNSIGNED-PAYLOAD')
430
431
432class TestSigV4(unittest.TestCase):
433    def setUp(self):
434        self.credentials = botocore.credentials.Credentials(
435            access_key='foo', secret_key='bar')
436
437    def create_signer(self, service_name='myservice', region='us-west-2'):
438        auth = botocore.auth.SigV4Auth(
439            self.credentials, service_name, region)
440        return auth
441
442    def test_canonical_query_string(self):
443        request = AWSRequest()
444        request.url = (
445            'https://search-testdomain1-j67dwxlet67gf7ghwfmik2c67i.us-west-2.'
446            'cloudsearch.amazonaws.com/'
447            '2013-01-01/search?format=sdk&pretty=true&'
448            'q.options=%7B%22defaultOperator%22%3A%20%22and%22%2C%20%22'
449            'fields%22%3A%5B%22directors%5E10%22%5D%7D&q=George%20Lucas'
450        )
451        request.method = 'GET'
452        auth = self.create_signer('cloudsearchdomain', 'us-west-2')
453        actual = auth.canonical_query_string(request)
454        # Here 'q' should come before 'q.options'.
455        expected = ("format=sdk&pretty=true&q=George%20Lucas&q.options=%7B%22"
456                    "defaultOperator%22%3A%20%22and%22%2C%20%22fields%22%3A%5B"
457                    "%22directors%5E10%22%5D%7D")
458        self.assertEqual(actual, expected)
459
460    def test_thread_safe_timestamp(self):
461        request = AWSRequest()
462        request.url = (
463            'https://search-testdomain1-j67dwxlet67gf7ghwfmik2c67i.us-west-2.'
464            'cloudsearch.amazonaws.com/'
465            '2013-01-01/search?format=sdk&pretty=true&'
466            'q.options=%7B%22defaultOperator%22%3A%20%22and%22%2C%20%22'
467            'fields%22%3A%5B%22directors%5E10%22%5D%7D&q=George%20Lucas'
468        )
469        request.method = 'GET'
470        auth = self.create_signer('cloudsearchdomain', 'us-west-2')
471        with mock.patch.object(
472                botocore.auth.datetime, 'datetime',
473                mock.Mock(wraps=datetime.datetime)) as mock_datetime:
474            original_utcnow = datetime.datetime(2014, 1, 1, 0, 0)
475
476            mock_datetime.utcnow.return_value = original_utcnow
477            # Go through the add_auth process once. This will attach
478            # a timestamp to the request at the beginning of auth.
479            auth.add_auth(request)
480            self.assertEqual(request.context['timestamp'], '20140101T000000Z')
481            # Ensure the date is in the Authorization header
482            self.assertIn('20140101', request.headers['Authorization'])
483            # Now suppose the utc time becomes the next day all of a sudden
484            mock_datetime.utcnow.return_value = datetime.datetime(
485                2014, 1, 2, 0, 0)
486            # Smaller methods like the canonical request and string_to_sign
487            # should  have the timestamp attached to the request in their
488            # body and not what the time is now mocked as. This is to ensure
489            # there is no mismatching in timestamps when signing.
490            cr = auth.canonical_request(request)
491            self.assertIn('x-amz-date:20140101T000000Z', cr)
492            self.assertNotIn('x-amz-date:20140102T000000Z', cr)
493
494            sts = auth.string_to_sign(request, cr)
495            self.assertIn('20140101T000000Z', sts)
496            self.assertNotIn('20140102T000000Z', sts)
497
498    def test_payload_is_binary_file(self):
499        request = AWSRequest()
500        request.data = six.BytesIO(u'\u2713'.encode('utf-8'))
501        request.url = 'https://amazonaws.com'
502        auth = self.create_signer()
503        payload = auth.payload(request)
504        self.assertEqual(
505            payload,
506            '1dabba21cdad44541f6b15796f8d22978fc7ea10c46aeceeeeb66c23b3ac7604')
507
508    def test_payload_is_bytes_type(self):
509        request = AWSRequest()
510        request.data = u'\u2713'.encode('utf-8')
511        request.url = 'https://amazonaws.com'
512        auth = self.create_signer()
513        payload = auth.payload(request)
514        self.assertEqual(
515            payload,
516            '1dabba21cdad44541f6b15796f8d22978fc7ea10c46aeceeeeb66c23b3ac7604')
517
518    def test_payload_not_signed_if_disabled_in_context(self):
519        request = AWSRequest()
520        request.data = u'\u2713'.encode('utf-8')
521        request.url = 'https://amazonaws.com'
522        request.context['payload_signing_enabled'] = False
523        auth = self.create_signer()
524        payload = auth.payload(request)
525        self.assertEqual(payload, 'UNSIGNED-PAYLOAD')
526
527    def test_content_sha256_set_if_payload_signing_disabled(self):
528        request = AWSRequest()
529        request.data = six.BytesIO(u'\u2713'.encode('utf-8'))
530        request.url = 'https://amazonaws.com'
531        request.context['payload_signing_enabled'] = False
532        request.method = 'PUT'
533        auth = self.create_signer()
534        auth.add_auth(request)
535        sha_header = request.headers['X-Amz-Content-SHA256']
536        self.assertEqual(sha_header, 'UNSIGNED-PAYLOAD')
537
538    def test_collapse_multiple_spaces(self):
539        auth = self.create_signer()
540        original = HTTPHeaders()
541        original['foo'] = 'double  space'
542        headers = auth.canonical_headers(original)
543        self.assertEqual(headers, 'foo:double space')
544
545    def test_trims_leading_trailing_spaces(self):
546        auth = self.create_signer()
547        original = HTTPHeaders()
548        original['foo'] = '  leading  and  trailing  '
549        headers = auth.canonical_headers(original)
550        self.assertEqual(headers, 'foo:leading and trailing')
551
552    def test_strips_http_default_port(self):
553        request = AWSRequest()
554        request.url = 'http://s3.us-west-2.amazonaws.com:80/'
555        request.method = 'GET'
556        auth = self.create_signer('s3', 'us-west-2')
557        actual = auth.headers_to_sign(request)['host']
558        expected = 's3.us-west-2.amazonaws.com'
559        self.assertEqual(actual, expected)
560
561    def test_strips_https_default_port(self):
562        request = AWSRequest()
563        request.url = 'https://s3.us-west-2.amazonaws.com:443/'
564        request.method = 'GET'
565        auth = self.create_signer('s3', 'us-west-2')
566        actual = auth.headers_to_sign(request)['host']
567        expected = 's3.us-west-2.amazonaws.com'
568        self.assertEqual(actual, expected)
569
570    def test_strips_http_auth(self):
571        request = AWSRequest()
572        request.url = 'https://username:password@s3.us-west-2.amazonaws.com/'
573        request.method = 'GET'
574        auth = self.create_signer('s3', 'us-west-2')
575        actual = auth.headers_to_sign(request)['host']
576        expected = 's3.us-west-2.amazonaws.com'
577        self.assertEqual(actual, expected)
578
579    def test_strips_default_port_and_http_auth(self):
580        request = AWSRequest()
581        request.url = 'http://username:password@s3.us-west-2.amazonaws.com:80/'
582        request.method = 'GET'
583        auth = self.create_signer('s3', 'us-west-2')
584        actual = auth.headers_to_sign(request)['host']
585        expected = 's3.us-west-2.amazonaws.com'
586        self.assertEqual(actual, expected)
587
588
589class TestSigV4Resign(BaseTestWithFixedDate):
590
591    maxDiff = None
592    AuthClass = botocore.auth.SigV4Auth
593
594    def setUp(self):
595        super(TestSigV4Resign, self).setUp()
596        self.credentials = botocore.credentials.Credentials(
597            access_key='foo', secret_key='bar', token='baz')
598        self.auth = self.AuthClass(self.credentials, 'ec2', 'us-west-2')
599        self.request = AWSRequest()
600        self.request.method = 'PUT'
601        self.request.url = 'https://ec2.amazonaws.com/'
602
603    def test_resign_request_with_date(self):
604        self.request.headers['Date'] = 'Thu, 17 Nov 2005 18:49:58 GMT'
605        self.auth.add_auth(self.request)
606        original_auth = self.request.headers['Authorization']
607
608        self.auth.add_auth(self.request)
609        self.assertEqual(self.request.headers.get_all('Authorization'),
610                         [original_auth])
611
612    def test_sigv4_without_date(self):
613        self.auth.add_auth(self.request)
614        original_auth = self.request.headers['Authorization']
615
616        self.auth.add_auth(self.request)
617        self.assertEqual(self.request.headers.get_all('Authorization'),
618                         [original_auth])
619
620
621class BasePresignTest(unittest.TestCase):
622    def get_parsed_query_string(self, request):
623        query_string_dict = parse_qs(urlsplit(request.url).query)
624        # Also, parse_qs sets each value in the dict to be a list, but
625        # because we know that we won't have repeated keys, we simplify
626        # the dict and convert it back to a single value.
627        for key in query_string_dict:
628            query_string_dict[key] = query_string_dict[key][0]
629        return query_string_dict
630
631
632class TestS3SigV2Presign(BasePresignTest):
633
634    def setUp(self):
635        self.access_key = 'access_key'
636        self.secret_key = 'secret_key'
637        self.credentials = botocore.credentials.Credentials(self.access_key,
638                                                            self.secret_key)
639        self.expires = 3000
640        self.auth = botocore.auth.HmacV1QueryAuth(
641            self.credentials, expires=self.expires)
642
643        self.current_epoch_time = 1427427247.465591
644        self.time_patch = mock.patch('time.time')
645        self.time_mock = self.time_patch.start()
646        self.time_mock.return_value = self.current_epoch_time
647
648        self.request = AWSRequest()
649        self.bucket = 'mybucket'
650        self.key = 'myobject'
651        self.path = 'https://s3.amazonaws.com/%s/%s' % (
652            self.bucket, self.key)
653        self.request.url = self.path
654        self.request.method = 'GET'
655
656    def tearDown(self):
657        self.time_patch.stop()
658
659    def test_presign_with_query_string(self):
660        self.request.url = (
661            u'https://foo-bucket.s3.amazonaws.com/image.jpg'
662            u'?response-content-disposition='
663            'attachment%3B%20filename%3D%22download.jpg%22')
664        self.auth.add_auth(self.request)
665        query_string = self.get_parsed_query_string(self.request)
666        # We should have still kept the response-content-disposition
667        # in the query string.
668        self.assertIn('response-content-disposition', query_string)
669        self.assertEqual(query_string['response-content-disposition'],
670                         'attachment; filename="download.jpg"')
671        # But we should have also added the parts from the signer.
672        self.assertEqual(query_string['AWSAccessKeyId'], self.access_key)
673
674    def test_presign_no_headers(self):
675        self.auth.add_auth(self.request)
676        self.assertTrue(self.request.url.startswith(self.path + '?'))
677        query_string = self.get_parsed_query_string(self.request)
678        self.assertEqual(query_string['AWSAccessKeyId'], self.access_key)
679        self.assertEqual(query_string['Expires'],
680                         str(int(self.current_epoch_time) + self.expires))
681        self.assertEqual(query_string['Signature'],
682                         'ZRSgywstwIruKLTLt/Bcrf9H1K4=')
683
684    def test_presign_with_x_amz_headers(self):
685        self.request.headers['x-amz-security-token'] = 'foo'
686        self.request.headers['x-amz-acl'] = 'read-only'
687        self.auth.add_auth(self.request)
688        query_string = self.get_parsed_query_string(self.request)
689        self.assertEqual(query_string['x-amz-security-token'], 'foo')
690        self.assertEqual(query_string['x-amz-acl'], 'read-only')
691        self.assertEqual(query_string['Signature'],
692                         '5oyMAGiUk1E5Ry2BnFr6cIS3Gus=')
693
694    def test_presign_with_content_headers(self):
695        self.request.headers['content-type'] = 'txt'
696        self.request.headers['content-md5'] = 'foo'
697        self.auth.add_auth(self.request)
698        query_string = self.get_parsed_query_string(self.request)
699        self.assertEqual(query_string['content-type'], 'txt')
700        self.assertEqual(query_string['content-md5'], 'foo')
701        self.assertEqual(query_string['Signature'],
702                         '/YQRFdQGywXP74WrOx2ET/RUqz8=')
703
704    def test_presign_with_unused_headers(self):
705        self.request.headers['user-agent'] = 'botocore'
706        self.auth.add_auth(self.request)
707        query_string = self.get_parsed_query_string(self.request)
708        self.assertNotIn('user-agent', query_string)
709        self.assertEqual(query_string['Signature'],
710                         'ZRSgywstwIruKLTLt/Bcrf9H1K4=')
711
712
713class TestSigV4Presign(BasePresignTest):
714
715    maxDiff = None
716    AuthClass = botocore.auth.SigV4QueryAuth
717
718    def setUp(self):
719        self.access_key = 'access_key'
720        self.secret_key = 'secret_key'
721        self.credentials = botocore.credentials.Credentials(self.access_key,
722                                                            self.secret_key)
723        self.service_name = 'myservice'
724        self.region_name = 'myregion'
725        self.auth = self.AuthClass(
726            self.credentials, self.service_name, self.region_name, expires=60)
727        self.datetime_patcher = mock.patch.object(
728            botocore.auth.datetime, 'datetime',
729            mock.Mock(wraps=datetime.datetime)
730        )
731        mocked_datetime = self.datetime_patcher.start()
732        mocked_datetime.utcnow.return_value = datetime.datetime(
733            2014, 1, 1, 0, 0)
734
735    def tearDown(self):
736        self.datetime_patcher.stop()
737
738    def test_presign_no_params(self):
739        request = AWSRequest()
740        request.method = 'GET'
741        request.url = 'https://ec2.us-east-1.amazonaws.com/'
742        self.auth.add_auth(request)
743        query_string = self.get_parsed_query_string(request)
744        self.assertEqual(
745            query_string,
746            {'X-Amz-Algorithm': 'AWS4-HMAC-SHA256',
747             'X-Amz-Credential': ('access_key/20140101/myregion/'
748                                  'myservice/aws4_request'),
749             'X-Amz-Date': '20140101T000000Z',
750             'X-Amz-Expires': '60',
751             'X-Amz-Signature': ('c70e0bcdb4cd3ee324f71c78195445b878'
752                                 '8315af0800bbbdbbb6d05a616fb84c'),
753             'X-Amz-SignedHeaders': 'host'})
754
755    def test_operation_params_before_auth_params(self):
756        # The spec is picky about this.
757        request = AWSRequest()
758        request.method = 'GET'
759        request.url = 'https://ec2.us-east-1.amazonaws.com/?Action=MyOperation'
760        self.auth.add_auth(request)
761        # Verify auth params come after the existing params.
762        self.assertIn('?Action=MyOperation&X-Amz', request.url)
763
764    def test_operation_params_before_auth_params_in_body(self):
765        request = AWSRequest()
766        request.method = 'GET'
767        request.url = 'https://ec2.us-east-1.amazonaws.com/'
768        request.data = {'Action': 'MyOperation'}
769        self.auth.add_auth(request)
770        # Same situation, the params from request.data come before the auth
771        # params in the query string.
772        self.assertIn('?Action=MyOperation&X-Amz', request.url)
773
774    def test_operation_params_before_auth_params_in_params(self):
775        request = AWSRequest()
776        request.method = 'GET'
777        request.url = 'https://ec2.us-east-1.amazonaws.com/'
778        request.params = {'Action': 'MyOperation'}
779        self.auth.add_auth(request)
780        # Same situation, the params from request.param come before the
781        # auth params in the query string.
782        self.assertIn('?Action=MyOperation&X-Amz', request.url)
783
784    def test_request_params_not_duplicated_in_prepare(self):
785        """
786        params should be moved to query string in add_auth
787        and not rewritten at the end with request.prepare()
788        """
789        request = AWSRequest(
790            method='GET',
791            url='https://ec2.us-east-1.amazonaws.com/',
792            params={'Action': 'MyOperation'}
793        )
794        self.auth.add_auth(request)
795        self.assertIn('?Action=MyOperation&X-Amz', request.url)
796        prep = request.prepare()
797        assert not prep.url.endswith('Action=MyOperation')
798
799    def test_presign_with_spaces_in_param(self):
800        request = AWSRequest()
801        request.method = 'GET'
802        request.url = 'https://ec2.us-east-1.amazonaws.com/'
803        request.data = {'Action': 'MyOperation', 'Description': 'With Spaces'}
804        self.auth.add_auth(request)
805        # Verify we encode spaces as '%20, and we don't use '+'.
806        self.assertIn('Description=With%20Spaces', request.url)
807
808    def test_presign_with_empty_param_value(self):
809        request = AWSRequest()
810        request.method = 'POST'
811        # actual URL format for creating a multipart upload
812        request.url = 'https://s3.amazonaws.com/mybucket/mykey?uploads'
813        self.auth.add_auth(request)
814        # verify that uploads param is still in URL
815        self.assertIn('uploads', request.url)
816
817    def test_s3_sigv4_presign(self):
818        auth = botocore.auth.S3SigV4QueryAuth(
819            self.credentials, self.service_name, self.region_name, expires=60)
820        request = AWSRequest()
821        request.method = 'GET'
822        request.url = (
823            'https://s3.us-west-2.amazonaws.com/mybucket/keyname/.bar')
824        auth.add_auth(request)
825        query_string = self.get_parsed_query_string(request)
826        # We use a different payload:
827        self.assertEqual(auth.payload(request), 'UNSIGNED-PAYLOAD')
828        # which will result in a different X-Amz-Signature:
829        self.assertEqual(
830            query_string,
831            {'X-Amz-Algorithm': 'AWS4-HMAC-SHA256',
832             'X-Amz-Credential': ('access_key/20140101/myregion/'
833                                  'myservice/aws4_request'),
834             'X-Amz-Date': '20140101T000000Z',
835             'X-Amz-Expires': '60',
836             'X-Amz-Signature': ('ac1b8b9e47e8685c5c963d75e35e8741d55251'
837                                 'cd955239cc1efad4dc7201db66'),
838             'X-Amz-SignedHeaders': 'host'})
839
840    def test_presign_with_security_token(self):
841        self.credentials.token = 'security-token'
842        auth = botocore.auth.S3SigV4QueryAuth(
843            self.credentials, self.service_name, self.region_name, expires=60)
844        request = AWSRequest()
845        request.method = 'GET'
846        request.url = 'https://ec2.us-east-1.amazonaws.com/'
847        auth.add_auth(request)
848        query_string = self.get_parsed_query_string(request)
849        self.assertEqual(
850            query_string['X-Amz-Security-Token'], 'security-token')
851
852    def test_presign_where_body_is_json_bytes(self):
853        request = AWSRequest()
854        request.method = 'GET'
855        request.url = 'https://myservice.us-east-1.amazonaws.com/'
856        request.data = b'{"Param": "value"}'
857        self.auth.add_auth(request)
858        query_string = self.get_parsed_query_string(request)
859        expected_query_string = {
860            'X-Amz-Algorithm': 'AWS4-HMAC-SHA256',
861            'X-Amz-Credential': (
862                'access_key/20140101/myregion/myservice/aws4_request'),
863            'X-Amz-Expires': '60',
864            'X-Amz-Date': '20140101T000000Z',
865            'X-Amz-Signature': (
866                '8e1d372d168d532313ce6df8f64a7dc51d'
867                'e6f312a9cfba6e5b345d8a771e839c'),
868            'X-Amz-SignedHeaders': 'host',
869            'Param': 'value'
870        }
871        self.assertEqual(query_string, expected_query_string)
872
873    def test_presign_where_body_is_json_string(self):
874        request = AWSRequest()
875        request.method = 'GET'
876        request.url = 'https://myservice.us-east-1.amazonaws.com/'
877        request.data = '{"Param": "value"}'
878        self.auth.add_auth(request)
879        query_string = self.get_parsed_query_string(request)
880        expected_query_string = {
881            'X-Amz-Algorithm': 'AWS4-HMAC-SHA256',
882            'X-Amz-Credential': (
883                'access_key/20140101/myregion/myservice/aws4_request'),
884            'X-Amz-Expires': '60',
885            'X-Amz-Date': '20140101T000000Z',
886            'X-Amz-Signature': (
887                '8e1d372d168d532313ce6df8f64a7dc51d'
888                'e6f312a9cfba6e5b345d8a771e839c'),
889            'X-Amz-SignedHeaders': 'host',
890            'Param': 'value'
891        }
892        self.assertEqual(query_string, expected_query_string)
893
894    def test_presign_content_type_form_encoded_not_signed(self):
895        request = AWSRequest()
896        request.method = 'GET'
897        request.url = 'https://myservice.us-east-1.amazonaws.com/'
898        request.headers['Content-Type'] = (
899            'application/x-www-form-urlencoded; charset=utf-8'
900        )
901        self.auth.add_auth(request)
902        query_string = self.get_parsed_query_string(request)
903        signed_headers = query_string.get('X-Amz-SignedHeaders')
904        self.assertNotIn('content-type', signed_headers)
905
906
907class BaseS3PresignPostTest(unittest.TestCase):
908    def setUp(self):
909        self.access_key = 'access_key'
910        self.secret_key = 'secret_key'
911        self.credentials = botocore.credentials.Credentials(
912            self.access_key, self.secret_key)
913
914        self.service_name = 'myservice'
915        self.region_name = 'myregion'
916
917        self.bucket = 'mybucket'
918        self.key = 'mykey'
919        self.policy = {
920            "expiration": "2007-12-01T12:00:00.000Z",
921            "conditions": [
922                {"acl": "public-read"},
923                {"bucket": self.bucket},
924                ["starts-with", "$key", self.key],
925            ]
926        }
927        self.fields = {
928            'key': self.key,
929            'acl': 'public-read',
930        }
931
932        self.request = AWSRequest()
933        self.request.url = 'https://s3.amazonaws.com/%s' % self.bucket
934        self.request.method = 'POST'
935
936        self.request.context['s3-presign-post-fields'] = self.fields
937        self.request.context['s3-presign-post-policy'] = self.policy
938
939
940class TestS3SigV2Post(BaseS3PresignPostTest):
941    def setUp(self):
942        super(TestS3SigV2Post, self).setUp()
943        self.auth = botocore.auth.HmacV1PostAuth(self.credentials)
944
945        self.current_epoch_time = 1427427247.465591
946        self.time_patch = mock.patch('time.time')
947        self.time_mock = self.time_patch.start()
948        self.time_mock.return_value = self.current_epoch_time
949
950    def tearDown(self):
951        self.time_patch.stop()
952
953    def test_presign_post(self):
954        self.auth.add_auth(self.request)
955        result_fields = self.request.context['s3-presign-post-fields']
956        self.assertEqual(
957            result_fields['AWSAccessKeyId'], self.credentials.access_key)
958
959        result_policy = json.loads(base64.b64decode(
960            result_fields['policy']).decode('utf-8'))
961        self.assertEqual(result_policy['expiration'],
962                         '2007-12-01T12:00:00.000Z')
963        self.assertEqual(
964            result_policy['conditions'],
965            [{"acl": "public-read"},
966             {"bucket": "mybucket"},
967             ["starts-with", "$key", "mykey"]])
968        self.assertIn('signature', result_fields)
969
970    def test_presign_post_with_security_token(self):
971        self.credentials.token = 'my-token'
972        self.auth = botocore.auth.HmacV1PostAuth(self.credentials)
973        self.auth.add_auth(self.request)
974        result_fields = self.request.context['s3-presign-post-fields']
975        self.assertEqual(result_fields['x-amz-security-token'], 'my-token')
976
977    def test_empty_fields_and_policy(self):
978        self.request = AWSRequest()
979        self.request.url = 'https://s3.amazonaws.com/%s' % self.bucket
980        self.request.method = 'POST'
981        self.auth.add_auth(self.request)
982
983        result_fields = self.request.context['s3-presign-post-fields']
984        self.assertEqual(
985            result_fields['AWSAccessKeyId'], self.credentials.access_key)
986        result_policy = json.loads(base64.b64decode(
987            result_fields['policy']).decode('utf-8'))
988        self.assertEqual(result_policy['conditions'], [])
989        self.assertIn('signature', result_fields)
990
991
992class TestS3SigV4Post(BaseS3PresignPostTest):
993    def setUp(self):
994        super(TestS3SigV4Post, self).setUp()
995        self.auth = botocore.auth.S3SigV4PostAuth(
996            self.credentials, self.service_name, self.region_name)
997        self.datetime_patcher = mock.patch.object(
998            botocore.auth.datetime, 'datetime',
999            mock.Mock(wraps=datetime.datetime)
1000        )
1001        mocked_datetime = self.datetime_patcher.start()
1002        mocked_datetime.utcnow.return_value = datetime.datetime(
1003            2014, 1, 1, 0, 0)
1004
1005    def tearDown(self):
1006        self.datetime_patcher.stop()
1007
1008    def test_presign_post(self):
1009        self.auth.add_auth(self.request)
1010        result_fields = self.request.context['s3-presign-post-fields']
1011        self.assertEqual(result_fields['x-amz-algorithm'], 'AWS4-HMAC-SHA256')
1012        self.assertEqual(
1013            result_fields['x-amz-credential'],
1014            'access_key/20140101/myregion/myservice/aws4_request')
1015        self.assertEqual(
1016            result_fields['x-amz-date'],
1017            '20140101T000000Z')
1018
1019        result_policy = json.loads(base64.b64decode(
1020            result_fields['policy']).decode('utf-8'))
1021        self.assertEqual(result_policy['expiration'],
1022                         '2007-12-01T12:00:00.000Z')
1023        self.assertEqual(
1024            result_policy['conditions'],
1025            [{"acl": "public-read"}, {"bucket": "mybucket"},
1026             ["starts-with", "$key", "mykey"],
1027             {"x-amz-algorithm": "AWS4-HMAC-SHA256"},
1028             {"x-amz-credential":
1029              "access_key/20140101/myregion/myservice/aws4_request"},
1030             {"x-amz-date": "20140101T000000Z"}])
1031        self.assertIn('x-amz-signature', result_fields)
1032
1033    def test_presign_post_with_security_token(self):
1034        self.credentials.token = 'my-token'
1035        self.auth = botocore.auth.S3SigV4PostAuth(
1036            self.credentials, self.service_name, self.region_name)
1037        self.auth.add_auth(self.request)
1038        result_fields = self.request.context['s3-presign-post-fields']
1039        self.assertEqual(result_fields['x-amz-security-token'], 'my-token')
1040
1041    def test_empty_fields_and_policy(self):
1042        self.request = AWSRequest()
1043        self.request.url = 'https://s3.amazonaws.com/%s' % self.bucket
1044        self.request.method = 'POST'
1045        self.auth.add_auth(self.request)
1046
1047        result_fields = self.request.context['s3-presign-post-fields']
1048        self.assertEqual(result_fields['x-amz-algorithm'], 'AWS4-HMAC-SHA256')
1049        self.assertEqual(
1050            result_fields['x-amz-credential'],
1051            'access_key/20140101/myregion/myservice/aws4_request')
1052        self.assertEqual(
1053            result_fields['x-amz-date'],
1054            '20140101T000000Z')
1055
1056        result_policy = json.loads(base64.b64decode(
1057            result_fields['policy']).decode('utf-8'))
1058        self.assertEqual(
1059            result_policy['conditions'],
1060            [{"x-amz-algorithm": "AWS4-HMAC-SHA256"},
1061             {"x-amz-credential":
1062              "access_key/20140101/myregion/myservice/aws4_request"},
1063             {"x-amz-date": "20140101T000000Z"}])
1064        self.assertIn('x-amz-signature', result_fields)
1065