"""Response layer mapping Secrets Manager request parameters onto backend calls."""
import json

from moto.core.responses import BaseResponse
from moto.secretsmanager.exceptions import (
    InvalidRequestException,
    InvalidParameterException,
    ValidationException,
)

from .models import secretsmanager_backends, filter_keys


def _validate_filters(filters):
    """Check each filter dict in ``filters``; raise on the first invalid one.

    Args:
        filters: iterable of dicts, each read through its "Key" and
            "Values" entries.

    Raises:
        InvalidParameterException: if a filter's "Key" or "Values" entry is
            missing or ``None``.
        ValidationException: if a filter's "Key" is not one of the values
            returned by ``filter_keys()``.
    """
    # Positions in the error message are 1-based, hence start=1.
    for position, entry in enumerate(filters, start=1):
        key = entry.get("Key")
        values = entry.get("Values")
        if key is None:
            raise InvalidParameterException("Invalid filter key")
        if key not in filter_keys():
            raise ValidationException(
                "1 validation error detected: Value '{}' at 'filters.{}.member.key' failed to satisfy constraint: "
                "Member must satisfy enum value set: [all, name, tag-key, description, tag-value]".format(
                    key, position
                )
            )
        if values is None:
            raise InvalidParameterException(
                "Invalid filter values for key: {}".format(key)
            )


