# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

try:
    from urllib.parse import unquote
except ImportError:
    from urllib import unquote

from knack.util import CLIError
from knack.log import get_logger

from ._utils import user_confirmation
from ._docker_utils import request_data_from_registry, get_access_credentials, RegistryException

logger = get_logger(__name__)


# Maps the user-facing ordering names to the values sent in the 'orderby' query parameter.
ORDERBY_PARAMS = {
    'time_asc': 'timeasc',
    'time_desc': 'timedesc'
}
# Page size ('n' query parameter) requested when listing items.
DEFAULT_PAGINATION = 100


def _get_repository_path(repository=None):
    """Return the path for a repository, or list of repositories if repository is empty.
    """
    if repository:
        return '/acr/v1/{}'.format(repository)
    return '/acr/v1/_catalog'


def _get_tag_path(repository, tag=None):
    """Return the path for a tag, or list of tags if tag is empty.
    """
    if tag:
        return '/acr/v1/{}/_tags/{}'.format(repository, tag)
    return '/acr/v1/{}/_tags'.format(repository)


def _get_manifest_path(repository, manifest=None):
    """Return the path for a manifest, or list of manifests if manifest is empty.
    """
    if manifest:
        return '/acr/v1/{}/_manifests/{}'.format(repository, manifest)
    return '/acr/v1/{}/_manifests'.format(repository)


def _get_manifest_digest(login_server, repository, tag, username, password):
    """Return the manifest digest that the given tag of a repository points to.

    Raises CLIError if the tag response carries no non-empty 'digest' value.
    """
    # request_data_from_registry returns a tuple; the first element is the payload.
    response = request_data_from_registry(
        http_method='get',
        login_server=login_server,
        path=_get_tag_path(repository, tag),
        username=username,
        password=password,
        result_index='tag')[0]

    if 'digest' in response and response['digest']:
        return response['digest']

    raise CLIError("Could not get the manifest digest for image '{}:{}'.".format(repository, tag))


def _parse_next_link_params(next_link):
    """Return the query parameters carried by a `Link` response header value.

    The link header looks something like
    `Link: </v2/_catalog?last=hello-world&n=1>; rel="next"`. The text between
    the angle brackets is the next path, and its query string holds the
    parameters to send with the next request.

    Every `key=value` pair is split on the first '=' only, so a value that
    itself contains '=' is kept whole. Values are URL-unquoted. A pair without
    '=' maps its key to an empty string; empty pairs are skipped.

    Raises ValueError if the header has no '<' or '>' and IndexError if the
    next path has no query string.
    """
    next_link_path = next_link[(next_link.index('<') + 1):next_link.index('>')]
    # Split on the first '?' only: everything after it is the query string.
    query = next_link_path.split('?', 1)[1]

    params = {}
    for pair in query.split('&'):
        if not pair:
            continue
        key, _, value = pair.partition('=')
        params[key] = unquote(value)
    return params


def _obtain_data_from_registry(login_server,
                               path,
                               username,
                               password,
                               result_index,
                               top=None,
                               orderby=None):
    """Fetch a paginated listing from the registry and return all items as one list.

    Issues GET requests against `path`, following the next link returned with
    each response until there is none or until `top` items have been requested.

    :param top: Total number of items to request. Each page asks for at most
        DEFAULT_PAGINATION items. When None, pages are followed until the
        registry stops returning a next link.
    :param orderby: A key of ORDERBY_PARAMS, or a falsy value for no ordering.
    :return: The concatenation of the non-empty results of every page.
    """
    result_list = []
    execute_next_http_call = True

    params = {
        'n': DEFAULT_PAGINATION,
        'orderby': ORDERBY_PARAMS[orderby] if orderby else None
    }

    while execute_next_http_call:
        execute_next_http_call = False

        # Override the default page size if top is provided
        if top is not None:
            params['n'] = min(top, DEFAULT_PAGINATION)
            top -= params['n']

        result, next_link = request_data_from_registry(
            http_method='get',
            login_server=login_server,
            path=path,
            username=username,
            password=password,
            result_index=result_index,
            params=params)

        if result:
            result_list += result

        # Everything the caller asked for has been requested already.
        if top is not None and top <= 0:
            break

        if next_link:
            # The registry is telling us there's more items in the list,
            # and another call is needed. We follow the parameters of the
            # next path indicated in the link header.
            params = _parse_next_link_params(next_link)
            execute_next_http_call = True

    return result_list


