# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

# pylint: disable=line-too-long

from knack.util import CLIError
from knack.log import get_logger
from azure.cli.core.util import user_confirmation
from azure.mgmt.appconfiguration.models import (ConfigurationStoreUpdateParameters,
                                                ConfigurationStore,
                                                Sku,
                                                ResourceIdentity,
                                                UserIdentity,
                                                EncryptionProperties,
                                                KeyVaultProperties,
                                                RegenerateKeyParameters)

from ._utils import resolve_store_metadata


logger = get_logger(__name__)

# Identity type strings placed in ResourceIdentity.type.
SYSTEM_ASSIGNED = 'SystemAssigned'
USER_ASSIGNED = 'UserAssigned'
SYSTEM_USER_ASSIGNED = 'SystemAssigned, UserAssigned'
# Placeholder a caller passes in an identity list to mean "the system-assigned identity".
SYSTEM_ASSIGNED_IDENTITY = '[system]'


# Create a configuration store.
#
# `assign_identity` semantics: None means "no identity"; an empty list means
# "system-assigned identity only"; otherwise each entry is either the
# '[system]' placeholder or a user-assigned identity key.
# `enable_public_network` is tri-state: None leaves the setting unspecified,
# True/False map to 'Enabled'/'Disabled'.
# Returns whatever `client.begin_create` returns.
def create_configstore(client,
                       resource_group_name,
                       name,
                       location,
                       sku="Standard",
                       tags=None,
                       assign_identity=None,
                       enable_public_network=None,
                       disable_local_auth=None):
    # An explicitly supplied but empty identity list defaults to the system-assigned identity.
    if assign_identity is not None and not assign_identity:
        assign_identity = [SYSTEM_ASSIGNED_IDENTITY]

    public_network_access = None
    if enable_public_network is not None:
        public_network_access = 'Enabled' if enable_public_network else 'Disabled'

    configstore_params = ConfigurationStore(location=location.lower(),
                                            identity=__get_resource_identity(assign_identity) if assign_identity else None,
                                            sku=Sku(name=sku),
                                            tags=tags,
                                            public_network_access=public_network_access,
                                            disable_local_auth=disable_local_auth)

    return client.begin_create(resource_group_name, name, configstore_params)


# Delete a configuration store after asking for confirmation (skipped when `yes` is set).
# The resource group is looked up from the store name when it is not supplied.
def delete_configstore(cmd, client, name, resource_group_name=None, yes=False):
    if resource_group_name is None:
        resource_group_name, _ = resolve_store_metadata(cmd, name)
    confirmation_message = "Are you sure you want to delete the App Configuration: {}".format(name)
    user_confirmation(confirmation_message, yes)
    return client.begin_delete(resource_group_name, name)


# List configuration stores: all stores visible to the client, or only those
# in `resource_group_name` when one is given.
def list_configstore(client, resource_group_name=None):
    if resource_group_name is None:
        return client.list()
    return client.list_by_resource_group(resource_group_name)


# Fetch a single configuration store, resolving its resource group from the
# store name when it is not supplied.
def show_configstore(cmd, client, name, resource_group_name=None):
    if resource_group_name is None:
        resource_group_name, _ = resolve_store_metadata(cmd, name)
    return client.get(resource_group_name, name)


