1from __future__ import absolute_import
2
3import random
4import string
5import re
6from copy import copy
7
8import requests
9import time
10
11from boto3.session import Session
12
13try:
14    from urlparse import urlparse
15except ImportError:
16    from urllib.parse import urlparse
17import responses
18from moto.core import ACCOUNT_ID, BaseBackend, BaseModel, CloudFormationModel
19from .utils import create_id, to_path
20from moto.core.utils import path_url
21from .exceptions import (
22    ApiKeyNotFoundException,
23    UsagePlanNotFoundException,
24    AwsProxyNotAllowed,
25    CrossAccountNotAllowed,
26    IntegrationMethodNotDefined,
27    InvalidArn,
28    InvalidIntegrationArn,
29    InvalidHttpEndpoint,
30    InvalidResourcePathException,
31    AuthorizerNotFoundException,
32    StageNotFoundException,
33    RoleNotSpecified,
34    NoIntegrationDefined,
35    NoIntegrationResponseDefined,
36    NoMethodDefined,
37    ApiKeyAlreadyExists,
38    DomainNameNotFound,
39    InvalidDomainName,
40    InvalidRestApiId,
41    InvalidModelName,
42    RestAPINotFound,
43    RequestValidatorNotFound,
44    ModelNotFound,
45    ApiKeyValueMinLength,
46    InvalidBasePathException,
47    InvalidRestApiIdForBasePathMappingException,
48    InvalidStageException,
49    BasePathConflictException,
50    BasePathNotFoundException,
51)
52from ..core.models import responses_mock
53from moto.apigateway.exceptions import MethodNotFoundException
54
55STAGE_URL = "https://{api_id}.execute-api.{region_name}.amazonaws.com/{stage_name}"
56
57
class Deployment(CloudFormationModel, dict):
    """Dict-backed deployment record: id, stageName, description, createdDate."""

    def __init__(self, deployment_id, name, description=""):
        super(Deployment, self).__init__()
        # createdDate is captured once, as whole seconds since the epoch.
        self.update(
            [
                ("id", deployment_id),
                ("stageName", name),
                ("description", description),
                ("createdDate", int(time.time())),
            ]
        )

    @staticmethod
    def cloudformation_name_type():
        """Return the fixed name-type string for this model."""
        name_type = "Deployment"
        return name_type

    @staticmethod
    def cloudformation_type():
        """Return the CloudFormation resource type string for this model."""
        resource_type = "AWS::ApiGateway::Deployment"
        return resource_type

    @classmethod
    def create_from_cloudformation_json(
        cls, resource_name, cloudformation_json, region_name, **kwargs
    ):
        """Create a deployment through the region's backend from template properties.

        ``RestApiId`` and ``StageName`` are read as mandatory keys;
        ``Description`` falls back to an empty string.
        """
        props = cloudformation_json["Properties"]
        # Properties are read before the backend is looked up.
        call_args = {
            "function_id": props["RestApiId"],
            "name": props["StageName"],
            "description": props.get("Description", ""),
        }
        region_backend = apigateway_backends[region_name]
        return region_backend.create_deployment(**call_args)
86
87
class IntegrationResponse(BaseModel, dict):
    """Dict-backed integration response.

    Always carries ``responseTemplates`` and ``statusCode``; ``selectionPattern``
    and ``contentHandling`` are only present when a truthy value was supplied.
    """

    def __init__(
        self,
        status_code,
        selection_pattern=None,
        response_templates=None,
        content_handling=None,
    ):
        super(IntegrationResponse, self).__init__()
        if response_templates is None:
            # No default {"application/json": None} entry: removed for
            # compatibility with TF.
            response_templates = {}
        # Falsy templates (e.g. "") are stored as None (required for
        # compatibility with TF). A new dict is built so that the mapping
        # passed in by the caller is left untouched.
        self["responseTemplates"] = {
            content_type: (template or None)
            for content_type, template in response_templates.items()
        }
        self["statusCode"] = status_code
        if selection_pattern:
            self["selectionPattern"] = selection_pattern
        if content_handling:
            self["contentHandling"] = content_handling
109
110
class Integration(BaseModel, dict):
    """Dict-backed integration attached to a method.

    ``integrationResponses`` starts out as ``None`` and only becomes a dict
    (status code -> IntegrationResponse) once the first response is created.
    """

    def __init__(
        self,
        integration_type,
        uri,
        http_method,
        request_templates=None,
        tls_config=None,
        cache_namespace=None,
    ):
        super(Integration, self).__init__()
        self["type"] = integration_type
        self["uri"] = uri
        self["httpMethod"] = http_method
        self["requestTemplates"] = request_templates
        # No default "200" response is created (tf-compat). None prevents json
        # serialization from including responses if none were provided.
        self["integrationResponses"] = None
        self["tlsConfig"] = tls_config
        self["cacheNamespace"] = cache_namespace

    def create_integration_response(
        self, status_code, selection_pattern, response_templates, content_handling
    ):
        """Create, store and return the IntegrationResponse for ``status_code``.

        An existing response for the same status code is replaced.
        """
        if response_templates == {}:
            response_templates = None
        integration_response = IntegrationResponse(
            status_code, selection_pattern, response_templates, content_handling
        )
        if self.get("integrationResponses") is None:
            self["integrationResponses"] = {}
        self["integrationResponses"][status_code] = integration_response
        return integration_response

    def get_integration_response(self, status_code):
        """Return the response stored for ``status_code``.

        Raises:
            NoIntegrationResponseDefined: if no response is stored for that
                status code, including when no response was ever created.
        """
        # The key always exists (initialised to None), so a .get() default
        # would never be used; fall back explicitly instead.
        responses_by_code = self.get("integrationResponses") or {}
        result = responses_by_code.get(status_code)
        if not result:
            raise NoIntegrationResponseDefined(status_code)
        return result

    def delete_integration_response(self, status_code):
        """Remove and return the response for ``status_code``, or None if absent."""
        responses_by_code = self.get("integrationResponses") or {}
        return responses_by_code.pop(status_code, None)
154
155
class MethodResponse(BaseModel, dict):
    """Dict-backed method response: statusCode, responseModels, responseParameters."""

    def __init__(self, status_code, response_models=None, response_parameters=None):
        super(MethodResponse, self).__init__()
        self.update(
            statusCode=status_code,
            responseModels=response_models,
            responseParameters=response_parameters,
        )
162
163
class Method(CloudFormationModel, dict):
    """Dict-backed HTTP method of a resource.

    The serialisable attributes live in the dict itself; the method responses
    are kept separately in the ``method_responses`` attribute.
    """

    def __init__(self, method_type, authorization_type, **kwargs):
        super(Method, self).__init__()
        option = kwargs.get
        self["httpMethod"] = method_type
        self["authorizationType"] = authorization_type
        self["authorizerId"] = option("authorizer_id")
        self["authorizationScopes"] = option("authorization_scopes")
        # A missing or falsy flag is stored as False.
        self["apiKeyRequired"] = option("api_key_required") or False
        self["requestParameters"] = None
        self["requestModels"] = option("request_models")
        self["methodIntegration"] = None
        self["operationName"] = option("operation_name")
        self["requestValidatorId"] = option("request_validator_id")
        # response code -> MethodResponse
        self.method_responses = {}

    @staticmethod
    def cloudformation_name_type():
        """Return the fixed name-type string for this model."""
        name_type = "Method"
        return name_type

    @staticmethod
    def cloudformation_type():
        """Return the CloudFormation resource type string for this model."""
        resource_type = "AWS::ApiGateway::Method"
        return resource_type

    @classmethod
    def create_from_cloudformation_json(
        cls, resource_name, cloudformation_json, region_name, **kwargs
    ):
        """Create a method and its integration through the region's backend.

        The method is created first; the ``Integration`` properties are only
        read afterwards. Returns the created method.
        """
        props = cloudformation_json["Properties"]
        # Arguments shared by both backend calls.
        target = {
            "function_id": props["RestApiId"],
            "resource_id": props["ResourceId"],
            "method_type": props["HttpMethod"],
        }
        authorization = props["AuthorizationType"]
        key_required = props["ApiKeyRequired"]
        region_backend = apigateway_backends[region_name]
        created = region_backend.create_method(
            authorization_type=authorization, api_key_required=key_required, **target
        )

        integration_http_method = props["Integration"]["IntegrationHttpMethod"]
        integration_kind = props["Integration"]["Type"]
        integration_uri = props["Integration"]["Uri"]
        region_backend.create_integration(
            integration_type=integration_kind,
            uri=integration_uri,
            integration_method=integration_http_method,
            **target
        )
        return created

    def create_response(self, response_code, response_models, response_parameters):
        """Store and return a new MethodResponse under ``response_code``."""
        self.method_responses[response_code] = MethodResponse(
            response_code, response_models, response_parameters
        )
        return self.method_responses[response_code]

    def get_response(self, response_code):
        """Return the response stored under ``response_code``, or None."""
        stored = self.method_responses
        return stored.get(response_code)

    def delete_response(self, response_code):
        """Remove and return the response under ``response_code``, or None."""
        stored = self.method_responses
        return stored.pop(response_code, None)
