# Copyright (c) 2017, The MITRE Corporation
# For license information, see the LICENSE.txt file

"""
Creating, handling, and parsing TAXII Default Queries.
"""


import numbers
import datetime
from operator import attrgetter
import os
import dateutil.parser

from lxml import etree

import libtaxii.messages_11 as tm11

from .common import TAXIIBase
from .validation import (do_check, uri_regex, targeting_expression_regex)
from .constants import *
import six


class CapabilityModule(object):

    """
    A Capability Module has valid relationships
    Each relationship has 0-n valid parameters

    :param string capability_module_id: Identifier of the capability module
    :param relationships: The relationships this module defines
    :type relationships: :class:`list` of :class:`Relationship`
    """

    def __init__(self, capability_module_id, relationships):
        self.capability_module_id = capability_module_id
        self.relationships = relationships

    @property
    def capability_module_id(self):
        return self._capability_module_id

    @capability_module_id.setter
    def capability_module_id(self, value):
        do_check(value, 'capability_module_id', type=six.string_types)
        self._capability_module_id = value

    @property
    def relationships(self):
        """Relationships keyed by relationship name (a ``dict``)."""
        return self._relationships

    @relationships.setter
    def relationships(self, value):
        do_check(value, 'relationships', type=Relationship)
        # Stored as a name -> Relationship mapping so lookups by name are direct.
        self._relationships = {}
        for item in value:
            self._relationships[item.name] = item


class Relationship(object):

    """
    A named relationship together with the parameters it accepts.

    :param string name: The relationship name
    :param parameters: The parameters valid for this relationship
    :type parameters: :class:`list` of :class:`Parameter`
    """

    def __init__(self, name, parameters=None):
        self.name = name
        self.parameters = parameters or []

    @property
    def name(self):
        return self._name

    @name.setter
    def name(self, value):
        do_check(value, 'name', type=six.string_types)
        self._name = value

    @property
    def parameters(self):
        """Parameters keyed by parameter name (a ``dict``)."""
        return self._parameters

    @parameters.setter
    def parameters(self, value):
        do_check(value, 'parameters', type=Parameter)
        # Stored as a name -> Parameter mapping so lookups by name are direct.
        self._parameters = {}
        for item in value:
            self._parameters[item.name] = item


class Parameter(object):

    """
    Describes one parameter of a relationship.

    :param string name: The parameter name
    :param type: The type (or tuple of types) a value must have
    :param value_tuple: Optional tuple of permitted values
    """

    def __init__(self, name, type, value_tuple=None):
        self.name = name
        self.type = type
        self.value_tuple = value_tuple

    def verify(self, value):
        """Check ``value`` against this parameter's type and permitted values.

        The check itself is delegated to ``do_check``; when that call
        returns normally this method returns ``(True, 'OK')``.
        """
        do_check(value, 'value', type=self.type, value_tuple=self.value_tuple)
        return True, 'OK'


# params - Define parameters for the Core/Regex/Timestamp capability modules
param_str_value = Parameter(P_VALUE, six.string_types)
param_float_value = Parameter(P_VALUE, float)
param_ts_value = Parameter(P_VALUE, datetime.datetime)
param_match_type = Parameter(P_MATCH_TYPE, six.string_types, ('case_sensitive_string', 'case_insensitive_string', 'number'))
param_case_sensitive = Parameter(P_CASE_SENSITIVE, bool, (True, False))

# CORE Relationships - Define relationships for the core capability module
rel_equals = Relationship(R_EQUALS, [param_str_value, param_match_type])
rel_not_equals = Relationship(R_NOT_EQUALS, [param_str_value, param_match_type])
rel_greater_than = Relationship(R_GREATER_THAN, [param_float_value])
rel_greater_than_or_equal = Relationship(R_GREATER_THAN_OR_EQUAL, [param_float_value])
rel_less_than = Relationship(R_LESS_THAN, [param_float_value])
rel_less_than_or_equal = Relationship(R_LESS_THAN_OR_EQUAL, [param_float_value])
rel_dne = Relationship(R_DOES_NOT_EXIST)
rel_ex = Relationship(R_EXISTS)
rel_begins_with = Relationship(R_BEGINS_WITH, [param_case_sensitive, param_str_value])
rel_ends_with = Relationship(R_ENDS_WITH, [param_case_sensitive, param_str_value])
rel_contains = Relationship(R_CONTAINS, [param_case_sensitive, param_str_value])

