1import hashlib
2import json
3import re
4import uuid
5from collections import namedtuple
6from datetime import datetime, timezone
7from random import random
8from typing import Dict, List
9
10from botocore.exceptions import ParamValidationError
11
12from moto.core import BaseBackend, BaseModel, CloudFormationModel, ACCOUNT_ID
13from moto.core.utils import iso_8601_datetime_without_milliseconds
14from moto.ec2 import ec2_backends
15from moto.ecr.exceptions import (
16    ImageNotFoundException,
17    RepositoryNotFoundException,
18    RepositoryAlreadyExistsException,
19    RepositoryNotEmptyException,
20    InvalidParameterException,
21    RepositoryPolicyNotFoundException,
22    LifecyclePolicyNotFoundException,
23    RegistryPolicyNotFoundException,
24    LimitExceededException,
25    ScanNotFoundException,
26    ValidationException,
27)
28from moto.ecr.policy_validation import EcrLifecyclePolicyValidator
29from moto.iam.exceptions import MalformedPolicyDocument
30from moto.iam.policy_validation import IAMPolicyDocumentValidator
31from moto.utilities.tagging_service import TaggingService
32
33DEFAULT_REGISTRY_ID = ACCOUNT_ID
34ECR_REPOSITORY_ARN_PATTERN = "^arn:(?P<partition>[^:]+):ecr:(?P<region>[^:]+):(?P<account_id>[^:]+):repository/(?P<repo_name>.*)$"
35
36EcrRepositoryArn = namedtuple(
37    "EcrRepositoryArn", ["partition", "region", "account_id", "repo_name"]
38)
39
40
41class BaseObject(BaseModel):
42    def camelCase(self, key):
43        words = []
44        for i, word in enumerate(key.split("_")):
45            if i > 0:
46                words.append(word.title())
47            else:
48                words.append(word)
49        return "".join(words)
50
51    def gen_response_object(self):
52        response_object = dict()
53        for key, value in self.__dict__.items():
54            if "_" in key:
55                response_object[self.camelCase(key)] = value
56            else:
57                response_object[key] = value
58        return response_object
59
60    @property
61    def response_object(self):
62        return self.gen_response_object()
63
64
65class Repository(BaseObject, CloudFormationModel):
66    def __init__(
67        self,
68        region_name,
69        repository_name,
70        encryption_config,
71        image_scan_config,
72        image_tag_mutablility,
73    ):
74        self.region_name = region_name
75        self.registry_id = DEFAULT_REGISTRY_ID
76        self.arn = (
77            f"arn:aws:ecr:{region_name}:{self.registry_id}:repository/{repository_name}"
78        )
79        self.name = repository_name
80        self.created_at = datetime.utcnow()
81        self.uri = (
82            f"{self.registry_id}.dkr.ecr.{region_name}.amazonaws.com/{repository_name}"
83        )
84        self.image_tag_mutability = image_tag_mutablility or "MUTABLE"
85        self.image_scanning_configuration = image_scan_config or {"scanOnPush": False}
86        self.encryption_configuration = self._determine_encryption_config(
87            encryption_config
88        )
89        self.policy = None
90        self.lifecycle_policy = None
91        self.images: List[Image] = []
92
93    def _determine_encryption_config(self, encryption_config):
94        if not encryption_config:
95            return {"encryptionType": "AES256"}
96        if encryption_config == {"encryptionType": "KMS"}:
97            encryption_config[
98                "kmsKey"
99            ] = f"arn:aws:kms:{self.region_name}:{ACCOUNT_ID}:key/{uuid.uuid4()}"
100        return encryption_config
101
102    def _get_image(self, image_tag, image_digest):
103        # you can either search for one or both
104        image = next(
105            (
106                i
107                for i in self.images
108                if (not image_tag or image_tag in i.image_tags)
109                and (not image_digest or image_digest == i.get_image_digest())
110            ),
111            None,
112        )
113
114        if not image:
115            image_id_rep = "{{imageDigest:'{0}', imageTag:'{1}'}}".format(
116                image_digest or "null", image_tag or "null"
117            )
118
119            raise ImageNotFoundException(
120                image_id=image_id_rep,
121                repository_name=self.name,
122                registry_id=self.registry_id,
123            )
124
125        return image
126
127    @property
128    def physical_resource_id(self):
129        return self.name
130
131    @property
132    def response_object(self):
133        response_object = self.gen_response_object()
134
135        response_object["registryId"] = self.registry_id
136        response_object["repositoryArn"] = self.arn
137        response_object["repositoryName"] = self.name
138        response_object["repositoryUri"] = self.uri
139        response_object["createdAt"] = iso_8601_datetime_without_milliseconds(
140            self.created_at
141        )
142        del response_object["arn"], response_object["name"], response_object["images"]
143        return response_object
144
145    def update(self, image_scan_config=None, image_tag_mutability=None):
146        if image_scan_config:
147            self.image_scanning_configuration = image_scan_config
148        if image_tag_mutability:
149            self.image_tag_mutability = image_tag_mutability
150
151    def delete(self, region_name):
152        ecr_backend = ecr_backends[region_name]
153        ecr_backend.delete_repository(self.name)
154
155    @classmethod
156    def has_cfn_attr(cls, attribute):
157        return attribute in ["Arn", "RepositoryUri"]
158
159    def get_cfn_attribute(self, attribute_name):
160        from moto.cloudformation.exceptions import UnformattedGetAttTemplateException
161
162        if attribute_name == "Arn":
163            return self.arn
164        elif attribute_name == "RepositoryUri":
165            return self.uri
166
167        raise UnformattedGetAttTemplateException()
168
169    @staticmethod
170    def cloudformation_name_type():
171        return "RepositoryName"
172
173    @staticmethod
174    def cloudformation_type():
175        # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ecr-repository.html
176        return "AWS::ECR::Repository"
177
178    @classmethod
179    def create_from_cloudformation_json(
180        cls, resource_name, cloudformation_json, region_name, **kwargs
181    ):
182        ecr_backend = ecr_backends[region_name]
183        properties = cloudformation_json["Properties"]
184
185        encryption_config = properties.get("EncryptionConfiguration")
186        image_scan_config = properties.get("ImageScanningConfiguration")
187        image_tag_mutablility = properties.get("ImageTagMutability")
188        tags = properties.get("Tags", [])
189
190        return ecr_backend.create_repository(
191            # RepositoryName is optional in CloudFormation, thus create a random
192            # name if necessary
193            repository_name=resource_name,
194            encryption_config=encryption_config,
195            image_scan_config=image_scan_config,
196            image_tag_mutablility=image_tag_mutablility,
197            tags=tags,
198        )
199
200    @classmethod
201    def update_from_cloudformation_json(
202        cls, original_resource, new_resource_name, cloudformation_json, region_name
203    ):
204        ecr_backend = ecr_backends[region_name]
205        properties = cloudformation_json["Properties"]
206        encryption_configuration = properties.get(
207            "EncryptionConfiguration", {"encryptionType": "AES256"}
208        )
209
210        if (
211            new_resource_name == original_resource.name
212            and encryption_configuration == original_resource.encryption_configuration
213        ):
214            original_resource.update(
215                properties.get("ImageScanningConfiguration"),
216                properties.get("ImageTagMutability"),
217            )
218
219            ecr_backend.tagger.tag_resource(
220                original_resource.arn, properties.get("Tags", [])
221            )
222
223            return original_resource
224        else:
225            original_resource.delete(region_name)
226            return cls.create_from_cloudformation_json(
227                new_resource_name, cloudformation_json, region_name
228            )
229
230
231class Image(BaseObject):
232    def __init__(
233        self, tag, manifest, repository, digest=None, registry_id=DEFAULT_REGISTRY_ID
234    ):
235        self.image_tag = tag
236        self.image_tags = [tag] if tag is not None else []
237        self.image_manifest = manifest
238        self.image_size_in_bytes = 50 * 1024 * 1024
239        self.repository = repository
240        self.registry_id = registry_id
241        self.image_digest = digest
242        self.image_pushed_at = str(datetime.now(timezone.utc).isoformat())
243        self.last_scan = None
244
245    def _create_digest(self):
246        image_contents = "docker_image{0}".format(int(random() * 10 ** 6))
247        self.image_digest = (
248            "sha256:%s" % hashlib.sha256(image_contents.encode("utf-8")).hexdigest()
249        )
250
251    def get_image_digest(self):
252        if not self.image_digest:
253            self._create_digest()
254        return self.image_digest
255
256    def get_image_manifest(self):
257        return self.image_manifest
258
259    def remove_tag(self, tag):
260        if tag is not None and tag in self.image_tags:
261            self.image_tags.remove(tag)
262            if self.image_tags:
263                self.image_tag = self.image_tags[-1]
264
265    def update_tag(self, tag):
266        self.image_tag = tag
267        if tag not in self.image_tags and tag is not None:
268            self.image_tags.append(tag)
269
270    @property
271    def response_object(self):
272        response_object = self.gen_response_object()
273        response_object["imageId"] = {}
274        response_object["imageId"]["imageTag"] = self.image_tag
275        response_object["imageId"]["imageDigest"] = self.get_image_digest()
276        response_object["imageManifest"] = self.image_manifest
277        response_object["repositoryName"] = self.repository
278        response_object["registryId"] = self.registry_id
279        return {
280            k: v for k, v in response_object.items() if v is not None and v != [None]
281        }
282
283    @property
284    def response_list_object(self):
285        response_object = self.gen_response_object()
286        response_object["imageTag"] = self.image_tag
287        response_object["imageDigest"] = self.get_image_digest()
288        return {
289            k: v for k, v in response_object.items() if v is not None and v != [None]
290        }
291
292    @property
293    def response_describe_object(self):
294        response_object = self.gen_response_object()
295        response_object["imageTags"] = self.image_tags
296        response_object["imageDigest"] = self.get_image_digest()
297        response_object["imageManifest"] = self.image_manifest
298        response_object["repositoryName"] = self.repository
299        response_object["registryId"] = self.registry_id
300        response_object["imageSizeInBytes"] = self.image_size_in_bytes
301        response_object["imagePushedAt"] = self.image_pushed_at
302        return {k: v for k, v in response_object.items() if v is not None and v != []}
303
304    @property
305    def response_batch_get_image(self):
306        response_object = {}
307        response_object["imageId"] = {}
308        response_object["imageId"]["imageTag"] = self.image_tag
309        response_object["imageId"]["imageDigest"] = self.get_image_digest()
310        response_object["imageManifest"] = self.image_manifest
311        response_object["repositoryName"] = self.repository
312        response_object["registryId"] = self.registry_id
313        return {
314            k: v for k, v in response_object.items() if v is not None and v != [None]
315        }
316
317    @property
318    def response_batch_delete_image(self):
319        response_object = {}
320        response_object["imageDigest"] = self.get_image_digest()
321        response_object["imageTag"] = self.image_tag
322        return {
323            k: v for k, v in response_object.items() if v is not None and v != [None]
324        }
325
326
327class ECRBackend(BaseBackend):
328    def __init__(self, region_name):
329        self.region_name = region_name
330        self.registry_policy = None
331        self.replication_config = {"rules": []}
332        self.repositories: Dict[str, Repository] = {}
333        self.tagger = TaggingService(tag_name="tags")
334
335    def reset(self):
336        region_name = self.region_name
337        self.__dict__ = {}
338        self.__init__(region_name)
339
340    @staticmethod
341    def default_vpc_endpoint_service(service_region, zones):
342        """Default VPC endpoint service."""
343        docker_endpoint = {
344            "AcceptanceRequired": False,
345            "AvailabilityZones": zones,
346            "BaseEndpointDnsNames": [f"dkr.ecr.{service_region}.vpce.amazonaws.com"],
347            "ManagesVpcEndpoints": False,
348            "Owner": "amazon",
349            "PrivateDnsName": f"*.dkr.ecr.{service_region}.amazonaws.com",
350            "PrivateDnsNameVerificationState": "verified",
351            "PrivateDnsNames": [
352                {"PrivateDnsName": f"*.dkr.ecr.{service_region}.amazonaws.com"}
353            ],
354            "ServiceId": f"vpce-svc-{BaseBackend.vpce_random_number()}",
355            "ServiceName": f"com.amazonaws.{service_region}.ecr.dkr",
356            "ServiceType": [{"ServiceType": "Interface"}],
357            "Tags": [],
358            "VpcEndpointPolicySupported": True,
359        }
360        return BaseBackend.default_vpc_endpoint_service_factory(
361            service_region, zones, "api.ecr", special_service_name="ecr.api",
362        ) + [docker_endpoint]
363
364    def _get_repository(self, name, registry_id=None) -> Repository:
365        repo = self.repositories.get(name)
366        reg_id = registry_id or DEFAULT_REGISTRY_ID
367
368        if not repo or repo.registry_id != reg_id:
369            raise RepositoryNotFoundException(name, reg_id)
370        return repo
371
372    @staticmethod
373    def _parse_resource_arn(resource_arn) -> EcrRepositoryArn:
374        match = re.match(ECR_REPOSITORY_ARN_PATTERN, resource_arn)
375        if not match:
376            raise InvalidParameterException(
377                "Invalid parameter at 'resourceArn' failed to satisfy constraint: "
378                "'Invalid ARN'"
379            )
380        return EcrRepositoryArn(**match.groupdict())
381
382    def describe_repositories(self, registry_id=None, repository_names=None):
383        """
384        maxResults and nextToken not implemented
385        """
386        if repository_names:
387            for repository_name in repository_names:
388                if repository_name not in self.repositories:
389                    raise RepositoryNotFoundException(
390                        repository_name, registry_id or DEFAULT_REGISTRY_ID
391                    )
392
393        repositories = []
394        for repository in self.repositories.values():
395            # If a registry_id was supplied, ensure this repository matches
396            if registry_id:
397                if repository.registry_id != registry_id:
398                    continue
399            # If a list of repository names was supplied, esure this repository
400            # is in that list
401            if repository_names:
402                if repository.name not in repository_names:
403                    continue
404            repositories.append(repository.response_object)
405        return repositories
406
407    def create_repository(
408        self,
409        repository_name,
410        encryption_config,
411        image_scan_config,
412        image_tag_mutablility,
413        tags,
414    ):
415        if self.repositories.get(repository_name):
416            raise RepositoryAlreadyExistsException(repository_name, DEFAULT_REGISTRY_ID)
417
418        repository = Repository(
419            region_name=self.region_name,
420            repository_name=repository_name,
421            encryption_config=encryption_config,
422            image_scan_config=image_scan_config,
423            image_tag_mutablility=image_tag_mutablility,
424        )
425        self.repositories[repository_name] = repository
426        self.tagger.tag_resource(repository.arn, tags)
427
428        return repository
429
430    def delete_repository(self, repository_name, registry_id=None, force=False):
431        repo = self._get_repository(repository_name, registry_id)
432
433        if repo.images and not force:
434            raise RepositoryNotEmptyException(
435                repository_name, registry_id or DEFAULT_REGISTRY_ID
436            )
437
438        self.tagger.delete_all_tags_for_resource(repo.arn)
439        return self.repositories.pop(repository_name)
440
441    def list_images(self, repository_name, registry_id=None):
442        """
443        maxResults and filtering not implemented
444        """
445        repository = None
446        found = False
447        if repository_name in self.repositories:
448            repository = self.repositories[repository_name]
449            if registry_id:
450                if repository.registry_id == registry_id:
451                    found = True
452            else:
453                found = True
454
455        if not found:
456            raise RepositoryNotFoundException(
457                repository_name, registry_id or DEFAULT_REGISTRY_ID
458            )
459
460        images = []
461        for image in repository.images:
462            images.append(image)
463        return images
464
465    def describe_images(self, repository_name, registry_id=None, image_ids=None):
466        repository = self._get_repository(repository_name, registry_id)
467
468        if image_ids:
469            response = set(
470                repository._get_image(
471                    image_id.get("imageTag"), image_id.get("imageDigest")
472                )
473                for image_id in image_ids
474            )
475
476        else:
477            response = []
478            for image in repository.images:
479                response.append(image)
480
481        return response
482
483    def put_image(self, repository_name, image_manifest, image_tag):
484        if repository_name in self.repositories:
485            repository = self.repositories[repository_name]
486        else:
487            raise Exception("{0} is not a repository".format(repository_name))
488
489        existing_images = list(
490            filter(
491                lambda x: x.response_object["imageManifest"] == image_manifest,
492                repository.images,
493            )
494        )
495        if not existing_images:
496            # this image is not in ECR yet
497            image = Image(image_tag, image_manifest, repository_name)
498            repository.images.append(image)
499            return image
500        else:
501            # update existing image
502            existing_images[0].update_tag(image_tag)
503            return existing_images[0]
504
505    def batch_get_image(
506        self,
507        repository_name,
508        registry_id=None,
509        image_ids=None,
510        accepted_media_types=None,
511    ):
512        if repository_name in self.repositories:
513            repository = self.repositories[repository_name]
514        else:
515            raise RepositoryNotFoundException(
516                repository_name, registry_id or DEFAULT_REGISTRY_ID
517            )
518
519        if not image_ids:
520            raise ParamValidationError(
521                msg='Missing required parameter in input: "imageIds"'
522            )
523
524        response = {"images": [], "failures": []}
525
526        for image_id in image_ids:
527            found = False
528            for image in repository.images:
529                if (
530                    "imageDigest" in image_id
531                    and image.get_image_digest() == image_id["imageDigest"]
532                ) or (
533                    "imageTag" in image_id and image.image_tag == image_id["imageTag"]
534                ):
535                    found = True
536                    response["images"].append(image.response_batch_get_image)
537
538        if not found:
539            response["failures"].append(
540                {
541                    "imageId": {"imageTag": image_id.get("imageTag", "null")},
542                    "failureCode": "ImageNotFound",
543                    "failureReason": "Requested image not found",
544                }
545            )
546
547        return response
548
549    def batch_delete_image(self, repository_name, registry_id=None, image_ids=None):
550        if repository_name in self.repositories:
551            repository = self.repositories[repository_name]
552        else:
553            raise RepositoryNotFoundException(
554                repository_name, registry_id or DEFAULT_REGISTRY_ID
555            )
556
557        if not image_ids:
558            raise ParamValidationError(
559                msg='Missing required parameter in input: "imageIds"'
560            )
561
562        response = {"imageIds": [], "failures": []}
563
564        for image_id in image_ids:
565            image_found = False
566
567            # Is request missing both digest and tag?
568            if "imageDigest" not in image_id and "imageTag" not in image_id:
569                response["failures"].append(
570                    {
571                        "imageId": {},
572                        "failureCode": "MissingDigestAndTag",
573                        "failureReason": "Invalid request parameters: both tag and digest cannot be null",
574                    }
575                )
576                continue
577
578            # If we have a digest, is it valid?
579            if "imageDigest" in image_id:
580                pattern = re.compile(r"^[0-9a-zA-Z_+\.-]+:[0-9a-fA-F]{64}")
581                if not pattern.match(image_id.get("imageDigest")):
582                    response["failures"].append(
583                        {
584                            "imageId": {
585                                "imageDigest": image_id.get("imageDigest", "null")
586                            },
587                            "failureCode": "InvalidImageDigest",
588                            "failureReason": "Invalid request parameters: image digest should satisfy the regex '[a-zA-Z0-9-_+.]+:[a-fA-F0-9]+'",
589                        }
590                    )
591                    continue
592
593            for num, image in enumerate(repository.images):
594
595                # Search by matching both digest and tag
596                if "imageDigest" in image_id and "imageTag" in image_id:
597                    if (
598                        image_id["imageDigest"] == image.get_image_digest()
599                        and image_id["imageTag"] in image.image_tags
600                    ):
601                        image_found = True
602                        for image_tag in reversed(image.image_tags):
603                            repository.images[num].image_tag = image_tag
604                            response["imageIds"].append(
605                                image.response_batch_delete_image
606                            )
607                            repository.images[num].remove_tag(image_tag)
608                        del repository.images[num]
609
610                # Search by matching digest
611                elif (
612                    "imageDigest" in image_id
613                    and image.get_image_digest() == image_id["imageDigest"]
614                ):
615                    image_found = True
616                    for image_tag in reversed(image.image_tags):
617                        repository.images[num].image_tag = image_tag
618                        response["imageIds"].append(image.response_batch_delete_image)
619                        repository.images[num].remove_tag(image_tag)
620                    del repository.images[num]
621
622                # Search by matching tag
623                elif (
624                    "imageTag" in image_id and image_id["imageTag"] in image.image_tags
625                ):
626                    image_found = True
627                    repository.images[num].image_tag = image_id["imageTag"]
628                    response["imageIds"].append(image.response_batch_delete_image)
629                    if len(image.image_tags) > 1:
630                        repository.images[num].remove_tag(image_id["imageTag"])
631                    else:
632                        repository.images.remove(image)
633
634                if not image_found:
635                    failure_response = {
636                        "imageId": {},
637                        "failureCode": "ImageNotFound",
638                        "failureReason": "Requested image not found",
639                    }
640
641                    if "imageDigest" in image_id:
642                        failure_response["imageId"]["imageDigest"] = image_id.get(
643                            "imageDigest", "null"
644                        )
645
646                    if "imageTag" in image_id:
647                        failure_response["imageId"]["imageTag"] = image_id.get(
648                            "imageTag", "null"
649                        )
650
651                    response["failures"].append(failure_response)
652
653        return response
654
655    def list_tags_for_resource(self, arn):
656        resource = self._parse_resource_arn(arn)
657        repo = self._get_repository(resource.repo_name, resource.account_id)
658
659        return self.tagger.list_tags_for_resource(repo.arn)
660
661    def tag_resource(self, arn, tags):
662        resource = self._parse_resource_arn(arn)
663        repo = self._get_repository(resource.repo_name, resource.account_id)
664        self.tagger.tag_resource(repo.arn, tags)
665
666        return {}
667
668    def untag_resource(self, arn, tag_keys):
669        resource = self._parse_resource_arn(arn)
670        repo = self._get_repository(resource.repo_name, resource.account_id)
671        self.tagger.untag_resource_using_names(repo.arn, tag_keys)
672
673        return {}
674
675    def put_image_tag_mutability(
676        self, registry_id, repository_name, image_tag_mutability
677    ):
678        if image_tag_mutability not in ["IMMUTABLE", "MUTABLE"]:
679            raise InvalidParameterException(
680                "Invalid parameter at 'imageTagMutability' failed to satisfy constraint: "
681                "'Member must satisfy enum value set: [IMMUTABLE, MUTABLE]'"
682            )
683
684        repo = self._get_repository(repository_name, registry_id)
685        repo.update(image_tag_mutability=image_tag_mutability)
686
687        return {
688            "registryId": repo.registry_id,
689            "repositoryName": repository_name,
690            "imageTagMutability": repo.image_tag_mutability,
691        }
692
693    def put_image_scanning_configuration(
694        self, registry_id, repository_name, image_scan_config
695    ):
696        repo = self._get_repository(repository_name, registry_id)
697        repo.update(image_scan_config=image_scan_config)
698
699        return {
700            "registryId": repo.registry_id,
701            "repositoryName": repository_name,
702            "imageScanningConfiguration": repo.image_scanning_configuration,
703        }
704
705    def set_repository_policy(self, registry_id, repository_name, policy_text):
706        repo = self._get_repository(repository_name, registry_id)
707
708        try:
709            iam_policy_document_validator = IAMPolicyDocumentValidator(policy_text)
710            # the repository policy can be defined without a resource field
711            iam_policy_document_validator._validate_resource_exist = lambda: None
712            # the repository policy can have the old version 2008-10-17
713            iam_policy_document_validator._validate_version = lambda: None
714            iam_policy_document_validator.validate()
715        except MalformedPolicyDocument:
716            raise InvalidParameterException(
717                "Invalid parameter at 'PolicyText' failed to satisfy constraint: "
718                "'Invalid repository policy provided'"
719            )
720
721        repo.policy = policy_text
722
723        return {
724            "registryId": repo.registry_id,
725            "repositoryName": repository_name,
726            "policyText": repo.policy,
727        }
728
729    def get_repository_policy(self, registry_id, repository_name):
730        repo = self._get_repository(repository_name, registry_id)
731
732        if not repo.policy:
733            raise RepositoryPolicyNotFoundException(repository_name, repo.registry_id)
734
735        return {
736            "registryId": repo.registry_id,
737            "repositoryName": repository_name,
738            "policyText": repo.policy,
739        }
740
741    def delete_repository_policy(self, registry_id, repository_name):
742        repo = self._get_repository(repository_name, registry_id)
743        policy = repo.policy
744
745        if not policy:
746            raise RepositoryPolicyNotFoundException(repository_name, repo.registry_id)
747
748        repo.policy = None
749
750        return {
751            "registryId": repo.registry_id,
752            "repositoryName": repository_name,
753            "policyText": policy,
754        }
755
756    def put_lifecycle_policy(self, registry_id, repository_name, lifecycle_policy_text):
757        repo = self._get_repository(repository_name, registry_id)
758
759        validator = EcrLifecyclePolicyValidator(lifecycle_policy_text)
760        validator.validate()
761
762        repo.lifecycle_policy = lifecycle_policy_text
763
764        return {
765            "registryId": repo.registry_id,
766            "repositoryName": repository_name,
767            "lifecyclePolicyText": repo.lifecycle_policy,
768        }
769
770    def get_lifecycle_policy(self, registry_id, repository_name):
771        repo = self._get_repository(repository_name, registry_id)
772
773        if not repo.lifecycle_policy:
774            raise LifecyclePolicyNotFoundException(repository_name, repo.registry_id)
775
776        return {
777            "registryId": repo.registry_id,
778            "repositoryName": repository_name,
779            "lifecyclePolicyText": repo.lifecycle_policy,
780            "lastEvaluatedAt": iso_8601_datetime_without_milliseconds(
781                datetime.utcnow()
782            ),
783        }
784
785    def delete_lifecycle_policy(self, registry_id, repository_name):
786        repo = self._get_repository(repository_name, registry_id)
787        policy = repo.lifecycle_policy
788
789        if not policy:
790            raise LifecyclePolicyNotFoundException(repository_name, repo.registry_id)
791
792        repo.lifecycle_policy = None
793
794        return {
795            "registryId": repo.registry_id,
796            "repositoryName": repository_name,
797            "lifecyclePolicyText": policy,
798            "lastEvaluatedAt": iso_8601_datetime_without_milliseconds(
799                datetime.utcnow()
800            ),
801        }
802
803    def _validate_registry_policy_action(self, policy_text):
804        # only CreateRepository & ReplicateImage actions are allowed
805        VALID_ACTIONS = {"ecr:CreateRepository", "ecr:ReplicateImage"}
806
807        policy = json.loads(policy_text)
808        for statement in policy["Statement"]:
809            if set(statement["Action"]) - VALID_ACTIONS:
810                raise MalformedPolicyDocument()
811
812    def put_registry_policy(self, policy_text):
813        try:
814            iam_policy_document_validator = IAMPolicyDocumentValidator(policy_text)
815            iam_policy_document_validator.validate()
816
817            self._validate_registry_policy_action(policy_text)
818        except MalformedPolicyDocument:
819            raise InvalidParameterException(
820                "Invalid parameter at 'PolicyText' failed to satisfy constraint: "
821                "'Invalid registry policy provided'"
822            )
823
824        self.registry_policy = policy_text
825
826        return {
827            "registryId": ACCOUNT_ID,
828            "policyText": policy_text,
829        }
830
831    def get_registry_policy(self):
832        if not self.registry_policy:
833            raise RegistryPolicyNotFoundException(ACCOUNT_ID)
834
835        return {
836            "registryId": ACCOUNT_ID,
837            "policyText": self.registry_policy,
838        }
839
840    def delete_registry_policy(self):
841        policy = self.registry_policy
842        if not policy:
843            raise RegistryPolicyNotFoundException(ACCOUNT_ID)
844
845        self.registry_policy = None
846
847        return {
848            "registryId": ACCOUNT_ID,
849            "policyText": policy,
850        }
851
852    def start_image_scan(self, registry_id, repository_name, image_id):
853        repo = self._get_repository(repository_name, registry_id)
854
855        image = repo._get_image(image_id.get("imageTag"), image_id.get("imageDigest"))
856
857        # scanning an image is only allowed once per day
858        if image.last_scan and image.last_scan.date() == datetime.today().date():
859            raise LimitExceededException()
860
861        image.last_scan = datetime.today()
862
863        return {
864            "registryId": repo.registry_id,
865            "repositoryName": repository_name,
866            "imageId": {
867                "imageDigest": image.image_digest,
868                "imageTag": image.image_tag,
869            },
870            "imageScanStatus": {"status": "IN_PROGRESS"},
871        }
872
873    def describe_image_scan_findings(self, registry_id, repository_name, image_id):
874        repo = self._get_repository(repository_name, registry_id)
875
876        image = repo._get_image(image_id.get("imageTag"), image_id.get("imageDigest"))
877
878        if not image.last_scan:
879            image_id_rep = "{{imageDigest:'{0}', imageTag:'{1}'}}".format(
880                image_id.get("imageDigest") or "null",
881                image_id.get("imageTag") or "null",
882            )
883            raise ScanNotFoundException(
884                image_id=image_id_rep,
885                repository_name=repository_name,
886                registry_id=repo.registry_id,
887            )
888
889        return {
890            "registryId": repo.registry_id,
891            "repositoryName": repository_name,
892            "imageId": {
893                "imageDigest": image.image_digest,
894                "imageTag": image.image_tag,
895            },
896            "imageScanStatus": {
897                "status": "COMPLETE",
898                "description": "The scan was completed successfully.",
899            },
900            "imageScanFindings": {
901                "imageScanCompletedAt": iso_8601_datetime_without_milliseconds(
902                    image.last_scan
903                ),
904                "vulnerabilitySourceUpdatedAt": iso_8601_datetime_without_milliseconds(
905                    datetime.utcnow()
906                ),
907                "findings": [
908                    {
909                        "name": "CVE-9999-9999",
910                        "uri": "https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-9999-9999",
911                        "severity": "HIGH",
912                        "attributes": [
913                            {"key": "package_version", "value": "9.9.9"},
914                            {"key": "package_name", "value": "moto_fake"},
915                            {
916                                "key": "CVSS2_VECTOR",
917                                "value": "AV:N/AC:L/Au:N/C:P/I:P/A:P",
918                            },
919                            {"key": "CVSS2_SCORE", "value": "7.5"},
920                        ],
921                    }
922                ],
923                "findingSeverityCounts": {"HIGH": 1},
924            },
925        }
926
927    def put_replication_configuration(self, replication_config):
928        rules = replication_config["rules"]
929        if len(rules) > 1:
930            raise ValidationException("This feature is disabled")
931
932        if len(rules) == 1:
933            for dest in rules[0]["destinations"]:
934                if (
935                    dest["region"] == self.region_name
936                    and dest["registryId"] == DEFAULT_REGISTRY_ID
937                ):
938                    raise InvalidParameterException(
939                        "Invalid parameter at 'replicationConfiguration' failed to satisfy constraint: "
940                        "'Replication destination cannot be the same as the source registry'"
941                    )
942
943        self.replication_config = replication_config
944
945        return {"replicationConfiguration": replication_config}
946
947    def describe_registry(self):
948        return {
949            "registryId": DEFAULT_REGISTRY_ID,
950            "replicationConfiguration": self.replication_config,
951        }
952
953
954ecr_backends = {}
955for region, ec2_backend in ec2_backends.items():
956    ecr_backends[region] = ECRBackend(region)
957