1# Copyright (c) 2017, The MITRE Corporation
2# For license information, see the LICENSE.txt file
3
4"""
5Common data validation functions used across libtaxii
6"""
7
8
9import collections
10import re
11import datetime
12from lxml import etree
13import os
14
15from .common import (parse, parse_datetime_string)
16import six
17
18# General purpose helper methods #
19
# Pairs a regular expression with the human-readable title that do_check()
# puts into its error message when a value fails to match.
RegexTuple = collections.namedtuple('_RegexTuple', ['regex', 'title'])

# The patterns below are raw strings so that regex escapes such as "\?" and
# "\w" are not read as (invalid) Python string escapes, which newer
# interpreters warn about. The resulting pattern text is unchanged.

# URI regex per http://tools.ietf.org/html/rfc3986 (Appendix B)
uri_regex = RegexTuple(r"(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?", "URI Format")
# Message IDs made up of decimal digits only
message_id_regex_10 = RegexTuple(r"^[0-9]+$", "Numbers only")
# Slash-separated steps; each step is a word (optionally prefixed with "@"),
# "*" or "**"
targeting_expression_regex = RegexTuple(r"^(@?\w+|\*{1,2})(/(@?\w+|\*{1,2}))*$", "Targeting Expression Syntax")

# %-format templates for the ValueError messages raised by the checks below
_none_error = "%s is not allowed to be None and the provided value was None"
_type_error = "%s must be of type %s. The incorrect value was of type %s"
_regex_error = "%s must be a string conforming to %s. The incorrect value was: %s"
_tuple_error = "%s must be one of %s. The incorrect value was %s"
30
31
def do_check(var, varname, type=None, regex_tuple=None, value_tuple=None, can_be_none=False):
    """
    Checks supplied var against all of the supplied checks using the following
    process:

    1. If var is a list, set or tuple, call this function for every item it
       contains (with the item's position appended to varname) and return
    2. If the var is None and can be None, return
    3. If the var is None and cannot be None, raise ValueError
    4. If a type is specified, and the var is not of the specified type, raise ValueError
    5. If a regex is specified, and the var is not a string or doesn't match
       the regex, raise ValueError
    6. If a value_tuple is specified, and the var is not in the value_tuple, raise ValueError

    Args:
        var: the value (or list/set/tuple of values) to check
        varname (str): name of the value; only used in the error messages
        type: a type, or tuple of types, accepted by isinstance()
        regex_tuple: an object with ``regex`` and ``title`` attributes
            (e.g. a RegexTuple); ``title`` is used in the error message
        value_tuple: a container of the allowed values
        can_be_none (bool): whether None is an acceptable value

    Returns:
        None. The function only signals failure, by raising.

    Raises:
        ValueError: if any of the requested checks fails
    """

    # Collections are validated element by element. An empty collection
    # therefore always passes, whatever checks were requested.
    if isinstance(var, (list, set, tuple)):
        for index, item in enumerate(var):
            do_check(item, "%s[%s]" % (varname, index), type, regex_tuple, value_tuple, can_be_none)
        return

    if var is None:
        if can_be_none:
            return
        raise ValueError(_none_error % varname)

    if type is not None and not isinstance(var, type):
        raise ValueError(_type_error % (varname, type, var.__class__.__name__))

    if regex_tuple is not None:
        if not isinstance(var, six.string_types):
            raise ValueError('%s was about to undergo a regex check, but is not of type basestring! Regex check was not performed' % (varname))
        # re.match only anchors at the beginning of var; a pattern that must
        # cover the whole value has to end with "$" itself.
        if re.match(regex_tuple.regex, var) is None:
            raise ValueError(_regex_error % (varname, regex_tuple.title, var))

    if value_tuple is not None and var not in value_tuple:
        raise ValueError(_tuple_error % (varname, value_tuple, var))
78
79
def check_timestamp_label(timestamp_label, varname, can_be_none=False):
    """
    Checks the timestamp_label to see if it is a valid timestamp label
    using the following process:

    1. If the timestamp_label is None and is allowed to be None, Pass
    2. If the timestamp_label is None and is not allowed to be None, Fail
    3. If the timestamp_label arg is a string, convert to datetime
    4. If the (converted) timestamp_label is not a datetime.datetime, Fail
    5. If the timestamp_label's tzinfo attribute is None, Fail
    6. Pass

    Args:
        timestamp_label: a datetime.datetime, a string to be converted with
            parse_datetime_string, or None
        varname (str): name of the value; only used in the error messages
        can_be_none (bool): whether a None timestamp_label is acceptable

    Returns:
        None if timestamp_label was None and that is allowed; otherwise the
        validated datetime.datetime (the converted value if a string was
        passed in).

    Raises:
        ValueError: if the timestamp_label fails any of the checks above
    """

    if timestamp_label is None:
        if can_be_none:
            return None
        raise ValueError(_none_error % varname)

    if isinstance(timestamp_label, six.string_types):
        timestamp_label = parse_datetime_string(timestamp_label)

    # A None input was dealt with above, so a None here can only be the result
    # of the string conversion (NOTE(review): parse_datetime_string is defined
    # in .common; it appears to return None for an empty string - confirm).
    # It must be rejected even when can_be_none is True, otherwise the tzinfo
    # access below would fail with AttributeError instead of ValueError.
    do_check(timestamp_label, varname, type=datetime.datetime, can_be_none=False)

    if timestamp_label.tzinfo is None:
        raise ValueError('%s.tzinfo must not be None!' % varname)

    return timestamp_label