234
235
class Resource(CloudFormationModel):
    """A path segment of a REST API, with the methods defined on it.

    Resources reference their parent by id; the full path is assembled by
    walking up through the region's backend.
    """

    def __init__(self, id, region_name, api_id, path_part, parent_id):
        super(Resource, self).__init__()
        self.id = id
        self.api_id = api_id
        self.region_name = region_name
        self.parent_id = parent_id
        self.path_part = path_part
        # method type (HTTP verb) -> Method
        self.resource_methods = {}

    def to_dict(self):
        """Serialise the resource; optional keys appear only when they have a value."""
        payload = {"path": self.get_path(), "id": self.id}
        if self.resource_methods:
            payload["resourceMethods"] = self.resource_methods
        if self.parent_id:
            payload.update(parentId=self.parent_id, pathPart=self.path_part)
        return payload

    @property
    def physical_resource_id(self):
        """The resource id, exposed under the ``physical_resource_id`` name."""
        resource_id = self.id
        return resource_id

    @staticmethod
    def cloudformation_name_type():
        """Return the fixed name-type string for this model."""
        name_type = "Resource"
        return name_type

    @staticmethod
    def cloudformation_type():
        """Return the CloudFormation resource type string for this model."""
        resource_type = "AWS::ApiGateway::Resource"
        return resource_type

    @classmethod
    def create_from_cloudformation_json(
        cls, resource_name, cloudformation_json, region_name, **kwargs
    ):
        """Create a resource through the region's backend from template properties.

        When ``ParentId`` equals ``RestApiId`` the new resource is attached to
        the API's existing "/" resource instead.
        """
        props = cloudformation_json["Properties"]
        api_id = props["RestApiId"]
        parent_id = props["ParentId"]
        path_part = props["PathPart"]

        region_backend = apigateway_backends[region_name]
        if parent_id == api_id:
            # A Root path (/) is automatically created. Any new paths should
            # use this as their parent.
            existing = region_backend.list_resources(function_id=api_id)
            roots = [candidate for candidate in existing if candidate.path_part == "/"]
            parent_id = roots[0].id
        return region_backend.create_resource(
            function_id=api_id, parent_resource_id=parent_id, path_part=path_part
        )

    def get_path(self):
        """Return the parent's path prefix followed by this resource's path part."""
        prefix = self.get_parent_path()
        return prefix + self.path_part

    def get_parent_path(self):
        """Return the prefix to put before ``path_part``.

        Empty for a resource without a parent; otherwise the parent's path,
        with a trailing "/" appended unless the parent path is already "/".
        """
        if not self.parent_id:
            return ""
        region_backend = apigateway_backends[self.region_name]
        parent_path = region_backend.get_resource(self.api_id, self.parent_id).get_path()
        if parent_path == "/":  # Root parent
            return parent_path
        return parent_path + "/"

    def get_response(self, request):
        """Call the integration configured for ``request.method``.

        Only the "HTTP" integration type is handled: the integration uri is
        requested with the integration's own HTTP method.

        Returns:
            A ``(status_code, text)`` tuple taken from the HTTP response.

        Raises:
            NotImplementedError: for any other integration type.
        """
        integration = self.get_integration(request.method)
        integration_type = integration["type"]

        if integration_type != "HTTP":
            raise NotImplementedError(
                "The {0} type has not been implemented".format(integration_type)
            )

        uri = integration["uri"]
        verb = integration["httpMethod"].lower()
        response = getattr(requests, verb)(uri)
        return response.status_code, response.text

    def add_method(
        self,
        method_type,
        authorization_type,
        api_key_required,
        request_models=None,
        operation_name=None,
        authorizer_id=None,
        authorization_scopes=None,
        request_validator_id=None,
    ):
        """Create a Method, store it under ``method_type`` and return it.

        A truthy ``authorization_scopes`` that is not a list is wrapped in one.
        """
        scopes = authorization_scopes
        if scopes and not isinstance(scopes, list):
            scopes = [scopes]
        self.resource_methods[method_type] = Method(
            method_type=method_type,
            authorization_type=authorization_type,
            api_key_required=api_key_required,
            request_models=request_models,
            operation_name=operation_name,
            authorizer_id=authorizer_id,
            authorization_scopes=scopes,
            request_validator_id=request_validator_id,
        )
        return self.resource_methods[method_type]

    def get_method(self, method_type):
        """Return the method stored under ``method_type``.

        Raises:
            MethodNotFoundException: if nothing (truthy) is stored for it.
        """
        found = self.resource_methods.get(method_type)
        if found:
            return found
        raise MethodNotFoundException()

    def delete_method(self, method_type):
        """Remove the method under ``method_type``; KeyError if it is absent."""
        del self.resource_methods[method_type]

    def add_integration(
        self,
        method_type,
        integration_type,
        uri,
        request_templates=None,
        integration_method=None,
        tls_config=None,
        cache_namespace=None,
    ):
        """Create an Integration, attach it to the existing method and return it.

        ``integration_method`` falls back to ``method_type`` when not given.
        """
        http_method = integration_method or method_type
        created = Integration(
            integration_type,
            uri,
            http_method,
            request_templates=request_templates,
            tls_config=tls_config,
            cache_namespace=cache_namespace,
        )
        self.resource_methods[method_type]["methodIntegration"] = created
        return created

    def get_integration(self, method_type):
        """Return the ``methodIntegration`` entry of the method, if any.

        An unknown method type yields an empty dict.
        """
        method = self.resource_methods.get(method_type, {})
        return method.get("methodIntegration", {})

    def delete_integration(self, method_type):
        """Remove and return the ``methodIntegration`` entry of the method."""
        method = self.resource_methods[method_type]
        return method.pop("methodIntegration")
381
382
class Authorizer(BaseModel, dict):
    """Dict-backed authorizer of a REST API."""

    def __init__(self, id, name, authorizer_type, **kwargs):
        super(Authorizer, self).__init__()
        self["id"] = id
        self["name"] = name
        self["type"] = authorizer_type
        # (keyword argument, dict key) pairs that are stored only when truthy.
        optional_fields = (
            ("provider_arns", "providerARNs"),
            ("auth_type", "authType"),
            ("authorizer_uri", "authorizerUri"),
            ("authorizer_credentials", "authorizerCredentials"),
            ("identity_source", "identitySource"),
            ("identity_validation_expression", "identityValidationExpression"),
        )
        for kwarg_name, field in optional_fields:
            supplied = kwargs.get(kwarg_name)
            if supplied:
                self[field] = supplied
        # The TTL is always stored, even when it was not supplied (None).
        self["authorizerResultTtlInSeconds"] = kwargs.get("authorizer_result_ttl")

    def apply_operations(self, patch_operations):
        """Apply patch operations in order and return ``self``.

        The target field is chosen by substring match of "/<field>" against
        the operation path; the first match in the order below wins. The
        operation's ``op`` is not inspected for matched fields.

        Raises:
            Exception: for ``providerARNs`` (not implemented) and for paths
                that match no known field.
        """
        # Order matters: e.g. "/authType" must be tried before "/type".
        patchable = (
            "authorizerUri",
            "authorizerCredentials",
            "authorizerResultTtlInSeconds",
            "authType",
            "identitySource",
            "identityValidationExpression",
            "name",
            "providerARNs",
            "type",
        )
        for op in patch_operations:
            path = op["path"]
            field = next(
                (candidate for candidate in patchable if "/" + candidate in path),
                None,
            )
            if field is None:
                raise Exception('Patch operation "%s" not implemented' % op["op"])
            if field == "providerARNs":
                # TODO: add and remove
                raise Exception('Patch operation for "%s" not implemented' % path)
            new_value = op["value"]
            if field == "authorizerResultTtlInSeconds":
                new_value = int(new_value)
            self[field] = new_value
        return self
