import boto3
import json
import pytest
import sure  # noqa # pylint: disable=unused-import

from botocore.exceptions import ClientError
from moto import mock_iam, mock_s3, settings
from moto.core import set_initial_no_auth_action_count
from unittest import SkipTest


@mock_s3
@set_initial_no_auth_action_count(0)
def test_load_unexisting_object_without_auth_should_return_403():
    """Head an S3 object we should have no access to."""
    if settings.TEST_SERVER_MODE:
        raise SkipTest("Auth decorator does not work in server mode")

    resource = boto3.resource("s3", region_name="us-east-1")

    obj = resource.Object("myfakebucket", "myfakekey")
    with pytest.raises(ClientError) as ex:
        obj.load()
    err = ex.value.response["Error"]
    err["Code"].should.equal("InvalidAccessKeyId")
    err["Message"].should.equal(
        "The AWS Access Key Id you provided does not exist in our records."
    )


@set_initial_no_auth_action_count(4)
@mock_s3
def test_head_bucket_with_correct_credentials():
    """Head a bucket using the access key of a user that is allowed all S3 actions."""
    if settings.TEST_SERVER_MODE:
        raise SkipTest("Auth decorator does not work in server mode")

    # These calls are all unauthenticated
    # (the helper issues four IAM requests, matching the decorator's count)
    iam_keys = create_user_with_access_key_and_policy()

    # This S3-client has correct credentials
    s3 = boto3.client(
        "s3",
        region_name="us-east-1",
        aws_access_key_id=iam_keys["AccessKeyId"],
        aws_secret_access_key=iam_keys["SecretAccessKey"],
    )
    s3.create_bucket(Bucket="mock_bucket")

    # Calling head_bucket with the correct credentials works
    my_head_bucket(
        "mock_bucket",
        aws_access_key_id=iam_keys["AccessKeyId"],
        aws_secret_access_key=iam_keys["SecretAccessKey"],
    )


@set_initial_no_auth_action_count(4)
@mock_s3
def test_head_bucket_with_incorrect_credentials():
    """Head a bucket with a valid access key id but a wrong secret key."""
    if settings.TEST_SERVER_MODE:
        raise SkipTest("Auth decorator does not work in server mode")

    # These calls are all unauthenticated
    # (the helper issues four IAM requests, matching the decorator's count)
    iam_keys = create_user_with_access_key_and_policy()

    # Create the bucket with correct credentials
    s3 = boto3.client(
        "s3",
        region_name="us-east-1",
        aws_access_key_id=iam_keys["AccessKeyId"],
        aws_secret_access_key=iam_keys["SecretAccessKey"],
    )
    s3.create_bucket(Bucket="mock_bucket")

    # Call head_bucket with incorrect credentials
    with pytest.raises(ClientError) as ex:
        my_head_bucket(
            "mock_bucket",
            aws_access_key_id=iam_keys["AccessKeyId"],
            aws_secret_access_key="invalid",
        )
    err = ex.value.response["Error"]
    err["Code"].should.equal("SignatureDoesNotMatch")
    err["Message"].should.equal(
        "The request signature we calculated does not match the signature you provided. "
        "Check your key and signing method."
    )


def my_head_bucket(bucket, aws_access_key_id, aws_secret_access_key):
    """
    Call head_bucket on `bucket` with a new S3 client built from the given credentials.

    Nothing is returned; any error raised by the client propagates to the caller.
    """
    s3_client = boto3.client(
        "s3",
        region_name="us-east-1",
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
    )
    s3_client.head_bucket(Bucket=bucket)


@mock_iam
def create_user_with_access_key_and_policy(user_name="test-user"):
    """
    Should create a user with attached policy allowing read/write operations on S3.

    The policy allows every S3 action ("s3:*") on every resource. Returns the
    "AccessKey" entry of the create_access_key response; callers in this file
    read its "AccessKeyId" and "SecretAccessKey" fields.
    """
    policy_document = {
        "Version": "2012-10-17",
        "Statement": [{"Effect": "Allow", "Action": "s3:*", "Resource": "*"}],
    }

    # Create client and user
    client = boto3.client("iam", region_name="us-east-1")
    client.create_user(UserName=user_name)

    # Create and attach the policy
    policy_arn = client.create_policy(
        PolicyName="policy1", PolicyDocument=json.dumps(policy_document)
    )["Policy"]["Arn"]
    client.attach_user_policy(UserName=user_name, PolicyArn=policy_arn)

    # Return the access keys
    return client.create_access_key(UserName=user_name)["AccessKey"]