1import boto3
2import json
3import pytest
4import sure  # noqa # pylint: disable=unused-import
5
6from botocore.exceptions import ClientError
7from moto import mock_iam, mock_s3, settings
8from moto.core import set_initial_no_auth_action_count
9from unittest import SkipTest
10
11
@mock_s3
@set_initial_no_auth_action_count(0)
def test_load_unexisting_object_without_auth_should_return_403():
    """Head an S3 object we should have no access to."""
    # The docstring has to be the first statement of the function; placed
    # after the guard below it would be a discarded string expression.
    if settings.TEST_SERVER_MODE:
        raise SkipTest("Auth decorator does not work in server mode")

    resource = boto3.resource("s3", region_name="us-east-1")

    # Neither the bucket nor the key is created anywhere in this test; the
    # request is expected to be rejected on its credentials.
    obj = resource.Object("myfakebucket", "myfakekey")
    with pytest.raises(ClientError) as ex:
        obj.load()
    err = ex.value.response["Error"]
    err["Code"].should.equal("InvalidAccessKeyId")
    err["Message"].should.equal(
        "The AWS Access Key Id you provided does not exist in our records."
    )
29
30
@set_initial_no_auth_action_count(4)
@mock_s3
def test_head_bucket_with_correct_credentials():
    """Head a bucket using the access key of a user whose policy allows s3:*."""
    if settings.TEST_SERVER_MODE:
        raise SkipTest("Auth decorator does not work in server mode")

    # The helper issues four IAM calls; these are the unauthenticated ones
    # covered by the count passed to the decorator above.
    access_key = create_user_with_access_key_and_policy()
    valid_credentials = {
        "aws_access_key_id": access_key["AccessKeyId"],
        "aws_secret_access_key": access_key["SecretAccessKey"],
    }

    # Create the bucket through a client that carries the correct credentials
    boto3.client("s3", **valid_credentials).create_bucket(Bucket="mock_bucket")

    # head_bucket with the same credentials must not raise
    my_head_bucket("mock_bucket", **valid_credentials)
54
55
@set_initial_no_auth_action_count(4)
@mock_s3
def test_head_bucket_with_incorrect_credentials():
    """Head a bucket with a valid access key id but a wrong secret key."""
    if settings.TEST_SERVER_MODE:
        raise SkipTest("Auth decorator does not work in server mode")

    # The helper issues four IAM calls; these are the unauthenticated ones
    # covered by the count passed to the decorator above.
    access_key = create_user_with_access_key_and_policy()
    key_id = access_key["AccessKeyId"]

    # The bucket itself is created with the correct key pair
    owner_client = boto3.client(
        "s3",
        aws_access_key_id=key_id,
        aws_secret_access_key=access_key["SecretAccessKey"],
    )
    owner_client.create_bucket(Bucket="mock_bucket")

    # Same key id, wrong secret: head_bucket has to be rejected
    with pytest.raises(ClientError) as raised:
        my_head_bucket(
            "mock_bucket",
            aws_access_key_id=key_id,
            aws_secret_access_key="invalid",
        )

    error = raised.value.response["Error"]
    error["Code"].should.equal("SignatureDoesNotMatch")
    error["Message"].should.equal(
        "The request signature we calculated does not match the signature you provided. "
        "Check your key and signing method."
    )
86
87
def my_head_bucket(bucket, aws_access_key_id, aws_secret_access_key):
    """
    Call head_bucket on `bucket` through an S3 client built from the given key pair.

    Nothing is returned; an error raised by the call propagates to the caller.
    """
    credentials = {
        "aws_access_key_id": aws_access_key_id,
        "aws_secret_access_key": aws_secret_access_key,
    }
    boto3.client("s3", **credentials).head_bucket(Bucket=bucket)
95
96
@mock_iam
def create_user_with_access_key_and_policy(user_name="test-user"):
    """
    Create an IAM user, attach a policy allowing every S3 action on every
    resource, and return a new access key for that user.

    The returned value is the "AccessKey" entry of the create_access_key
    response; callers in this file read "AccessKeyId" and "SecretAccessKey"
    from it.
    """
    iam = boto3.client("iam", region_name="us-east-1")

    # 1. The user
    iam.create_user(UserName=user_name)

    # 2. A policy allowing s3:* on all resources, attached to that user
    allow_all_s3 = json.dumps(
        {
            "Version": "2012-10-17",
            "Statement": [{"Effect": "Allow", "Action": "s3:*", "Resource": "*"}],
        }
    )
    created_policy = iam.create_policy(
        PolicyName="policy1", PolicyDocument=allow_all_s3
    )
    iam.attach_user_policy(
        UserName=user_name, PolicyArn=created_policy["Policy"]["Arn"]
    )

    # 3. The access key handed back to the caller
    key_response = iam.create_access_key(UserName=user_name)
    return key_response["AccessKey"]
119