# Copyright (c) 2018 Cisco and/or its affiliates.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible.  If not, see <http://www.gnu.org/licenses/>.
#

from ansible.module_utils.network.ftd.common import HTTPMethod
from ansible.module_utils.six import integer_types, string_types, iteritems

# Pseudo model name assigned to operations whose successful response is a file.
FILE_MODEL_NAME = '_File'
# Key of the successful response inside the swagger `responses` section.
SUCCESS_RESPONSE_CODE = '200'
# Prefix stripped from a delete operation id to derive its model name.
DELETE_PREFIX = 'delete'


class OperationField:
    """Keys of a simplified operation description produced by the parser."""
    URL = 'url'
    METHOD = 'method'
    PARAMETERS = 'parameters'
    MODEL_NAME = 'modelName'
    DESCRIPTION = 'description'
    RETURN_MULTIPLE_ITEMS = 'returnMultipleItems'
    TAGS = "tags"


class SpecProp:
    """Top-level keys of the input swagger spec and of the parsed result."""
    DEFINITIONS = 'definitions'
    OPERATIONS = 'operations'
    MODELS = 'models'
    MODEL_OPERATIONS = 'model_operations'


class PropName:
    """Property names used in the swagger spec and in validation reports."""
    ENUM = 'enum'
    TYPE = 'type'
    REQUIRED = 'required'
    INVALID_TYPE = 'invalid_type'
    REF = '$ref'
    ALL_OF = 'allOf'
    BASE_PATH = 'basePath'
    PATHS = 'paths'
    OPERATION_ID = 'operationId'
    SCHEMA = 'schema'
    ITEMS = 'items'
    PROPERTIES = 'properties'
    RESPONSES = 'responses'
    NAME = 'name'
    DESCRIPTION = 'description'


class PropType:
    """Swagger data type names."""
    STRING = 'string'
    BOOLEAN = 'boolean'
    INTEGER = 'integer'
    NUMBER = 'number'
    OBJECT = 'object'
    ARRAY = 'array'
    FILE = 'file'


class OperationParams:
    """Parameter locations that are kept in a simplified operation."""
    PATH = 'path'
    QUERY = 'query'


class QueryParams:
    """Names of well-known query parameters."""
    FILTER = 'filter'


def _get_model_name_from_url(schema_ref):
    """
    Extract a model name from a schema reference.

    :param schema_ref: a '/'-separated reference; the model name is its last segment
    :type schema_ref: str
    :rtype: str
    :return: the text after the last '/' (the whole string if it contains no '/')
    """
    return schema_ref.split('/')[-1]


class IllegalArgumentException(ValueError):
    """
    Exception raised when the function parameters:
        - not all passed
        - empty string
        - wrong type
    """
    pass


class ValidationError(ValueError):
    pass


