1# -------------------------------------------------------------------------------------------- 2# Copyright (c) Microsoft Corporation. All rights reserved. 3# Licensed under the MIT License. See License.txt in the project root for license information. 4# -------------------------------------------------------------------------------------------- 5 6import os 7import re 8import argparse 9 10from knack.util import CLIError 11try: 12 from urllib.parse import urlparse, urlsplit 13except ImportError: 14 from urlparse import urlparse, urlsplit # pylint: disable=import-error 15 16MSI_LOCAL_ID = '[system]' 17 18 19def process_ts_create_or_update_namespace(namespace): 20 from azure.cli.core.commands.validators import validate_tags 21 validate_tags(namespace) 22 if namespace.template_file and not os.path.isfile(namespace.template_file): 23 raise CLIError('Please enter a valid file path') 24 25 26def _validate_template_spec(namespace): 27 if namespace.template_spec is None: 28 if (namespace.name is None or namespace.resource_group_name is None): 29 raise CLIError('incorrect usage: Please enter ' 30 'a resource group and resource name or a resource ID for --template-spec') 31 else: 32 from azure.mgmt.core.tools import is_valid_resource_id 33 namespace.template_spec = namespace.template_spec.strip("\"") 34 if not is_valid_resource_id(namespace.template_spec): 35 raise CLIError('--template-spec is not a valid resource ID.') 36 37 38def _validate_template_spec_out(namespace): 39 _validate_template_spec(namespace) 40 if namespace.output_folder and not os.path.isdir(namespace.output_folder): 41 raise CLIError('Please enter a valid output folder') 42 43 44def _validate_deployment_name_with_template_specs(namespace): 45 # If missing,try come out with a name associated with the template name 46 if namespace.deployment_name is None: 47 template_filename = None 48 if namespace.template_file and os.path.isfile(namespace.template_file): 49 template_filename = namespace.template_file 50 if namespace.template_uri and urlparse(namespace.template_uri).scheme: 51 template_filename = urlsplit(namespace.template_uri).path 52 if namespace.template_spec: 53 from azure.mgmt.core.tools import parse_resource_id, is_valid_resource_id 54 namespace.template_spec = namespace.template_spec.strip("\"") 55 if not is_valid_resource_id(namespace.template_spec): 56 raise CLIError('--template-spec is not a valid resource ID.') 57 if namespace.template_spec.__contains__("versions") is False: 58 raise CLIError('Please enter a valid template spec version ID.') 59 template_filename = parse_resource_id(namespace.template_spec).get('resource_name') 60 if template_filename: 61 template_filename = os.path.basename(template_filename) 62 namespace.deployment_name = os.path.splitext(template_filename)[0] 63 else: 64 namespace.deployment_name = 'deployment1' 65 66 67def _validate_deployment_name(namespace): 68 # If missing,try come out with a name associated with the template name 69 if namespace.deployment_name is None: 70 template_filename = None 71 if namespace.template_file and os.path.isfile(namespace.template_file): 72 template_filename = namespace.template_file 73 if namespace.template_uri and urlparse(namespace.template_uri).scheme: 74 template_filename = urlsplit(namespace.template_uri).path 75 if template_filename: 76 template_filename = os.path.basename(template_filename) 77 namespace.deployment_name = os.path.splitext(template_filename)[0] 78 else: 79 namespace.deployment_name = 'deployment1' 80 81 82def process_deployment_create_namespace(namespace): 83 try: 84 if [bool(namespace.template_uri), bool(namespace.template_file), 85 bool(namespace.template_spec)].count(True) != 1: 86 raise CLIError('incorrect usage: Chose only one of' 87 ' --template-file FILE | --template-uri URI | --template-spec ID to pass in') 88 except Exception: # pylint: disable=broad-except 89 if [bool(namespace.template_uri), bool(namespace.template_file)].count(True) != 1: 90 raise CLIError('incorrect usage: Chose only one of' 91 ' --template-file FILE | --template-uri URI') 92 if(bool(namespace.template_uri) or bool(namespace.template_file)): 93 _validate_deployment_name(namespace) 94 else: 95 _validate_deployment_name_with_template_specs(namespace) 96 97 98def internal_validate_lock_parameters(namespace, resource_group, resource_provider_namespace, 99 parent_resource_path, resource_type, resource_name): 100 if resource_group is None: 101 if resource_name is not None: 102 from msrestazure.tools import parse_resource_id, is_valid_resource_id 103 if not is_valid_resource_id(resource_name): 104 raise CLIError('--resource is not a valid resource ID. ' 105 '--resource as a resource name is ignored if --resource-group is not given.') 106 # resource-name is an ID, populate namespace 107 id_dict = parse_resource_id(resource_name) 108 for id_part in ['resource_name', 'resource_type', 'resource_group']: 109 setattr(namespace, id_part, id_dict.get(id_part)) 110 setattr(namespace, 'resource_provider_namespace', id_dict.get('resource_namespace')) 111 setattr(namespace, 'parent_resource_path', id_dict.get('resource_parent').strip('/')) 112 113 if resource_type is not None: 114 raise CLIError('--resource-type is ignored if --resource-group is not given.') 115 if resource_provider_namespace is not None: 116 raise CLIError('--namespace is ignored if --resource-group is not given.') 117 if parent_resource_path is not None: 118 raise CLIError('--parent is ignored if --resource-group is not given.') 119 return 120 121 if resource_name is None: 122 if resource_type is not None: 123 raise CLIError('--resource-type is ignored if --resource is not given.') 124 if resource_provider_namespace is not None: 125 raise CLIError('--namespace is ignored if --resource is not given.') 126 if parent_resource_path is not None: 127 raise CLIError('--parent is ignored if --resource is not given.') 128 return 129 130 if not resource_type: 131 raise CLIError('--resource-type is required if the name, --resource, is present') 132 133 parts = resource_type.split('/') 134 if resource_provider_namespace is None: 135 if len(parts) == 1: 136 raise CLIError('A resource namespace is required if the name, --resource, is present.' 137 'Expected <namespace>/<type> or --namespace=<namespace>') 138 elif len(parts) != 1: 139 raise CLIError('Resource namespace specified in both --resource-type and --namespace') 140 141 142def validate_lock_parameters(namespace): 143 internal_validate_lock_parameters(namespace, 144 getattr(namespace, 'resource_group', None), 145 getattr(namespace, 'resource_provider_namespace', None), 146 getattr(namespace, 'parent_resource_path', None), 147 getattr(namespace, 'resource_type', None), 148 getattr(namespace, 'resource_name', None)) 149 150 151def _parse_lock_id(id_arg): 152 """ 153 Lock ids look very different from regular resource ids, this function uses a regular expression 154 that parses a lock's id and extracts the following parameters if available: 155 -lock_name: the lock's name; always present in a lock id 156 -resource_group: the name of the resource group; present in group/resource level locks 157 -resource_provider_namespace: the resource provider; present in resource level locks 158 -resource_type: the resource type; present in resource level locks 159 -resource_name: the resource name; present in resource level locks 160 -parent_resource_path: the resource's parent path; present in child resources such as subnets 161 """ 162 regex = re.compile( 163 '/subscriptions/[^/]*(/resource[gG]roups/(?P<resource_group>[^/]*)' 164 '(/providers/(?P<resource_provider_namespace>[^/]*)' 165 '(/(?P<parent_resource_path>.*))?/(?P<resource_type>[^/]*)/(?P<resource_name>[^/]*))?)?' 166 '/providers/Microsoft.Authorization/locks/(?P<lock_name>[^/]*)') 167 168 return regex.match(id_arg).groupdict() 169 170 171def validate_subscription_lock(namespace): 172 if getattr(namespace, 'ids', None): 173 for lock_id in getattr(namespace, 'ids'): 174 if _parse_lock_id(lock_id).get('resource_group'): 175 raise CLIError('{} is not a valid subscription-level lock id.'.format(lock_id)) 176 177 178def validate_group_lock(namespace): 179 if getattr(namespace, 'ids', None): 180 for lock_id in getattr(namespace, 'ids'): 181 lock_id_dict = _parse_lock_id(lock_id) 182 if not lock_id_dict.get('resource_group') or lock_id_dict.get('resource_name'): 183 raise CLIError('{} is not a valid group-level lock id.'.format(lock_id)) 184 else: 185 if not getattr(namespace, 'resource_group', None): 186 raise CLIError('Missing required --resource-group/-g parameter.') 187 188 189def validate_resource_lock(namespace): 190 if getattr(namespace, 'ids', None): 191 for lock_id in getattr(namespace, 'ids'): 192 lock_id_dict = _parse_lock_id(lock_id) 193 if not all((lock_id_dict.get(param)) for param in ['resource_group', 194 'resource_provider_namespace', 195 'resource_type', 196 'resource_name']): 197 raise CLIError('{} is not a valid resource-level lock id.'.format(lock_id)) 198 else: 199 if not getattr(namespace, 'resource_name', None): 200 raise CLIError('Missing required {} parameter.'.format('resource_name')) 201 kwargs = {} 202 for param in ['resource_group', 'resource_name', 'resource_provider_namespace', 'parent_resource_path', 203 'resource_type']: 204 kwargs[param] = getattr(namespace, param, None) 205 internal_validate_lock_parameters(namespace, **kwargs) 206 207 208def validate_metadata(namespace): 209 if namespace.metadata: 210 namespace.metadata = dict(x.split('=', 1) for x in namespace.metadata) 211 212 213def validate_msi(namespace): 214 if namespace.assign_identity is not None: 215 identities = namespace.assign_identity or [] 216 if any(identity != MSI_LOCAL_ID for identity in identities): 217 raise CLIError("usage error: 'User assigned identities are not supported " 218 "with --assign-identity and policy assignments'") 219 220 if not namespace.identity_scope and getattr(namespace.identity_role, 'is_default', None) is None: 221 raise CLIError("usage error: '--role {}' is not applicable as the '--identity-scope' is not provided" 222 .format(namespace.identity_role)) 223 224 if namespace.identity_scope: 225 if identities and MSI_LOCAL_ID not in identities: 226 raise CLIError( 227 "usage error: '--identity-scope'/'--role' is only applicable when assigning a system identity") 228 229 elif namespace.identity_scope or getattr(namespace.identity_role, 'is_default', None) is None: 230 raise CLIError( 231 'usage error: --assign-identity [--identity-scope SCOPE] [--role ROLE]') 232 233 234# pylint: disable=too-few-public-methods 235class RollbackAction(argparse.Action): 236 def __call__(self, parser, namespace, values, option_string=None): 237 setattr(namespace, 'rollback_on_error', '' if not values else values) 238