1import json
2from base64 import b64encode
3from datetime import datetime
4import time
5
6from moto.core.responses import BaseResponse
7from .models import ecr_backends, DEFAULT_REGISTRY_ID
8
9
class ECRResponse(BaseResponse):
    """Request handlers for the ECR API.

    Every action method reads its arguments from the JSON request body,
    forwards them to the backend of the request's region and returns the
    backend result serialized with ``json.dumps``.
    """

    @property
    def ecr_backend(self):
        """Backend registered in ``ecr_backends`` for ``self.region``."""
        region = self.region
        return ecr_backends[region]

    @property
    def request_params(self):
        """Request body decoded as JSON.

        Falls back to an empty dict when decoding raises ``ValueError``.
        """
        try:
            decoded = json.loads(self.body)
        except ValueError:
            return {}
        return decoded

    def _get_param(self, param, if_none=None):
        """Return ``param`` from the decoded body, or ``if_none`` if absent."""
        params = self.request_params
        return params.get(param, if_none)

    def create_repository(self):
        """Handle CreateRepository and return the new repository as JSON."""
        options = {
            "repository_name": self._get_param("repositoryName"),
            "encryption_config": self._get_param("encryptionConfiguration"),
            "image_scan_config": self._get_param("imageScanningConfiguration"),
            # The keyword is spelled this way in the call made to the backend.
            "image_tag_mutablility": self._get_param("imageTagMutability"),
            "tags": self._get_param("tags", []),
        }

        created = self.ecr_backend.create_repository(**options)
        payload = {"repository": created.response_object}
        return json.dumps(payload)

    def describe_repositories(self):
        """Handle DescribeRepositories; ``failures`` is always an empty list."""
        names = self._get_param("repositoryNames")
        registry = self._get_param("registryId")

        found = self.ecr_backend.describe_repositories(
            repository_names=names, registry_id=registry
        )
        payload = {"repositories": found, "failures": []}
        return json.dumps(payload)

    def delete_repository(self):
        """Handle DeleteRepository and return the removed repository as JSON."""
        name = self._get_param("repositoryName")
        registry = self._get_param("registryId")
        force_flag = self._get_param("force")

        removed = self.ecr_backend.delete_repository(name, registry, force_flag)
        payload = {"repository": removed.response_object}
        return json.dumps(payload)

    def put_image(self):
        """Handle PutImage and return the backend's image as JSON."""
        name = self._get_param("repositoryName")
        manifest = self._get_param("imageManifest")
        tag = self._get_param("imageTag")

        stored = self.ecr_backend.put_image(name, manifest, tag)
        payload = {"image": stored.response_object}
        return json.dumps(payload)

    def list_images(self):
        """Handle ListImages and return each image's list entry as JSON."""
        name = self._get_param("repositoryName")
        registry = self._get_param("registryId")

        image_ids = []
        for entry in self.ecr_backend.list_images(name, registry):
            image_ids.append(entry.response_list_object)
        return json.dumps({"imageIds": image_ids})

    def describe_images(self):
        """Handle DescribeImages and return each image's details as JSON."""
        name = self._get_param("repositoryName")
        registry = self._get_param("registryId")
        wanted_ids = self._get_param("imageIds")

        details = []
        for entry in self.ecr_backend.describe_images(name, registry, wanted_ids):
            details.append(entry.response_describe_object)
        return json.dumps({"imageDetails": details})

    def batch_check_layer_availability(self):
        """Not implemented: raises unless the dry-run check returns falsy."""
        if not self.is_not_dryrun("BatchCheckLayerAvailability"):
            return None
        raise NotImplementedError(
            "ECR.batch_check_layer_availability is not yet implemented"
        )

    def batch_delete_image(self):
        """Handle BatchDeleteImage and return the backend response as JSON."""
        name = self._get_param("repositoryName")
        registry = self._get_param("registryId")
        wanted_ids = self._get_param("imageIds")

        result = self.ecr_backend.batch_delete_image(name, registry, wanted_ids)
        return json.dumps(result)

    def batch_get_image(self):
        """Handle BatchGetImage and return the backend response as JSON."""
        name = self._get_param("repositoryName")
        registry = self._get_param("registryId")
        wanted_ids = self._get_param("imageIds")
        media_types = self._get_param("acceptedMediaTypes")

        result = self.ecr_backend.batch_get_image(
            name, registry, wanted_ids, media_types
        )
        return json.dumps(result)

    def complete_layer_upload(self):
        """Not implemented: raises unless the dry-run check returns falsy."""
        if not self.is_not_dryrun("CompleteLayerUpload"):
            return None
        raise NotImplementedError("ECR.complete_layer_upload is not yet implemented")

    def delete_repository_policy(self):
        """Handle DeleteRepositoryPolicy and return the backend result as JSON."""
        registry = self._get_param("registryId")
        name = self._get_param("repositoryName")

        result = self.ecr_backend.delete_repository_policy(
            registry_id=registry, repository_name=name
        )
        return json.dumps(result)

    def get_authorization_token(self):
        """Handle GetAuthorizationToken.

        Builds one entry per requested registry id, using
        ``DEFAULT_REGISTRY_ID`` when the request names none. Each token is the
        base64 form of ``AWS:<registry id>-auth-token``.
        """
        registry_ids = self._get_param("registryIds") or [DEFAULT_REGISTRY_ID]

        # The expiry is the same fixed date for every entry, so compute it once.
        expires_at = time.mktime(datetime(2015, 1, 1).timetuple())

        auth_data = []
        for registry_id in registry_ids:
            password = "{}-auth-token".format(registry_id)
            credentials = "AWS:{}".format(password).encode("ascii")
            endpoint = "https://{}.dkr.ecr.{}.amazonaws.com".format(
                registry_id, self.region
            )
            entry = {
                "authorizationToken": b64encode(credentials).decode(),
                "expiresAt": expires_at,
                "proxyEndpoint": endpoint,
            }
            auth_data.append(entry)
        return json.dumps({"authorizationData": auth_data})

    def get_download_url_for_layer(self):
        """Not implemented: raises unless the dry-run check returns falsy."""
        if not self.is_not_dryrun("GetDownloadUrlForLayer"):
            return None
        raise NotImplementedError(
            "ECR.get_download_url_for_layer is not yet implemented"
        )

    def get_repository_policy(self):
        """Handle GetRepositoryPolicy and return the backend result as JSON."""
        registry = self._get_param("registryId")
        name = self._get_param("repositoryName")

        result = self.ecr_backend.get_repository_policy(
            registry_id=registry, repository_name=name
        )
        return json.dumps(result)

    def initiate_layer_upload(self):
        """Not implemented: raises unless the dry-run check returns falsy."""
        if not self.is_not_dryrun("InitiateLayerUpload"):
            return None
        raise NotImplementedError("ECR.initiate_layer_upload is not yet implemented")

    def set_repository_policy(self):
        """Handle SetRepositoryPolicy and return the backend result as JSON."""
        registry = self._get_param("registryId")
        name = self._get_param("repositoryName")
        policy = self._get_param("policyText")
        # The request's "force" flag is deliberately not read here. It is
        # usually a safety flag against accidental repository lock outs, but
        # honouring it would need a much deeper validation of the policy.

        result = self.ecr_backend.set_repository_policy(
            registry_id=registry, repository_name=name, policy_text=policy
        )
        return json.dumps(result)

    def upload_layer_part(self):
        """Not implemented: raises unless the dry-run check returns falsy."""
        if not self.is_not_dryrun("UploadLayerPart"):
            return None
        raise NotImplementedError("ECR.upload_layer_part is not yet implemented")

    def list_tags_for_resource(self):
        """Handle ListTagsForResource for the ARN given in the request."""
        resource_arn = self._get_param("resourceArn")

        result = self.ecr_backend.list_tags_for_resource(resource_arn)
        return json.dumps(result)

    def tag_resource(self):
        """Handle TagResource; ``tags`` defaults to an empty list."""
        resource_arn = self._get_param("resourceArn")
        new_tags = self._get_param("tags", [])

        result = self.ecr_backend.tag_resource(resource_arn, new_tags)
        return json.dumps(result)

    def untag_resource(self):
        """Handle UntagResource; ``tagKeys`` defaults to an empty list."""
        resource_arn = self._get_param("resourceArn")
        keys = self._get_param("tagKeys", [])

        result = self.ecr_backend.untag_resource(resource_arn, keys)
        return json.dumps(result)

    def put_image_tag_mutability(self):
        """Handle PutImageTagMutability and return the backend result as JSON."""
        registry = self._get_param("registryId")
        name = self._get_param("repositoryName")
        mutability = self._get_param("imageTagMutability")

        result = self.ecr_backend.put_image_tag_mutability(
            registry_id=registry,
            repository_name=name,
            image_tag_mutability=mutability,
        )
        return json.dumps(result)

    def put_image_scanning_configuration(self):
        """Handle PutImageScanningConfiguration; return the result as JSON."""
        registry = self._get_param("registryId")
        name = self._get_param("repositoryName")
        scan_config = self._get_param("imageScanningConfiguration")

        result = self.ecr_backend.put_image_scanning_configuration(
            registry_id=registry,
            repository_name=name,
            image_scan_config=scan_config,
        )
        return json.dumps(result)

    def put_lifecycle_policy(self):
        """Handle PutLifecyclePolicy and return the backend result as JSON."""
        registry = self._get_param("registryId")
        name = self._get_param("repositoryName")
        policy = self._get_param("lifecyclePolicyText")

        result = self.ecr_backend.put_lifecycle_policy(
            registry_id=registry,
            repository_name=name,
            lifecycle_policy_text=policy,
        )
        return json.dumps(result)

    def get_lifecycle_policy(self):
        """Handle GetLifecyclePolicy and return the backend result as JSON."""
        registry = self._get_param("registryId")
        name = self._get_param("repositoryName")

        result = self.ecr_backend.get_lifecycle_policy(
            registry_id=registry, repository_name=name
        )
        return json.dumps(result)

    def delete_lifecycle_policy(self):
        """Handle DeleteLifecyclePolicy and return the backend result as JSON."""
        registry = self._get_param("registryId")
        name = self._get_param("repositoryName")

        result = self.ecr_backend.delete_lifecycle_policy(
            registry_id=registry, repository_name=name
        )
        return json.dumps(result)

    def put_registry_policy(self):
        """Handle PutRegistryPolicy and return the backend result as JSON."""
        policy = self._get_param("policyText")

        result = self.ecr_backend.put_registry_policy(policy_text=policy)
        return json.dumps(result)

    def get_registry_policy(self):
        """Handle GetRegistryPolicy; the request carries no parameters."""
        result = self.ecr_backend.get_registry_policy()
        return json.dumps(result)

    def delete_registry_policy(self):
        """Handle DeleteRegistryPolicy; the request carries no parameters."""
        result = self.ecr_backend.delete_registry_policy()
        return json.dumps(result)

    def start_image_scan(self):
        """Handle StartImageScan and return the backend result as JSON."""
        registry = self._get_param("registryId")
        name = self._get_param("repositoryName")
        target = self._get_param("imageId")

        result = self.ecr_backend.start_image_scan(
            registry_id=registry, repository_name=name, image_id=target
        )
        return json.dumps(result)

    def describe_image_scan_findings(self):
        """Handle DescribeImageScanFindings; return the backend result as JSON."""
        registry = self._get_param("registryId")
        name = self._get_param("repositoryName")
        target = self._get_param("imageId")

        result = self.ecr_backend.describe_image_scan_findings(
            registry_id=registry, repository_name=name, image_id=target
        )
        return json.dumps(result)

    def put_replication_configuration(self):
        """Handle PutReplicationConfiguration; return the result as JSON."""
        config = self._get_param("replicationConfiguration")

        result = self.ecr_backend.put_replication_configuration(
            replication_config=config
        )
        return json.dumps(result)

    def describe_registry(self):
        """Handle DescribeRegistry; the request carries no parameters."""
        result = self.ecr_backend.describe_registry()
        return json.dumps(result)
315