# REGEX relationships
rel_matches = Relationship(R_MATCHES, [param_case_sensitive, param_str_value])

# TIMESTAMP relationships
rel_ts_eq = Relationship(R_EQUALS, [param_ts_value])
rel_ts_gt = Relationship(R_GREATER_THAN, [param_ts_value])
rel_ts_gte = Relationship(R_GREATER_THAN_OR_EQUAL, [param_ts_value])
rel_ts_lt = Relationship(R_LESS_THAN, [param_ts_value])
rel_ts_lte = Relationship(R_LESS_THAN_OR_EQUAL, [param_ts_value])

# CORE - Define the Core Capability Module
cm_core = CapabilityModule(CM_CORE,
                           [rel_equals, rel_not_equals, rel_greater_than,
                            rel_greater_than_or_equal, rel_less_than,
                            rel_less_than_or_equal, rel_dne, rel_ex,
                            rel_begins_with, rel_contains, rel_ends_with]
                           )

# REGEX - Define the RegEx Capability Module
cm_regex = CapabilityModule(CM_REGEX, [rel_matches])

# TIMESTAMP - Define the timestamp Capability Module
cm_timestamp = CapabilityModule(CM_TIMESTAMP, [rel_ts_eq, rel_ts_gt, rel_ts_gte, rel_ts_lt, rel_ts_lte])

# Lookup table used by Test.validate() and Test.from_etree()
capability_modules = {CM_CORE: cm_core, CM_REGEX: cm_regex, CM_TIMESTAMP: cm_timestamp}


class DefaultQueryInfo(tm11.SupportedQuery):

    """ Used to describe the TAXII Default Queries that are supported.

    :param targeting_expression_infos: Describe the supported targeting expressions
    :type targeting_expression_infos: :class:`list` of :class:`TargetingExpressionInfo` objects
    :param capability_modules: Indicate the supported capability modules
    :type capability_modules: :class:`list` of :class:`str`
    """

    def __init__(self, targeting_expression_infos, capability_modules):
        super(DefaultQueryInfo, self).__init__(FID_TAXII_DEFAULT_QUERY_10)
        self.targeting_expression_infos = targeting_expression_infos
        self.capability_modules = capability_modules

    @property
    def targeting_expression_infos(self):
        return self._targeting_expression_infos

    @targeting_expression_infos.setter
    def targeting_expression_infos(self, value):
        do_check(value, 'targeting_expression_infos', type=DefaultQueryInfo.TargetingExpressionInfo)
        self._targeting_expression_infos = value

    @property
    def capability_modules(self):
        return self._capability_modules

    @capability_modules.setter
    def capability_modules(self, value):
        do_check(value, 'capability_modules', regex_tuple=uri_regex)
        self._capability_modules = value

    def to_etree(self):
        """Return the parent element with a ``Default_Query_Info`` child appended."""
        q = super(DefaultQueryInfo, self).to_etree()
        dqi = etree.SubElement(q, '{%s}Default_Query_Info' % ns_map['tdq'])
        for expression_info in self.targeting_expression_infos:
            dqi.append(expression_info.to_etree())

        for cmod in self.capability_modules:
            cm = etree.SubElement(dqi, '{%s}Capability_Module' % ns_map['tdq'], nsmap=ns_map)
            cm.text = cmod
        return q

    def to_dict(self):
        """Return the parent dictionary extended with this object's fields."""
        d = super(DefaultQueryInfo, self).to_dict()
        d['targeting_expression_infos'] = [
            expression_info.to_dict()
            for expression_info in self.targeting_expression_infos
        ]
        # TODO: This looks like a serialization bug
        d['capability_modules'] = self.capability_modules
        return d

    def to_text(self, line_prepend=''):
        """Return a text rendering; every added line starts with ``line_prepend``."""
        s = super(DefaultQueryInfo, self).to_text(line_prepend)
        for expression_info in self.targeting_expression_infos:
            s += expression_info.to_text(line_prepend + STD_INDENT)
        for capability_module in self.capability_modules:
            s += line_prepend + " Capability Module: %s\n" % capability_module
        return s

    def __hash__(self):
        return hash(str(self.to_dict()))

    @staticmethod
    def from_etree(etree_xml):
        """Build a :class:`DefaultQueryInfo` from an lxml element."""
        texpr_infos = etree_xml.xpath('./tdq:Default_Query_Info/tdq:Targeting_Expression_Info', namespaces=ns_map)
        texpr_info_list = [
            DefaultQueryInfo.TargetingExpressionInfo.from_etree(texpr_info)
            for texpr_info in texpr_infos
        ]

        cms = etree_xml.xpath('./tdq:Default_Query_Info/tdq:Capability_Module', namespaces=ns_map)
        cms_list = [cm.text for cm in cms]
        return DefaultQueryInfo(texpr_info_list, cms_list)

    @staticmethod
    def from_dict(d):
        """Build a :class:`DefaultQueryInfo` from a dictionary as produced by :meth:`to_dict`."""
        kwargs = {}

        kwargs['targeting_expression_infos'] = [
            DefaultQueryInfo.TargetingExpressionInfo.from_dict(expression_info)
            for expression_info in d['targeting_expression_infos']
        ]

        kwargs['capability_modules'] = d['capability_modules']

        return DefaultQueryInfo(**kwargs)