class SecretsManagerResponse(BaseResponse):
    """Reads request parameters and forwards them to the regional backend.

    Each handler pulls its inputs with ``self._get_param`` (or
    ``self._get_int_param``) and delegates to the matching method on
    ``secretsmanager_backends[self.region]``.
    """

    def get_secret_value(self):
        """Forward SecretId / VersionId / VersionStage to the backend."""
        call_args = {
            "secret_id": self._get_param("SecretId"),
            "version_id": self._get_param("VersionId"),
            "version_stage": self._get_param("VersionStage"),
        }
        backend = secretsmanager_backends[self.region]
        return backend.get_secret_value(**call_args)

    def create_secret(self):
        """Forward the secret's name, payload, description, tags and KMS key.

        Description defaults to "" and Tags to [] when absent.
        """
        call_args = {
            "name": self._get_param("Name"),
            "secret_string": self._get_param("SecretString"),
            "secret_binary": self._get_param("SecretBinary"),
            "description": self._get_param("Description", if_none=""),
            "tags": self._get_param("Tags", if_none=[]),
            "kms_key_id": self._get_param("KmsKeyId", if_none=None),
        }
        backend = secretsmanager_backends[self.region]
        return backend.create_secret(**call_args)

    def update_secret(self):
        """Forward the new payload, request token and KMS key for a secret."""
        call_args = {
            "secret_id": self._get_param("SecretId"),
            "secret_string": self._get_param("SecretString"),
            "secret_binary": self._get_param("SecretBinary"),
            "client_request_token": self._get_param("ClientRequestToken"),
            "kms_key_id": self._get_param("KmsKeyId", if_none=None),
        }
        backend = secretsmanager_backends[self.region]
        return backend.update_secret(**call_args)

    def get_random_password(self):
        """Forward the password-generation options to the backend.

        Defaults applied when a parameter is absent: length 32, no excluded
        characters, every Exclude*/IncludeSpace flag False, and
        RequireEachIncludedType True.
        """
        call_args = {
            "password_length": self._get_param("PasswordLength", if_none=32),
            "exclude_characters": self._get_param("ExcludeCharacters", if_none=""),
            "exclude_numbers": self._get_param("ExcludeNumbers", if_none=False),
            "exclude_punctuation": self._get_param(
                "ExcludePunctuation", if_none=False
            ),
            "exclude_uppercase": self._get_param("ExcludeUppercase", if_none=False),
            "exclude_lowercase": self._get_param("ExcludeLowercase", if_none=False),
            "include_space": self._get_param("IncludeSpace", if_none=False),
            "require_each_included_type": self._get_param(
                "RequireEachIncludedType", if_none=True
            ),
        }
        backend = secretsmanager_backends[self.region]
        return backend.get_random_password(**call_args)

    def describe_secret(self):
        """Forward SecretId to the backend's ``describe_secret``."""
        target = self._get_param("SecretId")
        backend = secretsmanager_backends[self.region]
        return backend.describe_secret(secret_id=target)

    def rotate_secret(self):
        """Forward the rotation request (token, lambda ARN, rules, secret id)."""
        token = self._get_param("ClientRequestToken")
        lambda_arn = self._get_param("RotationLambdaARN")
        rules = self._get_param("RotationRules")
        target = self._get_param("SecretId")
        backend = secretsmanager_backends[self.region]
        return backend.rotate_secret(
            secret_id=target,
            client_request_token=token,
            rotation_lambda_arn=lambda_arn,
            rotation_rules=rules,
        )

    def put_secret_value(self):
        """Forward a new secret value to the backend.

        VersionStages defaults to ["AWSCURRENT"]; a non-list value is wrapped
        in a single-element list before being passed on.

        Raises:
            InvalidRequestException: if both SecretString and SecretBinary
                are missing or falsy.
        """
        target = self._get_param("SecretId", if_none="")
        string_payload = self._get_param("SecretString")
        binary_payload = self._get_param("SecretBinary")
        token = self._get_param("ClientRequestToken")
        if not (binary_payload or string_payload):
            raise InvalidRequestException(
                "You must provide either SecretString or SecretBinary."
            )
        stages = self._get_param("VersionStages", if_none=["AWSCURRENT"])
        stages = stages if isinstance(stages, list) else [stages]

        backend = secretsmanager_backends[self.region]
        return backend.put_secret_value(
            secret_id=target,
            secret_binary=binary_payload,
            secret_string=string_payload,
            version_stages=stages,
            client_request_token=token,
        )

    def list_secret_version_ids(self):
        """Forward SecretId (default "") to ``list_secret_version_ids``."""
        target = self._get_param("SecretId", if_none="")
        backend = secretsmanager_backends[self.region]
        return backend.list_secret_version_ids(secret_id=target)

    def list_secrets(self):
        """Validate the filters, query the backend and return a JSON string.

        The JSON object carries the backend's results under "SecretList" and
        "NextToken".
        """
        filters = self._get_param("Filters", if_none=[])
        _validate_filters(filters)
        page_size = self._get_int_param("MaxResults")
        token = self._get_param("NextToken")
        backend = secretsmanager_backends[self.region]
        secrets, following_token = backend.list_secrets(
            filters=filters, max_results=page_size, next_token=token
        )
        return json.dumps({"SecretList": secrets, "NextToken": following_token})

    def delete_secret(self):
        """Ask the backend to delete a secret and return a JSON string.

        The JSON object carries "ARN", "Name" and "DeletionDate" taken from
        the backend's three-element result.
        """
        target = self._get_param("SecretId")
        window_days = self._get_param("RecoveryWindowInDays")
        force = self._get_param("ForceDeleteWithoutRecovery")
        backend = secretsmanager_backends[self.region]
        arn, name, deletion_date = backend.delete_secret(
            secret_id=target,
            recovery_window_in_days=window_days,
            force_delete_without_recovery=force,
        )
        return json.dumps({"ARN": arn, "Name": name, "DeletionDate": deletion_date})

    def restore_secret(self):
        """Ask the backend to restore a secret; return JSON with ARN and Name."""
        target = self._get_param("SecretId")
        backend = secretsmanager_backends[self.region]
        arn, name = backend.restore_secret(secret_id=target)
        return json.dumps({"ARN": arn, "Name": name})

    def get_resource_policy(self):
        """Forward SecretId to the backend's ``get_resource_policy``."""
        target = self._get_param("SecretId")
        backend = secretsmanager_backends[self.region]
        return backend.get_resource_policy(secret_id=target)

    def tag_resource(self):
        """Forward SecretId and Tags (default []) to ``tag_resource``."""
        target = self._get_param("SecretId")
        tags = self._get_param("Tags", if_none=[])
        backend = secretsmanager_backends[self.region]
        # Passed positionally, exactly as the backend is invoked elsewhere.
        return backend.tag_resource(target, tags)

    def untag_resource(self):
        """Forward SecretId and TagKeys (default []) to ``untag_resource``."""
        target = self._get_param("SecretId")
        keys = self._get_param("TagKeys", if_none=[])
        backend = secretsmanager_backends[self.region]
        return backend.untag_resource(secret_id=target, tag_keys=keys)

    def update_secret_version_stage(self):
        """Forward a request to move a version stage between versions."""
        call_args = {
            "secret_id": self._get_param("SecretId"),
            "version_stage": self._get_param("VersionStage"),
            "remove_from_version_id": self._get_param("RemoveFromVersionId"),
            "move_to_version_id": self._get_param("MoveToVersionId"),
        }
        backend = secretsmanager_backends[self.region]
        return backend.update_secret_version_stage(**call_args)