"""Route53Backend class with methods for supported APIs."""
import itertools
from collections import defaultdict
import re

import string
import random
import uuid
from jinja2 import Template

from moto.route53.exceptions import (
    InvalidInput,
    NoSuchCloudWatchLogsLogGroup,
    NoSuchHostedZone,
    NoSuchQueryLoggingConfig,
    QueryLoggingConfigAlreadyExists,
)
from moto.core import BaseBackend, BaseModel, CloudFormationModel
from moto.utilities.paginator import paginate
from .utils import PAGINATION_MODEL

# Alphabet from which hosted zone IDs are drawn.
ROUTE53_ID_CHOICE = string.ascii_uppercase + string.digits


def create_route53_zone_id():
    """Return a random 15-character ID built from uppercase letters and digits."""
    # New IDs look like this: Z1RWWTK7Y8UDDQ
    return "".join([random.choice(ROUTE53_ID_CHOICE) for _ in range(0, 15)])


class HealthCheck(CloudFormationModel):
    """In-memory model of a Route 53 health check."""

    def __init__(self, health_check_id, caller_reference, health_check_args):
        """Populate the health check from a dict of snake_case arguments.

        Missing (or falsy) values fall back to the defaults visible below:
        port 80, request interval 30, failure threshold 3, and ``False`` for
        the boolean flags.
        """
        self.id = health_check_id
        self.ip_address = health_check_args.get("ip_address")
        self.port = health_check_args.get("port") or 80
        self.type_ = health_check_args.get("type")
        self.resource_path = health_check_args.get("resource_path")
        self.fqdn = health_check_args.get("fqdn")
        self.search_string = health_check_args.get("search_string")
        self.request_interval = health_check_args.get("request_interval") or 30
        self.failure_threshold = health_check_args.get("failure_threshold") or 3
        self.health_threshold = health_check_args.get("health_threshold")
        self.measure_latency = health_check_args.get("measure_latency") or False
        self.inverted = health_check_args.get("inverted") or False
        self.disabled = health_check_args.get("disabled") or False
        self.enable_sni = health_check_args.get("enable_sni") or False
        self.children = health_check_args.get("children") or None
        self.caller_reference = caller_reference

    @property
    def physical_resource_id(self):
        """Return the health check ID."""
        return self.id

    @staticmethod
    def cloudformation_name_type():
        return None

    @staticmethod
    def cloudformation_type():
        # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-route53-healthcheck.html
        return "AWS::Route53::HealthCheck"

    @classmethod
    def create_from_cloudformation_json(
        cls, resource_name, cloudformation_json, region_name, **kwargs
    ):
        """Create a health check from the ``HealthCheckConfig`` template properties.

        The resource name is used as the caller reference. ``Type`` is the
        only property that must be present; a missing one raises ``KeyError``.
        """
        properties = cloudformation_json["Properties"]["HealthCheckConfig"]
        health_check_args = {
            "ip_address": properties.get("IPAddress"),
            "port": properties.get("Port"),
            "type": properties["Type"],
            "resource_path": properties.get("ResourcePath"),
            "fqdn": properties.get("FullyQualifiedDomainName"),
            "search_string": properties.get("SearchString"),
            "request_interval": properties.get("RequestInterval"),
            "failure_threshold": properties.get("FailureThreshold"),
        }
        health_check = route53_backend.create_health_check(
            caller_reference=resource_name, health_check_args=health_check_args
        )
        return health_check

    def to_xml(self):
        """Render this health check as a ``<HealthCheck>`` XML fragment."""
        template = Template(
            """<HealthCheck>
            <Id>{{ health_check.id }}</Id>
            <CallerReference>{{ health_check.caller_reference }}</CallerReference>
            <HealthCheckConfig>
                {% if health_check.type_ != "CALCULATED" %}
                    <IPAddress>{{ health_check.ip_address }}</IPAddress>
                    <Port>{{ health_check.port }}</Port>
                {% endif %}
                <Type>{{ health_check.type_ }}</Type>
                {% if health_check.resource_path %}
                    <ResourcePath>{{ health_check.resource_path }}</ResourcePath>
                {% endif %}
                {% if health_check.fqdn %}
                    <FullyQualifiedDomainName>{{ health_check.fqdn }}</FullyQualifiedDomainName>
                {% endif %}
                {% if health_check.type_ != "CALCULATED" %}
                    <RequestInterval>{{ health_check.request_interval }}</RequestInterval>
                    <FailureThreshold>{{ health_check.failure_threshold }}</FailureThreshold>
                    <MeasureLatency>{{ health_check.measure_latency }}</MeasureLatency>
                {% endif %}
                {% if health_check.type_ == "CALCULATED" %}
                    <HealthThreshold>{{ health_check.health_threshold }}</HealthThreshold>
                {% endif %}
                <Inverted>{{ health_check.inverted }}</Inverted>
                <Disabled>{{ health_check.disabled }}</Disabled>
                <EnableSNI>{{ health_check.enable_sni }}</EnableSNI>
                {% if health_check.search_string %}
                    <SearchString>{{ health_check.search_string }}</SearchString>
                {% endif %}
                {% if health_check.children %}
                    <ChildHealthChecks>
                    {% for child in health_check.children %}
                        <member>{{ child }}</member>
                    {% endfor %}
                    </ChildHealthChecks>
                {% endif %}
            </HealthCheckConfig>
            <HealthCheckVersion>1</HealthCheckVersion>
        </HealthCheck>"""
        )
        return template.render(health_check=self)


