"""Tests for ``stix2.Environment``, ``stix2.ObjectFactory`` and the object /
graph equivalence helpers, exercised with STIX 2.0 (``stix2.v20``) objects."""
import os

import pytest

import stix2
import stix2.equivalence.graph
import stix2.equivalence.object

from .constants import (
    CAMPAIGN_ID, CAMPAIGN_KWARGS, FAKE_TIME, IDENTITY_ID, IDENTITY_KWARGS,
    INDICATOR_ID, INDICATOR_KWARGS, MALWARE_ID, MALWARE_KWARGS,
    RELATIONSHIP_IDS,
)

# Directory of on-disk STIX content that sits next to this test module.
FS_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "stix2_data")


@pytest.fixture
def ds():
    """Yield a MemoryStore with a campaign, identity, indicator, malware,
    three relationships between them, and a report."""
    cam = stix2.v20.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
    idy = stix2.v20.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
    ind = stix2.v20.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
    mal = stix2.v20.Malware(id=MALWARE_ID, **MALWARE_KWARGS)
    rel1 = stix2.v20.Relationship(ind, 'indicates', mal, id=RELATIONSHIP_IDS[0])
    rel2 = stix2.v20.Relationship(mal, 'targets', idy, id=RELATIONSHIP_IDS[1])
    rel3 = stix2.v20.Relationship(cam, 'uses', mal, id=RELATIONSHIP_IDS[2])
    reprt = stix2.v20.Report(
        name="Malware Report",
        published="2021-05-09T08:22:22Z",
        labels=["campaign"],
        object_refs=[mal.id, rel1.id, ind.id],
    )
    stix_objs = [cam, idy, ind, mal, rel1, rel2, rel3, reprt]
    yield stix2.MemoryStore(stix_objs)


@pytest.fixture
def ds2():
    """Yield a MemoryStore like ``ds`` but with ``created_by_ref`` set on the
    indicator and malware, a second version of each, and a report that
    references every other object in the store."""
    cam = stix2.v20.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
    idy = stix2.v20.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
    ind = stix2.v20.Indicator(id=INDICATOR_ID, created_by_ref=idy.id, **INDICATOR_KWARGS)
    indv2 = ind.new_version(
        external_references=[{
            "source_name": "unknown",
            "url": "https://examplewebsite.com/",
        }],
    )
    mal = stix2.v20.Malware(id=MALWARE_ID, created_by_ref=idy.id, **MALWARE_KWARGS)
    malv2 = mal.new_version(
        external_references=[{
            "source_name": "unknown",
            "url": "https://examplewebsite2.com/",
        }],
    )
    rel1 = stix2.v20.Relationship(ind, 'indicates', mal, id=RELATIONSHIP_IDS[0])
    rel2 = stix2.v20.Relationship(mal, 'targets', idy, id=RELATIONSHIP_IDS[1])
    rel3 = stix2.v20.Relationship(cam, 'uses', mal, id=RELATIONSHIP_IDS[2])
    stix_objs = [cam, idy, ind, indv2, mal, malv2, rel1, rel2, rel3]
    # The report is built from the list *before* it is appended to it, so it
    # references every object except itself.
    reprt = stix2.v20.Report(
        created_by_ref=idy.id,
        name="example",
        labels=["campaign"],
        published="2021-04-09T08:22:22Z",
        object_refs=stix_objs,
    )
    stix_objs.append(reprt)
    yield stix2.MemoryStore(stix_objs)


def test_object_factory_created_by_ref_str():
    """A factory default ``created_by_ref`` given as an ID string is applied."""
    factory = stix2.ObjectFactory(created_by_ref=IDENTITY_ID)
    ind = factory.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
    assert ind.created_by_ref == IDENTITY_ID


def test_object_factory_created_by_ref_obj():
    """A factory default ``created_by_ref`` given as an Identity object
    results in that identity's ID on the created object."""
    id_obj = stix2.v20.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
    factory = stix2.ObjectFactory(created_by_ref=id_obj)
    ind = factory.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
    assert ind.created_by_ref == IDENTITY_ID