class TargetingExpressionInfo(TAXIIBase):

    """This class describes supported Targeting Expressions

    :param string targeting_expression_id: The supported targeting expression ID
    :param preferred_scope: Indicates the preferred scope of queries
    :type preferred_scope: :class:`list` of :class:`string`
    :param allowed_scope: Indicates the allowed scope of queries
    :type allowed_scope: :class:`list` of :class:`string`
    """

    def __init__(self, targeting_expression_id, preferred_scope=None, allowed_scope=None):
        self.targeting_expression_id = targeting_expression_id
        self.preferred_scope = preferred_scope or []
        self.allowed_scope = allowed_scope or []

    @property
    def sort_key(self):
        return self.targeting_expression_id

    @property
    def targeting_expression_id(self):
        return self._targeting_expression_id

    @targeting_expression_id.setter
    def targeting_expression_id(self, value):
        do_check(value, 'targeting_expression_id', regex_tuple=uri_regex)
        self._targeting_expression_id = value

    @property
    def preferred_scope(self):
        return self._preferred_scope

    @preferred_scope.setter
    def preferred_scope(self, value):
        do_check(value, 'preferred_scope', type=six.string_types, regex_tuple=targeting_expression_regex)
        self._preferred_scope = value

    @property
    def allowed_scope(self):
        return self._allowed_scope

    @allowed_scope.setter
    def allowed_scope(self, value):
        do_check(value, 'allowed_scope', type=six.string_types, regex_tuple=targeting_expression_regex)
        self._allowed_scope = value

    def to_etree(self):
        """Return a ``Targeting_Expression_Info`` element for this object."""
        tei = etree.Element('{%s}Targeting_Expression_Info' % ns_map['tdq'])
        tei.attrib['targeting_expression_id'] = self.targeting_expression_id
        for scope in self.preferred_scope:
            preferred = etree.SubElement(tei, '{%s}Preferred_Scope' % ns_map['tdq'])
            preferred.text = scope
        for scope in self.allowed_scope:
            allowed = etree.SubElement(tei, '{%s}Allowed_Scope' % ns_map['tdq'])
            allowed.text = scope
        return tei

    def to_dict(self):
        """Return a dictionary whose keys match the constructor's keyword arguments."""
        d = {}
        d['targeting_expression_id'] = self.targeting_expression_id
        # TODO: Preferred / Allowed scope look like serialization bugs
        d['preferred_scope'] = self.preferred_scope
        d['allowed_scope'] = self.allowed_scope
        return d

    def to_text(self, line_prepend=''):
        """Return a text rendering; every line starts with ``line_prepend``."""
        s = line_prepend + "=== Targeting Expression Info ===\n"
        s += line_prepend + " Targeting Expression ID: %s\n" % self.targeting_expression_id
        for scope in self.preferred_scope:
            s += line_prepend + " Preferred Scope: %s\n" % scope
        for scope in self.allowed_scope:
            s += line_prepend + " Allowed Scope: %s\n" % scope

        return s

    def __hash__(self):
        return hash(str(self.to_dict()))

    @staticmethod
    def from_etree(etree_xml):
        """Build a :class:`TargetingExpressionInfo` from an lxml element."""
        kwargs = {}
        kwargs['targeting_expression_id'] = etree_xml.xpath('./@targeting_expression_id', namespaces=ns_map)[0]

        preferred_scope_set = etree_xml.xpath('./tdq:Preferred_Scope', namespaces=ns_map)
        kwargs['preferred_scope'] = [preferred.text for preferred in preferred_scope_set]

        allowed_scope_set = etree_xml.xpath('./tdq:Allowed_Scope', namespaces=ns_map)
        kwargs['allowed_scope'] = [allowed.text for allowed in allowed_scope_set]

        return DefaultQueryInfo.TargetingExpressionInfo(**kwargs)

    @staticmethod
    def from_dict(d):
        """Build a :class:`TargetingExpressionInfo`, using ``d`` as keyword arguments."""
        return DefaultQueryInfo.TargetingExpressionInfo(**d)


