1import re
2import json
3import email
4import datetime
5from email.mime.base import MIMEBase
6from email.utils import parseaddr
7from email.mime.multipart import MIMEMultipart
8from email.encoders import encode_7or8bit
9
10from moto.core import BaseBackend, BaseModel
11from moto.sns.models import sns_backends
12from .exceptions import (
13    MessageRejectedError,
14    ConfigurationSetDoesNotExist,
15    EventDestinationAlreadyExists,
16    TemplateNameAlreadyExists,
17    ValidationError,
18    InvalidParameterValue,
19    InvalidRenderingParameterException,
20    TemplateDoesNotExist,
21    RuleDoesNotExist,
22    RuleSetNameAlreadyExists,
23    RuleSetDoesNotExist,
24    RuleAlreadyExists,
25    MissingRenderingAttributeException,
26)
27from .utils import get_random_message_id
28from .feedback import COMMON_MAIL, BOUNCE, COMPLAINT, DELIVERY
29
# Maximum number of recipients allowed in one send call; the send methods of
# SESBackend raise MessageRejectedError("Too many recipients.") above this.
RECIPIENT_LIMIT = 50
31
32
class SESFeedback(BaseModel):
    """Constants and a message factory for simulated SES feedback."""

    # Notification types; also used as keys when looking up SNS topics.
    BOUNCE = "Bounce"
    COMPLAINT = "Complaint"
    DELIVERY = "Delivery"

    # Substrings looked for in recipient addresses to pick a feedback type.
    SUCCESS_ADDR = "success"
    BOUNCE_ADDR = "bounce"
    COMPLAINT_ADDR = "complaint"

    FEEDBACK_SUCCESS_MSG = {"test": "success"}
    FEEDBACK_BOUNCE_MSG = {"test": "bounce"}
    FEEDBACK_COMPLAINT_MSG = {"test": "complaint"}

    # Key under which the feedback-forwarding flag is stored per identity.
    FORWARDING_ENABLED = "feedback_forwarding_enabled"

    @staticmethod
    def generate_message(msg_type):
        """Build the feedback payload for ``msg_type``.

        Starts from a shallow copy of ``COMMON_MAIL`` and adds one section
        ("bounce", "complaint" or "delivery") matching the type. An
        unrecognised type yields the plain copy.
        """
        sections = (
            (SESFeedback.BOUNCE, "bounce", BOUNCE),
            (SESFeedback.COMPLAINT, "complaint", COMPLAINT),
            (SESFeedback.DELIVERY, "delivery", DELIVERY),
        )
        feedback = dict(COMMON_MAIL)
        for kind, section_key, section_body in sections:
            if msg_type == kind:
                feedback[section_key] = section_body
                break
        return feedback
60
61
class Message(BaseModel):
    """Plain record of an email built by ``SESBackend.send_email``."""

    def __init__(self, message_id, source, subject, body, destinations):
        # Note: the identifier is exposed as ``id``, not ``message_id``.
        self.id, self.source = message_id, source
        self.subject, self.body = subject, body
        self.destinations = destinations
69
70
class TemplateMessage(BaseModel):
    """Plain record of an email built by ``SESBackend.send_templated_email``."""

    def __init__(self, message_id, source, template, template_data, destinations):
        # Note: the identifier is exposed as ``id``, not ``message_id``.
        self.id, self.source = message_id, source
        self.template, self.template_data = template, template_data
        self.destinations = destinations
78
79
class RawMessage(BaseModel):
    """Plain record of an email built by ``SESBackend.send_raw_email``."""

    def __init__(self, message_id, source, destinations, raw_data):
        # Note: the identifier is exposed as ``id``, not ``message_id``.
        self.id, self.source = message_id, source
        self.destinations, self.raw_data = destinations, raw_data
86
87
class SESQuota(BaseModel):
    """Wrapper around a sent-message counter."""

    def __init__(self, sent):
        # Value handed in by the caller; SESBackend.get_send_quota passes its
        # running sent_message_count.
        self.sent = sent

    @property
    def sent_past_24(self):
        """Return the stored ``sent`` value unchanged."""
        sent_so_far = self.sent
        return sent_so_far
