1# Copyright (c) 2018 Cisco and/or its affiliates.
2#
3# This file is part of Ansible
4#
5# Ansible is free software: you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation, either version 3 of the License, or
8# (at your option) any later version.
9#
10# Ansible is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with Ansible.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19from ansible.module_utils.network.ftd.common import HTTPMethod
20from ansible.module_utils.six import integer_types, string_types, iteritems
21
# Model name reported for operations whose successful response schema has the 'file' type.
FILE_MODEL_NAME = '_File'
# Key of the successful response inside an operation's 'responses' section.
SUCCESS_RESPONSE_CODE = '200'
# Prefix removed from a DELETE operation id to derive the name of the model it works with.
DELETE_PREFIX = 'delete'
25
26
class OperationField:
    """Keys of the simplified operation dicts built by FdmSwaggerParser."""
    # Request description.
    URL = 'url'
    METHOD = 'method'
    PARAMETERS = 'parameters'
    # Result description.
    MODEL_NAME = 'modelName'
    RETURN_MULTIPLE_ITEMS = 'returnMultipleItems'
    # Documentation and grouping.
    DESCRIPTION = 'description'
    TAGS = 'tags'
35
36
class SpecProp:
    """Top-level section names: 'definitions' is read from the input, the rest are keys of the parsed spec."""
    DEFINITIONS = 'definitions'
    MODELS = 'models'
    OPERATIONS = 'operations'
    MODEL_OPERATIONS = 'model_operations'
42
43
class PropName:
    """Property names looked up in the swagger spec, plus the two section names of a validation report."""
    # Sections of a validation report ('required' is also a swagger property name).
    REQUIRED = 'required'
    INVALID_TYPE = 'invalid_type'

    # Spec-level and operation-level properties.
    BASE_PATH = 'basePath'
    PATHS = 'paths'
    OPERATION_ID = 'operationId'
    RESPONSES = 'responses'

    # Schema, model and parameter properties.
    SCHEMA = 'schema'
    REF = '$ref'
    ALL_OF = 'allOf'
    TYPE = 'type'
    ENUM = 'enum'
    ITEMS = 'items'
    PROPERTIES = 'properties'
    NAME = 'name'
    DESCRIPTION = 'description'
60
61
class PropType:
    """Values of the 'type' property that the parser and the validator distinguish."""
    # Simple types.
    STRING = 'string'
    BOOLEAN = 'boolean'
    INTEGER = 'integer'
    NUMBER = 'number'
    # Composite types.
    OBJECT = 'object'
    ARRAY = 'array'
    # Marks a response schema that carries a file.
    FILE = 'file'
70
71
class OperationParams:
    """Parameter locations (values of a swagger parameter's 'in' field) kept in a simplified operation."""
    PATH = 'path'
    QUERY = 'query'
75
76
class QueryParams:
    """Query parameter names; not referenced elsewhere in the visible part of this module."""
    FILTER = 'filter'
79
80
81def _get_model_name_from_url(schema_ref):
82    path = schema_ref.split('/')
83    return path[len(path) - 1]
84
85
class IllegalArgumentException(ValueError):
    """
    Raised when the arguments passed to a function cannot be used, i.e. an argument is:
        - missing
        - an empty string
        - of the wrong type
    """
94
95
class ValidationError(ValueError):
    """ValueError subclass; nothing in the visible part of this module raises it (presumably used by callers)."""