def test_object_factory_override_default():
    """An explicit ``created_by_ref`` passed to ``create`` wins over the
    factory default."""
    factory = stix2.ObjectFactory(created_by_ref=IDENTITY_ID)
    new_id = "identity--983b3172-44fe-4a80-8091-eb8098841fe8"
    ind = factory.create(stix2.v20.Indicator, created_by_ref=new_id, **INDICATOR_KWARGS)
    assert ind.created_by_ref == new_id


def test_object_factory_created():
    """A factory default ``created`` timestamp is used for both ``created``
    and ``modified`` on the new object."""
    factory = stix2.ObjectFactory(created=FAKE_TIME)
    ind = factory.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
    assert ind.created == FAKE_TIME
    assert ind.modified == FAKE_TIME


def test_object_factory_external_reference():
    """A default external reference is attached, and passing
    ``external_references=None`` to ``create`` omits the property."""
    ext_ref = stix2.v20.ExternalReference(
        source_name="ACME Threat Intel",
        description="Threat report",
    )
    factory = stix2.ObjectFactory(external_references=ext_ref)
    ind = factory.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
    assert ind.external_references[0].source_name == "ACME Threat Intel"
    assert ind.external_references[0].description == "Threat report"

    ind2 = factory.create(stix2.v20.Indicator, external_references=None, **INDICATOR_KWARGS)
    assert 'external_references' not in ind2


def test_object_factory_obj_markings():
    """Default ``object_marking_refs`` may be a list of marking definitions or
    a single marking definition; their IDs appear on the created object."""
    stmt_marking = stix2.v20.StatementMarking("Copyright 2016, Example Corp")
    mark_def = stix2.v20.MarkingDefinition(
        definition_type="statement",
        definition=stmt_marking,
    )
    factory = stix2.ObjectFactory(object_marking_refs=[mark_def, stix2.v20.TLP_AMBER])
    ind = factory.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
    assert mark_def.id in ind.object_marking_refs
    assert stix2.v20.TLP_AMBER.id in ind.object_marking_refs

    factory = stix2.ObjectFactory(object_marking_refs=stix2.v20.TLP_RED)
    ind = factory.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
    assert stix2.v20.TLP_RED.id in ind.object_marking_refs


def test_object_factory_list_append():
    """With the factory's default settings, list values passed to ``create``
    are placed after the factory's default entries."""
    ext_ref = stix2.v20.ExternalReference(
        source_name="ACME Threat Intel",
        description="Threat report from ACME",
    )
    ext_ref2 = stix2.v20.ExternalReference(
        source_name="Yet Another Threat Report",
        description="Threat report from YATR",
    )
    ext_ref3 = stix2.v20.ExternalReference(
        source_name="Threat Report #3",
        description="One more threat report",
    )
    factory = stix2.ObjectFactory(external_references=ext_ref)
    ind = factory.create(stix2.v20.Indicator, external_references=ext_ref2, **INDICATOR_KWARGS)
    assert ind.external_references[1].source_name == "Yet Another Threat Report"

    ind = factory.create(stix2.v20.Indicator, external_references=[ext_ref2, ext_ref3], **INDICATOR_KWARGS)
    assert ind.external_references[2].source_name == "Threat Report #3"


def test_object_factory_list_replace():
    """With ``list_append=False`` a list value passed to ``create`` replaces
    the factory default instead of extending it."""
    ext_ref = stix2.v20.ExternalReference(
        source_name="ACME Threat Intel",
        description="Threat report from ACME",
    )
    ext_ref2 = stix2.v20.ExternalReference(
        source_name="Yet Another Threat Report",
        description="Threat report from YATR",
    )
    factory = stix2.ObjectFactory(external_references=ext_ref, list_append=False)
    ind = factory.create(stix2.v20.Indicator, external_references=ext_ref2, **INDICATOR_KWARGS)
    assert len(ind.external_references) == 1
    assert ind.external_references[0].source_name == "Yet Another Threat Report"