class RecordSet(CloudFormationModel):
    """In-memory model of a single resource record set inside a hosted zone."""

    def __init__(self, kwargs):
        """Populate the record set from a dict keyed by the API field names."""
        self.name = kwargs.get("Name")
        self.type_ = kwargs.get("Type")
        self.ttl = kwargs.get("TTL")
        self.records = kwargs.get("ResourceRecords", [])
        self.set_identifier = kwargs.get("SetIdentifier")
        self.weight = kwargs.get("Weight")
        self.region = kwargs.get("Region")
        self.health_check = kwargs.get("HealthCheckId")
        self.hosted_zone_name = kwargs.get("HostedZoneName")
        self.hosted_zone_id = kwargs.get("HostedZoneId")
        self.alias_target = kwargs.get("AliasTarget")
        self.failover = kwargs.get("Failover")
        self.geo_location = kwargs.get("GeoLocation")

    @staticmethod
    def cloudformation_name_type():
        return "Name"

    @staticmethod
    def cloudformation_type():
        # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-route53-recordset.html
        return "AWS::Route53::RecordSet"

    @classmethod
    def create_from_cloudformation_json(
        cls, resource_name, cloudformation_json, region_name, **kwargs
    ):
        """Add a record set to the zone named by the template properties.

        The zone is looked up by ``HostedZoneName`` when that property is set,
        otherwise by ``HostedZoneId``.
        """
        properties = cloudformation_json["Properties"]

        zone_name = properties.get("HostedZoneName")
        if zone_name:
            hosted_zone = route53_backend.get_hosted_zone_by_name(zone_name)
        else:
            hosted_zone = route53_backend.get_hosted_zone(properties["HostedZoneId"])
        record_set = hosted_zone.add_rrset(properties)
        return record_set

    @classmethod
    def update_from_cloudformation_json(
        cls, original_resource, new_resource_name, cloudformation_json, region_name
    ):
        """Replace a record set: delete the original, then create the new one."""
        cls.delete_from_cloudformation_json(
            original_resource.name, cloudformation_json, region_name
        )
        return cls.create_from_cloudformation_json(
            new_resource_name, cloudformation_json, region_name
        )

    @classmethod
    def delete_from_cloudformation_json(
        cls, resource_name, cloudformation_json, region_name
    ):
        """Remove every record set named ``resource_name`` from its zone."""
        # this will break if you changed the zone the record is in,
        # unfortunately
        properties = cloudformation_json["Properties"]

        zone_name = properties.get("HostedZoneName")
        if zone_name:
            hosted_zone = route53_backend.get_hosted_zone_by_name(zone_name)
        else:
            hosted_zone = route53_backend.get_hosted_zone(properties["HostedZoneId"])

        try:
            hosted_zone.delete_rrset({"Name": resource_name})
        except KeyError:
            pass

    @property
    def physical_resource_id(self):
        """Return the record set name."""
        return self.name

    def to_xml(self):
        """Render this record set as a ``<ResourceRecordSet>`` XML fragment.

        An alias target, when present, is emitted instead of the list of
        resource records.
        """
        template = Template(
            """<ResourceRecordSet>
                <Name>{{ record_set.name }}</Name>
                <Type>{{ record_set.type_ }}</Type>
                {% if record_set.set_identifier %}
                    <SetIdentifier>{{ record_set.set_identifier }}</SetIdentifier>
                {% endif %}
                {% if record_set.weight %}
                    <Weight>{{ record_set.weight }}</Weight>
                {% endif %}
                {% if record_set.region %}
                    <Region>{{ record_set.region }}</Region>
                {% endif %}
                {% if record_set.ttl %}
                    <TTL>{{ record_set.ttl }}</TTL>
                {% endif %}
                {% if record_set.failover %}
                    <Failover>{{ record_set.failover }}</Failover>
                {% endif %}
                {% if record_set.geo_location %}
                <GeoLocation>
                {% for geo_key in ['ContinentCode','CountryCode','SubdivisionCode'] %}
                  {% if record_set.geo_location[geo_key] %}<{{ geo_key }}>{{ record_set.geo_location[geo_key] }}</{{ geo_key }}>{% endif %}
                {% endfor %}
                </GeoLocation>
                {% endif %}
                {% if record_set.alias_target %}
                <AliasTarget>
                    <HostedZoneId>{{ record_set.alias_target['HostedZoneId'] }}</HostedZoneId>
                    <DNSName>{{ record_set.alias_target['DNSName'] }}</DNSName>
                    <EvaluateTargetHealth>{{ record_set.alias_target['EvaluateTargetHealth'] }}</EvaluateTargetHealth>
                </AliasTarget>
                {% else %}
                <ResourceRecords>
                    {% for record in record_set.records %}
                    <ResourceRecord>
                        <Value>{{ record|e }}</Value>
                    </ResourceRecord>
                    {% endfor %}
                </ResourceRecords>
                {% endif %}
                {% if record_set.health_check %}
                    <HealthCheckId>{{ record_set.health_check }}</HealthCheckId>
                {% endif %}
            </ResourceRecordSet>"""
        )
        return template.render(record_set=self)

    def delete(self, *args, **kwargs):
        """Not exposed as part of the Route 53 API - used for CloudFormation. args are ignored"""
        # Prefer the lookup by zone name; fall back to the zone ID.
        hosted_zone = route53_backend.get_hosted_zone_by_name(self.hosted_zone_name)
        if not hosted_zone:
            hosted_zone = route53_backend.get_hosted_zone(self.hosted_zone_id)
        hosted_zone.delete_rrset({"Name": self.name, "Type": self.type_})


