1from moto.core.responses import BaseResponse
2from moto.secretsmanager.exceptions import (
3    InvalidRequestException,
4    InvalidParameterException,
5    ValidationException,
6)
7
8from .models import secretsmanager_backends, filter_keys
9
10import json
11
12
13def _validate_filters(filters):
14    for idx, f in enumerate(filters):
15        filter_key = f.get("Key", None)
16        filter_values = f.get("Values", None)
17        if filter_key is None:
18            raise InvalidParameterException("Invalid filter key")
19        if filter_key not in filter_keys():
20            raise ValidationException(
21                "1 validation error detected: Value '{}' at 'filters.{}.member.key' failed to satisfy constraint: "
22                "Member must satisfy enum value set: [all, name, tag-key, description, tag-value]".format(
23                    filter_key, idx + 1
24                )
25            )
26        if filter_values is None:
27            raise InvalidParameterException(
28                "Invalid filter values for key: {}".format(filter_key)
29            )
30
31
class SecretsManagerResponse(BaseResponse):
    """Request handlers for the Secrets Manager endpoints.

    Every handler reads its inputs from the request through ``_get_param`` /
    ``_get_int_param`` and forwards them to the backend stored under
    ``self.region`` in ``secretsmanager_backends``. Unless noted otherwise the
    backend's return value is passed back unchanged.
    """

    def get_secret_value(self):
        """Forward ``SecretId``, ``VersionId`` and ``VersionStage`` to the backend."""
        call_args = {
            "secret_id": self._get_param("SecretId"),
            "version_id": self._get_param("VersionId"),
            "version_stage": self._get_param("VersionStage"),
        }
        return secretsmanager_backends[self.region].get_secret_value(**call_args)

    def create_secret(self):
        """Forward the secret definition to the backend.

        ``Description`` falls back to ``""``, ``Tags`` to ``[]`` and
        ``KmsKeyId`` to ``None`` when absent from the request.
        """
        call_args = {
            "name": self._get_param("Name"),
            "secret_string": self._get_param("SecretString"),
            "secret_binary": self._get_param("SecretBinary"),
            "description": self._get_param("Description", if_none=""),
            "tags": self._get_param("Tags", if_none=[]),
            "kms_key_id": self._get_param("KmsKeyId", if_none=None),
        }
        return secretsmanager_backends[self.region].create_secret(**call_args)

    def update_secret(self):
        """Forward the new secret payload, request token and KMS key id."""
        call_args = {
            "secret_id": self._get_param("SecretId"),
            "secret_string": self._get_param("SecretString"),
            "secret_binary": self._get_param("SecretBinary"),
            "client_request_token": self._get_param("ClientRequestToken"),
            "kms_key_id": self._get_param("KmsKeyId", if_none=None),
        }
        return secretsmanager_backends[self.region].update_secret(**call_args)

    def get_random_password(self):
        """Forward the password-generation options to the backend.

        Fallbacks for absent parameters: length 32, no excluded characters,
        every ``Exclude*`` flag and ``IncludeSpace`` ``False``, and
        ``RequireEachIncludedType`` ``True``.
        """
        call_args = {
            "password_length": self._get_param("PasswordLength", if_none=32),
            "exclude_characters": self._get_param("ExcludeCharacters", if_none=""),
            "exclude_numbers": self._get_param("ExcludeNumbers", if_none=False),
            "exclude_punctuation": self._get_param(
                "ExcludePunctuation", if_none=False
            ),
            "exclude_uppercase": self._get_param("ExcludeUppercase", if_none=False),
            "exclude_lowercase": self._get_param("ExcludeLowercase", if_none=False),
            "include_space": self._get_param("IncludeSpace", if_none=False),
            "require_each_included_type": self._get_param(
                "RequireEachIncludedType", if_none=True
            ),
        }
        return secretsmanager_backends[self.region].get_random_password(**call_args)

    def describe_secret(self):
        """Forward ``SecretId`` to the backend's ``describe_secret``."""
        target = self._get_param("SecretId")
        return secretsmanager_backends[self.region].describe_secret(secret_id=target)

    def rotate_secret(self):
        """Forward the rotation request (token, lambda ARN, rules) to the backend."""
        token = self._get_param("ClientRequestToken")
        lambda_arn = self._get_param("RotationLambdaARN")
        rules = self._get_param("RotationRules")
        target = self._get_param("SecretId")
        return secretsmanager_backends[self.region].rotate_secret(
            secret_id=target,
            client_request_token=token,
            rotation_lambda_arn=lambda_arn,
            rotation_rules=rules,
        )

    def put_secret_value(self):
        """Forward a new secret value to the backend.

        ``SecretId`` falls back to ``""``. ``VersionStages`` falls back to
        ``["AWSCURRENT"]`` and a non-list value is wrapped in a list.

        :raises InvalidRequestException: when both ``SecretString`` and
            ``SecretBinary`` are falsy.
        """
        target = self._get_param("SecretId", if_none="")
        text_value = self._get_param("SecretString")
        binary_value = self._get_param("SecretBinary")
        token = self._get_param("ClientRequestToken")

        # Reject the request before looking at the stages or the backend.
        if not (binary_value or text_value):
            raise InvalidRequestException(
                "You must provide either SecretString or SecretBinary."
            )

        stages = self._get_param("VersionStages", if_none=["AWSCURRENT"])
        stages = stages if isinstance(stages, list) else [stages]

        return secretsmanager_backends[self.region].put_secret_value(
            secret_id=target,
            secret_binary=binary_value,
            secret_string=text_value,
            version_stages=stages,
            client_request_token=token,
        )

    def list_secret_version_ids(self):
        """Forward ``SecretId`` (``""`` when absent) to the backend."""
        target = self._get_param("SecretId", if_none="")
        return secretsmanager_backends[self.region].list_secret_version_ids(
            secret_id=target
        )

    def list_secrets(self):
        """Validate the filters, query the backend and return a JSON string.

        The returned JSON object has the keys ``SecretList`` and ``NextToken``.
        Errors raised by ``_validate_filters`` propagate to the caller.
        """
        filters = self._get_param("Filters", if_none=[])
        _validate_filters(filters)

        query = {
            "filters": filters,
            "max_results": self._get_int_param("MaxResults"),
            "next_token": self._get_param("NextToken"),
        }
        secret_list, next_token = secretsmanager_backends[self.region].list_secrets(
            **query
        )
        return json.dumps({"SecretList": secret_list, "NextToken": next_token})

    def delete_secret(self):
        """Ask the backend to delete a secret and return a JSON string.

        The returned JSON object has the keys ``ARN``, ``Name`` and
        ``DeletionDate``, taken from the backend's three-item result.
        """
        call_args = {
            "secret_id": self._get_param("SecretId"),
            "recovery_window_in_days": self._get_param("RecoveryWindowInDays"),
            "force_delete_without_recovery": self._get_param(
                "ForceDeleteWithoutRecovery"
            ),
        }
        arn, name, deletion_date = secretsmanager_backends[self.region].delete_secret(
            **call_args
        )
        return json.dumps({"ARN": arn, "Name": name, "DeletionDate": deletion_date})

    def restore_secret(self):
        """Ask the backend to restore a secret and return ``ARN``/``Name`` as JSON."""
        target = self._get_param("SecretId")
        arn, name = secretsmanager_backends[self.region].restore_secret(
            secret_id=target
        )
        return json.dumps({"ARN": arn, "Name": name})

    def get_resource_policy(self):
        """Forward ``SecretId`` to the backend's ``get_resource_policy``."""
        target = self._get_param("SecretId")
        return secretsmanager_backends[self.region].get_resource_policy(
            secret_id=target
        )

    def tag_resource(self):
        """Forward ``SecretId`` and ``Tags`` (``[]`` when absent) to the backend."""
        target = self._get_param("SecretId")
        new_tags = self._get_param("Tags", if_none=[])
        # Arguments are passed positionally: secret id first, then the tags.
        return secretsmanager_backends[self.region].tag_resource(target, new_tags)

    def untag_resource(self):
        """Forward ``SecretId`` and ``TagKeys`` (``[]`` when absent) to the backend."""
        target = self._get_param("SecretId")
        keys_to_drop = self._get_param("TagKeys", if_none=[])
        return secretsmanager_backends[self.region].untag_resource(
            secret_id=target, tag_keys=keys_to_drop
        )

    def update_secret_version_stage(self):
        """Forward a version-stage move request to the backend."""
        call_args = {
            "secret_id": self._get_param("SecretId"),
            "version_stage": self._get_param("VersionStage"),
            "remove_from_version_id": self._get_param("RemoveFromVersionId"),
            "move_to_version_id": self._get_param("MoveToVersionId"),
        }
        return secretsmanager_backends[self.region].update_secret_version_stage(
            **call_args
        )
193