# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from azure.cli.core.azclierror import InvalidArgumentValueError
from azure.cli.core.commands.client_factory import get_subscription_id
from knack.util import CLIError


def validate_failover_policies(ns):
    """ Extracts multiple space-separated failoverPolicies in regionName=failoverPriority format

    Replaces ``ns.failover_parameters`` (an iterable of ``regionName=failoverPriority`` strings)
    with a ``FailoverPolicies`` model holding one ``FailoverPolicy`` per entry.

    Raises InvalidArgumentValueError when an entry has no '=' separator, an empty region name,
    or a priority that is not an integer.
    """
    from azure.mgmt.cosmosdb.models import FailoverPolicies, FailoverPolicy
    policies = []
    for item in ns.failover_parameters:
        # Split on the first '=' only, so anything after it is treated as the priority.
        region, separator, priority = item.partition('=')
        if not region or not separator:
            raise InvalidArgumentValueError(
                "Invalid failover policy '{}'. Expected format: regionName=failoverPriority.".format(item))
        try:
            priority_value = int(priority)
        except ValueError as err:
            raise InvalidArgumentValueError(
                "Invalid failover priority in '{}'. The priority must be an integer.".format(item)) from err
        policies.append(FailoverPolicy(location_name=region, failover_priority=priority_value))
    ns.failover_parameters = FailoverPolicies(failover_policies=policies)


def validate_ip_range_filter(ns):
    """ Extracts multiple comma-separated ip rules

    When ``ns.ip_range_filter`` is not None, every item is split on ',' and each non-empty
    piece becomes an ``IpAddressOrRange``; the resulting list replaces ``ns.ip_range_filter``.
    A None value is left untouched.
    """
    from azure.mgmt.cosmosdb.models import IpAddressOrRange
    if ns.ip_range_filter is None:
        return

    ip_rules = []
    for item in ns.ip_range_filter:
        # Empty pieces (e.g. from "a,,b" or a trailing comma) are skipped.
        ip_rules.extend(IpAddressOrRange(ip_address_or_range=entry) for entry in item.split(",") if entry)
    # Assign once, after all items have been processed.
    ns.ip_range_filter = ip_rules


def validate_private_endpoint_connection_id(ns):
    """ Resolves the private endpoint connection from either a resource id or individual names

    When ``ns.connection_id`` is set, it is parsed with ``parse_proxy_resource_id`` and the
    resource group, account name and connection name on ``ns`` are overwritten from the result.
    The ``connection_id`` attribute is removed from ``ns`` before returning.

    Raises CLIError when any of the account name, resource group name or connection name
    is missing after resolution.
    """
    if ns.connection_id:
        from azure.cli.core.util import parse_proxy_resource_id
        result = parse_proxy_resource_id(ns.connection_id)
        ns.resource_group_name = result['resource_group']
        ns.account_name = result['name']
        ns.private_endpoint_connection_name = result['child_name_1']

    if not all([ns.account_name, ns.resource_group_name, ns.private_endpoint_connection_name]):
        # Pass only the message: a leading None argument makes the error render as a tuple.
        raise CLIError('incorrect usage: [--id ID | --name NAME --account-name NAME --resource-group NAME]')

    del ns.connection_id


def validate_capabilities(ns):
    """ Extracts multiple space-separated capabilities

    When ``ns.capabilities`` is not None, each item is wrapped in a ``Capability`` model
    (passed as ``name``) and the resulting list replaces ``ns.capabilities``.
    """
    from azure.mgmt.cosmosdb.models import Capability
    if ns.capabilities is not None:
        ns.capabilities = [Capability(name=item) for item in ns.capabilities]


def validate_virtual_network_rules(ns):
    """ Extracts multiple space-separated virtual network rules

    When ``ns.virtual_network_rules`` is not None, each item is wrapped in a
    ``VirtualNetworkRule`` model (passed as ``id``) and the resulting list replaces
    ``ns.virtual_network_rules``.
    """
    from azure.mgmt.cosmosdb.models import VirtualNetworkRule
    if ns.virtual_network_rules is not None:
        ns.virtual_network_rules = [VirtualNetworkRule(id=item) for item in ns.virtual_network_rules]


def validate_role_definition_body(cmd, ns):
    """ Extracts role definition body

    ``ns.role_definition_body`` is read as a JSON file when it names an existing path and is
    parsed as inline JSON otherwise. The resulting dictionary is normalized in place:

    - 'Id' is reduced to its trailing identifier, or generated when absent.
    - 'Permissions' is rebuilt as a list of ``Permission`` models, from a top-level
      'DataActions' entry when present, otherwise from each entry of 'Permissions'.
    - 'Type' defaults to ``RoleDefinitionType.custom_role``.
    - every entry of 'AssignableScopes' is expanded to a fully qualified scope.

    The normalized dictionary replaces ``ns.role_definition_body``. A None body is left untouched.

    Raises InvalidArgumentValueError when the JSON is not a dictionary, when neither
    'DataActions' nor 'Permissions' is present, or when 'AssignableScopes' is missing.
    """
    import os

    from azure.cli.core.util import get_file_json, shell_safe_json_parse
    from azure.mgmt.cosmosdb.models import Permission, RoleDefinitionType

    if ns.role_definition_body is None:
        return

    if os.path.exists(ns.role_definition_body):
        role_definition = get_file_json(ns.role_definition_body)
    else:
        role_definition = shell_safe_json_parse(ns.role_definition_body)

    if not isinstance(role_definition, dict):
        raise InvalidArgumentValueError(
            'Invalid role definition. A valid dictionary JSON representation is expected.')

    if 'Id' in role_definition:
        role_definition['Id'] = _parse_resource_path(
            resource=role_definition['Id'],
            to_fully_qualified=False,
            resource_type="sqlRoleDefinitions")
    else:
        role_definition['Id'] = _gen_guid()

    if 'DataActions' in role_definition:
        role_definition['Permissions'] = [Permission(data_actions=role_definition['DataActions'])]
    elif 'Permissions' in role_definition:
        role_definition['Permissions'] = [Permission(data_actions=p['DataActions'])
                                          for p in role_definition['Permissions']]
    else:
        # Surface a usage error instead of a raw KeyError traceback.
        raise InvalidArgumentValueError(
            "Invalid role definition. Either 'DataActions' or 'Permissions' must be provided.")

    if 'Type' not in role_definition:
        role_definition['Type'] = RoleDefinitionType.custom_role

    if 'AssignableScopes' not in role_definition:
        # Surface a usage error instead of a raw KeyError traceback.
        raise InvalidArgumentValueError(
            "Invalid role definition. 'AssignableScopes' must be provided.")

    subscription_id = get_subscription_id(cmd.cli_ctx)
    role_definition['AssignableScopes'] = [_parse_resource_path(
        resource=scope,
        to_fully_qualified=True,
        resource_type=None,
        subscription_id=subscription_id,
        resource_group_name=ns.resource_group_name,
        account_name=ns.account_name) for scope in role_definition['AssignableScopes']]

    ns.role_definition_body = role_definition


