1# -*- coding: utf-8 -*- 2# Copyright (c) 2021 Ansible Project 3# Simplified BSD License (see licenses/simplified_bsd.txt or https://opensource.org/licenses/BSD-2-Clause) 4 5from __future__ import absolute_import, division, print_function 6__metaclass__ = type 7 8from copy import deepcopy 9 10from ansible.module_utils.common.parameters import ( 11 _ADDITIONAL_CHECKS, 12 _get_legal_inputs, 13 _get_unsupported_parameters, 14 _handle_aliases, 15 _list_no_log_values, 16 _set_defaults, 17 _validate_argument_types, 18 _validate_argument_values, 19 _validate_sub_spec, 20 set_fallbacks, 21) 22 23from ansible.module_utils.common.text.converters import to_native 24from ansible.module_utils.common.warnings import deprecate, warn 25 26from ansible.module_utils.common.validation import ( 27 check_mutually_exclusive, 28 check_required_arguments, 29 check_required_by, 30 check_required_if, 31 check_required_one_of, 32 check_required_together, 33) 34 35from ansible.module_utils.errors import ( 36 AliasError, 37 AnsibleValidationErrorMultiple, 38 MutuallyExclusiveError, 39 NoLogError, 40 RequiredByError, 41 RequiredDefaultError, 42 RequiredError, 43 RequiredIfError, 44 RequiredOneOfError, 45 RequiredTogetherError, 46 UnsupportedError, 47) 48 49 50class ValidationResult: 51 """Result of argument spec validation. 52 53 This is the object returned by :func:`ArgumentSpecValidator.validate() 54 <ansible.module_utils.common.arg_spec.ArgumentSpecValidator.validate()>` 55 containing the validated parameters and any errors. 56 """ 57 58 def __init__(self, parameters): 59 """ 60 :arg parameters: Terms to be validated and coerced to the correct type. 61 :type parameters: dict 62 """ 63 self._no_log_values = set() 64 """:class:`set` of values marked as ``no_log`` in the argument spec. This 65 is a temporary holding place for these values and may move in the future. 66 """ 67 68 self._unsupported_parameters = set() 69 self._validated_parameters = deepcopy(parameters) 70 self._deprecations = [] 71 self._warnings = [] 72 self.errors = AnsibleValidationErrorMultiple() 73 """ 74 :class:`~ansible.module_utils.errors.AnsibleValidationErrorMultiple` containing all 75 :class:`~ansible.module_utils.errors.AnsibleValidationError` objects if there were 76 any failures during validation. 77 """ 78 79 @property 80 def validated_parameters(self): 81 """Validated and coerced parameters.""" 82 return self._validated_parameters 83 84 @property 85 def unsupported_parameters(self): 86 """:class:`set` of unsupported parameter names.""" 87 return self._unsupported_parameters 88 89 @property 90 def error_messages(self): 91 """:class:`list` of all error messages from each exception in :attr:`errors`.""" 92 return self.errors.messages 93 94 95class ArgumentSpecValidator: 96 """Argument spec validation class 97 98 Creates a validator based on the ``argument_spec`` that can be used to 99 validate a number of parameters using the :meth:`validate` method. 100 """ 101 102 def __init__(self, argument_spec, 103 mutually_exclusive=None, 104 required_together=None, 105 required_one_of=None, 106 required_if=None, 107 required_by=None, 108 ): 109 110 """ 111 :arg argument_spec: Specification of valid parameters and their type. May 112 include nested argument specs. 113 :type argument_spec: dict[str, dict] 114 115 :kwarg mutually_exclusive: List or list of lists of terms that should not 116 be provided together. 117 :type mutually_exclusive: list[str] or list[list[str]] 118 119 :kwarg required_together: List of lists of terms that are required together. 120 :type required_together: list[list[str]] 121 122 :kwarg required_one_of: List of lists of terms, one of which in each list 123 is required. 124 :type required_one_of: list[list[str]] 125 126 :kwarg required_if: List of lists of ``[parameter, value, [parameters]]`` where 127 one of ``[parameters]`` is required if ``parameter == value``. 128 :type required_if: list 129 130 :kwarg required_by: Dictionary of parameter names that contain a list of 131 parameters required by each key in the dictionary. 132 :type required_by: dict[str, list[str]] 133 """ 134 135 self._mutually_exclusive = mutually_exclusive 136 self._required_together = required_together 137 self._required_one_of = required_one_of 138 self._required_if = required_if 139 self._required_by = required_by 140 self._valid_parameter_names = set() 141 self.argument_spec = argument_spec 142 143 for key in sorted(self.argument_spec.keys()): 144 aliases = self.argument_spec[key].get('aliases') 145 if aliases: 146 self._valid_parameter_names.update(["{key} ({aliases})".format(key=key, aliases=", ".join(sorted(aliases)))]) 147 else: 148 self._valid_parameter_names.update([key]) 149 150 def validate(self, parameters, *args, **kwargs): 151 """Validate ``parameters`` against argument spec. 152 153 Error messages in the :class:`ValidationResult` may contain no_log values and should be 154 sanitized with :func:`~ansible.module_utils.common.parameters.sanitize_keys` before logging or displaying. 155 156 :arg parameters: Parameters to validate against the argument spec 157 :type parameters: dict[str, dict] 158 159 :return: :class:`ValidationResult` containing validated parameters. 160 161 :Simple Example: 162 163 .. code-block:: text 164 165 argument_spec = { 166 'name': {'type': 'str'}, 167 'age': {'type': 'int'}, 168 } 169 170 parameters = { 171 'name': 'bo', 172 'age': '42', 173 } 174 175 validator = ArgumentSpecValidator(argument_spec) 176 result = validator.validate(parameters) 177 178 if result.error_messages: 179 sys.exit("Validation failed: {0}".format(", ".join(result.error_messages)) 180 181 valid_params = result.validated_parameters 182 """ 183 184 result = ValidationResult(parameters) 185 186 result._no_log_values.update(set_fallbacks(self.argument_spec, result._validated_parameters)) 187 188 alias_warnings = [] 189 alias_deprecations = [] 190 try: 191 aliases = _handle_aliases(self.argument_spec, result._validated_parameters, alias_warnings, alias_deprecations) 192 except (TypeError, ValueError) as e: 193 aliases = {} 194 result.errors.append(AliasError(to_native(e))) 195 196 legal_inputs = _get_legal_inputs(self.argument_spec, result._validated_parameters, aliases) 197 198 for option, alias in alias_warnings: 199 result._warnings.append({'option': option, 'alias': alias}) 200 201 for deprecation in alias_deprecations: 202 result._deprecations.append({ 203 'name': deprecation['name'], 204 'version': deprecation.get('version'), 205 'date': deprecation.get('date'), 206 'collection_name': deprecation.get('collection_name'), 207 }) 208 209 try: 210 result._no_log_values.update(_list_no_log_values(self.argument_spec, result._validated_parameters)) 211 except TypeError as te: 212 result.errors.append(NoLogError(to_native(te))) 213 214 try: 215 result._unsupported_parameters.update(_get_unsupported_parameters(self.argument_spec, result._validated_parameters, legal_inputs)) 216 except TypeError as te: 217 result.errors.append(RequiredDefaultError(to_native(te))) 218 except ValueError as ve: 219 result.errors.append(AliasError(to_native(ve))) 220 221 try: 222 check_mutually_exclusive(self._mutually_exclusive, result._validated_parameters) 223 except TypeError as te: 224 result.errors.append(MutuallyExclusiveError(to_native(te))) 225 226 result._no_log_values.update(_set_defaults(self.argument_spec, result._validated_parameters, False)) 227 228 try: 229 check_required_arguments(self.argument_spec, result._validated_parameters) 230 except TypeError as e: 231 result.errors.append(RequiredError(to_native(e))) 232 233 _validate_argument_types(self.argument_spec, result._validated_parameters, errors=result.errors) 234 _validate_argument_values(self.argument_spec, result._validated_parameters, errors=result.errors) 235 236 for check in _ADDITIONAL_CHECKS: 237 try: 238 check['func'](getattr(self, "_{attr}".format(attr=check['attr'])), result._validated_parameters) 239 except TypeError as te: 240 result.errors.append(check['err'](to_native(te))) 241 242 result._no_log_values.update(_set_defaults(self.argument_spec, result._validated_parameters)) 243 244 _validate_sub_spec(self.argument_spec, result._validated_parameters, 245 errors=result.errors, 246 no_log_values=result._no_log_values, 247 unsupported_parameters=result._unsupported_parameters) 248 249 if result._unsupported_parameters: 250 flattened_names = [] 251 for item in result._unsupported_parameters: 252 if isinstance(item, tuple): 253 flattened_names.append(".".join(item)) 254 else: 255 flattened_names.append(item) 256 257 unsupported_string = ", ".join(sorted(list(flattened_names))) 258 supported_string = ", ".join(self._valid_parameter_names) 259 result.errors.append( 260 UnsupportedError("{0}. Supported parameters include: {1}.".format(unsupported_string, supported_string))) 261 262 return result 263 264 265class ModuleArgumentSpecValidator(ArgumentSpecValidator): 266 """Argument spec validation class used by :class:`AnsibleModule`. 267 268 This is not meant to be used outside of :class:`AnsibleModule`. Use 269 :class:`ArgumentSpecValidator` instead. 270 """ 271 272 def __init__(self, *args, **kwargs): 273 super(ModuleArgumentSpecValidator, self).__init__(*args, **kwargs) 274 275 def validate(self, parameters): 276 result = super(ModuleArgumentSpecValidator, self).validate(parameters) 277 278 for d in result._deprecations: 279 deprecate("Alias '{name}' is deprecated. See the module docs for more information".format(name=d['name']), 280 version=d.get('version'), date=d.get('date'), 281 collection_name=d.get('collection_name')) 282 283 for w in result._warnings: 284 warn('Both option {option} and its alias {alias} are set.'.format(option=w['option'], alias=w['alias'])) 285 286 return result 287