429
430
class Stage(BaseModel, dict):
    """Dict-backed API stage with support for patch operations.

    Per-method overrides are kept under ``methodSettings``, keyed by the first
    two segments of the patch path joined with "/" (e.g. ``*/*``).
    """

    # Patch-path suffix -> key used inside a methodSettings entry.
    _METHOD_SETTING_KEYS = {
        "metrics/enabled": "metricsEnabled",
        "logging/loglevel": "loggingLevel",
        "logging/dataTrace": "dataTraceEnabled",
        "throttling/burstLimit": "throttlingBurstLimit",
        "throttling/rateLimit": "throttlingRateLimit",
        "caching/enabled": "cachingEnabled",
        "caching/ttlInSeconds": "cacheTtlInSeconds",
        "caching/dataEncrypted": "cacheDataEncrypted",
        "caching/requireAuthorizationForCacheControl": "requireAuthorizationForCacheControl",
        "caching/unauthorizedCacheControlHeaderStrategy": "unauthorizedCacheControlHeaderStrategy",
    }

    # methodSettings key -> name of the type its patched value is converted to.
    _METHOD_SETTING_TYPES = {
        "metricsEnabled": "bool",
        "loggingLevel": "str",
        "dataTraceEnabled": "bool",
        "throttlingBurstLimit": "int",
        "throttlingRateLimit": "float",
        "cachingEnabled": "bool",
        "cacheTtlInSeconds": "int",
        "cacheDataEncrypted": "bool",
        "requireAuthorizationForCacheControl": "bool",
        "unauthorizedCacheControlHeaderStrategy": "str",
    }

    def __init__(
        self,
        name=None,
        deployment_id=None,
        variables=None,
        description="",
        cacheClusterEnabled=False,
        cacheClusterSize=None,
        tags=None,
        tracing_enabled=None,
    ):
        super(Stage, self).__init__()
        if variables is None:
            variables = {}
        self["stageName"] = name
        self["deploymentId"] = deployment_id
        self["methodSettings"] = {}
        self["variables"] = variables
        self["description"] = description
        self["cacheClusterEnabled"] = cacheClusterEnabled
        # An enabled cache gets size "0.5" unless an explicit size is given;
        # an explicit size is stored (as a string) even when caching is off.
        if self["cacheClusterEnabled"]:
            self["cacheClusterSize"] = str(0.5)
        if cacheClusterSize is not None:
            self["cacheClusterSize"] = str(cacheClusterSize)
        # tags and tracingEnabled are only present when they were supplied.
        if tags is not None:
            self["tags"] = tags
        if tracing_enabled is not None:
            self["tracingEnabled"] = tracing_enabled

    def apply_operations(self, patch_operations):
        """Apply patch operations in order and return ``self``.

        Paths are matched by substring, first match wins. Any other "replace"
        operation is treated as a method-setting patch; it is skipped when the
        path does not split into the expected four parts.

        Raises:
            Exception: for an unmatched path whose op is not "replace".
        """
        for op in patch_operations:
            if "variables/" in op["path"]:
                self._apply_operation_to_variables(op)
            elif "/cacheClusterEnabled" in op["path"]:
                self["cacheClusterEnabled"] = self._str2bool(op["value"])
                if "cacheClusterSize" not in self and self["cacheClusterEnabled"]:
                    self["cacheClusterSize"] = str(0.5)
            elif "/cacheClusterSize" in op["path"]:
                self["cacheClusterSize"] = str(float(op["value"]))
            elif "/description" in op["path"]:
                self["description"] = op["value"]
            elif "/deploymentId" in op["path"]:
                self["deploymentId"] = op["value"]
            elif op["op"] == "replace":
                # Method Settings drop into here
                # (e.g., path could be '/*/*/logging/loglevel')
                split_path = op["path"].split("/", 3)
                if len(split_path) != 4:
                    continue
                self._patch_method_setting(
                    "/".join(split_path[1:3]), split_path[3], op["value"]
                )
            else:
                raise Exception('Patch operation "%s" not implemented' % op["op"])
        return self

    def _patch_method_setting(self, resource_path_and_method, key, value):
        """Set one method setting; unknown setting names are ignored.

        The entry for ``resource_path_and_method`` is created with default
        settings on first use.
        """
        updated_key = self._method_settings_translations(key)
        if updated_key is None:
            return
        settings = self["methodSettings"]
        if resource_path_and_method not in settings:
            settings[resource_path_and_method] = self._get_default_method_settings()
        settings[resource_path_and_method][updated_key] = self._convert_to_type(
            updated_key, value
        )

    def _get_default_method_settings(self):
        """Return a new dict holding the default method settings."""
        return {
            "throttlingRateLimit": 1000.0,
            "dataTraceEnabled": False,
            "metricsEnabled": False,
            "unauthorizedCacheControlHeaderStrategy": "SUCCEED_WITH_RESPONSE_HEADER",
            "cacheTtlInSeconds": 300,
            "cacheDataEncrypted": True,
            "cachingEnabled": False,
            "throttlingBurstLimit": 2000,
            "requireAuthorizationForCacheControl": True,
        }

    def _method_settings_translations(self, key):
        """Return the methodSettings key for a patch-path suffix, or None if unknown."""
        return self._METHOD_SETTING_KEYS.get(key)

    def _str2bool(self, v):
        """Return True only when ``v`` is the string "true", ignoring case."""
        return v.lower() == "true"

    def _convert_to_type(self, key, val):
        """Convert a patched value to the type registered for ``key``.

        Keys without a registered type are converted with ``str``.
        """
        type_name = self._METHOD_SETTING_TYPES.get(key, "str")
        if type_name == "bool":
            return self._str2bool(val)
        if type_name == "int":
            return int(val)
        if type_name == "float":
            return float(val)
        return str(val)

    def _apply_operation_to_variables(self, op):
        """Remove or replace the stage variable named after "variables/" in the path.

        Raises:
            Exception: for any op other than "remove" or "replace".
        """
        # 10 == len("variables/"): keep everything after its last occurrence.
        key = op["path"][op["path"].rindex("variables/") + 10 :]
        if op["op"] == "remove":
            self["variables"].pop(key, None)
        elif op["op"] == "replace":
            self["variables"][key] = op["value"]
        else:
            raise Exception('Patch operation "%s" not implemented' % op["op"])
570
571
class ApiKey(BaseModel, dict):
    """Dict-backed API key with a generated id and, if needed, a generated value."""

    def __init__(
        self,
        name=None,
        description=None,
        enabled=False,
        generateDistinctId=False,
        value=None,
        stageKeys=None,
        tags=None,
        customerId=None,
    ):
        """Build the key record.

        ``generateDistinctId`` is accepted but not used by this constructor.
        ``stageKeys`` defaults to a new empty list for every instance, so keys
        created without it no longer share (and mutate) a single list.
        """
        super(ApiKey, self).__init__()
        self["id"] = create_id()
        # A missing/empty value is replaced by 40 random alphanumeric
        # characters; random.sample draws without replacement, so the
        # generated value never repeats a character.
        self["value"] = (
            value
            if value
            else "".join(random.sample(string.ascii_letters + string.digits, 40))
        )
        self["name"] = name
        self["customerId"] = customerId
        self["description"] = description
        self["enabled"] = enabled
        self["createdDate"] = self["lastUpdatedDate"] = int(time.time())
        self["stageKeys"] = [] if stageKeys is None else stageKeys
        self["tags"] = tags

    def update_operations(self, patch_operations):
        """Apply "replace" patch operations in order and return ``self``.

        The target field is chosen by substring match on the path; a
        "replace" whose path matches no known field is silently ignored.

        Raises:
            Exception: for any op other than "replace".
        """
        for op in patch_operations:
            if op["op"] != "replace":
                raise Exception('Patch operation "%s" not implemented' % op["op"])
            if "/name" in op["path"]:
                self["name"] = op["value"]
            elif "/customerId" in op["path"]:
                self["customerId"] = op["value"]
            elif "/description" in op["path"]:
                self["description"] = op["value"]
            elif "/enabled" in op["path"]:
                self["enabled"] = self._str2bool(op["value"])
        return self

    def _str2bool(self, v):
        """Return True only when ``v`` is the string "true", ignoring case."""
        return v.lower() == "true"