def validate_role_definition_id(ns):
    """ Extracts Guid role definition Id

    When ``ns.role_definition_id`` is a fully qualified sqlRoleDefinitions resource id, it is
    replaced by its trailing identifier; any other value is kept as-is. None is left untouched.
    """
    if ns.role_definition_id is not None:
        ns.role_definition_id = _parse_resource_path(
            resource=ns.role_definition_id,
            to_fully_qualified=False,
            resource_type="sqlRoleDefinitions")


def validate_fully_qualified_role_definition_id(cmd, ns):
    """ Extracts fully qualified role definition Id

    When ``ns.role_definition_id`` is not already a fully qualified sqlRoleDefinitions resource
    id, it is expanded using the current subscription, ``ns.resource_group_name`` and
    ``ns.account_name``. None is left untouched.
    """
    if ns.role_definition_id is not None:
        ns.role_definition_id = _parse_resource_path(
            resource=ns.role_definition_id,
            to_fully_qualified=True,
            resource_type="sqlRoleDefinitions",
            subscription_id=get_subscription_id(cmd.cli_ctx),
            resource_group_name=ns.resource_group_name,
            account_name=ns.account_name)


def validate_role_assignment_id(ns):
    """ Extracts Guid role assignment Id

    When ``ns.role_assignment_id`` is a fully qualified sqlRoleAssignments resource id, it is
    replaced by its trailing identifier; any other non-None value is kept as-is. When it is
    None, a new identifier is generated.
    """
    if ns.role_assignment_id is not None:
        ns.role_assignment_id = _parse_resource_path(
            resource=ns.role_assignment_id,
            to_fully_qualified=False,
            resource_type="sqlRoleAssignments")
    else:
        ns.role_assignment_id = _gen_guid()


def validate_scope(cmd, ns):
    """ Extracts fully qualified scope

    When ``ns.scope`` does not already start with a database account resource id, it is
    appended to the id built from the current subscription, ``ns.resource_group_name`` and
    ``ns.account_name``. None is left untouched.
    """
    if ns.scope is not None:
        ns.scope = _parse_resource_path(
            resource=ns.scope,
            to_fully_qualified=True,
            resource_type=None,
            subscription_id=get_subscription_id(cmd.cli_ctx),
            resource_group_name=ns.resource_group_name,
            account_name=ns.account_name)


def _parse_resource_path(resource,
                         to_fully_qualified,
                         resource_type=None,
                         subscription_id=None,
                         resource_group_name=None,
                         account_name=None):
    """Returns a properly formatted role definition or assignment id or scope. If scope, type=None.

    :param resource: identifier, scope, or fully qualified resource id to normalize.
    :param to_fully_qualified: when True, return a fully qualified id (``resource`` itself if it
        already matches the expected pattern, otherwise ``resource`` appended to the formatted
        prefix); when False, return only the trailing identifier of a matching fully qualified
        id, or ``resource`` unchanged when it does not match.
    :param resource_type: child resource type segment (e.g. "sqlRoleDefinitions"), or None
        when ``resource`` is a scope.
    :param subscription_id: subscription used to build the fully qualified prefix.
    :param resource_group_name: resource group used to build the fully qualified prefix.
    :param account_name: database account used to build the fully qualified prefix.
    """
    import re
    # Raw strings with an escaped '.' so the provider namespace is matched literally.
    regex = r"/subscriptions/(?P<subscription>.*)/resourceGroups/(?P<resource_group>.*)/providers/" \
            r"Microsoft\.DocumentDB/databaseAccounts/(?P<database_account>.*)"
    formatted = "/subscriptions/{0}/resourceGroups/{1}/providers/Microsoft.DocumentDB/databaseAccounts/{2}"

    if resource_type is not None:
        regex += "/" + re.escape(resource_type) + "/(?P<resource_id>.*)"
        formatted += "/" + resource_type + "/"

    # The caller-supplied resource is always appended last (directly after the account for scopes).
    formatted += "{3}"

    result = re.match(regex, resource)

    if to_fully_qualified:
        if result is not None:
            return resource
        return formatted.format(subscription_id, resource_group_name, account_name, resource)

    if result is None:
        return resource

    # NOTE(review): the 'resource_id' group only exists when resource_type is given; every
    # visible caller that passes to_fully_qualified=False also passes a resource_type.
    return result['resource_id']


def _gen_guid():
    """Returns a new random identifier as a ``uuid.UUID`` (version 4)."""
    import uuid
    return uuid.uuid4()