1# -*- coding: utf-8 -*-
2# Copyright (c) 2021 Ansible Project
3# Simplified BSD License (see licenses/simplified_bsd.txt or https://opensource.org/licenses/BSD-2-Clause)
4
5from __future__ import absolute_import, division, print_function
6__metaclass__ = type
7
8from copy import deepcopy
9
10from ansible.module_utils.common.parameters import (
11    _ADDITIONAL_CHECKS,
12    _get_legal_inputs,
13    _get_unsupported_parameters,
14    _handle_aliases,
15    _list_no_log_values,
16    _set_defaults,
17    _validate_argument_types,
18    _validate_argument_values,
19    _validate_sub_spec,
20    set_fallbacks,
21)
22
23from ansible.module_utils.common.text.converters import to_native
24from ansible.module_utils.common.warnings import deprecate, warn
25
26from ansible.module_utils.common.validation import (
27    check_mutually_exclusive,
28    check_required_arguments,
29    check_required_by,
30    check_required_if,
31    check_required_one_of,
32    check_required_together,
33)
34
35from ansible.module_utils.errors import (
36    AliasError,
37    AnsibleValidationErrorMultiple,
38    MutuallyExclusiveError,
39    NoLogError,
40    RequiredByError,
41    RequiredDefaultError,
42    RequiredError,
43    RequiredIfError,
44    RequiredOneOfError,
45    RequiredTogetherError,
46    UnsupportedError,
47)
48
49
class ValidationResult:
    """Outcome of validating parameters against an argument spec.

    Instances are produced by :func:`ArgumentSpecValidator.validate()
    <ansible.module_utils.common.arg_spec.ArgumentSpecValidator.validate()>`
    and carry the validated parameters together with any errors that were
    collected during validation.
    """

    def __init__(self, parameters):
        """
        :arg parameters: Terms to be validated and coerced to the correct type.
            A deep copy of this object is stored on the result.
        :type parameters: dict
        """
        #: :class:`~ansible.module_utils.errors.AnsibleValidationErrorMultiple` containing all
        #: :class:`~ansible.module_utils.errors.AnsibleValidationError` objects if there were
        #: any failures during validation.
        self.errors = AnsibleValidationErrorMultiple()

        # Working copy of the caller's parameters; exposed read-only through
        # the ``validated_parameters`` property.
        self._validated_parameters = deepcopy(parameters)

        # :class:`set` of values marked as ``no_log`` in the argument spec. This
        # is a temporary holding place for these values and may move in the future.
        self._no_log_values = set()

        # Names of parameters that are not supported; exposed through the
        # ``unsupported_parameters`` property.
        self._unsupported_parameters = set()

        # Lists that start out empty and are filled in by the validator.
        self._warnings = []
        self._deprecations = []

    @property
    def validated_parameters(self):
        """Parameters after validation and type coercion."""
        return self._validated_parameters

    @property
    def unsupported_parameters(self):
        """:class:`set` holding the names of unsupported parameters."""
        return self._unsupported_parameters

    @property
    def error_messages(self):
        """:class:`list` with the message of every exception held in :attr:`errors`."""
        return self.errors.messages