616
617
class UsagePlan(BaseModel, dict):
    """Dict-backed usage plan with a generated id."""

    def __init__(
        self,
        name=None,
        description=None,
        apiStages=None,
        throttle=None,
        quota=None,
        productCode=None,
        tags=None,
    ):
        super(UsagePlan, self).__init__()
        self["id"] = create_id()
        self["name"] = name
        self["description"] = description
        # A missing or empty apiStages is stored as a new empty list.
        self["apiStages"] = apiStages if apiStages else []
        self["throttle"] = throttle
        self["quota"] = quota
        self["productCode"] = productCode
        self["tags"] = tags

    def _section(self, key):
        """Return the nested dict stored under ``key``, creating it if it is None.

        ``throttle`` and ``quota`` default to None, so a patch of one of their
        fields would otherwise fail with a TypeError on item assignment.
        """
        if self.get(key) is None:
            self[key] = {}
        return self[key]

    def apply_patch_operations(self, patch_operations):
        """Apply "replace" patch operations in order; other ops are ignored.

        Fields are matched by substring on the path, each check independent of
        the others. Values are stored exactly as received.
        """
        for op in patch_operations:
            path = op["path"]
            value = op["value"]
            if op["op"] != "replace":
                continue
            if "/name" in path:
                self["name"] = value
            if "/productCode" in path:
                self["productCode"] = value
            if "/description" in path:
                self["description"] = value
            if "/quota/limit" in path:
                self._section("quota")["limit"] = value
            if "/quota/period" in path:
                self._section("quota")["period"] = value
            if "/throttle/rateLimit" in path:
                self._section("throttle")["rateLimit"] = value
            if "/throttle/burstLimit" in path:
                self._section("throttle")["burstLimit"] = value
658
659
class RequestValidator(BaseModel, dict):
    """Dict-backed request validator of a REST API."""

    # Keys of the validator dict.
    PROP_ID = "id"
    PROP_NAME = "name"
    PROP_VALIDATE_REQUEST_BODY = "validateRequestBody"
    PROP_VALIDATE_REQUEST_PARAMETERS = "validateRequestParameters"

    # Keys and values of a patch operation.
    OP_PATH = "path"
    OP_VALUE = "value"
    OP_REPLACE = "replace"
    OP_OP = "op"

    def __init__(self, id, name, validateRequestBody, validateRequestParameters):
        super(RequestValidator, self).__init__()
        self[RequestValidator.PROP_ID] = id
        self[RequestValidator.PROP_NAME] = name
        self[RequestValidator.PROP_VALIDATE_REQUEST_BODY] = validateRequestBody
        self[
            RequestValidator.PROP_VALIDATE_REQUEST_PARAMETERS
        ] = validateRequestParameters

    def apply_patch_operations(self, operations):
        """Apply "replace" patch operations in order; other ops are ignored.

        Fields are matched by substring on the path. The two boolean flags are
        set to True only when the value is exactly "true" (case-insensitive).
        """
        for operation in operations:
            path = operation[RequestValidator.OP_PATH]
            value = operation[RequestValidator.OP_VALUE]
            if operation[RequestValidator.OP_OP] != RequestValidator.OP_REPLACE:
                continue
            if to_path(RequestValidator.PROP_NAME) in path:
                self[RequestValidator.PROP_NAME] = value
            # Compare with ==: `x in ("true")` is a substring test against the
            # string "true", which wrongly accepts "", "t", "rue", ...
            if to_path(RequestValidator.PROP_VALIDATE_REQUEST_BODY) in path:
                self[RequestValidator.PROP_VALIDATE_REQUEST_BODY] = (
                    value.lower() == "true"
                )
            if to_path(RequestValidator.PROP_VALIDATE_REQUEST_PARAMETERS) in path:
                self[RequestValidator.PROP_VALIDATE_REQUEST_PARAMETERS] = (
                    value.lower() == "true"
                )

    def to_dict(self):
        """Return a plain dict with the validator's four fields."""
        return {
            "id": self["id"],
            "name": self["name"],
            "validateRequestBody": self["validateRequestBody"],
            "validateRequestParameters": self["validateRequestParameters"],
        }
704
705
class UsagePlanKey(BaseModel, dict):
    """Dict-backed usage plan key: id, name, type and value."""

    def __init__(self, id, type, name, value):
        super(UsagePlanKey, self).__init__()
        self.update([("id", id), ("name", name), ("type", type), ("value", value)])
713
714
class RestAPI(CloudFormationModel):
    """A REST API together with the collections it owns.

    Instances keep their deployments, authorizers, stages, resources, models
    and request validators in per-API dicts, and are created with a default
    "/" resource.
    """

    # Keys of the dict returned by to_dict(). Several of them are also used to
    # read constructor keyword arguments and to match patch-operation paths.
    PROP_ID = "id"
    PROP_NAME = "name"
    # NOTE: "DESCRIPTON" is misspelled; the name is referenced by other methods
    # of this class, so it is kept as is.
    PROP_DESCRIPTON = "description"
    PROP_VERSION = "version"
    PROP_BINARY_MEDIA_TYPES = "binaryMediaTypes"
    PROP_CREATED_DATE = "createdDate"
    PROP_API_KEY_SOURCE = "apiKeySource"
    PROP_ENDPOINT_CONFIGURATION = "endpointConfiguration"
    PROP_TAGS = "tags"
    PROP_POLICY = "policy"
    PROP_DISABLE_EXECUTE_API_ENDPOINT = "disableExecuteApiEndpoint"
    PROP_MINIMUM_COMPRESSION_SIZE = "minimumCompressionSize"

    # Patch operations: the recognised "op" values, followed by the keys read
    # from each operation dict.
    OPERATION_ADD = "add"
    OPERATION_REPLACE = "replace"
    OPERATION_REMOVE = "remove"
    OPERATION_PATH = "path"
    OPERATION_VALUE = "value"
    OPERATION_OP = "op"
