1from __future__ import absolute_import 2 3import random 4import string 5import re 6from copy import copy 7 8import requests 9import time 10 11from boto3.session import Session 12 13try: 14 from urlparse import urlparse 15except ImportError: 16 from urllib.parse import urlparse 17import responses 18from moto.core import ACCOUNT_ID, BaseBackend, BaseModel, CloudFormationModel 19from .utils import create_id, to_path 20from moto.core.utils import path_url 21from .exceptions import ( 22 ApiKeyNotFoundException, 23 UsagePlanNotFoundException, 24 AwsProxyNotAllowed, 25 CrossAccountNotAllowed, 26 IntegrationMethodNotDefined, 27 InvalidArn, 28 InvalidIntegrationArn, 29 InvalidHttpEndpoint, 30 InvalidResourcePathException, 31 AuthorizerNotFoundException, 32 StageNotFoundException, 33 RoleNotSpecified, 34 NoIntegrationDefined, 35 NoIntegrationResponseDefined, 36 NoMethodDefined, 37 ApiKeyAlreadyExists, 38 DomainNameNotFound, 39 InvalidDomainName, 40 InvalidRestApiId, 41 InvalidModelName, 42 RestAPINotFound, 43 RequestValidatorNotFound, 44 ModelNotFound, 45 ApiKeyValueMinLength, 46 InvalidBasePathException, 47 InvalidRestApiIdForBasePathMappingException, 48 InvalidStageException, 49 BasePathConflictException, 50 BasePathNotFoundException, 51) 52from ..core.models import responses_mock 53from moto.apigateway.exceptions import MethodNotFoundException 54 55STAGE_URL = "https://{api_id}.execute-api.{region_name}.amazonaws.com/{stage_name}" 56 57 58class Deployment(CloudFormationModel, dict): 59 def __init__(self, deployment_id, name, description=""): 60 super(Deployment, self).__init__() 61 self["id"] = deployment_id 62 self["stageName"] = name 63 self["description"] = description 64 self["createdDate"] = int(time.time()) 65 66 @staticmethod 67 def cloudformation_name_type(): 68 return "Deployment" 69 70 @staticmethod 71 def cloudformation_type(): 72 return "AWS::ApiGateway::Deployment" 73 74 @classmethod 75 def create_from_cloudformation_json( 76 cls, resource_name, cloudformation_json, region_name, **kwargs 77 ): 78 properties = cloudformation_json["Properties"] 79 rest_api_id = properties["RestApiId"] 80 name = properties["StageName"] 81 desc = properties.get("Description", "") 82 backend = apigateway_backends[region_name] 83 return backend.create_deployment( 84 function_id=rest_api_id, name=name, description=desc 85 ) 86 87 88class IntegrationResponse(BaseModel, dict): 89 def __init__( 90 self, 91 status_code, 92 selection_pattern=None, 93 response_templates=None, 94 content_handling=None, 95 ): 96 if response_templates is None: 97 # response_templates = {"application/json": None} # Note: removed for compatibility with TF 98 response_templates = {} 99 for key in response_templates.keys(): 100 response_templates[key] = ( 101 response_templates[key] or None 102 ) # required for compatibility with TF 103 self["responseTemplates"] = response_templates 104 self["statusCode"] = status_code 105 if selection_pattern: 106 self["selectionPattern"] = selection_pattern 107 if content_handling: 108 self["contentHandling"] = content_handling 109 110 111class Integration(BaseModel, dict): 112 def __init__( 113 self, 114 integration_type, 115 uri, 116 http_method, 117 request_templates=None, 118 tls_config=None, 119 cache_namespace=None, 120 ): 121 super(Integration, self).__init__() 122 self["type"] = integration_type 123 self["uri"] = uri 124 self["httpMethod"] = http_method 125 self["requestTemplates"] = request_templates 126 # self["integrationResponses"] = {"200": IntegrationResponse(200)} # commented out (tf-compat) 127 self[ 128 "integrationResponses" 129 ] = None # prevent json serialization from including them if none provided 130 self["tlsConfig"] = tls_config 131 self["cacheNamespace"] = cache_namespace 132 133 def create_integration_response( 134 self, status_code, selection_pattern, response_templates, content_handling 135 ): 136 if response_templates == {}: 137 response_templates = None 138 integration_response = IntegrationResponse( 139 status_code, selection_pattern, response_templates, content_handling 140 ) 141 if self.get("integrationResponses") is None: 142 self["integrationResponses"] = {} 143 self["integrationResponses"][status_code] = integration_response 144 return integration_response 145 146 def get_integration_response(self, status_code): 147 result = self.get("integrationResponses", {}).get(status_code) 148 if not result: 149 raise NoIntegrationResponseDefined(status_code) 150 return result 151 152 def delete_integration_response(self, status_code): 153 return self.get("integrationResponses", {}).pop(status_code, None) 154 155 156class MethodResponse(BaseModel, dict): 157 def __init__(self, status_code, response_models=None, response_parameters=None): 158 super(MethodResponse, self).__init__() 159 self["statusCode"] = status_code 160 self["responseModels"] = response_models 161 self["responseParameters"] = response_parameters 162 163 164class Method(CloudFormationModel, dict): 165 def __init__(self, method_type, authorization_type, **kwargs): 166 super(Method, self).__init__() 167 self.update( 168 dict( 169 httpMethod=method_type, 170 authorizationType=authorization_type, 171 authorizerId=kwargs.get("authorizer_id"), 172 authorizationScopes=kwargs.get("authorization_scopes"), 173 apiKeyRequired=kwargs.get("api_key_required") or False, 174 requestParameters=None, 175 requestModels=kwargs.get("request_models"), 176 methodIntegration=None, 177 operationName=kwargs.get("operation_name"), 178 requestValidatorId=kwargs.get("request_validator_id"), 179 ) 180 ) 181 self.method_responses = {} 182 183 @staticmethod 184 def cloudformation_name_type(): 185 return "Method" 186 187 @staticmethod 188 def cloudformation_type(): 189 return "AWS::ApiGateway::Method" 190 191 @classmethod 192 def create_from_cloudformation_json( 193 cls, resource_name, cloudformation_json, region_name, **kwargs 194 ): 195 properties = cloudformation_json["Properties"] 196 rest_api_id = properties["RestApiId"] 197 resource_id = properties["ResourceId"] 198 method_type = properties["HttpMethod"] 199 auth_type = properties["AuthorizationType"] 200 key_req = properties["ApiKeyRequired"] 201 backend = apigateway_backends[region_name] 202 m = backend.create_method( 203 function_id=rest_api_id, 204 resource_id=resource_id, 205 method_type=method_type, 206 authorization_type=auth_type, 207 api_key_required=key_req, 208 ) 209 int_method = properties["Integration"]["IntegrationHttpMethod"] 210 int_type = properties["Integration"]["Type"] 211 int_uri = properties["Integration"]["Uri"] 212 backend.create_integration( 213 function_id=rest_api_id, 214 resource_id=resource_id, 215 method_type=method_type, 216 integration_type=int_type, 217 uri=int_uri, 218 integration_method=int_method, 219 ) 220 return m 221 222 def create_response(self, response_code, response_models, response_parameters): 223 method_response = MethodResponse( 224 response_code, response_models, response_parameters 225 ) 226 self.method_responses[response_code] = method_response 227 return method_response 228 229 def get_response(self, response_code): 230 return self.method_responses.get(response_code) 231 232 def delete_response(self, response_code): 233 return self.method_responses.pop(response_code, None) 234 235 236class Resource(CloudFormationModel): 237 def __init__(self, id, region_name, api_id, path_part, parent_id): 238 super(Resource, self).__init__() 239 self.id = id 240 self.region_name = region_name 241 self.api_id = api_id 242 self.path_part = path_part 243 self.parent_id = parent_id 244 self.resource_methods = {} 245 246 def to_dict(self): 247 response = { 248 "path": self.get_path(), 249 "id": self.id, 250 } 251 if self.resource_methods: 252 response["resourceMethods"] = self.resource_methods 253 if self.parent_id: 254 response["parentId"] = self.parent_id 255 response["pathPart"] = self.path_part 256 return response 257 258 @property 259 def physical_resource_id(self): 260 return self.id 261 262 @staticmethod 263 def cloudformation_name_type(): 264 return "Resource" 265 266 @staticmethod 267 def cloudformation_type(): 268 return "AWS::ApiGateway::Resource" 269 270 @classmethod 271 def create_from_cloudformation_json( 272 cls, resource_name, cloudformation_json, region_name, **kwargs 273 ): 274 properties = cloudformation_json["Properties"] 275 api_id = properties["RestApiId"] 276 parent = properties["ParentId"] 277 path = properties["PathPart"] 278 279 backend = apigateway_backends[region_name] 280 if parent == api_id: 281 # A Root path (/) is automatically created. Any new paths should use this as their parent 282 resources = backend.list_resources(function_id=api_id) 283 root_id = [resource for resource in resources if resource.path_part == "/"][ 284 0 285 ].id 286 parent = root_id 287 return backend.create_resource( 288 function_id=api_id, parent_resource_id=parent, path_part=path 289 ) 290 291 def get_path(self): 292 return self.get_parent_path() + self.path_part 293 294 def get_parent_path(self): 295 if self.parent_id: 296 backend = apigateway_backends[self.region_name] 297 parent = backend.get_resource(self.api_id, self.parent_id) 298 parent_path = parent.get_path() 299 if parent_path != "/": # Root parent 300 parent_path += "/" 301 return parent_path 302 else: 303 return "" 304 305 def get_response(self, request): 306 integration = self.get_integration(request.method) 307 integration_type = integration["type"] 308 309 if integration_type == "HTTP": 310 uri = integration["uri"] 311 requests_func = getattr(requests, integration["httpMethod"].lower()) 312 response = requests_func(uri) 313 else: 314 raise NotImplementedError( 315 "The {0} type has not been implemented".format(integration_type) 316 ) 317 return response.status_code, response.text 318 319 def add_method( 320 self, 321 method_type, 322 authorization_type, 323 api_key_required, 324 request_models=None, 325 operation_name=None, 326 authorizer_id=None, 327 authorization_scopes=None, 328 request_validator_id=None, 329 ): 330 if authorization_scopes and not isinstance(authorization_scopes, list): 331 authorization_scopes = [authorization_scopes] 332 method = Method( 333 method_type=method_type, 334 authorization_type=authorization_type, 335 api_key_required=api_key_required, 336 request_models=request_models, 337 operation_name=operation_name, 338 authorizer_id=authorizer_id, 339 authorization_scopes=authorization_scopes, 340 request_validator_id=request_validator_id, 341 ) 342 self.resource_methods[method_type] = method 343 return method 344 345 def get_method(self, method_type): 346 method = self.resource_methods.get(method_type) 347 if not method: 348 raise MethodNotFoundException() 349 return method 350 351 def delete_method(self, method_type): 352 self.resource_methods.pop(method_type) 353 354 def add_integration( 355 self, 356 method_type, 357 integration_type, 358 uri, 359 request_templates=None, 360 integration_method=None, 361 tls_config=None, 362 cache_namespace=None, 363 ): 364 integration_method = integration_method or method_type 365 integration = Integration( 366 integration_type, 367 uri, 368 integration_method, 369 request_templates=request_templates, 370 tls_config=tls_config, 371 cache_namespace=cache_namespace, 372 ) 373 self.resource_methods[method_type]["methodIntegration"] = integration 374 return integration 375 376 def get_integration(self, method_type): 377 return self.resource_methods.get(method_type, {}).get("methodIntegration", {}) 378 379 def delete_integration(self, method_type): 380 return self.resource_methods[method_type].pop("methodIntegration") 381 382 383class Authorizer(BaseModel, dict): 384 def __init__(self, id, name, authorizer_type, **kwargs): 385 super(Authorizer, self).__init__() 386 self["id"] = id 387 self["name"] = name 388 self["type"] = authorizer_type 389 if kwargs.get("provider_arns"): 390 self["providerARNs"] = kwargs.get("provider_arns") 391 if kwargs.get("auth_type"): 392 self["authType"] = kwargs.get("auth_type") 393 if kwargs.get("authorizer_uri"): 394 self["authorizerUri"] = kwargs.get("authorizer_uri") 395 if kwargs.get("authorizer_credentials"): 396 self["authorizerCredentials"] = kwargs.get("authorizer_credentials") 397 if kwargs.get("identity_source"): 398 self["identitySource"] = kwargs.get("identity_source") 399 if kwargs.get("identity_validation_expression"): 400 self["identityValidationExpression"] = kwargs.get( 401 "identity_validation_expression" 402 ) 403 self["authorizerResultTtlInSeconds"] = kwargs.get("authorizer_result_ttl") 404 405 def apply_operations(self, patch_operations): 406 for op in patch_operations: 407 if "/authorizerUri" in op["path"]: 408 self["authorizerUri"] = op["value"] 409 elif "/authorizerCredentials" in op["path"]: 410 self["authorizerCredentials"] = op["value"] 411 elif "/authorizerResultTtlInSeconds" in op["path"]: 412 self["authorizerResultTtlInSeconds"] = int(op["value"]) 413 elif "/authType" in op["path"]: 414 self["authType"] = op["value"] 415 elif "/identitySource" in op["path"]: 416 self["identitySource"] = op["value"] 417 elif "/identityValidationExpression" in op["path"]: 418 self["identityValidationExpression"] = op["value"] 419 elif "/name" in op["path"]: 420 self["name"] = op["value"] 421 elif "/providerARNs" in op["path"]: 422 # TODO: add and remove 423 raise Exception('Patch operation for "%s" not implemented' % op["path"]) 424 elif "/type" in op["path"]: 425 self["type"] = op["value"] 426 else: 427 raise Exception('Patch operation "%s" not implemented' % op["op"]) 428 return self 429 430 431class Stage(BaseModel, dict): 432 def __init__( 433 self, 434 name=None, 435 deployment_id=None, 436 variables=None, 437 description="", 438 cacheClusterEnabled=False, 439 cacheClusterSize=None, 440 tags=None, 441 tracing_enabled=None, 442 ): 443 super(Stage, self).__init__() 444 if variables is None: 445 variables = {} 446 self["stageName"] = name 447 self["deploymentId"] = deployment_id 448 self["methodSettings"] = {} 449 self["variables"] = variables 450 self["description"] = description 451 self["cacheClusterEnabled"] = cacheClusterEnabled 452 if self["cacheClusterEnabled"]: 453 self["cacheClusterSize"] = str(0.5) 454 if cacheClusterSize is not None: 455 self["cacheClusterSize"] = str(cacheClusterSize) 456 if tags is not None: 457 self["tags"] = tags 458 if tracing_enabled is not None: 459 self["tracingEnabled"] = tracing_enabled 460 461 def apply_operations(self, patch_operations): 462 for op in patch_operations: 463 if "variables/" in op["path"]: 464 self._apply_operation_to_variables(op) 465 elif "/cacheClusterEnabled" in op["path"]: 466 self["cacheClusterEnabled"] = self._str2bool(op["value"]) 467 if "cacheClusterSize" not in self and self["cacheClusterEnabled"]: 468 self["cacheClusterSize"] = str(0.5) 469 elif "/cacheClusterSize" in op["path"]: 470 self["cacheClusterSize"] = str(float(op["value"])) 471 elif "/description" in op["path"]: 472 self["description"] = op["value"] 473 elif "/deploymentId" in op["path"]: 474 self["deploymentId"] = op["value"] 475 elif op["op"] == "replace": 476 # Method Settings drop into here 477 # (e.g., path could be '/*/*/logging/loglevel') 478 split_path = op["path"].split("/", 3) 479 if len(split_path) != 4: 480 continue 481 self._patch_method_setting( 482 "/".join(split_path[1:3]), split_path[3], op["value"] 483 ) 484 else: 485 raise Exception('Patch operation "%s" not implemented' % op["op"]) 486 return self 487 488 def _patch_method_setting(self, resource_path_and_method, key, value): 489 updated_key = self._method_settings_translations(key) 490 if updated_key is not None: 491 if resource_path_and_method not in self["methodSettings"]: 492 self["methodSettings"][ 493 resource_path_and_method 494 ] = self._get_default_method_settings() 495 self["methodSettings"][resource_path_and_method][ 496 updated_key 497 ] = self._convert_to_type(updated_key, value) 498 499 def _get_default_method_settings(self): 500 return { 501 "throttlingRateLimit": 1000.0, 502 "dataTraceEnabled": False, 503 "metricsEnabled": False, 504 "unauthorizedCacheControlHeaderStrategy": "SUCCEED_WITH_RESPONSE_HEADER", 505 "cacheTtlInSeconds": 300, 506 "cacheDataEncrypted": True, 507 "cachingEnabled": False, 508 "throttlingBurstLimit": 2000, 509 "requireAuthorizationForCacheControl": True, 510 } 511 512 def _method_settings_translations(self, key): 513 mappings = { 514 "metrics/enabled": "metricsEnabled", 515 "logging/loglevel": "loggingLevel", 516 "logging/dataTrace": "dataTraceEnabled", 517 "throttling/burstLimit": "throttlingBurstLimit", 518 "throttling/rateLimit": "throttlingRateLimit", 519 "caching/enabled": "cachingEnabled", 520 "caching/ttlInSeconds": "cacheTtlInSeconds", 521 "caching/dataEncrypted": "cacheDataEncrypted", 522 "caching/requireAuthorizationForCacheControl": "requireAuthorizationForCacheControl", 523 "caching/unauthorizedCacheControlHeaderStrategy": "unauthorizedCacheControlHeaderStrategy", 524 } 525 526 if key in mappings: 527 return mappings[key] 528 else: 529 None 530 531 def _str2bool(self, v): 532 return v.lower() == "true" 533 534 def _convert_to_type(self, key, val): 535 type_mappings = { 536 "metricsEnabled": "bool", 537 "loggingLevel": "str", 538 "dataTraceEnabled": "bool", 539 "throttlingBurstLimit": "int", 540 "throttlingRateLimit": "float", 541 "cachingEnabled": "bool", 542 "cacheTtlInSeconds": "int", 543 "cacheDataEncrypted": "bool", 544 "requireAuthorizationForCacheControl": "bool", 545 "unauthorizedCacheControlHeaderStrategy": "str", 546 } 547 548 if key in type_mappings: 549 type_value = type_mappings[key] 550 551 if type_value == "bool": 552 return self._str2bool(val) 553 elif type_value == "int": 554 return int(val) 555 elif type_value == "float": 556 return float(val) 557 else: 558 return str(val) 559 else: 560 return str(val) 561 562 def _apply_operation_to_variables(self, op): 563 key = op["path"][op["path"].rindex("variables/") + 10 :] 564 if op["op"] == "remove": 565 self["variables"].pop(key, None) 566 elif op["op"] == "replace": 567 self["variables"][key] = op["value"] 568 else: 569 raise Exception('Patch operation "%s" not implemented' % op["op"]) 570 571 572class ApiKey(BaseModel, dict): 573 def __init__( 574 self, 575 name=None, 576 description=None, 577 enabled=False, 578 generateDistinctId=False, 579 value=None, 580 stageKeys=[], 581 tags=None, 582 customerId=None, 583 ): 584 super(ApiKey, self).__init__() 585 self["id"] = create_id() 586 self["value"] = ( 587 value 588 if value 589 else "".join(random.sample(string.ascii_letters + string.digits, 40)) 590 ) 591 self["name"] = name 592 self["customerId"] = customerId 593 self["description"] = description 594 self["enabled"] = enabled 595 self["createdDate"] = self["lastUpdatedDate"] = int(time.time()) 596 self["stageKeys"] = stageKeys 597 self["tags"] = tags 598 599 def update_operations(self, patch_operations): 600 for op in patch_operations: 601 if op["op"] == "replace": 602 if "/name" in op["path"]: 603 self["name"] = op["value"] 604 elif "/customerId" in op["path"]: 605 self["customerId"] = op["value"] 606 elif "/description" in op["path"]: 607 self["description"] = op["value"] 608 elif "/enabled" in op["path"]: 609 self["enabled"] = self._str2bool(op["value"]) 610 else: 611 raise Exception('Patch operation "%s" not implemented' % op["op"]) 612 return self 613 614 def _str2bool(self, v): 615 return v.lower() == "true" 616 617 618class UsagePlan(BaseModel, dict): 619 def __init__( 620 self, 621 name=None, 622 description=None, 623 apiStages=None, 624 throttle=None, 625 quota=None, 626 productCode=None, 627 tags=None, 628 ): 629 super(UsagePlan, self).__init__() 630 self["id"] = create_id() 631 self["name"] = name 632 self["description"] = description 633 self["apiStages"] = apiStages if apiStages else [] 634 self["throttle"] = throttle 635 self["quota"] = quota 636 self["productCode"] = productCode 637 self["tags"] = tags 638 639 def apply_patch_operations(self, patch_operations): 640 for op in patch_operations: 641 path = op["path"] 642 value = op["value"] 643 if op["op"] == "replace": 644 if "/name" in path: 645 self["name"] = value 646 if "/productCode" in path: 647 self["productCode"] = value 648 if "/description" in path: 649 self["description"] = value 650 if "/quota/limit" in path: 651 self["quota"]["limit"] = value 652 if "/quota/period" in path: 653 self["quota"]["period"] = value 654 if "/throttle/rateLimit" in path: 655 self["throttle"]["rateLimit"] = value 656 if "/throttle/burstLimit" in path: 657 self["throttle"]["burstLimit"] = value 658 659 660class RequestValidator(BaseModel, dict): 661 PROP_ID = "id" 662 PROP_NAME = "name" 663 PROP_VALIDATE_REQUEST_BODY = "validateRequestBody" 664 PROP_VALIDATE_REQUEST_PARAMETERS = "validateRequestParameters" 665 666 # operations 667 OP_PATH = "path" 668 OP_VALUE = "value" 669 OP_REPLACE = "replace" 670 OP_OP = "op" 671 672 def __init__(self, id, name, validateRequestBody, validateRequestParameters): 673 super(RequestValidator, self).__init__() 674 self[RequestValidator.PROP_ID] = id 675 self[RequestValidator.PROP_NAME] = name 676 self[RequestValidator.PROP_VALIDATE_REQUEST_BODY] = validateRequestBody 677 self[ 678 RequestValidator.PROP_VALIDATE_REQUEST_PARAMETERS 679 ] = validateRequestParameters 680 681 def apply_patch_operations(self, operations): 682 for operation in operations: 683 path = operation[RequestValidator.OP_PATH] 684 value = operation[RequestValidator.OP_VALUE] 685 if operation[RequestValidator.OP_OP] == RequestValidator.OP_REPLACE: 686 if to_path(RequestValidator.PROP_NAME) in path: 687 self[RequestValidator.PROP_NAME] = value 688 if to_path(RequestValidator.PROP_VALIDATE_REQUEST_BODY) in path: 689 self[ 690 RequestValidator.PROP_VALIDATE_REQUEST_BODY 691 ] = value.lower() in ("true") 692 if to_path(RequestValidator.PROP_VALIDATE_REQUEST_PARAMETERS) in path: 693 self[ 694 RequestValidator.PROP_VALIDATE_REQUEST_PARAMETERS 695 ] = value.lower() in ("true") 696 697 def to_dict(self): 698 return { 699 "id": self["id"], 700 "name": self["name"], 701 "validateRequestBody": self["validateRequestBody"], 702 "validateRequestParameters": self["validateRequestParameters"], 703 } 704 705 706class UsagePlanKey(BaseModel, dict): 707 def __init__(self, id, type, name, value): 708 super(UsagePlanKey, self).__init__() 709 self["id"] = id 710 self["name"] = name 711 self["type"] = type 712 self["value"] = value 713 714 715class RestAPI(CloudFormationModel): 716 717 PROP_ID = "id" 718 PROP_NAME = "name" 719 PROP_DESCRIPTON = "description" 720 PROP_VERSION = "version" 721 PROP_BINARY_MEDIA_TYPES = "binaryMediaTypes" 722 PROP_CREATED_DATE = "createdDate" 723 PROP_API_KEY_SOURCE = "apiKeySource" 724 PROP_ENDPOINT_CONFIGURATION = "endpointConfiguration" 725 PROP_TAGS = "tags" 726 PROP_POLICY = "policy" 727 PROP_DISABLE_EXECUTE_API_ENDPOINT = "disableExecuteApiEndpoint" 728 PROP_MINIMUM_COMPRESSION_SIZE = "minimumCompressionSize" 729 730 # operations 731 OPERATION_ADD = "add" 732 OPERATION_REPLACE = "replace" 733 OPERATION_REMOVE = "remove" 734 OPERATION_PATH = "path" 735 OPERATION_VALUE = "value" 736 OPERATION_OP = "op" 737 738 def __init__(self, id, region_name, name, description, **kwargs): 739 super(RestAPI, self).__init__() 740 self.id = id 741 self.region_name = region_name 742 self.name = name 743 self.description = description 744 self.version = kwargs.get(RestAPI.PROP_VERSION) or "V1" 745 self.binaryMediaTypes = kwargs.get(RestAPI.PROP_BINARY_MEDIA_TYPES) or [] 746 self.create_date = int(time.time()) 747 self.api_key_source = kwargs.get("api_key_source") or "HEADER" 748 self.policy = kwargs.get(RestAPI.PROP_POLICY) or None 749 self.endpoint_configuration = kwargs.get("endpoint_configuration") or { 750 "types": ["EDGE"] 751 } 752 self.tags = kwargs.get(RestAPI.PROP_TAGS) or {} 753 self.disableExecuteApiEndpoint = ( 754 kwargs.get(RestAPI.PROP_DISABLE_EXECUTE_API_ENDPOINT) or False 755 ) 756 self.minimum_compression_size = kwargs.get("minimum_compression_size") 757 self.deployments = {} 758 self.authorizers = {} 759 self.stages = {} 760 self.resources = {} 761 self.models = {} 762 self.request_validators = {} 763 self.add_child("/") # Add default child 764 765 def __repr__(self): 766 return str(self.id) 767 768 def to_dict(self): 769 return { 770 self.PROP_ID: self.id, 771 self.PROP_NAME: self.name, 772 self.PROP_DESCRIPTON: self.description, 773 self.PROP_VERSION: self.version, 774 self.PROP_BINARY_MEDIA_TYPES: self.binaryMediaTypes, 775 self.PROP_CREATED_DATE: self.create_date, 776 self.PROP_API_KEY_SOURCE: self.api_key_source, 777 self.PROP_ENDPOINT_CONFIGURATION: self.endpoint_configuration, 778 self.PROP_TAGS: self.tags, 779 self.PROP_POLICY: self.policy, 780 self.PROP_DISABLE_EXECUTE_API_ENDPOINT: self.disableExecuteApiEndpoint, 781 self.PROP_MINIMUM_COMPRESSION_SIZE: self.minimum_compression_size, 782 } 783 784 def apply_patch_operations(self, patch_operations): 785 def to_path(prop): 786 return "/" + prop 787 788 for op in patch_operations: 789 path = op[self.OPERATION_PATH] 790 value = "" 791 if self.OPERATION_VALUE in op: 792 value = op[self.OPERATION_VALUE] 793 operaton = op[self.OPERATION_OP] 794 if operaton == self.OPERATION_REPLACE: 795 if to_path(self.PROP_NAME) in path: 796 self.name = value 797 if to_path(self.PROP_DESCRIPTON) in path: 798 self.description = value 799 if to_path(self.PROP_API_KEY_SOURCE) in path: 800 self.api_key_source = value 801 if to_path(self.PROP_BINARY_MEDIA_TYPES) in path: 802 self.binaryMediaTypes = [value] 803 if to_path(self.PROP_DISABLE_EXECUTE_API_ENDPOINT) in path: 804 self.disableExecuteApiEndpoint = bool(value) 805 elif operaton == self.OPERATION_ADD: 806 if to_path(self.PROP_BINARY_MEDIA_TYPES) in path: 807 self.binaryMediaTypes.append(value) 808 elif operaton == self.OPERATION_REMOVE: 809 if to_path(self.PROP_BINARY_MEDIA_TYPES) in path: 810 self.binaryMediaTypes.remove(value) 811 if to_path(self.PROP_DESCRIPTON) in path: 812 self.description = "" 813 814 @classmethod 815 def has_cfn_attr(cls, attribute): 816 return attribute in ["RootResourceId"] 817 818 def get_cfn_attribute(self, attribute_name): 819 from moto.cloudformation.exceptions import UnformattedGetAttTemplateException 820 821 if attribute_name == "RootResourceId": 822 for res_id, res_obj in self.resources.items(): 823 if res_obj.path_part == "/" and not res_obj.parent_id: 824 return res_id 825 raise Exception("Unable to find root resource for API %s" % self) 826 raise UnformattedGetAttTemplateException() 827 828 @property 829 def physical_resource_id(self): 830 return self.id 831 832 @staticmethod 833 def cloudformation_name_type(): 834 return "RestApi" 835 836 @staticmethod 837 def cloudformation_type(): 838 return "AWS::ApiGateway::RestApi" 839 840 @classmethod 841 def create_from_cloudformation_json( 842 cls, resource_name, cloudformation_json, region_name, **kwargs 843 ): 844 properties = cloudformation_json["Properties"] 845 name = properties["Name"] 846 desc = properties.get("Description", "") 847 config = properties.get("EndpointConfiguration", None) 848 backend = apigateway_backends[region_name] 849 return backend.create_rest_api( 850 name=name, description=desc, endpoint_configuration=config 851 ) 852 853 def add_child(self, path, parent_id=None): 854 child_id = create_id() 855 child = Resource( 856 id=child_id, 857 region_name=self.region_name, 858 api_id=self.id, 859 path_part=path, 860 parent_id=parent_id, 861 ) 862 self.resources[child_id] = child 863 return child 864 865 def add_model( 866 self, 867 name, 868 description=None, 869 schema=None, 870 content_type=None, 871 cli_input_json=None, 872 generate_cli_skeleton=None, 873 ): 874 model_id = create_id() 875 new_model = Model( 876 id=model_id, 877 name=name, 878 description=description, 879 schema=schema, 880 content_type=content_type, 881 cli_input_json=cli_input_json, 882 generate_cli_skeleton=generate_cli_skeleton, 883 ) 884 885 self.models[name] = new_model 886 return new_model 887 888 def get_resource_for_path(self, path_after_stage_name): 889 for resource in self.resources.values(): 890 if resource.get_path() == path_after_stage_name: 891 return resource 892 # TODO deal with no matching resource 893 894 def resource_callback(self, request): 895 path = path_url(request.url) 896 path_after_stage_name = "/".join(path.split("/")[2:]) 897 if not path_after_stage_name: 898 path_after_stage_name = "/" 899 900 resource = self.get_resource_for_path(path_after_stage_name) 901 status_code, response = resource.get_response(request) 902 return status_code, {}, response 903 904 def update_integration_mocks(self, stage_name): 905 stage_url_lower = STAGE_URL.format( 906 api_id=self.id.lower(), region_name=self.region_name, stage_name=stage_name 907 ) 908 stage_url_upper = STAGE_URL.format( 909 api_id=self.id.upper(), region_name=self.region_name, stage_name=stage_name 910 ) 911 912 for url in [stage_url_lower, stage_url_upper]: 913 responses_mock._matches.insert( 914 0, 915 responses.CallbackResponse( 916 url=url, 917 method=responses.GET, 918 callback=self.resource_callback, 919 content_type="text/plain", 920 match_querystring=False, 921 ), 922 ) 923 924 def create_authorizer( 925 self, 926 id, 927 name, 928 authorizer_type, 929 provider_arns=None, 930 auth_type=None, 931 authorizer_uri=None, 932 authorizer_credentials=None, 933 identity_source=None, 934 identiy_validation_expression=None, 935 authorizer_result_ttl=None, 936 ): 937 authorizer = Authorizer( 938 id=id, 939 name=name, 940 authorizer_type=authorizer_type, 941 provider_arns=provider_arns, 942 auth_type=auth_type, 943 authorizer_uri=authorizer_uri, 944 authorizer_credentials=authorizer_credentials, 945 identity_source=identity_source, 946 identiy_validation_expression=identiy_validation_expression, 947 authorizer_result_ttl=authorizer_result_ttl, 948 ) 949 self.authorizers[id] = authorizer 950 return authorizer 951 952 def create_stage( 953 self, 954 name, 955 deployment_id, 956 variables=None, 957 description="", 958 cacheClusterEnabled=None, 959 cacheClusterSize=None, 960 tags=None, 961 tracing_enabled=None, 962 ): 963 if variables is None: 964 variables = {} 965 stage = Stage( 966 name=name, 967 deployment_id=deployment_id, 968 variables=variables, 969 description=description, 970 cacheClusterSize=cacheClusterSize, 971 cacheClusterEnabled=cacheClusterEnabled, 972 tags=tags, 973 tracing_enabled=tracing_enabled, 974 ) 975 self.stages[name] = stage 976 self.update_integration_mocks(name) 977 return stage 978 979 def create_deployment(self, name, description="", stage_variables=None): 980 if stage_variables is None: 981 stage_variables = {} 982 deployment_id = create_id() 983 deployment = Deployment(deployment_id, name, description) 984 self.deployments[deployment_id] = deployment 985 self.stages[name] = Stage( 986 name=name, deployment_id=deployment_id, variables=stage_variables 987 ) 988 self.update_integration_mocks(name) 989 990 return deployment 991 992 def get_deployment(self, deployment_id): 993 return self.deployments[deployment_id] 994 995 def get_authorizers(self): 996 return list(self.authorizers.values()) 997 998 def get_stages(self): 999 return list(self.stages.values()) 1000 1001 def get_deployments(self): 1002 return list(self.deployments.values()) 1003 1004 def delete_deployment(self, deployment_id): 1005 return self.deployments.pop(deployment_id) 1006 1007 def create_request_validator( 1008 self, name, validateRequestBody, validateRequestParameters 1009 ): 1010 validator_id = create_id() 1011 request_validator = RequestValidator( 1012 id=validator_id, 1013 name=name, 1014 validateRequestBody=validateRequestBody, 1015 validateRequestParameters=validateRequestParameters, 1016 ) 1017 self.request_validators[validator_id] = request_validator 1018 return request_validator 1019 1020 def get_request_validators(self): 1021 return list(self.request_validators.values()) 1022 1023 def get_request_validator(self, validator_id): 1024 reqeust_validator = self.request_validators.get(validator_id) 1025 if reqeust_validator is None: 1026 raise RequestValidatorNotFound() 1027 return reqeust_validator 1028 1029 def delete_request_validator(self, validator_id): 1030 reqeust_validator = self.request_validators.pop(validator_id) 1031 return reqeust_validator 1032 1033 def update_request_validator(self, validator_id, patch_operations): 1034 self.request_validators[validator_id].apply_patch_operations(patch_operations) 1035 return self.request_validators[validator_id] 1036 1037 1038class DomainName(BaseModel, dict): 1039 def __init__(self, domain_name, **kwargs): 1040 super(DomainName, self).__init__() 1041 self["domainName"] = domain_name 1042 self["regionalDomainName"] = "d-%s.execute-api.%s.amazonaws.com" % ( 1043 create_id(), 1044 kwargs.get("region_name") or "us-east-1", 1045 ) 1046 self["distributionDomainName"] = "d%s.cloudfront.net" % create_id() 1047 self["domainNameStatus"] = "AVAILABLE" 1048 self["domainNameStatusMessage"] = "Domain Name Available" 1049 self["regionalHostedZoneId"] = "Z2FDTNDATAQYW2" 1050 self["distributionHostedZoneId"] = "Z2FDTNDATAQYW2" 1051 self["certificateUploadDate"] = int(time.time()) 1052 if kwargs.get("certificate_name"): 1053 self["certificateName"] = kwargs.get("certificate_name") 1054 if kwargs.get("certificate_arn"): 1055 self["certificateArn"] = kwargs.get("certificate_arn") 1056 if kwargs.get("certificate_body"): 1057 self["certificateBody"] = kwargs.get("certificate_body") 1058 if kwargs.get("tags"): 1059 self["tags"] = kwargs.get("tags") 1060 if kwargs.get("security_policy"): 1061 self["securityPolicy"] = kwargs.get("security_policy") 1062 if kwargs.get("certificate_chain"): 1063 self["certificateChain"] = kwargs.get("certificate_chain") 1064 if kwargs.get("regional_certificate_name"): 1065 self["regionalCertificateName"] = kwargs.get("regional_certificate_name") 1066 if kwargs.get("certificate_private_key"): 1067 self["certificatePrivateKey"] = kwargs.get("certificate_private_key") 1068 if kwargs.get("regional_certificate_arn"): 1069 self["regionalCertificateArn"] = kwargs.get("regional_certificate_arn") 1070 if kwargs.get("endpoint_configuration"): 1071 self["endpointConfiguration"] = kwargs.get("endpoint_configuration") 1072 if kwargs.get("generate_cli_skeleton"): 1073 self["generateCliSkeleton"] = kwargs.get("generate_cli_skeleton") 1074 1075 1076class Model(BaseModel, dict): 1077 def __init__(self, id, name, **kwargs): 1078 super(Model, self).__init__() 1079 self["id"] = id 1080 self["name"] = name 1081 if kwargs.get("description"): 1082 self["description"] = kwargs.get("description") 1083 if kwargs.get("schema"): 1084 self["schema"] = kwargs.get("schema") 1085 if kwargs.get("content_type"): 1086 self["contentType"] = kwargs.get("content_type") 1087 if kwargs.get("cli_input_json"): 1088 self["cliInputJson"] = kwargs.get("cli_input_json") 1089 if kwargs.get("generate_cli_skeleton"): 1090 self["generateCliSkeleton"] = kwargs.get("generate_cli_skeleton") 1091 1092 1093class BasePathMapping(BaseModel, dict): 1094 def __init__(self, domain_name, rest_api_id, **kwargs): 1095 super(BasePathMapping, self).__init__() 1096 self["domain_name"] = domain_name 1097 self["restApiId"] = rest_api_id 1098 if kwargs.get("basePath"): 1099 self["basePath"] = kwargs.get("basePath") 1100 else: 1101 self["basePath"] = "(none)" 1102 1103 if kwargs.get("stage"): 1104 self["stage"] = kwargs.get("stage") 1105 1106 1107class APIGatewayBackend(BaseBackend): 1108 def __init__(self, region_name): 1109 super(APIGatewayBackend, self).__init__() 1110 self.apis = {} 1111 self.keys = {} 1112 self.usage_plans = {} 1113 self.usage_plan_keys = {} 1114 self.domain_names = {} 1115 self.models = {} 1116 self.region_name = region_name 1117 self.base_path_mappings = {} 1118 1119 def reset(self): 1120 region_name = self.region_name 1121 self.__dict__ = {} 1122 self.__init__(region_name) 1123 1124 def create_rest_api( 1125 self, 1126 name, 1127 description, 1128 api_key_source=None, 1129 endpoint_configuration=None, 1130 tags=None, 1131 policy=None, 1132 minimum_compression_size=None, 1133 ): 1134 api_id = create_id() 1135 rest_api = RestAPI( 1136 api_id, 1137 self.region_name, 1138 name, 1139 description, 1140 api_key_source=api_key_source, 1141 endpoint_configuration=endpoint_configuration, 1142 tags=tags, 1143 policy=policy, 1144 minimum_compression_size=minimum_compression_size, 1145 ) 1146 self.apis[api_id] = rest_api 1147 return rest_api 1148 1149 def get_rest_api(self, function_id): 1150 rest_api = self.apis.get(function_id) 1151 if rest_api is None: 1152 raise RestAPINotFound() 1153 return rest_api 1154 1155 def update_rest_api(self, function_id, patch_operations): 1156 rest_api = self.apis.get(function_id) 1157 if rest_api is None: 1158 raise RestAPINotFound() 1159 self.apis[function_id].apply_patch_operations(patch_operations) 1160 return self.apis[function_id] 1161 1162 def list_apis(self): 1163 return self.apis.values() 1164 1165 def delete_rest_api(self, function_id): 1166 rest_api = self.apis.pop(function_id) 1167 return rest_api 1168 1169 def list_resources(self, function_id): 1170 api = self.get_rest_api(function_id) 1171 return api.resources.values() 1172 1173 def get_resource(self, function_id, resource_id): 1174 api = self.get_rest_api(function_id) 1175 resource = api.resources[resource_id] 1176 return resource 1177 1178 def create_resource(self, function_id, parent_resource_id, path_part): 1179 if not re.match("^\\{?[a-zA-Z0-9._-]+\\+?\\}?$", path_part): 1180 raise InvalidResourcePathException() 1181 api = self.get_rest_api(function_id) 1182 child = api.add_child(path=path_part, parent_id=parent_resource_id) 1183 return child 1184 1185 def delete_resource(self, function_id, resource_id): 1186 api = self.get_rest_api(function_id) 1187 resource = api.resources.pop(resource_id) 1188 return resource 1189 1190 def get_method(self, function_id, resource_id, method_type): 1191 resource = self.get_resource(function_id, resource_id) 1192 return resource.get_method(method_type) 1193 1194 def create_method( 1195 self, 1196 function_id, 1197 resource_id, 1198 method_type, 1199 authorization_type, 1200 api_key_required=None, 1201 request_models=None, 1202 operation_name=None, 1203 authorizer_id=None, 1204 authorization_scopes=None, 1205 request_validator_id=None, 1206 ): 1207 resource = self.get_resource(function_id, resource_id) 1208 method = resource.add_method( 1209 method_type, 1210 authorization_type, 1211 api_key_required=api_key_required, 1212 request_models=request_models, 1213 operation_name=operation_name, 1214 authorizer_id=authorizer_id, 1215 authorization_scopes=authorization_scopes, 1216 request_validator_id=request_validator_id, 1217 ) 1218 return method 1219 1220 def update_method(self, function_id, resource_id, method_type, patch_operations): 1221 resource = self.get_resource(function_id, resource_id) 1222 method = resource.get_method(method_type) 1223 return method.apply_operations(patch_operations) 1224 1225 def delete_method(self, function_id, resource_id, method_type): 1226 resource = self.get_resource(function_id, resource_id) 1227 resource.delete_method(method_type) 1228 1229 def get_authorizer(self, restapi_id, authorizer_id): 1230 api = self.get_rest_api(restapi_id) 1231 authorizer = api.authorizers.get(authorizer_id) 1232 if authorizer is None: 1233 raise AuthorizerNotFoundException() 1234 else: 1235 return authorizer 1236 1237 def get_authorizers(self, restapi_id): 1238 api = self.get_rest_api(restapi_id) 1239 return api.get_authorizers() 1240 1241 def create_authorizer(self, restapi_id, name, authorizer_type, **kwargs): 1242 api = self.get_rest_api(restapi_id) 1243 authorizer_id = create_id() 1244 authorizer = api.create_authorizer( 1245 authorizer_id, 1246 name, 1247 authorizer_type, 1248 provider_arns=kwargs.get("provider_arns"), 1249 auth_type=kwargs.get("auth_type"), 1250 authorizer_uri=kwargs.get("authorizer_uri"), 1251 authorizer_credentials=kwargs.get("authorizer_credentials"), 1252 identity_source=kwargs.get("identity_source"), 1253 identiy_validation_expression=kwargs.get("identiy_validation_expression"), 1254 authorizer_result_ttl=kwargs.get("authorizer_result_ttl"), 1255 ) 1256 return api.authorizers.get(authorizer["id"]) 1257 1258 def update_authorizer(self, restapi_id, authorizer_id, patch_operations): 1259 authorizer = self.get_authorizer(restapi_id, authorizer_id) 1260 if not authorizer: 1261 api = self.get_rest_api(restapi_id) 1262 authorizer = api.authorizers[authorizer_id] = Authorizer() 1263 return authorizer.apply_operations(patch_operations) 1264 1265 def delete_authorizer(self, restapi_id, authorizer_id): 1266 api = self.get_rest_api(restapi_id) 1267 del api.authorizers[authorizer_id] 1268 1269 def get_stage(self, function_id, stage_name): 1270 api = self.get_rest_api(function_id) 1271 stage = api.stages.get(stage_name) 1272 if stage is None: 1273 raise StageNotFoundException() 1274 return stage 1275 1276 def get_stages(self, function_id): 1277 api = self.get_rest_api(function_id) 1278 return api.get_stages() 1279 1280 def create_stage( 1281 self, 1282 function_id, 1283 stage_name, 1284 deploymentId, 1285 variables=None, 1286 description="", 1287 cacheClusterEnabled=None, 1288 cacheClusterSize=None, 1289 tags=None, 1290 tracing_enabled=None, 1291 ): 1292 if variables is None: 1293 variables = {} 1294 api = self.get_rest_api(function_id) 1295 api.create_stage( 1296 stage_name, 1297 deploymentId, 1298 variables=variables, 1299 description=description, 1300 cacheClusterEnabled=cacheClusterEnabled, 1301 cacheClusterSize=cacheClusterSize, 1302 tags=tags, 1303 tracing_enabled=tracing_enabled, 1304 ) 1305 return api.stages.get(stage_name) 1306 1307 def update_stage(self, function_id, stage_name, patch_operations): 1308 stage = self.get_stage(function_id, stage_name) 1309 if not stage: 1310 api = self.get_rest_api(function_id) 1311 stage = api.stages[stage_name] = Stage() 1312 return stage.apply_operations(patch_operations) 1313 1314 def delete_stage(self, function_id, stage_name): 1315 api = self.get_rest_api(function_id) 1316 deleted = api.stages.pop(stage_name, None) 1317 if not deleted: 1318 raise StageNotFoundException() 1319 1320 def get_method_response(self, function_id, resource_id, method_type, response_code): 1321 method = self.get_method(function_id, resource_id, method_type) 1322 method_response = method.get_response(response_code) 1323 return method_response 1324 1325 def create_method_response( 1326 self, 1327 function_id, 1328 resource_id, 1329 method_type, 1330 response_code, 1331 response_models, 1332 response_parameters, 1333 ): 1334 method = self.get_method(function_id, resource_id, method_type) 1335 method_response = method.create_response( 1336 response_code, response_models, response_parameters 1337 ) 1338 return method_response 1339 1340 def update_method_response( 1341 self, function_id, resource_id, method_type, response_code, patch_operations 1342 ): 1343 method = self.get_method(function_id, resource_id, method_type) 1344 method_response = method.get_response(response_code) 1345 method_response.apply_operations(patch_operations) 1346 return method_response 1347 1348 def delete_method_response( 1349 self, function_id, resource_id, method_type, response_code 1350 ): 1351 method = self.get_method(function_id, resource_id, method_type) 1352 method_response = method.delete_response(response_code) 1353 return method_response 1354 1355 def create_integration( 1356 self, 1357 function_id, 1358 resource_id, 1359 method_type, 1360 integration_type, 1361 uri, 1362 integration_method=None, 1363 credentials=None, 1364 request_templates=None, 1365 tls_config=None, 1366 cache_namespace=None, 1367 ): 1368 resource = self.get_resource(function_id, resource_id) 1369 if credentials and not re.match( 1370 "^arn:aws:iam::" + str(ACCOUNT_ID), credentials 1371 ): 1372 raise CrossAccountNotAllowed() 1373 if not integration_method and integration_type in [ 1374 "HTTP", 1375 "HTTP_PROXY", 1376 "AWS", 1377 "AWS_PROXY", 1378 ]: 1379 raise IntegrationMethodNotDefined() 1380 if integration_type in ["AWS_PROXY"] and re.match( 1381 "^arn:aws:apigateway:[a-zA-Z0-9-]+:s3", uri 1382 ): 1383 raise AwsProxyNotAllowed() 1384 if ( 1385 integration_type in ["AWS"] 1386 and re.match("^arn:aws:apigateway:[a-zA-Z0-9-]+:s3", uri) 1387 and not credentials 1388 ): 1389 raise RoleNotSpecified() 1390 if integration_type in ["HTTP", "HTTP_PROXY"] and not self._uri_validator(uri): 1391 raise InvalidHttpEndpoint() 1392 if integration_type in ["AWS", "AWS_PROXY"] and not re.match("^arn:aws:", uri): 1393 raise InvalidArn() 1394 if integration_type in ["AWS", "AWS_PROXY"] and not re.match( 1395 "^arn:aws:apigateway:[a-zA-Z0-9-]+:[a-zA-Z0-9-]+:(path|action)/", uri 1396 ): 1397 raise InvalidIntegrationArn() 1398 integration = resource.add_integration( 1399 method_type, 1400 integration_type, 1401 uri, 1402 integration_method=integration_method, 1403 request_templates=request_templates, 1404 tls_config=tls_config, 1405 cache_namespace=cache_namespace, 1406 ) 1407 return integration 1408 1409 def get_integration(self, function_id, resource_id, method_type): 1410 resource = self.get_resource(function_id, resource_id) 1411 return resource.get_integration(method_type) 1412 1413 def delete_integration(self, function_id, resource_id, method_type): 1414 resource = self.get_resource(function_id, resource_id) 1415 return resource.delete_integration(method_type) 1416 1417 def create_integration_response( 1418 self, 1419 function_id, 1420 resource_id, 1421 method_type, 1422 status_code, 1423 selection_pattern, 1424 response_templates, 1425 content_handling, 1426 ): 1427 integration = self.get_integration(function_id, resource_id, method_type) 1428 integration_response = integration.create_integration_response( 1429 status_code, selection_pattern, response_templates, content_handling 1430 ) 1431 return integration_response 1432 1433 def get_integration_response( 1434 self, function_id, resource_id, method_type, status_code 1435 ): 1436 integration = self.get_integration(function_id, resource_id, method_type) 1437 integration_response = integration.get_integration_response(status_code) 1438 return integration_response 1439 1440 def delete_integration_response( 1441 self, function_id, resource_id, method_type, status_code 1442 ): 1443 integration = self.get_integration(function_id, resource_id, method_type) 1444 integration_response = integration.delete_integration_response(status_code) 1445 return integration_response 1446 1447 def create_deployment( 1448 self, function_id, name, description="", stage_variables=None 1449 ): 1450 if stage_variables is None: 1451 stage_variables = {} 1452 api = self.get_rest_api(function_id) 1453 methods = [ 1454 list(res.resource_methods.values()) 1455 for res in self.list_resources(function_id) 1456 ] 1457 methods = [m for sublist in methods for m in sublist] 1458 if not any(methods): 1459 raise NoMethodDefined() 1460 method_integrations = [ 1461 method["methodIntegration"] if "methodIntegration" in method else None 1462 for method in methods 1463 ] 1464 if not any(method_integrations): 1465 raise NoIntegrationDefined() 1466 deployment = api.create_deployment(name, description, stage_variables) 1467 return deployment 1468 1469 def get_deployment(self, function_id, deployment_id): 1470 api = self.get_rest_api(function_id) 1471 return api.get_deployment(deployment_id) 1472 1473 def get_deployments(self, function_id): 1474 api = self.get_rest_api(function_id) 1475 return api.get_deployments() 1476 1477 def delete_deployment(self, function_id, deployment_id): 1478 api = self.get_rest_api(function_id) 1479 return api.delete_deployment(deployment_id) 1480 1481 def create_api_key(self, payload): 1482 if payload.get("value"): 1483 if len(payload.get("value", [])) < 20: 1484 raise ApiKeyValueMinLength() 1485 for api_key in self.get_api_keys(include_values=True): 1486 if api_key.get("value") == payload["value"]: 1487 raise ApiKeyAlreadyExists() 1488 key = ApiKey(**payload) 1489 self.keys[key["id"]] = key 1490 return key 1491 1492 def get_api_keys(self, include_values=False): 1493 api_keys = list(self.keys.values()) 1494 1495 if not include_values: 1496 keys = [] 1497 for api_key in list(self.keys.values()): 1498 new_key = copy(api_key) 1499 del new_key["value"] 1500 keys.append(new_key) 1501 api_keys = keys 1502 1503 return api_keys 1504 1505 def get_api_key(self, api_key_id, include_value=False): 1506 api_key = self.keys.get(api_key_id) 1507 if not api_key: 1508 raise ApiKeyNotFoundException() 1509 1510 if not include_value: 1511 new_key = copy(api_key) 1512 del new_key["value"] 1513 api_key = new_key 1514 1515 return api_key 1516 1517 def update_api_key(self, api_key_id, patch_operations): 1518 key = self.keys[api_key_id] 1519 return key.update_operations(patch_operations) 1520 1521 def delete_api_key(self, api_key_id): 1522 self.keys.pop(api_key_id) 1523 return {} 1524 1525 def create_usage_plan(self, payload): 1526 plan = UsagePlan(**payload) 1527 self.usage_plans[plan["id"]] = plan 1528 return plan 1529 1530 def get_usage_plans(self, api_key_id=None): 1531 plans = list(self.usage_plans.values()) 1532 if api_key_id is not None: 1533 plans = [ 1534 plan 1535 for plan in plans 1536 if self.usage_plan_keys.get(plan["id"], {}).get(api_key_id, False) 1537 ] 1538 return plans 1539 1540 def get_usage_plan(self, usage_plan_id): 1541 if usage_plan_id not in self.usage_plans: 1542 raise UsagePlanNotFoundException() 1543 return self.usage_plans[usage_plan_id] 1544 1545 def update_usage_plan(self, usage_plan_id, patch_operations): 1546 if usage_plan_id not in self.usage_plans: 1547 raise UsagePlanNotFoundException() 1548 self.usage_plans[usage_plan_id].apply_patch_operations(patch_operations) 1549 return self.usage_plans[usage_plan_id] 1550 1551 def delete_usage_plan(self, usage_plan_id): 1552 self.usage_plans.pop(usage_plan_id) 1553 return {} 1554 1555 def create_usage_plan_key(self, usage_plan_id, payload): 1556 if usage_plan_id not in self.usage_plan_keys: 1557 self.usage_plan_keys[usage_plan_id] = {} 1558 1559 key_id = payload["keyId"] 1560 if key_id not in self.keys: 1561 raise ApiKeyNotFoundException() 1562 1563 api_key = self.keys[key_id] 1564 1565 usage_plan_key = UsagePlanKey( 1566 id=key_id, 1567 type=payload["keyType"], 1568 name=api_key["name"], 1569 value=api_key["value"], 1570 ) 1571 self.usage_plan_keys[usage_plan_id][usage_plan_key["id"]] = usage_plan_key 1572 return usage_plan_key 1573 1574 def get_usage_plan_keys(self, usage_plan_id): 1575 if usage_plan_id not in self.usage_plan_keys: 1576 return [] 1577 1578 return list(self.usage_plan_keys[usage_plan_id].values()) 1579 1580 def get_usage_plan_key(self, usage_plan_id, key_id): 1581 # first check if is a valid api key 1582 if key_id not in self.keys: 1583 raise ApiKeyNotFoundException() 1584 1585 # then check if is a valid api key and that the key is in the plan 1586 if ( 1587 usage_plan_id not in self.usage_plan_keys 1588 or key_id not in self.usage_plan_keys[usage_plan_id] 1589 ): 1590 raise UsagePlanNotFoundException() 1591 1592 return self.usage_plan_keys[usage_plan_id][key_id] 1593 1594 def delete_usage_plan_key(self, usage_plan_id, key_id): 1595 self.usage_plan_keys[usage_plan_id].pop(key_id) 1596 return {} 1597 1598 def _uri_validator(self, uri): 1599 try: 1600 result = urlparse(uri) 1601 return all([result.scheme, result.netloc, result.path or "/"]) 1602 except Exception: 1603 return False 1604 1605 def create_domain_name( 1606 self, 1607 domain_name, 1608 certificate_name=None, 1609 tags=None, 1610 certificate_arn=None, 1611 certificate_body=None, 1612 certificate_private_key=None, 1613 certificate_chain=None, 1614 regional_certificate_name=None, 1615 regional_certificate_arn=None, 1616 endpoint_configuration=None, 1617 security_policy=None, 1618 generate_cli_skeleton=None, 1619 ): 1620 1621 if not domain_name: 1622 raise InvalidDomainName() 1623 1624 new_domain_name = DomainName( 1625 domain_name=domain_name, 1626 certificate_name=certificate_name, 1627 certificate_private_key=certificate_private_key, 1628 certificate_arn=certificate_arn, 1629 certificate_body=certificate_body, 1630 certificate_chain=certificate_chain, 1631 regional_certificate_name=regional_certificate_name, 1632 regional_certificate_arn=regional_certificate_arn, 1633 endpoint_configuration=endpoint_configuration, 1634 tags=tags, 1635 security_policy=security_policy, 1636 generate_cli_skeleton=generate_cli_skeleton, 1637 region_name=self.region_name, 1638 ) 1639 1640 self.domain_names[domain_name] = new_domain_name 1641 return new_domain_name 1642 1643 def get_domain_names(self): 1644 return list(self.domain_names.values()) 1645 1646 def get_domain_name(self, domain_name): 1647 domain_info = self.domain_names.get(domain_name) 1648 if domain_info is None: 1649 raise DomainNameNotFound() 1650 else: 1651 return self.domain_names[domain_name] 1652 1653 def delete_domain_name(self, domain_name): 1654 domain_info = self.domain_names.pop(domain_name, None) 1655 if domain_info is None: 1656 raise DomainNameNotFound() 1657 1658 def update_domain_name(self, domain_name, patch_operations): 1659 domain_info = self.domain_names.get(domain_name) 1660 if not domain_info: 1661 raise DomainNameNotFound() 1662 domain_info.apply_patch_operations(patch_operations) 1663 return domain_info 1664 1665 def create_model( 1666 self, 1667 rest_api_id, 1668 name, 1669 content_type, 1670 description=None, 1671 schema=None, 1672 cli_input_json=None, 1673 generate_cli_skeleton=None, 1674 ): 1675 1676 if not rest_api_id: 1677 raise InvalidRestApiId 1678 if not name: 1679 raise InvalidModelName 1680 1681 api = self.get_rest_api(rest_api_id) 1682 new_model = api.add_model( 1683 name=name, 1684 description=description, 1685 schema=schema, 1686 content_type=content_type, 1687 cli_input_json=cli_input_json, 1688 generate_cli_skeleton=generate_cli_skeleton, 1689 ) 1690 1691 return new_model 1692 1693 def get_models(self, rest_api_id): 1694 if not rest_api_id: 1695 raise InvalidRestApiId 1696 api = self.get_rest_api(rest_api_id) 1697 models = api.models.values() 1698 return list(models) 1699 1700 def get_model(self, rest_api_id, model_name): 1701 if not rest_api_id: 1702 raise InvalidRestApiId 1703 api = self.get_rest_api(rest_api_id) 1704 model = api.models.get(model_name) 1705 if model is None: 1706 raise ModelNotFound 1707 else: 1708 return model 1709 1710 def get_request_validators(self, restapi_id): 1711 restApi = self.get_rest_api(restapi_id) 1712 return restApi.get_request_validators() 1713 1714 def create_request_validator(self, restapi_id, name, body, params): 1715 restApi = self.get_rest_api(restapi_id) 1716 return restApi.create_request_validator( 1717 name=name, validateRequestBody=body, validateRequestParameters=params, 1718 ) 1719 1720 def get_request_validator(self, restapi_id, validator_id): 1721 restApi = self.get_rest_api(restapi_id) 1722 return restApi.get_request_validator(validator_id) 1723 1724 def delete_request_validator(self, restapi_id, validator_id): 1725 restApi = self.get_rest_api(restapi_id) 1726 restApi.delete_request_validator(validator_id) 1727 1728 def update_request_validator(self, restapi_id, validator_id, patch_operations): 1729 restApi = self.get_rest_api(restapi_id) 1730 return restApi.update_request_validator(validator_id, patch_operations) 1731 1732 def create_base_path_mapping( 1733 self, domain_name, rest_api_id, base_path=None, stage=None 1734 ): 1735 if domain_name not in self.domain_names: 1736 raise DomainNameNotFound() 1737 1738 if base_path and "/" in base_path: 1739 raise InvalidBasePathException() 1740 1741 if rest_api_id not in self.apis: 1742 raise InvalidRestApiIdForBasePathMappingException() 1743 1744 if stage and self.apis[rest_api_id].stages.get(stage) is None: 1745 raise InvalidStageException() 1746 1747 new_base_path_mapping = BasePathMapping( 1748 domain_name=domain_name, 1749 rest_api_id=rest_api_id, 1750 basePath=base_path, 1751 stage=stage, 1752 ) 1753 1754 new_base_path = new_base_path_mapping.get("basePath") 1755 if self.base_path_mappings.get(domain_name) is None: 1756 self.base_path_mappings[domain_name] = {} 1757 else: 1758 if ( 1759 self.base_path_mappings[domain_name].get(new_base_path) 1760 and new_base_path != "(none)" 1761 ): 1762 raise BasePathConflictException() 1763 self.base_path_mappings[domain_name][new_base_path] = new_base_path_mapping 1764 return new_base_path_mapping 1765 1766 def get_base_path_mappings(self, domain_name): 1767 1768 if domain_name not in self.domain_names: 1769 raise DomainNameNotFound() 1770 1771 return list(self.base_path_mappings[domain_name].values()) 1772 1773 def get_base_path_mapping(self, domain_name, base_path): 1774 1775 if domain_name not in self.domain_names: 1776 raise DomainNameNotFound() 1777 1778 if base_path not in self.base_path_mappings[domain_name]: 1779 raise BasePathNotFoundException() 1780 1781 return self.base_path_mappings[domain_name][base_path] 1782 1783 def delete_base_path_mapping(self, domain_name, base_path): 1784 1785 if domain_name not in self.domain_names: 1786 raise DomainNameNotFound() 1787 1788 if base_path not in self.base_path_mappings[domain_name]: 1789 raise BasePathNotFoundException() 1790 1791 self.base_path_mappings[domain_name].pop(base_path) 1792 1793 1794apigateway_backends = {} 1795for region_name in Session().get_available_regions("apigateway"): 1796 apigateway_backends[region_name] = APIGatewayBackend(region_name) 1797for region_name in Session().get_available_regions( 1798 "apigateway", partition_name="aws-us-gov" 1799): 1800 apigateway_backends[region_name] = APIGatewayBackend(region_name) 1801for region_name in Session().get_available_regions( 1802 "apigateway", partition_name="aws-cn" 1803): 1804 apigateway_backends[region_name] = APIGatewayBackend(region_name) 1805