95
96
def are_all_variables_present(template, template_data):
    """Check that every ``{{placeholder}}`` in a template has rendering data.

    The subject, text and HTML parts are scanned in that order.

    :param template: mapping with ``subject_part``, ``text_part`` and
        ``html_part`` entries; a part that is ``None`` is treated as empty.
    :param template_data: mapping of placeholder name to replacement value.
    :return: ``(None, True)`` when every placeholder has a value, otherwise
        ``(name, False)`` where ``name`` is the first placeholder without one.
    """
    combined = "".join(
        template[part] or "" for part in ("subject_part", "text_part", "html_part")
    )
    for var in re.findall(r"{{(.+?)}}", combined):
        # Only an absent (or null) entry counts as missing. Testing truthiness
        # here would wrongly reject legitimate values such as "" or 0.
        if template_data.get(var) is None:
            return var, False
    return None, True
106
107
class SESBackend(BaseBackend):
    """In-memory model of SES state.

    Tracks verified identities, sent messages, SNS notification settings,
    configuration sets, templates and receipt rule sets in plain lists and
    dicts on the instance.
    """

    def __init__(self):
        # Addresses registered through verify_email_identity.
        self.addresses = []
        # Addresses registered through verify_email_address.
        self.email_addresses = []
        # Lower-cased domains registered through verify_domain.
        self.domains = []
        self.sent_messages = []
        # Incremented by the number of recipients of each accepted send.
        self.sent_message_count = 0
        self.rejected_messages_count = 0
        # identity -> {notification type or FORWARDING_ENABLED: value}
        self.sns_topics = {}
        self.config_set = {}
        self.config_set_event_destination = {}
        self.event_destinations = {}
        # template name -> template info dict
        self.templates = {}
        # rule set name -> list of rule dicts
        self.receipt_rule_set = {}

    def _is_verified_address(self, source):
        """Return True if ``source`` is a verified address or on a verified domain.

        ``source`` may carry a display name (``"Name <user@host>"``); only the
        address part is considered.
        """
        _, address = parseaddr(source)
        if address in self.addresses or address in self.email_addresses:
            return True
        # partition() never raises, so an address without "@" is reported as
        # unverified instead of failing with an unpacking ValueError.
        _, separator, host = address.partition("@")
        if not separator:
            return False
        # Domains are stored lower-cased by verify_domain, so compare likewise.
        return host.lower() in self.domains

    def verify_email_identity(self, address):
        """Record the address part of ``address`` as a verified identity."""
        _, address = parseaddr(address)
        self.addresses.append(address)

    def verify_email_address(self, address):
        """Record the address part of ``address`` as a verified email address."""
        _, address = parseaddr(address)
        self.email_addresses.append(address)

    def verify_domain(self, domain):
        """Record ``domain`` (lower-cased) as verified, ignoring duplicates."""
        normalized = domain.lower()
        if normalized not in self.domains:
            self.domains.append(normalized)

    def list_identities(self):
        """Return verified domains followed by verified email identities."""
        return self.domains + self.addresses

    def list_verified_email_addresses(self):
        """Return the addresses registered through verify_email_address."""
        return self.email_addresses

    def delete_identity(self, identity):
        """Remove an email identity (contains "@") or a domain.

        Deleting an identity that is not registered is a no-op rather than a
        ValueError from ``list.remove``.
        """
        if "@" in identity:
            registry, key = self.addresses, identity
        else:
            # Domains are stored lower-cased by verify_domain.
            registry, key = self.domains, identity.lower()
        if key in registry:
            registry.remove(key)

    def _ensure_sendable(self, source, recipient_count):
        """Raise MessageRejectedError for too many recipients or an unverified source.

        Only the unverified-source rejection bumps ``rejected_messages_count``.
        """
        if recipient_count > RECIPIENT_LIMIT:
            raise MessageRejectedError("Too many recipients.")
        if not self._is_verified_address(source):
            self.rejected_messages_count += 1
            raise MessageRejectedError("Email address not verified %s" % source)

    def send_email(self, source, subject, body, destinations, region):
        """Validate, record and return a ``Message``.

        ``destinations`` is a mapping whose values are recipient lists; their
        combined length is checked against RECIPIENT_LIMIT.

        Raises MessageRejectedError on too many recipients or an unverified
        source.
        """
        recipient_count = sum(map(len, destinations.values()))
        self._ensure_sendable(source, recipient_count)

        self.__process_sns_feedback__(source, destinations, region)

        message = Message(
            get_random_message_id(), source, subject, body, destinations
        )
        self.sent_messages.append(message)
        self.sent_message_count += recipient_count
        return message

    def send_templated_email(
        self, source, template, template_data, destinations, region
    ):
        """Validate, record and return a ``TemplateMessage``.

        ``template`` is indexed with ``[0]`` to obtain the template name.

        Raises MessageRejectedError on too many recipients or an unverified
        source, and TemplateDoesNotExist for an unknown template name.
        """
        recipient_count = sum(map(len, destinations.values()))
        self._ensure_sendable(source, recipient_count)

        template_name = template[0]
        if not self.templates.get(template_name):
            raise TemplateDoesNotExist(
                "Template (%s) does not exist" % template_name
            )

        self.__process_sns_feedback__(source, destinations, region)

        message = TemplateMessage(
            get_random_message_id(), source, template, template_data, destinations
        )
        self.sent_messages.append(message)
        self.sent_message_count += recipient_count
        return message

    def __type_of_message__(self, destinations):
        """Checks the destination for any special address that could indicate delivery,
        complaint or bounce like in SES simulator.

        ``destinations`` is either a list of addresses or a mapping with
        optional "ToAddresses", "CcAddresses" and "BccAddresses" lists. The
        first address containing one of the marker substrings decides the
        result; returns None when no address matches.
        """
        if isinstance(destinations, list):
            alladdress = destinations
        else:
            alladdress = (
                destinations.get("ToAddresses", [])
                + destinations.get("CcAddresses", [])
                + destinations.get("BccAddresses", [])
            )

        # Within one address the markers are tested in this order.
        markers = (
            (SESFeedback.SUCCESS_ADDR, SESFeedback.DELIVERY),
            (SESFeedback.COMPLAINT_ADDR, SESFeedback.COMPLAINT),
            (SESFeedback.BOUNCE_ADDR, SESFeedback.BOUNCE),
        )
        for addr in alladdress:
            for marker, msg_type in markers:
                if marker in addr:
                    return msg_type

        return None

    def __generate_feedback__(self, msg_type):
        """Generates the SNS message for the feedback"""
        return SESFeedback.generate_message(msg_type)

    def __process_sns_feedback__(self, source, destinations, region):
        """Publish a feedback message to SNS when the sender's domain has a topic.

        Nothing is published unless the domain of ``source`` has an entry in
        ``sns_topics``, the destinations map to a feedback type, and a topic
        is configured for that type.
        """
        raw_source = str(source)
        # Strip any display name first; splitting "Name <user@host>" on "@"
        # directly would leave a trailing ">" on the domain. Fall back to the
        # raw string if parseaddr cannot extract an address.
        address = parseaddr(raw_source)[1] or raw_source
        domain = address.split("@")[1] if "@" in address else address

        if domain not in self.sns_topics:
            return
        msg_type = self.__type_of_message__(destinations)
        if msg_type is None:
            return
        sns_topic = self.sns_topics[domain].get(msg_type, None)
        if sns_topic is None:
            return
        message = self.__generate_feedback__(msg_type)
        if message:
            sns_backends[region].publish(message, arn=sns_topic)

    def send_raw_email(self, source, destinations, raw_data, region):
        """Validate, record and return a ``RawMessage``.

        When ``source`` is None the sender is taken from the From header of
        ``raw_data``. Recipients are counted as ``len(destinations)`` plus the
        non-empty comma-separated entries of the To, Cc and Bcc headers.

        Raises MessageRejectedError when no sender can be determined, the
        sender is not verified, or there are too many recipients.
        """
        message = email.message_from_string(raw_data)

        sender = source
        if sender is None:
            sender = message["from"]
            if sender is None:
                raise MessageRejectedError("Source not specified")

        _, source_email_address = parseaddr(sender)
        if not self._is_verified_address(source_email_address):
            raise MessageRejectedError(
                "Did not have authority to send from email %s"
                % source_email_address
            )

        recipient_count = len(destinations)
        for header in ("TO", "CC", "BCC"):
            recipient_count += sum(
                1 for entry in message.get(header, "").split(",") if entry.strip()
            )
        if recipient_count > RECIPIENT_LIMIT:
            raise MessageRejectedError("Too many recipients.")

        # Use the resolved sender so feedback also works when the source came
        # from the From header (``source`` is None in that case).
        self.__process_sns_feedback__(source_email_address, destinations, region)

        self.sent_message_count += recipient_count
        message_id = get_random_message_id()
        message = RawMessage(message_id, source, destinations, raw_data)
        self.sent_messages.append(message)
        return message

    def get_send_quota(self):
        """Return an ``SESQuota`` wrapping the current sent-message count."""
        return SESQuota(self.sent_message_count)

    def get_identity_notification_attributes(self, identities):
        """Map each identity to its stored notification settings ({} if none)."""
        return {
            identity: self.sns_topics.get(identity, {}) for identity in identities
        }

    def set_identity_feedback_forwarding_enabled(self, identity, enabled):
        """Store the feedback-forwarding flag for ``identity``."""
        settings = self.sns_topics.setdefault(identity, {})
        settings[SESFeedback.FORWARDING_ENABLED] = enabled

    def set_identity_notification_topic(self, identity, notification_type, sns_topic):
        """Set, or clear when ``sns_topic`` is None, the topic for a notification type.

        Clearing a type that has no topic configured is a no-op. Returns an
        empty dict.
        """
        settings = self.sns_topics.setdefault(identity, {})
        if sns_topic is None:
            # pop() with a default avoids a KeyError for a type never set.
            settings.pop(notification_type, None)
        else:
            settings[notification_type] = sns_topic

        return {}

    def create_configuration_set(self, configuration_set_name):
        """Register a configuration set name. Returns an empty dict."""
        self.config_set[configuration_set_name] = 1
        return {}

    def create_configuration_set_event_destination(
        self, configuration_set_name, event_destination
    ):
        """Attach ``event_destination`` to an existing configuration set.

        Raises ConfigurationSetDoesNotExist for an unknown set and
        EventDestinationAlreadyExists when the destination's "Name" is already
        registered. Returns an empty dict.
        """
        if self.config_set.get(configuration_set_name) is None:
            raise ConfigurationSetDoesNotExist("Invalid Configuration Set Name.")

        destination_name = event_destination["Name"]
        if self.event_destinations.get(destination_name):
            raise EventDestinationAlreadyExists("Duplicate Event destination Name.")

        self.config_set_event_destination[configuration_set_name] = event_destination
        self.event_destinations[destination_name] = 1

        return {}

    def get_send_statistics(self):
        """Return a statistics dict; complaints and bounces are always 0."""
        return {
            "DeliveryAttempts": self.sent_message_count,
            "Rejects": self.rejected_messages_count,
            "Complaints": 0,
            "Bounces": 0,
            "Timestamp": datetime.datetime.utcnow(),
        }

    @staticmethod
    def _require_template_name(template_info):
        """Return the template name, raising ValidationError when it is empty."""
        template_name = template_info["template_name"]
        if not template_name:
            raise ValidationError(
                "1 validation error detected: "
                "Value null at 'template.templateName' "
                "failed to satisfy constraint: Member must not be null"
            )
        return template_name

    @staticmethod
    def _require_template_subject(template_info):
        """Raise InvalidParameterValue when the template has no subject."""
        if not template_info["subject_part"]:
            raise InvalidParameterValue("The subject must be specified.")

    def add_template(self, template_info):
        """Store a new template keyed by its "template_name".

        Raises ValidationError for an empty name, TemplateNameAlreadyExists
        for a duplicate name and InvalidParameterValue for an empty subject.
        """
        template_name = self._require_template_name(template_info)

        if self.templates.get(template_name, None):
            raise TemplateNameAlreadyExists("Duplicate Template Name.")

        self._require_template_subject(template_info)
        self.templates[template_name] = template_info

    def update_template(self, template_info):
        """Replace an existing template keyed by its "template_name".

        Raises ValidationError for an empty name, TemplateDoesNotExist for an
        unknown name and InvalidParameterValue for an empty subject.
        """
        template_name = self._require_template_name(template_info)

        if not self.templates.get(template_name, None):
            raise TemplateDoesNotExist("Invalid Template Name.")

        self._require_template_subject(template_info)
        self.templates[template_name] = template_info

    def get_template(self, template_name):
        """Return the stored template, raising TemplateDoesNotExist if unknown."""
        template = self.templates.get(template_name, None)
        if not template:
            raise TemplateDoesNotExist("Invalid Template Name.")
        return template

    def list_templates(self):
        """Return all stored templates as a list."""
        return list(self.templates.values())

    def render_template(self, render_data):
        """Render a stored template with JSON data into a raw message string.

        ``render_data`` provides the template name under "name" and a JSON
        object string under "data". Each ``{{key}}`` placeholder is replaced
        by the string form of the matching value.

        Raises TemplateDoesNotExist for an unknown template,
        InvalidRenderingParameterException when the data is missing, is not
        valid JSON or is not a JSON object, and
        MissingRenderingAttributeException when a placeholder has no value.
        """
        template_name = render_data.get("name", "")
        template = self.templates.get(template_name, None)
        if not template:
            raise TemplateDoesNotExist("Invalid Template Name.")

        try:
            template_data = json.loads(render_data.get("data"))
        except (TypeError, ValueError):
            # TypeError covers a missing "data" entry (json.loads(None)).
            raise InvalidRenderingParameterException(
                "Template rendering data is invalid"
            )
        if not isinstance(template_data, dict):
            # Valid JSON that is not an object cannot supply named values.
            raise InvalidRenderingParameterException(
                "Template rendering data is invalid"
            )

        var, are_variables_present = are_all_variables_present(template, template_data)
        if not are_variables_present:
            raise MissingRenderingAttributeException(var)

        # A part stored as None is rendered as empty text.
        subject_part = str(template["subject_part"] or "")
        text_part = str(template["text_part"] or "")
        html_part = str(template["html_part"] or "")

        for key, value in template_data.items():
            placeholder = "{{%s}}" % key
            # JSON values may be numbers, booleans, etc.; str.replace needs str.
            replacement = value if isinstance(value, str) else str(value)
            subject_part = subject_part.replace(placeholder, replacement)
            text_part = text_part.replace(placeholder, replacement)
            html_part = html_part.replace(placeholder, replacement)

        # Named "multipart" so the imported ``email`` module is not shadowed.
        multipart = MIMEMultipart("alternative")
        for subtype, payload in (
            ("plain;charset=UTF-8", text_part),
            ("html;charset=UTF-8", html_part),
        ):
            part = MIMEBase("text", subtype)
            part.set_payload(payload.encode("utf-8"))
            encode_7or8bit(part)
            multipart.attach(part)

        now = datetime.datetime.now().isoformat()

        rendered_template = "Date: %s\r\nSubject: %s\r\n%s" % (
            now,
            subject_part,
            multipart.as_string(),
        )
        return rendered_template

    def create_receipt_rule_set(self, rule_set_name):
        """Create an empty rule set; raises RuleSetNameAlreadyExists if present."""
        if self.receipt_rule_set.get(rule_set_name) is not None:
            raise RuleSetNameAlreadyExists("Duplicate Receipt Rule Set Name.")
        self.receipt_rule_set[rule_set_name] = []

    def create_receipt_rule(self, rule_set_name, rule):
        """Append ``rule`` to an existing rule set.

        Raises RuleSetDoesNotExist for an unknown rule set and
        RuleAlreadyExists when an identical rule, or a rule with the same
        "name", is already in the set.
        """
        rule_set = self.receipt_rule_set.get(rule_set_name)
        if rule_set is None:
            raise RuleSetDoesNotExist("Invalid Rule Set Name.")

        # Duplicates are detected by name, as the error message states; the
        # whole-rule equality check is kept for rules that carry no name.
        new_name = rule.get("name") if isinstance(rule, dict) else None
        name_taken = new_name is not None and any(
            isinstance(existing, dict) and existing.get("name") == new_name
            for existing in rule_set
        )
        if name_taken or rule in rule_set:
            raise RuleAlreadyExists("Duplicate Rule Name.")

        # rule_set is the list stored in receipt_rule_set, so appending is enough.
        rule_set.append(rule)

    def describe_receipt_rule_set(self, rule_set_name):
        """Return the list of rules in a rule set; raises RuleSetDoesNotExist."""
        rule_set = self.receipt_rule_set.get(rule_set_name)

        if rule_set is None:
            raise RuleSetDoesNotExist(f"Rule set does not exist: {rule_set_name}")

        return rule_set

    def describe_receipt_rule(self, rule_set_name, rule_name):
        """Return the rule named ``rule_name`` from a rule set.

        Raises RuleSetDoesNotExist for an unknown rule set and
        RuleDoesNotExist when no rule in the set has that name.
        """
        rule_set = self.receipt_rule_set.get(rule_set_name)

        if rule_set is None:
            raise RuleSetDoesNotExist("Invalid Rule Set Name.")

        for receipt_rule in rule_set:
            if receipt_rule["name"] == rule_name:
                return receipt_rule

        raise RuleDoesNotExist("Invalid Rule Name.")

    def update_receipt_rule(self, rule_set_name, rule):
        """Replace the rule in a rule set that shares ``rule["name"]``.

        Raises RuleSetDoesNotExist for an unknown rule set and
        RuleDoesNotExist when no rule in the set has that name.
        """
        rule_set = self.receipt_rule_set.get(rule_set_name)

        if rule_set is None:
            raise RuleSetDoesNotExist(f"Rule set does not exist: {rule_set_name}")

        for i, receipt_rule in enumerate(rule_set):
            if receipt_rule["name"] == rule["name"]:
                rule_set[i] = rule
                return

        raise RuleDoesNotExist(f"Rule does not exist: {rule['name']}")
461
462
# Module-level SESBackend instance, created once when this module is imported.
ses_backend = SESBackend()
464