import time
import json
import uuid
import datetime

from boto3 import Session
from typing import List, Optional, Tuple

from moto.core import BaseBackend, BaseModel
from .exceptions import (
    SecretNotFoundException,
    SecretHasNoValueException,
    InvalidParameterException,
    ResourceExistsException,
    ResourceNotFoundException,
    InvalidRequestException,
    ClientError,
)
from .utils import random_password, secret_arn, get_secret_name_from_arn
from .list_secrets.filters import all, tag_key, tag_value, description, name


# Maps the filter key accepted by list_secrets to the predicate implementing it.
_filter_functions = {
    "all": all,
    "name": name,
    "description": description,
    "tag-key": tag_key,
    "tag-value": tag_value,
}


def filter_keys():
    """Return the list of filter keys supported by ``list_secrets``."""
    return list(_filter_functions.keys())


def _matches(secret, filters):
    """Return True when ``secret`` satisfies every filter in ``filters``.

    Each filter is a dict with a ``"Key"`` (one of ``filter_keys()``) and a
    ``"Values"`` entry handed to the matching predicate. An empty filter list
    matches everything.
    """
    # NOTE: the builtin ``all`` is shadowed in this module by the filter
    # function imported above, so an explicit loop is used here.
    for f in filters:
        # Filter names are pre-validated in the resource layer
        filter_function = _filter_functions.get(f["Key"])
        if not filter_function(secret, f["Values"]):
            return False

    return True


class SecretsManager(BaseModel):
    """Minimal model object that only records the region it belongs to."""

    def __init__(self, region_name, **kwargs):
        self.region = region_name


class FakeSecret:
    """In-memory representation of a single secret and all of its versions.

    ``versions`` maps a version id to a dict holding ``createdate``,
    ``version_id``, ``version_stages`` and optionally ``secret_string`` /
    ``secret_binary``. ``default_version_id`` is the id of the version that
    is returned when no explicit version is requested.
    """

    def __init__(
        self,
        region_name,
        secret_id,
        secret_string=None,
        secret_binary=None,
        description=None,
        tags=None,
        kms_key_id=None,
        version_id=None,
        version_stages=None,
    ):
        self.secret_id = secret_id
        self.name = secret_id
        self.arn = secret_arn(region_name, secret_id)
        self.secret_string = secret_string
        self.secret_binary = secret_binary
        self.description = description
        # A fresh list per secret: a shared default list would leak tags
        # between secrets because tag_resource mutates it in place.
        self.tags = [] if tags is None else tags
        self.kms_key_id = kms_key_id
        self.version_id = version_id
        self.version_stages = version_stages
        self.rotation_enabled = False
        self.rotation_lambda_arn = ""
        self.auto_rotate_after_days = 0
        self.deleted_date = None
        # Populated through set_versions / set_default_version_id.
        self.versions = {}
        self.default_version_id = None

    def update(self, description=None, tags=None, kms_key_id=None):
        """Overwrite description and tags; replace the KMS key only if given."""
        self.description = description
        self.tags = [] if tags is None else tags

        if kms_key_id is not None:
            self.kms_key_id = kms_key_id

    def set_versions(self, versions):
        """Replace the version-id -> version-dict mapping."""
        self.versions = versions

    def set_default_version_id(self, version_id):
        """Set the id of the version returned when none is requested."""
        self.default_version_id = version_id

    def reset_default_version(self, secret_version, version_id):
        """Install ``secret_version`` as the new default version.

        The previous default version is relabelled ``AWSPREVIOUS`` and that
        label is stripped from every other version first.
        """
        # remove all old AWSPREVIOUS stages
        for old_version in self.versions.values():
            if "AWSPREVIOUS" in old_version["version_stages"]:
                old_version["version_stages"].remove("AWSPREVIOUS")

        # set old AWSCURRENT secret to AWSPREVIOUS
        previous_current_version_id = self.default_version_id
        self.versions[previous_current_version_id]["version_stages"] = ["AWSPREVIOUS"]

        self.versions[version_id] = secret_version
        self.default_version_id = version_id

    def delete(self, deleted_date):
        """Mark the secret as deleted as of ``deleted_date``."""
        self.deleted_date = deleted_date

    def restore(self):
        """Clear the deletion mark."""
        self.deleted_date = None

    def is_deleted(self):
        """Return True when the secret carries a deletion date."""
        return self.deleted_date is not None

    def to_short_dict(self, include_version_stages=False):
        """Return a JSON string with ARN, Name and the default VersionId.

        With ``include_version_stages`` the staging labels of the default
        version are added as ``VersionStages``.
        """
        dct = {
            "ARN": self.arn,
            "Name": self.name,
            "VersionId": self.default_version_id,
        }
        if include_version_stages:
            # Report the labels of the version named in "VersionId"; the
            # constructor-level ``version_stages`` is only a fallback because
            # the backend never populates it.
            default_version = self.versions.get(self.default_version_id)
            if default_version is not None:
                dct["VersionStages"] = default_version["version_stages"]
            else:
                dct["VersionStages"] = self.version_stages
        return json.dumps(dct)

    def to_dict(self):
        """Return the full description of the secret as a plain dict."""
        version_id_to_stages = self._form_version_ids_to_stages()

        return {
            "ARN": self.arn,
            "Name": self.name,
            "Description": self.description or "",
            "KmsKeyId": self.kms_key_id,
            "RotationEnabled": self.rotation_enabled,
            "RotationLambdaARN": self.rotation_lambda_arn,
            "RotationRules": {"AutomaticallyAfterDays": self.auto_rotate_after_days},
            "LastRotatedDate": None,
            "LastChangedDate": None,
            "LastAccessedDate": None,
            "DeletedDate": self.deleted_date,
            "Tags": self.tags,
            "VersionIdsToStages": version_id_to_stages,
            "SecretVersionsToStages": version_id_to_stages,
        }

    def _form_version_ids_to_stages(self):
        """Return a dict mapping each version id to its staging labels."""
        return {
            version_id: version["version_stages"]
            for version_id, version in self.versions.items()
        }