def test_environment_functions():
    """Walk through create / add / all_versions / get / query / filter
    handling on an Environment backed by a factory and a MemoryStore."""
    env = stix2.Environment(
        stix2.ObjectFactory(created_by_ref=IDENTITY_ID),
        stix2.MemoryStore(),
    )

    # Create a STIX object
    ind = env.create(stix2.v20.Indicator, id=INDICATOR_ID, **INDICATOR_KWARGS)
    assert ind.created_by_ref == IDENTITY_ID

    # Add objects to datastore
    ind2 = ind.new_version(labels=['benign'])
    env.add([ind, ind2])

    # Get both versions of the object
    resp = env.all_versions(INDICATOR_ID)
    assert len(resp) == 2

    # Get just the most recent version of the object
    resp = env.get(INDICATOR_ID)
    assert resp['labels'][0] == 'benign'

    # Search on something other than id
    query = [stix2.Filter('type', '=', 'vulnerability')]
    resp = env.query(query)
    assert len(resp) == 0

    # See different results after adding filters to the environment
    env.add_filters([
        stix2.Filter('type', '=', 'indicator'),
        stix2.Filter('created_by_ref', '=', IDENTITY_ID),
    ])
    env.add_filter(stix2.Filter('labels', '=', 'benign'))  # should be 'malicious-activity'
    resp = env.get(INDICATOR_ID)
    assert resp['labels'][0] == 'benign'  # should be 'malicious-activity'


def test_environment_source_and_sink():
    """An Environment can be built from a separate source and sink."""
    ind = stix2.v20.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
    env = stix2.Environment(source=stix2.MemorySource([ind]), sink=stix2.MemorySink([ind]))
    assert env.get(INDICATOR_ID).labels[0] == 'malicious-activity'


def test_environment_datastore_and_sink():
    """Supplying both a store and a sink raises ValueError."""
    with pytest.raises(ValueError) as excinfo:
        # NOTE(review): the MemorySink *class* (not an instance) is passed
        # here; the test only relies on the ValueError being raised.
        stix2.Environment(
            factory=stix2.ObjectFactory(),
            store=stix2.MemoryStore(), sink=stix2.MemorySink,
        )
    assert 'Data store already provided' in str(excinfo.value)


def test_environment_no_datastore():
    """Every data-access method raises AttributeError when the Environment
    was created without a store, source or sink."""
    env = stix2.Environment(factory=stix2.ObjectFactory())

    with pytest.raises(AttributeError) as excinfo:
        env.add(stix2.v20.Indicator(**INDICATOR_KWARGS))
    assert 'Environment has no data sink to put objects in' in str(excinfo.value)

    with pytest.raises(AttributeError) as excinfo:
        env.get(INDICATOR_ID)
    assert 'Environment has no data source' in str(excinfo.value)

    with pytest.raises(AttributeError) as excinfo:
        env.all_versions(INDICATOR_ID)
    assert 'Environment has no data source' in str(excinfo.value)

    with pytest.raises(AttributeError) as excinfo:
        env.query(INDICATOR_ID)
    assert 'Environment has no data source' in str(excinfo.value)

    with pytest.raises(AttributeError) as excinfo:
        env.relationships(INDICATOR_ID)
    assert 'Environment has no data source' in str(excinfo.value)

    with pytest.raises(AttributeError) as excinfo:
        env.related_to(INDICATOR_ID)
    assert 'Environment has no data source' in str(excinfo.value)


def test_environment_add_filters():
    """``add_filters`` / ``add_filter`` can be called on an Environment with
    no data source without raising."""
    env = stix2.Environment(factory=stix2.ObjectFactory())
    env.add_filters([INDICATOR_ID])
    env.add_filter(INDICATOR_ID)


