1import time
2import json
3import uuid
4import datetime
5
6from boto3 import Session
7from typing import List, Tuple
8
9from moto.core import BaseBackend, BaseModel
10from .exceptions import (
11    SecretNotFoundException,
12    SecretHasNoValueException,
13    InvalidParameterException,
14    ResourceExistsException,
15    ResourceNotFoundException,
16    InvalidRequestException,
17    ClientError,
18)
19from .utils import random_password, secret_arn, get_secret_name_from_arn
20from .list_secrets.filters import all, tag_key, tag_value, description, name
21
22
# Dispatch table: filter key (as it appears in a request filter's "Key")
# -> predicate implementing it. Predicates are invoked by ``_matches`` as
# ``predicate(secret, values)``.
# NOTE: ``all`` below is the filter imported from ``.list_secrets.filters``;
# that import shadows the builtin ``all`` for the whole module.
_filter_functions = {
    "all": all,
    "name": name,
    "description": description,
    "tag-key": tag_key,
    "tag-value": tag_value,
}
30
31
def filter_keys():
    """Return the filter keys registered in ``_filter_functions`` as a list."""
    return [*_filter_functions]
34
35
36def _matches(secret, filters):
37    is_match = True
38
39    for f in filters:
40        # Filter names are pre-validated in the resource layer
41        filter_function = _filter_functions.get(f["Key"])
42        is_match = is_match and filter_function(secret, f["Values"])
43
44    return is_match
45
46
class SecretsManager(BaseModel):
    """Model object that only records the region it was created for."""

    def __init__(self, region_name, **kwargs):
        # Extra keyword arguments are accepted but not used.
        self.region = region_name