# List the repositories of a registry through the '/v2/_catalog' endpoint.
# `top` limits how many repositories are requested.
def acr_repository_list(cmd,
                        registry_name,
                        top=None,
                        resource_group_name=None,  # pylint: disable=unused-argument
                        tenant_suffix=None,
                        username=None,
                        password=None):
    login_server, username, password = get_access_credentials(
        cmd=cmd,
        registry_name=registry_name,
        tenant_suffix=tenant_suffix,
        username=username,
        password=password)

    return _obtain_data_from_registry(
        login_server=login_server,
        path='/v2/_catalog',
        username=username,
        password=password,
        result_index='repositories',
        top=top)


# List the tags of a repository. Without `detail` only the tag names are returned.
# When the registry answers 405, the request is retried against '/v2/<repository>/tags/list'
# and --detail, --top and --orderby are ignored with a warning.
def acr_repository_show_tags(cmd,
                             registry_name,
                             repository,
                             top=None,
                             orderby=None,
                             resource_group_name=None,  # pylint: disable=unused-argument
                             tenant_suffix=None,
                             username=None,
                             password=None,
                             detail=False):
    login_server, username, password = get_access_credentials(
        cmd=cmd,
        registry_name=registry_name,
        tenant_suffix=tenant_suffix,
        username=username,
        password=password,
        repository=repository,
        permission='pull')

    try:
        raw_result = _obtain_data_from_registry(
            login_server=login_server,
            path=_get_tag_path(repository),
            username=username,
            password=password,
            result_index='tags',
            top=top,
            orderby=orderby)
    except RegistryException as e:
        # Check for Classic registry
        if e.status_code == 405:
            if detail:
                logger.warning("The specified --detail is ignored as it is only supported for managed registries.")
            if top:
                logger.warning("The specified --top is ignored as it is only supported for managed registries.")
            if orderby:
                logger.warning("The specified --orderby is ignored as it is only supported for managed registries.")
            return _obtain_data_from_registry(
                login_server=login_server,
                path='/v2/{}/tags/list'.format(repository),
                username=username,
                password=password,
                result_index='tags')
        raise

    # For backward compatibility, convert the results to the old schema
    if not detail:
        return [item['name'] for item in raw_result]

    return raw_result


# List the manifests of a repository. Without `detail` each manifest is reduced to its
# 'digest', 'tags' and 'timestamp' (taken from 'lastUpdateTime'), with empty defaults
# for keys that are missing.
def acr_repository_show_manifests(cmd,
                                  registry_name,
                                  repository,
                                  top=None,
                                  orderby=None,
                                  resource_group_name=None,  # pylint: disable=unused-argument
                                  tenant_suffix=None,
                                  username=None,
                                  password=None,
                                  detail=False):
    login_server, username, password = get_access_credentials(
        cmd=cmd,
        registry_name=registry_name,
        tenant_suffix=tenant_suffix,
        username=username,
        password=password,
        repository=repository,
        permission='pull')

    raw_result = _obtain_data_from_registry(
        login_server=login_server,
        path=_get_manifest_path(repository),
        username=username,
        password=password,
        result_index='manifests',
        top=top,
        orderby=orderby)

    # For backward compatibility, convert the results to the old schema
    if not detail:
        return [{
            'digest': item['digest'] if 'digest' in item else '',
            'tags': item['tags'] if 'tags' in item else [],
            'timestamp': item['lastUpdateTime'] if 'lastUpdateTime' in item else ''
        } for item in raw_result]

    return raw_result


# Get the attributes of a repository or of an image (exactly one of the two must be given).
def acr_repository_show(cmd,
                        registry_name,
                        repository=None,
                        image=None,
                        resource_group_name=None,  # pylint: disable=unused-argument
                        tenant_suffix=None,
                        username=None,
                        password=None):
    return _acr_repository_attributes_helper(
        cmd=cmd,
        registry_name=registry_name,
        http_method='get',
        json_payload=None,
        permission='pull',
        repository=repository,
        image=image,
        tenant_suffix=tenant_suffix,
        username=username,
        password=password)