class DefaultQuery(tm11.Query):

    """Conveys a TAXII Default Query.

    :param string targeting_expression_id: The targeting_expression used in the query
    :param criteria: The criteria of the query
    :type criteria: :class:`DefaultQuery.Criteria`
    """

    def __init__(self, targeting_expression_id, criteria):
        super(DefaultQuery, self).__init__(FID_TAXII_DEFAULT_QUERY_10)
        self.targeting_expression_id = targeting_expression_id
        self.criteria = criteria

    @property
    def targeting_expression_id(self):
        return self._targeting_expression_id

    @targeting_expression_id.setter
    def targeting_expression_id(self, value):
        do_check(value, 'targeting_expression_id', regex_tuple=uri_regex)
        self._targeting_expression_id = value

    @property
    def criteria(self):
        return self._criteria

    @criteria.setter
    def criteria(self, value):
        do_check(value, 'criteria', type=DefaultQuery.Criteria)
        self._criteria = value

    def to_etree(self):
        """Return the parent element with a ``Default_Query`` child appended."""
        q = super(DefaultQuery, self).to_etree()
        dq = etree.SubElement(q, '{%s}Default_Query' % ns_map['tdq'], nsmap=ns_map)
        dq.attrib['targeting_expression_id'] = self.targeting_expression_id
        dq.append(self.criteria.to_etree())
        return q

    def to_dict(self):
        """Return the parent dictionary extended with this query's fields."""
        d = super(DefaultQuery, self).to_dict()
        d['targeting_expression_id'] = self.targeting_expression_id
        d['criteria'] = self.criteria.to_dict()
        return d

    def to_text(self, line_prepend=''):
        """Return a text rendering; every added line starts with ``line_prepend``."""
        s = super(DefaultQuery, self).to_text(line_prepend)
        s += line_prepend + " Targeting Expression ID: %s\n" % self.targeting_expression_id
        s += self.criteria.to_text(line_prepend)

        return s

    @staticmethod
    def from_etree(etree_xml):
        """Build a :class:`DefaultQuery` from an lxml element."""
        tei = etree_xml.xpath('./tdq:Default_Query/@targeting_expression_id', namespaces=ns_map)[0]  # attrib['targeting_expression_id']
        criteria = DefaultQuery.Criteria.from_etree(etree_xml.xpath('./tdq:Default_Query/tdq:Criteria', namespaces=ns_map)[0])
        return DefaultQuery(tei, criteria)

    @staticmethod
    def from_dict(d):
        """Build a :class:`DefaultQuery` from a dictionary as produced by :meth:`to_dict`."""
        tei = d['targeting_expression_id']
        criteria = DefaultQuery.Criteria.from_dict(d['criteria'])
        return DefaultQuery(tei, criteria)