98
99
100class FdmSwaggerParser:
101    _definitions = None
102    _base_path = None
103
104    def parse_spec(self, spec, docs=None):
105        """
106        This method simplifies a swagger format, resolves a model name for each operation, and adds documentation for
107        each operation and model if it is provided.
108
109        :param spec: An API specification in the swagger format, see
110            <https://github.com/OAI/OpenAPI-Specification/blob/master/versions/2.0.md>
111        :type spec: dict
112        :param spec: A documentation map containing descriptions for models, operations and operation parameters.
113        :type docs: dict
114        :rtype: dict
115        :return:
116        Ex.
117            The models field contains model definition from swagger see
118            <#https://github.com/OAI/OpenAPI-Specification/blob/master/versions/2.0.md#definitions>
119            {
120                'models':{
121                    'model_name':{...},
122                    ...
123                },
124                'operations':{
125                    'operation_name':{
126                        'method': 'get', #post, put, delete
127                        'url': '/api/fdm/v2/object/networks', #url already contains a value from `basePath`
128                        'modelName': 'NetworkObject', # it is a link to the model from 'models'
129                                                      # None - for a delete operation or we don't have information
130                                                      # '_File' - if an endpoint works with files
131                        'returnMultipleItems': False, # shows if the operation returns a single item or an item list
132                        'parameters': {
133                            'path':{
134                                'param_name':{
135                                    'type': 'string'#integer, boolean, number
136                                    'required' True #False
137                                }
138                                ...
139                                },
140                            'query':{
141                                'param_name':{
142                                    'type': 'string'#integer, boolean, number
143                                    'required' True #False
144                                }
145                                ...
146                            }
147                        }
148                    },
149                    ...
150                },
151                'model_operations':{
152                    'model_name':{ # a list of operations available for the current model
153                        'operation_name':{
154                            ... # the same as in the operations section
155                        },
156                        ...
157                    },
158                    ...
159                }
160            }
161        """
162        self._definitions = spec[SpecProp.DEFINITIONS]
163        self._base_path = spec[PropName.BASE_PATH]
164        operations = self._get_operations(spec)
165
166        if docs:
167            operations = self._enrich_operations_with_docs(operations, docs)
168            self._definitions = self._enrich_definitions_with_docs(self._definitions, docs)
169
170        return {
171            SpecProp.MODELS: self._definitions,
172            SpecProp.OPERATIONS: operations,
173            SpecProp.MODEL_OPERATIONS: self._get_model_operations(operations)
174        }
175
176    @property
177    def base_path(self):
178        return self._base_path
179
180    def _get_model_operations(self, operations):
181        model_operations = {}
182        for operations_name, params in iteritems(operations):
183            model_name = params[OperationField.MODEL_NAME]
184            model_operations.setdefault(model_name, {})[operations_name] = params
185        return model_operations
186
187    def _get_operations(self, spec):
188        paths_dict = spec[PropName.PATHS]
189        operations_dict = {}
190        for url, operation_params in iteritems(paths_dict):
191            for method, params in iteritems(operation_params):
192                operation = {
193                    OperationField.METHOD: method,
194                    OperationField.URL: self._base_path + url,
195                    OperationField.MODEL_NAME: self._get_model_name(method, params),
196                    OperationField.RETURN_MULTIPLE_ITEMS: self._return_multiple_items(params),
197                    OperationField.TAGS: params.get(OperationField.TAGS, [])
198                }
199                if OperationField.PARAMETERS in params:
200                    operation[OperationField.PARAMETERS] = self._get_rest_params(params[OperationField.PARAMETERS])
201
202                operation_id = params[PropName.OPERATION_ID]
203                operations_dict[operation_id] = operation
204        return operations_dict
205
206    def _enrich_operations_with_docs(self, operations, docs):
207        def get_operation_docs(op):
208            op_url = op[OperationField.URL][len(self._base_path):]
209            return docs[PropName.PATHS].get(op_url, {}).get(op[OperationField.METHOD], {})
210
211        for operation in operations.values():
212            operation_docs = get_operation_docs(operation)
213            operation[OperationField.DESCRIPTION] = operation_docs.get(PropName.DESCRIPTION, '')
214
215            if OperationField.PARAMETERS in operation:
216                param_descriptions = dict((
217                    (p[PropName.NAME], p[PropName.DESCRIPTION])
218                    for p in operation_docs.get(OperationField.PARAMETERS, {})
219                ))
220
221                for param_name, params_spec in operation[OperationField.PARAMETERS][OperationParams.PATH].items():
222                    params_spec[OperationField.DESCRIPTION] = param_descriptions.get(param_name, '')
223
224                for param_name, params_spec in operation[OperationField.PARAMETERS][OperationParams.QUERY].items():
225                    params_spec[OperationField.DESCRIPTION] = param_descriptions.get(param_name, '')
226
227        return operations
228
229    def _enrich_definitions_with_docs(self, definitions, docs):
230        for model_name, model_def in definitions.items():
231            model_docs = docs[SpecProp.DEFINITIONS].get(model_name, {})
232            model_def[PropName.DESCRIPTION] = model_docs.get(PropName.DESCRIPTION, '')
233            for prop_name, prop_spec in model_def.get(PropName.PROPERTIES, {}).items():
234                prop_spec[PropName.DESCRIPTION] = model_docs.get(PropName.PROPERTIES, {}).get(prop_name, '')
235                prop_spec[PropName.REQUIRED] = prop_name in model_def.get(PropName.REQUIRED, [])
236        return definitions
237
238    def _get_model_name(self, method, params):
239        if method == HTTPMethod.GET:
240            return self._get_model_name_from_responses(params)
241        elif method == HTTPMethod.POST or method == HTTPMethod.PUT:
242            return self._get_model_name_for_post_put_requests(params)
243        elif method == HTTPMethod.DELETE:
244            return self._get_model_name_from_delete_operation(params)
245        else:
246            return None
247
248    @staticmethod
249    def _return_multiple_items(op_params):
250        """
251        Defines if the operation returns one item or a list of items.
252
253        :param op_params: operation specification
254        :return: True if the operation returns a list of items, otherwise False
255        """
256        try:
257            schema = op_params[PropName.RESPONSES][SUCCESS_RESPONSE_CODE][PropName.SCHEMA]
258            return PropName.ITEMS in schema[PropName.PROPERTIES]
259        except KeyError:
260            return False
261
262    def _get_model_name_from_delete_operation(self, params):
263        operation_id = params[PropName.OPERATION_ID]
264        if operation_id.startswith(DELETE_PREFIX):
265            model_name = operation_id[len(DELETE_PREFIX):]
266            if model_name in self._definitions:
267                return model_name
268        return None
269
270    def _get_model_name_for_post_put_requests(self, params):
271        model_name = None
272        if OperationField.PARAMETERS in params:
273            body_param_dict = self._get_body_param_from_parameters(params[OperationField.PARAMETERS])
274            if body_param_dict:
275                schema_ref = body_param_dict[PropName.SCHEMA][PropName.REF]
276                model_name = self._get_model_name_byschema_ref(schema_ref)
277        if model_name is None:
278            model_name = self._get_model_name_from_responses(params)
279        return model_name
280
281    @staticmethod
282    def _get_body_param_from_parameters(params):
283        return next((param for param in params if param['in'] == 'body'), None)
284
285    def _get_model_name_from_responses(self, params):
286        responses = params[PropName.RESPONSES]
287        if SUCCESS_RESPONSE_CODE in responses:
288            response = responses[SUCCESS_RESPONSE_CODE][PropName.SCHEMA]
289            if PropName.REF in response:
290                return self._get_model_name_byschema_ref(response[PropName.REF])
291            elif PropName.PROPERTIES in response:
292                ref = response[PropName.PROPERTIES][PropName.ITEMS][PropName.ITEMS][PropName.REF]
293                return self._get_model_name_byschema_ref(ref)
294            elif (PropName.TYPE in response) and response[PropName.TYPE] == PropType.FILE:
295                return FILE_MODEL_NAME
296        else:
297            return None
298
299    def _get_rest_params(self, params):
300        path = {}
301        query = {}
302        operation_param = {
303            OperationParams.PATH: path,
304            OperationParams.QUERY: query
305        }
306        for param in params:
307            in_param = param['in']
308            if in_param == OperationParams.QUERY:
309                query[param[PropName.NAME]] = self._simplify_param_def(param)
310            elif in_param == OperationParams.PATH:
311                path[param[PropName.NAME]] = self._simplify_param_def(param)
312        return operation_param
313
314    @staticmethod
315    def _simplify_param_def(param):
316        return {
317            PropName.TYPE: param[PropName.TYPE],
318            PropName.REQUIRED: param[PropName.REQUIRED]
319        }
320
321    def _get_model_name_byschema_ref(self, schema_ref):
322        model_name = _get_model_name_from_url(schema_ref)
323        model_def = self._definitions[model_name]
324        if PropName.ALL_OF in model_def:
325            return self._get_model_name_byschema_ref(model_def[PropName.ALL_OF][0][PropName.REF])
326        else:
327            return model_name
328
329
330class FdmSwaggerValidator:
331    def __init__(self, spec):
332        """
333        :param spec: dict
334                    data from FdmSwaggerParser().parse_spec()
335        """
336        self._operations = spec[SpecProp.OPERATIONS]
337        self._models = spec[SpecProp.MODELS]
338
339    def validate_data(self, operation_name, data=None):
340        """
341        Validate data for the post|put requests
342        :param operation_name: string
343                            The value must be non empty string.
344                            The operation name is used to get a model specification
345        :param data: dict
346                    The value must be in the format that the model(from operation) expects
347        :rtype: (bool, string|dict)
348        :return:
349            (True, None) - if data valid
350            Invalid:
351            (False, {
352                'required': [ #list of the fields that are required but were not present in the data
353                    'field_name',
354                    'patent.field_name',# when the nested field is omitted
355                    'patent.list[2].field_name' # if data is array and one of the field is omitted
356                ],
357                'invalid_type':[ #list of the fields with invalid data
358                        {
359                           'path': 'objId', #field name or path to the field. Ex. objects[3].id, parent.name
360                           'expected_type': 'string',# expected type. Ex. 'object', 'array', 'string', 'integer',
361                                                     # 'boolean', 'number'
362                           'actually_value': 1 # the value that user passed
363                       }
364                ]
365            })
366        :raises IllegalArgumentException
367            'The operation_name parameter must be a non-empty string' if operation_name is not valid
368            'The data parameter must be a dict' if data neither dict or None
369            '{operation_name} operation does not support' if the spec does not contain the operation
370        """
371        if data is None:
372            data = {}
373
374        self._check_validate_data_params(data, operation_name)
375
376        operation = self._operations[operation_name]
377        model = self._models[operation[OperationField.MODEL_NAME]]
378        status = self._init_report()
379
380        self._validate_object(status, model, data, '')
381
382        if len(status[PropName.REQUIRED]) > 0 or len(status[PropName.INVALID_TYPE]) > 0:
383            return False, self._delete_empty_field_from_report(status)
384        return True, None
385
386    def _check_validate_data_params(self, data, operation_name):
387        if not operation_name or not isinstance(operation_name, string_types):
388            raise IllegalArgumentException("The operation_name parameter must be a non-empty string")
389        if not isinstance(data, dict):
390            raise IllegalArgumentException("The data parameter must be a dict")
391        if operation_name not in self._operations:
392            raise IllegalArgumentException("{0} operation does not support".format(operation_name))
393
394    def validate_query_params(self, operation_name, params):
395        """
396           Validate params for the get requests. Use this method for validating the query part of the url.
397           :param operation_name: string
398                               The value must be non empty string.
399                               The operation name is used to get a params specification
400           :param params: dict
401                        should be in the format that the specification(from operation) expects
402                    Ex.
403                    {
404                        'objId': "string_value",
405                        'p_integer': 1,
406                        'p_boolean': True,
407                        'p_number': 2.3
408                    }
409           :rtype:(Boolean, msg)
410           :return:
411               (True, None) - if params valid
412               Invalid:
413               (False, {
414                   'required': [ #list of the fields that are required but are not present in the params
415                       'field_name'
416                   ],
417                   'invalid_type':[ #list of the fields with invalid data and expected type of the params
418                            {
419                              'path': 'objId', #field name
420                              'expected_type': 'string',#expected type. Ex. 'string', 'integer', 'boolean', 'number'
421                              'actually_value': 1 # the value that user passed
422                            }
423                   ]
424               })
425            :raises IllegalArgumentException
426               'The operation_name parameter must be a non-empty string' if operation_name is not valid
427               'The params parameter must be a dict' if params neither dict or None
428               '{operation_name} operation does not support' if the spec does not contain the operation
429           """
430        return self._validate_url_params(operation_name, params, resource=OperationParams.QUERY)
431
432    def validate_path_params(self, operation_name, params):
433        """
434        Validate params for the get requests. Use this method for validating the path part of the url.
435           :param operation_name: string
436                               The value must be non empty string.
437                               The operation name is used to get a params specification
438           :param params: dict
439                        should be in the format that the specification(from operation) expects
440
441                 Ex.
442                 {
443                     'objId': "string_value",
444                     'p_integer': 1,
445                     'p_boolean': True,
446                     'p_number': 2.3
447                 }
448        :rtype:(Boolean, msg)
449        :return:
450            (True, None) - if params valid
451            Invalid:
452            (False, {
453                'required': [ #list of the fields that are required but are not present in the params
454                    'field_name'
455                ],
456                'invalid_type':[ #list of the fields with invalid data and expected type of the params
457                         {
458                           'path': 'objId', #field name
459                           'expected_type': 'string',#expected type. Ex. 'string', 'integer', 'boolean', 'number'
460                           'actually_value': 1 # the value that user passed
461                         }
462                ]
463            })
464        :raises IllegalArgumentException
465            'The operation_name parameter must be a non-empty string' if operation_name is not valid
466            'The params parameter must be a dict' if params neither dict or None
467            '{operation_name} operation does not support' if the spec does not contain the operation
468        """
469        return self._validate_url_params(operation_name, params, resource=OperationParams.PATH)
470
471    def _validate_url_params(self, operation, params, resource):
472        if params is None:
473            params = {}
474
475        self._check_validate_url_params(operation, params)
476
477        operation = self._operations[operation]
478        if OperationField.PARAMETERS in operation and resource in operation[OperationField.PARAMETERS]:
479            spec = operation[OperationField.PARAMETERS][resource]
480            status = self._init_report()
481            self._check_url_params(status, spec, params)
482
483            if len(status[PropName.REQUIRED]) > 0 or len(status[PropName.INVALID_TYPE]) > 0:
484                return False, self._delete_empty_field_from_report(status)
485            return True, None
486        else:
487            return True, None
488
489    def _check_validate_url_params(self, operation, params):
490        if not operation or not isinstance(operation, string_types):
491            raise IllegalArgumentException("The operation_name parameter must be a non-empty string")
492        if not isinstance(params, dict):
493            raise IllegalArgumentException("The params parameter must be a dict")
494        if operation not in self._operations:
495            raise IllegalArgumentException("{0} operation does not support".format(operation))
496
497    def _check_url_params(self, status, spec, params):
498        for prop_name in spec.keys():
499            prop = spec[prop_name]
500            if prop[PropName.REQUIRED] and prop_name not in params:
501                status[PropName.REQUIRED].append(prop_name)
502                continue
503            if prop_name in params:
504                expected_type = prop[PropName.TYPE]
505                value = params[prop_name]
506                if prop_name in params and not self._is_correct_simple_types(expected_type, value, allow_null=False):
507                    self._add_invalid_type_report(status, '', prop_name, expected_type, value)
508
509    def _validate_object(self, status, model, data, path):
510        if self._is_enum(model):
511            self._check_enum(status, model, data, path)
512        elif self._is_object(model):
513            self._check_object(status, model, data, path)
514
515    def _is_enum(self, model):
516        return self._is_string_type(model) and PropName.ENUM in model
517
518    def _check_enum(self, status, model, data, path):
519        if data is not None and data not in model[PropName.ENUM]:
520            self._add_invalid_type_report(status, path, '', PropName.ENUM, data)
521
522    def _add_invalid_type_report(self, status, path, prop_name, expected_type, actually_value):
523        status[PropName.INVALID_TYPE].append({
524            'path': self._create_path_to_field(path, prop_name),
525            'expected_type': expected_type,
526            'actually_value': actually_value
527        })
528
529    def _check_object(self, status, model, data, path):
530        if data is None:
531            return
532
533        if not isinstance(data, dict):
534            self._add_invalid_type_report(status, path, '', PropType.OBJECT, data)
535            return None
536
537        if PropName.REQUIRED in model:
538            self._check_required_fields(status, model[PropName.REQUIRED], data, path)
539
540        model_properties = model[PropName.PROPERTIES]
541        for prop in model_properties.keys():
542            if prop in data:
543                model_prop_val = model_properties[prop]
544                expected_type = model_prop_val[PropName.TYPE]
545                actually_value = data[prop]
546                self._check_types(status, actually_value, expected_type, model_prop_val, path, prop)
547
548    def _check_types(self, status, actually_value, expected_type, model, path, prop_name):
549        if expected_type == PropType.OBJECT:
550            ref_model = self._get_model_by_ref(model)
551
552            self._validate_object(status, ref_model, actually_value,
553                                  path=self._create_path_to_field(path, prop_name))
554        elif expected_type == PropType.ARRAY:
555            self._check_array(status, model, actually_value,
556                              path=self._create_path_to_field(path, prop_name))
557        elif not self._is_correct_simple_types(expected_type, actually_value):
558            self._add_invalid_type_report(status, path, prop_name, expected_type, actually_value)
559
560    def _get_model_by_ref(self, model_prop_val):
561        model = _get_model_name_from_url(model_prop_val[PropName.REF])
562        return self._models[model]
563
564    def _check_required_fields(self, status, required_fields, data, path):
565        missed_required_fields = [self._create_path_to_field(path, field) for field in
566                                  required_fields if field not in data.keys() or data[field] is None]
567        if len(missed_required_fields) > 0:
568            status[PropName.REQUIRED] += missed_required_fields
569
570    def _check_array(self, status, model, data, path):
571        if data is None:
572            return
573        elif not isinstance(data, list):
574            self._add_invalid_type_report(status, path, '', PropType.ARRAY, data)
575        else:
576            item_model = model[PropName.ITEMS]
577            for i, item_data in enumerate(data):
578                self._check_types(status, item_data, item_model[PropName.TYPE], item_model, "{0}[{1}]".format(path, i),
579                                  '')
580
581    @staticmethod
582    def _is_correct_simple_types(expected_type, value, allow_null=True):
583        def is_numeric_string(s):
584            try:
585                float(s)
586                return True
587            except ValueError:
588                return False
589
590        if value is None and allow_null:
591            return True
592        elif expected_type == PropType.STRING:
593            return isinstance(value, string_types)
594        elif expected_type == PropType.BOOLEAN:
595            return isinstance(value, bool)
596        elif expected_type == PropType.INTEGER:
597            is_integer = isinstance(value, integer_types) and not isinstance(value, bool)
598            is_digit_string = isinstance(value, string_types) and value.isdigit()
599            return is_integer or is_digit_string
600        elif expected_type == PropType.NUMBER:
601            is_number = isinstance(value, (integer_types, float)) and not isinstance(value, bool)
602            is_numeric_string = isinstance(value, string_types) and is_numeric_string(value)
603            return is_number or is_numeric_string
604        return False
605
606    @staticmethod
607    def _is_string_type(model):
608        return PropName.TYPE in model and model[PropName.TYPE] == PropType.STRING
609
610    @staticmethod
611    def _init_report():
612        return {
613            PropName.REQUIRED: [],
614            PropName.INVALID_TYPE: []
615        }
616
617    @staticmethod
618    def _delete_empty_field_from_report(status):
619        if not status[PropName.REQUIRED]:
620            del status[PropName.REQUIRED]
621        if not status[PropName.INVALID_TYPE]:
622            del status[PropName.INVALID_TYPE]
623        return status
624
625    @staticmethod
626    def _create_path_to_field(path='', field=''):
627        separator = ''
628        if path and field:
629            separator = '.'
630        return "{0}{1}{2}".format(path, separator, field)
631
632    @staticmethod
633    def _is_object(model):
634        return PropName.TYPE in model and model[PropName.TYPE] == PropType.OBJECT
635