# Update the attributes of a repository or of an image (exactly one of the two must be given).
# Only the flags that are not None are sent. When no flag is given, the current attributes
# are fetched with a GET instead of issuing a PATCH.
def acr_repository_update(cmd,
                          registry_name,
                          repository=None,
                          image=None,
                          resource_group_name=None,  # pylint: disable=unused-argument
                          tenant_suffix=None,
                          username=None,
                          password=None,
                          delete_enabled=None,
                          list_enabled=None,
                          read_enabled=None,
                          write_enabled=None):
    requested_attributes = (
        ('deleteEnabled', delete_enabled),
        ('listEnabled', list_enabled),
        ('readEnabled', read_enabled),
        ('writeEnabled', write_enabled),
    )
    json_payload = {name: value for name, value in requested_attributes if value is not None}

    return _acr_repository_attributes_helper(
        cmd=cmd,
        registry_name=registry_name,
        http_method='patch' if json_payload else 'get',
        json_payload=json_payload,
        permission='push,pull' if json_payload else 'pull',
        repository=repository,
        image=image,
        tenant_suffix=tenant_suffix,
        username=username,
        password=password)


def _acr_repository_attributes_helper(cmd,
                                      registry_name,
                                      http_method,
                                      json_payload,
                                      permission,
                                      repository=None,
                                      image=None,
                                      tenant_suffix=None,
                                      username=None,
                                      password=None):
    """Send an attributes request for a repository, a tag or a manifest.

    Exactly one of `repository` and `image` must be given. An image may be
    addressed by tag ('name:tag') or by digest ('name@digest'); the request
    path and the `result_index` are chosen accordingly.

    :return: The first element of the tuple returned by request_data_from_registry.
    :raises CLIError: If both or neither of `repository` and `image` are given,
        or if `image` cannot be parsed.
    """
    _validate_parameters(repository, image)

    if image:
        # If --image is specified, repository must be empty.
        repository, tag, manifest = _parse_image_name(image, allow_digest=True)
    else:
        # This is a request on repository
        tag, manifest = None, None

    login_server, username, password = get_access_credentials(
        cmd=cmd,
        registry_name=registry_name,
        tenant_suffix=tenant_suffix,
        username=username,
        password=password,
        repository=repository,
        permission=permission)

    if tag:
        path = _get_tag_path(repository, tag)
        result_index = 'tag'
    elif manifest:
        path = _get_manifest_path(repository, manifest)
        result_index = 'manifest'
    else:
        path = _get_repository_path(repository)
        result_index = None

    return request_data_from_registry(
        http_method=http_method,
        login_server=login_server,
        path=path,
        username=username,
        password=password,
        result_index=result_index,
        json_payload=json_payload)[0]


# Remove a tag from an image by sending a DELETE on the tag path.
# `image` is parsed as 'name:tag'; a name without a tag means the 'latest' tag.
def acr_repository_untag(cmd,
                         registry_name,
                         image,
                         resource_group_name=None,  # pylint: disable=unused-argument
                         tenant_suffix=None,
                         username=None,
                         password=None):
    repository, tag, _ = _parse_image_name(image)

    login_server, username, password = get_access_credentials(
        cmd=cmd,
        registry_name=registry_name,
        tenant_suffix=tenant_suffix,
        username=username,
        password=password,
        repository=repository,
        permission='delete')

    return request_data_from_registry(
        http_method='delete',
        login_server=login_server,
        path=_get_tag_path(repository, tag),
        username=username,
        password=password)[0]


# Delete a whole repository, or the manifest behind an image given by tag or digest
# (exactly one of `repository` and `image` must be given). The user is asked for
# confirmation first; `yes` is handed to the confirmation helpers.
def acr_repository_delete(cmd,
                          registry_name,
                          repository=None,
                          image=None,
                          resource_group_name=None,  # pylint: disable=unused-argument
                          tenant_suffix=None,
                          username=None,
                          password=None,
                          yes=False):
    _validate_parameters(repository, image)

    if image:
        # If --image is specified, repository must be empty.
        repository, tag, manifest = _parse_image_name(image, allow_digest=True)
    else:
        # This is a request on repository
        tag, manifest = None, None

    login_server, username, password = get_access_credentials(
        cmd=cmd,
        registry_name=registry_name,
        tenant_suffix=tenant_suffix,
        username=username,
        password=password,
        repository=repository,
        permission='delete,pull')

    if tag or manifest:
        # Resolves the tag to its digest when needed; the manifest is deleted by digest.
        manifest = _delete_manifest_confirmation(
            login_server=login_server,
            username=username,
            password=password,
            repository=repository,
            tag=tag,
            manifest=manifest,
            yes=yes)
        path = '/v2/{}/manifests/{}'.format(repository, manifest)
    else:
        user_confirmation("Are you sure you want to delete the repository '{}' "
                          "and all images under it?".format(repository), yes)
        path = _get_repository_path(repository)

    return request_data_from_registry(
        http_method='delete',
        login_server=login_server,
        path=path,
        username=username,
        password=password)[0]