class Criteria(TAXIIBase):

    """Represents criteria for a :class:`DefaultQuery`. **Note**: At least one criterion OR criteria MUST be present

    :param str operator: The logical operator (should be one of `OP_AND` or `OP_OR`)
    :param criteria: The criteria for the query
    :type criteria: :class:`DefaultQuery.Criteria`
    :param criterion: The criterion for the query
    :type criterion: :class:`DefaultQuery.Criterion`
    """

    def __init__(self, operator, criteria=None, criterion=None):
        self.operator = operator
        self.criteria = criteria or []
        self.criterion = criterion or []

    @property
    def sort_key(self):
        """Concatenation of the children's sort keys: sorted criteria first, then sorted criterion."""
        ia = sorted(self.criteria, key=attrgetter('sort_key'))
        ion = sorted(self.criterion, key=attrgetter('sort_key'))
        key_list = [i.sort_key for i in ia]
        key_list.extend(i.sort_key for i in ion)
        return ''.join(key_list)

    @property
    def operator(self):
        return self._operator

    @operator.setter
    def operator(self, value):
        do_check(value, 'operator', value_tuple=OP_TYPES)
        self._operator = value

    @property
    def criteria(self):
        return self._criteria

    @criteria.setter
    def criteria(self, value):
        # Field name was previously misspelled ('critiera'), which made
        # validation errors point at a non-existent attribute.
        do_check(value, 'criteria', type=DefaultQuery.Criteria)
        self._criteria = value

    @property
    def criterion(self):
        return self._criterion

    @criterion.setter
    def criterion(self, value):
        do_check(value, 'criterion', type=DefaultQuery.Criterion)
        self._criterion = value

    def to_etree(self):
        """Return a ``Criteria`` element; nested criteria precede criterion children."""
        cr = etree.Element('{%s}Criteria' % ns_map['tdq'], nsmap=ns_map)
        cr.attrib['operator'] = self.operator
        for criteria in self.criteria:
            cr.append(criteria.to_etree())

        for criterion in self.criterion:
            cr.append(criterion.to_etree())

        return cr

    def to_dict(self):
        """Return a dictionary with ``operator``, ``criteria`` and ``criterion`` keys."""
        d = {}
        d['operator'] = self.operator
        d['criteria'] = [criteria.to_dict() for criteria in self.criteria]
        d['criterion'] = [criterion.to_dict() for criterion in self.criterion]

        return d

    def to_text(self, line_prepend=''):
        """Return a text rendering; children are indented by ``STD_INDENT``."""
        s = line_prepend + "=== Criteria ===\n"
        s += line_prepend + " Operator: %s\n" % self.operator
        for criteria in self.criteria:
            s += criteria.to_text(line_prepend + STD_INDENT)
        for criterion in self.criterion:
            s += criterion.to_text(line_prepend + STD_INDENT)

        return s

    @staticmethod
    def from_etree(etree_xml):
        """Build a :class:`Criteria` (recursively) from an lxml element."""
        kwargs = {}
        kwargs['operator'] = etree_xml.attrib['operator']

        criteria_set = etree_xml.xpath('./tdq:Criteria', namespaces=ns_map)
        kwargs['criteria'] = [
            DefaultQuery.Criteria.from_etree(criteria) for criteria in criteria_set
        ]

        criterion_set = etree_xml.xpath('./tdq:Criterion', namespaces=ns_map)
        kwargs['criterion'] = [
            DefaultQuery.Criterion.from_etree(criterion) for criterion in criterion_set
        ]

        return DefaultQuery.Criteria(**kwargs)

    @staticmethod
    def from_dict(d):
        """Build a :class:`Criteria` (recursively) from a dictionary.

        ``operator`` is required; ``criteria`` and ``criterion`` default to
        empty lists when absent.
        """
        kwargs = {}
        kwargs['operator'] = d['operator']

        criteria_set = d.get('criteria', [])
        kwargs['criteria'] = [
            DefaultQuery.Criteria.from_dict(criteria) for criteria in criteria_set
        ]

        criterion_set = d.get('criterion', [])
        kwargs['criterion'] = [
            DefaultQuery.Criterion.from_dict(criterion) for criterion in criterion_set
        ]

        return DefaultQuery.Criteria(**kwargs)