50
51
class FakeSecret:
    """In-memory record of one secret together with its versions.

    ``versions`` maps a version id to a dict describing that version; this
    class reads and rewrites the ``"version_stages"`` list stored in those
    dicts. ``default_version_id`` is the id reported as ``VersionId`` by
    ``to_short_dict`` when no other id is requested.
    """

    def __init__(
        self,
        region_name,
        secret_id,
        secret_string=None,
        secret_binary=None,
        description=None,
        tags=None,
        kms_key_id=None,
        version_id=None,
        version_stages=None,
    ):
        """Build the secret record; the ARN is derived via ``secret_arn``.

        ``tags`` is copied so the secret owns its tag list: the list is later
        mutated in place by the tagging operations and must not be shared
        with the caller or with a default argument.
        """
        self.secret_id = secret_id
        self.name = secret_id
        self.arn = secret_arn(region_name, secret_id)
        self.secret_string = secret_string
        self.secret_binary = secret_binary
        self.description = description
        self.tags = [] if tags is None else list(tags)
        self.kms_key_id = kms_key_id
        self.version_id = version_id
        self.version_stages = version_stages
        self.rotation_enabled = False
        self.rotation_lambda_arn = ""
        self.auto_rotate_after_days = 0
        self.deleted_date = None
        # Defined up front so the serialisers work on a freshly built secret;
        # normally replaced through set_versions / set_default_version_id.
        self.versions = {}
        self.default_version_id = None

    def update(self, description=None, tags=None, kms_key_id=None):
        """Overwrite description and tags; replace the KMS key only if given."""
        self.description = description
        # Copy for the same reason as in __init__ (no shared tag lists).
        self.tags = [] if tags is None else list(tags)

        if kms_key_id is not None:
            self.kms_key_id = kms_key_id

    def set_versions(self, versions):
        """Replace the whole version-id -> version-dict mapping."""
        self.versions = versions

    def set_default_version_id(self, version_id):
        """Record which version id is the default one."""
        self.default_version_id = version_id

    def reset_default_version(self, secret_version, version_id):
        """Install ``secret_version`` under ``version_id`` as the new default.

        Every existing AWSPREVIOUS label is dropped, the version that was the
        default so far is relabelled to exactly ``["AWSPREVIOUS"]``, and the
        new version is stored and made the default.
        """
        # remove all old AWSPREVIOUS stages
        for old_version in self.versions.values():
            if "AWSPREVIOUS" in old_version["version_stages"]:
                old_version["version_stages"].remove("AWSPREVIOUS")

        # set old AWSCURRENT secret to AWSPREVIOUS
        previous_current_version_id = self.default_version_id
        self.versions[previous_current_version_id]["version_stages"] = ["AWSPREVIOUS"]

        self.versions[version_id] = secret_version
        self.default_version_id = version_id

    def delete(self, deleted_date):
        """Mark the secret as deleted by storing ``deleted_date``."""
        self.deleted_date = deleted_date

    def restore(self):
        """Clear the deletion marker."""
        self.deleted_date = None

    def is_deleted(self):
        """Return True while a deletion date is recorded."""
        return self.deleted_date is not None

    def to_short_dict(self, include_version_stages=False, version_id=None):
        """Return a JSON string with ARN, Name and VersionId.

        :param include_version_stages: also emit ``VersionStages``.
        :param version_id: version to report; falls back to the default
            version id when omitted.
        """
        if not version_id:
            version_id = self.default_version_id

        dct = {
            "ARN": self.arn,
            "Name": self.name,
            "VersionId": version_id,
        }
        if include_version_stages:
            # Report the labels actually attached to the version being
            # described; the constructor-level ``version_stages`` is only a
            # fallback for a version that is not in ``versions``.
            version = self.versions.get(version_id)
            dct["VersionStages"] = (
                version["version_stages"] if version else self.version_stages
            )
        return json.dumps(dct)

    def to_dict(self):
        """Return the full description of the secret as a plain dict."""
        version_id_to_stages = self._form_version_ids_to_stages()

        return {
            "ARN": self.arn,
            "Name": self.name,
            "Description": self.description or "",
            "KmsKeyId": self.kms_key_id,
            "RotationEnabled": self.rotation_enabled,
            "RotationLambdaARN": self.rotation_lambda_arn,
            "RotationRules": {"AutomaticallyAfterDays": self.auto_rotate_after_days},
            "LastRotatedDate": None,
            "LastChangedDate": None,
            "LastAccessedDate": None,
            "DeletedDate": self.deleted_date,
            "Tags": self.tags,
            "VersionIdsToStages": version_id_to_stages,
            "SecretVersionsToStages": version_id_to_stages,
        }

    def _form_version_ids_to_stages(self):
        """Map every version id to that version's ``version_stages`` list."""
        return {
            version_id: version["version_stages"]
            for version_id, version in self.versions.items()
        }
151
152
class SecretsStore(dict):
    """Dict of secrets whose keys are passed through ``get_secret_name_from_arn``.

    Every keyed access normalises the key with that helper first, so the same
    entry is reached whichever identifier form the helper accepts.
    ``get`` and ``__delitem__`` are overridden as well: the plain ``dict``
    versions do not go through ``__getitem__`` and would silently skip the
    normalisation.
    """

    def __setitem__(self, key, value):
        super().__setitem__(get_secret_name_from_arn(key), value)

    def __getitem__(self, key):
        return super().__getitem__(get_secret_name_from_arn(key))

    def __delitem__(self, key):
        super().__delitem__(get_secret_name_from_arn(key))

    def __contains__(self, key):
        return super().__contains__(get_secret_name_from_arn(key))

    def get(self, key, default=None):
        """Return the entry for the normalised ``key`` or ``default``."""
        return super().get(get_secret_name_from_arn(key), default)

    def pop(self, key, *args, **kwargs):
        """Remove and return the entry for the normalised ``key``."""
        return super().pop(get_secret_name_from_arn(key), *args, **kwargs)