def _validate_parameters(repository, image):
    """Raise CLIError unless exactly one of `repository` and `image` is truthy."""
    if bool(repository) == bool(image):
        raise CLIError('Usage error: --image IMAGE | --repository REPOSITORY')


def _parse_image_name(image, allow_digest=False):
    """Split an image name into a (repository, tag, manifest) tuple.

    - 'name@digest' (only when `allow_digest` is set) -> (name, None, digest)
    - 'name:tag' -> (name, tag, None)
    - 'name' -> (name, 'latest', None)

    :raises CLIError: If the name matches none of the formats above, for
        example when it contains more than one ':'.
    """
    if allow_digest and '@' in image:
        # This is probably an image name by manifest digest
        tokens = image.split('@')
        if len(tokens) == 2:
            return tokens[0], None, tokens[1]

    if ':' in image:
        # This is probably an image name by tag
        tokens = image.split(':')
        if len(tokens) == 2:
            return tokens[0], tokens[1], None
    else:
        # This is probably an image with implicit latest tag
        return image, 'latest', None

    # This parser is shared by show, update and delete, so the message must not
    # name a specific operation.
    if allow_digest:
        raise CLIError("The name of the image may include a tag in the"
                       " format 'name:tag' or digest in the format 'name@digest'.")
    raise CLIError("The name of the image may include a tag in the format 'name:tag'.")

def _delete_manifest_confirmation(login_server,
                                  username,
                                  password,
                                  repository,
                                  tag,
                                  manifest,
                                  yes):
    """Resolve the manifest digest to delete and ask the user to confirm.

    When `manifest` is empty, the digest is looked up from `tag`. When `yes`
    is set, the digest is returned right away without prompting. Otherwise
    the tags of the repository are listed so that the prompt can name every
    image that points to the manifest.

    :return: The manifest digest to delete.
    """
    # Always query manifest if it is empty
    manifest = manifest or _get_manifest_digest(
        login_server=login_server,
        repository=repository,
        tag=tag,
        username=username,
        password=password)

    if yes:
        return manifest

    tags = _obtain_data_from_registry(
        login_server=login_server,
        path=_get_tag_path(repository),
        username=username,
        password=password,
        result_index='tags')

    # The listing only feeds the prompt text, so an entry lacking 'digest' or
    # 'name' is skipped rather than aborting the operation with a KeyError.
    filter_by_manifest = [x['name'] for x in tags
                          if x.get('digest') == manifest and 'name' in x]
    message = "This operation will delete the manifest '{}'".format(manifest)
    if filter_by_manifest:
        images = ", ".join("'{}:{}'".format(repository, x) for x in filter_by_manifest)
        message += " and all the following images: {}".format(images)
    user_confirmation("{}.\nAre you sure you want to continue?".format(message))

    return manifest


# Return (repository, tag, manifest) for an image name. When the image is given by
# digest ('name@digest') no registry call is made; otherwise the digest is looked up
# from the tag using 'pull' credentials.
def get_image_digest(cmd, registry_name, image):
    repository, tag, manifest = _parse_image_name(image, allow_digest=True)

    if manifest:
        return repository, tag, manifest

    # If we don't have manifest yet, try to get it from tag.
    login_server, username, password = get_access_credentials(
        cmd=cmd,
        registry_name=registry_name,
        repository=repository,
        permission='pull')

    manifest = _get_manifest_digest(
        login_server=login_server,
        repository=repository,
        tag=tag,
        username=username,
        password=password)

    return repository, tag, manifest