class SecretsStore(dict):
    """Dict of secrets whose keys are passed through get_secret_name_from_arn.

    Every accessor normalises the key first so that callers can address a
    secret through whatever identifier that helper accepts.
    """

    def __setitem__(self, key, value):
        new_key = get_secret_name_from_arn(key)
        super(SecretsStore, self).__setitem__(new_key, value)

    def __getitem__(self, key):
        new_key = get_secret_name_from_arn(key)
        return super(SecretsStore, self).__getitem__(new_key)

    def __contains__(self, key):
        new_key = get_secret_name_from_arn(key)
        return dict.__contains__(self, new_key)

    def get(self, key, default=None):
        # dict.get does not go through __getitem__, so it needs the same
        # normalisation to stay consistent with the other accessors.
        new_key = get_secret_name_from_arn(key)
        return super(SecretsStore, self).get(new_key, default)

    def pop(self, key, *args, **kwargs):
        new_key = get_secret_name_from_arn(key)
        return super(SecretsStore, self).pop(new_key, *args, **kwargs)


class SecretsManagerBackend(BaseBackend):
    """Per-region in-memory backend holding the secrets of that region."""

    def __init__(self, region_name=None, **kwargs):
        super(SecretsManagerBackend, self).__init__()
        self.region = region_name
        self.secrets = SecretsStore()

    def reset(self):
        """Drop all state and re-initialise the backend for the same region."""
        region_name = self.region
        self.__dict__ = {}
        self.__init__(region_name)

    @staticmethod
    def default_vpc_endpoint_service(service_region, zones):
        """Default VPC endpoint services."""
        return BaseBackend.default_vpc_endpoint_service_factory(
            service_region, zones, "secretsmanager"
        )

    def _is_valid_identifier(self, identifier):
        """Return True when ``identifier`` refers to a stored secret."""
        return identifier in self.secrets

    def _unix_time_secs(self, dt):
        """Convert a naive UTC ``datetime`` to seconds since the epoch."""
        epoch = datetime.datetime.utcfromtimestamp(0)
        return (dt - epoch).total_seconds()

    def _client_request_token_validator(self, client_request_token):
        """Raise InvalidParameterException unless the token is 32-64 chars."""
        token_length = len(client_request_token)
        if token_length < 32 or token_length > 64:
            msg = "ClientRequestToken must be 32-64 characters long."
            raise InvalidParameterException(msg)

    def get_secret_value(self, secret_id, version_id, version_stage):
        """Return a JSON string describing one version of a secret.

        If only ``version_stage`` is given, the first version carrying that
        label is used; with neither, the default version is used.

        :raises SecretNotFoundException: unknown secret, or no version
            carries ``version_stage``.
        :raises InvalidRequestException: the secret is marked deleted.
        :raises ResourceNotFoundException: ``version_id`` is unknown.
        :raises SecretHasNoValueException: the version holds neither a
            string nor a binary value.
        """
        if not self._is_valid_identifier(secret_id):
            raise SecretNotFoundException()

        if not version_id and version_stage:
            # set version_id to match version_stage
            versions_dict = self.secrets[secret_id].versions
            for ver_id, ver_val in versions_dict.items():
                if version_stage in ver_val["version_stages"]:
                    version_id = ver_id
                    break
            if not version_id:
                raise SecretNotFoundException()

        # TODO check this part
        if self.secrets[secret_id].is_deleted():
            raise InvalidRequestException(
                "An error occurred (InvalidRequestException) when calling the "
                "GetSecretValue operation: You tried to perform the operation "
                "on a secret that's currently marked deleted."
            )

        secret = self.secrets[secret_id]
        version_id = version_id or secret.default_version_id

        secret_version = secret.versions.get(version_id)
        if not secret_version:
            raise ResourceNotFoundException(
                "An error occurred (ResourceNotFoundException) when calling the GetSecretValue operation: Secrets "
                "Manager can't find the specified secret value for VersionId: {}".format(
                    version_id
                )
            )

        response_data = {
            "ARN": secret.arn,
            "Name": secret.name,
            "VersionId": secret_version["version_id"],
            "VersionStages": secret_version["version_stages"],
            "CreatedDate": secret_version["createdate"],
        }

        if "secret_string" in secret_version:
            response_data["SecretString"] = secret_version["secret_string"]

        if "secret_binary" in secret_version:
            response_data["SecretBinary"] = secret_version["secret_binary"]

        if (
            "secret_string" not in secret_version
            and "secret_binary" not in secret_version
        ):
            raise SecretHasNoValueException(version_stage or "AWSCURRENT")

        return json.dumps(response_data)

    def update_secret(
        self,
        secret_id,
        secret_string=None,
        secret_binary=None,
        client_request_token=None,
        kms_key_id=None,
        **kwargs
    ):
        """Store a new current version for an existing secret.

        Description and tags are carried over from the existing secret;
        extra keyword arguments are accepted and ignored.

        :return: JSON string produced by ``FakeSecret.to_short_dict``.
        :raises SecretNotFoundException: the secret does not exist.
        :raises InvalidRequestException: the secret is marked deleted.
        """
        # error if secret does not exist
        if secret_id not in self.secrets:
            raise SecretNotFoundException()

        if self.secrets[secret_id].is_deleted():
            raise InvalidRequestException(
                "An error occurred (InvalidRequestException) when calling the UpdateSecret operation: "
                "You can't perform this operation on the secret because it was marked for deletion."
            )

        secret = self.secrets[secret_id]
        tags = secret.tags
        description = secret.description

        secret = self._add_secret(
            secret_id,
            secret_string=secret_string,
            secret_binary=secret_binary,
            description=description,
            version_id=client_request_token,
            tags=tags,
            kms_key_id=kms_key_id,
        )

        return secret.to_short_dict()

    def create_secret(
        self,
        name,
        secret_string=None,
        secret_binary=None,
        description=None,
        tags=None,
        kms_key_id=None,
    ):
        """Create a new secret and return its short JSON description.

        :raises ResourceExistsException: a secret with that name exists.
        """
        # error if secret exists; use the store's own membership test so the
        # check agrees with the lookup performed inside _add_secret
        if name in self.secrets:
            raise ResourceExistsException(
                "A resource with the ID you requested already exists."
            )

        secret = self._add_secret(
            name,
            secret_string=secret_string,
            secret_binary=secret_binary,
            description=description,
            tags=tags,
            kms_key_id=kms_key_id,
        )

        return secret.to_short_dict()

    def _add_secret(
        self,
        secret_id,
        secret_string=None,
        secret_binary=None,
        description=None,
        tags=None,
        kms_key_id=None,
        version_id=None,
        version_stages=None,
    ):
        """Create the secret or add a version to it, and return the secret.

        ``version_stages`` defaults to ``["AWSCURRENT"]`` and ``version_id``
        to a random UUID. For an existing secret a version labelled
        ``AWSPENDING`` is stored without touching the default version; any
        other version becomes the new default.

        :raises InvalidParameterException: ``version_id`` is given but is
            not 32-64 characters long.
        """
        if version_stages is None:
            version_stages = ["AWSCURRENT"]

        if version_id:
            self._client_request_token_validator(version_id)
        else:
            version_id = str(uuid.uuid4())

        secret_version = {
            "createdate": int(time.time()),
            "version_id": version_id,
            "version_stages": version_stages,
        }
        if secret_string is not None:
            secret_version["secret_string"] = secret_string

        if secret_binary is not None:
            secret_version["secret_binary"] = secret_binary

        if secret_id in self.secrets:
            secret = self.secrets[secret_id]

            secret.update(description, tags, kms_key_id)

            if "AWSPENDING" in version_stages:
                secret.versions[version_id] = secret_version
            else:
                secret.reset_default_version(secret_version, version_id)
        else:
            secret = FakeSecret(
                region_name=self.region,
                secret_id=secret_id,
                secret_string=secret_string,
                secret_binary=secret_binary,
                description=description,
                tags=tags,
                kms_key_id=kms_key_id,
            )
            secret.set_versions({version_id: secret_version})
            secret.set_default_version_id(version_id)
            self.secrets[secret_id] = secret

        return secret

    def put_secret_value(
        self,
        secret_id,
        secret_string,
        secret_binary,
        client_request_token,
        version_stages,
    ):
        """Add a version with the given staging labels to an existing secret.

        :return: short JSON description including ``VersionStages``.
        :raises SecretNotFoundException: the secret does not exist.
        """
        if not self._is_valid_identifier(secret_id):
            raise SecretNotFoundException()

        secret = self.secrets[secret_id]
        tags = secret.tags
        description = secret.description

        secret = self._add_secret(
            secret_id,
            secret_string,
            secret_binary,
            version_id=client_request_token,
            description=description,
            tags=tags,
            version_stages=version_stages,
        )

        return secret.to_short_dict(include_version_stages=True)

    def describe_secret(self, secret_id):
        """Return the full description of a secret as a JSON string.

        :raises SecretNotFoundException: the secret does not exist.
        """
        if not self._is_valid_identifier(secret_id):
            raise SecretNotFoundException()

        secret = self.secrets[secret_id]

        return json.dumps(secret.to_dict())

    def rotate_secret(
        self,
        secret_id,
        client_request_token=None,
        rotation_lambda_arn=None,
        rotation_rules=None,
    ):
        """Rotate a secret, optionally through a rotation lambda.

        A copy of the default version is stored as ``AWSPENDING``. With a
        rotation lambda the four rotation steps are invoked on it and the
        new version becomes the default; without one the new version is
        promoted to ``AWSCURRENT`` directly.

        :return: JSON string produced by ``FakeSecret.to_short_dict``.
        :raises SecretNotFoundException: the secret does not exist.
        :raises InvalidRequestException: the secret is marked deleted or a
            rotation is considered to be in progress.
        :raises InvalidParameterException: the lambda ARN, rotation period
            or request token is out of range.
        :raises ResourceNotFoundException: the rotation lambda is unknown.
        """
        rotation_days = "AutomaticallyAfterDays"

        if not self._is_valid_identifier(secret_id):
            raise SecretNotFoundException()

        if self.secrets[secret_id].is_deleted():
            raise InvalidRequestException(
                "An error occurred (InvalidRequestException) when calling the "
                "RotateSecret operation: You tried to perform the operation "
                "on a secret that's currently marked deleted."
            )

        if rotation_lambda_arn:
            if len(rotation_lambda_arn) > 2048:
                msg = "RotationLambdaARN must <= 2048 characters long."
                raise InvalidParameterException(msg)

        if rotation_rules:
            if rotation_days in rotation_rules:
                rotation_period = rotation_rules[rotation_days]
                if rotation_period < 1 or rotation_period > 1000:
                    msg = "RotationRules.AutomaticallyAfterDays must be within 1-1000."
                    raise InvalidParameterException(msg)

        secret = self.secrets[secret_id]

        # The rotation function must end with the versions of the secret in
        # one of two states:
        #
        #  - The AWSPENDING and AWSCURRENT staging labels are attached to the
        #    same version of the secret, or
        #  - The AWSPENDING staging label is not attached to any version of the secret.
        #
        # If the AWSPENDING staging label is present but not attached to the same
        # version as AWSCURRENT then any later invocation of RotateSecret assumes
        # that a previous rotation request is still in progress and returns an error.
        #
        # NOTE(review): the condition below raises when AWSPENDING and
        # AWSCURRENT share a version, which looks like the opposite of what
        # the comment above describes. Behaviour is left unchanged here -
        # confirm against the RotateSecret documentation before flipping it.
        try:
            version = next(
                version
                for version in secret.versions.values()
                if "AWSPENDING" in version["version_stages"]
            )
            if "AWSCURRENT" in version["version_stages"]:
                msg = "Previous rotation request is still in progress."
                raise InvalidRequestException(msg)

        except StopIteration:
            # Pending is not present in any version
            pass

        old_secret_version = secret.versions[secret.default_version_id]

        if client_request_token:
            self._client_request_token_validator(client_request_token)
            new_version_id = client_request_token
        else:
            new_version_id = str(uuid.uuid4())

        # We add the new secret version as "pending". The previous version remains
        # as "current" for now. Once we've passed the new secret through the lambda
        # rotation function (if provided) we can then update the status to "current".
        # A version may hold only a binary value, so both payload keys are
        # read with .get() rather than indexing "secret_string" directly.
        self._add_secret(
            secret_id,
            old_secret_version.get("secret_string"),
            secret_binary=old_secret_version.get("secret_binary"),
            description=secret.description,
            tags=secret.tags,
            version_id=new_version_id,
            version_stages=["AWSPENDING"],
        )
        secret.rotation_lambda_arn = rotation_lambda_arn or ""
        if rotation_rules:
            secret.auto_rotate_after_days = rotation_rules.get(rotation_days, 0)
        if secret.auto_rotate_after_days > 0:
            secret.rotation_enabled = True

        # Begin the rotation process for the given secret by invoking the lambda function.
        if secret.rotation_lambda_arn:
            from moto.awslambda.models import lambda_backends

            lambda_backend = lambda_backends[self.region]

            request_headers = {}
            response_headers = {}

            func = lambda_backend.get_function(secret.rotation_lambda_arn)
            if not func:
                msg = "Resource not found for ARN '{}'.".format(
                    secret.rotation_lambda_arn
                )
                raise ResourceNotFoundException(msg)

            for step in ["create", "set", "test", "finish"]:
                func.invoke(
                    json.dumps(
                        {
                            "Step": step + "Secret",
                            "SecretId": secret.name,
                            "ClientRequestToken": new_version_id,
                        }
                    ),
                    request_headers,
                    response_headers,
                )

            secret.set_default_version_id(new_version_id)
        else:
            secret.reset_default_version(
                secret.versions[new_version_id], new_version_id
            )
            secret.versions[new_version_id]["version_stages"] = ["AWSCURRENT"]

        return secret.to_short_dict()

    def get_random_password(
        self,
        password_length,
        exclude_characters,
        exclude_numbers,
        exclude_punctuation,
        exclude_uppercase,
        exclude_lowercase,
        include_space,
        require_each_included_type,
    ):
        """Return a JSON string ``{"RandomPassword": ...}``.

        :raises ClientError: ``password_length`` is greater than 4096.
        :raises InvalidParameterException: ``password_length`` is below 4.
        """
        # password size must have value less than or equal to 4096
        if password_length > 4096:
            raise ClientError(
                "ClientError: An error occurred (ValidationException) "
                "when calling the GetRandomPassword operation: 1 validation "
                "error detected: Value '{}' at 'passwordLength' "
                "failed to satisfy constraint: Member must have value less "
                "than or equal to 4096".format(password_length)
            )
        if password_length < 4:
            raise InvalidParameterException(
                "InvalidParameterException: An error occurred "
                "(InvalidParameterException) when calling the "
                "GetRandomPassword operation: Password length is too short "
                "based on the required types."
            )

        return json.dumps(
            {
                "RandomPassword": random_password(
                    password_length,
                    exclude_characters,
                    exclude_numbers,
                    exclude_punctuation,
                    exclude_uppercase,
                    exclude_lowercase,
                    include_space,
                    require_each_included_type,
                )
            }
        )

    def list_secret_version_ids(self, secret_id):
        """Return a JSON string listing every version of a secret.

        :raises SecretNotFoundException: the secret does not exist.
        """
        if not self._is_valid_identifier(secret_id):
            raise SecretNotFoundException()

        secret = self.secrets[secret_id]

        version_list = [
            {
                # creation time recorded when the version was stored
                "CreatedDate": version["createdate"],
                "LastAccessedDate": int(time.time()),
                "VersionId": version_id,
                "VersionStages": version["version_stages"],
            }
            for version_id, version in secret.versions.items()
        ]

        return json.dumps(
            {
                "ARN": secret.arn,
                "Name": secret.name,
                "NextToken": "",
                "Versions": version_list,
            }
        )

    def list_secrets(
        self, filters: List, max_results: int = 100, next_token: Optional[str] = None
    ) -> Tuple[List, Optional[str]]:
        """
        Returns secrets from secretsmanager.
        The result is paginated and page items depends on the token value, because token contains start element
        number of secret list.
        Response example:
        {
            SecretList: [
                {
                    ARN: 'arn:aws:secretsmanager:us-east-1:1234567890:secret:test1-gEcah',
                    Name: 'test1',
                    ...
                },
                {
                    ARN: 'arn:aws:secretsmanager:us-east-1:1234567890:secret:test2-KZwml',
                    Name: 'test2',
                    ...
                }
            ],
            NextToken: '2'
        }

        :param filters: (List) Filter parameters.
        :param max_results: (int) Max number of results per page.
        :param next_token: (str) Page token.
        :return: (Tuple[List,str]) Returns result list and next token
            (None when there is no further page).
        """
        secret_list = [
            secret.to_dict()
            for secret in self.secrets.values()
            if _matches(secret, filters)
        ]

        starting_point = int(next_token or 0)
        ending_point = starting_point + int(max_results or 100)
        secret_page = secret_list[starting_point:ending_point]
        new_next_token = str(ending_point) if ending_point < len(secret_list) else None

        return secret_page, new_next_token

    def delete_secret(
        self, secret_id, recovery_window_in_days, force_delete_without_recovery
    ):
        """Delete a secret immediately or schedule it for deletion.

        With ``force_delete_without_recovery`` the secret is removed from
        the store; otherwise it is marked deleted as of now plus
        ``recovery_window_in_days`` (30 when not given).

        :return: tuple ``(arn, name, deletion_timestamp_in_seconds)``.
        :raises SecretNotFoundException: the secret does not exist.
        :raises InvalidRequestException: the secret is already marked deleted.
        :raises InvalidParameterException: both options are given, or the
            recovery window is outside 7-30 days.
        """
        if not self._is_valid_identifier(secret_id):
            raise SecretNotFoundException()

        if self.secrets[secret_id].is_deleted():
            raise InvalidRequestException(
                "An error occurred (InvalidRequestException) when calling the "
                "DeleteSecret operation: You tried to perform the operation "
                "on a secret that's currently marked deleted."
            )

        if recovery_window_in_days and force_delete_without_recovery:
            raise InvalidParameterException(
                "An error occurred (InvalidParameterException) when calling "
                "the DeleteSecret operation: You can't use "
                "ForceDeleteWithoutRecovery in conjunction with "
                "RecoveryWindowInDays."
            )

        if recovery_window_in_days and (
            recovery_window_in_days < 7 or recovery_window_in_days > 30
        ):
            raise InvalidParameterException(
                "An error occurred (InvalidParameterException) when calling "
                "the DeleteSecret operation: The RecoveryWindowInDays value "
                "must be between 7 and 30 days (inclusive)."
            )

        deletion_date = datetime.datetime.utcnow()

        if force_delete_without_recovery:
            secret = self.secrets.pop(secret_id, None)
        else:
            deletion_date += datetime.timedelta(days=recovery_window_in_days or 30)
            # Index the store directly so the identifier is normalised the
            # same way as in the existence check above.
            secret = self.secrets[secret_id]
            secret.delete(self._unix_time_secs(deletion_date))

        if not secret:
            raise SecretNotFoundException()

        arn = secret.arn
        name = secret.name

        return arn, name, self._unix_time_secs(deletion_date)

    def restore_secret(self, secret_id):
        """Clear the deletion mark of a secret and return ``(arn, name)``.

        :raises SecretNotFoundException: the secret does not exist.
        """
        if not self._is_valid_identifier(secret_id):
            raise SecretNotFoundException()

        secret = self.secrets[secret_id]
        secret.restore()

        return secret.arn, secret.name

    def tag_resource(self, secret_id, tags):
        """Attach ``tags`` (dicts with a ``"Key"``) to a secret.

        A tag whose key is already present replaces the existing tag.

        :return: the ``secret_id`` that was passed in.
        :raises SecretNotFoundException: the secret does not exist.
        """
        if secret_id not in self.secrets:
            raise SecretNotFoundException()

        secret = self.secrets[secret_id]
        old_tags = secret.tags

        for tag in tags:
            # Drop any tag with the same key first so keys stay unique;
            # slice assignment keeps the list object shared with the secret.
            old_tags[:] = [old for old in old_tags if old["Key"] != tag["Key"]]
            old_tags.append(tag)

        return secret_id

    def untag_resource(self, secret_id, tag_keys):
        """Remove every tag whose key is in ``tag_keys`` from a secret.

        :return: the ``secret_id`` that was passed in.
        :raises SecretNotFoundException: the secret does not exist.
        """
        if secret_id not in self.secrets:
            raise SecretNotFoundException()

        secret = self.secrets[secret_id]
        tags = secret.tags

        # Rebuild in place instead of calling remove() while iterating,
        # which skips the element following each removed one.
        tags[:] = [tag for tag in tags if tag["Key"] not in tag_keys]

        return secret_id

    def update_secret_version_stage(
        self, secret_id, version_stage, remove_from_version_id, move_to_version_id
    ):
        """Move a staging label between versions of a secret.

        :return: the ``secret_id`` that was passed in.
        :raises SecretNotFoundException: the secret does not exist.
        :raises InvalidParameterException: a version id is unknown, or the
            label is not attached to ``remove_from_version_id``.
        """
        if secret_id not in self.secrets:
            raise SecretNotFoundException()

        secret = self.secrets[secret_id]

        if remove_from_version_id:
            if remove_from_version_id not in secret.versions:
                raise InvalidParameterException(
                    "Not a valid version: %s" % remove_from_version_id
                )

            stages = secret.versions[remove_from_version_id]["version_stages"]
            if version_stage not in stages:
                raise InvalidParameterException(
                    "Version stage %s not found in version %s"
                    % (version_stage, remove_from_version_id)
                )

            stages.remove(version_stage)

        if move_to_version_id:
            if move_to_version_id not in secret.versions:
                raise InvalidParameterException(
                    "Not a valid version: %s" % move_to_version_id
                )

            stages = secret.versions[move_to_version_id]["version_stages"]
            # Avoid listing the same label twice on one version.
            if version_stage not in stages:
                stages.append(version_stage)

        if version_stage == "AWSCURRENT":
            if remove_from_version_id:
                # Whenever you move AWSCURRENT, Secrets Manager automatically
                # moves the label AWSPREVIOUS to the version that AWSCURRENT
                # was removed from.
                secret.versions[remove_from_version_id]["version_stages"].append(
                    "AWSPREVIOUS"
                )

            if move_to_version_id:
                stages = secret.versions[move_to_version_id]["version_stages"]
                if "AWSPREVIOUS" in stages:
                    stages.remove("AWSPREVIOUS")

        return secret_id

    @staticmethod
    def get_resource_policy(secret_id):
        """Return a JSON string with a fixed, hard-coded resource policy.

        ``secret_id`` is echoed back as both ``ARN`` and ``Name``; the
        secret store is not consulted.
        """
        resource_policy = {
            "Version": "2012-10-17",
            "Statement": {
                "Effect": "Allow",
                "Principal": {
                    "AWS": [
                        "arn:aws:iam::111122223333:root",
                        "arn:aws:iam::444455556666:root",
                    ]
                },
                "Action": ["secretsmanager:GetSecretValue"],
                "Resource": "*",
            },
        }
        return json.dumps(
            {
                "ARN": secret_id,
                "Name": secret_id,
                "ResourcePolicy": json.dumps(resource_policy),
            }
        )


# One backend per region of every partition in which boto3 knows the service.
secretsmanager_backends = {}
_session = Session()
for _partition in ("aws", "aws-us-gov", "aws-cn"):
    for region in _session.get_available_regions(
        "secretsmanager", partition_name=_partition
    ):
        secretsmanager_backends[region] = SecretsManagerBackend(region_name=region)