1# -*- coding: utf-8 -*-
2# Copyright 2020 Red Hat
3# GNU General Public License v3.0+
4# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
5
6from __future__ import absolute_import, division, print_function
7
8__metaclass__ = type
9
10DOCUMENTATION = """
11    author: Ganesh Nalawade (@ganeshrn)
12    name: jsonschema
13    short_description: Define configurable options for jsonschema validate plugin
14    description:
15    - This sub plugin documentation provides the configurable options that can be passed
16      to the validate plugins when C(ansible.utils.jsonschema) is used as a value for
17      engine option.
18    version_added: 1.0.0
19    options:
20      draft:
21        description:
        - This option provides the jsonschema specification that should be used
          for validating the data. The I(criteria) option in the validate
          plugin should follow the specification as mentioned by this option.
25        default: draft7
26        choices:
27        - draft3
28        - draft4
29        - draft6
30        - draft7
31        env:
32        - name: ANSIBLE_VALIDATE_JSONSCHEMA_DRAFT
33        vars:
34        - name: ansible_validate_jsonschema_draft
35    notes:
36    - The value of I(data) option should be either a valid B(JSON) object or a B(JSON) string.
37    - The value of I(criteria) should be B(list) of B(dict) or B(list) of B(strings) and each
38      B(string) within the B(list) entry should be a valid B(dict) when read in python.
39"""
40
41import json
42
43from ansible.module_utils._text import to_text
44from ansible.module_utils.basic import missing_required_lib
45from ansible.errors import AnsibleError
46from ansible.module_utils.six import string_types
47
48from ansible_collections.ansible.utils.plugins.plugin_utils.base.validate import (
49    ValidateBase,
50)
51
52from ansible_collections.ansible.utils.plugins.module_utils.common.utils import (
53    to_list,
54)
55
# PY2 compatibility for JSONDecodeError
57try:
58    from json.decoder import JSONDecodeError
59except ImportError:
60    JSONDecodeError = ValueError
61
62try:
63    import jsonschema
64
65    HAS_JSONSCHEMA = True
66except ImportError:
67    HAS_JSONSCHEMA = False
68
69
def to_path(fpath):
    """Render a sequence of path elements as a dot-separated string.

    :param fpath: Iterable of path elements; each one is converted with ``str``
    :return: The converted elements joined by ``.``
    """
    return ".".join(map(str, fpath))
72
73
def json_path(absolute_path):
    """Build a path string rooted at ``$`` from a sequence of path elements.

    Integer elements are rendered as ``[n]``; every other element is
    appended as ``.<element>``.

    :param absolute_path: Iterable of path elements
    :return: The assembled path string
    """
    segments = ["$"]
    for part in absolute_path:
        if not isinstance(part, int):
            # Plain concatenation: a non-string element raises TypeError
            segments.append("." + part)
            continue
        segments.append("[" + str(part) + "]")
    return "".join(segments)