class FdmSwaggerParser:
    # Model definitions and base path of the most recently parsed spec.
    _definitions = None
    _base_path = None

    def parse_spec(self, spec, docs=None):
        """
        This method simplifies a swagger format, resolves a model name for each operation, and adds documentation for
        each operation and model if it is provided.

        :param spec: An API specification in the swagger format, see
            <https://github.com/OAI/OpenAPI-Specification/blob/master/versions/2.0.md>
        :type spec: dict
        :param docs: A documentation map containing descriptions for models, operations and operation parameters.
        :type docs: dict
        :rtype: dict
        :return:
        Ex.
            The models field contains model definition from swagger see
            <https://github.com/OAI/OpenAPI-Specification/blob/master/versions/2.0.md#definitions>
            {
                'models':{
                    'model_name':{...},
                    ...
                },
                'operations':{
                    'operation_name':{
                        'method': 'get', #post, put, delete
                        'url': '/api/fdm/v2/object/networks', #url already contains a value from `basePath`
                        'modelName': 'NetworkObject', # it is a link to the model from 'models'
                                                      # None - for a delete operation or we don't have information
                                                      # '_File' - if an endpoint works with files
                        'returnMultipleItems': False, # shows if the operation returns a single item or an item list
                        'parameters': {
                            'path':{
                                'param_name':{
                                    'type': 'string', #integer, boolean, number
                                    'required': True #False
                                }
                                ...
                                },
                            'query':{
                                'param_name':{
                                    'type': 'string', #integer, boolean, number
                                    'required': True #False
                                }
                                ...
                            }
                        }
                    },
                    ...
                },
                'model_operations':{
                    'model_name':{ # a list of operations available for the current model
                        'operation_name':{
                            ... # the same as in the operations section
                        },
                        ...
                    },
                    ...
                }
            }
        """
        self._definitions = spec[SpecProp.DEFINITIONS]
        self._base_path = spec[PropName.BASE_PATH]
        operations = self._get_operations(spec)

        if docs:
            operations = self._enrich_operations_with_docs(operations, docs)
            self._definitions = self._enrich_definitions_with_docs(self._definitions, docs)

        return {
            SpecProp.MODELS: self._definitions,
            SpecProp.OPERATIONS: operations,
            SpecProp.MODEL_OPERATIONS: self._get_model_operations(operations)
        }

    @property
    def base_path(self):
        """Base path of the most recently parsed spec (None before `parse_spec` is called)."""
        return self._base_path

    def _get_model_operations(self, operations):
        """Group the operations by their model name: {model_name: {operation_name: operation}}."""
        model_operations = {}
        for operation_name, operation in iteritems(operations):
            model_name = operation[OperationField.MODEL_NAME]
            model_operations.setdefault(model_name, {})[operation_name] = operation
        return model_operations

    def _get_operations(self, spec):
        """Build a simplified description of every (url, method) pair, keyed by its operation id."""
        operations_dict = {}
        for url, operation_params in iteritems(spec[PropName.PATHS]):
            for method, params in iteritems(operation_params):
                operation = {
                    OperationField.METHOD: method,
                    OperationField.URL: self._base_path + url,
                    OperationField.MODEL_NAME: self._get_model_name(method, params),
                    OperationField.RETURN_MULTIPLE_ITEMS: self._return_multiple_items(params),
                    OperationField.TAGS: params.get(OperationField.TAGS, [])
                }
                if OperationField.PARAMETERS in params:
                    operation[OperationField.PARAMETERS] = self._get_rest_params(params[OperationField.PARAMETERS])

                operations_dict[params[PropName.OPERATION_ID]] = operation
        return operations_dict

    def _enrich_operations_with_docs(self, operations, docs):
        """
        Add a description to every operation and to each of its path/query parameters.

        Descriptions that are missing in `docs` are set to an empty string. The operations are updated in place and
        the same dict is returned.
        """
        def get_operation_docs(op):
            # The docs are keyed by the url without the base path.
            op_url = op[OperationField.URL][len(self._base_path):]
            return docs[PropName.PATHS].get(op_url, {}).get(op[OperationField.METHOD], {})

        for operation in operations.values():
            operation_docs = get_operation_docs(operation)
            operation[OperationField.DESCRIPTION] = operation_docs.get(PropName.DESCRIPTION, '')

            if OperationField.PARAMETERS in operation:
                param_descriptions = dict((
                    (p[PropName.NAME], p[PropName.DESCRIPTION])
                    for p in operation_docs.get(OperationField.PARAMETERS, {})
                ))

                for params_kind in (OperationParams.PATH, OperationParams.QUERY):
                    for param_name, params_spec in operation[OperationField.PARAMETERS][params_kind].items():
                        params_spec[OperationField.DESCRIPTION] = param_descriptions.get(param_name, '')

        return operations

    def _enrich_definitions_with_docs(self, definitions, docs):
        """
        Add a description to every model and a description/required flag to each of its properties.

        The definitions are updated in place and the same dict is returned.
        """
        for model_name, model_def in definitions.items():
            model_docs = docs[SpecProp.DEFINITIONS].get(model_name, {})
            model_def[PropName.DESCRIPTION] = model_docs.get(PropName.DESCRIPTION, '')

            prop_docs = model_docs.get(PropName.PROPERTIES, {})
            required_props = model_def.get(PropName.REQUIRED, [])
            for prop_name, prop_spec in model_def.get(PropName.PROPERTIES, {}).items():
                prop_spec[PropName.DESCRIPTION] = prop_docs.get(prop_name, '')
                prop_spec[PropName.REQUIRED] = prop_name in required_props
        return definitions

    def _get_model_name(self, method, params):
        """Resolve the model name of an operation; the lookup strategy depends on the HTTP method."""
        if method == HTTPMethod.GET:
            return self._get_model_name_from_responses(params)
        elif method == HTTPMethod.POST or method == HTTPMethod.PUT:
            return self._get_model_name_for_post_put_requests(params)
        elif method == HTTPMethod.DELETE:
            return self._get_model_name_from_delete_operation(params)
        else:
            return None

    @staticmethod
    def _return_multiple_items(op_params):
        """
        Defines if the operation returns one item or a list of items.

        :param op_params: operation specification
        :return: True if the operation returns a list of items, otherwise False
        """
        try:
            schema = op_params[PropName.RESPONSES][SUCCESS_RESPONSE_CODE][PropName.SCHEMA]
            return PropName.ITEMS in schema[PropName.PROPERTIES]
        except KeyError:
            # No successful response, no schema, or a schema without properties.
            return False

    def _get_model_name_from_delete_operation(self, params):
        """Derive the model name from a 'delete<ModelName>' operation id; None if no such model is defined."""
        operation_id = params[PropName.OPERATION_ID]
        if operation_id.startswith(DELETE_PREFIX):
            model_name = operation_id[len(DELETE_PREFIX):]
            if model_name in self._definitions:
                return model_name
        return None

    def _get_model_name_for_post_put_requests(self, params):
        """Take the model name from the body parameter, falling back to the successful response."""
        model_name = None
        if OperationField.PARAMETERS in params:
            body_param_dict = self._get_body_param_from_parameters(params[OperationField.PARAMETERS])
            if body_param_dict:
                schema_ref = body_param_dict[PropName.SCHEMA][PropName.REF]
                model_name = self._get_model_name_byschema_ref(schema_ref)
        if model_name is None:
            model_name = self._get_model_name_from_responses(params)
        return model_name

    @staticmethod
    def _get_body_param_from_parameters(params):
        """Return the first parameter located in the request body, or None."""
        return next((param for param in params if param['in'] == 'body'), None)

    def _get_model_name_from_responses(self, params):
        """
        Resolve the model name from the schema of the successful response.

        :return: the model name, '_File' for a file response, or None when there is no successful response,
            it declares no schema, or the schema has an unsupported shape
        """
        responses = params[PropName.RESPONSES]
        if SUCCESS_RESPONSE_CODE not in responses:
            return None

        # `schema` is optional in a swagger 2.0 response object (the response has no body then).
        schema = responses[SUCCESS_RESPONSE_CODE].get(PropName.SCHEMA, {})
        if PropName.REF in schema:
            return self._get_model_name_byschema_ref(schema[PropName.REF])
        elif PropName.PROPERTIES in schema:
            # A list response: the model is referenced by the elements of the `items` array property.
            ref = schema[PropName.PROPERTIES][PropName.ITEMS][PropName.ITEMS][PropName.REF]
            return self._get_model_name_byschema_ref(ref)
        elif schema.get(PropName.TYPE) == PropType.FILE:
            return FILE_MODEL_NAME
        return None

    def _get_rest_params(self, params):
        """Split the parameters into simplified 'path' and 'query' maps; other locations are skipped."""
        path = {}
        query = {}
        for param in params:
            in_param = param['in']
            if in_param == OperationParams.QUERY:
                query[param[PropName.NAME]] = self._simplify_param_def(param)
            elif in_param == OperationParams.PATH:
                path[param[PropName.NAME]] = self._simplify_param_def(param)
        return {
            OperationParams.PATH: path,
            OperationParams.QUERY: query
        }

    @staticmethod
    def _simplify_param_def(param):
        """Keep only the type and the required flag of a parameter definition."""
        return {
            PropName.TYPE: param[PropName.TYPE],
            # `required` is optional in swagger 2.0 and defaults to false.
            PropName.REQUIRED: param.get(PropName.REQUIRED, False)
        }

    def _get_model_name_byschema_ref(self, schema_ref):
        """Resolve a schema reference to a model name, following the first `allOf` entry recursively."""
        model_name = _get_model_name_from_url(schema_ref)
        model_def = self._definitions[model_name]
        if PropName.ALL_OF in model_def:
            return self._get_model_name_byschema_ref(model_def[PropName.ALL_OF][0][PropName.REF])
        else:
            return model_name


