1# coding: utf-8 2 3#------------------------------------------------------------------------- 4# Copyright (c) Microsoft Corporation. All rights reserved. 5# Licensed under the MIT License. See License.txt in the project root for 6# license information. 7#-------------------------------------------------------------------------- 8 9 10# TEST SCENARIO COVERAGE 11# ---------------------- 12# Methods Total : 17 13# Methods Covered : 17 14# Examples Total : 19 15# Examples Tested : 19 16# Coverage % : 100 17# ---------------------- 18 19# covered ops: 20# configuration_stores: 9/9 21# operations: 2/2 22# private_endpoint_connections: 4/4 23# private_link_resources: 2/2 24 25import time 26import unittest 27 28import azure.mgmt.appconfiguration 29from devtools_testutils import AzureMgmtTestCase, RandomNameResourceGroupPreparer 30 31AZURE_LOCATION = 'eastus' 32KEY_UUID = "test_key_a6af8952-54a6-11e9-b600-2816a84d0309" 33LABEL_UUID = "1d7b2b28-549e-11e9-b51c-2816a84d0309" 34KEY = "PYTHON_UNIT_" + KEY_UUID 35LABEL = "test_label1_" + LABEL_UUID 36TEST_CONTENT_TYPE = "test content type" 37TEST_VALUE = "test value" 38 39class MgmtAppConfigurationTest(AzureMgmtTestCase): 40 41 def setUp(self): 42 super(MgmtAppConfigurationTest, self).setUp() 43 from azure.mgmt.appconfiguration import AppConfigurationManagementClient 44 self.mgmt_client = self.create_mgmt_client( 45 AppConfigurationManagementClient 46 ) 47 48 if self.is_live: 49 import azure.mgmt.network 50 self.network_client = self.create_mgmt_client( 51 azure.mgmt.network.NetworkManagementClient 52 ) 53 54 def create_kv(self, connection_str): 55 from azure.appconfiguration import ( 56 AzureAppConfigurationClient, 57 ConfigurationSetting 58 ) 59 app_config_client = AzureAppConfigurationClient.from_connection_string(connection_str) 60 kv = ConfigurationSetting( 61 key=KEY, 62 label=LABEL, 63 value=TEST_VALUE, 64 content_type=TEST_CONTENT_TYPE, 65 tags={"tag1": "tag1", "tag2": "tag2"} 66 ) 67 created_kv = app_config_client.add_configuration_setting(kv) 68 return created_kv 69 70 71 # TODO: update to track 2 version later 72 def create_endpoint(self, group_name, vnet_name, sub_net, endpoint_name, conf_store_id): 73 # Create VNet 74 async_vnet_creation = self.network_client.virtual_networks.create_or_update( 75 group_name, 76 vnet_name, 77 { 78 'location': AZURE_LOCATION, 79 'address_space': { 80 'address_prefixes': ['10.0.0.0/16'] 81 } 82 } 83 ) 84 async_vnet_creation.wait() 85 86 # Create Subnet 87 async_subnet_creation = self.network_client.subnets.create_or_update( 88 group_name, 89 vnet_name, 90 sub_net, 91 { 92 'address_prefix': '10.0.0.0/24', 93 'private_link_service_network_policies': 'disabled', 94 'private_endpoint_network_policies': 'disabled' 95 } 96 ) 97 subnet_info = async_subnet_creation.result() 98 99 # Create private endpoint 100 BODY = { 101 "location": "eastus", 102 "properties": { 103 "privateLinkServiceConnections": [ 104 # { 105 # "name": PRIVATE_LINK_SERVICES, # TODO: This is needed, but was not showed in swagger. 106 # "private_link_service_id": "/subscriptions/" + SUBSCRIPTION_ID + "/resourceGroups/" + RESOURCE_GROUP + "/providers/Microsoft.Network/privateLinkServices/" + PRIVATE_LINK_SERVICES, 107 # }, 108 { 109 "name": "myconnection", 110 # "private_link_service_id": "/subscriptions/" + self.settings.SUBSCRIPTION_ID + "/resourceGroups/" + group_name + "/providers/Microsoft.Storage/storageAccounts/" + STORAGE_ACCOUNT_NAME + "" 111 "private_link_service_id": conf_store_id, 112 "group_ids": ["configurationStores"] 113 } 114 ], 115 "subnet": { 116 "id": "/subscriptions/" + self.settings.SUBSCRIPTION_ID + "/resourceGroups/" + group_name + "/providers/Microsoft.Network/virtualNetworks/" + vnet_name + "/subnets/" + sub_net 117 } 118 } 119 } 120 result = self.network_client.private_endpoints.create_or_update(group_name, endpoint_name, BODY) 121 122 return result.result() 123 124 @unittest.skip('hard to test') 125 @RandomNameResourceGroupPreparer(location=AZURE_LOCATION) 126 def test_appconfiguration_list_key_values(self, resource_group): 127 CONFIGURATION_STORE_NAME = self.get_resource_name("configuration") 128 129 # ConfigurationStores_Create[put] 130 BODY = { 131 "location": "westus", 132 "sku": { 133 "name": "Standard" # Free can not use private endpoint 134 }, 135 "tags": { 136 "my_tag": "myTagValue" 137 } 138 } 139 result = self.mgmt_client.configuration_stores.begin_create(resource_group.name, CONFIGURATION_STORE_NAME, BODY) 140 conf_store = result.result() 141 142 # ConfigurationStores_ListKeys[post] 143 keys = list(self.mgmt_client.configuration_stores.list_keys(resource_group.name, CONFIGURATION_STORE_NAME)) 144 145 # ConfigurationStores_RegenerateKey[post] 146 BODY = { 147 "id": keys[0].id 148 } 149 key = self.mgmt_client.configuration_stores.regenerate_key(resource_group.name, CONFIGURATION_STORE_NAME, BODY) 150 151 if self.is_live: 152 # create key-value 153 self.create_kv(key.connection_string) 154 155 # ConfigurationStores_ListKeyValue[post] 156 BODY = { 157 "key": KEY, 158 "label": LABEL 159 } 160 result = self.mgmt_client.configuration_stores.list_key_value(resource_group.name, CONFIGURATION_STORE_NAME, BODY) 161 162 @unittest.skip('hard to test') 163 @RandomNameResourceGroupPreparer(location=AZURE_LOCATION) 164 def test_appconfiguration(self, resource_group): 165 166 SERVICE_NAME = "myapimrndxyz" 167 VNET_NAME = "vnetname" 168 SUB_NET = "subnetname" 169 ENDPOINT_NAME = "endpointxyz" 170 CONFIGURATION_STORE_NAME = self.get_resource_name("configuration") 171 PRIVATE_ENDPOINT_CONNECTION_NAME = self.get_resource_name("privateendpoint") 172 173 # ConfigurationStores_Create[put] 174 BODY = { 175 "location": "westus", 176 "sku": { 177 "name": "Standard" # Free can not use private endpoint 178 }, 179 "tags": { 180 "my_tag": "myTagValue" 181 } 182 } 183 result = self.mgmt_client.configuration_stores.begin_create(resource_group.name, CONFIGURATION_STORE_NAME, BODY) 184 conf_store = result.result() 185 186 if self.is_live: 187 # create endpoint 188 endpoint = self.create_endpoint(resource_group.name, VNET_NAME, SUB_NET, ENDPOINT_NAME, conf_store.id) 189 190 # ConfigurationStores_Create_WithIdentity[put] 191 # BODY = { 192 # "location": "westus", 193 # "sku": { 194 # "name": "Free" 195 # }, 196 # "tags": { 197 # "my_tag": "myTagValue" 198 # }, 199 # "identity": { 200 # "type": "SystemAssigned, UserAssigned", 201 # "user_assigned_identities": {} 202 # } 203 # } 204 # result = self.mgmt_client.configuration_stores.begin_create(resource_group.name, CONFIGURATION_STORE_NAME, BODY) 205 # result = result.result() 206 207 # ConfigurationStores_Get[get] 208 conf_store = self.mgmt_client.configuration_stores.get(resource_group.name, CONFIGURATION_STORE_NAME) 209 210 PRIVATE_ENDPOINT_CONNECTION_NAME = conf_store.private_endpoint_connections[0].name 211 private_connection_id = conf_store.private_endpoint_connections[0].id 212 213 # PrivateEndpointConnection_CreateOrUpdate[put] 214 BODY = { 215 # "id": "https://management.azure.com/subscriptions/" + self.settings.SUBSCRIPTION_ID + "/resourceGroups/" + resource_group.name + "/providers/Microsoft.AppConfiguration/configurationStores/" + CONFIGURATION_STORE_NAME + "/privateEndpointConnections/" + PRIVATE_ENDPOINT_CONNECTION_NAME, 216 "id": private_connection_id, 217 "private_endpoint": { 218 "id": "/subscriptions/" + self.settings.SUBSCRIPTION_ID + "/resourceGroups/" + resource_group.name + "/providers/Microsoft.Network/privateEndpoints/" + ENDPOINT_NAME, 219 }, 220 "private_link_service_connection_state": { 221 "status": "Approved", 222 "description": "Auto-Approved" 223 } 224 } 225 result = self.mgmt_client.private_endpoint_connections.begin_create_or_update( 226 resource_group.name, 227 CONFIGURATION_STORE_NAME, 228 PRIVATE_ENDPOINT_CONNECTION_NAME, 229 BODY) 230 # id=BODY["id"], 231 # private_endpoint=BODY["private_endpoint"], 232 # private_link_service_connection_state=BODY["private_link_service_connection_state"]) 233 result = result.result() 234 235 # PrivateEndpointConnection_GetConnection[get] 236 result = self.mgmt_client.private_endpoint_connections.get(resource_group.name, CONFIGURATION_STORE_NAME, PRIVATE_ENDPOINT_CONNECTION_NAME) 237 238 # PrivateLinkResources_ListGroupIds[get] 239 privatelinks = list(self.mgmt_client.private_link_resources.list_by_configuration_store(resource_group.name, CONFIGURATION_STORE_NAME)) 240 PRIVATE_LINK_RESOURCE_NAME = privatelinks[0].name 241 242 # PrivateLinkResources_Get[get] 243 result = self.mgmt_client.private_link_resources.get(resource_group.name, CONFIGURATION_STORE_NAME, PRIVATE_LINK_RESOURCE_NAME) 244 245 # PrivateEndpointConnection_List[get] 246 result = list(self.mgmt_client.private_endpoint_connections.list_by_configuration_store(resource_group.name, CONFIGURATION_STORE_NAME)) 247 248 # List the operations available 249 result = self.mgmt_client.operations.list() 250 251 # ConfigurationStores_ListByResourceGroup[get] 252 result = self.mgmt_client.configuration_stores.list_by_resource_group(resource_group.name) 253 254 # ConfigurationStores_List[get] 255 result = self.mgmt_client.configuration_stores.list() 256 257 # ConfigurationStores_Update[patch] 258 BODY = { 259 "tags": { 260 "category": "Marketing" 261 }, 262 "sku": { 263 "name": "Standard" 264 } 265 } 266 result = self.mgmt_client.configuration_stores.begin_update(resource_group.name, CONFIGURATION_STORE_NAME, BODY) 267 result = result.result() 268 269 # ConfigurationStores_Update_WithIdentity[patch] 270 # BODY = { 271 # "tags": { 272 # "category": "Marketing" 273 # }, 274 # "sku": { 275 # "name": "Standard" 276 # }, 277 # "identity": { 278 # "type": "SystemAssigned, UserAssigned", 279 # "user_assigned_identities": {} 280 # } 281 # } 282 # result = self.mgmt_client.configuration_stores.begin_update(resource_group.name, CONFIGURATION_STORE_NAME, BODY) 283 # result = result.result() 284 285 # ConfigurationStores_CheckNameNotAvailable[post] 286 BODY = { 287 "name": "contoso", 288 "type": "Microsoft.AppConfiguration/configurationStores" 289 } 290 result = self.mgmt_client.operations.check_name_availability(BODY) 291 292 # ConfigurationStores_CheckNameAvailable[post] 293 BODY = { 294 "name": "contoso", 295 "type": "Microsoft.AppConfiguration/configurationStores" 296 } 297 result = self.mgmt_client.operations.check_name_availability(BODY) 298 299 # PrivateEndpointConnections_Delete[delete] 300 result = self.mgmt_client.private_endpoint_connections.begin_delete(resource_group.name, CONFIGURATION_STORE_NAME, PRIVATE_ENDPOINT_CONNECTION_NAME) 301 result = result.result() 302 303 # ConfigurationStores_Delete[delete] 304 result = self.mgmt_client.configuration_stores.begin_delete(resource_group.name, CONFIGURATION_STORE_NAME) 305 result = result.result() 306 307 308#------------------------------------------------------------------------------ 309if __name__ == '__main__': 310 unittest.main() 311