# Copyright (c) 2017, The MITRE Corporation
# For license information, see the LICENSE.txt file

"""
Common data validation functions used across libtaxii
"""

import collections
import datetime
import os
import re

from lxml import etree
import six

from .common import (parse, parse_datetime_string)

# General purpose helper methods #

# Pairs a regular expression with a human-readable title that is used in
# validation error messages.
RegexTuple = collections.namedtuple('_RegexTuple', ['regex', 'title'])

# NOTE: the patterns below are raw strings so that sequences such as ``\?``
# and ``\w`` reach the regex engine untouched instead of being treated as
# (invalid) string escape sequences by the Python compiler.

# URI regex per http://tools.ietf.org/html/rfc3986
uri_regex = RegexTuple(r"(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?", "URI Format")
message_id_regex_10 = RegexTuple(r"^[0-9]+$", "Numbers only")
targeting_expression_regex = RegexTuple(r"^(@?\w+|\*{1,2})(/(@?\w+|\*{1,2}))*$", "Targeting Expression Syntax")

# Error message templates shared by the check functions below.
_none_error = "%s is not allowed to be None and the provided value was None"
_type_error = "%s must be of type %s. The incorrect value was of type %s"
_regex_error = "%s must be a string conforming to %s. The incorrect value was: %s"
_tuple_error = "%s must be one of %s. The incorrect value was %s"


def do_check(var, varname, type=None, regex_tuple=None, value_tuple=None, can_be_none=False):
    """
    Checks supplied var against all of the supplied checks using the following
    process:

    1. If var is a list, set or tuple, call this function for every item in it
       (an empty collection therefore always passes)
    2. If the var is None and can be None, return
    3. If the var is None and cannot be None, raise ValueError
    4. If a type is specified, and the var is not of the specified type, raise ValueError
    5. If a regex is specified, and the var doesn't match the regex, raise ValueError
    6. If a value_tuple is specified, and the var is not in the value_tuple, raise ValueError

    Args:
        var: The value (or list/set/tuple of values) to check.
        varname (str): Name of the value; used in the error messages. Items of
            a collection are reported as ``varname[index]``.
        type: Optional type (or tuple of types) passed to ``isinstance``.
        regex_tuple (RegexTuple): Optional regex the value must match
            (``re.match``, i.e. anchored at the start of the string).
        value_tuple: Optional container of allowed values.
        can_be_none (bool): Whether ``None`` is an acceptable value.

    Raises:
        ValueError: If any of the requested checks fails.
    """

    if isinstance(var, (list, set, tuple)):
        # Validate each member individually with the same set of checks.
        for index, item in enumerate(var):
            do_check(item, "%s[%s]" % (varname, index), type, regex_tuple, value_tuple, can_be_none)
        return

    if var is None:
        if can_be_none:
            return
        raise ValueError(_none_error % varname)

    if type is not None and not isinstance(var, type):
        bad_type = var.__class__.__name__
        raise ValueError(_type_error % (varname, type, bad_type))

    if regex_tuple is not None:
        # A regex can only be applied to strings; report that explicitly
        # rather than letting re.match raise a TypeError.
        if not isinstance(var, six.string_types):
            raise ValueError('%s was about to undergo a regex check, but is not of type basestring! Regex check was not performed' % (varname))
        if re.match(regex_tuple.regex, var) is None:
            raise ValueError(_regex_error % (varname, regex_tuple.title, var))

    if value_tuple is not None and var not in value_tuple:
        raise ValueError(_tuple_error % (varname, value_tuple, var))


def check_timestamp_label(timestamp_label, varname, can_be_none=False):
    """
    Checks the timestamp_label to see if it is a valid timestamp label
    using the following process:

    1. If the timestamp_label is None and is allowed to be None, Pass
    2. If the timestamp_label is None and is not allowed to be None, Fail
    3. If the timestamp_label arg is a string, convert to datetime
    4. If the timestamp_label is not a datetime.datetime, Fail
    5. If the timestamp_label's tzinfo attribute is None, Fail
    6. Pass

    Args:
        timestamp_label: A ``datetime.datetime``, a string to be parsed with
            ``parse_datetime_string``, or None.
        varname (str): Name of the value; used in the error messages.
        can_be_none (bool): Whether ``None`` is an acceptable value.

    Returns:
        None when timestamp_label is None (and that is allowed); otherwise the
        validated datetime object (the parsed value if a string was supplied).

    Raises:
        ValueError: If timestamp_label is None but must not be, is not a
            datetime, or has no timezone information.
    """

    if timestamp_label is None:
        if can_be_none:
            return
        raise ValueError(_none_error % varname)

    if isinstance(timestamp_label, six.string_types):
        timestamp_label = parse_datetime_string(timestamp_label)

    do_check(timestamp_label, varname, type=datetime.datetime, can_be_none=can_be_none)

    if timestamp_label.tzinfo is None:
        raise ValueError('%s.tzinfo must not be None!' % varname)

    return timestamp_label