737
738    def __init__(self, id, region_name, name, description, **kwargs):
739        super(RestAPI, self).__init__()
740        self.id = id
741        self.region_name = region_name
742        self.name = name
743        self.description = description
744        self.version = kwargs.get(RestAPI.PROP_VERSION) or "V1"
745        self.binaryMediaTypes = kwargs.get(RestAPI.PROP_BINARY_MEDIA_TYPES) or []
746        self.create_date = int(time.time())
747        self.api_key_source = kwargs.get("api_key_source") or "HEADER"
748        self.policy = kwargs.get(RestAPI.PROP_POLICY) or None
749        self.endpoint_configuration = kwargs.get("endpoint_configuration") or {
750            "types": ["EDGE"]
751        }
752        self.tags = kwargs.get(RestAPI.PROP_TAGS) or {}
753        self.disableExecuteApiEndpoint = (
754            kwargs.get(RestAPI.PROP_DISABLE_EXECUTE_API_ENDPOINT) or False
755        )
756        self.minimum_compression_size = kwargs.get("minimum_compression_size")
757        self.deployments = {}
758        self.authorizers = {}
759        self.stages = {}
760        self.resources = {}
761        self.models = {}
762        self.request_validators = {}
763        self.add_child("/")  # Add default child
764
765    def __repr__(self):
766        return str(self.id)
767
768    def to_dict(self):
769        return {
770            self.PROP_ID: self.id,
771            self.PROP_NAME: self.name,
772            self.PROP_DESCRIPTON: self.description,
773            self.PROP_VERSION: self.version,
774            self.PROP_BINARY_MEDIA_TYPES: self.binaryMediaTypes,
775            self.PROP_CREATED_DATE: self.create_date,
776            self.PROP_API_KEY_SOURCE: self.api_key_source,
777            self.PROP_ENDPOINT_CONFIGURATION: self.endpoint_configuration,
778            self.PROP_TAGS: self.tags,
779            self.PROP_POLICY: self.policy,
780            self.PROP_DISABLE_EXECUTE_API_ENDPOINT: self.disableExecuteApiEndpoint,
781            self.PROP_MINIMUM_COMPRESSION_SIZE: self.minimum_compression_size,
782        }
783
784    def apply_patch_operations(self, patch_operations):
785        def to_path(prop):
786            return "/" + prop
787
788        for op in patch_operations:
789            path = op[self.OPERATION_PATH]
790            value = ""
791            if self.OPERATION_VALUE in op:
792                value = op[self.OPERATION_VALUE]
793            operaton = op[self.OPERATION_OP]
794            if operaton == self.OPERATION_REPLACE:
795                if to_path(self.PROP_NAME) in path:
796                    self.name = value
797                if to_path(self.PROP_DESCRIPTON) in path:
798                    self.description = value
799                if to_path(self.PROP_API_KEY_SOURCE) in path:
800                    self.api_key_source = value
801                if to_path(self.PROP_BINARY_MEDIA_TYPES) in path:
802                    self.binaryMediaTypes = [value]
803                if to_path(self.PROP_DISABLE_EXECUTE_API_ENDPOINT) in path:
804                    self.disableExecuteApiEndpoint = bool(value)
805            elif operaton == self.OPERATION_ADD:
806                if to_path(self.PROP_BINARY_MEDIA_TYPES) in path:
807                    self.binaryMediaTypes.append(value)
808            elif operaton == self.OPERATION_REMOVE:
809                if to_path(self.PROP_BINARY_MEDIA_TYPES) in path:
810                    self.binaryMediaTypes.remove(value)
811                if to_path(self.PROP_DESCRIPTON) in path:
812                    self.description = ""
813
814    @classmethod
815    def has_cfn_attr(cls, attribute):
816        return attribute in ["RootResourceId"]
817
818    def get_cfn_attribute(self, attribute_name):
819        from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
820
821        if attribute_name == "RootResourceId":
822            for res_id, res_obj in self.resources.items():
823                if res_obj.path_part == "/" and not res_obj.parent_id:
824                    return res_id
825            raise Exception("Unable to find root resource for API %s" % self)
826        raise UnformattedGetAttTemplateException()
827
828    @property
829    def physical_resource_id(self):
830        return self.id
831
832    @staticmethod
833    def cloudformation_name_type():
834        return "RestApi"
835
836    @staticmethod
837    def cloudformation_type():
838        return "AWS::ApiGateway::RestApi"
839
840    @classmethod
841    def create_from_cloudformation_json(
842        cls, resource_name, cloudformation_json, region_name, **kwargs
843    ):
844        properties = cloudformation_json["Properties"]
845        name = properties["Name"]
846        desc = properties.get("Description", "")
847        config = properties.get("EndpointConfiguration", None)
848        backend = apigateway_backends[region_name]
849        return backend.create_rest_api(
850            name=name, description=desc, endpoint_configuration=config
851        )
852
853    def add_child(self, path, parent_id=None):
854        child_id = create_id()
855        child = Resource(
856            id=child_id,
857            region_name=self.region_name,
858            api_id=self.id,
859            path_part=path,
860            parent_id=parent_id,
861        )
862        self.resources[child_id] = child
863        return child
864
865    def add_model(
866        self,
867        name,
868        description=None,
869        schema=None,
870        content_type=None,
871        cli_input_json=None,
872        generate_cli_skeleton=None,
873    ):
874        model_id = create_id()
875        new_model = Model(
876            id=model_id,
877            name=name,
878            description=description,
879            schema=schema,
880            content_type=content_type,
881            cli_input_json=cli_input_json,
882            generate_cli_skeleton=generate_cli_skeleton,
883        )
884
885        self.models[name] = new_model
886        return new_model
887
888    def get_resource_for_path(self, path_after_stage_name):
889        for resource in self.resources.values():
890            if resource.get_path() == path_after_stage_name:
891                return resource
892        # TODO deal with no matching resource
893
894    def resource_callback(self, request):
895        path = path_url(request.url)
896        path_after_stage_name = "/".join(path.split("/")[2:])
897        if not path_after_stage_name:
898            path_after_stage_name = "/"
899
900        resource = self.get_resource_for_path(path_after_stage_name)
901        status_code, response = resource.get_response(request)
902        return status_code, {}, response
903
904    def update_integration_mocks(self, stage_name):
905        stage_url_lower = STAGE_URL.format(
906            api_id=self.id.lower(), region_name=self.region_name, stage_name=stage_name
907        )
908        stage_url_upper = STAGE_URL.format(
909            api_id=self.id.upper(), region_name=self.region_name, stage_name=stage_name
910        )
911
912        for url in [stage_url_lower, stage_url_upper]:
913            responses_mock._matches.insert(
914                0,
915                responses.CallbackResponse(
916                    url=url,
917                    method=responses.GET,
918                    callback=self.resource_callback,
919                    content_type="text/plain",
920                    match_querystring=False,
921                ),
922            )
923
924    def create_authorizer(
925        self,
926        id,
927        name,
928        authorizer_type,
929        provider_arns=None,
930        auth_type=None,
931        authorizer_uri=None,
932        authorizer_credentials=None,
933        identity_source=None,
934        identiy_validation_expression=None,
935        authorizer_result_ttl=None,
936    ):
937        authorizer = Authorizer(
938            id=id,
939            name=name,
940            authorizer_type=authorizer_type,
941            provider_arns=provider_arns,
942            auth_type=auth_type,
943            authorizer_uri=authorizer_uri,
944            authorizer_credentials=authorizer_credentials,
945            identity_source=identity_source,
946            identiy_validation_expression=identiy_validation_expression,
947            authorizer_result_ttl=authorizer_result_ttl,
948        )
949        self.authorizers[id] = authorizer
950        return authorizer
951
952    def create_stage(
953        self,
954        name,
955        deployment_id,
956        variables=None,
957        description="",
958        cacheClusterEnabled=None,
959        cacheClusterSize=None,
960        tags=None,
961        tracing_enabled=None,
962    ):
963        if variables is None:
964            variables = {}
965        stage = Stage(
966            name=name,
967            deployment_id=deployment_id,
968            variables=variables,
969            description=description,
970            cacheClusterSize=cacheClusterSize,
971            cacheClusterEnabled=cacheClusterEnabled,
972            tags=tags,
973            tracing_enabled=tracing_enabled,
974        )
975        self.stages[name] = stage
976        self.update_integration_mocks(name)
977        return stage
978
979    def create_deployment(self, name, description="", stage_variables=None):
980        if stage_variables is None:
981            stage_variables = {}
982        deployment_id = create_id()
983        deployment = Deployment(deployment_id, name, description)
984        self.deployments[deployment_id] = deployment
985        self.stages[name] = Stage(
986            name=name, deployment_id=deployment_id, variables=stage_variables
987        )
988        self.update_integration_mocks(name)
989
990        return deployment
991
992    def get_deployment(self, deployment_id):
993        return self.deployments[deployment_id]
994
995    def get_authorizers(self):
996        return list(self.authorizers.values())
997
998    def get_stages(self):
999        return list(self.stages.values())
1000
1001    def get_deployments(self):
1002        return list(self.deployments.values())
1003
1004    def delete_deployment(self, deployment_id):
1005        return self.deployments.pop(deployment_id)
1006
1007    def create_request_validator(
1008        self, name, validateRequestBody, validateRequestParameters
1009    ):
1010        validator_id = create_id()
1011        request_validator = RequestValidator(
1012            id=validator_id,
1013            name=name,
1014            validateRequestBody=validateRequestBody,
1015            validateRequestParameters=validateRequestParameters,
1016        )
1017        self.request_validators[validator_id] = request_validator
1018        return request_validator
1019
1020    def get_request_validators(self):
1021        return list(self.request_validators.values())
1022
1023    def get_request_validator(self, validator_id):
1024        reqeust_validator = self.request_validators.get(validator_id)
1025        if reqeust_validator is None:
1026            raise RequestValidatorNotFound()
1027        return reqeust_validator
1028
1029    def delete_request_validator(self, validator_id):
1030        reqeust_validator = self.request_validators.pop(validator_id)
1031        return reqeust_validator
1032
1033    def update_request_validator(self, validator_id, patch_operations):
1034        self.request_validators[validator_id].apply_patch_operations(patch_operations)
1035        return self.request_validators[validator_id]
1036
1037
class DomainName(BaseModel, dict):
    """Dict-backed model of a custom domain name.

    A set of fixed and generated entries is always present. Optional entries
    are only added when the matching keyword argument is truthy.
    """

    def __init__(self, domain_name, **kwargs):
        super(DomainName, self).__init__()
        region = kwargs.get("region_name") or "us-east-1"

        # Entries that are always present.
        self["domainName"] = domain_name
        self["regionalDomainName"] = "d-%s.execute-api.%s.amazonaws.com" % (
            create_id(),
            region,
        )
        self["distributionDomainName"] = "d%s.cloudfront.net" % create_id()
        self["domainNameStatus"] = "AVAILABLE"
        self["domainNameStatusMessage"] = "Domain Name Available"
        self["regionalHostedZoneId"] = "Z2FDTNDATAQYW2"
        self["distributionHostedZoneId"] = "Z2FDTNDATAQYW2"
        self["certificateUploadDate"] = int(time.time())

        # (keyword argument, dict key) pairs, in the order the keys are added.
        optional_entries = (
            ("certificate_name", "certificateName"),
            ("certificate_arn", "certificateArn"),
            ("certificate_body", "certificateBody"),
            ("tags", "tags"),
            ("security_policy", "securityPolicy"),
            ("certificate_chain", "certificateChain"),
            ("regional_certificate_name", "regionalCertificateName"),
            ("certificate_private_key", "certificatePrivateKey"),
            ("regional_certificate_arn", "regionalCertificateArn"),
            ("endpoint_configuration", "endpointConfiguration"),
            ("generate_cli_skeleton", "generateCliSkeleton"),
        )
        for argument, key in optional_entries:
            supplied = kwargs.get(argument)
            if supplied:
                self[key] = supplied
