1#!/usr/bin/env 2# Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/ 3# Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. 4# 5# Licensed under the Apache License, Version 2.0 (the "License"). You 6# may not use this file except in compliance with the License. A copy of 7# the License is located at 8# 9# http://aws.amazon.com/apache2.0/ 10# 11# or in the "license" file accompanying this file. This file is 12# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF 13# ANY KIND, either express or implied. See the License for the specific 14# language governing permissions and limitations under the License. 15from tests import mock 16from tests import unittest 17import datetime 18import time 19import base64 20import json 21 22import botocore.auth 23import botocore.credentials 24from botocore.compat import HTTPHeaders, urlsplit, parse_qs, six 25from botocore.awsrequest import AWSRequest 26 27 28class BaseTestWithFixedDate(unittest.TestCase): 29 def setUp(self): 30 self.fixed_date = datetime.datetime(2014, 3, 10, 17, 2, 55, 0) 31 self.datetime_patch = mock.patch('botocore.auth.datetime.datetime') 32 self.datetime_mock = self.datetime_patch.start() 33 self.datetime_mock.utcnow.return_value = self.fixed_date 34 self.datetime_mock.strptime.return_value = self.fixed_date 35 36 def tearDown(self): 37 self.datetime_patch.stop() 38 39 40class TestHMACV1(unittest.TestCase): 41 42 maxDiff = None 43 44 def setUp(self): 45 access_key = '44CF9590006BF252F707' 46 secret_key = 'OtxrzxIsfpFjA7SwPzILwy8Bw21TLhquhboDYROV' 47 self.credentials = botocore.credentials.Credentials(access_key, 48 secret_key) 49 self.hmacv1 = botocore.auth.HmacV1Auth(self.credentials, None, None) 50 self.date_mock = mock.patch('botocore.auth.formatdate') 51 self.formatdate = self.date_mock.start() 52 self.formatdate.return_value = 'Thu, 17 Nov 2005 18:49:58 GMT' 53 54 def tearDown(self): 55 self.date_mock.stop() 56 57 def test_put(self): 58 headers = {'Date': 'Thu, 17 Nov 2005 18:49:58 GMT', 59 'Content-Md5': 'c8fdb181845a4ca6b8fec737b3581d76', 60 'Content-Type': 'text/html', 61 'X-Amz-Meta-Author': 'foo@bar.com', 62 'X-Amz-Magic': 'abracadabra'} 63 http_headers = HTTPHeaders.from_dict(headers) 64 split = urlsplit('/quotes/nelson') 65 cs = self.hmacv1.canonical_string('PUT', split, http_headers) 66 expected_canonical = ( 67 "PUT\nc8fdb181845a4ca6b8fec737b3581d76\ntext/html\n" 68 "Thu, 17 Nov 2005 18:49:58 GMT\nx-amz-magic:abracadabra\n" 69 "x-amz-meta-author:foo@bar.com\n/quotes/nelson") 70 expected_signature = 'jZNOcbfWmD/A/f3hSvVzXZjM2HU=' 71 self.assertEqual(cs, expected_canonical) 72 sig = self.hmacv1.get_signature('PUT', split, http_headers) 73 self.assertEqual(sig, expected_signature) 74 75 def test_duplicate_headers(self): 76 pairs = [('Date', 'Thu, 17 Nov 2005 18:49:58 GMT'), 77 ('Content-Md5', 'c8fdb181845a4ca6b8fec737b3581d76'), 78 ('Content-Type', 'text/html'), 79 ('X-Amz-Meta-Author', 'bar@baz.com'), 80 ('X-Amz-Meta-Author', 'foo@bar.com'), 81 ('X-Amz-Magic', 'abracadabra')] 82 83 http_headers = HTTPHeaders.from_pairs(pairs) 84 split = urlsplit('/quotes/nelson') 85 sig = self.hmacv1.get_signature('PUT', split, http_headers) 86 self.assertEqual(sig, 'kIdMxyiYB+F+83zYGR6sSb3ICcE=') 87 88 def test_query_string(self): 89 split = urlsplit('/quotes/nelson?uploads') 90 pairs = [('Date', 'Thu, 17 Nov 2005 18:49:58 GMT')] 91 sig = self.hmacv1.get_signature('PUT', split, 92 HTTPHeaders.from_pairs(pairs)) 93 self.assertEqual(sig, 'P7pBz3Z4p3GxysRSJ/gR8nk7D4o=') 94 95 def test_bucket_operations(self): 96 # Check that the standard operations on buckets that are 97 # specified as query strings end up in the canonical resource. 98 operations = ('acl', 'cors', 'lifecycle', 'policy', 99 'notification', 'logging', 'tagging', 100 'requestPayment', 'versioning', 'website', 101 'object-lock') 102 for operation in operations: 103 url = '/quotes?%s' % operation 104 split = urlsplit(url) 105 cr = self.hmacv1.canonical_resource(split) 106 self.assertEqual(cr, '/quotes?%s' % operation) 107 108 def test_sign_with_token(self): 109 credentials = botocore.credentials.Credentials( 110 access_key='foo', secret_key='bar', token='baz') 111 auth = botocore.auth.HmacV1Auth(credentials) 112 request = AWSRequest() 113 request.headers['Date'] = 'Thu, 17 Nov 2005 18:49:58 GMT' 114 request.headers['Content-Type'] = 'text/html' 115 request.method = 'PUT' 116 request.url = 'https://s3.amazonaws.com/bucket/key' 117 auth.add_auth(request) 118 self.assertIn('Authorization', request.headers) 119 # We're not actually checking the signature here, we're 120 # just making sure the auth header has the right format. 121 self.assertTrue(request.headers['Authorization'].startswith('AWS ')) 122 123 def test_resign_with_token(self): 124 credentials = botocore.credentials.Credentials( 125 access_key='foo', secret_key='bar', token='baz') 126 auth = botocore.auth.HmacV1Auth(credentials) 127 request = AWSRequest() 128 request.headers['Date'] = 'Thu, 17 Nov 2005 18:49:58 GMT' 129 request.headers['Content-Type'] = 'text/html' 130 request.method = 'PUT' 131 request.url = 'https://s3.amazonaws.com/bucket/key' 132 133 auth.add_auth(request) 134 original_auth = request.headers['Authorization'] 135 # Resigning the request shouldn't change the authorization 136 # header. We are also ensuring that the date stays the same 137 # because we're mocking out the formatdate() call. There's 138 # another unit test that verifies we use the latest time 139 # when we sign the request. 140 auth.add_auth(request) 141 self.assertEqual(request.headers.get_all('Authorization'), 142 [original_auth]) 143 144 def test_resign_uses_most_recent_date(self): 145 dates = [ 146 'Thu, 17 Nov 2005 18:49:58 GMT', 147 'Thu, 17 Nov 2014 20:00:00 GMT', 148 ] 149 self.formatdate.side_effect = dates 150 151 request = AWSRequest() 152 request.headers['Content-Type'] = 'text/html' 153 request.method = 'PUT' 154 request.url = 'https://s3.amazonaws.com/bucket/key' 155 156 self.hmacv1.add_auth(request) 157 original_date = request.headers['Date'] 158 159 self.hmacv1.add_auth(request) 160 modified_date = request.headers['Date'] 161 162 # Each time we sign a request, we make another call to formatdate() 163 # so we should have a different date header each time. 164 self.assertEqual(original_date, dates[0]) 165 self.assertEqual(modified_date, dates[1]) 166 167 168class TestSigV2(unittest.TestCase): 169 170 maxDiff = None 171 172 def setUp(self): 173 access_key = 'foo' 174 secret_key = 'bar' 175 self.credentials = botocore.credentials.Credentials(access_key, 176 secret_key) 177 self.signer = botocore.auth.SigV2Auth(self.credentials) 178 self.time_patcher = mock.patch.object( 179 botocore.auth.time, 'gmtime', 180 mock.Mock(wraps=time.gmtime) 181 ) 182 mocked_time = self.time_patcher.start() 183 mocked_time.return_value = time.struct_time( 184 [2014, 6, 20, 8, 40, 23, 4, 171, 0]) 185 186 def tearDown(self): 187 self.time_patcher.stop() 188 189 def test_put(self): 190 request = mock.Mock() 191 request.url = '/' 192 request.method = 'POST' 193 params = {'Foo': u'\u2713'} 194 result = self.signer.calc_signature(request, params) 195 self.assertEqual( 196 result, ('Foo=%E2%9C%93', 197 u'VCtWuwaOL0yMffAT8W4y0AFW3W4KUykBqah9S40rB+Q=')) 198 199 def test_fields(self): 200 request = AWSRequest() 201 request.url = '/' 202 request.method = 'POST' 203 request.data = {'Foo': u'\u2713'} 204 self.signer.add_auth(request) 205 self.assertEqual(request.data['AWSAccessKeyId'], 'foo') 206 self.assertEqual(request.data['Foo'], u'\u2713') 207 self.assertEqual(request.data['Timestamp'], '2014-06-20T08:40:23Z') 208 self.assertEqual(request.data['Signature'], 209 u'Tiecw+t51tok4dTT8B4bg47zxHEM/KcD55f2/x6K22o=') 210 self.assertEqual(request.data['SignatureMethod'], 'HmacSHA256') 211 self.assertEqual(request.data['SignatureVersion'], '2') 212 213 def test_resign(self): 214 # Make sure that resigning after e.g. retries works 215 request = AWSRequest() 216 request.url = '/' 217 request.method = 'POST' 218 params = { 219 'Foo': u'\u2713', 220 'Signature': u'VCtWuwaOL0yMffAT8W4y0AFW3W4KUykBqah9S40rB+Q=' 221 } 222 result = self.signer.calc_signature(request, params) 223 self.assertEqual( 224 result, ('Foo=%E2%9C%93', 225 u'VCtWuwaOL0yMffAT8W4y0AFW3W4KUykBqah9S40rB+Q=')) 226 227 def test_get(self): 228 request = AWSRequest() 229 request.url = '/' 230 request.method = 'GET' 231 request.params = {'Foo': u'\u2713'} 232 self.signer.add_auth(request) 233 self.assertEqual(request.params['AWSAccessKeyId'], 'foo') 234 self.assertEqual(request.params['Foo'], u'\u2713') 235 self.assertEqual(request.params['Timestamp'], '2014-06-20T08:40:23Z') 236 self.assertEqual(request.params['Signature'], 237 u'Un97klqZCONP65bA1+Iv4H3AcB2I40I4DBvw5ZERFPw=') 238 self.assertEqual(request.params['SignatureMethod'], 'HmacSHA256') 239 self.assertEqual(request.params['SignatureVersion'], '2') 240 241 242class TestSigV3(unittest.TestCase): 243 244 maxDiff = None 245 246 def setUp(self): 247 self.access_key = 'access_key' 248 self.secret_key = 'secret_key' 249 self.credentials = botocore.credentials.Credentials(self.access_key, 250 self.secret_key) 251 self.auth = botocore.auth.SigV3Auth(self.credentials) 252 self.date_mock = mock.patch('botocore.auth.formatdate') 253 self.formatdate = self.date_mock.start() 254 self.formatdate.return_value = 'Thu, 17 Nov 2005 18:49:58 GMT' 255 256 def tearDown(self): 257 self.date_mock.stop() 258 259 def test_signature_with_date_headers(self): 260 request = AWSRequest() 261 request.headers = {'Date': 'Thu, 17 Nov 2005 18:49:58 GMT'} 262 request.url = 'https://route53.amazonaws.com' 263 self.auth.add_auth(request) 264 self.assertEqual( 265 request.headers['X-Amzn-Authorization'], 266 ('AWS3-HTTPS AWSAccessKeyId=access_key,Algorithm=HmacSHA256,' 267 'Signature=M245fo86nVKI8rLpH4HgWs841sBTUKuwciiTpjMDgPs=')) 268 269 def test_resign_with_token(self): 270 credentials = botocore.credentials.Credentials( 271 access_key='foo', secret_key='bar', token='baz') 272 auth = botocore.auth.SigV3Auth(credentials) 273 request = AWSRequest() 274 request.headers['Date'] = 'Thu, 17 Nov 2005 18:49:58 GMT' 275 request.method = 'PUT' 276 request.url = 'https://route53.amazonaws.com/' 277 auth.add_auth(request) 278 original_auth = request.headers['X-Amzn-Authorization'] 279 # Resigning the request shouldn't change the authorization 280 # header. 281 auth.add_auth(request) 282 self.assertEqual(request.headers.get_all('X-Amzn-Authorization'), 283 [original_auth]) 284 285 286class TestS3SigV4Auth(BaseTestWithFixedDate): 287 288 AuthClass = botocore.auth.S3SigV4Auth 289 maxDiff = None 290 291 def setUp(self): 292 super(TestS3SigV4Auth, self).setUp() 293 self.credentials = botocore.credentials.Credentials( 294 access_key='foo', secret_key='bar', token='baz') 295 self.auth = self.AuthClass( 296 self.credentials, 'ec2', 'eu-central-1') 297 self.request = AWSRequest(data=six.BytesIO(b"foo bar baz")) 298 self.request.method = 'PUT' 299 self.request.url = 'https://s3.eu-central-1.amazonaws.com/' 300 301 self.client_config = mock.Mock() 302 self.s3_config = {} 303 self.client_config.s3 = self.s3_config 304 305 self.request.context = { 306 'client_config': self.client_config 307 } 308 309 def test_resign_with_content_hash(self): 310 self.auth.add_auth(self.request) 311 original_auth = self.request.headers['Authorization'] 312 313 self.auth.add_auth(self.request) 314 self.assertEqual(self.request.headers.get_all('Authorization'), 315 [original_auth]) 316 317 def test_signature_is_not_normalized(self): 318 request = AWSRequest() 319 request.url = 'https://s3.amazonaws.com/bucket/foo/./bar/../bar' 320 request.method = 'GET' 321 credentials = botocore.credentials.Credentials('access_key', 322 'secret_key') 323 auth = self.AuthClass(credentials, 's3', 'us-east-1') 324 auth.add_auth(request) 325 self.assertTrue( 326 request.headers['Authorization'].startswith('AWS4-HMAC-SHA256')) 327 328 def test_query_string_params_in_urls(self): 329 if not hasattr(self.AuthClass, 'canonical_query_string'): 330 raise unittest.SkipTest('%s does not expose interim steps' % 331 self.AuthClass.__name__) 332 333 request = AWSRequest() 334 request.url = ( 335 'https://s3.amazonaws.com/bucket?' 336 'marker=%C3%A4%C3%B6%C3%BC-01.txt&prefix' 337 ) 338 request.data = {'Action': 'MyOperation'} 339 request.method = 'GET' 340 341 # Check that the canonical query string is correct formatting 342 # by ensuring that query string paramters that are added to the 343 # canonical query string are correctly formatted. 344 cqs = self.auth.canonical_query_string(request) 345 self.assertEqual('marker=%C3%A4%C3%B6%C3%BC-01.txt&prefix=', cqs) 346 347 def _test_blacklist_header(self, header, value): 348 request = AWSRequest() 349 request.url = 'https://s3.amazonaws.com/bucket/foo' 350 request.method = 'PUT' 351 request.headers[header] = value 352 credentials = botocore.credentials.Credentials('access_key', 353 'secret_key') 354 auth = self.AuthClass(credentials, 's3', 'us-east-1') 355 auth.add_auth(request) 356 self.assertNotIn(header, request.headers['Authorization']) 357 358 def test_blacklist_expect_headers(self): 359 self._test_blacklist_header('expect', '100-continue') 360 361 def test_blacklist_trace_id(self): 362 self._test_blacklist_header('x-amzn-trace-id', 363 'Root=foo;Parent=bar;Sampleid=1') 364 365 def test_blacklist_headers(self): 366 self._test_blacklist_header('user-agent', 'botocore/1.4.11') 367 368 def test_uses_sha256_if_config_value_is_true(self): 369 self.client_config.s3['payload_signing_enabled'] = True 370 self.auth.add_auth(self.request) 371 sha_header = self.request.headers['X-Amz-Content-SHA256'] 372 self.assertNotEqual(sha_header, 'UNSIGNED-PAYLOAD') 373 374 def test_does_not_use_sha256_if_config_value_is_false(self): 375 self.client_config.s3['payload_signing_enabled'] = False 376 self.auth.add_auth(self.request) 377 sha_header = self.request.headers['X-Amz-Content-SHA256'] 378 self.assertEqual(sha_header, 'UNSIGNED-PAYLOAD') 379 380 def test_uses_sha256_if_md5_unset(self): 381 self.request.context['has_streaming_input'] = True 382 self.auth.add_auth(self.request) 383 sha_header = self.request.headers['X-Amz-Content-SHA256'] 384 self.assertNotEqual(sha_header, 'UNSIGNED-PAYLOAD') 385 386 def test_uses_sha256_if_not_https(self): 387 self.request.context['has_streaming_input'] = True 388 self.request.headers.add_header('Content-MD5', 'foo') 389 self.request.url = 'http://s3.amazonaws.com/bucket' 390 self.auth.add_auth(self.request) 391 sha_header = self.request.headers['X-Amz-Content-SHA256'] 392 self.assertNotEqual(sha_header, 'UNSIGNED-PAYLOAD') 393 394 def test_uses_sha256_if_not_streaming_upload(self): 395 self.request.context['has_streaming_input'] = False 396 self.request.headers.add_header('Content-MD5', 'foo') 397 self.request.url = 'https://s3.amazonaws.com/bucket' 398 self.auth.add_auth(self.request) 399 sha_header = self.request.headers['X-Amz-Content-SHA256'] 400 self.assertNotEqual(sha_header, 'UNSIGNED-PAYLOAD') 401 402 def test_does_not_use_sha256_if_md5_set(self): 403 self.request.context['has_streaming_input'] = True 404 self.request.headers.add_header('Content-MD5', 'foo') 405 self.auth.add_auth(self.request) 406 sha_header = self.request.headers['X-Amz-Content-SHA256'] 407 self.assertEqual(sha_header, 'UNSIGNED-PAYLOAD') 408 409 def test_does_not_use_sha256_if_context_config_set(self): 410 self.request.context['payload_signing_enabled'] = False 411 self.request.headers.add_header('Content-MD5', 'foo') 412 self.auth.add_auth(self.request) 413 sha_header = self.request.headers['X-Amz-Content-SHA256'] 414 self.assertEqual(sha_header, 'UNSIGNED-PAYLOAD') 415 416 def test_sha256_if_context_set_on_http(self): 417 self.request.context['payload_signing_enabled'] = False 418 self.request.headers.add_header('Content-MD5', 'foo') 419 self.request.url = 'http://s3.amazonaws.com/bucket' 420 self.auth.add_auth(self.request) 421 sha_header = self.request.headers['X-Amz-Content-SHA256'] 422 self.assertNotEqual(sha_header, 'UNSIGNED-PAYLOAD') 423 424 def test_sha256_if_context_set_without_md5(self): 425 self.request.context['payload_signing_enabled'] = False 426 self.request.url = 'https://s3.amazonaws.com/bucket' 427 self.auth.add_auth(self.request) 428 sha_header = self.request.headers['X-Amz-Content-SHA256'] 429 self.assertNotEqual(sha_header, 'UNSIGNED-PAYLOAD') 430 431 432class TestSigV4(unittest.TestCase): 433 def setUp(self): 434 self.credentials = botocore.credentials.Credentials( 435 access_key='foo', secret_key='bar') 436 437 def create_signer(self, service_name='myservice', region='us-west-2'): 438 auth = botocore.auth.SigV4Auth( 439 self.credentials, service_name, region) 440 return auth 441 442 def test_canonical_query_string(self): 443 request = AWSRequest() 444 request.url = ( 445 'https://search-testdomain1-j67dwxlet67gf7ghwfmik2c67i.us-west-2.' 446 'cloudsearch.amazonaws.com/' 447 '2013-01-01/search?format=sdk&pretty=true&' 448 'q.options=%7B%22defaultOperator%22%3A%20%22and%22%2C%20%22' 449 'fields%22%3A%5B%22directors%5E10%22%5D%7D&q=George%20Lucas' 450 ) 451 request.method = 'GET' 452 auth = self.create_signer('cloudsearchdomain', 'us-west-2') 453 actual = auth.canonical_query_string(request) 454 # Here 'q' should come before 'q.options'. 455 expected = ("format=sdk&pretty=true&q=George%20Lucas&q.options=%7B%22" 456 "defaultOperator%22%3A%20%22and%22%2C%20%22fields%22%3A%5B" 457 "%22directors%5E10%22%5D%7D") 458 self.assertEqual(actual, expected) 459 460 def test_thread_safe_timestamp(self): 461 request = AWSRequest() 462 request.url = ( 463 'https://search-testdomain1-j67dwxlet67gf7ghwfmik2c67i.us-west-2.' 464 'cloudsearch.amazonaws.com/' 465 '2013-01-01/search?format=sdk&pretty=true&' 466 'q.options=%7B%22defaultOperator%22%3A%20%22and%22%2C%20%22' 467 'fields%22%3A%5B%22directors%5E10%22%5D%7D&q=George%20Lucas' 468 ) 469 request.method = 'GET' 470 auth = self.create_signer('cloudsearchdomain', 'us-west-2') 471 with mock.patch.object( 472 botocore.auth.datetime, 'datetime', 473 mock.Mock(wraps=datetime.datetime)) as mock_datetime: 474 original_utcnow = datetime.datetime(2014, 1, 1, 0, 0) 475 476 mock_datetime.utcnow.return_value = original_utcnow 477 # Go through the add_auth process once. This will attach 478 # a timestamp to the request at the beginning of auth. 479 auth.add_auth(request) 480 self.assertEqual(request.context['timestamp'], '20140101T000000Z') 481 # Ensure the date is in the Authorization header 482 self.assertIn('20140101', request.headers['Authorization']) 483 # Now suppose the utc time becomes the next day all of a sudden 484 mock_datetime.utcnow.return_value = datetime.datetime( 485 2014, 1, 2, 0, 0) 486 # Smaller methods like the canonical request and string_to_sign 487 # should have the timestamp attached to the request in their 488 # body and not what the time is now mocked as. This is to ensure 489 # there is no mismatching in timestamps when signing. 490 cr = auth.canonical_request(request) 491 self.assertIn('x-amz-date:20140101T000000Z', cr) 492 self.assertNotIn('x-amz-date:20140102T000000Z', cr) 493 494 sts = auth.string_to_sign(request, cr) 495 self.assertIn('20140101T000000Z', sts) 496 self.assertNotIn('20140102T000000Z', sts) 497 498 def test_payload_is_binary_file(self): 499 request = AWSRequest() 500 request.data = six.BytesIO(u'\u2713'.encode('utf-8')) 501 request.url = 'https://amazonaws.com' 502 auth = self.create_signer() 503 payload = auth.payload(request) 504 self.assertEqual( 505 payload, 506 '1dabba21cdad44541f6b15796f8d22978fc7ea10c46aeceeeeb66c23b3ac7604') 507 508 def test_payload_is_bytes_type(self): 509 request = AWSRequest() 510 request.data = u'\u2713'.encode('utf-8') 511 request.url = 'https://amazonaws.com' 512 auth = self.create_signer() 513 payload = auth.payload(request) 514 self.assertEqual( 515 payload, 516 '1dabba21cdad44541f6b15796f8d22978fc7ea10c46aeceeeeb66c23b3ac7604') 517 518 def test_payload_not_signed_if_disabled_in_context(self): 519 request = AWSRequest() 520 request.data = u'\u2713'.encode('utf-8') 521 request.url = 'https://amazonaws.com' 522 request.context['payload_signing_enabled'] = False 523 auth = self.create_signer() 524 payload = auth.payload(request) 525 self.assertEqual(payload, 'UNSIGNED-PAYLOAD') 526 527 def test_content_sha256_set_if_payload_signing_disabled(self): 528 request = AWSRequest() 529 request.data = six.BytesIO(u'\u2713'.encode('utf-8')) 530 request.url = 'https://amazonaws.com' 531 request.context['payload_signing_enabled'] = False 532 request.method = 'PUT' 533 auth = self.create_signer() 534 auth.add_auth(request) 535 sha_header = request.headers['X-Amz-Content-SHA256'] 536 self.assertEqual(sha_header, 'UNSIGNED-PAYLOAD') 537 538 def test_collapse_multiple_spaces(self): 539 auth = self.create_signer() 540 original = HTTPHeaders() 541 original['foo'] = 'double space' 542 headers = auth.canonical_headers(original) 543 self.assertEqual(headers, 'foo:double space') 544 545 def test_trims_leading_trailing_spaces(self): 546 auth = self.create_signer() 547 original = HTTPHeaders() 548 original['foo'] = ' leading and trailing ' 549 headers = auth.canonical_headers(original) 550 self.assertEqual(headers, 'foo:leading and trailing') 551 552 def test_strips_http_default_port(self): 553 request = AWSRequest() 554 request.url = 'http://s3.us-west-2.amazonaws.com:80/' 555 request.method = 'GET' 556 auth = self.create_signer('s3', 'us-west-2') 557 actual = auth.headers_to_sign(request)['host'] 558 expected = 's3.us-west-2.amazonaws.com' 559 self.assertEqual(actual, expected) 560 561 def test_strips_https_default_port(self): 562 request = AWSRequest() 563 request.url = 'https://s3.us-west-2.amazonaws.com:443/' 564 request.method = 'GET' 565 auth = self.create_signer('s3', 'us-west-2') 566 actual = auth.headers_to_sign(request)['host'] 567 expected = 's3.us-west-2.amazonaws.com' 568 self.assertEqual(actual, expected) 569 570 def test_strips_http_auth(self): 571 request = AWSRequest() 572 request.url = 'https://username:password@s3.us-west-2.amazonaws.com/' 573 request.method = 'GET' 574 auth = self.create_signer('s3', 'us-west-2') 575 actual = auth.headers_to_sign(request)['host'] 576 expected = 's3.us-west-2.amazonaws.com' 577 self.assertEqual(actual, expected) 578 579 def test_strips_default_port_and_http_auth(self): 580 request = AWSRequest() 581 request.url = 'http://username:password@s3.us-west-2.amazonaws.com:80/' 582 request.method = 'GET' 583 auth = self.create_signer('s3', 'us-west-2') 584 actual = auth.headers_to_sign(request)['host'] 585 expected = 's3.us-west-2.amazonaws.com' 586 self.assertEqual(actual, expected) 587 588 589class TestSigV4Resign(BaseTestWithFixedDate): 590 591 maxDiff = None 592 AuthClass = botocore.auth.SigV4Auth 593 594 def setUp(self): 595 super(TestSigV4Resign, self).setUp() 596 self.credentials = botocore.credentials.Credentials( 597 access_key='foo', secret_key='bar', token='baz') 598 self.auth = self.AuthClass(self.credentials, 'ec2', 'us-west-2') 599 self.request = AWSRequest() 600 self.request.method = 'PUT' 601 self.request.url = 'https://ec2.amazonaws.com/' 602 603 def test_resign_request_with_date(self): 604 self.request.headers['Date'] = 'Thu, 17 Nov 2005 18:49:58 GMT' 605 self.auth.add_auth(self.request) 606 original_auth = self.request.headers['Authorization'] 607 608 self.auth.add_auth(self.request) 609 self.assertEqual(self.request.headers.get_all('Authorization'), 610 [original_auth]) 611 612 def test_sigv4_without_date(self): 613 self.auth.add_auth(self.request) 614 original_auth = self.request.headers['Authorization'] 615 616 self.auth.add_auth(self.request) 617 self.assertEqual(self.request.headers.get_all('Authorization'), 618 [original_auth]) 619 620 621class BasePresignTest(unittest.TestCase): 622 def get_parsed_query_string(self, request): 623 query_string_dict = parse_qs(urlsplit(request.url).query) 624 # Also, parse_qs sets each value in the dict to be a list, but 625 # because we know that we won't have repeated keys, we simplify 626 # the dict and convert it back to a single value. 627 for key in query_string_dict: 628 query_string_dict[key] = query_string_dict[key][0] 629 return query_string_dict 630 631 632class TestS3SigV2Presign(BasePresignTest): 633 634 def setUp(self): 635 self.access_key = 'access_key' 636 self.secret_key = 'secret_key' 637 self.credentials = botocore.credentials.Credentials(self.access_key, 638 self.secret_key) 639 self.expires = 3000 640 self.auth = botocore.auth.HmacV1QueryAuth( 641 self.credentials, expires=self.expires) 642 643 self.current_epoch_time = 1427427247.465591 644 self.time_patch = mock.patch('time.time') 645 self.time_mock = self.time_patch.start() 646 self.time_mock.return_value = self.current_epoch_time 647 648 self.request = AWSRequest() 649 self.bucket = 'mybucket' 650 self.key = 'myobject' 651 self.path = 'https://s3.amazonaws.com/%s/%s' % ( 652 self.bucket, self.key) 653 self.request.url = self.path 654 self.request.method = 'GET' 655 656 def tearDown(self): 657 self.time_patch.stop() 658 659 def test_presign_with_query_string(self): 660 self.request.url = ( 661 u'https://foo-bucket.s3.amazonaws.com/image.jpg' 662 u'?response-content-disposition=' 663 'attachment%3B%20filename%3D%22download.jpg%22') 664 self.auth.add_auth(self.request) 665 query_string = self.get_parsed_query_string(self.request) 666 # We should have still kept the response-content-disposition 667 # in the query string. 668 self.assertIn('response-content-disposition', query_string) 669 self.assertEqual(query_string['response-content-disposition'], 670 'attachment; filename="download.jpg"') 671 # But we should have also added the parts from the signer. 672 self.assertEqual(query_string['AWSAccessKeyId'], self.access_key) 673 674 def test_presign_no_headers(self): 675 self.auth.add_auth(self.request) 676 self.assertTrue(self.request.url.startswith(self.path + '?')) 677 query_string = self.get_parsed_query_string(self.request) 678 self.assertEqual(query_string['AWSAccessKeyId'], self.access_key) 679 self.assertEqual(query_string['Expires'], 680 str(int(self.current_epoch_time) + self.expires)) 681 self.assertEqual(query_string['Signature'], 682 'ZRSgywstwIruKLTLt/Bcrf9H1K4=') 683 684 def test_presign_with_x_amz_headers(self): 685 self.request.headers['x-amz-security-token'] = 'foo' 686 self.request.headers['x-amz-acl'] = 'read-only' 687 self.auth.add_auth(self.request) 688 query_string = self.get_parsed_query_string(self.request) 689 self.assertEqual(query_string['x-amz-security-token'], 'foo') 690 self.assertEqual(query_string['x-amz-acl'], 'read-only') 691 self.assertEqual(query_string['Signature'], 692 '5oyMAGiUk1E5Ry2BnFr6cIS3Gus=') 693 694 def test_presign_with_content_headers(self): 695 self.request.headers['content-type'] = 'txt' 696 self.request.headers['content-md5'] = 'foo' 697 self.auth.add_auth(self.request) 698 query_string = self.get_parsed_query_string(self.request) 699 self.assertEqual(query_string['content-type'], 'txt') 700 self.assertEqual(query_string['content-md5'], 'foo') 701 self.assertEqual(query_string['Signature'], 702 '/YQRFdQGywXP74WrOx2ET/RUqz8=') 703 704 def test_presign_with_unused_headers(self): 705 self.request.headers['user-agent'] = 'botocore' 706 self.auth.add_auth(self.request) 707 query_string = self.get_parsed_query_string(self.request) 708 self.assertNotIn('user-agent', query_string) 709 self.assertEqual(query_string['Signature'], 710 'ZRSgywstwIruKLTLt/Bcrf9H1K4=') 711 712 713class TestSigV4Presign(BasePresignTest): 714 715 maxDiff = None 716 AuthClass = botocore.auth.SigV4QueryAuth 717 718 def setUp(self): 719 self.access_key = 'access_key' 720 self.secret_key = 'secret_key' 721 self.credentials = botocore.credentials.Credentials(self.access_key, 722 self.secret_key) 723 self.service_name = 'myservice' 724 self.region_name = 'myregion' 725 self.auth = self.AuthClass( 726 self.credentials, self.service_name, self.region_name, expires=60) 727 self.datetime_patcher = mock.patch.object( 728 botocore.auth.datetime, 'datetime', 729 mock.Mock(wraps=datetime.datetime) 730 ) 731 mocked_datetime = self.datetime_patcher.start() 732 mocked_datetime.utcnow.return_value = datetime.datetime( 733 2014, 1, 1, 0, 0) 734 735 def tearDown(self): 736 self.datetime_patcher.stop() 737 738 def test_presign_no_params(self): 739 request = AWSRequest() 740 request.method = 'GET' 741 request.url = 'https://ec2.us-east-1.amazonaws.com/' 742 self.auth.add_auth(request) 743 query_string = self.get_parsed_query_string(request) 744 self.assertEqual( 745 query_string, 746 {'X-Amz-Algorithm': 'AWS4-HMAC-SHA256', 747 'X-Amz-Credential': ('access_key/20140101/myregion/' 748 'myservice/aws4_request'), 749 'X-Amz-Date': '20140101T000000Z', 750 'X-Amz-Expires': '60', 751 'X-Amz-Signature': ('c70e0bcdb4cd3ee324f71c78195445b878' 752 '8315af0800bbbdbbb6d05a616fb84c'), 753 'X-Amz-SignedHeaders': 'host'}) 754 755 def test_operation_params_before_auth_params(self): 756 # The spec is picky about this. 757 request = AWSRequest() 758 request.method = 'GET' 759 request.url = 'https://ec2.us-east-1.amazonaws.com/?Action=MyOperation' 760 self.auth.add_auth(request) 761 # Verify auth params come after the existing params. 762 self.assertIn('?Action=MyOperation&X-Amz', request.url) 763 764 def test_operation_params_before_auth_params_in_body(self): 765 request = AWSRequest() 766 request.method = 'GET' 767 request.url = 'https://ec2.us-east-1.amazonaws.com/' 768 request.data = {'Action': 'MyOperation'} 769 self.auth.add_auth(request) 770 # Same situation, the params from request.data come before the auth 771 # params in the query string. 772 self.assertIn('?Action=MyOperation&X-Amz', request.url) 773 774 def test_operation_params_before_auth_params_in_params(self): 775 request = AWSRequest() 776 request.method = 'GET' 777 request.url = 'https://ec2.us-east-1.amazonaws.com/' 778 request.params = {'Action': 'MyOperation'} 779 self.auth.add_auth(request) 780 # Same situation, the params from request.param come before the 781 # auth params in the query string. 782 self.assertIn('?Action=MyOperation&X-Amz', request.url) 783 784 def test_request_params_not_duplicated_in_prepare(self): 785 """ 786 params should be moved to query string in add_auth 787 and not rewritten at the end with request.prepare() 788 """ 789 request = AWSRequest( 790 method='GET', 791 url='https://ec2.us-east-1.amazonaws.com/', 792 params={'Action': 'MyOperation'} 793 ) 794 self.auth.add_auth(request) 795 self.assertIn('?Action=MyOperation&X-Amz', request.url) 796 prep = request.prepare() 797 assert not prep.url.endswith('Action=MyOperation') 798 799 def test_presign_with_spaces_in_param(self): 800 request = AWSRequest() 801 request.method = 'GET' 802 request.url = 'https://ec2.us-east-1.amazonaws.com/' 803 request.data = {'Action': 'MyOperation', 'Description': 'With Spaces'} 804 self.auth.add_auth(request) 805 # Verify we encode spaces as '%20, and we don't use '+'. 806 self.assertIn('Description=With%20Spaces', request.url) 807 808 def test_presign_with_empty_param_value(self): 809 request = AWSRequest() 810 request.method = 'POST' 811 # actual URL format for creating a multipart upload 812 request.url = 'https://s3.amazonaws.com/mybucket/mykey?uploads' 813 self.auth.add_auth(request) 814 # verify that uploads param is still in URL 815 self.assertIn('uploads', request.url) 816 817 def test_s3_sigv4_presign(self): 818 auth = botocore.auth.S3SigV4QueryAuth( 819 self.credentials, self.service_name, self.region_name, expires=60) 820 request = AWSRequest() 821 request.method = 'GET' 822 request.url = ( 823 'https://s3.us-west-2.amazonaws.com/mybucket/keyname/.bar') 824 auth.add_auth(request) 825 query_string = self.get_parsed_query_string(request) 826 # We use a different payload: 827 self.assertEqual(auth.payload(request), 'UNSIGNED-PAYLOAD') 828 # which will result in a different X-Amz-Signature: 829 self.assertEqual( 830 query_string, 831 {'X-Amz-Algorithm': 'AWS4-HMAC-SHA256', 832 'X-Amz-Credential': ('access_key/20140101/myregion/' 833 'myservice/aws4_request'), 834 'X-Amz-Date': '20140101T000000Z', 835 'X-Amz-Expires': '60', 836 'X-Amz-Signature': ('ac1b8b9e47e8685c5c963d75e35e8741d55251' 837 'cd955239cc1efad4dc7201db66'), 838 'X-Amz-SignedHeaders': 'host'}) 839 840 def test_presign_with_security_token(self): 841 self.credentials.token = 'security-token' 842 auth = botocore.auth.S3SigV4QueryAuth( 843 self.credentials, self.service_name, self.region_name, expires=60) 844 request = AWSRequest() 845 request.method = 'GET' 846 request.url = 'https://ec2.us-east-1.amazonaws.com/' 847 auth.add_auth(request) 848 query_string = self.get_parsed_query_string(request) 849 self.assertEqual( 850 query_string['X-Amz-Security-Token'], 'security-token') 851 852 def test_presign_where_body_is_json_bytes(self): 853 request = AWSRequest() 854 request.method = 'GET' 855 request.url = 'https://myservice.us-east-1.amazonaws.com/' 856 request.data = b'{"Param": "value"}' 857 self.auth.add_auth(request) 858 query_string = self.get_parsed_query_string(request) 859 expected_query_string = { 860 'X-Amz-Algorithm': 'AWS4-HMAC-SHA256', 861 'X-Amz-Credential': ( 862 'access_key/20140101/myregion/myservice/aws4_request'), 863 'X-Amz-Expires': '60', 864 'X-Amz-Date': '20140101T000000Z', 865 'X-Amz-Signature': ( 866 '8e1d372d168d532313ce6df8f64a7dc51d' 867 'e6f312a9cfba6e5b345d8a771e839c'), 868 'X-Amz-SignedHeaders': 'host', 869 'Param': 'value' 870 } 871 self.assertEqual(query_string, expected_query_string) 872 873 def test_presign_where_body_is_json_string(self): 874 request = AWSRequest() 875 request.method = 'GET' 876 request.url = 'https://myservice.us-east-1.amazonaws.com/' 877 request.data = '{"Param": "value"}' 878 self.auth.add_auth(request) 879 query_string = self.get_parsed_query_string(request) 880 expected_query_string = { 881 'X-Amz-Algorithm': 'AWS4-HMAC-SHA256', 882 'X-Amz-Credential': ( 883 'access_key/20140101/myregion/myservice/aws4_request'), 884 'X-Amz-Expires': '60', 885 'X-Amz-Date': '20140101T000000Z', 886 'X-Amz-Signature': ( 887 '8e1d372d168d532313ce6df8f64a7dc51d' 888 'e6f312a9cfba6e5b345d8a771e839c'), 889 'X-Amz-SignedHeaders': 'host', 890 'Param': 'value' 891 } 892 self.assertEqual(query_string, expected_query_string) 893 894 def test_presign_content_type_form_encoded_not_signed(self): 895 request = AWSRequest() 896 request.method = 'GET' 897 request.url = 'https://myservice.us-east-1.amazonaws.com/' 898 request.headers['Content-Type'] = ( 899 'application/x-www-form-urlencoded; charset=utf-8' 900 ) 901 self.auth.add_auth(request) 902 query_string = self.get_parsed_query_string(request) 903 signed_headers = query_string.get('X-Amz-SignedHeaders') 904 self.assertNotIn('content-type', signed_headers) 905 906 907class BaseS3PresignPostTest(unittest.TestCase): 908 def setUp(self): 909 self.access_key = 'access_key' 910 self.secret_key = 'secret_key' 911 self.credentials = botocore.credentials.Credentials( 912 self.access_key, self.secret_key) 913 914 self.service_name = 'myservice' 915 self.region_name = 'myregion' 916 917 self.bucket = 'mybucket' 918 self.key = 'mykey' 919 self.policy = { 920 "expiration": "2007-12-01T12:00:00.000Z", 921 "conditions": [ 922 {"acl": "public-read"}, 923 {"bucket": self.bucket}, 924 ["starts-with", "$key", self.key], 925 ] 926 } 927 self.fields = { 928 'key': self.key, 929 'acl': 'public-read', 930 } 931 932 self.request = AWSRequest() 933 self.request.url = 'https://s3.amazonaws.com/%s' % self.bucket 934 self.request.method = 'POST' 935 936 self.request.context['s3-presign-post-fields'] = self.fields 937 self.request.context['s3-presign-post-policy'] = self.policy 938 939 940class TestS3SigV2Post(BaseS3PresignPostTest): 941 def setUp(self): 942 super(TestS3SigV2Post, self).setUp() 943 self.auth = botocore.auth.HmacV1PostAuth(self.credentials) 944 945 self.current_epoch_time = 1427427247.465591 946 self.time_patch = mock.patch('time.time') 947 self.time_mock = self.time_patch.start() 948 self.time_mock.return_value = self.current_epoch_time 949 950 def tearDown(self): 951 self.time_patch.stop() 952 953 def test_presign_post(self): 954 self.auth.add_auth(self.request) 955 result_fields = self.request.context['s3-presign-post-fields'] 956 self.assertEqual( 957 result_fields['AWSAccessKeyId'], self.credentials.access_key) 958 959 result_policy = json.loads(base64.b64decode( 960 result_fields['policy']).decode('utf-8')) 961 self.assertEqual(result_policy['expiration'], 962 '2007-12-01T12:00:00.000Z') 963 self.assertEqual( 964 result_policy['conditions'], 965 [{"acl": "public-read"}, 966 {"bucket": "mybucket"}, 967 ["starts-with", "$key", "mykey"]]) 968 self.assertIn('signature', result_fields) 969 970 def test_presign_post_with_security_token(self): 971 self.credentials.token = 'my-token' 972 self.auth = botocore.auth.HmacV1PostAuth(self.credentials) 973 self.auth.add_auth(self.request) 974 result_fields = self.request.context['s3-presign-post-fields'] 975 self.assertEqual(result_fields['x-amz-security-token'], 'my-token') 976 977 def test_empty_fields_and_policy(self): 978 self.request = AWSRequest() 979 self.request.url = 'https://s3.amazonaws.com/%s' % self.bucket 980 self.request.method = 'POST' 981 self.auth.add_auth(self.request) 982 983 result_fields = self.request.context['s3-presign-post-fields'] 984 self.assertEqual( 985 result_fields['AWSAccessKeyId'], self.credentials.access_key) 986 result_policy = json.loads(base64.b64decode( 987 result_fields['policy']).decode('utf-8')) 988 self.assertEqual(result_policy['conditions'], []) 989 self.assertIn('signature', result_fields) 990 991 992class TestS3SigV4Post(BaseS3PresignPostTest): 993 def setUp(self): 994 super(TestS3SigV4Post, self).setUp() 995 self.auth = botocore.auth.S3SigV4PostAuth( 996 self.credentials, self.service_name, self.region_name) 997 self.datetime_patcher = mock.patch.object( 998 botocore.auth.datetime, 'datetime', 999 mock.Mock(wraps=datetime.datetime) 1000 ) 1001 mocked_datetime = self.datetime_patcher.start() 1002 mocked_datetime.utcnow.return_value = datetime.datetime( 1003 2014, 1, 1, 0, 0) 1004 1005 def tearDown(self): 1006 self.datetime_patcher.stop() 1007 1008 def test_presign_post(self): 1009 self.auth.add_auth(self.request) 1010 result_fields = self.request.context['s3-presign-post-fields'] 1011 self.assertEqual(result_fields['x-amz-algorithm'], 'AWS4-HMAC-SHA256') 1012 self.assertEqual( 1013 result_fields['x-amz-credential'], 1014 'access_key/20140101/myregion/myservice/aws4_request') 1015 self.assertEqual( 1016 result_fields['x-amz-date'], 1017 '20140101T000000Z') 1018 1019 result_policy = json.loads(base64.b64decode( 1020 result_fields['policy']).decode('utf-8')) 1021 self.assertEqual(result_policy['expiration'], 1022 '2007-12-01T12:00:00.000Z') 1023 self.assertEqual( 1024 result_policy['conditions'], 1025 [{"acl": "public-read"}, {"bucket": "mybucket"}, 1026 ["starts-with", "$key", "mykey"], 1027 {"x-amz-algorithm": "AWS4-HMAC-SHA256"}, 1028 {"x-amz-credential": 1029 "access_key/20140101/myregion/myservice/aws4_request"}, 1030 {"x-amz-date": "20140101T000000Z"}]) 1031 self.assertIn('x-amz-signature', result_fields) 1032 1033 def test_presign_post_with_security_token(self): 1034 self.credentials.token = 'my-token' 1035 self.auth = botocore.auth.S3SigV4PostAuth( 1036 self.credentials, self.service_name, self.region_name) 1037 self.auth.add_auth(self.request) 1038 result_fields = self.request.context['s3-presign-post-fields'] 1039 self.assertEqual(result_fields['x-amz-security-token'], 'my-token') 1040 1041 def test_empty_fields_and_policy(self): 1042 self.request = AWSRequest() 1043 self.request.url = 'https://s3.amazonaws.com/%s' % self.bucket 1044 self.request.method = 'POST' 1045 self.auth.add_auth(self.request) 1046 1047 result_fields = self.request.context['s3-presign-post-fields'] 1048 self.assertEqual(result_fields['x-amz-algorithm'], 'AWS4-HMAC-SHA256') 1049 self.assertEqual( 1050 result_fields['x-amz-credential'], 1051 'access_key/20140101/myregion/myservice/aws4_request') 1052 self.assertEqual( 1053 result_fields['x-amz-date'], 1054 '20140101T000000Z') 1055 1056 result_policy = json.loads(base64.b64decode( 1057 result_fields['policy']).decode('utf-8')) 1058 self.assertEqual( 1059 result_policy['conditions'], 1060 [{"x-amz-algorithm": "AWS4-HMAC-SHA256"}, 1061 {"x-amz-credential": 1062 "access_key/20140101/myregion/myservice/aws4_request"}, 1063 {"x-amz-date": "20140101T000000Z"}]) 1064 self.assertIn('x-amz-signature', result_fields) 1065