def reverse_domain_name(domain_name):
    """Return ``domain_name`` with its dot-separated labels in reverse order.

    A single trailing dot is dropped first, so ``"www.example.com."`` becomes
    ``"com.example.www"``.
    """
    if domain_name.endswith("."):  # normalize without trailing dot
        domain_name = domain_name[:-1]
    return ".".join(reversed(domain_name.split(".")))


class FakeZone(CloudFormationModel):
    """In-memory model of a hosted zone and the record sets it contains."""

    def __init__(self, name, id_, private_zone, comment=None):
        self.name = name
        self.id = id_
        # The ``comment`` attribute only exists when a comment was supplied.
        if comment is not None:
            self.comment = comment
        self.private_zone = private_zone
        self.rrsets = []

    def add_rrset(self, record_set):
        """Append a new ``RecordSet`` built from ``record_set`` and return it."""
        record_set = RecordSet(record_set)
        self.rrsets.append(record_set)
        return record_set

    def upsert_rrset(self, record_set):
        """Replace the record set with the same name, type and set identifier.

        When no stored record set matches, the new one is appended instead.
        Returns the newly built ``RecordSet``.
        """
        new_rrset = RecordSet(record_set)
        for i, rrset in enumerate(self.rrsets):
            if (
                rrset.name == new_rrset.name
                and rrset.type_ == new_rrset.type_
                and rrset.set_identifier == new_rrset.set_identifier
            ):
                self.rrsets[i] = new_rrset
                break
        else:
            # Loop finished without a match: this is a brand new record set.
            self.rrsets.append(new_rrset)
        return new_rrset

    def delete_rrset(self, rrset):
        """Drop the record sets matching ``rrset["Name"]``.

        When ``rrset`` also carries a non-``None`` ``"Type"``, only record sets
        of that type are dropped.
        """
        self.rrsets = [
            record_set
            for record_set in self.rrsets
            if record_set.name != rrset["Name"]
            or (rrset.get("Type") is not None and record_set.type_ != rrset["Type"])
        ]

    def delete_rrset_by_id(self, set_identifier):
        """Drop every record set whose set identifier equals ``set_identifier``."""
        self.rrsets = [
            record_set
            for record_set in self.rrsets
            if record_set.set_identifier != set_identifier
        ]

    def get_record_sets(self, start_type, start_name):
        """Return the record sets ordered by reversed domain name, then type.

        When ``start_name`` is truthy, leading entries that sort before
        ``(start_name, start_type)`` are skipped and a lazy iterator is
        returned; otherwise the full sorted list is returned.
        """

        def predicate(rrset):
            # True while ``rrset`` sorts strictly before the requested start.
            rrset_name_reversed = reverse_domain_name(rrset.name)
            start_name_reversed = reverse_domain_name(start_name)
            return rrset_name_reversed < start_name_reversed or (
                rrset_name_reversed == start_name_reversed and rrset.type_ < start_type
            )

        record_sets = sorted(
            self.rrsets,
            key=lambda rrset: (reverse_domain_name(rrset.name), rrset.type_),
        )

        if start_name:
            # An empty string compares lower than any type, so nothing with
            # the starting name is skipped when no start type was given.
            start_type = start_type or ""
            record_sets = itertools.dropwhile(predicate, record_sets)

        return record_sets

    @property
    def physical_resource_id(self):
        """Return the hosted zone ID."""
        return self.id

    @staticmethod
    def cloudformation_name_type():
        return "Name"

    @staticmethod
    def cloudformation_type():
        # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-route53-hostedzone.html
        return "AWS::Route53::HostedZone"

    @classmethod
    def create_from_cloudformation_json(
        cls, resource_name, cloudformation_json, region_name, **kwargs
    ):
        """Create a public hosted zone named after the CloudFormation resource."""
        hosted_zone = route53_backend.create_hosted_zone(
            resource_name, private_zone=False
        )
        return hosted_zone