107
108
class SchemaValidationResult(object):
    """Plain container for the outcome of a schema validation.

    Attributes:
        valid: the validity flag the result was created with
        error_log: the error log the result was created with
    """

    def __init__(self, valid, error_log):
        # Both values are stored as given; nothing is copied or converted.
        self.valid, self.error_log = valid, error_log
115
# Directory part of this module's path; the schema files are looked up in
# its "xsd" subdirectory.
_pkg_dir = os.path.split(__file__)[0]

#: Path of the bundled TAXII 1.0 schema, derived from this module's location.
TAXII_10_SCHEMA = os.path.join(_pkg_dir, "xsd", "TAXII_XMLMessageBinding_Schema.xsd")

#: Path of the bundled TAXII 1.1 schema, derived from this module's location.
TAXII_11_SCHEMA = os.path.join(_pkg_dir, "xsd", "TAXII_XMLMessageBinding_Schema_11.xsd")
123
124
class SchemaValidator(object):
    """
    A helper class for TAXII Schema Validation.

    The schema file is parsed once, in the constructor; the resulting
    XMLSchema object is reused by every validate_* call on the instance.

    Example:
        See validate_etree(...) for an example how to use this class
    """

    # Create class-level variables equal to module-level variables for
    # backwards-compatibility
    TAXII_10_SCHEMA = TAXII_10_SCHEMA
    TAXII_11_SCHEMA = TAXII_11_SCHEMA

    def __init__(self, schema_file):
        """
        Args:
            schema_file (str) - The file location of the schema to
                                validate against. Use the TAXII_11_SCHEMA
                                and TAXII_10_SCHEMA constants to validate
                                against TAXII 1.1 / 1.0. This schema file
                                will be used when validate_file/string/etree
                                is used.
        """
        schema_doc = parse(schema_file, allow_file=True)
        self.xml_schema = etree.XMLSchema(schema_doc)

    def validate_file(self, file_location):
        """
        A wrapper for validate_etree. Parses file_location,
        turns it into an etree, then calls validate_etree( ... )

        Args:
            file_location (str): Path of the XML file to validate.

        Returns:
            A SchemaValidationResult object
        """

        # Open in binary mode so the XML parser receives the raw bytes and can
        # apply the encoding declared by the document itself. Text mode would
        # decode with the platform's default encoding first.
        with open(file_location, 'rb') as f:
            etree_xml = parse(f, allow_file=True)

        return self.validate_etree(etree_xml)

    def validate_string(self, xml_string):
        """
        A wrapper for validate_etree. Parses xml_string,
        turns it into an etree, then calls validate_etree( ... )

        Args:
            xml_string: The XML content to validate (not a file location;
                file input is disabled for this call).

        Returns:
            A SchemaValidationResult object
        """
        etree_xml = parse(xml_string, allow_file=False)
        return self.validate_etree(etree_xml)

    def validate_etree(self, etree_xml):
        """Validate an LXML etree against the schema given to the constructor.

        Args:
            etree_xml (etree): The XML to validate.

        Returns:
            A SchemaValidationResult object holding the validity flag and the
            schema's error log for this validation.

        Raises:
            lxml.etree.XMLSyntaxError: When the XML to be validated is not
                well formed. This originates from the parsing step, i.e. from
                validate_string() / validate_file() or from the caller's own
                parsing, rather than from the schema check itself.

        Example:
            .. code-block:: python

                import sys

                from libtaxii.validation import SchemaValidator, TAXII_11_SCHEMA
                from lxml.etree import XMLSyntaxError

                sv = SchemaValidator(TAXII_11_SCHEMA)

                try:
                    result = sv.validate_string(some_xml_string)
                    # Note that validate_file() and validate_etree() can also be used
                except XMLSyntaxError:
                    # Handle this exception, which occurs when
                    # some_xml_string is not valid XML (e.g., 'foo')
                    sys.exit(1)

                if not result.valid:
                    for error in result.error_log:
                        print(error)
                    sys.exit(1)

                # At this point, the XML is schema valid
                do_something(some_xml_string)
        """
        valid = self.xml_schema.validate(etree_xml)
        return SchemaValidationResult(valid, self.xml_schema.error_log)
209
210
class TAXII10Validator(SchemaValidator):
    """A :py:class:`SchemaValidator` preconfigured with the bundled TAXII 1.0 schema."""

    def __init__(self):
        """Set up the validator with TAXII_10_SCHEMA as its schema file."""
        super(TAXII10Validator, self).__init__(schema_file=TAXII_10_SCHEMA)
216
217
class TAXII11Validator(SchemaValidator):
    """A :py:class:`SchemaValidator` preconfigured with the bundled TAXII 1.1 schema."""

    def __init__(self):
        """Set up the validator with TAXII_11_SCHEMA as its schema file."""
        super(TAXII11Validator, self).__init__(schema_file=TAXII_11_SCHEMA)
223