1import re 2import json 3import email 4import datetime 5from email.mime.base import MIMEBase 6from email.utils import parseaddr 7from email.mime.multipart import MIMEMultipart 8from email.encoders import encode_7or8bit 9 10from moto.core import BaseBackend, BaseModel 11from moto.sns.models import sns_backends 12from .exceptions import ( 13 MessageRejectedError, 14 ConfigurationSetDoesNotExist, 15 EventDestinationAlreadyExists, 16 TemplateNameAlreadyExists, 17 ValidationError, 18 InvalidParameterValue, 19 InvalidRenderingParameterException, 20 TemplateDoesNotExist, 21 RuleDoesNotExist, 22 RuleSetNameAlreadyExists, 23 RuleSetDoesNotExist, 24 RuleAlreadyExists, 25 MissingRenderingAttributeException, 26) 27from .utils import get_random_message_id 28from .feedback import COMMON_MAIL, BOUNCE, COMPLAINT, DELIVERY 29 30RECIPIENT_LIMIT = 50 31 32 33class SESFeedback(BaseModel): 34 35 BOUNCE = "Bounce" 36 COMPLAINT = "Complaint" 37 DELIVERY = "Delivery" 38 39 SUCCESS_ADDR = "success" 40 BOUNCE_ADDR = "bounce" 41 COMPLAINT_ADDR = "complaint" 42 43 FEEDBACK_SUCCESS_MSG = {"test": "success"} 44 FEEDBACK_BOUNCE_MSG = {"test": "bounce"} 45 FEEDBACK_COMPLAINT_MSG = {"test": "complaint"} 46 47 FORWARDING_ENABLED = "feedback_forwarding_enabled" 48 49 @staticmethod 50 def generate_message(msg_type): 51 msg = dict(COMMON_MAIL) 52 if msg_type == SESFeedback.BOUNCE: 53 msg["bounce"] = BOUNCE 54 elif msg_type == SESFeedback.COMPLAINT: 55 msg["complaint"] = COMPLAINT 56 elif msg_type == SESFeedback.DELIVERY: 57 msg["delivery"] = DELIVERY 58 59 return msg 60 61 62class Message(BaseModel): 63 def __init__(self, message_id, source, subject, body, destinations): 64 self.id = message_id 65 self.source = source 66 self.subject = subject 67 self.body = body 68 self.destinations = destinations 69 70 71class TemplateMessage(BaseModel): 72 def __init__(self, message_id, source, template, template_data, destinations): 73 self.id = message_id 74 self.source = source 75 self.template = template 76 self.template_data = template_data 77 self.destinations = destinations 78 79 80class RawMessage(BaseModel): 81 def __init__(self, message_id, source, destinations, raw_data): 82 self.id = message_id 83 self.source = source 84 self.destinations = destinations 85 self.raw_data = raw_data 86 87 88class SESQuota(BaseModel): 89 def __init__(self, sent): 90 self.sent = sent 91 92 @property 93 def sent_past_24(self): 94 return self.sent 95 96 97def are_all_variables_present(template, template_data): 98 subject_part = template["subject_part"] 99 text_part = template["text_part"] 100 html_part = template["html_part"] 101 102 for var in re.findall("{{(.+?)}}", subject_part + text_part + html_part): 103 if not template_data.get(var): 104 return var, False 105 return None, True 106 107 108class SESBackend(BaseBackend): 109 def __init__(self): 110 self.addresses = [] 111 self.email_addresses = [] 112 self.domains = [] 113 self.sent_messages = [] 114 self.sent_message_count = 0 115 self.rejected_messages_count = 0 116 self.sns_topics = {} 117 self.config_set = {} 118 self.config_set_event_destination = {} 119 self.event_destinations = {} 120 self.templates = {} 121 self.receipt_rule_set = {} 122 123 def _is_verified_address(self, source): 124 _, address = parseaddr(source) 125 if address in self.addresses: 126 return True 127 if address in self.email_addresses: 128 return True 129 user, host = address.split("@", 1) 130 return host in self.domains 131 132 def verify_email_identity(self, address): 133 _, address = parseaddr(address) 134 self.addresses.append(address) 135 136 def verify_email_address(self, address): 137 _, address = parseaddr(address) 138 self.email_addresses.append(address) 139 140 def verify_domain(self, domain): 141 if domain.lower() not in self.domains: 142 self.domains.append(domain.lower()) 143 144 def list_identities(self): 145 return self.domains + self.addresses 146 147 def list_verified_email_addresses(self): 148 return self.email_addresses 149 150 def delete_identity(self, identity): 151 if "@" in identity: 152 self.addresses.remove(identity) 153 else: 154 self.domains.remove(identity) 155 156 def send_email(self, source, subject, body, destinations, region): 157 recipient_count = sum(map(len, destinations.values())) 158 if recipient_count > RECIPIENT_LIMIT: 159 raise MessageRejectedError("Too many recipients.") 160 if not self._is_verified_address(source): 161 self.rejected_messages_count += 1 162 raise MessageRejectedError("Email address not verified %s" % source) 163 164 self.__process_sns_feedback__(source, destinations, region) 165 166 message_id = get_random_message_id() 167 message = Message(message_id, source, subject, body, destinations) 168 self.sent_messages.append(message) 169 self.sent_message_count += recipient_count 170 return message 171 172 def send_templated_email( 173 self, source, template, template_data, destinations, region 174 ): 175 recipient_count = sum(map(len, destinations.values())) 176 if recipient_count > RECIPIENT_LIMIT: 177 raise MessageRejectedError("Too many recipients.") 178 if not self._is_verified_address(source): 179 self.rejected_messages_count += 1 180 raise MessageRejectedError("Email address not verified %s" % source) 181 182 if not self.templates.get(template[0]): 183 raise TemplateDoesNotExist("Template (%s) does not exist" % template[0]) 184 185 self.__process_sns_feedback__(source, destinations, region) 186 187 message_id = get_random_message_id() 188 message = TemplateMessage( 189 message_id, source, template, template_data, destinations 190 ) 191 self.sent_messages.append(message) 192 self.sent_message_count += recipient_count 193 return message 194 195 def __type_of_message__(self, destinations): 196 """Checks the destination for any special address that could indicate delivery, 197 complaint or bounce like in SES simulator""" 198 if isinstance(destinations, list): 199 alladdress = destinations 200 else: 201 alladdress = ( 202 destinations.get("ToAddresses", []) 203 + destinations.get("CcAddresses", []) 204 + destinations.get("BccAddresses", []) 205 ) 206 207 for addr in alladdress: 208 if SESFeedback.SUCCESS_ADDR in addr: 209 return SESFeedback.DELIVERY 210 elif SESFeedback.COMPLAINT_ADDR in addr: 211 return SESFeedback.COMPLAINT 212 elif SESFeedback.BOUNCE_ADDR in addr: 213 return SESFeedback.BOUNCE 214 215 return None 216 217 def __generate_feedback__(self, msg_type): 218 """Generates the SNS message for the feedback""" 219 return SESFeedback.generate_message(msg_type) 220 221 def __process_sns_feedback__(self, source, destinations, region): 222 domain = str(source) 223 if "@" in domain: 224 domain = domain.split("@")[1] 225 if domain in self.sns_topics: 226 msg_type = self.__type_of_message__(destinations) 227 if msg_type is not None: 228 sns_topic = self.sns_topics[domain].get(msg_type, None) 229 if sns_topic is not None: 230 message = self.__generate_feedback__(msg_type) 231 if message: 232 sns_backends[region].publish(message, arn=sns_topic) 233 234 def send_raw_email(self, source, destinations, raw_data, region): 235 if source is not None: 236 _, source_email_address = parseaddr(source) 237 if not self._is_verified_address(source_email_address): 238 raise MessageRejectedError( 239 "Did not have authority to send from email %s" 240 % source_email_address 241 ) 242 243 recipient_count = len(destinations) 244 message = email.message_from_string(raw_data) 245 if source is None: 246 if message["from"] is None: 247 raise MessageRejectedError("Source not specified") 248 249 _, source_email_address = parseaddr(message["from"]) 250 if not self._is_verified_address(source_email_address): 251 raise MessageRejectedError( 252 "Did not have authority to send from email %s" 253 % source_email_address 254 ) 255 256 for header in "TO", "CC", "BCC": 257 recipient_count += sum( 258 d.strip() and 1 or 0 for d in message.get(header, "").split(",") 259 ) 260 if recipient_count > RECIPIENT_LIMIT: 261 raise MessageRejectedError("Too many recipients.") 262 263 self.__process_sns_feedback__(source, destinations, region) 264 265 self.sent_message_count += recipient_count 266 message_id = get_random_message_id() 267 message = RawMessage(message_id, source, destinations, raw_data) 268 self.sent_messages.append(message) 269 return message 270 271 def get_send_quota(self): 272 return SESQuota(self.sent_message_count) 273 274 def get_identity_notification_attributes(self, identities): 275 response = {} 276 for identity in identities: 277 response[identity] = self.sns_topics.get(identity, {}) 278 return response 279 280 def set_identity_feedback_forwarding_enabled(self, identity, enabled): 281 identity_sns_topics = self.sns_topics.get(identity, {}) 282 identity_sns_topics[SESFeedback.FORWARDING_ENABLED] = enabled 283 self.sns_topics[identity] = identity_sns_topics 284 285 def set_identity_notification_topic(self, identity, notification_type, sns_topic): 286 identity_sns_topics = self.sns_topics.get(identity, {}) 287 if sns_topic is None: 288 del identity_sns_topics[notification_type] 289 else: 290 identity_sns_topics[notification_type] = sns_topic 291 292 self.sns_topics[identity] = identity_sns_topics 293 294 return {} 295 296 def create_configuration_set(self, configuration_set_name): 297 self.config_set[configuration_set_name] = 1 298 return {} 299 300 def create_configuration_set_event_destination( 301 self, configuration_set_name, event_destination 302 ): 303 304 if self.config_set.get(configuration_set_name) is None: 305 raise ConfigurationSetDoesNotExist("Invalid Configuration Set Name.") 306 307 if self.event_destinations.get(event_destination["Name"]): 308 raise EventDestinationAlreadyExists("Duplicate Event destination Name.") 309 310 self.config_set_event_destination[configuration_set_name] = event_destination 311 self.event_destinations[event_destination["Name"]] = 1 312 313 return {} 314 315 def get_send_statistics(self): 316 317 statistics = {} 318 statistics["DeliveryAttempts"] = self.sent_message_count 319 statistics["Rejects"] = self.rejected_messages_count 320 statistics["Complaints"] = 0 321 statistics["Bounces"] = 0 322 statistics["Timestamp"] = datetime.datetime.utcnow() 323 return statistics 324 325 def add_template(self, template_info): 326 template_name = template_info["template_name"] 327 if not template_name: 328 raise ValidationError( 329 "1 validation error detected: " 330 "Value null at 'template.templateName'" 331 "failed to satisfy constraint: Member must not be null" 332 ) 333 334 if self.templates.get(template_name, None): 335 raise TemplateNameAlreadyExists("Duplicate Template Name.") 336 337 template_subject = template_info["subject_part"] 338 if not template_subject: 339 raise InvalidParameterValue("The subject must be specified.") 340 self.templates[template_name] = template_info 341 342 def update_template(self, template_info): 343 template_name = template_info["template_name"] 344 if not template_name: 345 raise ValidationError( 346 "1 validation error detected: " 347 "Value null at 'template.templateName'" 348 "failed to satisfy constraint: Member must not be null" 349 ) 350 351 if not self.templates.get(template_name, None): 352 raise TemplateDoesNotExist("Invalid Template Name.") 353 354 template_subject = template_info["subject_part"] 355 if not template_subject: 356 raise InvalidParameterValue("The subject must be specified.") 357 self.templates[template_name] = template_info 358 359 def get_template(self, template_name): 360 if not self.templates.get(template_name, None): 361 raise TemplateDoesNotExist("Invalid Template Name.") 362 return self.templates[template_name] 363 364 def list_templates(self): 365 return list(self.templates.values()) 366 367 def render_template(self, render_data): 368 template_name = render_data.get("name", "") 369 template = self.templates.get(template_name, None) 370 if not template: 371 raise TemplateDoesNotExist("Invalid Template Name.") 372 373 template_data = render_data.get("data") 374 try: 375 template_data = json.loads(template_data) 376 except ValueError: 377 raise InvalidRenderingParameterException( 378 "Template rendering data is invalid" 379 ) 380 381 var, are_variables_present = are_all_variables_present(template, template_data) 382 if not are_variables_present: 383 raise MissingRenderingAttributeException(var) 384 385 subject_part = template["subject_part"] 386 text_part = template["text_part"] 387 html_part = template["html_part"] 388 389 for key, value in template_data.items(): 390 subject_part = str.replace(str(subject_part), "{{%s}}" % key, value) 391 text_part = str.replace(str(text_part), "{{%s}}" % key, value) 392 html_part = str.replace(str(html_part), "{{%s}}" % key, value) 393 394 email = MIMEMultipart("alternative") 395 396 mime_text = MIMEBase("text", "plain;charset=UTF-8") 397 mime_text.set_payload(text_part.encode("utf-8")) 398 encode_7or8bit(mime_text) 399 email.attach(mime_text) 400 401 mime_html = MIMEBase("text", "html;charset=UTF-8") 402 mime_html.set_payload(html_part.encode("utf-8")) 403 encode_7or8bit(mime_html) 404 email.attach(mime_html) 405 406 now = datetime.datetime.now().isoformat() 407 408 rendered_template = "Date: %s\r\nSubject: %s\r\n%s" % ( 409 now, 410 subject_part, 411 email.as_string(), 412 ) 413 return rendered_template 414 415 def create_receipt_rule_set(self, rule_set_name): 416 if self.receipt_rule_set.get(rule_set_name) is not None: 417 raise RuleSetNameAlreadyExists("Duplicate Receipt Rule Set Name.") 418 self.receipt_rule_set[rule_set_name] = [] 419 420 def create_receipt_rule(self, rule_set_name, rule): 421 rule_set = self.receipt_rule_set.get(rule_set_name) 422 if rule_set is None: 423 raise RuleSetDoesNotExist("Invalid Rule Set Name.") 424 if rule in rule_set: 425 raise RuleAlreadyExists("Duplicate Rule Name.") 426 rule_set.append(rule) 427 self.receipt_rule_set[rule_set_name] = rule_set 428 429 def describe_receipt_rule_set(self, rule_set_name): 430 rule_set = self.receipt_rule_set.get(rule_set_name) 431 432 if rule_set is None: 433 raise RuleSetDoesNotExist(f"Rule set does not exist: {rule_set_name}") 434 435 return rule_set 436 437 def describe_receipt_rule(self, rule_set_name, rule_name): 438 rule_set = self.receipt_rule_set.get(rule_set_name) 439 440 if rule_set is None: 441 raise RuleSetDoesNotExist("Invalid Rule Set Name.") 442 443 for receipt_rule in rule_set: 444 if receipt_rule["name"] == rule_name: 445 return receipt_rule 446 else: 447 raise RuleDoesNotExist("Invalid Rule Name.") 448 449 def update_receipt_rule(self, rule_set_name, rule): 450 rule_set = self.receipt_rule_set.get(rule_set_name) 451 452 if rule_set is None: 453 raise RuleSetDoesNotExist(f"Rule set does not exist: {rule_set_name}") 454 455 for i, receipt_rule in enumerate(rule_set): 456 if receipt_rule["name"] == rule["name"]: 457 rule_set[i] = rule 458 break 459 else: 460 raise RuleDoesNotExist(f"Rule does not exist: {rule['name']}") 461 462 463ses_backend = SESBackend() 464