class RecordSetGroup(CloudFormationModel):
    """CloudFormation-only grouping of record sets that share a hosted zone."""

    def __init__(self, hosted_zone_id, record_sets):
        self.hosted_zone_id = hosted_zone_id
        self.record_sets = record_sets

    @property
    def physical_resource_id(self):
        """Return a hosted zone ARN built from the zone ID."""
        return f"arn:aws:route53:::hostedzone/{self.hosted_zone_id}"

    @staticmethod
    def cloudformation_name_type():
        return None

    @staticmethod
    def cloudformation_type():
        # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-route53-recordsetgroup.html
        return "AWS::Route53::RecordSetGroup"

    @classmethod
    def create_from_cloudformation_json(
        cls, resource_name, cloudformation_json, region_name, **kwargs
    ):
        """Add every entry of the ``RecordSets`` property to the target zone.

        The zone is looked up by ``HostedZoneName`` when that property is set,
        otherwise by ``HostedZoneId``.
        """
        properties = cloudformation_json["Properties"]

        zone_name = properties.get("HostedZoneName")
        if zone_name:
            hosted_zone = route53_backend.get_hosted_zone_by_name(zone_name)
        else:
            hosted_zone = route53_backend.get_hosted_zone(properties["HostedZoneId"])
        record_sets = properties["RecordSets"]
        for record_set in record_sets:
            hosted_zone.add_rrset(record_set)

        record_set_group = RecordSetGroup(hosted_zone.id, record_sets)
        return record_set_group


