1# -*- coding: utf-8 -*- 2# Copyright 2020 Red Hat 3# GNU General Public License v3.0+ 4# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) 5 6from __future__ import absolute_import, division, print_function 7 8__metaclass__ = type 9 10DOCUMENTATION = """ 11 author: Ganesh Nalawade (@ganeshrn) 12 name: jsonschema 13 short_description: Define configurable options for jsonschema validate plugin 14 description: 15 - This sub plugin documentation provides the configurable options that can be passed 16 to the validate plugins when C(ansible.utils.jsonschema) is used as a value for 17 engine option. 18 version_added: 1.0.0 19 options: 20 draft: 21 description: 22 - This option provides the jsonschema specification that should be used 23 for the validating the data. The I(criteria) option in the validate 24 plugin should follow the specification as mentioned by this option 25 default: draft7 26 choices: 27 - draft3 28 - draft4 29 - draft6 30 - draft7 31 env: 32 - name: ANSIBLE_VALIDATE_JSONSCHEMA_DRAFT 33 vars: 34 - name: ansible_validate_jsonschema_draft 35 notes: 36 - The value of I(data) option should be either a valid B(JSON) object or a B(JSON) string. 37 - The value of I(criteria) should be B(list) of B(dict) or B(list) of B(strings) and each 38 B(string) within the B(list) entry should be a valid B(dict) when read in python. 39""" 40 41import json 42 43from ansible.module_utils._text import to_text 44from ansible.module_utils.basic import missing_required_lib 45from ansible.errors import AnsibleError 46from ansible.module_utils.six import string_types 47 48from ansible_collections.ansible.utils.plugins.plugin_utils.base.validate import ( 49 ValidateBase, 50) 51 52from ansible_collections.ansible.utils.plugins.module_utils.common.utils import ( 53 to_list, 54) 55 56# PY2 compatiblilty for JSONDecodeError 57try: 58 from json.decoder import JSONDecodeError 59except ImportError: 60 JSONDecodeError = ValueError 61 62try: 63 import jsonschema 64 65 HAS_JSONSCHEMA = True 66except ImportError: 67 HAS_JSONSCHEMA = False 68 69 70def to_path(fpath): 71 return ".".join(str(index) for index in fpath) 72 73 74def json_path(absolute_path): 75 path = "$" 76 for elem in absolute_path: 77 if isinstance(elem, int): 78 path += "[" + str(elem) + "]" 79 else: 80 path += "." + elem 81 return path 82 83 84class Validate(ValidateBase): 85 @staticmethod 86 def _check_reqs(): 87 """Check the prerequisites are installed for jsonschema 88 89 :return None: In case all prerequisites are satisfised 90 """ 91 if not HAS_JSONSCHEMA: 92 raise AnsibleError(missing_required_lib("jsonschema")) 93 94 def _check_args(self): 95 """Ensure specific args are set 96 97 :return: None: In case all arguments passed are valid 98 """ 99 try: 100 if isinstance(self._data, string_types): 101 self._data = json.loads(self._data) 102 else: 103 self._data = json.loads(json.dumps(self._data)) 104 105 except (TypeError, JSONDecodeError) as exe: 106 msg = ( 107 "'data' option value is invalid, value should a valid JSON." 108 " Failed to read with error '{err}'".format( 109 err=to_text(exe, errors="surrogate_then_replace") 110 ) 111 ) 112 raise AnsibleError(msg) 113 114 try: 115 criteria = [] 116 for item in to_list(self._criteria): 117 if isinstance(self._criteria, string_types): 118 criteria.append(json.loads(item)) 119 else: 120 criteria.append(json.loads(json.dumps(item))) 121 122 self._criteria = criteria 123 except (TypeError, JSONDecodeError) as exe: 124 msg = ( 125 "'criteria' option value is invalid, value should a valid JSON." 126 " Failed to read with error '{err}'".format( 127 err=to_text(exe, errors="surrogate_then_replace") 128 ) 129 ) 130 raise AnsibleError(msg) 131 132 def validate(self): 133 """Std entry point for a validate execution 134 135 :return: Errors or parsed text as structured data 136 :rtype: dict 137 138 :example: 139 140 The parse function of a parser should return a dict: 141 {"errors": [a list of errors]} 142 or 143 {"parsed": obj} 144 """ 145 self._check_reqs() 146 self._check_args() 147 148 try: 149 self._validate_jsonschema() 150 except Exception as exc: 151 return {"errors": to_text(exc, errors="surrogate_then_replace")} 152 153 return self._result 154 155 def _validate_jsonschema(self): 156 error_messages = None 157 158 draft = self._get_sub_plugin_options("draft") 159 error_messages = [] 160 161 for criteria in self._criteria: 162 if draft == "draft3": 163 validator = jsonschema.Draft3Validator( 164 criteria, format_checker=jsonschema.draft3_format_checker 165 ) 166 elif draft == "draft4": 167 validator = jsonschema.Draft4Validator( 168 criteria, format_checker=jsonschema.draft4_format_checker 169 ) 170 elif draft == "draft6": 171 validator = jsonschema.Draft6Validator( 172 criteria, format_checker=jsonschema.draft6_format_checker 173 ) 174 else: 175 validator = jsonschema.Draft7Validator( 176 criteria, format_checker=jsonschema.draft7_format_checker 177 ) 178 179 validation_errors = sorted( 180 validator.iter_errors(self._data), key=lambda e: e.path 181 ) 182 183 if validation_errors: 184 if "errors" not in self._result: 185 self._result["errors"] = [] 186 187 for validation_error in validation_errors: 188 if isinstance( 189 validation_error, jsonschema.ValidationError 190 ): 191 error = { 192 "message": validation_error.message, 193 "data_path": to_path( 194 validation_error.absolute_path 195 ), 196 "json_path": json_path( 197 validation_error.absolute_path 198 ), 199 "schema_path": to_path( 200 validation_error.relative_schema_path 201 ), 202 "relative_schema": validation_error.schema, 203 "expected": validation_error.validator_value, 204 "validator": validation_error.validator, 205 "found": validation_error.instance, 206 } 207 self._result["errors"].append(error) 208 error_message = "At '{schema_path}' {message}. ".format( 209 schema_path=error["schema_path"], 210 message=error["message"], 211 ) 212 error_messages.append(error_message) 213 if error_messages: 214 if "msg" not in self._result: 215 self._result["msg"] = "\n".join(error_messages) 216 else: 217 self._result["msg"] += "\n".join(error_messages) 218