1import hashlib 2import json 3import re 4import uuid 5from collections import namedtuple 6from datetime import datetime, timezone 7from random import random 8from typing import Dict, List 9 10from botocore.exceptions import ParamValidationError 11 12from moto.core import BaseBackend, BaseModel, CloudFormationModel, ACCOUNT_ID 13from moto.core.utils import iso_8601_datetime_without_milliseconds 14from moto.ec2 import ec2_backends 15from moto.ecr.exceptions import ( 16 ImageNotFoundException, 17 RepositoryNotFoundException, 18 RepositoryAlreadyExistsException, 19 RepositoryNotEmptyException, 20 InvalidParameterException, 21 RepositoryPolicyNotFoundException, 22 LifecyclePolicyNotFoundException, 23 RegistryPolicyNotFoundException, 24 LimitExceededException, 25 ScanNotFoundException, 26 ValidationException, 27) 28from moto.ecr.policy_validation import EcrLifecyclePolicyValidator 29from moto.iam.exceptions import MalformedPolicyDocument 30from moto.iam.policy_validation import IAMPolicyDocumentValidator 31from moto.utilities.tagging_service import TaggingService 32 33DEFAULT_REGISTRY_ID = ACCOUNT_ID 34ECR_REPOSITORY_ARN_PATTERN = "^arn:(?P<partition>[^:]+):ecr:(?P<region>[^:]+):(?P<account_id>[^:]+):repository/(?P<repo_name>.*)$" 35 36EcrRepositoryArn = namedtuple( 37 "EcrRepositoryArn", ["partition", "region", "account_id", "repo_name"] 38) 39 40 41class BaseObject(BaseModel): 42 def camelCase(self, key): 43 words = [] 44 for i, word in enumerate(key.split("_")): 45 if i > 0: 46 words.append(word.title()) 47 else: 48 words.append(word) 49 return "".join(words) 50 51 def gen_response_object(self): 52 response_object = dict() 53 for key, value in self.__dict__.items(): 54 if "_" in key: 55 response_object[self.camelCase(key)] = value 56 else: 57 response_object[key] = value 58 return response_object 59 60 @property 61 def response_object(self): 62 return self.gen_response_object() 63 64 65class Repository(BaseObject, CloudFormationModel): 66 def __init__( 67 self, 68 region_name, 69 repository_name, 70 encryption_config, 71 image_scan_config, 72 image_tag_mutablility, 73 ): 74 self.region_name = region_name 75 self.registry_id = DEFAULT_REGISTRY_ID 76 self.arn = ( 77 f"arn:aws:ecr:{region_name}:{self.registry_id}:repository/{repository_name}" 78 ) 79 self.name = repository_name 80 self.created_at = datetime.utcnow() 81 self.uri = ( 82 f"{self.registry_id}.dkr.ecr.{region_name}.amazonaws.com/{repository_name}" 83 ) 84 self.image_tag_mutability = image_tag_mutablility or "MUTABLE" 85 self.image_scanning_configuration = image_scan_config or {"scanOnPush": False} 86 self.encryption_configuration = self._determine_encryption_config( 87 encryption_config 88 ) 89 self.policy = None 90 self.lifecycle_policy = None 91 self.images: List[Image] = [] 92 93 def _determine_encryption_config(self, encryption_config): 94 if not encryption_config: 95 return {"encryptionType": "AES256"} 96 if encryption_config == {"encryptionType": "KMS"}: 97 encryption_config[ 98 "kmsKey" 99 ] = f"arn:aws:kms:{self.region_name}:{ACCOUNT_ID}:key/{uuid.uuid4()}" 100 return encryption_config 101 102 def _get_image(self, image_tag, image_digest): 103 # you can either search for one or both 104 image = next( 105 ( 106 i 107 for i in self.images 108 if (not image_tag or image_tag in i.image_tags) 109 and (not image_digest or image_digest == i.get_image_digest()) 110 ), 111 None, 112 ) 113 114 if not image: 115 image_id_rep = "{{imageDigest:'{0}', imageTag:'{1}'}}".format( 116 image_digest or "null", image_tag or "null" 117 ) 118 119 raise ImageNotFoundException( 120 image_id=image_id_rep, 121 repository_name=self.name, 122 registry_id=self.registry_id, 123 ) 124 125 return image 126 127 @property 128 def physical_resource_id(self): 129 return self.name 130 131 @property 132 def response_object(self): 133 response_object = self.gen_response_object() 134 135 response_object["registryId"] = self.registry_id 136 response_object["repositoryArn"] = self.arn 137 response_object["repositoryName"] = self.name 138 response_object["repositoryUri"] = self.uri 139 response_object["createdAt"] = iso_8601_datetime_without_milliseconds( 140 self.created_at 141 ) 142 del response_object["arn"], response_object["name"], response_object["images"] 143 return response_object 144 145 def update(self, image_scan_config=None, image_tag_mutability=None): 146 if image_scan_config: 147 self.image_scanning_configuration = image_scan_config 148 if image_tag_mutability: 149 self.image_tag_mutability = image_tag_mutability 150 151 def delete(self, region_name): 152 ecr_backend = ecr_backends[region_name] 153 ecr_backend.delete_repository(self.name) 154 155 @classmethod 156 def has_cfn_attr(cls, attribute): 157 return attribute in ["Arn", "RepositoryUri"] 158 159 def get_cfn_attribute(self, attribute_name): 160 from moto.cloudformation.exceptions import UnformattedGetAttTemplateException 161 162 if attribute_name == "Arn": 163 return self.arn 164 elif attribute_name == "RepositoryUri": 165 return self.uri 166 167 raise UnformattedGetAttTemplateException() 168 169 @staticmethod 170 def cloudformation_name_type(): 171 return "RepositoryName" 172 173 @staticmethod 174 def cloudformation_type(): 175 # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-ecr-repository.html 176 return "AWS::ECR::Repository" 177 178 @classmethod 179 def create_from_cloudformation_json( 180 cls, resource_name, cloudformation_json, region_name, **kwargs 181 ): 182 ecr_backend = ecr_backends[region_name] 183 properties = cloudformation_json["Properties"] 184 185 encryption_config = properties.get("EncryptionConfiguration") 186 image_scan_config = properties.get("ImageScanningConfiguration") 187 image_tag_mutablility = properties.get("ImageTagMutability") 188 tags = properties.get("Tags", []) 189 190 return ecr_backend.create_repository( 191 # RepositoryName is optional in CloudFormation, thus create a random 192 # name if necessary 193 repository_name=resource_name, 194 encryption_config=encryption_config, 195 image_scan_config=image_scan_config, 196 image_tag_mutablility=image_tag_mutablility, 197 tags=tags, 198 ) 199 200 @classmethod 201 def update_from_cloudformation_json( 202 cls, original_resource, new_resource_name, cloudformation_json, region_name 203 ): 204 ecr_backend = ecr_backends[region_name] 205 properties = cloudformation_json["Properties"] 206 encryption_configuration = properties.get( 207 "EncryptionConfiguration", {"encryptionType": "AES256"} 208 ) 209 210 if ( 211 new_resource_name == original_resource.name 212 and encryption_configuration == original_resource.encryption_configuration 213 ): 214 original_resource.update( 215 properties.get("ImageScanningConfiguration"), 216 properties.get("ImageTagMutability"), 217 ) 218 219 ecr_backend.tagger.tag_resource( 220 original_resource.arn, properties.get("Tags", []) 221 ) 222 223 return original_resource 224 else: 225 original_resource.delete(region_name) 226 return cls.create_from_cloudformation_json( 227 new_resource_name, cloudformation_json, region_name 228 ) 229 230 231class Image(BaseObject): 232 def __init__( 233 self, tag, manifest, repository, digest=None, registry_id=DEFAULT_REGISTRY_ID 234 ): 235 self.image_tag = tag 236 self.image_tags = [tag] if tag is not None else [] 237 self.image_manifest = manifest 238 self.image_size_in_bytes = 50 * 1024 * 1024 239 self.repository = repository 240 self.registry_id = registry_id 241 self.image_digest = digest 242 self.image_pushed_at = str(datetime.now(timezone.utc).isoformat()) 243 self.last_scan = None 244 245 def _create_digest(self): 246 image_contents = "docker_image{0}".format(int(random() * 10 ** 6)) 247 self.image_digest = ( 248 "sha256:%s" % hashlib.sha256(image_contents.encode("utf-8")).hexdigest() 249 ) 250 251 def get_image_digest(self): 252 if not self.image_digest: 253 self._create_digest() 254 return self.image_digest 255 256 def get_image_manifest(self): 257 return self.image_manifest 258 259 def remove_tag(self, tag): 260 if tag is not None and tag in self.image_tags: 261 self.image_tags.remove(tag) 262 if self.image_tags: 263 self.image_tag = self.image_tags[-1] 264 265 def update_tag(self, tag): 266 self.image_tag = tag 267 if tag not in self.image_tags and tag is not None: 268 self.image_tags.append(tag) 269 270 @property 271 def response_object(self): 272 response_object = self.gen_response_object() 273 response_object["imageId"] = {} 274 response_object["imageId"]["imageTag"] = self.image_tag 275 response_object["imageId"]["imageDigest"] = self.get_image_digest() 276 response_object["imageManifest"] = self.image_manifest 277 response_object["repositoryName"] = self.repository 278 response_object["registryId"] = self.registry_id 279 return { 280 k: v for k, v in response_object.items() if v is not None and v != [None] 281 } 282 283 @property 284 def response_list_object(self): 285 response_object = self.gen_response_object() 286 response_object["imageTag"] = self.image_tag 287 response_object["imageDigest"] = self.get_image_digest() 288 return { 289 k: v for k, v in response_object.items() if v is not None and v != [None] 290 } 291 292 @property 293 def response_describe_object(self): 294 response_object = self.gen_response_object() 295 response_object["imageTags"] = self.image_tags 296 response_object["imageDigest"] = self.get_image_digest() 297 response_object["imageManifest"] = self.image_manifest 298 response_object["repositoryName"] = self.repository 299 response_object["registryId"] = self.registry_id 300 response_object["imageSizeInBytes"] = self.image_size_in_bytes 301 response_object["imagePushedAt"] = self.image_pushed_at 302 return {k: v for k, v in response_object.items() if v is not None and v != []} 303 304 @property 305 def response_batch_get_image(self): 306 response_object = {} 307 response_object["imageId"] = {} 308 response_object["imageId"]["imageTag"] = self.image_tag 309 response_object["imageId"]["imageDigest"] = self.get_image_digest() 310 response_object["imageManifest"] = self.image_manifest 311 response_object["repositoryName"] = self.repository 312 response_object["registryId"] = self.registry_id 313 return { 314 k: v for k, v in response_object.items() if v is not None and v != [None] 315 } 316 317 @property 318 def response_batch_delete_image(self): 319 response_object = {} 320 response_object["imageDigest"] = self.get_image_digest() 321 response_object["imageTag"] = self.image_tag 322 return { 323 k: v for k, v in response_object.items() if v is not None and v != [None] 324 } 325 326 327class ECRBackend(BaseBackend): 328 def __init__(self, region_name): 329 self.region_name = region_name 330 self.registry_policy = None 331 self.replication_config = {"rules": []} 332 self.repositories: Dict[str, Repository] = {} 333 self.tagger = TaggingService(tag_name="tags") 334 335 def reset(self): 336 region_name = self.region_name 337 self.__dict__ = {} 338 self.__init__(region_name) 339 340 @staticmethod 341 def default_vpc_endpoint_service(service_region, zones): 342 """Default VPC endpoint service.""" 343 docker_endpoint = { 344 "AcceptanceRequired": False, 345 "AvailabilityZones": zones, 346 "BaseEndpointDnsNames": [f"dkr.ecr.{service_region}.vpce.amazonaws.com"], 347 "ManagesVpcEndpoints": False, 348 "Owner": "amazon", 349 "PrivateDnsName": f"*.dkr.ecr.{service_region}.amazonaws.com", 350 "PrivateDnsNameVerificationState": "verified", 351 "PrivateDnsNames": [ 352 {"PrivateDnsName": f"*.dkr.ecr.{service_region}.amazonaws.com"} 353 ], 354 "ServiceId": f"vpce-svc-{BaseBackend.vpce_random_number()}", 355 "ServiceName": f"com.amazonaws.{service_region}.ecr.dkr", 356 "ServiceType": [{"ServiceType": "Interface"}], 357 "Tags": [], 358 "VpcEndpointPolicySupported": True, 359 } 360 return BaseBackend.default_vpc_endpoint_service_factory( 361 service_region, zones, "api.ecr", special_service_name="ecr.api", 362 ) + [docker_endpoint] 363 364 def _get_repository(self, name, registry_id=None) -> Repository: 365 repo = self.repositories.get(name) 366 reg_id = registry_id or DEFAULT_REGISTRY_ID 367 368 if not repo or repo.registry_id != reg_id: 369 raise RepositoryNotFoundException(name, reg_id) 370 return repo 371 372 @staticmethod 373 def _parse_resource_arn(resource_arn) -> EcrRepositoryArn: 374 match = re.match(ECR_REPOSITORY_ARN_PATTERN, resource_arn) 375 if not match: 376 raise InvalidParameterException( 377 "Invalid parameter at 'resourceArn' failed to satisfy constraint: " 378 "'Invalid ARN'" 379 ) 380 return EcrRepositoryArn(**match.groupdict()) 381 382 def describe_repositories(self, registry_id=None, repository_names=None): 383 """ 384 maxResults and nextToken not implemented 385 """ 386 if repository_names: 387 for repository_name in repository_names: 388 if repository_name not in self.repositories: 389 raise RepositoryNotFoundException( 390 repository_name, registry_id or DEFAULT_REGISTRY_ID 391 ) 392 393 repositories = [] 394 for repository in self.repositories.values(): 395 # If a registry_id was supplied, ensure this repository matches 396 if registry_id: 397 if repository.registry_id != registry_id: 398 continue 399 # If a list of repository names was supplied, esure this repository 400 # is in that list 401 if repository_names: 402 if repository.name not in repository_names: 403 continue 404 repositories.append(repository.response_object) 405 return repositories 406 407 def create_repository( 408 self, 409 repository_name, 410 encryption_config, 411 image_scan_config, 412 image_tag_mutablility, 413 tags, 414 ): 415 if self.repositories.get(repository_name): 416 raise RepositoryAlreadyExistsException(repository_name, DEFAULT_REGISTRY_ID) 417 418 repository = Repository( 419 region_name=self.region_name, 420 repository_name=repository_name, 421 encryption_config=encryption_config, 422 image_scan_config=image_scan_config, 423 image_tag_mutablility=image_tag_mutablility, 424 ) 425 self.repositories[repository_name] = repository 426 self.tagger.tag_resource(repository.arn, tags) 427 428 return repository 429 430 def delete_repository(self, repository_name, registry_id=None, force=False): 431 repo = self._get_repository(repository_name, registry_id) 432 433 if repo.images and not force: 434 raise RepositoryNotEmptyException( 435 repository_name, registry_id or DEFAULT_REGISTRY_ID 436 ) 437 438 self.tagger.delete_all_tags_for_resource(repo.arn) 439 return self.repositories.pop(repository_name) 440 441 def list_images(self, repository_name, registry_id=None): 442 """ 443 maxResults and filtering not implemented 444 """ 445 repository = None 446 found = False 447 if repository_name in self.repositories: 448 repository = self.repositories[repository_name] 449 if registry_id: 450 if repository.registry_id == registry_id: 451 found = True 452 else: 453 found = True 454 455 if not found: 456 raise RepositoryNotFoundException( 457 repository_name, registry_id or DEFAULT_REGISTRY_ID 458 ) 459 460 images = [] 461 for image in repository.images: 462 images.append(image) 463 return images 464 465 def describe_images(self, repository_name, registry_id=None, image_ids=None): 466 repository = self._get_repository(repository_name, registry_id) 467 468 if image_ids: 469 response = set( 470 repository._get_image( 471 image_id.get("imageTag"), image_id.get("imageDigest") 472 ) 473 for image_id in image_ids 474 ) 475 476 else: 477 response = [] 478 for image in repository.images: 479 response.append(image) 480 481 return response 482 483 def put_image(self, repository_name, image_manifest, image_tag): 484 if repository_name in self.repositories: 485 repository = self.repositories[repository_name] 486 else: 487 raise Exception("{0} is not a repository".format(repository_name)) 488 489 existing_images = list( 490 filter( 491 lambda x: x.response_object["imageManifest"] == image_manifest, 492 repository.images, 493 ) 494 ) 495 if not existing_images: 496 # this image is not in ECR yet 497 image = Image(image_tag, image_manifest, repository_name) 498 repository.images.append(image) 499 return image 500 else: 501 # update existing image 502 existing_images[0].update_tag(image_tag) 503 return existing_images[0] 504 505 def batch_get_image( 506 self, 507 repository_name, 508 registry_id=None, 509 image_ids=None, 510 accepted_media_types=None, 511 ): 512 if repository_name in self.repositories: 513 repository = self.repositories[repository_name] 514 else: 515 raise RepositoryNotFoundException( 516 repository_name, registry_id or DEFAULT_REGISTRY_ID 517 ) 518 519 if not image_ids: 520 raise ParamValidationError( 521 msg='Missing required parameter in input: "imageIds"' 522 ) 523 524 response = {"images": [], "failures": []} 525 526 for image_id in image_ids: 527 found = False 528 for image in repository.images: 529 if ( 530 "imageDigest" in image_id 531 and image.get_image_digest() == image_id["imageDigest"] 532 ) or ( 533 "imageTag" in image_id and image.image_tag == image_id["imageTag"] 534 ): 535 found = True 536 response["images"].append(image.response_batch_get_image) 537 538 if not found: 539 response["failures"].append( 540 { 541 "imageId": {"imageTag": image_id.get("imageTag", "null")}, 542 "failureCode": "ImageNotFound", 543 "failureReason": "Requested image not found", 544 } 545 ) 546 547 return response 548 549 def batch_delete_image(self, repository_name, registry_id=None, image_ids=None): 550 if repository_name in self.repositories: 551 repository = self.repositories[repository_name] 552 else: 553 raise RepositoryNotFoundException( 554 repository_name, registry_id or DEFAULT_REGISTRY_ID 555 ) 556 557 if not image_ids: 558 raise ParamValidationError( 559 msg='Missing required parameter in input: "imageIds"' 560 ) 561 562 response = {"imageIds": [], "failures": []} 563 564 for image_id in image_ids: 565 image_found = False 566 567 # Is request missing both digest and tag? 568 if "imageDigest" not in image_id and "imageTag" not in image_id: 569 response["failures"].append( 570 { 571 "imageId": {}, 572 "failureCode": "MissingDigestAndTag", 573 "failureReason": "Invalid request parameters: both tag and digest cannot be null", 574 } 575 ) 576 continue 577 578 # If we have a digest, is it valid? 579 if "imageDigest" in image_id: 580 pattern = re.compile(r"^[0-9a-zA-Z_+\.-]+:[0-9a-fA-F]{64}") 581 if not pattern.match(image_id.get("imageDigest")): 582 response["failures"].append( 583 { 584 "imageId": { 585 "imageDigest": image_id.get("imageDigest", "null") 586 }, 587 "failureCode": "InvalidImageDigest", 588 "failureReason": "Invalid request parameters: image digest should satisfy the regex '[a-zA-Z0-9-_+.]+:[a-fA-F0-9]+'", 589 } 590 ) 591 continue 592 593 for num, image in enumerate(repository.images): 594 595 # Search by matching both digest and tag 596 if "imageDigest" in image_id and "imageTag" in image_id: 597 if ( 598 image_id["imageDigest"] == image.get_image_digest() 599 and image_id["imageTag"] in image.image_tags 600 ): 601 image_found = True 602 for image_tag in reversed(image.image_tags): 603 repository.images[num].image_tag = image_tag 604 response["imageIds"].append( 605 image.response_batch_delete_image 606 ) 607 repository.images[num].remove_tag(image_tag) 608 del repository.images[num] 609 610 # Search by matching digest 611 elif ( 612 "imageDigest" in image_id 613 and image.get_image_digest() == image_id["imageDigest"] 614 ): 615 image_found = True 616 for image_tag in reversed(image.image_tags): 617 repository.images[num].image_tag = image_tag 618 response["imageIds"].append(image.response_batch_delete_image) 619 repository.images[num].remove_tag(image_tag) 620 del repository.images[num] 621 622 # Search by matching tag 623 elif ( 624 "imageTag" in image_id and image_id["imageTag"] in image.image_tags 625 ): 626 image_found = True 627 repository.images[num].image_tag = image_id["imageTag"] 628 response["imageIds"].append(image.response_batch_delete_image) 629 if len(image.image_tags) > 1: 630 repository.images[num].remove_tag(image_id["imageTag"]) 631 else: 632 repository.images.remove(image) 633 634 if not image_found: 635 failure_response = { 636 "imageId": {}, 637 "failureCode": "ImageNotFound", 638 "failureReason": "Requested image not found", 639 } 640 641 if "imageDigest" in image_id: 642 failure_response["imageId"]["imageDigest"] = image_id.get( 643 "imageDigest", "null" 644 ) 645 646 if "imageTag" in image_id: 647 failure_response["imageId"]["imageTag"] = image_id.get( 648 "imageTag", "null" 649 ) 650 651 response["failures"].append(failure_response) 652 653 return response 654 655 def list_tags_for_resource(self, arn): 656 resource = self._parse_resource_arn(arn) 657 repo = self._get_repository(resource.repo_name, resource.account_id) 658 659 return self.tagger.list_tags_for_resource(repo.arn) 660 661 def tag_resource(self, arn, tags): 662 resource = self._parse_resource_arn(arn) 663 repo = self._get_repository(resource.repo_name, resource.account_id) 664 self.tagger.tag_resource(repo.arn, tags) 665 666 return {} 667 668 def untag_resource(self, arn, tag_keys): 669 resource = self._parse_resource_arn(arn) 670 repo = self._get_repository(resource.repo_name, resource.account_id) 671 self.tagger.untag_resource_using_names(repo.arn, tag_keys) 672 673 return {} 674 675 def put_image_tag_mutability( 676 self, registry_id, repository_name, image_tag_mutability 677 ): 678 if image_tag_mutability not in ["IMMUTABLE", "MUTABLE"]: 679 raise InvalidParameterException( 680 "Invalid parameter at 'imageTagMutability' failed to satisfy constraint: " 681 "'Member must satisfy enum value set: [IMMUTABLE, MUTABLE]'" 682 ) 683 684 repo = self._get_repository(repository_name, registry_id) 685 repo.update(image_tag_mutability=image_tag_mutability) 686 687 return { 688 "registryId": repo.registry_id, 689 "repositoryName": repository_name, 690 "imageTagMutability": repo.image_tag_mutability, 691 } 692 693 def put_image_scanning_configuration( 694 self, registry_id, repository_name, image_scan_config 695 ): 696 repo = self._get_repository(repository_name, registry_id) 697 repo.update(image_scan_config=image_scan_config) 698 699 return { 700 "registryId": repo.registry_id, 701 "repositoryName": repository_name, 702 "imageScanningConfiguration": repo.image_scanning_configuration, 703 } 704 705 def set_repository_policy(self, registry_id, repository_name, policy_text): 706 repo = self._get_repository(repository_name, registry_id) 707 708 try: 709 iam_policy_document_validator = IAMPolicyDocumentValidator(policy_text) 710 # the repository policy can be defined without a resource field 711 iam_policy_document_validator._validate_resource_exist = lambda: None 712 # the repository policy can have the old version 2008-10-17 713 iam_policy_document_validator._validate_version = lambda: None 714 iam_policy_document_validator.validate() 715 except MalformedPolicyDocument: 716 raise InvalidParameterException( 717 "Invalid parameter at 'PolicyText' failed to satisfy constraint: " 718 "'Invalid repository policy provided'" 719 ) 720 721 repo.policy = policy_text 722 723 return { 724 "registryId": repo.registry_id, 725 "repositoryName": repository_name, 726 "policyText": repo.policy, 727 } 728 729 def get_repository_policy(self, registry_id, repository_name): 730 repo = self._get_repository(repository_name, registry_id) 731 732 if not repo.policy: 733 raise RepositoryPolicyNotFoundException(repository_name, repo.registry_id) 734 735 return { 736 "registryId": repo.registry_id, 737 "repositoryName": repository_name, 738 "policyText": repo.policy, 739 } 740 741 def delete_repository_policy(self, registry_id, repository_name): 742 repo = self._get_repository(repository_name, registry_id) 743 policy = repo.policy 744 745 if not policy: 746 raise RepositoryPolicyNotFoundException(repository_name, repo.registry_id) 747 748 repo.policy = None 749 750 return { 751 "registryId": repo.registry_id, 752 "repositoryName": repository_name, 753 "policyText": policy, 754 } 755 756 def put_lifecycle_policy(self, registry_id, repository_name, lifecycle_policy_text): 757 repo = self._get_repository(repository_name, registry_id) 758 759 validator = EcrLifecyclePolicyValidator(lifecycle_policy_text) 760 validator.validate() 761 762 repo.lifecycle_policy = lifecycle_policy_text 763 764 return { 765 "registryId": repo.registry_id, 766 "repositoryName": repository_name, 767 "lifecyclePolicyText": repo.lifecycle_policy, 768 } 769 770 def get_lifecycle_policy(self, registry_id, repository_name): 771 repo = self._get_repository(repository_name, registry_id) 772 773 if not repo.lifecycle_policy: 774 raise LifecyclePolicyNotFoundException(repository_name, repo.registry_id) 775 776 return { 777 "registryId": repo.registry_id, 778 "repositoryName": repository_name, 779 "lifecyclePolicyText": repo.lifecycle_policy, 780 "lastEvaluatedAt": iso_8601_datetime_without_milliseconds( 781 datetime.utcnow() 782 ), 783 } 784 785 def delete_lifecycle_policy(self, registry_id, repository_name): 786 repo = self._get_repository(repository_name, registry_id) 787 policy = repo.lifecycle_policy 788 789 if not policy: 790 raise LifecyclePolicyNotFoundException(repository_name, repo.registry_id) 791 792 repo.lifecycle_policy = None 793 794 return { 795 "registryId": repo.registry_id, 796 "repositoryName": repository_name, 797 "lifecyclePolicyText": policy, 798 "lastEvaluatedAt": iso_8601_datetime_without_milliseconds( 799 datetime.utcnow() 800 ), 801 } 802 803 def _validate_registry_policy_action(self, policy_text): 804 # only CreateRepository & ReplicateImage actions are allowed 805 VALID_ACTIONS = {"ecr:CreateRepository", "ecr:ReplicateImage"} 806 807 policy = json.loads(policy_text) 808 for statement in policy["Statement"]: 809 if set(statement["Action"]) - VALID_ACTIONS: 810 raise MalformedPolicyDocument() 811 812 def put_registry_policy(self, policy_text): 813 try: 814 iam_policy_document_validator = IAMPolicyDocumentValidator(policy_text) 815 iam_policy_document_validator.validate() 816 817 self._validate_registry_policy_action(policy_text) 818 except MalformedPolicyDocument: 819 raise InvalidParameterException( 820 "Invalid parameter at 'PolicyText' failed to satisfy constraint: " 821 "'Invalid registry policy provided'" 822 ) 823 824 self.registry_policy = policy_text 825 826 return { 827 "registryId": ACCOUNT_ID, 828 "policyText": policy_text, 829 } 830 831 def get_registry_policy(self): 832 if not self.registry_policy: 833 raise RegistryPolicyNotFoundException(ACCOUNT_ID) 834 835 return { 836 "registryId": ACCOUNT_ID, 837 "policyText": self.registry_policy, 838 } 839 840 def delete_registry_policy(self): 841 policy = self.registry_policy 842 if not policy: 843 raise RegistryPolicyNotFoundException(ACCOUNT_ID) 844 845 self.registry_policy = None 846 847 return { 848 "registryId": ACCOUNT_ID, 849 "policyText": policy, 850 } 851 852 def start_image_scan(self, registry_id, repository_name, image_id): 853 repo = self._get_repository(repository_name, registry_id) 854 855 image = repo._get_image(image_id.get("imageTag"), image_id.get("imageDigest")) 856 857 # scanning an image is only allowed once per day 858 if image.last_scan and image.last_scan.date() == datetime.today().date(): 859 raise LimitExceededException() 860 861 image.last_scan = datetime.today() 862 863 return { 864 "registryId": repo.registry_id, 865 "repositoryName": repository_name, 866 "imageId": { 867 "imageDigest": image.image_digest, 868 "imageTag": image.image_tag, 869 }, 870 "imageScanStatus": {"status": "IN_PROGRESS"}, 871 } 872 873 def describe_image_scan_findings(self, registry_id, repository_name, image_id): 874 repo = self._get_repository(repository_name, registry_id) 875 876 image = repo._get_image(image_id.get("imageTag"), image_id.get("imageDigest")) 877 878 if not image.last_scan: 879 image_id_rep = "{{imageDigest:'{0}', imageTag:'{1}'}}".format( 880 image_id.get("imageDigest") or "null", 881 image_id.get("imageTag") or "null", 882 ) 883 raise ScanNotFoundException( 884 image_id=image_id_rep, 885 repository_name=repository_name, 886 registry_id=repo.registry_id, 887 ) 888 889 return { 890 "registryId": repo.registry_id, 891 "repositoryName": repository_name, 892 "imageId": { 893 "imageDigest": image.image_digest, 894 "imageTag": image.image_tag, 895 }, 896 "imageScanStatus": { 897 "status": "COMPLETE", 898 "description": "The scan was completed successfully.", 899 }, 900 "imageScanFindings": { 901 "imageScanCompletedAt": iso_8601_datetime_without_milliseconds( 902 image.last_scan 903 ), 904 "vulnerabilitySourceUpdatedAt": iso_8601_datetime_without_milliseconds( 905 datetime.utcnow() 906 ), 907 "findings": [ 908 { 909 "name": "CVE-9999-9999", 910 "uri": "https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-9999-9999", 911 "severity": "HIGH", 912 "attributes": [ 913 {"key": "package_version", "value": "9.9.9"}, 914 {"key": "package_name", "value": "moto_fake"}, 915 { 916 "key": "CVSS2_VECTOR", 917 "value": "AV:N/AC:L/Au:N/C:P/I:P/A:P", 918 }, 919 {"key": "CVSS2_SCORE", "value": "7.5"}, 920 ], 921 } 922 ], 923 "findingSeverityCounts": {"HIGH": 1}, 924 }, 925 } 926 927 def put_replication_configuration(self, replication_config): 928 rules = replication_config["rules"] 929 if len(rules) > 1: 930 raise ValidationException("This feature is disabled") 931 932 if len(rules) == 1: 933 for dest in rules[0]["destinations"]: 934 if ( 935 dest["region"] == self.region_name 936 and dest["registryId"] == DEFAULT_REGISTRY_ID 937 ): 938 raise InvalidParameterException( 939 "Invalid parameter at 'replicationConfiguration' failed to satisfy constraint: " 940 "'Replication destination cannot be the same as the source registry'" 941 ) 942 943 self.replication_config = replication_config 944 945 return {"replicationConfiguration": replication_config} 946 947 def describe_registry(self): 948 return { 949 "registryId": DEFAULT_REGISTRY_ID, 950 "replicationConfiguration": self.replication_config, 951 } 952 953 954ecr_backends = {} 955for region, ec2_backend in ec2_backends.items(): 956 ecr_backends[region] = ECRBackend(region) 957