1074
1075
class Model(BaseModel, dict):
    """Dict-backed model definition.

    ``id`` and ``name`` are always stored. The remaining entries are only
    added when the matching keyword argument is truthy.
    """

    def __init__(self, id, name, **kwargs):
        super(Model, self).__init__()
        self["id"] = id
        self["name"] = name

        # (keyword argument, dict key) pairs, in the order the keys are added.
        optional_entries = (
            ("description", "description"),
            ("schema", "schema"),
            ("content_type", "contentType"),
            ("cli_input_json", "cliInputJson"),
            ("generate_cli_skeleton", "generateCliSkeleton"),
        )
        for argument, key in optional_entries:
            supplied = kwargs.get(argument)
            if supplied:
                self[key] = supplied
1091
1092
class BasePathMapping(BaseModel, dict):
    """Dict-backed mapping from a domain name to a REST API.

    ``basePath`` falls back to the literal ``"(none)"`` when no truthy value
    is supplied. ``stage`` is only stored when a truthy value is supplied.
    """

    def __init__(self, domain_name, rest_api_id, **kwargs):
        super(BasePathMapping, self).__init__()
        self["domain_name"] = domain_name
        self["restApiId"] = rest_api_id
        self["basePath"] = kwargs.get("basePath") or "(none)"

        stage = kwargs.get("stage")
        if stage:
            self["stage"] = stage