class SchemaValidationResult(object):
    """A wrapper for the results of schema validation.

    Attributes:
        valid: The value returned by the schema's ``validate`` call.
        error_log: The schema's error log captured after validation.
    """

    def __init__(self, valid, error_log):
        self.valid = valid
        self.error_log = error_log


_pkg_dir = os.path.dirname(__file__)

#: Automatically-calculated path to the bundled TAXII 1.0 schema.
TAXII_10_SCHEMA = os.path.join(_pkg_dir, "xsd", "TAXII_XMLMessageBinding_Schema.xsd")

#: Automatically-calculated path to the bundled TAXII 1.1 schema.
TAXII_11_SCHEMA = os.path.join(_pkg_dir, "xsd", "TAXII_XMLMessageBinding_Schema_11.xsd")


class SchemaValidator(object):
    """
    A helper class for TAXII Schema Validation.

    Example:
        See validate_etree(...) for an example how to use this class
    """

    # Create class-level variables equal to module-level variables for
    # backwards-compatibility
    TAXII_10_SCHEMA = TAXII_10_SCHEMA
    TAXII_11_SCHEMA = TAXII_11_SCHEMA

    def __init__(self, schema_file):
        """
        Args:
            schema_file (str) - The file location of the schema to
                                validate against. Use the TAXII_11_SCHEMA
                                and TAXII_10_SCHEMA constants to validate
                                against TAXII 1.1 / 1.0. This schema file
                                will be used when validate_file/string/etree
                                is used.
        """
        schema_doc = parse(schema_file, allow_file=True)
        self.xml_schema = etree.XMLSchema(schema_doc)

    def validate_file(self, file_location):
        """
        A wrapper for validate_etree. Parses file_location,
        turns it into an etree, then calls validate_etree( ... )
        """

        # Binary mode: let the XML parser determine the document encoding
        # itself instead of decoding with the platform's default text encoding.
        # NOTE(review): assumes parse() accepts a binary file object - confirm
        # against libtaxii.common.
        with open(file_location, 'rb') as f:
            etree_xml = parse(f, allow_file=True)

        return self.validate_etree(etree_xml)

    def validate_string(self, xml_string):
        """
        A wrapper for validate_etree. Parses xml_string,
        turns it into an etree, then calls validate_etree( ... )
        """
        etree_xml = parse(xml_string, allow_file=False)
        return self.validate_etree(etree_xml)

    def validate_etree(self, etree_xml):
        """Validate an LXML etree against the schema given to the constructor.

        Args:
            etree_xml (etree): The XML to validate.

        Returns:
            A SchemaValidationResult object

        Raises:
            lxml.etree.XMLSyntaxError: Presumably raised by the parsing step of
                validate_string() / validate_file() when the XML to be
                validated is not well formed; this method itself only
                validates an already-parsed tree.

        Example:
            .. code-block:: python

                import sys

                from libtaxii.validation import SchemaValidator, TAXII_11_SCHEMA
                from lxml.etree import XMLSyntaxError

                sv = SchemaValidator(TAXII_11_SCHEMA)

                try:
                    result = sv.validate_string(some_xml_string)
                    # Note that validate_etree() and validate_file() can also be used
                except XMLSyntaxError:
                    # Handle this exception, which occurs when
                    # some_xml_string is not valid XML (e.g., 'foo')
                    sys.exit(1)

                if not result.valid:
                    for error in result.error_log:
                        print(error)
                    sys.exit(1)

                # At this point, the XML is schema valid
                do_something(some_xml_string)
        """
        valid = self.xml_schema.validate(etree_xml)
        return SchemaValidationResult(valid, self.xml_schema.error_log)


class TAXII10Validator(SchemaValidator):
    """A :py:class:`SchemaValidator` that uses the TAXII 1.0 Schemas"""

    def __init__(self):
        super(TAXII10Validator, self).__init__(TAXII_10_SCHEMA)


class TAXII11Validator(SchemaValidator):
    """A :py:class:`SchemaValidator` that uses the TAXII 1.1 Schemas"""

    def __init__(self):
        super(TAXII11Validator, self).__init__(TAXII_11_SCHEMA)