def test_environment_datastore_and_no_object_factory():
    """``create`` works when no factory is supplied."""
    # Uses a default object factory
    env = stix2.Environment(store=stix2.MemoryStore())
    ind = env.create(stix2.v20.Indicator, id=INDICATOR_ID, **INDICATOR_KWARGS)
    assert ind.id == INDICATOR_ID


def test_parse_malware():
    """``Environment.parse`` turns a STIX 2.0 JSON string into a malware
    object with the expected property values."""
    env = stix2.Environment()
    data = """{
        "type": "malware",
        "id": "malware--9c4638ec-f1de-4ddb-abf4-1b760417654e",
        "created": "2017-01-01T12:34:56.000Z",
        "modified": "2017-01-01T12:34:56.000Z",
        "name": "Cryptolocker",
        "labels": [
            "ransomware"
        ]
    }"""
    mal = env.parse(data, version="2.0")

    assert mal.type == 'malware'
    assert mal.id == MALWARE_ID
    assert mal.created == FAKE_TIME
    assert mal.modified == FAKE_TIME
    assert mal.labels == ['ransomware']
    assert mal.name == "Cryptolocker"


def test_creator_of():
    """``creator_of`` returns the stored identity object referenced by
    ``created_by_ref``."""
    identity = stix2.v20.Identity(**IDENTITY_KWARGS)
    factory = stix2.ObjectFactory(created_by_ref=identity.id)
    env = stix2.Environment(store=stix2.MemoryStore(), factory=factory)
    env.add(identity)

    ind = env.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
    creator = env.creator_of(ind)
    assert creator is identity


def test_creator_of_no_datasource():
    """``creator_of`` raises AttributeError when there is no data source."""
    identity = stix2.v20.Identity(**IDENTITY_KWARGS)
    factory = stix2.ObjectFactory(created_by_ref=identity.id)
    env = stix2.Environment(factory=factory)

    ind = env.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
    with pytest.raises(AttributeError) as excinfo:
        env.creator_of(ind)
    assert 'Environment has no data source' in str(excinfo.value)


def test_creator_of_not_found():
    """``creator_of`` returns None when the referenced identity was never
    added to the store."""
    identity = stix2.v20.Identity(**IDENTITY_KWARGS)
    factory = stix2.ObjectFactory(created_by_ref=identity.id)
    env = stix2.Environment(store=stix2.MemoryStore(), factory=factory)

    ind = env.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
    creator = env.creator_of(ind)
    assert creator is None


def test_creator_of_no_created_by_ref():
    """``creator_of`` returns None for an object created without a
    ``created_by_ref`` default."""
    env = stix2.Environment(store=stix2.MemoryStore())
    ind = env.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
    creator = env.creator_of(ind)
    assert creator is None


def test_relationships(ds):
    """All three relationships involving the malware are returned."""
    env = stix2.Environment(store=ds)
    mal = env.get(MALWARE_ID)
    resp = env.relationships(mal)

    assert len(resp) == 3
    assert any(x['id'] == RELATIONSHIP_IDS[0] for x in resp)
    assert any(x['id'] == RELATIONSHIP_IDS[1] for x in resp)
    assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)


def test_relationships_no_id(ds):
    """A dict without an ``id`` key is rejected with ValueError."""
    env = stix2.Environment(store=ds)
    mal = {
        "type": "malware",
        "name": "some variant",
    }
    with pytest.raises(ValueError) as excinfo:
        env.relationships(mal)
    assert "object has no 'id' property" in str(excinfo.value)


def test_relationships_by_type(ds):
    """Filtering by ``relationship_type`` returns only the matching
    relationship."""
    env = stix2.Environment(store=ds)
    mal = env.get(MALWARE_ID)
    resp = env.relationships(mal, relationship_type='indicates')

    assert len(resp) == 1
    assert resp[0]['id'] == RELATIONSHIP_IDS[0]


def test_relationships_by_source(ds):
    """``source_only=True`` returns only the relationship whose source is the
    malware."""
    env = stix2.Environment(store=ds)
    resp = env.relationships(MALWARE_ID, source_only=True)

    assert len(resp) == 1
    assert resp[0]['id'] == RELATIONSHIP_IDS[1]