93
94
class ArgumentSpecValidator:
    """Argument spec validation class

    Creates a validator based on the ``argument_spec`` that can be used to
    validate a number of parameters using the :meth:`validate` method.
    """

    def __init__(self, argument_spec,
                 mutually_exclusive=None,
                 required_together=None,
                 required_one_of=None,
                 required_if=None,
                 required_by=None,
                 ):

        """
        :arg argument_spec: Specification of valid parameters and their type. May
            include nested argument specs.
        :type argument_spec: dict[str, dict]

        :kwarg mutually_exclusive: List or list of lists of terms that should not
            be provided together.
        :type mutually_exclusive: list[str] or list[list[str]]

        :kwarg required_together: List of lists of terms that are required together.
        :type required_together: list[list[str]]

        :kwarg required_one_of: List of lists of terms, one of which in each list
            is required.
        :type required_one_of: list[list[str]]

        :kwarg required_if: List of lists of ``[parameter, value, [parameters]]`` where
            one of ``[parameters]`` is required if ``parameter == value``.
        :type required_if: list

        :kwarg required_by: Dictionary of parameter names that contain a list of
            parameters required by each key in the dictionary.
        :type required_by: dict[str, list[str]]
        """

        self._mutually_exclusive = mutually_exclusive
        self._required_together = required_together
        self._required_one_of = required_one_of
        self._required_if = required_if
        self._required_by = required_by
        self._valid_parameter_names = set()
        self.argument_spec = argument_spec

        # Build the human readable names used in the "unsupported parameters"
        # error: either "name" or "name (alias1, alias2)". The container is a
        # set, so no ordering is kept here; names are sorted when the message
        # is rendered.
        for key in self.argument_spec:
            aliases = self.argument_spec[key].get('aliases')
            if aliases:
                self._valid_parameter_names.add("{key} ({aliases})".format(key=key, aliases=", ".join(sorted(aliases))))
            else:
                self._valid_parameter_names.add(key)

    def _format_unsupported_message(self, unsupported_parameters):
        """Build the text of the error reported for unsupported parameters.

        :arg unsupported_parameters: Unsupported parameter names. An item that is
            a :class:`tuple` is rendered by joining its elements with ``.``.
        :type unsupported_parameters: set

        :return: Message listing the unsupported names followed by the supported
            ones, both in sorted order so that the text is deterministic.
        :rtype: str
        """
        flattened_names = [
            ".".join(item) if isinstance(item, tuple) else item
            for item in unsupported_parameters
        ]

        unsupported_string = ", ".join(sorted(flattened_names))
        # _valid_parameter_names is a set; joining it directly would produce a
        # different ordering from one interpreter run to the next.
        supported_string = ", ".join(sorted(self._valid_parameter_names))
        return "{0}. Supported parameters include: {1}.".format(unsupported_string, supported_string)

    def validate(self, parameters, *args, **kwargs):
        """Validate ``parameters`` against argument spec.

        Error messages in the :class:`ValidationResult` may contain no_log values and should be
        sanitized with :func:`~ansible.module_utils.common.parameters.sanitize_keys` before logging or displaying.

        Failures raised by the individual checks are caught and appended to the
        ``errors`` attribute of the returned result. Additional positional and
        keyword arguments are accepted but are not used by this method.

        :arg parameters: Parameters to validate against the argument spec
        :type parameters: dict

        :return: :class:`ValidationResult` containing validated parameters.
        :rtype: ValidationResult

        :Simple Example:

            .. code-block:: text

                argument_spec = {
                    'name': {'type': 'str'},
                    'age': {'type': 'int'},
                }

                parameters = {
                    'name': 'bo',
                    'age': '42',
                }

                validator = ArgumentSpecValidator(argument_spec)
                result = validator.validate(parameters)

                if result.error_messages:
                    sys.exit("Validation failed: {0}".format(", ".join(result.error_messages)))

                valid_params = result.validated_parameters
        """

        # The result holds its own deep copy of ``parameters``; every step
        # below works on that copy.
        result = ValidationResult(parameters)

        result._no_log_values.update(set_fallbacks(self.argument_spec, result._validated_parameters))

        # These lists are handed to _handle_aliases and read back afterwards.
        alias_warnings = []
        alias_deprecations = []
        try:
            aliases = _handle_aliases(self.argument_spec, result._validated_parameters, alias_warnings, alias_deprecations)
        except (TypeError, ValueError) as e:
            # Continue without alias information so the remaining checks still run.
            aliases = {}
            result.errors.append(AliasError(to_native(e)))

        legal_inputs = _get_legal_inputs(self.argument_spec, result._validated_parameters, aliases)

        for option, alias in alias_warnings:
            result._warnings.append({'option': option, 'alias': alias})

        for deprecation in alias_deprecations:
            result._deprecations.append({
                'name': deprecation['name'],
                'version': deprecation.get('version'),
                'date': deprecation.get('date'),
                'collection_name': deprecation.get('collection_name'),
            })

        try:
            result._no_log_values.update(_list_no_log_values(self.argument_spec, result._validated_parameters))
        except TypeError as te:
            result.errors.append(NoLogError(to_native(te)))

        try:
            result._unsupported_parameters.update(_get_unsupported_parameters(self.argument_spec, result._validated_parameters, legal_inputs))
        except TypeError as te:
            result.errors.append(RequiredDefaultError(to_native(te)))
        except ValueError as ve:
            result.errors.append(AliasError(to_native(ve)))

        try:
            check_mutually_exclusive(self._mutually_exclusive, result._validated_parameters)
        except TypeError as te:
            result.errors.append(MutuallyExclusiveError(to_native(te)))

        # NOTE(review): _set_defaults is called twice, first with ``False`` as the
        # third argument and again (without it) after the type/value checks;
        # presumably the first call does not yet apply defaults - confirm
        # against _set_defaults.
        result._no_log_values.update(_set_defaults(self.argument_spec, result._validated_parameters, False))

        try:
            check_required_arguments(self.argument_spec, result._validated_parameters)
        except TypeError as e:
            result.errors.append(RequiredError(to_native(e)))

        # These helpers receive the error container and are not wrapped in try/except here.
        _validate_argument_types(self.argument_spec, result._validated_parameters, errors=result.errors)
        _validate_argument_values(self.argument_spec, result._validated_parameters, errors=result.errors)

        # Each entry names a check function ('func'), the constructor argument it
        # applies to ('attr', stored on this instance as ``_<attr>``) and the
        # error class used to report a failure ('err').
        for check in _ADDITIONAL_CHECKS:
            try:
                check['func'](getattr(self, "_{attr}".format(attr=check['attr'])), result._validated_parameters)
            except TypeError as te:
                result.errors.append(check['err'](to_native(te)))

        result._no_log_values.update(_set_defaults(self.argument_spec, result._validated_parameters))

        _validate_sub_spec(self.argument_spec, result._validated_parameters,
                           errors=result.errors,
                           no_log_values=result._no_log_values,
                           unsupported_parameters=result._unsupported_parameters)

        # Reported last because _validate_sub_spec is also given the
        # unsupported parameter set.
        if result._unsupported_parameters:
            result.errors.append(
                UnsupportedError(self._format_unsupported_message(result._unsupported_parameters)))

        return result