169
170
171class SecretsManagerBackend(BaseBackend):
172    def __init__(self, region_name=None, **kwargs):
173        super(SecretsManagerBackend, self).__init__()
174        self.region = region_name
175        self.secrets = SecretsStore()
176
177    def reset(self):
178        region_name = self.region
179        self.__dict__ = {}
180        self.__init__(region_name)
181
182    @staticmethod
183    def default_vpc_endpoint_service(service_region, zones):
184        """Default VPC endpoint services."""
185        return BaseBackend.default_vpc_endpoint_service_factory(
186            service_region, zones, "secretsmanager"
187        )
188
189    def _is_valid_identifier(self, identifier):
190        return identifier in self.secrets
191
192    def _unix_time_secs(self, dt):
193        epoch = datetime.datetime.utcfromtimestamp(0)
194        return (dt - epoch).total_seconds()
195
196    def _client_request_token_validator(self, client_request_token):
197        token_length = len(client_request_token)
198        if token_length < 32 or token_length > 64:
199            msg = "ClientRequestToken must be 32-64 characters long."
200            raise InvalidParameterException(msg)
201
202    def get_secret_value(self, secret_id, version_id, version_stage):
203        if not self._is_valid_identifier(secret_id):
204            raise SecretNotFoundException()
205
206        if not version_id and version_stage:
207            # set version_id to match version_stage
208            versions_dict = self.secrets[secret_id].versions
209            for ver_id, ver_val in versions_dict.items():
210                if version_stage in ver_val["version_stages"]:
211                    version_id = ver_id
212                    break
213            if not version_id:
214                raise SecretNotFoundException()
215
216        # TODO check this part
217        if self.secrets[secret_id].is_deleted():
218            raise InvalidRequestException(
219                "An error occurred (InvalidRequestException) when calling the GetSecretValue operation: You tried to \
220                perform the operation on a secret that's currently marked deleted."
221            )
222
223        secret = self.secrets[secret_id]
224        version_id = version_id or secret.default_version_id
225
226        secret_version = secret.versions.get(version_id)
227        if not secret_version:
228            raise ResourceNotFoundException(
229                "An error occurred (ResourceNotFoundException) when calling the GetSecretValue operation: Secrets "
230                "Manager can't find the specified secret value for VersionId: {}".format(
231                    version_id
232                )
233            )
234
235        response_data = {
236            "ARN": secret.arn,
237            "Name": secret.name,
238            "VersionId": secret_version["version_id"],
239            "VersionStages": secret_version["version_stages"],
240            "CreatedDate": secret_version["createdate"],
241        }
242
243        if "secret_string" in secret_version:
244            response_data["SecretString"] = secret_version["secret_string"]
245
246        if "secret_binary" in secret_version:
247            response_data["SecretBinary"] = secret_version["secret_binary"]
248
249        if (
250            "secret_string" not in secret_version
251            and "secret_binary" not in secret_version
252        ):
253            raise SecretHasNoValueException(version_stage or "AWSCURRENT")
254
255        response = json.dumps(response_data)
256
257        return response
258
259    def update_secret(
260        self,
261        secret_id,
262        secret_string=None,
263        secret_binary=None,
264        client_request_token=None,
265        kms_key_id=None,
266        **kwargs
267    ):
268
269        # error if secret does not exist
270        if secret_id not in self.secrets:
271            raise SecretNotFoundException()
272
273        if self.secrets[secret_id].is_deleted():
274            raise InvalidRequestException(
275                "An error occurred (InvalidRequestException) when calling the UpdateSecret operation: "
276                "You can't perform this operation on the secret because it was marked for deletion."
277            )
278
279        secret = self.secrets[secret_id]
280        tags = secret.tags
281        description = secret.description
282
283        secret = self._add_secret(
284            secret_id,
285            secret_string=secret_string,
286            secret_binary=secret_binary,
287            description=description,
288            version_id=client_request_token,
289            tags=tags,
290            kms_key_id=kms_key_id,
291        )
292
293        return secret.to_short_dict()
294
295    def create_secret(
296        self,
297        name,
298        secret_string=None,
299        secret_binary=None,
300        description=None,
301        tags=[],
302        kms_key_id=None,
303    ):
304
305        # error if secret exists
306        if name in self.secrets.keys():
307            raise ResourceExistsException(
308                "A resource with the ID you requested already exists."
309            )
310
311        secret = self._add_secret(
312            name,
313            secret_string=secret_string,
314            secret_binary=secret_binary,
315            description=description,
316            tags=tags,
317            kms_key_id=kms_key_id,
318        )
319
320        return secret.to_short_dict()
321
322    def _add_secret(
323        self,
324        secret_id,
325        secret_string=None,
326        secret_binary=None,
327        description=None,
328        tags=[],
329        kms_key_id=None,
330        version_id=None,
331        version_stages=None,
332    ):
333
334        if version_stages is None:
335            version_stages = ["AWSCURRENT"]
336
337        if version_id:
338            self._client_request_token_validator(version_id)
339        else:
340            version_id = str(uuid.uuid4())
341
342        secret_version = {
343            "createdate": int(time.time()),
344            "version_id": version_id,
345            "version_stages": version_stages,
346        }
347        if secret_string is not None:
348            secret_version["secret_string"] = secret_string
349
350        if secret_binary is not None:
351            secret_version["secret_binary"] = secret_binary
352
353        if secret_id in self.secrets:
354            secret = self.secrets[secret_id]
355
356            secret.update(description, tags, kms_key_id)
357
358            if "AWSPENDING" in version_stages:
359                secret.versions[version_id] = secret_version
360            else:
361                secret.reset_default_version(secret_version, version_id)
362        else:
363            secret = FakeSecret(
364                region_name=self.region,
365                secret_id=secret_id,
366                secret_string=secret_string,
367                secret_binary=secret_binary,
368                description=description,
369                tags=tags,
370                kms_key_id=kms_key_id,
371            )
372            secret.set_versions({version_id: secret_version})
373            secret.set_default_version_id(version_id)
374            self.secrets[secret_id] = secret
375
376        return secret
377
378    def put_secret_value(
379        self,
380        secret_id,
381        secret_string,
382        secret_binary,
383        client_request_token,
384        version_stages,
385    ):
386
387        if not self._is_valid_identifier(secret_id):
388            raise SecretNotFoundException()
389        else:
390            secret = self.secrets[secret_id]
391            tags = secret.tags
392            description = secret.description
393
394        secret = self._add_secret(
395            secret_id,
396            secret_string,
397            secret_binary,
398            version_id=client_request_token,
399            description=description,
400            tags=tags,
401            version_stages=version_stages,
402        )
403
404        return secret.to_short_dict(include_version_stages=True)
405
406    def describe_secret(self, secret_id):
407        if not self._is_valid_identifier(secret_id):
408            raise SecretNotFoundException()
409
410        secret = self.secrets[secret_id]
411
412        return json.dumps(secret.to_dict())
413
414    def rotate_secret(
415        self,
416        secret_id,
417        client_request_token=None,
418        rotation_lambda_arn=None,
419        rotation_rules=None,
420    ):
421
422        rotation_days = "AutomaticallyAfterDays"
423
424        if not self._is_valid_identifier(secret_id):
425            raise SecretNotFoundException()
426
427        if self.secrets[secret_id].is_deleted():
428            raise InvalidRequestException(
429                "An error occurred (InvalidRequestException) when calling the RotateSecret operation: You tried to \
430                perform the operation on a secret that's currently marked deleted."
431            )
432
433        if rotation_lambda_arn:
434            if len(rotation_lambda_arn) > 2048:
435                msg = "RotationLambdaARN " "must <= 2048 characters long."
436                raise InvalidParameterException(msg)
437
438        if rotation_rules:
439            if rotation_days in rotation_rules:
440                rotation_period = rotation_rules[rotation_days]
441                if rotation_period < 1 or rotation_period > 1000:
442                    msg = (
443                        "RotationRules.AutomaticallyAfterDays " "must be within 1-1000."
444                    )
445                    raise InvalidParameterException(msg)
446
447        secret = self.secrets[secret_id]
448
449        # The rotation function must end with the versions of the secret in
450        # one of two states:
451        #
452        #  - The AWSPENDING and AWSCURRENT staging labels are attached to the
453        #    same version of the secret, or
454        #  - The AWSPENDING staging label is not attached to any version of the secret.
455        #
456        # If the AWSPENDING staging label is present but not attached to the same
457        # version as AWSCURRENT then any later invocation of RotateSecret assumes
458        # that a previous rotation request is still in progress and returns an error.
459        try:
460            version = next(
461                version
462                for version in secret.versions.values()
463                if "AWSPENDING" in version["version_stages"]
464            )
465            if "AWSCURRENT" in version["version_stages"]:
466                msg = "Previous rotation request is still in progress."
467                raise InvalidRequestException(msg)
468
469        except StopIteration:
470            # Pending is not present in any version
471            pass
472
473        old_secret_version = secret.versions[secret.default_version_id]
474
475        if client_request_token:
476            self._client_request_token_validator(client_request_token)
477            new_version_id = client_request_token
478        else:
479            new_version_id = str(uuid.uuid4())
480
481        # We add the new secret version as "pending". The previous version remains
482        # as "current" for now. Once we've passed the new secret through the lambda
483        # rotation function (if provided) we can then update the status to "current".
484        self._add_secret(
485            secret_id,
486            old_secret_version["secret_string"],
487            description=secret.description,
488            tags=secret.tags,
489            version_id=new_version_id,
490            version_stages=["AWSPENDING"],
491        )
492        secret.rotation_lambda_arn = rotation_lambda_arn or ""
493        if rotation_rules:
494            secret.auto_rotate_after_days = rotation_rules.get(rotation_days, 0)
495        if secret.auto_rotate_after_days > 0:
496            secret.rotation_enabled = True
497
498        # Begin the rotation process for the given secret by invoking the lambda function.
499        if secret.rotation_lambda_arn:
500            from moto.awslambda.models import lambda_backends
501
502            lambda_backend = lambda_backends[self.region]
503
504            request_headers = {}
505            response_headers = {}
506
507            func = lambda_backend.get_function(secret.rotation_lambda_arn)
508            if not func:
509                msg = "Resource not found for ARN '{}'.".format(
510                    secret.rotation_lambda_arn
511                )
512                raise ResourceNotFoundException(msg)
513
514            for step in ["create", "set", "test", "finish"]:
515                func.invoke(
516                    json.dumps(
517                        {
518                            "Step": step + "Secret",
519                            "SecretId": secret.name,
520                            "ClientRequestToken": new_version_id,
521                        }
522                    ),
523                    request_headers,
524                    response_headers,
525                )
526
527            secret.set_default_version_id(new_version_id)
528        else:
529            secret.reset_default_version(
530                secret.versions[new_version_id], new_version_id
531            )
532            secret.versions[new_version_id]["version_stages"] = ["AWSCURRENT"]
533
534        return secret.to_short_dict()
535
536    def get_random_password(
537        self,
538        password_length,
539        exclude_characters,
540        exclude_numbers,
541        exclude_punctuation,
542        exclude_uppercase,
543        exclude_lowercase,
544        include_space,
545        require_each_included_type,
546    ):
547        # password size must have value less than or equal to 4096
548        if password_length > 4096:
549            raise ClientError(
550                "ClientError: An error occurred (ValidationException) \
551                when calling the GetRandomPassword operation: 1 validation error detected: Value '{}' at 'passwordLength' \
552                failed to satisfy constraint: Member must have value less than or equal to 4096".format(
553                    password_length
554                )
555            )
556        if password_length < 4:
557            raise InvalidParameterException(
558                "InvalidParameterException: An error occurred (InvalidParameterException) \
559                when calling the GetRandomPassword operation: Password length is too short based on the required types."
560            )
561
562        response = json.dumps(
563            {
564                "RandomPassword": random_password(
565                    password_length,
566                    exclude_characters,
567                    exclude_numbers,
568                    exclude_punctuation,
569                    exclude_uppercase,
570                    exclude_lowercase,
571                    include_space,
572                    require_each_included_type,
573                )
574            }
575        )
576
577        return response
578
579    def list_secret_version_ids(self, secret_id):
580        secret = self.secrets[secret_id]
581
582        version_list = []
583        for version_id, version in secret.versions.items():
584            version_list.append(
585                {
586                    "CreatedDate": int(time.time()),
587                    "LastAccessedDate": int(time.time()),
588                    "VersionId": version_id,
589                    "VersionStages": version["version_stages"],
590                }
591            )
592
593        response = json.dumps(
594            {
595                "ARN": secret.secret_id,
596                "Name": secret.name,
597                "NextToken": "",
598                "Versions": version_list,
599            }
600        )
601
602        return response
603
604    def list_secrets(
605        self, filters: List, max_results: int = 100, next_token: str = None
606    ) -> Tuple[List, str]:
607        """
608        Returns secrets from secretsmanager.
609        The result is paginated and page items depends on the token value, because token contains start element
610        number of secret list.
611        Response example:
612        {
613            SecretList: [
614                {
615                    ARN: 'arn:aws:secretsmanager:us-east-1:1234567890:secret:test1-gEcah',
616                    Name: 'test1',
617                    ...
618                },
619                {
620                    ARN: 'arn:aws:secretsmanager:us-east-1:1234567890:secret:test2-KZwml',
621                    Name: 'test2',
622                    ...
623                }
624            ],
625            NextToken: '2'
626        }
627
628        :param filters: (List) Filter parameters.
629        :param max_results: (int) Max number of results per page.
630        :param next_token: (str) Page token.
631        :return: (Tuple[List,str]) Returns result list and next token.
632        """
633        secret_list = []
634        for secret in self.secrets.values():
635            if _matches(secret, filters):
636                secret_list.append(secret.to_dict())
637
638        starting_point = int(next_token or 0)
639        ending_point = starting_point + int(max_results or 100)
640        secret_page = secret_list[starting_point:ending_point]
641        new_next_token = str(ending_point) if ending_point < len(secret_list) else None
642
643        return secret_page, new_next_token
644
645    def delete_secret(
646        self, secret_id, recovery_window_in_days, force_delete_without_recovery
647    ):
648
649        if not self._is_valid_identifier(secret_id):
650            raise SecretNotFoundException()
651
652        if self.secrets[secret_id].is_deleted():
653            raise InvalidRequestException(
654                "An error occurred (InvalidRequestException) when calling the DeleteSecret operation: You tried to \
655                perform the operation on a secret that's currently marked deleted."
656            )
657
658        if recovery_window_in_days and force_delete_without_recovery:
659            raise InvalidParameterException(
660                "An error occurred (InvalidParameterException) when calling the DeleteSecret operation: You can't \
661                use ForceDeleteWithoutRecovery in conjunction with RecoveryWindowInDays."
662            )
663
664        if recovery_window_in_days and (
665            recovery_window_in_days < 7 or recovery_window_in_days > 30
666        ):
667            raise InvalidParameterException(
668                "An error occurred (InvalidParameterException) when calling the DeleteSecret operation: The \
669                RecoveryWindowInDays value must be between 7 and 30 days (inclusive)."
670            )
671
672        deletion_date = datetime.datetime.utcnow()
673
674        if force_delete_without_recovery:
675            secret = self.secrets.pop(secret_id, None)
676        else:
677            deletion_date += datetime.timedelta(days=recovery_window_in_days or 30)
678            self.secrets[secret_id].delete(self._unix_time_secs(deletion_date))
679            secret = self.secrets.get(secret_id, None)
680
681        if not secret:
682            raise SecretNotFoundException()
683
684        arn = secret.arn
685        name = secret.name
686
687        return arn, name, self._unix_time_secs(deletion_date)
688
689    def restore_secret(self, secret_id):
690
691        if not self._is_valid_identifier(secret_id):
692            raise SecretNotFoundException()
693
694        secret = self.secrets[secret_id]
695        secret.restore()
696
697        return secret.arn, secret.name
698
699    def tag_resource(self, secret_id, tags):
700
701        if secret_id not in self.secrets:
702            raise SecretNotFoundException()
703
704        secret = self.secrets[secret_id]
705        old_tags = secret.tags
706
707        for tag in tags:
708            old_tags.append(tag)
709
710        return secret_id
711
712    def untag_resource(self, secret_id, tag_keys):
713
714        if secret_id not in self.secrets:
715            raise SecretNotFoundException()
716
717        secret = self.secrets[secret_id]
718        tags = secret.tags
719
720        for tag in tags:
721            if tag["Key"] in tag_keys:
722                tags.remove(tag)
723
724        return secret_id
725
726    def update_secret_version_stage(
727        self, secret_id, version_stage, remove_from_version_id, move_to_version_id
728    ):
729        if secret_id not in self.secrets:
730            raise SecretNotFoundException()
731
732        secret = self.secrets[secret_id]
733
734        if remove_from_version_id:
735            if remove_from_version_id not in secret.versions:
736                raise InvalidParameterException(
737                    "Not a valid version: %s" % remove_from_version_id
738                )
739
740            stages = secret.versions[remove_from_version_id]["version_stages"]
741            if version_stage not in stages:
742                raise InvalidParameterException(
743                    "Version stage %s not found in version %s"
744                    % (version_stage, remove_from_version_id)
745                )
746
747            stages.remove(version_stage)
748
749        if move_to_version_id:
750            if move_to_version_id not in secret.versions:
751                raise InvalidParameterException(
752                    "Not a valid version: %s" % move_to_version_id
753                )
754
755            stages = secret.versions[move_to_version_id]["version_stages"]
756            stages.append(version_stage)
757
758        if version_stage == "AWSCURRENT":
759            if remove_from_version_id:
760                # Whenever you move AWSCURRENT, Secrets Manager automatically
761                # moves the label AWSPREVIOUS to the version that AWSCURRENT
762                # was removed from.
763                secret.versions[remove_from_version_id]["version_stages"].append(
764                    "AWSPREVIOUS"
765                )
766
767            if move_to_version_id:
768                stages = secret.versions[move_to_version_id]["version_stages"]
769                if "AWSPREVIOUS" in stages:
770                    stages.remove("AWSPREVIOUS")
771
772        return secret_id
773
774    @staticmethod
775    def get_resource_policy(secret_id):
776        resource_policy = {
777            "Version": "2012-10-17",
778            "Statement": {
779                "Effect": "Allow",
780                "Principal": {
781                    "AWS": [
782                        "arn:aws:iam::111122223333:root",
783                        "arn:aws:iam::444455556666:root",
784                    ]
785                },
786                "Action": ["secretsmanager:GetSecretValue"],
787                "Resource": "*",
788            },
789        }
790        return json.dumps(
791            {
792                "ARN": secret_id,
793                "Name": secret_id,
794                "ResourcePolicy": json.dumps(resource_policy),
795            }
796        )
797
798
# Registry of one backend per region name, covering the standard, GovCloud
# and China partitions ("aws" is the partition used when none is named).
secretsmanager_backends = {}
for _partition in ("aws", "aws-us-gov", "aws-cn"):
    for region in Session().get_available_regions(
        "secretsmanager", partition_name=_partition
    ):
        secretsmanager_backends[region] = SecretsManagerBackend(region_name=region)
810