def test_relationships_by_target(ds):
    """``target_only=True`` returns only relationships that target the
    malware."""
    env = stix2.Environment(store=ds)
    resp = env.relationships(MALWARE_ID, target_only=True)

    assert len(resp) == 2
    assert any(x['id'] == RELATIONSHIP_IDS[0] for x in resp)
    assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)


def test_relationships_by_target_and_type(ds):
    """``target_only`` and ``relationship_type`` can be combined."""
    env = stix2.Environment(store=ds)
    resp = env.relationships(MALWARE_ID, relationship_type='uses', target_only=True)

    assert len(resp) == 1
    assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)


def test_relationships_by_target_and_source(ds):
    """Requesting both ``target_only`` and ``source_only`` raises
    ValueError."""
    env = stix2.Environment(store=ds)
    with pytest.raises(ValueError) as excinfo:
        env.relationships(MALWARE_ID, target_only=True, source_only=True)

    assert 'not both' in str(excinfo.value)


def test_related_to(ds):
    """The campaign, indicator and identity are all related to the malware."""
    env = stix2.Environment(store=ds)
    mal = env.get(MALWARE_ID)
    resp = env.related_to(mal)

    assert len(resp) == 3
    assert any(x['id'] == CAMPAIGN_ID for x in resp)
    assert any(x['id'] == INDICATOR_ID for x in resp)
    assert any(x['id'] == IDENTITY_ID for x in resp)


def test_related_to_no_id(ds):
    """A dict without an ``id`` key is rejected with ValueError."""
    env = stix2.Environment(store=ds)
    mal = {
        "type": "malware",
        "name": "some variant",
    }
    with pytest.raises(ValueError) as excinfo:
        env.related_to(mal)
    assert "object has no 'id' property" in str(excinfo.value)


def test_related_to_by_source(ds):
    """``source_only=True`` yields only the identity."""
    env = stix2.Environment(store=ds)
    resp = env.related_to(MALWARE_ID, source_only=True)

    assert len(resp) == 1
    assert resp[0]['id'] == IDENTITY_ID


def test_related_to_by_target(ds):
    """``target_only=True`` yields the campaign and the indicator."""
    env = stix2.Environment(store=ds)
    resp = env.related_to(MALWARE_ID, target_only=True)

    assert len(resp) == 2
    assert any(x['id'] == CAMPAIGN_ID for x in resp)
    assert any(x['id'] == INDICATOR_ID for x in resp)


def test_versioned_checks(ds, ds2):
    """``_versioned_checks`` scores the same indicator ID across both stores
    at (rounded) 100."""
    # Shallow copy, then replace only the "_internal" entry so the shared
    # GRAPH_WEIGHTS mapping itself is not rebound.
    weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
    weights.update({
        "_internal": {
            "ignore_spec_version": True,
            "versioning_checks": True,
            "max_depth": 1,
        },
    })
    score = stix2.equivalence.object._versioned_checks(INDICATOR_ID, INDICATOR_ID, ds, ds2, **weights)
    assert round(score) == 100


def test_semantic_check_with_versioning(ds, ds2):
    """``reference_check`` between a freshly added indicator and the stored
    one rounds to 0 with versioning checks enabled."""
    weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
    weights.update({
        "_internal": {
            "ignore_spec_version": False,
            "versioning_checks": True,
            "ds1": ds,
            "ds2": ds2,
            "max_depth": 1,
        },
    })
    ind = stix2.v20.Indicator(
        labels=["malicious-activity"],
        pattern="[file:hashes.'SHA-256' = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855']",
        valid_from="2017-01-01T12:34:56Z",
        external_references=[
            {
                "source_name": "unknown",
                "url": "https://examplewebsite2.com/",
            },
        ],
        object_marking_refs=[stix2.v20.TLP_WHITE],
    )
    ds.add(ind)
    score = stix2.equivalence.object.reference_check(ind.id, INDICATOR_ID, ds, ds2, **weights)
    assert round(score) == 0  # Since pattern is different score is really low