263
264
class ModuleArgumentSpecValidator(ArgumentSpecValidator):
    """Argument spec validator that backs :class:`AnsibleModule`.

    Not intended for use outside of :class:`AnsibleModule`; other callers
    should use :class:`ArgumentSpecValidator` instead.
    """

    def __init__(self, *args, **kwargs):
        """Pass every argument straight through to :class:`ArgumentSpecValidator`."""
        super(ModuleArgumentSpecValidator, self).__init__(*args, **kwargs)

    def validate(self, parameters):
        """Validate ``parameters`` and emit alias deprecations and warnings.

        Runs the inherited validation, then calls ``deprecate()`` once for each
        entry in the result's ``_deprecations`` list and ``warn()`` once for
        each entry in its ``_warnings`` list.

        :arg parameters: Parameters to validate against the argument spec
        :type parameters: dict

        :return: The result object returned by the inherited ``validate()``.
        """
        outcome = super(ModuleArgumentSpecValidator, self).validate(parameters)

        deprecation_template = "Alias '{name}' is deprecated. See the module docs for more information"
        for entry in outcome._deprecations:
            message = deprecation_template.format(name=entry['name'])
            deprecate(
                message,
                version=entry.get('version'),
                date=entry.get('date'),
                collection_name=entry.get('collection_name'),
            )

        warning_template = 'Both option {option} and its alias {alias} are set.'
        for entry in outcome._warnings:
            warn(warning_template.format(option=entry['option'], alias=entry['alias']))

        return outcome
287