class Criterion(TAXIIBase):

    """Represents criterion for a :class:`DefaultQuery.Criteria`

    :param string target: A targeting expression identifying the target
    :param test: The test to be applied to the target
    :type test: :class:`DefaultQuery.Criterion.Test`
    :param bool negate: Whether the result of applying the test to the target should be negated
    """

    def __init__(self, target, test, negate=False):
        self.negate = negate
        self.target = target
        self.test = test

    @property
    def sort_key(self):
        return self.target

    @property
    def negate(self):
        return self._negate

    @negate.setter
    def negate(self, value):
        do_check(value, 'negate', value_tuple=(True, False), can_be_none=True)
        self._negate = value

    @property
    def target(self):
        return self._target

    @target.setter
    def target(self, value):
        do_check(value, 'target', type=six.string_types)
        self._target = value

    @property
    def test(self):
        return self._test

    @test.setter
    def test(self, value):
        # The second argument is the field *name*; the original passed the
        # value itself here.
        do_check(value, 'test', type=DefaultQuery.Criterion.Test)
        self._test = value

    def to_etree(self):
        """Return a ``Criterion`` element; ``negate`` is omitted when it is ``None``."""
        cr = etree.Element('{%s}Criterion' % ns_map['tdq'], nsmap=ns_map)
        if self.negate is not None:
            cr.attrib['negate'] = str(self.negate).lower()

        target = etree.SubElement(cr, '{%s}Target' % ns_map['tdq'], nsmap=ns_map)
        target.text = self.target

        cr.append(self.test.to_etree())

        return cr

    def to_dict(self):
        """Return a dictionary with ``negate``, ``target`` and ``test`` keys."""
        d = {}
        d['negate'] = self.negate  # may be None
        d['target'] = self.target
        d['test'] = self.test.to_dict()

        return d

    def to_text(self, line_prepend=''):
        """Return a text rendering; an unset ``negate`` is shown as ``False``."""
        s = line_prepend + "=== Criterion ===\n"
        s += line_prepend + " Negate: %s\n" % (self.negate if self.negate is not None else False)
        s += line_prepend + " Target: %s\n" % self.target
        s += self.test.to_text(line_prepend + STD_INDENT)

        return s

    @staticmethod
    def from_etree(etree_xml):
        """Build a :class:`Criterion` from an lxml element.

        ``negate`` stays ``None`` when the attribute is absent; otherwise it
        is ``True`` only for the literal text ``'true'``.
        """
        negate_set = etree_xml.xpath('./@negate')
        negate = None
        if len(negate_set) > 0:
            negate = negate_set[0] == 'true'

        target = etree_xml.xpath('./tdq:Target', namespaces=ns_map)[0].text
        test = DefaultQuery.Criterion.Test.from_etree(etree_xml.xpath('./tdq:Test', namespaces=ns_map)[0])

        return DefaultQuery.Criterion(target, test, negate)

    @staticmethod
    def from_dict(d):
        """Build a :class:`Criterion` from a dictionary; ``negate`` is optional."""
        negate = d.get('negate', None)
        target = d['target']
        test = DefaultQuery.Criterion.Test.from_dict(d['test'])

        return DefaultQuery.Criterion(target, test, negate)