# Update tags, SKU, network access, local-auth and customer-managed-key
# settings of a configuration store.
#
# `encryption_key_name` is tri-state: None leaves encryption untouched, an
# empty string sends empty key vault properties (removing the customer key),
# and a non-empty name sets the key built from vault/name/version.
# Raises CLIError (via __validate_cmk) on inconsistent encryption arguments.
# Returns whatever `client.begin_update` returns.
def update_configstore(cmd,
                       client,
                       name,
                       resource_group_name=None,
                       tags=None,
                       sku=None,
                       encryption_key_name=None,
                       encryption_key_vault=None,
                       encryption_key_version=None,
                       identity_client_id=None,
                       enable_public_network=None,
                       disable_local_auth=None):
    __validate_cmk(encryption_key_name, encryption_key_vault, encryption_key_version, identity_client_id)
    if resource_group_name is None:
        resource_group_name, _ = resolve_store_metadata(cmd, name)

    public_network_access = None
    if enable_public_network is not None:
        public_network_access = 'Enabled' if enable_public_network else 'Disabled'
    update_params = ConfigurationStoreUpdateParameters(tags=tags,
                                                       sku=Sku(name=sku) if sku else None,
                                                       public_network_access=public_network_access,
                                                       disable_local_auth=disable_local_auth)

    if encryption_key_name is not None:
        # Empty properties are sent when the key name is an empty string.
        key_vault_properties = KeyVaultProperties()
        if encryption_key_name:
            # key identifier schema https://keyvaultname.vault-int.azure-int.net/keys/keyname/keyversion
            key_identifier = "{}/keys/{}/{}".format(encryption_key_vault.strip('/'), encryption_key_name, encryption_key_version if encryption_key_version else "")
            key_vault_properties = KeyVaultProperties(key_identifier=key_identifier, identity_client_id=identity_client_id)

        update_params.encryption = EncryptionProperties(key_vault_properties=key_vault_properties)

    return client.begin_update(resource_group_name=resource_group_name,
                               config_store_name=name,
                               config_store_update_parameters=update_params)


# Add managed identities to a configuration store, keeping the ones it already has.
#
# `identities` may contain the '[system]' placeholder and/or user-assigned
# identity keys; when empty or None the system-assigned identity is added.
# The caller's list is not modified.
# Returns the store's identity as re-read after the update has completed.
def assign_managed_identity(cmd, client, name, resource_group_name=None, identities=None):
    if resource_group_name is None:
        resource_group_name, _ = resolve_store_metadata(cmd, name)

    # Work on a copy so the caller's list is never mutated.
    requested = list(identities) if identities else [SYSTEM_ASSIGNED_IDENTITY]

    current_identities = show_managed_identity(cmd, client, name, resource_group_name)
    user_assigned_identities = {}
    identity_types = set()

    if current_identities:
        current_type = current_identities.type
        if current_type and current_type != 'None':
            identity_types = {identity_type.strip() for identity_type in current_type.split(',')}
        # Copy so the fetched model's dictionary is not updated in place.
        user_assigned_identities = dict(current_identities.user_assigned_identities or {})

    if SYSTEM_ASSIGNED_IDENTITY in requested:
        identity_types.add(SYSTEM_ASSIGNED)

    user_assigned_identities.update({identity: UserIdentity()
                                     for identity in requested
                                     if identity != SYSTEM_ASSIGNED_IDENTITY})
    if user_assigned_identities:
        identity_types.add(USER_ASSIGNED)

    # Sorted so the type string is deterministic (set iteration order is not).
    managed_identities = ResourceIdentity(type=','.join(sorted(identity_types)) if identity_types else 'None',
                                          user_assigned_identities=user_assigned_identities if user_assigned_identities else None)

    poller = client.begin_update(resource_group_name=resource_group_name,
                                 config_store_name=name,
                                 config_store_update_parameters=ConfigurationStoreUpdateParameters(identity=managed_identities))
    # Wait for the update to finish so the read below observes the new identities
    # and any failure of the update is surfaced instead of being dropped.
    poller.result()

    # Due to a bug in RP https://msazure.visualstudio.com/Azure%20AppConfig/_workitems/edit/6017040
    # client.update does not return the updated identities, so they are read back explicitly.
    return show_managed_identity(cmd, client, name, resource_group_name)