1105
1106
1107class APIGatewayBackend(BaseBackend):
1108    def __init__(self, region_name):
1109        super(APIGatewayBackend, self).__init__()
1110        self.apis = {}
1111        self.keys = {}
1112        self.usage_plans = {}
1113        self.usage_plan_keys = {}
1114        self.domain_names = {}
1115        self.models = {}
1116        self.region_name = region_name
1117        self.base_path_mappings = {}
1118
1119    def reset(self):
1120        region_name = self.region_name
1121        self.__dict__ = {}
1122        self.__init__(region_name)
1123
1124    def create_rest_api(
1125        self,
1126        name,
1127        description,
1128        api_key_source=None,
1129        endpoint_configuration=None,
1130        tags=None,
1131        policy=None,
1132        minimum_compression_size=None,
1133    ):
1134        api_id = create_id()
1135        rest_api = RestAPI(
1136            api_id,
1137            self.region_name,
1138            name,
1139            description,
1140            api_key_source=api_key_source,
1141            endpoint_configuration=endpoint_configuration,
1142            tags=tags,
1143            policy=policy,
1144            minimum_compression_size=minimum_compression_size,
1145        )
1146        self.apis[api_id] = rest_api
1147        return rest_api
1148
1149    def get_rest_api(self, function_id):
1150        rest_api = self.apis.get(function_id)
1151        if rest_api is None:
1152            raise RestAPINotFound()
1153        return rest_api
1154
1155    def update_rest_api(self, function_id, patch_operations):
1156        rest_api = self.apis.get(function_id)
1157        if rest_api is None:
1158            raise RestAPINotFound()
1159        self.apis[function_id].apply_patch_operations(patch_operations)
1160        return self.apis[function_id]
1161
1162    def list_apis(self):
1163        return self.apis.values()
1164
1165    def delete_rest_api(self, function_id):
1166        rest_api = self.apis.pop(function_id)
1167        return rest_api
1168
1169    def list_resources(self, function_id):
1170        api = self.get_rest_api(function_id)
1171        return api.resources.values()
1172
1173    def get_resource(self, function_id, resource_id):
1174        api = self.get_rest_api(function_id)
1175        resource = api.resources[resource_id]
1176        return resource
1177
1178    def create_resource(self, function_id, parent_resource_id, path_part):
1179        if not re.match("^\\{?[a-zA-Z0-9._-]+\\+?\\}?$", path_part):
1180            raise InvalidResourcePathException()
1181        api = self.get_rest_api(function_id)
1182        child = api.add_child(path=path_part, parent_id=parent_resource_id)
1183        return child
1184
1185    def delete_resource(self, function_id, resource_id):
1186        api = self.get_rest_api(function_id)
1187        resource = api.resources.pop(resource_id)
1188        return resource
1189
1190    def get_method(self, function_id, resource_id, method_type):
1191        resource = self.get_resource(function_id, resource_id)
1192        return resource.get_method(method_type)
1193
1194    def create_method(
1195        self,
1196        function_id,
1197        resource_id,
1198        method_type,
1199        authorization_type,
1200        api_key_required=None,
1201        request_models=None,
1202        operation_name=None,
1203        authorizer_id=None,
1204        authorization_scopes=None,
1205        request_validator_id=None,
1206    ):
1207        resource = self.get_resource(function_id, resource_id)
1208        method = resource.add_method(
1209            method_type,
1210            authorization_type,
1211            api_key_required=api_key_required,
1212            request_models=request_models,
1213            operation_name=operation_name,
1214            authorizer_id=authorizer_id,
1215            authorization_scopes=authorization_scopes,
1216            request_validator_id=request_validator_id,
1217        )
1218        return method
1219
1220    def update_method(self, function_id, resource_id, method_type, patch_operations):
1221        resource = self.get_resource(function_id, resource_id)
1222        method = resource.get_method(method_type)
1223        return method.apply_operations(patch_operations)
1224
1225    def delete_method(self, function_id, resource_id, method_type):
1226        resource = self.get_resource(function_id, resource_id)
1227        resource.delete_method(method_type)
1228
1229    def get_authorizer(self, restapi_id, authorizer_id):
1230        api = self.get_rest_api(restapi_id)
1231        authorizer = api.authorizers.get(authorizer_id)
1232        if authorizer is None:
1233            raise AuthorizerNotFoundException()
1234        else:
1235            return authorizer
1236
1237    def get_authorizers(self, restapi_id):
1238        api = self.get_rest_api(restapi_id)
1239        return api.get_authorizers()
1240
1241    def create_authorizer(self, restapi_id, name, authorizer_type, **kwargs):
1242        api = self.get_rest_api(restapi_id)
1243        authorizer_id = create_id()
1244        authorizer = api.create_authorizer(
1245            authorizer_id,
1246            name,
1247            authorizer_type,
1248            provider_arns=kwargs.get("provider_arns"),
1249            auth_type=kwargs.get("auth_type"),
1250            authorizer_uri=kwargs.get("authorizer_uri"),
1251            authorizer_credentials=kwargs.get("authorizer_credentials"),
1252            identity_source=kwargs.get("identity_source"),
1253            identiy_validation_expression=kwargs.get("identiy_validation_expression"),
1254            authorizer_result_ttl=kwargs.get("authorizer_result_ttl"),
1255        )
1256        return api.authorizers.get(authorizer["id"])
1257
1258    def update_authorizer(self, restapi_id, authorizer_id, patch_operations):
1259        authorizer = self.get_authorizer(restapi_id, authorizer_id)
1260        if not authorizer:
1261            api = self.get_rest_api(restapi_id)
1262            authorizer = api.authorizers[authorizer_id] = Authorizer()
1263        return authorizer.apply_operations(patch_operations)
1264
1265    def delete_authorizer(self, restapi_id, authorizer_id):
1266        api = self.get_rest_api(restapi_id)
1267        del api.authorizers[authorizer_id]
1268
1269    def get_stage(self, function_id, stage_name):
1270        api = self.get_rest_api(function_id)
1271        stage = api.stages.get(stage_name)
1272        if stage is None:
1273            raise StageNotFoundException()
1274        return stage
1275
1276    def get_stages(self, function_id):
1277        api = self.get_rest_api(function_id)
1278        return api.get_stages()
1279
1280    def create_stage(
1281        self,
1282        function_id,
1283        stage_name,
1284        deploymentId,
1285        variables=None,
1286        description="",
1287        cacheClusterEnabled=None,
1288        cacheClusterSize=None,
1289        tags=None,
1290        tracing_enabled=None,
1291    ):
1292        if variables is None:
1293            variables = {}
1294        api = self.get_rest_api(function_id)
1295        api.create_stage(
1296            stage_name,
1297            deploymentId,
1298            variables=variables,
1299            description=description,
1300            cacheClusterEnabled=cacheClusterEnabled,
1301            cacheClusterSize=cacheClusterSize,
1302            tags=tags,
1303            tracing_enabled=tracing_enabled,
1304        )
1305        return api.stages.get(stage_name)
1306
1307    def update_stage(self, function_id, stage_name, patch_operations):
1308        stage = self.get_stage(function_id, stage_name)
1309        if not stage:
1310            api = self.get_rest_api(function_id)
1311            stage = api.stages[stage_name] = Stage()
1312        return stage.apply_operations(patch_operations)
1313
1314    def delete_stage(self, function_id, stage_name):
1315        api = self.get_rest_api(function_id)
1316        deleted = api.stages.pop(stage_name, None)
1317        if not deleted:
1318            raise StageNotFoundException()
1319
1320    def get_method_response(self, function_id, resource_id, method_type, response_code):
1321        method = self.get_method(function_id, resource_id, method_type)
1322        method_response = method.get_response(response_code)
1323        return method_response
1324
1325    def create_method_response(
1326        self,
1327        function_id,
1328        resource_id,
1329        method_type,
1330        response_code,
1331        response_models,
1332        response_parameters,
1333    ):
1334        method = self.get_method(function_id, resource_id, method_type)
1335        method_response = method.create_response(
1336            response_code, response_models, response_parameters
1337        )
1338        return method_response
1339
1340    def update_method_response(
1341        self, function_id, resource_id, method_type, response_code, patch_operations
1342    ):
1343        method = self.get_method(function_id, resource_id, method_type)
1344        method_response = method.get_response(response_code)
1345        method_response.apply_operations(patch_operations)
1346        return method_response
1347
1348    def delete_method_response(
1349        self, function_id, resource_id, method_type, response_code
1350    ):
1351        method = self.get_method(function_id, resource_id, method_type)
1352        method_response = method.delete_response(response_code)
1353        return method_response
1354
1355    def create_integration(
1356        self,
1357        function_id,
1358        resource_id,
1359        method_type,
1360        integration_type,
1361        uri,
1362        integration_method=None,
1363        credentials=None,
1364        request_templates=None,
1365        tls_config=None,
1366        cache_namespace=None,
1367    ):
1368        resource = self.get_resource(function_id, resource_id)
1369        if credentials and not re.match(
1370            "^arn:aws:iam::" + str(ACCOUNT_ID), credentials
1371        ):
1372            raise CrossAccountNotAllowed()
1373        if not integration_method and integration_type in [
1374            "HTTP",
1375            "HTTP_PROXY",
1376            "AWS",
1377            "AWS_PROXY",
1378        ]:
1379            raise IntegrationMethodNotDefined()
1380        if integration_type in ["AWS_PROXY"] and re.match(
1381            "^arn:aws:apigateway:[a-zA-Z0-9-]+:s3", uri
1382        ):
1383            raise AwsProxyNotAllowed()
1384        if (
1385            integration_type in ["AWS"]
1386            and re.match("^arn:aws:apigateway:[a-zA-Z0-9-]+:s3", uri)
1387            and not credentials
1388        ):
1389            raise RoleNotSpecified()
1390        if integration_type in ["HTTP", "HTTP_PROXY"] and not self._uri_validator(uri):
1391            raise InvalidHttpEndpoint()
1392        if integration_type in ["AWS", "AWS_PROXY"] and not re.match("^arn:aws:", uri):
1393            raise InvalidArn()
1394        if integration_type in ["AWS", "AWS_PROXY"] and not re.match(
1395            "^arn:aws:apigateway:[a-zA-Z0-9-]+:[a-zA-Z0-9-]+:(path|action)/", uri
1396        ):
1397            raise InvalidIntegrationArn()
1398        integration = resource.add_integration(
1399            method_type,
1400            integration_type,
1401            uri,
1402            integration_method=integration_method,
1403            request_templates=request_templates,
1404            tls_config=tls_config,
1405            cache_namespace=cache_namespace,
1406        )
1407        return integration
1408
1409    def get_integration(self, function_id, resource_id, method_type):
1410        resource = self.get_resource(function_id, resource_id)
1411        return resource.get_integration(method_type)
1412
1413    def delete_integration(self, function_id, resource_id, method_type):
1414        resource = self.get_resource(function_id, resource_id)
1415        return resource.delete_integration(method_type)
1416
1417    def create_integration_response(
1418        self,
1419        function_id,
1420        resource_id,
1421        method_type,
1422        status_code,
1423        selection_pattern,
1424        response_templates,
1425        content_handling,
1426    ):
1427        integration = self.get_integration(function_id, resource_id, method_type)
1428        integration_response = integration.create_integration_response(
1429            status_code, selection_pattern, response_templates, content_handling
1430        )
1431        return integration_response
1432
1433    def get_integration_response(
1434        self, function_id, resource_id, method_type, status_code
1435    ):
1436        integration = self.get_integration(function_id, resource_id, method_type)
1437        integration_response = integration.get_integration_response(status_code)
1438        return integration_response
1439
1440    def delete_integration_response(
1441        self, function_id, resource_id, method_type, status_code
1442    ):
1443        integration = self.get_integration(function_id, resource_id, method_type)
1444        integration_response = integration.delete_integration_response(status_code)
1445        return integration_response
1446
1447    def create_deployment(
1448        self, function_id, name, description="", stage_variables=None
1449    ):
1450        if stage_variables is None:
1451            stage_variables = {}
1452        api = self.get_rest_api(function_id)
1453        methods = [
1454            list(res.resource_methods.values())
1455            for res in self.list_resources(function_id)
1456        ]
1457        methods = [m for sublist in methods for m in sublist]
1458        if not any(methods):
1459            raise NoMethodDefined()
1460        method_integrations = [
1461            method["methodIntegration"] if "methodIntegration" in method else None
1462            for method in methods
1463        ]
1464        if not any(method_integrations):
1465            raise NoIntegrationDefined()
1466        deployment = api.create_deployment(name, description, stage_variables)
1467        return deployment
1468
1469    def get_deployment(self, function_id, deployment_id):
1470        api = self.get_rest_api(function_id)
1471        return api.get_deployment(deployment_id)
1472
1473    def get_deployments(self, function_id):
1474        api = self.get_rest_api(function_id)
1475        return api.get_deployments()
1476
1477    def delete_deployment(self, function_id, deployment_id):
1478        api = self.get_rest_api(function_id)
1479        return api.delete_deployment(deployment_id)
1480
1481    def create_api_key(self, payload):
1482        if payload.get("value"):
1483            if len(payload.get("value", [])) < 20:
1484                raise ApiKeyValueMinLength()
1485            for api_key in self.get_api_keys(include_values=True):
1486                if api_key.get("value") == payload["value"]:
1487                    raise ApiKeyAlreadyExists()
1488        key = ApiKey(**payload)
1489        self.keys[key["id"]] = key
1490        return key
1491
1492    def get_api_keys(self, include_values=False):
1493        api_keys = list(self.keys.values())
1494
1495        if not include_values:
1496            keys = []
1497            for api_key in list(self.keys.values()):
1498                new_key = copy(api_key)
1499                del new_key["value"]
1500                keys.append(new_key)
1501            api_keys = keys
1502
1503        return api_keys
1504
1505    def get_api_key(self, api_key_id, include_value=False):
1506        api_key = self.keys.get(api_key_id)
1507        if not api_key:
1508            raise ApiKeyNotFoundException()
1509
1510        if not include_value:
1511            new_key = copy(api_key)
1512            del new_key["value"]
1513            api_key = new_key
1514
1515        return api_key
1516
1517    def update_api_key(self, api_key_id, patch_operations):
1518        key = self.keys[api_key_id]
1519        return key.update_operations(patch_operations)
1520
1521    def delete_api_key(self, api_key_id):
1522        self.keys.pop(api_key_id)
1523        return {}
1524
1525    def create_usage_plan(self, payload):
1526        plan = UsagePlan(**payload)
1527        self.usage_plans[plan["id"]] = plan
1528        return plan
1529
1530    def get_usage_plans(self, api_key_id=None):
1531        plans = list(self.usage_plans.values())
1532        if api_key_id is not None:
1533            plans = [
1534                plan
1535                for plan in plans
1536                if self.usage_plan_keys.get(plan["id"], {}).get(api_key_id, False)
1537            ]
1538        return plans
1539
1540    def get_usage_plan(self, usage_plan_id):
1541        if usage_plan_id not in self.usage_plans:
1542            raise UsagePlanNotFoundException()
1543        return self.usage_plans[usage_plan_id]
1544
1545    def update_usage_plan(self, usage_plan_id, patch_operations):
1546        if usage_plan_id not in self.usage_plans:
1547            raise UsagePlanNotFoundException()
1548        self.usage_plans[usage_plan_id].apply_patch_operations(patch_operations)
1549        return self.usage_plans[usage_plan_id]
1550
1551    def delete_usage_plan(self, usage_plan_id):
1552        self.usage_plans.pop(usage_plan_id)
1553        return {}
1554
1555    def create_usage_plan_key(self, usage_plan_id, payload):
1556        if usage_plan_id not in self.usage_plan_keys:
1557            self.usage_plan_keys[usage_plan_id] = {}
1558
1559        key_id = payload["keyId"]
1560        if key_id not in self.keys:
1561            raise ApiKeyNotFoundException()
1562
1563        api_key = self.keys[key_id]
1564
1565        usage_plan_key = UsagePlanKey(
1566            id=key_id,
1567            type=payload["keyType"],
1568            name=api_key["name"],
1569            value=api_key["value"],
1570        )
1571        self.usage_plan_keys[usage_plan_id][usage_plan_key["id"]] = usage_plan_key
1572        return usage_plan_key
1573
1574    def get_usage_plan_keys(self, usage_plan_id):
1575        if usage_plan_id not in self.usage_plan_keys:
1576            return []
1577
1578        return list(self.usage_plan_keys[usage_plan_id].values())
1579
1580    def get_usage_plan_key(self, usage_plan_id, key_id):
1581        # first check if is a valid api key
1582        if key_id not in self.keys:
1583            raise ApiKeyNotFoundException()
1584
1585        # then check if is a valid api key and that the key is in the plan
1586        if (
1587            usage_plan_id not in self.usage_plan_keys
1588            or key_id not in self.usage_plan_keys[usage_plan_id]
1589        ):
1590            raise UsagePlanNotFoundException()
1591
1592        return self.usage_plan_keys[usage_plan_id][key_id]
1593
1594    def delete_usage_plan_key(self, usage_plan_id, key_id):
1595        self.usage_plan_keys[usage_plan_id].pop(key_id)
1596        return {}
1597
1598    def _uri_validator(self, uri):
1599        try:
1600            result = urlparse(uri)
1601            return all([result.scheme, result.netloc, result.path or "/"])
1602        except Exception:
1603            return False
1604
1605    def create_domain_name(
1606        self,
1607        domain_name,
1608        certificate_name=None,
1609        tags=None,
1610        certificate_arn=None,
1611        certificate_body=None,
1612        certificate_private_key=None,
1613        certificate_chain=None,
1614        regional_certificate_name=None,
1615        regional_certificate_arn=None,
1616        endpoint_configuration=None,
1617        security_policy=None,
1618        generate_cli_skeleton=None,
1619    ):
1620
1621        if not domain_name:
1622            raise InvalidDomainName()
1623
1624        new_domain_name = DomainName(
1625            domain_name=domain_name,
1626            certificate_name=certificate_name,
1627            certificate_private_key=certificate_private_key,
1628            certificate_arn=certificate_arn,
1629            certificate_body=certificate_body,
1630            certificate_chain=certificate_chain,
1631            regional_certificate_name=regional_certificate_name,
1632            regional_certificate_arn=regional_certificate_arn,
1633            endpoint_configuration=endpoint_configuration,
1634            tags=tags,
1635            security_policy=security_policy,
1636            generate_cli_skeleton=generate_cli_skeleton,
1637            region_name=self.region_name,
1638        )
1639
1640        self.domain_names[domain_name] = new_domain_name
1641        return new_domain_name
1642
1643    def get_domain_names(self):
1644        return list(self.domain_names.values())
1645
1646    def get_domain_name(self, domain_name):
1647        domain_info = self.domain_names.get(domain_name)
1648        if domain_info is None:
1649            raise DomainNameNotFound()
1650        else:
1651            return self.domain_names[domain_name]
1652
1653    def delete_domain_name(self, domain_name):
1654        domain_info = self.domain_names.pop(domain_name, None)
1655        if domain_info is None:
1656            raise DomainNameNotFound()
1657
1658    def update_domain_name(self, domain_name, patch_operations):
1659        domain_info = self.domain_names.get(domain_name)
1660        if not domain_info:
1661            raise DomainNameNotFound()
1662        domain_info.apply_patch_operations(patch_operations)
1663        return domain_info
1664
1665    def create_model(
1666        self,
1667        rest_api_id,
1668        name,
1669        content_type,
1670        description=None,
1671        schema=None,
1672        cli_input_json=None,
1673        generate_cli_skeleton=None,
1674    ):
1675
1676        if not rest_api_id:
1677            raise InvalidRestApiId
1678        if not name:
1679            raise InvalidModelName
1680
1681        api = self.get_rest_api(rest_api_id)
1682        new_model = api.add_model(
1683            name=name,
1684            description=description,
1685            schema=schema,
1686            content_type=content_type,
1687            cli_input_json=cli_input_json,
1688            generate_cli_skeleton=generate_cli_skeleton,
1689        )
1690
1691        return new_model
1692
1693    def get_models(self, rest_api_id):
1694        if not rest_api_id:
1695            raise InvalidRestApiId
1696        api = self.get_rest_api(rest_api_id)
1697        models = api.models.values()
1698        return list(models)
1699
1700    def get_model(self, rest_api_id, model_name):
1701        if not rest_api_id:
1702            raise InvalidRestApiId
1703        api = self.get_rest_api(rest_api_id)
1704        model = api.models.get(model_name)
1705        if model is None:
1706            raise ModelNotFound
1707        else:
1708            return model
1709
1710    def get_request_validators(self, restapi_id):
1711        restApi = self.get_rest_api(restapi_id)
1712        return restApi.get_request_validators()
1713
1714    def create_request_validator(self, restapi_id, name, body, params):
1715        restApi = self.get_rest_api(restapi_id)
1716        return restApi.create_request_validator(
1717            name=name, validateRequestBody=body, validateRequestParameters=params,
1718        )
1719
1720    def get_request_validator(self, restapi_id, validator_id):
1721        restApi = self.get_rest_api(restapi_id)
1722        return restApi.get_request_validator(validator_id)
1723
1724    def delete_request_validator(self, restapi_id, validator_id):
1725        restApi = self.get_rest_api(restapi_id)
1726        restApi.delete_request_validator(validator_id)
1727
1728    def update_request_validator(self, restapi_id, validator_id, patch_operations):
1729        restApi = self.get_rest_api(restapi_id)
1730        return restApi.update_request_validator(validator_id, patch_operations)
1731
1732    def create_base_path_mapping(
1733        self, domain_name, rest_api_id, base_path=None, stage=None
1734    ):
1735        if domain_name not in self.domain_names:
1736            raise DomainNameNotFound()
1737
1738        if base_path and "/" in base_path:
1739            raise InvalidBasePathException()
1740
1741        if rest_api_id not in self.apis:
1742            raise InvalidRestApiIdForBasePathMappingException()
1743
1744        if stage and self.apis[rest_api_id].stages.get(stage) is None:
1745            raise InvalidStageException()
1746
1747        new_base_path_mapping = BasePathMapping(
1748            domain_name=domain_name,
1749            rest_api_id=rest_api_id,
1750            basePath=base_path,
1751            stage=stage,
1752        )
1753
1754        new_base_path = new_base_path_mapping.get("basePath")
1755        if self.base_path_mappings.get(domain_name) is None:
1756            self.base_path_mappings[domain_name] = {}
1757        else:
1758            if (
1759                self.base_path_mappings[domain_name].get(new_base_path)
1760                and new_base_path != "(none)"
1761            ):
1762                raise BasePathConflictException()
1763        self.base_path_mappings[domain_name][new_base_path] = new_base_path_mapping
1764        return new_base_path_mapping
1765
1766    def get_base_path_mappings(self, domain_name):
1767
1768        if domain_name not in self.domain_names:
1769            raise DomainNameNotFound()
1770
1771        return list(self.base_path_mappings[domain_name].values())
1772
1773    def get_base_path_mapping(self, domain_name, base_path):
1774
1775        if domain_name not in self.domain_names:
1776            raise DomainNameNotFound()
1777
1778        if base_path not in self.base_path_mappings[domain_name]:
1779            raise BasePathNotFoundException()
1780
1781        return self.base_path_mappings[domain_name][base_path]
1782
1783    def delete_base_path_mapping(self, domain_name, base_path):
1784
1785        if domain_name not in self.domain_names:
1786            raise DomainNameNotFound()
1787
1788        if base_path not in self.base_path_mappings[domain_name]:
1789            raise BasePathNotFoundException()
1790
1791        self.base_path_mappings[domain_name].pop(base_path)
1792
1793
# Registry of one backend per region, filled for each partition in turn:
# the standard partition first, then GovCloud, then China.
apigateway_backends = {}
for _partition in ("aws", "aws-us-gov", "aws-cn"):
    for region_name in Session().get_available_regions(
        "apigateway", partition_name=_partition
    ):
        apigateway_backends[region_name] = APIGatewayBackend(region_name)
1805