1# MAEC Bundle Class 2 3# Copyright (c) 2018, The MITRE Corporation 4# All rights reserved 5 6from mixbox import fields 7from mixbox import idgen 8 9from cybox.core import Object 10from cybox.utils.normalize import normalize_object_properties 11 12import maec 13import maec.bindings.maec_bundle as bundle_binding 14from maec.utils import BundleComparator, BundleDeduplicator 15 16from . import _namespace 17from .malware_action import MalwareAction 18from .av_classification import AVClassifications 19from .behavior import Behavior 20from .candidate_indicator import CandidateIndicatorList 21from .process_tree import ProcessTree 22from .capability import CapabilityList 23from .object_history import ObjectHistory 24 25 26class BehaviorList(maec.EntityList): 27 _binding_class = bundle_binding.BehaviorListType 28 _namespace = _namespace 29 behavior = fields.TypedField("Behavior", Behavior, multiple=True) 30 31class ActionList(maec.EntityList): 32 _binding_class = bundle_binding.ActionListType 33 _namespace = _namespace 34 action = fields.TypedField("Action", MalwareAction, multiple=True) 35 36 37class ObjectList(maec.EntityList): 38 _binding_class = bundle_binding.ObjectListType 39 _namespace = _namespace 40 object = fields.TypedField("Object", Object, multiple=True) 41 42 43class BaseCollection(maec.Entity): 44 _binding = bundle_binding 45 _binding_class = bundle_binding.BaseCollectionType 46 _namespace = _namespace 47 48 name = fields.TypedField("name") 49 affinity_type = fields.TypedField("Affinity_Type") 50 affinity_degree = fields.TypedField("Affinity_Degree") 51 description = fields.TypedField("Description") 52 53 def __init__(self, name = None): 54 super(BaseCollection, self).__init__() 55 self.name = name 56 57 58class ActionCollection(BaseCollection): 59 _binding = bundle_binding 60 _binding_class = bundle_binding.ActionCollectionType 61 _namespace = _namespace 62 63 id_ = fields.TypedField("id") 64 action_list = fields.TypedField("Action_List", ActionList) 65 66 def __init__(self, name = None, id = None): 67 super(ActionCollection, self).__init__(name) 68 if id: 69 self.id_ = id 70 else: 71 self.id_ = idgen.create_id(prefix="action_collection") 72 self.action_list = ActionList() 73 74 def add_action(self, action): 75 """Add an input Action to the Collection.""" 76 self.action_list.append(action) 77 78 79class BehaviorCollection(BaseCollection): 80 _binding = bundle_binding 81 _binding_class = bundle_binding.BehaviorCollectionType 82 _namespace = _namespace 83 84 id_ = fields.TypedField("id") 85 behavior_list = fields.TypedField("Behavior_List", BehaviorList) 86 87 def __init__(self, name = None, id = None): 88 super(BehaviorCollection, self).__init__(name) 89 if id: 90 self.id_ = id 91 else: 92 self.id_ = idgen.create_id(prefix="behavior_collection") 93 self.behavior_list = BehaviorList() 94 95 def add_behavior(self, behavior): 96 """Add an input Behavior to the Collection.""" 97 self.behavior_list.append(behavior) 98 99 100class ObjectCollection(BaseCollection): 101 _binding = bundle_binding 102 _binding_class = bundle_binding.ObjectCollectionType 103 _namespace = _namespace 104 105 id_ = fields.TypedField("id") 106 object_list = fields.TypedField("Object_List", ObjectList) 107 108 def __init__(self, name = None, id = None): 109 super(ObjectCollection, self).__init__(name) 110 if id: 111 self.id_ = id 112 else: 113 self.id_ = idgen.create_id(prefix="object_collection") 114 self.object_list = ObjectList() 115 116 def add_object(self, object): 117 """Add an input Object to the Collection.""" 118 self.object_list.append(object) 119 120 121class CandidateIndicatorCollection(BaseCollection): 122 _binding = bundle_binding 123 _binding_class = bundle_binding.CandidateIndicatorCollectionType 124 _namespace = _namespace 125 126 id_ = fields.TypedField("id") 127 candidate_indicator_list = fields.TypedField("Candidate_Indicator_List", CandidateIndicatorList) 128 129 def __init__(self, name = None, id = None): 130 super(CandidateIndicatorCollection, self).__init__(name) 131 if id: 132 self.id_ = id 133 else: 134 self.id_ = idgen.create_id(prefix="candidate_indicator_collection") 135 self.candidate_indicator_list = CandidateIndicatorList() 136 137 def add_candidate_indicator(self, candidate_indicator): 138 """Add an input Candidate Indicator to the Collection.""" 139 self.candidate_indicator_list.append(candidate_indicator) 140 141 142class BehaviorCollectionList(maec.EntityList): 143 _binding_class = bundle_binding.BehaviorCollectionListType 144 _namespace = _namespace 145 behavior_collection = fields.TypedField("Behavior_Collection", BehaviorCollection, multiple=True) 146 147 def __init__(self): 148 super(BehaviorCollectionList, self).__init__() 149 150 def to_obj(self, ns_info=None): 151 behavior_collection_list_obj = super(BehaviorCollectionList, self).to_obj() 152 153 for behavior_collection in self: 154 if len(behavior_collection.behavior_list) > 0: 155 behavior_collection_list_obj.add_Behavior_Collection(behavior_collection.to_obj(ns_info=ns_info)) 156 if behavior_collection_list_obj.hasContent_(): 157 return behavior_collection_list_obj 158 159 def has_collection(self, collection_name): 160 """Checks for the existence of a specific named Collection in the list, based on the its name.""" 161 for collection in self: 162 if collection.name is not None and collection.name == collection_name: 163 return True 164 return False 165 166 def get_named_collection(self, collection_name): 167 """Return a specific named Collection from the list, based on its name.""" 168 for collection in self: 169 if collection.name is not None and collection.name == collection_name: 170 return collection 171 return None 172 173 174class ActionCollectionList(maec.EntityList): 175 _binding_class = bundle_binding.ActionCollectionListType 176 _namespace = _namespace 177 action_collection = fields.TypedField("Action_Collection", ActionCollection, multiple=True) 178 179 def __init__(self): 180 super(ActionCollectionList, self).__init__() 181 182 def to_obj(self, ns_info=None): 183 action_collection_list_obj = super(ActionCollectionList, self).to_obj() 184 action_collection_list_obj.set_Action_Collection([]) 185 186 for action_collection in self: 187 if len(action_collection.action_list) > 0: 188 action_collection_list_obj.add_Action_Collection(action_collection.to_obj(ns_info=ns_info)) 189 if action_collection_list_obj.hasContent_(): 190 return action_collection_list_obj 191 192 def has_collection(self, collection_name): 193 """Checks for the existence of a specific named Collection in the list, based on the its name.""" 194 for collection in self: 195 if collection.name is not None and collection.name == collection_name: 196 return True 197 return False 198 199 def get_named_collection(self, collection_name): 200 """Return a specific named Collection from the list, based on its name.""" 201 for collection in self: 202 if collection.name is not None and collection.name == collection_name: 203 return collection 204 return None 205 206 207class ObjectCollectionList(maec.EntityList): 208 _binding_class = bundle_binding.ObjectCollectionListType 209 _namespace = _namespace 210 object_collection = fields.TypedField("Object_Collection", ObjectCollection, multiple=True) 211 212 def __init__(self): 213 super(ObjectCollectionList, self).__init__() 214 215 def to_obj(self, ns_info=None): 216 object_collection_list_obj = super(ObjectCollectionList, self).to_obj() 217 218 for object_collection in self: 219 if len(object_collection.object_list) > 0: 220 object_collection_list_obj.add_Object_Collection(object_collection.to_obj(ns_info=ns_info)) 221 if object_collection_list_obj.hasContent_(): 222 return object_collection_list_obj 223 224 def has_collection(self, collection_name): 225 """Checks for the existence of a specific named Collection in the list, based on the its name.""" 226 for collection in self: 227 if collection.name is not None and collection.name == collection_name: 228 return True 229 return False 230 231 def get_named_collection(self, collection_name): 232 """Return a specific named Collection from the list, based on its name.""" 233 for collection in self: 234 if collection.name is not None and collection.name == collection_name: 235 return collection 236 return None 237 238 239class CandidateIndicatorCollectionList(maec.EntityList): 240 _binding_class = bundle_binding.CandidateIndicatorCollectionListType 241 _namespace = _namespace 242 candidate_indicator_collection = fields.TypedField("Candidate_Indicator_Collection", CandidateIndicatorCollection, multiple=True) 243 244 def __init__(self): 245 super(CandidateIndicatorCollectionList, self).__init__() 246 247 def to_obj(self, ns_info=None): 248 candidate_indicator_collection_list_obj = super(CandidateIndicatorCollectionList, self).to_obj() 249 250 for candidate_indicator_collection in self: 251 if len(candidate_indicator_collection.candidate_indicator_list) > 0: 252 candidate_indicator_collection_list_obj.add_Candidate_Indicator_Collection(candidate_indicator_collection.to_obj(ns_info=ns_info)) 253 if candidate_indicator_collection_list_obj.hasContent_(): 254 return candidate_indicator_collection_list_obj 255 256 def has_collection(self, collection_name): 257 """Checks for the existence of a specific named Collection in the list, based on the its name.""" 258 for collection in self: 259 if collection.name is not None and collection.name == collection_name: 260 return True 261 return False 262 263 def get_named_collection(self, collection_name): 264 """Return a specific named Collection from the list, based on its name.""" 265 for collection in self: 266 if collection.name is not None and collection.name == collection_name: 267 return collection 268 return None 269 270 271class Collections(maec.Entity): 272 _binding = bundle_binding 273 _binding_class = bundle_binding.CollectionsType 274 _namespace = _namespace 275 276 behavior_collections = fields.TypedField("Behavior_Collections", BehaviorCollectionList) 277 action_collections = fields.TypedField("Action_Collections", ActionCollectionList) 278 object_collections = fields.TypedField("Object_Collections", ObjectCollectionList) 279 candidate_indicator_collections = fields.TypedField("Candidate_Indicator_Collections", CandidateIndicatorCollectionList) 280 281 def __init__(self): 282 super(Collections, self).__init__() 283 284 def add_named_action_collection(self, action_collection_name, collection_id = None): 285 """Add a new named Action Collection to the Collections instance.""" 286 if not self.action_collections: 287 self.action_collections = ActionCollectionList() 288 self.action_collections.append(ActionCollection(action_collection_name, collection_id)) 289 290 def add_named_object_collection(self, object_collection_name, collection_id = None): 291 """Add a new named Object Collection to the Collections instance.""" 292 if not self.object_collections: 293 self.object_collections = ObjectCollectionList() 294 self.object_collections.append(ObjectCollection(object_collection_name, collection_id)) 295 296 def add_named_behavior_collection(self, behavior_collection_name, collection_id = None): 297 """Add a new named Behavior Collection to the Collections instance.""" 298 if not self.behavior_collections: 299 self.behavior_collections = BehaviorCollectionList() 300 self.behavior_collections.append(BehaviorCollection(behavior_collection_name, collection_id)) 301 302 def add_named_candidate_indicator_collection(self, candidate_indicator_collection_name, collection_id = None): 303 """Add a new named Candidate Indicator Collection to the Collections instance.""" 304 if not self.candidate_indicator_collections: 305 self.candidate_indicator_collections = CandidateIndicatorCollectionList() 306 self.candidate_indicator_collections.append(CandidateIndicatorCollection(candidate_indicator_collection_name, collection_id)) 307 308 def has_content(self): 309 """Returns true if any Collections instance inside of the Collection has len > 0.""" 310 if self.behavior_collections and len(self.behavior_collections) > 0: 311 return True 312 elif self.action_collections and len(self.action_collections) > 0: 313 return True 314 elif self.object_collections and len(self.object_collections) > 0: 315 return True 316 elif self.candidate_indicator_collections and len(self.candidate_indicator_collections) > 0: 317 return True 318 return False 319 320 321class BehaviorReference(maec.Entity): 322 _binding = bundle_binding 323 _binding_class = bundle_binding.BehaviorReferenceType 324 _namespace = _namespace 325 326 behavior_idref = fields.TypedField('behavior_idref') 327 328 329class Bundle(maec.Entity): 330 _binding = bundle_binding 331 _namespace = _namespace 332 _binding_class = bundle_binding.BundleType 333 334 id_ = fields.TypedField("id") 335 schema_version = fields.TypedField("schema_version") 336 defined_subject = fields.TypedField("defined_subject") 337 content_type = fields.TypedField("content_type") 338 timestamp = fields.TypedField("timestamp") 339 malware_instance_object_attributes = fields.TypedField("Malware_Instance_Object_Attributes", Object) 340 av_classifications = fields.TypedField("AV_Classifications", AVClassifications) 341 actions = fields.TypedField("Actions", ActionList) 342 process_tree = fields.TypedField("Process_Tree", ProcessTree) 343 behaviors = fields.TypedField("Behaviors", BehaviorList) 344 capabilities = fields.TypedField("Capabilities", CapabilityList) 345 objects = fields.TypedField("Objects", ObjectList) 346 candidate_indicators = fields.TypedField("Candidate_Indicators", CandidateIndicatorList) 347 collections = fields.TypedField("Collections", Collections) 348 349 def __init__(self, id = None, defined_subject = False, schema_version = "4.1", content_type = None, malware_instance_object = None): 350 super(Bundle, self).__init__() 351 if id: 352 self.id_ = id 353 else: 354 self.id_ = idgen.create_id(prefix="bundle") 355 self.schema_version = schema_version 356 self.defined_subject = defined_subject 357 self.content_type = content_type 358 self.timestamp = None 359 self.malware_instance_object_attributes = malware_instance_object 360 self.__input_namespaces__ = {} 361 self.__input_schemalocations__ = {} 362 363 def set_malware_instance_object_attributes(self, malware_instance_object): 364 """Set the top-level Malware Instance Object Attributes entity in the Bundle.""" 365 self.malware_instance_object_attributes = malware_instance_object 366 367 def add_av_classification(self, av_classification): 368 """Add an AV Classification to the top-level AV_Classifications entity in the Bundle.""" 369 if not self.av_classifications: 370 self.av_classifications = AVClassifications() 371 self.av_classifications.append(av_classification) 372 373 def add_capability(self, capability): 374 """Add a Capability to the top-level Capabilities entity in the Bundle.""" 375 if not self.capabilities: 376 self.capabilities = CapabilityList() 377 self.capabilities.capability.append(capability) 378 379 def set_process_tree(self, process_tree): 380 """Set the Process Tree, in the top-level <Process_Tree> element.""" 381 self.process_tree = process_tree 382 383 def add_named_action_collection(self, collection_name, collection_id = None): 384 """Add a new named Action Collection to the top-level Collections entity in the Bundle.""" 385 if not self.collections: 386 self.collections = Collections() 387 if collection_name is not None: 388 self.collections.add_named_action_collection(collection_name, collection_id) 389 390 def add_action(self, action, action_collection_name = None): 391 """Add an Action to an existing named Action Collection in the Collections entity. 392 If it does not exist, add it to the top-level Actions entity.""" 393 if action_collection_name is not None and self.collections: 394 #The collection has already been defined 395 if self.collections.action_collections.has_collection(action_collection_name): 396 action_collection = self.collections.action_collections.get_named_collection(action_collection_name) 397 action_collection.add_action(action) 398 elif action_collection_name == None: 399 if not self.actions: 400 self.actions = ActionList() 401 self.actions.append(action) 402 403 def add_named_object_collection(self, collection_name, collection_id = None): 404 """Add a new named Object Collection to the Collections entity in the Bundle.""" 405 if not self.collections: 406 self.collections = Collections() 407 if collection_name is not None: 408 self.collections.add_named_object_collection(collection_name, collection_id) 409 410 def get_all_actions(self, bin = False): 411 """Return a list of all Actions in the Bundle.""" 412 all_actions = [] 413 414 if self.actions: 415 for action in self.actions: 416 all_actions.append(action) 417 418 if self.collections and self.collections.action_collections: 419 for collection in self.collections.action_collections: 420 for action in collection.action_list: 421 all_actions.append(action) 422 423 if bin: 424 binned_actions = {} 425 for action in all_actions: 426 if action.name and action.name.value not in binned_actions: 427 binned_actions[action.name.value] = [action] 428 elif action.name and action.name.value in binned_actions: 429 binned_actions[action.name.value].append(action) 430 return binned_actions 431 else: 432 return all_actions 433 434 def get_all_actions_on_object(self, object): 435 """Return a list of all of the Actions in the Bundle that operate on a particular input Object.""" 436 object_actions = [] 437 if object.id_: 438 for action in self.get_all_actions(): 439 associated_objects = action.associated_objects 440 if associated_objects: 441 for associated_object in associated_objects: 442 if associated_object.idref and associated_object.idref == object.id_: 443 object_actions.append(action) 444 elif associated_object.id_ and associated_object.id_ == object.id_: 445 object_actions.append(action) 446 return object_actions 447 448 def add_object(self, object, object_collection_name = None): 449 """Add an Object to an existing named Object Collection in the Collections entity. 450 If it does not exist, add it to the top-level Object entity.""" 451 if object_collection_name is not None and self.collections: 452 #The collection has already been defined 453 if self.collections.object_collections.has_collection(object_collection_name): 454 object_collection = self.collections.object_collections.get_named_collection(object_collection_name) 455 object_collection.add_object(object) 456 elif object_collection_name == None: 457 if not self.objects: 458 self.objects = ObjectList() 459 self.objects.append(object) 460 461 def get_all_objects(self, include_actions = False): 462 """Return a list of all Objects in the Bundle.""" 463 all_objects = [] 464 465 if self.objects: 466 for obj in self.objects: 467 all_objects.append(obj) 468 if obj.related_objects: 469 for related_obj in obj.related_objects: 470 all_objects.append(related_obj) 471 472 if self.collections and self.collections.object_collections: 473 for collection in self.collections.object_collections: 474 for obj in collection.object_list: 475 all_objects.append(obj) 476 if obj.related_objects: 477 for related_obj in obj.related_objects: 478 all_objects.append(related_obj) 479 480 # Include Objects in Actions, if include_actions flag is specified 481 if include_actions: 482 for action in self.get_all_actions(): 483 associated_objects = action.associated_objects 484 if associated_objects: 485 for associated_object in associated_objects: 486 all_objects.append(associated_object) 487 if associated_object.related_objects: 488 for related_obj in associated_object.related_objects: 489 all_objects.append(related_obj) 490 491 # Add the Object corresponding to the Malware Instance Object Attributes, if specified 492 if self.malware_instance_object_attributes: 493 all_objects.append(self.malware_instance_object_attributes) 494 495 return all_objects 496 497 def get_all_multiple_referenced_objects(self): 498 """Return a list of all Objects in the Bundle that are referenced more than once.""" 499 idref_list = [x.idref for x in self.get_all_objects() if x.idref] 500 return [self.get_object_by_id(x) for x in idref_list if self.get_object_by_id(x)] 501 502 def get_all_non_reference_objects(self): 503 """Return a list of all Objects in the Bundle that are not references (i.e. all of the actual Objects in the Bundle).""" 504 return [x for x in self.get_all_objects(True) if x.id_ and not x.idref] 505 506 def get_object_by_id(self, id, extra_objects = [], ignore_actions = False): 507 """Find and return the Entity (Action, Object, etc.) with the specified ID.""" 508 if not ignore_actions: 509 if self.actions: 510 for action in self.actions: 511 if action.id_ == id: 512 return action 513 514 if action.associated_objects: 515 for associated_obj in action.associated_objects: 516 if associated_obj.id_ == id: 517 return associated_obj 518 if self.collections and self.collections.action_collections: 519 for collection in self.collections.action_collections: 520 for action in collection.action_list: 521 if action.id_ == id: 522 return action 523 524 if action.associated_objects: 525 for associated_obj in action.associated_objects: 526 if associated_obj.id_ == id: 527 return associated_obj 528 if self.objects: 529 for obj in self.objects: 530 if obj.id_ == id: 531 return obj 532 533 if self.collections and self.collections.object_collections: 534 for collection in self.collections.object_collections: 535 for obj in collection.object_list: 536 if obj.id_ == id: 537 return obj 538 539 # Test the extra_objects Array 540 for obj in extra_objects: 541 if obj.id_ == id: 542 return obj 543 544 def add_named_behavior_collection(self, collection_name, collection_id = None): 545 """Add a new named Behavior Collection to the Collections entity in the Bundle.""" 546 if not self.collections: 547 self.collections = Collections() 548 if collection_name is not None: 549 self.collections.add_named_behavior_collection(collection_name, collection_id) 550 551 def add_behavior(self, behavior, behavior_collection_name = None): 552 """Add a Behavior to an existing named Behavior Collection in the Collections entity. 553 If it does not exist, add it to the top-level Behaviors entity.""" 554 if behavior_collection_name is not None and self.collections: 555 #The collection has already been defined 556 if self.collections.behavior_collections.has_collection(behavior_collection_name): 557 behavior_collection = self.collections.behavior_collections.get_named_collection(behavior_collection_name) 558 behavior_collection.add_behavior(behavior) 559 elif behavior_collection_name == None: 560 if not self.behaviors: 561 self.behaviors = BehaviorList() 562 self.behaviors.append(behavior) 563 564 def add_named_candidate_indicator_collection(self, collection_name, collection_id = None): 565 """Add a new named Candidate Indicator Collection to the Collections entity in the Bundle.""" 566 if not self.collections: 567 self.collections = Collections() 568 if collection_name is not None: 569 self.collections.add_named_candidate_indicator_collection(collection_name, collection_id) 570 571 def add_candidate_indicator(self, candidate_indicator, candidate_indicator_collection_name = None): 572 """Add a Candidate Indicator to an existing named Candidate Indicator Collection in the Collections entity. 573 If it does not exist, add it to the top-level Candidate Indicators entity.""" 574 if candidate_indicator_collection_name is not None and self.collections: 575 #The collection has already been defined 576 if self.collections.candidate_indicator_collections.has_collection(candidate_indicator_collection_name): 577 candidate_indicator_collection = self.collections.candidate_indicator_collections.get_named_collection(candidate_indicator_collection_name) 578 candidate_indicator_collection.add_candidate_indicator(candidate_indicator) 579 elif candidate_indicator_collection_name == None: 580 if not self.candidate_indicators: 581 self.candidate_indicators = CandidateIndicatorList() 582 self.candidate_indicators.append(candidate_indicator) 583 584 def deduplicate(self): 585 """Deduplicate all Objects in the Bundle. 586 Add duplicate Objects to new "Deduplicated Objects" Object Collection, 587 and replace duplicate entries with references to corresponding Object.""" 588 BundleDeduplicator.deduplicate(self) 589 590 def get_action_objects(self, action_name_list): 591 """Get all Objects corresponding to one or more types of Actions, specified via a list of Action names.""" 592 action_objects = {} 593 all_actions = self.get_all_actions(bin=True) 594 for action_name in action_name_list: 595 if action_name in all_actions: 596 associated_objects = [] 597 associated_object_lists = [[y for y in x.associated_objects if x.associated_objects] for x in all_actions[action_name]] 598 for associated_object_list in associated_object_lists: 599 associated_objects += associated_object_list 600 action_objects[action_name] = associated_objects 601 return action_objects 602 603 def get_object_history(self): 604 """Build and return the Object history for the Bundle.""" 605 return ObjectHistory.build(self) 606 607 def normalize_objects(self): 608 """Normalize all Objects in the Bundle, using the CybOX normalize module.""" 609 all_objects = self.get_all_objects(include_actions = True) 610 for object in all_objects: 611 if object.properties: 612 normalize_object_properties(object.properties) 613 614 def dereference_objects(self, extra_objects = []): 615 """Dereference any Objects in the Bundle by replacing them with the entities they reference.""" 616 all_objects = self.get_all_objects(include_actions=True) 617 # Add any extra objects that were passed, e.g. from a Malware Subject 618 all_objects = all_objects + extra_objects 619 for object in all_objects: 620 if object.idref and not object.id_: 621 real_object = self.get_object_by_id(object.idref, extra_objects, ignore_actions = True) 622 if real_object: 623 object.idref = None 624 object.id_ = real_object.id_ 625 object.properties = real_object.properties 626 627 @classmethod 628 def compare(cls, bundle_list, match_on = None, case_sensitive = True): 629 """Compare the Bundle to a list of other Bundles, returning a BundleComparator object.""" 630 return BundleComparator.compare(bundle_list, match_on, case_sensitive) 631