1import os
2
3import pytest
4
5import stix2
6import stix2.equivalence.graph
7import stix2.equivalence.object
8
9from .constants import (
10    CAMPAIGN_ID, CAMPAIGN_KWARGS, FAKE_TIME, IDENTITY_ID, IDENTITY_KWARGS,
11    INDICATOR_ID, INDICATOR_KWARGS, MALWARE_ID, MALWARE_KWARGS,
12    RELATIONSHIP_IDS,
13)
14
15FS_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "stix2_data")
16
17
18@pytest.fixture
19def ds():
20    cam = stix2.v20.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
21    idy = stix2.v20.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
22    ind = stix2.v20.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
23    mal = stix2.v20.Malware(id=MALWARE_ID, **MALWARE_KWARGS)
24    rel1 = stix2.v20.Relationship(ind, 'indicates', mal, id=RELATIONSHIP_IDS[0])
25    rel2 = stix2.v20.Relationship(mal, 'targets', idy, id=RELATIONSHIP_IDS[1])
26    rel3 = stix2.v20.Relationship(cam, 'uses', mal, id=RELATIONSHIP_IDS[2])
27    reprt = stix2.v20.Report(
28        name="Malware Report",
29        published="2021-05-09T08:22:22Z",
30        labels=["campaign"],
31        object_refs=[mal.id, rel1.id, ind.id],
32    )
33    stix_objs = [cam, idy, ind, mal, rel1, rel2, rel3, reprt]
34    yield stix2.MemoryStore(stix_objs)
35
36
37@pytest.fixture
38def ds2():
39    cam = stix2.v20.Campaign(id=CAMPAIGN_ID, **CAMPAIGN_KWARGS)
40    idy = stix2.v20.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
41    ind = stix2.v20.Indicator(id=INDICATOR_ID, created_by_ref=idy.id, **INDICATOR_KWARGS)
42    indv2 = ind.new_version(external_references=[{
43        "source_name": "unknown",
44        "url": "https://examplewebsite.com/",
45    }])
46    mal = stix2.v20.Malware(id=MALWARE_ID, created_by_ref=idy.id, **MALWARE_KWARGS)
47    malv2 = mal.new_version(external_references=[{
48        "source_name": "unknown",
49        "url": "https://examplewebsite2.com/",
50    }])
51    rel1 = stix2.v20.Relationship(ind, 'indicates', mal, id=RELATIONSHIP_IDS[0])
52    rel2 = stix2.v20.Relationship(mal, 'targets', idy, id=RELATIONSHIP_IDS[1])
53    rel3 = stix2.v20.Relationship(cam, 'uses', mal, id=RELATIONSHIP_IDS[2])
54    stix_objs = [cam, idy, ind, indv2, mal, malv2, rel1, rel2, rel3]
55    reprt = stix2.v20.Report(
56        created_by_ref=idy.id,
57        name="example",
58        labels=["campaign"],
59        published="2021-04-09T08:22:22Z",
60        object_refs=stix_objs,
61    )
62    stix_objs.append(reprt)
63    yield stix2.MemoryStore(stix_objs)
64
65
66def test_object_factory_created_by_ref_str():
67    factory = stix2.ObjectFactory(created_by_ref=IDENTITY_ID)
68    ind = factory.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
69    assert ind.created_by_ref == IDENTITY_ID
70
71
72def test_object_factory_created_by_ref_obj():
73    id_obj = stix2.v20.Identity(id=IDENTITY_ID, **IDENTITY_KWARGS)
74    factory = stix2.ObjectFactory(created_by_ref=id_obj)
75    ind = factory.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
76    assert ind.created_by_ref == IDENTITY_ID
77
78
79def test_object_factory_override_default():
80    factory = stix2.ObjectFactory(created_by_ref=IDENTITY_ID)
81    new_id = "identity--983b3172-44fe-4a80-8091-eb8098841fe8"
82    ind = factory.create(stix2.v20.Indicator, created_by_ref=new_id, **INDICATOR_KWARGS)
83    assert ind.created_by_ref == new_id
84
85
86def test_object_factory_created():
87    factory = stix2.ObjectFactory(created=FAKE_TIME)
88    ind = factory.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
89    assert ind.created == FAKE_TIME
90    assert ind.modified == FAKE_TIME
91
92
93def test_object_factory_external_reference():
94    ext_ref = stix2.v20.ExternalReference(
95        source_name="ACME Threat Intel",
96        description="Threat report",
97    )
98    factory = stix2.ObjectFactory(external_references=ext_ref)
99    ind = factory.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
100    assert ind.external_references[0].source_name == "ACME Threat Intel"
101    assert ind.external_references[0].description == "Threat report"
102
103    ind2 = factory.create(stix2.v20.Indicator, external_references=None, **INDICATOR_KWARGS)
104    assert 'external_references' not in ind2
105
106
107def test_object_factory_obj_markings():
108    stmt_marking = stix2.v20.StatementMarking("Copyright 2016, Example Corp")
109    mark_def = stix2.v20.MarkingDefinition(
110        definition_type="statement",
111        definition=stmt_marking,
112    )
113    factory = stix2.ObjectFactory(object_marking_refs=[mark_def, stix2.v20.TLP_AMBER])
114    ind = factory.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
115    assert mark_def.id in ind.object_marking_refs
116    assert stix2.v20.TLP_AMBER.id in ind.object_marking_refs
117
118    factory = stix2.ObjectFactory(object_marking_refs=stix2.v20.TLP_RED)
119    ind = factory.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
120    assert stix2.v20.TLP_RED.id in ind.object_marking_refs
121
122
123def test_object_factory_list_append():
124    ext_ref = stix2.v20.ExternalReference(
125        source_name="ACME Threat Intel",
126        description="Threat report from ACME",
127    )
128    ext_ref2 = stix2.v20.ExternalReference(
129        source_name="Yet Another Threat Report",
130        description="Threat report from YATR",
131    )
132    ext_ref3 = stix2.v20.ExternalReference(
133        source_name="Threat Report #3",
134        description="One more threat report",
135    )
136    factory = stix2.ObjectFactory(external_references=ext_ref)
137    ind = factory.create(stix2.v20.Indicator, external_references=ext_ref2, **INDICATOR_KWARGS)
138    assert ind.external_references[1].source_name == "Yet Another Threat Report"
139
140    ind = factory.create(stix2.v20.Indicator, external_references=[ext_ref2, ext_ref3], **INDICATOR_KWARGS)
141    assert ind.external_references[2].source_name == "Threat Report #3"
142
143
144def test_object_factory_list_replace():
145    ext_ref = stix2.v20.ExternalReference(
146        source_name="ACME Threat Intel",
147        description="Threat report from ACME",
148    )
149    ext_ref2 = stix2.v20.ExternalReference(
150        source_name="Yet Another Threat Report",
151        description="Threat report from YATR",
152    )
153    factory = stix2.ObjectFactory(external_references=ext_ref, list_append=False)
154    ind = factory.create(stix2.v20.Indicator, external_references=ext_ref2, **INDICATOR_KWARGS)
155    assert len(ind.external_references) == 1
156    assert ind.external_references[0].source_name == "Yet Another Threat Report"
157
158
159def test_environment_functions():
160    env = stix2.Environment(
161        stix2.ObjectFactory(created_by_ref=IDENTITY_ID),
162        stix2.MemoryStore(),
163    )
164
165    # Create a STIX object
166    ind = env.create(stix2.v20.Indicator, id=INDICATOR_ID, **INDICATOR_KWARGS)
167    assert ind.created_by_ref == IDENTITY_ID
168
169    # Add objects to datastore
170    ind2 = ind.new_version(labels=['benign'])
171    env.add([ind, ind2])
172
173    # Get both versions of the object
174    resp = env.all_versions(INDICATOR_ID)
175    assert len(resp) == 2
176
177    # Get just the most recent version of the object
178    resp = env.get(INDICATOR_ID)
179    assert resp['labels'][0] == 'benign'
180
181    # Search on something other than id
182    query = [stix2.Filter('type', '=', 'vulnerability')]
183    resp = env.query(query)
184    assert len(resp) == 0
185
186    # See different results after adding filters to the environment
187    env.add_filters([
188        stix2.Filter('type', '=', 'indicator'),
189        stix2.Filter('created_by_ref', '=', IDENTITY_ID),
190    ])
191    env.add_filter(stix2.Filter('labels', '=', 'benign'))  # should be 'malicious-activity'
192    resp = env.get(INDICATOR_ID)
193    assert resp['labels'][0] == 'benign'  # should be 'malicious-activity'
194
195
196def test_environment_source_and_sink():
197    ind = stix2.v20.Indicator(id=INDICATOR_ID, **INDICATOR_KWARGS)
198    env = stix2.Environment(source=stix2.MemorySource([ind]), sink=stix2.MemorySink([ind]))
199    assert env.get(INDICATOR_ID).labels[0] == 'malicious-activity'
200
201
202def test_environment_datastore_and_sink():
203    with pytest.raises(ValueError) as excinfo:
204        stix2.Environment(
205            factory=stix2.ObjectFactory(),
206            store=stix2.MemoryStore(), sink=stix2.MemorySink,
207        )
208    assert 'Data store already provided' in str(excinfo.value)
209
210
211def test_environment_no_datastore():
212    env = stix2.Environment(factory=stix2.ObjectFactory())
213
214    with pytest.raises(AttributeError) as excinfo:
215        env.add(stix2.v20.Indicator(**INDICATOR_KWARGS))
216    assert 'Environment has no data sink to put objects in' in str(excinfo.value)
217
218    with pytest.raises(AttributeError) as excinfo:
219        env.get(INDICATOR_ID)
220    assert 'Environment has no data source' in str(excinfo.value)
221
222    with pytest.raises(AttributeError) as excinfo:
223        env.all_versions(INDICATOR_ID)
224    assert 'Environment has no data source' in str(excinfo.value)
225
226    with pytest.raises(AttributeError) as excinfo:
227        env.query(INDICATOR_ID)
228    assert 'Environment has no data source' in str(excinfo.value)
229
230    with pytest.raises(AttributeError) as excinfo:
231        env.relationships(INDICATOR_ID)
232    assert 'Environment has no data source' in str(excinfo.value)
233
234    with pytest.raises(AttributeError) as excinfo:
235        env.related_to(INDICATOR_ID)
236    assert 'Environment has no data source' in str(excinfo.value)
237
238
239def test_environment_add_filters():
240    env = stix2.Environment(factory=stix2.ObjectFactory())
241    env.add_filters([INDICATOR_ID])
242    env.add_filter(INDICATOR_ID)
243
244
245def test_environment_datastore_and_no_object_factory():
246    # Uses a default object factory
247    env = stix2.Environment(store=stix2.MemoryStore())
248    ind = env.create(stix2.v20.Indicator, id=INDICATOR_ID, **INDICATOR_KWARGS)
249    assert ind.id == INDICATOR_ID
250
251
252def test_parse_malware():
253    env = stix2.Environment()
254    data = """{
255        "type": "malware",
256        "id": "malware--9c4638ec-f1de-4ddb-abf4-1b760417654e",
257        "created": "2017-01-01T12:34:56.000Z",
258        "modified": "2017-01-01T12:34:56.000Z",
259        "name": "Cryptolocker",
260        "labels": [
261            "ransomware"
262        ]
263    }"""
264    mal = env.parse(data, version="2.0")
265
266    assert mal.type == 'malware'
267    assert mal.id == MALWARE_ID
268    assert mal.created == FAKE_TIME
269    assert mal.modified == FAKE_TIME
270    assert mal.labels == ['ransomware']
271    assert mal.name == "Cryptolocker"
272
273
274def test_creator_of():
275    identity = stix2.v20.Identity(**IDENTITY_KWARGS)
276    factory = stix2.ObjectFactory(created_by_ref=identity.id)
277    env = stix2.Environment(store=stix2.MemoryStore(), factory=factory)
278    env.add(identity)
279
280    ind = env.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
281    creator = env.creator_of(ind)
282    assert creator is identity
283
284
285def test_creator_of_no_datasource():
286    identity = stix2.v20.Identity(**IDENTITY_KWARGS)
287    factory = stix2.ObjectFactory(created_by_ref=identity.id)
288    env = stix2.Environment(factory=factory)
289
290    ind = env.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
291    with pytest.raises(AttributeError) as excinfo:
292        env.creator_of(ind)
293    assert 'Environment has no data source' in str(excinfo.value)
294
295
296def test_creator_of_not_found():
297    identity = stix2.v20.Identity(**IDENTITY_KWARGS)
298    factory = stix2.ObjectFactory(created_by_ref=identity.id)
299    env = stix2.Environment(store=stix2.MemoryStore(), factory=factory)
300
301    ind = env.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
302    creator = env.creator_of(ind)
303    assert creator is None
304
305
306def test_creator_of_no_created_by_ref():
307    env = stix2.Environment(store=stix2.MemoryStore())
308    ind = env.create(stix2.v20.Indicator, **INDICATOR_KWARGS)
309    creator = env.creator_of(ind)
310    assert creator is None
311
312
313def test_relationships(ds):
314    env = stix2.Environment(store=ds)
315    mal = env.get(MALWARE_ID)
316    resp = env.relationships(mal)
317
318    assert len(resp) == 3
319    assert any(x['id'] == RELATIONSHIP_IDS[0] for x in resp)
320    assert any(x['id'] == RELATIONSHIP_IDS[1] for x in resp)
321    assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
322
323
324def test_relationships_no_id(ds):
325    env = stix2.Environment(store=ds)
326    mal = {
327        "type": "malware",
328        "name": "some variant",
329    }
330    with pytest.raises(ValueError) as excinfo:
331        env.relationships(mal)
332    assert "object has no 'id' property" in str(excinfo.value)
333
334
335def test_relationships_by_type(ds):
336    env = stix2.Environment(store=ds)
337    mal = env.get(MALWARE_ID)
338    resp = env.relationships(mal, relationship_type='indicates')
339
340    assert len(resp) == 1
341    assert resp[0]['id'] == RELATIONSHIP_IDS[0]
342
343
344def test_relationships_by_source(ds):
345    env = stix2.Environment(store=ds)
346    resp = env.relationships(MALWARE_ID, source_only=True)
347
348    assert len(resp) == 1
349    assert resp[0]['id'] == RELATIONSHIP_IDS[1]
350
351
352def test_relationships_by_target(ds):
353    env = stix2.Environment(store=ds)
354    resp = env.relationships(MALWARE_ID, target_only=True)
355
356    assert len(resp) == 2
357    assert any(x['id'] == RELATIONSHIP_IDS[0] for x in resp)
358    assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
359
360
361def test_relationships_by_target_and_type(ds):
362    env = stix2.Environment(store=ds)
363    resp = env.relationships(MALWARE_ID, relationship_type='uses', target_only=True)
364
365    assert len(resp) == 1
366    assert any(x['id'] == RELATIONSHIP_IDS[2] for x in resp)
367
368
369def test_relationships_by_target_and_source(ds):
370    env = stix2.Environment(store=ds)
371    with pytest.raises(ValueError) as excinfo:
372        env.relationships(MALWARE_ID, target_only=True, source_only=True)
373
374    assert 'not both' in str(excinfo.value)
375
376
377def test_related_to(ds):
378    env = stix2.Environment(store=ds)
379    mal = env.get(MALWARE_ID)
380    resp = env.related_to(mal)
381
382    assert len(resp) == 3
383    assert any(x['id'] == CAMPAIGN_ID for x in resp)
384    assert any(x['id'] == INDICATOR_ID for x in resp)
385    assert any(x['id'] == IDENTITY_ID for x in resp)
386
387
388def test_related_to_no_id(ds):
389    env = stix2.Environment(store=ds)
390    mal = {
391        "type": "malware",
392        "name": "some variant",
393    }
394    with pytest.raises(ValueError) as excinfo:
395        env.related_to(mal)
396    assert "object has no 'id' property" in str(excinfo.value)
397
398
399def test_related_to_by_source(ds):
400    env = stix2.Environment(store=ds)
401    resp = env.related_to(MALWARE_ID, source_only=True)
402
403    assert len(resp) == 1
404    assert resp[0]['id'] == IDENTITY_ID
405
406
407def test_related_to_by_target(ds):
408    env = stix2.Environment(store=ds)
409    resp = env.related_to(MALWARE_ID, target_only=True)
410
411    assert len(resp) == 2
412    assert any(x['id'] == CAMPAIGN_ID for x in resp)
413    assert any(x['id'] == INDICATOR_ID for x in resp)
414
415
416def test_versioned_checks(ds, ds2):
417    weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
418    weights.update({
419        "_internal": {
420            "ignore_spec_version": True,
421            "versioning_checks": True,
422            "max_depth": 1,
423        },
424    })
425    score = stix2.equivalence.object._versioned_checks(INDICATOR_ID, INDICATOR_ID, ds, ds2, **weights)
426    assert round(score) == 100
427
428
429def test_semantic_check_with_versioning(ds, ds2):
430    weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
431    weights.update({
432        "_internal": {
433            "ignore_spec_version": False,
434            "versioning_checks": True,
435            "ds1": ds,
436            "ds2": ds2,
437            "max_depth": 1,
438        },
439    })
440    ind = stix2.v20.Indicator(
441        **dict(
442            labels=["malicious-activity"],
443            pattern="[file:hashes.'SHA-256' = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855']",
444            valid_from="2017-01-01T12:34:56Z",
445            external_references=[
446                {
447                  "source_name": "unknown",
448                  "url": "https://examplewebsite2.com/",
449                },
450            ],
451            object_marking_refs=[stix2.v20.TLP_WHITE],
452        )
453    )
454    ds.add(ind)
455    score = stix2.equivalence.object.reference_check(ind.id, INDICATOR_ID, ds, ds2, **weights)
456    assert round(score) == 0  # Since pattern is different score is really low
457
458
459def test_list_semantic_check(ds, ds2):
460    weights = stix2.equivalence.graph.GRAPH_WEIGHTS.copy()
461    weights.update({
462        "_internal": {
463            "ignore_spec_version": False,
464            "versioning_checks": False,
465            "ds1": ds,
466            "ds2": ds2,
467            "max_depth": 1,
468        },
469    })
470    object_refs1 = [
471        "malware--9c4638ec-f1de-4ddb-abf4-1b760417654e",
472        "relationship--06520621-5352-4e6a-b976-e8fa3d437ffd",
473        "indicator--a740531e-63ff-4e49-a9e1-a0a3eed0e3e7",
474    ]
475    object_refs2 = [
476        "campaign--8e2e2d2b-17d4-4cbf-938f-98ee46b3cd3f",
477        "identity--311b2d2d-f010-4473-83ec-1edf84858f4c",
478        "indicator--a740531e-63ff-4e49-a9e1-a0a3eed0e3e7",
479        "malware--9c4638ec-f1de-4ddb-abf4-1b760417654e",
480        "malware--9c4638ec-f1de-4ddb-abf4-1b760417654e",
481        "relationship--06520621-5352-4e6a-b976-e8fa3d437ffd",
482        "relationship--181c9c09-43e6-45dd-9374-3bec192f05ef",
483        "relationship--a0cbb21c-8daf-4a7f-96aa-7155a4ef8f70",
484    ]
485
486    score = stix2.equivalence.object.list_reference_check(
487        object_refs1,
488        object_refs2,
489        ds,
490        ds2,
491        **weights,
492    )
493    assert round(score) == 1
494
495
496def test_graph_equivalence_with_filesystem_source(ds):
497    weights = {
498        "_internal": {
499            "ignore_spec_version": True,
500            "versioning_checks": False,
501            "max_depth": 1,
502        },
503    }
504    prop_scores = {}
505    fs = stix2.FileSystemSource(FS_PATH)
506    env = stix2.Environment().graphically_equivalent(fs, ds, prop_scores, **weights)
507    assert round(env) == 28
508    assert round(prop_scores["matching_score"]) == 139
509    assert round(prop_scores["sum_weights"]) == 500
510
511
512def test_graph_equivalence_with_duplicate_graph(ds):
513    weights = {
514        "_internal": {
515            "ignore_spec_version": False,
516            "versioning_checks": False,
517            "max_depth": 1,
518        },
519    }
520    prop_scores = {}
521    env = stix2.Environment().graphically_equivalent(ds, ds, prop_scores, **weights)
522    assert round(env) == 100
523    assert round(prop_scores["matching_score"]) == 800
524    assert round(prop_scores["sum_weights"]) == 800
525
526
527def test_graph_equivalence_with_versioning_check_on(ds2, ds):
528    weights = {
529        "_internal": {
530            "ignore_spec_version": False,
531            "versioning_checks": True,
532            "max_depth": 1,
533        },
534    }
535    prop_scores = {}
536    env = stix2.Environment().graphically_equivalent(ds, ds2, prop_scores, **weights)
537    assert round(env) == 93
538    assert round(prop_scores["matching_score"]) == 745
539    assert round(prop_scores["sum_weights"]) == 800
540
541
542def test_graph_equivalence_with_versioning_check_off(ds2, ds):
543    weights = {
544        "_internal": {
545            "ignore_spec_version": False,
546            "versioning_checks": False,
547            "max_depth": 1,
548        },
549    }
550    prop_scores = {}
551    env = stix2.Environment().graphically_equivalent(ds, ds2, prop_scores, **weights)
552    assert round(env) == 93
553    assert round(prop_scores["matching_score"]) == 745
554    assert round(prop_scores["sum_weights"]) == 800
555