class QueryLoggingConfig(BaseModel):

    """QueryLoggingConfig class; this object isn't part of Cloudformation."""

    def __init__(
        self, query_logging_config_id, hosted_zone_id, cloudwatch_logs_log_group_arn
    ):
        self.hosted_zone_id = hosted_zone_id
        self.cloudwatch_logs_log_group_arn = cloudwatch_logs_log_group_arn
        self.query_logging_config_id = query_logging_config_id
        self.location = f"https://route53.amazonaws.com/2013-04-01/queryloggingconfig/{self.query_logging_config_id}"

    def to_xml(self):
        """Render this config as a ``<QueryLoggingConfig>`` XML fragment."""
        template = Template(
            """<QueryLoggingConfig>
                <CloudWatchLogsLogGroupArn>{{ query_logging_config.cloudwatch_logs_log_group_arn }}</CloudWatchLogsLogGroupArn>
                <HostedZoneId>{{ query_logging_config.hosted_zone_id }}</HostedZoneId>
                <Id>{{ query_logging_config.query_logging_config_id }}</Id>
            </QueryLoggingConfig>"""
        )
        # The "Location" value must be put into the header; that's done in
        # responses.py.
        return template.render(query_logging_config=self)


class Route53Backend(BaseBackend):
    """In-memory store of hosted zones, health checks, tags and query logging configs."""

    def __init__(self):
        self.zones = {}
        self.health_checks = {}
        self.resource_tags = defaultdict(dict)
        self.query_logging_configs = {}

    def create_hosted_zone(self, name, private_zone, comment=None):
        """Create a zone under a freshly generated ID, store it and return it."""
        new_id = create_route53_zone_id()
        new_zone = FakeZone(name, new_id, private_zone=private_zone, comment=comment)
        self.zones[new_id] = new_zone
        return new_zone

    def change_tags_for_resource(self, resource_id, tags):
        """Add or remove tags on a resource.

        A ``"Tag"`` entry (one dict or a list of dicts with ``Key``/``Value``)
        adds tags. Otherwise a ``"Key"`` entry (one key or a list of keys)
        removes them.
        """
        if "Tag" in tags:
            if isinstance(tags["Tag"], list):
                for tag in tags["Tag"]:
                    self.resource_tags[resource_id][tag["Key"]] = tag["Value"]
            else:
                key, value = (tags["Tag"]["Key"], tags["Tag"]["Value"])
                self.resource_tags[resource_id][key] = value
        else:
            if "Key" in tags:
                if isinstance(tags["Key"], list):
                    for key in tags["Key"]:
                        del self.resource_tags[resource_id][key]
                else:
                    del self.resource_tags[resource_id][tags["Key"]]

    def list_tags_for_resource(self, resource_id):
        """Return the tag dict of a resource, or an empty dict when it has none."""
        # Membership test first so that the defaultdict does not grow an entry
        # for every resource that is merely queried.
        if resource_id in self.resource_tags:
            return self.resource_tags[resource_id]
        return {}

    def change_resource_record_sets(self, the_zone, change_list):
        """Apply CREATE / UPSERT / DELETE changes to ``the_zone`` in order.

        Returns ``None`` on success. When a record name does not end with the
        zone name, processing stops and an error message string is returned;
        changes applied before that point are kept.
        """
        for value in change_list:
            action = value["Action"]
            record_set = value["ResourceRecordSet"]

            cleaned_record_name = record_set["Name"].strip(".")
            cleaned_hosted_zone_name = the_zone.name.strip(".")

            if not cleaned_record_name.endswith(cleaned_hosted_zone_name):
                error_msg = f"""
                An error occurred (InvalidChangeBatch) when calling the ChangeResourceRecordSets operation:
                RRSet with DNS name {record_set["Name"]} is not permitted in zone {the_zone.name}
                """
                return error_msg

            # Record names are stored fully qualified (with a trailing dot).
            if not record_set["Name"].endswith("."):
                record_set["Name"] += "."

            if action in ("CREATE", "UPSERT"):
                if "ResourceRecords" in record_set:
                    resource_records = list(record_set["ResourceRecords"].values())[0]
                    if not isinstance(resource_records, list):
                        # Depending on how many records there are, this may
                        # or may not be a list
                        resource_records = [resource_records]
                    record_set["ResourceRecords"] = [
                        x["Value"] for x in resource_records
                    ]
                if action == "CREATE":
                    the_zone.add_rrset(record_set)
                else:
                    the_zone.upsert_rrset(record_set)
            elif action == "DELETE":
                if "SetIdentifier" in record_set:
                    the_zone.delete_rrset_by_id(record_set["SetIdentifier"])
                else:
                    the_zone.delete_rrset(record_set)
        return None

    def list_hosted_zones(self):
        """Return a view of all stored hosted zones."""
        return self.zones.values()

    def list_hosted_zones_by_name(self, dnsname):
        """Return ``(dnsname, zones)`` filtered or sorted by zone name.

        When ``dnsname`` is truthy its first element is taken, a trailing dot
        is appended if missing, and only zones with exactly that name are
        returned. Otherwise all zones are returned sorted by ``sort_key``.
        """
        if dnsname:
            dnsname = dnsname[0]
            if dnsname[-1] != ".":
                dnsname += "."
            zones = [zone for zone in self.list_hosted_zones() if zone.name == dnsname]
        else:
            # sort by names, but with domain components reversed
            # see http://boto3.readthedocs.io/en/latest/reference/services/route53.html#Route53.Client.list_hosted_zones_by_name

            def sort_key(zone):
                domains = zone.name.split(".")
                if domains[-1] == "":
                    domains = domains[-1:] + domains[:-1]
                return ".".join(reversed(domains))

            zones = self.list_hosted_zones()
            zones = sorted(zones, key=sort_key)
        return dnsname, zones

    def get_hosted_zone(self, id_):
        """Return the zone with the given ID (``/hostedzone/`` prefix allowed), or ``None``."""
        return self.zones.get(id_.replace("/hostedzone/", ""))

    def get_hosted_zone_by_name(self, name):
        """Return the first zone whose name equals ``name`` exactly, or ``None``."""
        for zone in self.list_hosted_zones():
            if zone.name == name:
                return zone
        return None

    def delete_hosted_zone(self, id_):
        """Remove and return the zone with the given ID, or ``None`` if unknown."""
        return self.zones.pop(id_.replace("/hostedzone/", ""), None)

    def create_health_check(self, caller_reference, health_check_args):
        """Create a health check under a new UUID, store it and return it."""
        health_check_id = str(uuid.uuid4())
        health_check = HealthCheck(health_check_id, caller_reference, health_check_args)
        self.health_checks[health_check_id] = health_check
        return health_check

    def list_health_checks(self):
        """Return a view of all stored health checks."""
        return self.health_checks.values()

    def delete_health_check(self, health_check_id):
        """Remove and return the health check with the given ID, or ``None`` if unknown."""
        return self.health_checks.pop(health_check_id, None)

    @staticmethod
    def _validate_arn(region, arn):
        """Raise ``InvalidInput`` unless ``arn`` is an acceptable log group ARN.

        The ARN must be non-empty, match
        ``arn:aws:logs:<region>:<12 digits>:log-group:<name>`` and have
        ``us-east-1`` as its region.
        """
        # Test for a missing ARN *before* handing it to ``re.match``;
        # ``re.match`` raises TypeError on ``None``, which would mask the
        # intended InvalidInput error.
        if not arn or not re.match(
            fr"arn:aws:logs:{region}:\d{{12}}:log-group:.+", arn
        ):
            raise InvalidInput()

        # The CloudWatch Logs log group must be in the "us-east-1" region.
        match = re.match(r"^(?:[^:]+:){3}(?P<region>[^:]+).*", arn)
        if match.group("region") != "us-east-1":
            raise InvalidInput()

    def create_query_logging_config(self, region, hosted_zone_id, log_group_arn):
        """Process the create_query_logging_config request."""
        # Does the hosted_zone_id exist?
        response = self.list_hosted_zones()
        zones = list(response) if response else []
        for zone in zones:
            if zone.id == hosted_zone_id:
                break
        else:
            raise NoSuchHostedZone(hosted_zone_id)

        # Ensure CloudWatch Logs log ARN is valid, otherwise raise an error.
        self._validate_arn(region, log_group_arn)

        # Note:  boto3 checks the resource policy permissions before checking
        # whether the log group exists.  moto doesn't have a way of checking
        # the resource policy, so in some instances moto will complain
        # about a log group that doesn't exist whereas boto3 will complain
        # that "The resource policy that you're using for Route 53 query
        # logging doesn't grant Route 53 sufficient permission to create
        # a log stream in the specified log group."

        from moto.logs import logs_backends  # pylint: disable=import-outside-toplevel

        response = logs_backends[region].describe_log_groups()
        log_groups = response[0] if response else []
        for entry in log_groups:
            if log_group_arn == entry["arn"]:
                break
        else:
            # There is no CloudWatch Logs log group with the specified ARN.
            raise NoSuchCloudWatchLogsLogGroup()

        # Verify there is no existing query log config using the same hosted
        # zone.
        for query_log in self.query_logging_configs.values():
            if query_log.hosted_zone_id == hosted_zone_id:
                raise QueryLoggingConfigAlreadyExists()

        # Create an instance of the query logging config.
        query_logging_config_id = str(uuid.uuid4())
        query_logging_config = QueryLoggingConfig(
            query_logging_config_id, hosted_zone_id, log_group_arn
        )
        self.query_logging_configs[query_logging_config_id] = query_logging_config
        return query_logging_config

    def delete_query_logging_config(self, query_logging_config_id):
        """Delete query logging config, if it exists."""
        if query_logging_config_id not in self.query_logging_configs:
            raise NoSuchQueryLoggingConfig()
        self.query_logging_configs.pop(query_logging_config_id)

    def get_query_logging_config(self, query_logging_config_id):
        """Return query logging config, if it exists."""
        if query_logging_config_id not in self.query_logging_configs:
            raise NoSuchQueryLoggingConfig()
        return self.query_logging_configs[query_logging_config_id]

    @paginate(pagination_model=PAGINATION_MODEL)
    def list_query_logging_configs(
        self, hosted_zone_id=None, next_token=None, max_results=None,
    ):  # pylint: disable=unused-argument
        """Return a list of query logging configs."""
        if hosted_zone_id:
            # Does the hosted_zone_id exist?
            response = self.list_hosted_zones()
            zones = list(response) if response else []
            for zone in zones:
                if zone.id == hosted_zone_id:
                    break
            else:
                raise NoSuchHostedZone(hosted_zone_id)

        return list(self.query_logging_configs.values())


route53_backend = Route53Backend()