1"""Route53Backend class with methods for supported APIs."""
2import itertools
3from collections import defaultdict
4import re
5
6import string
7import random
8import uuid
9from jinja2 import Template
10
11from moto.route53.exceptions import (
12    InvalidInput,
13    NoSuchCloudWatchLogsLogGroup,
14    NoSuchHostedZone,
15    NoSuchQueryLoggingConfig,
16    QueryLoggingConfigAlreadyExists,
17)
18from moto.core import BaseBackend, BaseModel, CloudFormationModel
19from moto.utilities.paginator import paginate
20from .utils import PAGINATION_MODEL
21
22ROUTE53_ID_CHOICE = string.ascii_uppercase + string.digits
23
24
25def create_route53_zone_id():
26    # New ID's look like this Z1RWWTK7Y8UDDQ
27    return "".join([random.choice(ROUTE53_ID_CHOICE) for _ in range(0, 15)])
28
29
30class HealthCheck(CloudFormationModel):
31    def __init__(self, health_check_id, caller_reference, health_check_args):
32        self.id = health_check_id
33        self.ip_address = health_check_args.get("ip_address")
34        self.port = health_check_args.get("port") or 80
35        self.type_ = health_check_args.get("type")
36        self.resource_path = health_check_args.get("resource_path")
37        self.fqdn = health_check_args.get("fqdn")
38        self.search_string = health_check_args.get("search_string")
39        self.request_interval = health_check_args.get("request_interval") or 30
40        self.failure_threshold = health_check_args.get("failure_threshold") or 3
41        self.health_threshold = health_check_args.get("health_threshold")
42        self.measure_latency = health_check_args.get("measure_latency") or False
43        self.inverted = health_check_args.get("inverted") or False
44        self.disabled = health_check_args.get("disabled") or False
45        self.enable_sni = health_check_args.get("enable_sni") or False
46        self.children = health_check_args.get("children") or None
47        self.caller_reference = caller_reference
48
49    @property
50    def physical_resource_id(self):
51        return self.id
52
53    @staticmethod
54    def cloudformation_name_type():
55        return None
56
57    @staticmethod
58    def cloudformation_type():
59        # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-route53-healthcheck.html
60        return "AWS::Route53::HealthCheck"
61
62    @classmethod
63    def create_from_cloudformation_json(
64        cls, resource_name, cloudformation_json, region_name, **kwargs
65    ):
66        properties = cloudformation_json["Properties"]["HealthCheckConfig"]
67        health_check_args = {
68            "ip_address": properties.get("IPAddress"),
69            "port": properties.get("Port"),
70            "type": properties["Type"],
71            "resource_path": properties.get("ResourcePath"),
72            "fqdn": properties.get("FullyQualifiedDomainName"),
73            "search_string": properties.get("SearchString"),
74            "request_interval": properties.get("RequestInterval"),
75            "failure_threshold": properties.get("FailureThreshold"),
76        }
77        health_check = route53_backend.create_health_check(
78            caller_reference=resource_name, health_check_args=health_check_args
79        )
80        return health_check
81
82    def to_xml(self):
83        template = Template(
84            """<HealthCheck>
85            <Id>{{ health_check.id }}</Id>
86            <CallerReference>{{ health_check.caller_reference }}</CallerReference>
87            <HealthCheckConfig>
88                {% if health_check.type_ != "CALCULATED" %}
89                    <IPAddress>{{ health_check.ip_address }}</IPAddress>
90                    <Port>{{ health_check.port }}</Port>
91                {% endif %}
92                <Type>{{ health_check.type_ }}</Type>
93                {% if health_check.resource_path %}
94                    <ResourcePath>{{ health_check.resource_path }}</ResourcePath>
95                {% endif %}
96                {% if health_check.fqdn %}
97                    <FullyQualifiedDomainName>{{ health_check.fqdn }}</FullyQualifiedDomainName>
98                {% endif %}
99                {% if health_check.type_ != "CALCULATED" %}
100                    <RequestInterval>{{ health_check.request_interval }}</RequestInterval>
101                    <FailureThreshold>{{ health_check.failure_threshold }}</FailureThreshold>
102                    <MeasureLatency>{{ health_check.measure_latency }}</MeasureLatency>
103                {% endif %}
104                {% if health_check.type_ == "CALCULATED" %}
105                    <HealthThreshold>{{ health_check.health_threshold }}</HealthThreshold>
106                {% endif %}
107                <Inverted>{{ health_check.inverted }}</Inverted>
108                <Disabled>{{ health_check.disabled }}</Disabled>
109                <EnableSNI>{{ health_check.enable_sni }}</EnableSNI>
110                {% if health_check.search_string %}
111                    <SearchString>{{ health_check.search_string }}</SearchString>
112                {% endif %}
113                {% if health_check.children %}
114                    <ChildHealthChecks>
115                    {% for child in health_check.children %}
116                        <member>{{ child }}</member>
117                    {% endfor %}
118                    </ChildHealthChecks>
119                {% endif %}
120            </HealthCheckConfig>
121            <HealthCheckVersion>1</HealthCheckVersion>
122        </HealthCheck>"""
123        )
124        return template.render(health_check=self)
125
126
127class RecordSet(CloudFormationModel):
128    def __init__(self, kwargs):
129        self.name = kwargs.get("Name")
130        self.type_ = kwargs.get("Type")
131        self.ttl = kwargs.get("TTL")
132        self.records = kwargs.get("ResourceRecords", [])
133        self.set_identifier = kwargs.get("SetIdentifier")
134        self.weight = kwargs.get("Weight")
135        self.region = kwargs.get("Region")
136        self.health_check = kwargs.get("HealthCheckId")
137        self.hosted_zone_name = kwargs.get("HostedZoneName")
138        self.hosted_zone_id = kwargs.get("HostedZoneId")
139        self.alias_target = kwargs.get("AliasTarget")
140        self.failover = kwargs.get("Failover")
141        self.geo_location = kwargs.get("GeoLocation")
142
143    @staticmethod
144    def cloudformation_name_type():
145        return "Name"
146
147    @staticmethod
148    def cloudformation_type():
149        # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-route53-recordset.html
150        return "AWS::Route53::RecordSet"
151
152    @classmethod
153    def create_from_cloudformation_json(
154        cls, resource_name, cloudformation_json, region_name, **kwargs
155    ):
156        properties = cloudformation_json["Properties"]
157
158        zone_name = properties.get("HostedZoneName")
159        if zone_name:
160            hosted_zone = route53_backend.get_hosted_zone_by_name(zone_name)
161        else:
162            hosted_zone = route53_backend.get_hosted_zone(properties["HostedZoneId"])
163        record_set = hosted_zone.add_rrset(properties)
164        return record_set
165
166    @classmethod
167    def update_from_cloudformation_json(
168        cls, original_resource, new_resource_name, cloudformation_json, region_name
169    ):
170        cls.delete_from_cloudformation_json(
171            original_resource.name, cloudformation_json, region_name
172        )
173        return cls.create_from_cloudformation_json(
174            new_resource_name, cloudformation_json, region_name
175        )
176
177    @classmethod
178    def delete_from_cloudformation_json(
179        cls, resource_name, cloudformation_json, region_name
180    ):
181        # this will break if you changed the zone the record is in,
182        # unfortunately
183        properties = cloudformation_json["Properties"]
184
185        zone_name = properties.get("HostedZoneName")
186        if zone_name:
187            hosted_zone = route53_backend.get_hosted_zone_by_name(zone_name)
188        else:
189            hosted_zone = route53_backend.get_hosted_zone(properties["HostedZoneId"])
190
191        try:
192            hosted_zone.delete_rrset({"Name": resource_name})
193        except KeyError:
194            pass
195
196    @property
197    def physical_resource_id(self):
198        return self.name
199
200    def to_xml(self):
201        template = Template(
202            """<ResourceRecordSet>
203                <Name>{{ record_set.name }}</Name>
204                <Type>{{ record_set.type_ }}</Type>
205                {% if record_set.set_identifier %}
206                    <SetIdentifier>{{ record_set.set_identifier }}</SetIdentifier>
207                {% endif %}
208                {% if record_set.weight %}
209                    <Weight>{{ record_set.weight }}</Weight>
210                {% endif %}
211                {% if record_set.region %}
212                    <Region>{{ record_set.region }}</Region>
213                {% endif %}
214                {% if record_set.ttl %}
215                    <TTL>{{ record_set.ttl }}</TTL>
216                {% endif %}
217                {% if record_set.failover %}
218                    <Failover>{{ record_set.failover }}</Failover>
219                {% endif %}
220                {% if record_set.geo_location %}
221                <GeoLocation>
222                {% for geo_key in ['ContinentCode','CountryCode','SubdivisionCode'] %}
223                  {% if record_set.geo_location[geo_key] %}<{{ geo_key }}>{{ record_set.geo_location[geo_key] }}</{{ geo_key }}>{% endif %}
224                {% endfor %}
225                </GeoLocation>
226                {% endif %}
227                {% if record_set.alias_target %}
228                <AliasTarget>
229                    <HostedZoneId>{{ record_set.alias_target['HostedZoneId'] }}</HostedZoneId>
230                    <DNSName>{{ record_set.alias_target['DNSName'] }}</DNSName>
231                    <EvaluateTargetHealth>{{ record_set.alias_target['EvaluateTargetHealth'] }}</EvaluateTargetHealth>
232                </AliasTarget>
233                {% else %}
234                <ResourceRecords>
235                    {% for record in record_set.records %}
236                    <ResourceRecord>
237                        <Value>{{ record|e }}</Value>
238                    </ResourceRecord>
239                    {% endfor %}
240                </ResourceRecords>
241                {% endif %}
242                {% if record_set.health_check %}
243                    <HealthCheckId>{{ record_set.health_check }}</HealthCheckId>
244                {% endif %}
245            </ResourceRecordSet>"""
246        )
247        return template.render(record_set=self)
248
249    def delete(self, *args, **kwargs):
250        """Not exposed as part of the Route 53 API - used for CloudFormation. args are ignored"""
251        hosted_zone = route53_backend.get_hosted_zone_by_name(self.hosted_zone_name)
252        if not hosted_zone:
253            hosted_zone = route53_backend.get_hosted_zone(self.hosted_zone_id)
254        hosted_zone.delete_rrset({"Name": self.name, "Type": self.type_})
255
256
257def reverse_domain_name(domain_name):
258    if domain_name.endswith("."):  # normalize without trailing dot
259        domain_name = domain_name[:-1]
260    return ".".join(reversed(domain_name.split(".")))
261
262
263class FakeZone(CloudFormationModel):
264    def __init__(self, name, id_, private_zone, comment=None):
265        self.name = name
266        self.id = id_
267        if comment is not None:
268            self.comment = comment
269        self.private_zone = private_zone
270        self.rrsets = []
271
272    def add_rrset(self, record_set):
273        record_set = RecordSet(record_set)
274        self.rrsets.append(record_set)
275        return record_set
276
277    def upsert_rrset(self, record_set):
278        new_rrset = RecordSet(record_set)
279        for i, rrset in enumerate(self.rrsets):
280            if (
281                rrset.name == new_rrset.name
282                and rrset.type_ == new_rrset.type_
283                and rrset.set_identifier == new_rrset.set_identifier
284            ):
285                self.rrsets[i] = new_rrset
286                break
287        else:
288            self.rrsets.append(new_rrset)
289        return new_rrset
290
291    def delete_rrset(self, rrset):
292        self.rrsets = [
293            record_set
294            for record_set in self.rrsets
295            if record_set.name != rrset["Name"]
296            or (rrset.get("Type") is not None and record_set.type_ != rrset["Type"])
297        ]
298
299    def delete_rrset_by_id(self, set_identifier):
300        self.rrsets = [
301            record_set
302            for record_set in self.rrsets
303            if record_set.set_identifier != set_identifier
304        ]
305
306    def get_record_sets(self, start_type, start_name):
307        def predicate(rrset):
308            rrset_name_reversed = reverse_domain_name(rrset.name)
309            start_name_reversed = reverse_domain_name(start_name)
310            return rrset_name_reversed < start_name_reversed or (
311                rrset_name_reversed == start_name_reversed and rrset.type_ < start_type
312            )
313
314        record_sets = sorted(
315            self.rrsets,
316            key=lambda rrset: (reverse_domain_name(rrset.name), rrset.type_),
317        )
318
319        if start_name:
320            start_type = start_type or ""
321            record_sets = itertools.dropwhile(predicate, record_sets)
322
323        return record_sets
324
325    @property
326    def physical_resource_id(self):
327        return self.id
328
329    @staticmethod
330    def cloudformation_name_type():
331        return "Name"
332
333    @staticmethod
334    def cloudformation_type():
335        # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-route53-hostedzone.html
336        return "AWS::Route53::HostedZone"
337
338    @classmethod
339    def create_from_cloudformation_json(
340        cls, resource_name, cloudformation_json, region_name, **kwargs
341    ):
342        hosted_zone = route53_backend.create_hosted_zone(
343            resource_name, private_zone=False
344        )
345        return hosted_zone
346
347
348class RecordSetGroup(CloudFormationModel):
349    def __init__(self, hosted_zone_id, record_sets):
350        self.hosted_zone_id = hosted_zone_id
351        self.record_sets = record_sets
352
353    @property
354    def physical_resource_id(self):
355        return f"arn:aws:route53:::hostedzone/{self.hosted_zone_id}"
356
357    @staticmethod
358    def cloudformation_name_type():
359        return None
360
361    @staticmethod
362    def cloudformation_type():
363        # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-route53-recordsetgroup.html
364        return "AWS::Route53::RecordSetGroup"
365
366    @classmethod
367    def create_from_cloudformation_json(
368        cls, resource_name, cloudformation_json, region_name, **kwargs
369    ):
370        properties = cloudformation_json["Properties"]
371
372        zone_name = properties.get("HostedZoneName")
373        if zone_name:
374            hosted_zone = route53_backend.get_hosted_zone_by_name(zone_name)
375        else:
376            hosted_zone = route53_backend.get_hosted_zone(properties["HostedZoneId"])
377        record_sets = properties["RecordSets"]
378        for record_set in record_sets:
379            hosted_zone.add_rrset(record_set)
380
381        record_set_group = RecordSetGroup(hosted_zone.id, record_sets)
382        return record_set_group
383
384
385class QueryLoggingConfig(BaseModel):
386
387    """QueryLoggingConfig class; this object isn't part of Cloudformation."""
388
389    def __init__(
390        self, query_logging_config_id, hosted_zone_id, cloudwatch_logs_log_group_arn
391    ):
392        self.hosted_zone_id = hosted_zone_id
393        self.cloudwatch_logs_log_group_arn = cloudwatch_logs_log_group_arn
394        self.query_logging_config_id = query_logging_config_id
395        self.location = f"https://route53.amazonaws.com/2013-04-01/queryloggingconfig/{self.query_logging_config_id}"
396
397    def to_xml(self):
398        template = Template(
399            """<QueryLoggingConfig>
400                <CloudWatchLogsLogGroupArn>{{ query_logging_config.cloudwatch_logs_log_group_arn }}</CloudWatchLogsLogGroupArn>
401                <HostedZoneId>{{ query_logging_config.hosted_zone_id }}</HostedZoneId>
402                <Id>{{ query_logging_config.query_logging_config_id }}</Id>
403            </QueryLoggingConfig>"""
404        )
405        # The "Location" value must be put into the header; that's done in
406        # responses.py.
407        return template.render(query_logging_config=self)
408
409
410class Route53Backend(BaseBackend):
411    def __init__(self):
412        self.zones = {}
413        self.health_checks = {}
414        self.resource_tags = defaultdict(dict)
415        self.query_logging_configs = {}
416
417    def create_hosted_zone(self, name, private_zone, comment=None):
418        new_id = create_route53_zone_id()
419        new_zone = FakeZone(name, new_id, private_zone=private_zone, comment=comment)
420        self.zones[new_id] = new_zone
421        return new_zone
422
423    def change_tags_for_resource(self, resource_id, tags):
424        if "Tag" in tags:
425            if isinstance(tags["Tag"], list):
426                for tag in tags["Tag"]:
427                    self.resource_tags[resource_id][tag["Key"]] = tag["Value"]
428            else:
429                key, value = (tags["Tag"]["Key"], tags["Tag"]["Value"])
430                self.resource_tags[resource_id][key] = value
431        else:
432            if "Key" in tags:
433                if isinstance(tags["Key"], list):
434                    for key in tags["Key"]:
435                        del self.resource_tags[resource_id][key]
436                else:
437                    del self.resource_tags[resource_id][tags["Key"]]
438
439    def list_tags_for_resource(self, resource_id):
440        if resource_id in self.resource_tags:
441            return self.resource_tags[resource_id]
442        return {}
443
444    def change_resource_record_sets(self, the_zone, change_list):
445        for value in change_list:
446            action = value["Action"]
447            record_set = value["ResourceRecordSet"]
448
449            cleaned_record_name = record_set["Name"].strip(".")
450            cleaned_hosted_zone_name = the_zone.name.strip(".")
451
452            if not cleaned_record_name.endswith(cleaned_hosted_zone_name):
453                error_msg = f"""
454                An error occurred (InvalidChangeBatch) when calling the ChangeResourceRecordSets operation:
455                RRSet with DNS name {record_set["Name"]} is not permitted in zone {the_zone.name}
456                """
457                return error_msg
458
459            if not record_set["Name"].endswith("."):
460                record_set["Name"] += "."
461
462            if action in ("CREATE", "UPSERT"):
463                if "ResourceRecords" in record_set:
464                    resource_records = list(record_set["ResourceRecords"].values())[0]
465                    if not isinstance(resource_records, list):
466                        # Depending on how many records there are, this may
467                        # or may not be a list
468                        resource_records = [resource_records]
469                    record_set["ResourceRecords"] = [
470                        x["Value"] for x in resource_records
471                    ]
472                if action == "CREATE":
473                    the_zone.add_rrset(record_set)
474                else:
475                    the_zone.upsert_rrset(record_set)
476            elif action == "DELETE":
477                if "SetIdentifier" in record_set:
478                    the_zone.delete_rrset_by_id(record_set["SetIdentifier"])
479                else:
480                    the_zone.delete_rrset(record_set)
481        return None
482
483    def list_hosted_zones(self):
484        return self.zones.values()
485
486    def list_hosted_zones_by_name(self, dnsname):
487        if dnsname:
488            dnsname = dnsname[0]
489            if dnsname[-1] != ".":
490                dnsname += "."
491            zones = [zone for zone in self.list_hosted_zones() if zone.name == dnsname]
492        else:
493            # sort by names, but with domain components reversed
494            # see http://boto3.readthedocs.io/en/latest/reference/services/route53.html#Route53.Client.list_hosted_zones_by_name
495
496            def sort_key(zone):
497                domains = zone.name.split(".")
498                if domains[-1] == "":
499                    domains = domains[-1:] + domains[:-1]
500                return ".".join(reversed(domains))
501
502            zones = self.list_hosted_zones()
503            zones = sorted(zones, key=sort_key)
504        return dnsname, zones
505
506    def get_hosted_zone(self, id_):
507        return self.zones.get(id_.replace("/hostedzone/", ""))
508
509    def get_hosted_zone_by_name(self, name):
510        for zone in self.list_hosted_zones():
511            if zone.name == name:
512                return zone
513        return None
514
515    def delete_hosted_zone(self, id_):
516        return self.zones.pop(id_.replace("/hostedzone/", ""), None)
517
518    def create_health_check(self, caller_reference, health_check_args):
519        health_check_id = str(uuid.uuid4())
520        health_check = HealthCheck(health_check_id, caller_reference, health_check_args)
521        self.health_checks[health_check_id] = health_check
522        return health_check
523
524    def list_health_checks(self):
525        return self.health_checks.values()
526
527    def delete_health_check(self, health_check_id):
528        return self.health_checks.pop(health_check_id, None)
529
530    @staticmethod
531    def _validate_arn(region, arn):
532        match = re.match(fr"arn:aws:logs:{region}:\d{{12}}:log-group:.+", arn)
533        if not arn or not match:
534            raise InvalidInput()
535
536        # The CloudWatch Logs log group must be in the "us-east-1" region.
537        match = re.match(r"^(?:[^:]+:){3}(?P<region>[^:]+).*", arn)
538        if match.group("region") != "us-east-1":
539            raise InvalidInput()
540
541    def create_query_logging_config(self, region, hosted_zone_id, log_group_arn):
542        """Process the create_query_logging_config request."""
543        # Does the hosted_zone_id exist?
544        response = self.list_hosted_zones()
545        zones = list(response) if response else []
546        for zone in zones:
547            if zone.id == hosted_zone_id:
548                break
549        else:
550            raise NoSuchHostedZone(hosted_zone_id)
551
552        # Ensure CloudWatch Logs log ARN is valid, otherwise raise an error.
553        self._validate_arn(region, log_group_arn)
554
555        # Note:  boto3 checks the resource policy permissions before checking
556        # whether the log group exists.  moto doesn't have a way of checking
557        # the resource policy, so in some instances moto will complain
558        # about a log group that doesn't exist whereas boto3 will complain
559        # that "The resource policy that you're using for Route 53 query
560        # logging doesn't grant Route 53 sufficient permission to create
561        # a log stream in the specified log group."
562
563        from moto.logs import logs_backends  # pylint: disable=import-outside-toplevel
564
565        response = logs_backends[region].describe_log_groups()
566        log_groups = response[0] if response else []
567        for entry in log_groups:
568            if log_group_arn == entry["arn"]:
569                break
570        else:
571            # There is no CloudWatch Logs log group with the specified ARN.
572            raise NoSuchCloudWatchLogsLogGroup()
573
574        # Verify there is no existing query log config using the same hosted
575        # zone.
576        for query_log in self.query_logging_configs.values():
577            if query_log.hosted_zone_id == hosted_zone_id:
578                raise QueryLoggingConfigAlreadyExists()
579
580        # Create an instance of the query logging config.
581        query_logging_config_id = str(uuid.uuid4())
582        query_logging_config = QueryLoggingConfig(
583            query_logging_config_id, hosted_zone_id, log_group_arn
584        )
585        self.query_logging_configs[query_logging_config_id] = query_logging_config
586        return query_logging_config
587
588    def delete_query_logging_config(self, query_logging_config_id):
589        """Delete query logging config, if it exists."""
590        if query_logging_config_id not in self.query_logging_configs:
591            raise NoSuchQueryLoggingConfig()
592        self.query_logging_configs.pop(query_logging_config_id)
593
594    def get_query_logging_config(self, query_logging_config_id):
595        """Return query logging config, if it exists."""
596        if query_logging_config_id not in self.query_logging_configs:
597            raise NoSuchQueryLoggingConfig()
598        return self.query_logging_configs[query_logging_config_id]
599
600    @paginate(pagination_model=PAGINATION_MODEL)
601    def list_query_logging_configs(
602        self, hosted_zone_id=None, next_token=None, max_results=None,
603    ):  # pylint: disable=unused-argument
604        """Return a list of query logging configs."""
605        if hosted_zone_id:
606            # Does the hosted_zone_id exist?
607            response = self.list_hosted_zones()
608            zones = list(response) if response else []
609            for zone in zones:
610                if zone.id == hosted_zone_id:
611                    break
612            else:
613                raise NoSuchHostedZone(hosted_zone_id)
614
615        return list(self.query_logging_configs.values())
616
617
618route53_backend = Route53Backend()
619