# Remove managed identities from a configuration store.
#
# `identities` may contain '[all]' (remove everything), the '[system]'
# placeholder, and/or user-assigned identity keys; when empty or None only the
# system-assigned identity is removed. Logs a warning and does nothing when
# the store has no identity. Returns None.
def remove_managed_identity(cmd, client, name, resource_group_name=None, identities=None):
    if resource_group_name is None:
        resource_group_name, _ = resolve_store_metadata(cmd, name)

    current_identities = show_managed_identity(cmd, client, name, resource_group_name)
    if not current_identities or not current_identities.type or current_identities.type == 'None':
        logger.warning("No identity associated with this App Configuration.")
        return

    if not identities:
        identities = [SYSTEM_ASSIGNED_IDENTITY]

    user_assigned_identities = {}
    if '[all]' in identities:
        identity_types = None
    else:
        identity_types = {identity_type.strip() for identity_type in current_identities.type.split(',')}

        # Keep every user-assigned identity that was not asked to be removed.
        if current_identities.user_assigned_identities:
            user_assigned_identities = {identity: value
                                        for identity, value in current_identities.user_assigned_identities.items()
                                        if identity not in identities}

        if SYSTEM_ASSIGNED_IDENTITY in identities:
            identity_types.discard(SYSTEM_ASSIGNED)

        if not user_assigned_identities:
            identity_types.discard(USER_ASSIGNED)

    # Sorted so the type string is deterministic (set iteration order is not).
    managed_identities = ResourceIdentity(type=','.join(sorted(identity_types)) if identity_types else 'None',
                                          user_assigned_identities=user_assigned_identities if user_assigned_identities else None)

    client.begin_update(resource_group_name=resource_group_name,
                        config_store_name=name,
                        config_store_update_parameters=ConfigurationStoreUpdateParameters(identity=managed_identities))


# Return the identity object of a configuration store, or an empty dict when
# the store has none.
def show_managed_identity(cmd, client, name, resource_group_name=None):
    config_store = show_configstore(cmd, client, name, resource_group_name)
    return config_store.identity if config_store.identity else {}


# List the access keys of a configuration store. The resource group is
# resolved from the store name only when the caller did not supply one.
def list_credential(cmd, client, name, resource_group_name=None):
    if resource_group_name is None:
        resource_group_name, _ = resolve_store_metadata(cmd, name)
    return client.list_keys(resource_group_name, name)


# Regenerate the access key identified by `id_`. The resource group is
# resolved from the store name only when the caller did not supply one.
def regenerate_credential(cmd, client, name, id_, resource_group_name=None):
    if resource_group_name is None:
        resource_group_name, _ = resolve_store_metadata(cmd, name)
    return client.regenerate_key(resource_group_name, name, RegenerateKeyParameters(id=id_))


# Build a ResourceIdentity from a list whose entries are either the '[system]'
# placeholder or user-assigned identity keys. The resulting type is one of
# SYSTEM_USER_ASSIGNED, SYSTEM_ASSIGNED, USER_ASSIGNED or "None".
def __get_resource_identity(assign_identity):
    system_assigned = False
    user_assigned = {}
    for identity in assign_identity:
        if identity == SYSTEM_ASSIGNED_IDENTITY:
            system_assigned = True
        else:
            user_assigned[identity] = UserIdentity()

    if system_assigned and user_assigned:
        identity_type = SYSTEM_USER_ASSIGNED
    elif system_assigned:
        identity_type = SYSTEM_ASSIGNED
    elif user_assigned:
        identity_type = USER_ASSIGNED
    else:
        identity_type = "None"

    return ResourceIdentity(type=identity_type,
                            user_assigned_identities=user_assigned if user_assigned else None)


# Validate the customer-managed-key arguments of update_configstore.
#
# - key name None: no other encryption argument may be given (CLIError otherwise).
# - key name non-empty: the key vault is required (CLIError otherwise).
# - key name empty string: the key is being removed; other encryption
#   arguments are ignored and a warning is logged if any were given.
def __validate_cmk(encryption_key_name=None,
                   encryption_key_vault=None,
                   encryption_key_version=None,
                   identity_client_id=None):
    related_args_given = any(arg is not None for arg in (encryption_key_vault, encryption_key_version, identity_client_id))

    if encryption_key_name is None:
        if related_args_given:
            raise CLIError("To modify customer encryption key --encryption-key-name is required")
    elif encryption_key_name:
        if encryption_key_vault is None:
            raise CLIError("To modify customer encryption key --encryption-key-vault is required")
    elif related_args_given:
        logger.warning("Removing the customer encryption key. Key vault related arguments are ignored.")