82
83
class Validate(ValidateBase):
    """Validate sub plugin that checks data against jsonschema criteria."""

    # Maps the ``draft`` option value to the names of the validator class and
    # of the legacy module-level format checker inside the ``jsonschema``
    # package. Names (not objects) are stored because ``jsonschema`` may be
    # missing when this module is imported.
    _JSONSCHEMA_DRAFTS = {
        "draft3": ("Draft3Validator", "draft3_format_checker"),
        "draft4": ("Draft4Validator", "draft4_format_checker"),
        "draft6": ("Draft6Validator", "draft6_format_checker"),
        "draft7": ("Draft7Validator", "draft7_format_checker"),
    }

    @staticmethod
    def _check_reqs():
        """Check the prerequisites are installed for jsonschema

        :return None: In case all prerequisites are satisfied
        :raises AnsibleError: If the ``jsonschema`` library cannot be imported
        """
        if not HAS_JSONSCHEMA:
            raise AnsibleError(missing_required_lib("jsonschema"))

    def _check_args(self):
        """Ensure specific args are set

        Converts ``self._data`` and every entry of ``self._criteria`` into
        objects produced by the JSON decoder. String values are parsed as
        JSON text; any other value is serialised and parsed back.

        :return: None: In case all arguments passed are valid
        :raises AnsibleError: If the data or a criteria entry is not valid JSON
        """
        try:
            if isinstance(self._data, string_types):
                self._data = json.loads(self._data)
            else:
                # Round-trip through JSON so only decoder-produced types remain
                self._data = json.loads(json.dumps(self._data))

        except (TypeError, JSONDecodeError) as exe:
            msg = (
                "'data' option value is invalid, value should a valid JSON."
                " Failed to read with error '{err}'".format(
                    err=to_text(exe, errors="surrogate_then_replace")
                )
            )
            raise AnsibleError(msg)

        try:
            criteria = []
            for item in to_list(self._criteria):
                # Decide per entry: a list of JSON strings must have each
                # string parsed, not just a criteria that is itself a string.
                if isinstance(item, string_types):
                    criteria.append(json.loads(item))
                else:
                    criteria.append(json.loads(json.dumps(item)))

            self._criteria = criteria
        except (TypeError, JSONDecodeError) as exe:
            msg = (
                "'criteria' option value is invalid, value should a valid JSON."
                " Failed to read with error '{err}'".format(
                    err=to_text(exe, errors="surrogate_then_replace")
                )
            )
            raise AnsibleError(msg)

    def validate(self):
        """Std entry point for a validate execution

        Checks the prerequisites and arguments, then validates the data
        against every criteria entry.

        :return: ``self._result``; when validation errors were found it holds
            an ``errors`` list and a ``msg`` string. If the validation run
            itself raises, ``{"errors": <exception text>}`` is returned
            instead (a text value, not a list).
        :rtype: dict
        :raises AnsibleError: If jsonschema is missing or the arguments are
            not valid JSON
        """
        self._check_reqs()
        self._check_args()

        try:
            self._validate_jsonschema()
        except Exception as exc:
            return {"errors": to_text(exc, errors="surrogate_then_replace")}

        return self._result

    def _validate_jsonschema(self):
        """Validate ``self._data`` against each schema in ``self._criteria``.

        Every validation error is appended to ``self._result["errors"]`` as a
        dict, and a summary line per error is added to ``self._result["msg"]``.

        :return: None
        """
        if not self._criteria:
            # Nothing to validate against; leave the result untouched
            return

        draft = self._get_sub_plugin_options("draft")
        # Any value other than draft3/draft4/draft6 falls back to draft7
        validator_name, checker_name = self._JSONSCHEMA_DRAFTS.get(
            draft, self._JSONSCHEMA_DRAFTS["draft7"]
        )
        validator_cls = getattr(jsonschema, validator_name)
        # Newer jsonschema releases expose the checker on the validator class
        # and deprecate the module-level ``draftN_format_checker`` names;
        # older releases only provide the module-level attribute.
        format_checker = getattr(validator_cls, "FORMAT_CHECKER", None)
        if format_checker is None:
            format_checker = getattr(jsonschema, checker_name)

        error_messages = []

        for criteria in self._criteria:
            validator = validator_cls(criteria, format_checker=format_checker)

            validation_errors = sorted(
                validator.iter_errors(self._data), key=lambda e: e.path
            )
            if not validation_errors:
                continue

            if "errors" not in self._result:
                self._result["errors"] = []

            for validation_error in validation_errors:
                if not isinstance(
                    validation_error, jsonschema.ValidationError
                ):
                    continue

                error = {
                    "message": validation_error.message,
                    "data_path": to_path(validation_error.absolute_path),
                    "json_path": json_path(validation_error.absolute_path),
                    "schema_path": to_path(
                        validation_error.relative_schema_path
                    ),
                    "relative_schema": validation_error.schema,
                    "expected": validation_error.validator_value,
                    "validator": validation_error.validator,
                    "found": validation_error.instance,
                }
                self._result["errors"].append(error)
                error_messages.append(
                    "At '{schema_path}' {message}. ".format(
                        schema_path=error["schema_path"],
                        message=error["message"],
                    )
                )

        if error_messages:
            if "msg" not in self._result:
                self._result["msg"] = "\n".join(error_messages)
            else:
                self._result["msg"] += "\n".join(error_messages)
218