class Test(TAXIIBase):

    """
    :param string capability_id: The ID of the capability module that defines the relationship & parameters
    :param string relationship: The relationship (e.g., equals)
    :param parameters: The parameters for the relationship.
    :type parameters: :class:`dict` of key/value pairs
    """

    def __init__(self, capability_id, relationship, parameters=None):
        self.capability_id = capability_id
        self.relationship = relationship
        self.parameters = parameters or {}

        self.validate()

    @property
    def capability_id(self):
        return self._capability_id

    @capability_id.setter
    def capability_id(self, value):
        do_check(value, 'capability_id', regex_tuple=uri_regex)
        self._capability_id = value

    @property
    def relationship(self):
        return self._relationship

    @relationship.setter
    def relationship(self, value):
        # TODO: For known capability IDs, check that the relationship is valid
        # TODO: provide a way to register other capability IDs
        do_check(value, 'relationship', type=six.string_types)
        self._relationship = value

    @property
    def parameters(self):
        return self._parameters

    @parameters.setter
    def parameters(self, value):
        do_check(list(value.keys()), 'parameters.keys()', regex_tuple=uri_regex)
        self._parameters = value

    # TODO: Can this be done better?
    def validate(self):
        """Validate relationship and parameters against the known capability modules.

        Returns ``True`` without checking anything when ``capability_id`` is
        not in ``capability_modules``; otherwise returns ``None`` after all
        checks pass.

        :raises Exception: if the relationship is not defined by the module,
            or a parameter name is not defined by the relationship
        """
        capability_module = capability_modules.get(self.capability_id)
        if capability_module is None:  # Nothing is defined for this, validation not possible
            return True

        relationship = capability_module.relationships.get(self.relationship)
        if relationship is None:
            raise Exception('relationship not in defined relationships. %s not in %s' % (self.relationship, list(capability_module.relationships.keys())))

        for name, value in self.parameters.items():
            param = relationship.parameters.get(name)
            if param is None:
                raise Exception('name not valid. %s not in %s' % (name, list(relationship.parameters.keys())))
            param.verify(value)

    def to_etree(self):
        """Return a ``Test`` element with one ``Parameter`` child per parameter."""
        t = etree.Element('{%s}Test' % ns_map['tdq'], nsmap=ns_map)
        t.attrib['capability_id'] = self.capability_id
        t.attrib['relationship'] = self.relationship

        for k, v in self.parameters.items():
            p = etree.SubElement(t, '{%s}Parameter' % ns_map['tdq'])
            p.attrib['name'] = k
            # bool must be tested before numbers.Number: bool is a Number too.
            if isinstance(v, bool):
                p.text = str(v).lower()
            elif isinstance(v, datetime.datetime):
                p.text = v.isoformat()
            elif isinstance(v, numbers.Number):
                p.text = str(v)
            else:
                p.text = v

        return t

    def to_dict(self):
        """Return a dictionary whose keys match the constructor's keyword arguments."""
        d = {}
        d['capability_id'] = self.capability_id
        d['relationship'] = self.relationship
        d['parameters'] = self.parameters
        return d

    def to_text(self, line_prepend=''):
        """Return a text rendering; every line starts with ``line_prepend``."""
        s = line_prepend + "=== Test ==\n"
        s += line_prepend + " Capability ID: %s\n" % self.capability_id
        s += line_prepend + " Relationship: %s\n" % self.relationship
        for k, v in six.iteritems(self.parameters):
            s += line_prepend + " Parameter: %s = %s\n" % (k, v)

        return s

    @staticmethod
    def from_etree(etree_xml):
        """Build a :class:`Test` from an lxml element.

        Parameter text is converted using the type declared by the matching
        relationship when the capability module is known; otherwise the raw
        text is kept.

        :raises ValueError: if the capability module is known but does not
            define the requested relationship
        """
        capability_id = etree_xml.attrib['capability_id']
        relationship = etree_xml.attrib['relationship']
        parameters = {}

        cm = capability_modules.get(capability_id, None)
        if cm is not None:
            r = cm.relationships.get(relationship, None)
            if r is None:
                # Report the requested relationship name; the looked-up
                # object is always None on this branch.
                raise ValueError('Relationship (%s) not in CM (%s).' % (relationship, capability_id))
        else:
            r = None

        for parameter in etree_xml.xpath('./tdq:Parameter', namespaces=ns_map):
            k = parameter.attrib['name']
            v = parameter.text

            if v in ('true', 'false'):  # bool is a special case
                parameters[k] = v == 'true'
            elif r is not None:
                type_ = r.parameters[k].type
                if type_ == six.string_types:  # basestring can't be instantiated, but str can be
                    type_ = str
                elif type_ == datetime.datetime:
                    # We can use this function to parse datetime strings.
                    type_ = dateutil.parser.parse
                parameters[k] = type_(v)
            else:
                parameters[k] = v

        return DefaultQuery.Criterion.Test(capability_id, relationship, parameters)

    @staticmethod
    def from_dict(d):
        """Build a :class:`Test`, using ``d`` as keyword arguments."""
        return DefaultQuery.Criterion.Test(**d)


# Attach the helper classes under the names the rest of this module (and the
# docstrings above) use to refer to them.
DefaultQueryInfo.TargetingExpressionInfo = TargetingExpressionInfo
DefaultQuery.Criterion = Criterion
DefaultQuery.Criteria = Criteria
DefaultQuery.Criterion.Test = Test

package_dir, package_filename = os.path.split(__file__)
schema_file = os.path.join(package_dir, "xsd", "TAXII_DefaultQuery_Schema.xsd")

tm11.register_query_format(
    format_id=FID_TAXII_DEFAULT_QUERY_10,
    query=DefaultQuery,
    query_info=DefaultQueryInfo,
    schema=schema_file)