class FdmSwaggerValidator:
    def __init__(self, spec):
        """
        :param spec: dict
                    data from FdmSwaggerParser().parse_spec()
        """
        self._operations = spec[SpecProp.OPERATIONS]
        self._models = spec[SpecProp.MODELS]

    def validate_data(self, operation_name, data=None):
        """
        Validate data for the post|put requests
        :param operation_name: string
                            The value must be non empty string.
                            The operation name is used to get a model specification
        :param data: dict
                    The value must be in the format that the model(from operation) expects
        :rtype: (bool, string|dict)
        :return:
            (True, None) - if data valid
            Invalid:
            (False, {
                'required': [ #list of the fields that are required but were not present in the data
                    'field_name',
                    'parent.field_name',# when the nested field is omitted
                    'parent.list[2].field_name' # if data is array and one of the field is omitted
                ],
                'invalid_type':[ #list of the fields with invalid data
                        {
                           'path': 'objId', #field name or path to the field. Ex. objects[3].id, parent.name
                           'expected_type': 'string',# expected type. Ex. 'object', 'array', 'string', 'integer',
                                                     # 'boolean', 'number'
                           'actually_value': 1 # the value that user passed
                       }
                ]
            })
        :raises IllegalArgumentException
            'The operation_name parameter must be a non-empty string' if operation_name is not valid
            'The data parameter must be a dict' if data neither dict or None
            '{operation_name} operation does not support' if the spec does not contain the operation
        """
        if data is None:
            data = {}

        self._check_validate_data_params(data, operation_name)

        operation = self._operations[operation_name]
        model = self._models[operation[OperationField.MODEL_NAME]]
        status = self._init_report()

        self._validate_object(status, model, data, '')

        if status[PropName.REQUIRED] or status[PropName.INVALID_TYPE]:
            return False, self._delete_empty_field_from_report(status)
        return True, None

    def _check_validate_data_params(self, data, operation_name):
        """Raise IllegalArgumentException if the arguments of `validate_data` are not usable."""
        if not operation_name or not isinstance(operation_name, string_types):
            raise IllegalArgumentException("The operation_name parameter must be a non-empty string")
        if not isinstance(data, dict):
            raise IllegalArgumentException("The data parameter must be a dict")
        if operation_name not in self._operations:
            raise IllegalArgumentException("{0} operation does not support".format(operation_name))

    def validate_query_params(self, operation_name, params):
        """
           Validate params for the get requests. Use this method for validating the query part of the url.
           :param operation_name: string
                               The value must be non empty string.
                               The operation name is used to get a params specification
           :param params: dict
                        should be in the format that the specification(from operation) expects
                    Ex.
                    {
                        'objId': "string_value",
                        'p_integer': 1,
                        'p_boolean': True,
                        'p_number': 2.3
                    }
           :rtype:(Boolean, msg)
           :return:
               (True, None) - if params valid
               Invalid:
               (False, {
                   'required': [ #list of the fields that are required but are not present in the params
                       'field_name'
                   ],
                   'invalid_type':[ #list of the fields with invalid data and expected type of the params
                            {
                              'path': 'objId', #field name
                              'expected_type': 'string',#expected type. Ex. 'string', 'integer', 'boolean', 'number'
                              'actually_value': 1 # the value that user passed
                            }
                   ]
               })
            :raises IllegalArgumentException
               'The operation_name parameter must be a non-empty string' if operation_name is not valid
               'The params parameter must be a dict' if params neither dict or None
               '{operation_name} operation does not support' if the spec does not contain the operation
           """
        return self._validate_url_params(operation_name, params, resource=OperationParams.QUERY)

    def validate_path_params(self, operation_name, params):
        """
        Validate params for the get requests. Use this method for validating the path part of the url.
           :param operation_name: string
                               The value must be non empty string.
                               The operation name is used to get a params specification
           :param params: dict
                        should be in the format that the specification(from operation) expects

                 Ex.
                 {
                     'objId': "string_value",
                     'p_integer': 1,
                     'p_boolean': True,
                     'p_number': 2.3
                 }
        :rtype:(Boolean, msg)
        :return:
            (True, None) - if params valid
            Invalid:
            (False, {
                'required': [ #list of the fields that are required but are not present in the params
                    'field_name'
                ],
                'invalid_type':[ #list of the fields with invalid data and expected type of the params
                         {
                           'path': 'objId', #field name
                           'expected_type': 'string',#expected type. Ex. 'string', 'integer', 'boolean', 'number'
                           'actually_value': 1 # the value that user passed
                         }
                ]
            })
        :raises IllegalArgumentException
            'The operation_name parameter must be a non-empty string' if operation_name is not valid
            'The params parameter must be a dict' if params neither dict or None
            '{operation_name} operation does not support' if the spec does not contain the operation
        """
        return self._validate_url_params(operation_name, params, resource=OperationParams.PATH)

    def _validate_url_params(self, operation, params, resource):
        """
        Validate `params` against the `resource` ('path' or 'query') parameter spec of the operation.

        :return: (True, None) if the params are valid or the operation declares no such parameters,
            otherwise (False, report)
        """
        if params is None:
            params = {}

        self._check_validate_url_params(operation, params)

        operation = self._operations[operation]
        if OperationField.PARAMETERS not in operation or resource not in operation[OperationField.PARAMETERS]:
            return True, None

        spec = operation[OperationField.PARAMETERS][resource]
        status = self._init_report()
        self._check_url_params(status, spec, params)

        if status[PropName.REQUIRED] or status[PropName.INVALID_TYPE]:
            return False, self._delete_empty_field_from_report(status)
        return True, None

    def _check_validate_url_params(self, operation, params):
        """Raise IllegalArgumentException if the arguments of the url params validation are not usable."""
        if not operation or not isinstance(operation, string_types):
            raise IllegalArgumentException("The operation_name parameter must be a non-empty string")
        if not isinstance(params, dict):
            raise IllegalArgumentException("The params parameter must be a dict")
        if operation not in self._operations:
            raise IllegalArgumentException("{0} operation does not support".format(operation))

    def _check_url_params(self, status, spec, params):
        """Report required params that are missing and passed params whose value has a wrong type."""
        for prop_name, prop in iteritems(spec):
            if prop_name not in params:
                if prop[PropName.REQUIRED]:
                    status[PropName.REQUIRED].append(prop_name)
                continue

            expected_type = prop[PropName.TYPE]
            value = params[prop_name]
            # A url parameter that is passed explicitly must have a value, so None is not accepted here.
            if not self._is_correct_simple_types(expected_type, value, allow_null=False):
                self._add_invalid_type_report(status, '', prop_name, expected_type, value)

    def _validate_object(self, status, model, data, path):
        """Validate `data` against an enum or an object model; models of other kinds are not checked."""
        if self._is_enum(model):
            self._check_enum(status, model, data, path)
        elif self._is_object(model):
            self._check_object(status, model, data, path)

    def _is_enum(self, model):
        """Return True if the model is a string type with a list of allowed values."""
        return self._is_string_type(model) and PropName.ENUM in model

    def _check_enum(self, status, model, data, path):
        """Report a non-None value that is not one of the allowed enum values."""
        if data is not None and data not in model[PropName.ENUM]:
            self._add_invalid_type_report(status, path, '', PropName.ENUM, data)

    def _add_invalid_type_report(self, status, path, prop_name, expected_type, actually_value):
        """Append an invalid type entry for the field `path`.`prop_name` to the report."""
        status[PropName.INVALID_TYPE].append({
            'path': self._create_path_to_field(path, prop_name),
            'expected_type': expected_type,
            'actually_value': actually_value
        })

    def _check_object(self, status, model, data, path):
        """Check the required fields of an object and the type of every property present in `data`."""
        if data is None:
            return

        if not isinstance(data, dict):
            self._add_invalid_type_report(status, path, '', PropType.OBJECT, data)
            return

        if PropName.REQUIRED in model:
            self._check_required_fields(status, model[PropName.REQUIRED], data, path)

        # Only properties known to the model are checked; extra keys in `data` are ignored.
        for prop, model_prop_val in iteritems(model[PropName.PROPERTIES]):
            if prop in data:
                expected_type = model_prop_val[PropName.TYPE]
                self._check_types(status, data[prop], expected_type, model_prop_val, path, prop)

    def _check_types(self, status, actually_value, expected_type, model, path, prop_name):
        """Dispatch the validation of a single value according to its expected type."""
        if expected_type == PropType.OBJECT:
            ref_model = self._get_model_by_ref(model)

            self._validate_object(status, ref_model, actually_value,
                                  path=self._create_path_to_field(path, prop_name))
        elif expected_type == PropType.ARRAY:
            self._check_array(status, model, actually_value,
                              path=self._create_path_to_field(path, prop_name))
        elif not self._is_correct_simple_types(expected_type, actually_value):
            self._add_invalid_type_report(status, path, prop_name, expected_type, actually_value)

    def _get_model_by_ref(self, model_prop_val):
        """Return the model definition referenced by the `$ref` of a property."""
        model_name = _get_model_name_from_url(model_prop_val[PropName.REF])
        return self._models[model_name]

    def _check_required_fields(self, status, required_fields, data, path):
        """Report every required field that is absent from `data` or set to None."""
        missed_required_fields = [self._create_path_to_field(path, field) for field in
                                  required_fields if data.get(field) is None]
        status[PropName.REQUIRED].extend(missed_required_fields)

    def _check_array(self, status, model, data, path):
        """Check that `data` is a list (or None) and validate each element against the item model."""
        if data is None:
            return
        elif not isinstance(data, list):
            self._add_invalid_type_report(status, path, '', PropType.ARRAY, data)
        else:
            item_model = model[PropName.ITEMS]
            for i, item_data in enumerate(data):
                self._check_types(status, item_data, item_model[PropName.TYPE], item_model,
                                  "{0}[{1}]".format(path, i), '')

    @staticmethod
    def _is_correct_simple_types(expected_type, value, allow_null=True):
        """
        Check a value against a simple (string, boolean, integer or number) type.

        Strings that look like numbers are accepted for the integer and number types; booleans are not.

        :param expected_type: one of the simple type names; any other type name is never matched
        :param value: the value to check
        :param allow_null: whether None is accepted as a valid value
        :rtype: bool
        """
        def parses_as_float(s):
            try:
                float(s)
                return True
            except ValueError:
                return False

        if value is None and allow_null:
            return True
        elif expected_type == PropType.STRING:
            return isinstance(value, string_types)
        elif expected_type == PropType.BOOLEAN:
            return isinstance(value, bool)
        elif expected_type == PropType.INTEGER:
            # bool is a subclass of int, so it has to be excluded explicitly.
            is_integer = isinstance(value, integer_types) and not isinstance(value, bool)
            is_digit_string = isinstance(value, string_types) and value.isdigit()
            return is_integer or is_digit_string
        elif expected_type == PropType.NUMBER:
            is_number = isinstance(value, (integer_types, float)) and not isinstance(value, bool)
            is_numeric_string = isinstance(value, string_types) and parses_as_float(value)
            return is_number or is_numeric_string
        return False

    @staticmethod
    def _is_string_type(model):
        """Return True if the model declares the string type."""
        return PropName.TYPE in model and model[PropName.TYPE] == PropType.STRING

    @staticmethod
    def _init_report():
        """Create an empty validation report."""
        return {
            PropName.REQUIRED: [],
            PropName.INVALID_TYPE: []
        }

    @staticmethod
    def _delete_empty_field_from_report(status):
        """Drop the empty sections of the report (in place) and return it."""
        if not status[PropName.REQUIRED]:
            del status[PropName.REQUIRED]
        if not status[PropName.INVALID_TYPE]:
            del status[PropName.INVALID_TYPE]
        return status

    @staticmethod
    def _create_path_to_field(path='', field=''):
        """Join a parent path and a field name with a dot; the dot is omitted if either part is empty."""
        separator = ''
        if path and field:
            separator = '.'
        return "{0}{1}{2}".format(path, separator, field)

    @staticmethod
    def _is_object(model):
        """Return True if the model declares the object type."""
        return PropName.TYPE in model and model[PropName.TYPE] == PropType.OBJECT