def test_list_semantic_check(ds, ds2):
    """``list_reference_check`` over two reference lists of different length
    rounds to 1."""
    weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
    weights.update({
        "_internal": {
            "ignore_spec_version": False,
            "versioning_checks": False,
            "ds1": ds,
            "ds2": ds2,
            "max_depth": 1,
        },
    })
    object_refs1 = [
        "malware--9c4638ec-f1de-4ddb-abf4-1b760417654e",
        "relationship--06520621-5352-4e6a-b976-e8fa3d437ffd",
        "indicator--a740531e-63ff-4e49-a9e1-a0a3eed0e3e7",
    ]
    object_refs2 = [
        "campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
        "identity--311b2d2d-f010-4473-83ec-1edf84858f4c",
        "indicator--a740531e-63ff-4e49-a9e1-a0a3eed0e3e7",
        "malware--9c4638ec-f1de-4ddb-abf4-1b760417654e",
        "malware--9c4638ec-f1de-4ddb-abf4-1b760417654e",
        "relationship--06520621-5352-4e6a-b976-e8fa3d437ffd",
        "relationship--181c9c09-43e6-45dd-9374-3bec192f05ef",
        "relationship--a0cbb21c-8daf-4a7f-96aa-7155a4ef8f70",
    ]

    score = stix2.equivalence.object.list_reference_check(
        object_refs1,
        object_refs2,
        ds,
        ds2,
        **weights,
    )
    assert round(score) == 1


def test_graph_equivalence_with_filesystem_source(ds):
    """Compare the on-disk FileSystemSource graph against ``ds`` and pin the
    resulting score and the values written into ``prop_scores``."""
    weights = {
        "_internal": {
            "ignore_spec_version": True,
            "versioning_checks": False,
            "max_depth": 1,
        },
    }
    prop_scores = {}
    fs = stix2.FileSystemSource(FS_PATH)
    score = stix2.Environment().graphically_equivalent(fs, ds, prop_scores, **weights)
    assert round(score) == 28
    assert round(prop_scores["matching_score"]) == 139
    assert round(prop_scores["sum_weights"]) == 500


def test_graph_equivalence_with_duplicate_graph(ds):
    """Comparing a store with itself scores 100."""
    weights = {
        "_internal": {
            "ignore_spec_version": False,
            "versioning_checks": False,
            "max_depth": 1,
        },
    }
    prop_scores = {}
    score = stix2.Environment().graphically_equivalent(ds, ds, prop_scores, **weights)
    assert round(score) == 100
    assert round(prop_scores["matching_score"]) == 800
    assert round(prop_scores["sum_weights"]) == 800


def test_graph_equivalence_with_versioning_check_on(ds2, ds):
    """Compare ``ds`` with ``ds2`` with versioning checks enabled."""
    weights = {
        "_internal": {
            "ignore_spec_version": False,
            "versioning_checks": True,
            "max_depth": 1,
        },
    }
    prop_scores = {}
    score = stix2.Environment().graphically_equivalent(ds, ds2, prop_scores, **weights)
    assert round(score) == 93
    assert round(prop_scores["matching_score"]) == 745
    assert round(prop_scores["sum_weights"]) == 800


def test_graph_equivalence_with_versioning_check_off(ds2, ds):
    """Compare ``ds`` with ``ds2`` with versioning checks disabled."""
    weights = {
        "_internal": {
            "ignore_spec_version": False,
            "versioning_checks": False,
            "max_depth": 1,
        },
    }
    prop_scores = {}
    score = stix2.Environment().graphically_equivalent(ds, ds2, prop_scores, **weights)
    assert round(score) == 93
    assert round(prop_scores["matching_score"]) == 745
    assert round(prop_scores["sum_weights"]) == 800