1import pytest
2
3from stix2 import parse
4from stix2.datastore.filters import Filter, apply_common_filters
5from stix2.utils import STIXdatetime, parse_into_datetime
6
# Raw STIX content as plain dicts. The tests in this module address these
# entries by position, so the order must stay:
#   [0] malware, [1] indicator, [2] relationship,
#   [3] vulnerability, [4] observed-data
stix_objs = [
    # [0] malware
    {
        "created": "2017-01-27T13:49:53.997Z",
        "description": "\n\nTITLE:\n\tPoison Ivy",
        "id": "malware--fdd60b30-b67c-41e3-b0b9-f01faf20d111",
        "labels": ["remote-access-trojan"],
        "modified": "2017-01-27T13:49:53.997Z",
        "name": "Poison Ivy",
        "type": "malware",
    },
    # [1] indicator
    {
        "created": "2014-05-08T09:00:00.000Z",
        "id": "indicator--a932fcc6-e032-476c-826f-cb970a5a1ade",
        "labels": ["file-hash-watchlist"],
        "modified": "2014-05-08T09:00:00.000Z",
        "name": "File hash for Poison Ivy variant",
        "pattern": "[file:hashes.'SHA-256' = 'ef537f25c895bfa782526529a9b63d97aa631564d5d789c2b765448c8635fb6c']",
        "type": "indicator",
        "valid_from": "2014-05-08T09:00:00.000000Z",
    },
    # [2] relationship -- the only entry that is revoked and carries markings
    {
        "created": "2014-05-08T09:00:00.000Z",
        "granular_markings": [
            {
                "marking_ref": "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed",
                "selectors": ["relationship_type"],
            },
        ],
        "id": "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463",
        "modified": "2014-05-08T09:00:00.000Z",
        "object_marking_refs": ["marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"],
        "relationship_type": "indicates",
        "revoked": True,
        "source_ref": "indicator--a932fcc6-e032-476c-826f-cb970a5a1ade",
        "target_ref": "malware--fdd60b30-b67c-41e3-b0b9-f01faf20d111",
        "type": "relationship",
    },
    # [3] vulnerability -- the only entry with external_references
    {
        "id": "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef",
        "created": "2016-02-14T00:00:00.000Z",
        "created_by_ref": "identity--f1350682-3290-4e0d-be58-69e290537647",
        "modified": "2016-02-14T00:00:00.000Z",
        "type": "vulnerability",
        "name": "CVE-2014-0160",
        "description": "The (1) TLS...",
        "external_references": [
            {"source_name": "cve", "external_id": "CVE-2014-0160"},
        ],
        "labels": ["heartbleed", "has-logo"],
    },
    # [4] observed-data -- the only entry with an "objects" mapping
    {
        "type": "observed-data",
        "id": "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf",
        "created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
        "created": "2016-04-06T19:58:16.000Z",
        "modified": "2016-04-06T19:58:16.000Z",
        "first_observed": "2015-12-21T19:00:00Z",
        "last_observed": "2015-12-21T19:00:00Z",
        "number_observed": 1,
        "objects": {
            "0": {"type": "file", "name": "HAL 9000.exe"},
        },
    },
]
86
87
# Filters shared by the test_apply_common_filters* tests, which select them
# by list index. Append new filters at the end instead of reordering.
filters = [
    # [0] used by test_apply_common_filters0
    Filter('type', '!=', 'relationship'),
    # [1] used by test_apply_common_filters1
    Filter('id', '=', 'relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463'),
    # [2] used by test_apply_common_filters2
    Filter('labels', 'in', 'remote-access-trojan'),
    # [3] used by test_apply_common_filters3
    Filter('created', '>', '2015-01-01T01:00:00.000Z'),
    # [4] used by test_apply_common_filters4
    Filter('revoked', '=', True),
    # [5] used by test_apply_common_filters5
    Filter('revoked', '!=', True),
    # [6] used by test_apply_common_filters6
    Filter('object_marking_refs', '=', 'marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9'),
    # [7] and [8] are applied together by test_apply_common_filters7
    Filter('granular_markings.selectors', 'in', 'relationship_type'),
    Filter('granular_markings.marking_ref', '=', 'marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed'),
    # [9] used by test_apply_common_filters8
    Filter('external_references.external_id', 'in', 'CVE-2014-0160,CVE-2017-6608'),
    # [10] used by test_apply_common_filters9
    Filter('created_by_ref', '=', 'identity--f1350682-3290-4e0d-be58-69e290537647'),
    # [11] used by test_apply_common_filters10 (expected to match nothing)
    Filter('object_marking_refs', '=', 'marking-definition--613f2e26-0000-4000-8000-b8e91df99dc9'),
    # [12] used by test_apply_common_filters11 (expected to match nothing)
    Filter('granular_markings.selectors', 'in', 'description'),
    # [13] used by test_apply_common_filters12 (expected to match nothing)
    Filter('external_references.source_name', '=', 'CVE'),
    # [14] used by test_apply_common_filters13
    Filter('objects', '=', {'0': {'type': 'file', 'name': 'HAL 9000.exe'}}),
    # [15] used by test_apply_common_filters14
    Filter('objects', 'contains', {'type': 'file', 'name': 'HAL 9000.exe'}),
    # [16] used by test_apply_common_filters15
    Filter('labels', 'contains', 'heartbleed'),
]
107
# Parsed counterparts of ``stix_objs`` (same order, same content), so every
# filter can also be exercised against real python-stix2 objects rather than
# plain dicts.
real_stix_objs = list(map(parse, stix_objs))
111
112
def test_filter_ops_check():
    """Building a Filter with an unsupported operator raises ValueError."""
    # '*' is not an allowed operator; the complete message is checked.
    with pytest.raises(ValueError) as star_error:
        Filter('modified', '*', 'not supported operator')
    star_message = str(star_error.value)
    assert star_message == "Filter operator '*' not supported for specified property: 'modified'"

    # '%' is not allowed either; only the start of the message is checked.
    with pytest.raises(ValueError) as percent_error:
        Filter("type", "%", "4")
    percent_message = str(percent_error.value)
    assert "Filter operator '%' not supported for specified property" in percent_message
124
125
def test_filter_value_type_check():
    """Building a Filter with an unsupported value type raises TypeError.

    Each case supplies a value (a bare ``object``, a ``complex`` and a
    ``set``) that Filter is expected to reject. The error message must name
    the offending type and explain which types are accepted.
    """
    unsupported_cases = [
        # (property, rejected value, name of the value's type)
        ('created', object(), 'object'),
        ("type", complex(2, -1), 'complex'),
        ("type", {16, 23}, 'set'),
    ]

    for prop, bad_value, type_name in unsupported_cases:
        with pytest.raises(TypeError) as excinfo:
            Filter(prop, '=', bad_value)
        message = str(excinfo.value)

        # On Python 2 the type renders as `<type 'x'>`, on Python 3 as
        # `<class 'x'>`; either spelling is accepted.
        accepted_reprs = (
            "<type '%s'>" % type_name,
            "'<class '%s'>'" % type_name,
        )
        assert any(type_repr in message for type_repr in accepted_reprs)
        assert "is not supported. The type must be a Python immutable type or dictionary" in message
144
145
def test_filter_type_underscore_check():
    """A Filter on property ``type`` rejects a value containing underscores."""
    with pytest.raises(ValueError) as underscore_error:
        Filter("type", "=", "oh_underscore")
    message = str(underscore_error.value)
    assert "Filter for property 'type' cannot have its value 'oh_underscore'" in message
151
152
def test_apply_common_filters0():
    """Filter ``type != relationship`` keeps every non-relationship object.

    Checked against both the raw dicts and the parsed STIX objects: all four
    non-relationship entries (indices 0, 1, 3 and 4) must be returned and the
    relationship (index 2) must not.
    """
    not_relationship = filters[0]
    expected_indices = (0, 1, 3, 4)

    resp = list(apply_common_filters(stix_objs, [not_relationship]))
    ids = [r['id'] for r in resp]
    for index in expected_indices:
        assert stix_objs[index]['id'] in ids
    assert stix_objs[2]['id'] not in ids
    assert len(ids) == 4

    resp = list(apply_common_filters(real_stix_objs, [not_relationship]))
    ids = [r.id for r in resp]
    for index in expected_indices:
        assert real_stix_objs[index].id in ids
    assert real_stix_objs[2].id not in ids
    assert len(ids) == 4
168
169
def test_apply_common_filters1():
    """Filtering on the relationship's exact ``id`` returns only that object."""
    by_id = [filters[1]]

    dict_matches = list(apply_common_filters(stix_objs, by_id))
    assert dict_matches[0]['id'] == stix_objs[2]['id']
    assert len(dict_matches) == 1

    parsed_matches = list(apply_common_filters(real_stix_objs, by_id))
    assert parsed_matches[0].id == real_stix_objs[2].id
    assert len(parsed_matches) == 1
179
180
def test_apply_common_filters2():
    """The ``labels`` filter for remote-access-trojan matches only the malware."""
    by_label = [filters[2]]

    dict_matches = list(apply_common_filters(stix_objs, by_label))
    assert dict_matches[0]['id'] == stix_objs[0]['id']
    assert len(dict_matches) == 1

    parsed_matches = list(apply_common_filters(real_stix_objs, by_label))
    assert parsed_matches[0].id == real_stix_objs[0].id
    assert len(parsed_matches) == 1
190
191
def test_apply_common_filters3():
    """Filter ``created > 2015-01-01T01:00:00.000Z`` returns three objects.

    The malware entry (index 0) is expected to be the first result for both
    the raw dicts and the parsed objects.
    """
    created_after = [filters[3]]

    dict_matches = list(apply_common_filters(stix_objs, created_after))
    assert dict_matches[0]['id'] == stix_objs[0]['id']
    assert len(dict_matches) == 3

    parsed_matches = list(apply_common_filters(real_stix_objs, created_after))
    assert len(parsed_matches) == 3
    assert parsed_matches[0].id == real_stix_objs[0].id
201
202
def test_apply_common_filters4():
    """Filter ``revoked = True`` returns only the revoked relationship."""
    is_revoked = [filters[4]]

    dict_matches = list(apply_common_filters(stix_objs, is_revoked))
    assert dict_matches[0]['id'] == stix_objs[2]['id']
    assert len(dict_matches) == 1

    parsed_matches = list(apply_common_filters(real_stix_objs, is_revoked))
    assert parsed_matches[0].id == real_stix_objs[2].id
    assert len(parsed_matches) == 1
212
213
def test_apply_common_filters5():
    """Filter ``revoked != True`` selects objects that are not revoked.

    No raw dict is expected to match: the only dict that has a ``revoked``
    key is the relationship, and it is revoked. The parsed objects are
    expected to yield the four other entries (presumably because parsing
    gives them a default ``revoked`` value -- confirm against the stix2
    object definitions).
    """
    not_revoked = filters[5]

    resp = list(apply_common_filters(stix_objs, [not_revoked]))
    assert len(resp) == 0

    resp = list(apply_common_filters(real_stix_objs, [not_revoked]))
    assert len(resp) == 4
    # The revoked relationship must be the one object that is left out.
    assert real_stix_objs[2].id not in [r.id for r in resp]
221
222
def test_apply_common_filters6():
    """Filtering ``object_marking_refs`` on a present marking id.

    Marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9 is referenced
    only by the relationship, so that is the single expected match.
    """
    by_object_marking = [filters[6]]

    dict_matches = list(apply_common_filters(stix_objs, by_object_marking))
    assert dict_matches[0]['id'] == stix_objs[2]['id']
    assert len(dict_matches) == 1

    parsed_matches = list(apply_common_filters(real_stix_objs, by_object_marking))
    assert parsed_matches[0].id == real_stix_objs[2].id
    assert len(parsed_matches) == 1
232
233
def test_apply_common_filters7():
    """Two granular-marking filters applied together match the relationship.

    The object must have ``relationship_type`` among its granular marking
    selectors AND marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed
    as a granular marking_ref.
    """
    granular_query = [filters[7], filters[8]]

    dict_matches = list(apply_common_filters(stix_objs, granular_query))
    assert dict_matches[0]['id'] == stix_objs[2]['id']
    assert len(dict_matches) == 1

    parsed_matches = list(apply_common_filters(real_stix_objs, granular_query))
    assert parsed_matches[0].id == real_stix_objs[2].id
    assert len(parsed_matches) == 1
244
245
def test_apply_common_filters8():
    """The ``in`` filter on ``external_references.external_id``.

    With the value "CVE-2014-0160,CVE-2017-6608" only the vulnerability
    (index 3) is expected to match.
    """
    by_external_id = [filters[9]]

    dict_matches = list(apply_common_filters(stix_objs, by_external_id))
    assert dict_matches[0]['id'] == stix_objs[3]['id']
    assert len(dict_matches) == 1

    parsed_matches = list(apply_common_filters(real_stix_objs, by_external_id))
    assert parsed_matches[0].id == real_stix_objs[3].id
    assert len(parsed_matches) == 1
255
256
def test_apply_common_filters9():
    """Filtering on ``created_by_ref`` returns only the vulnerability.

    identity--f1350682-3290-4e0d-be58-69e290537647 is the creator of the
    vulnerability (index 3) alone, so both the count and the identity of the
    single match are checked.
    """
    by_creator = filters[10]

    resp = list(apply_common_filters(stix_objs, [by_creator]))
    assert len(resp) == 1
    assert resp[0]['id'] == stix_objs[3]['id']

    resp = list(apply_common_filters(real_stix_objs, [by_creator]))
    assert len(resp) == 1
    assert resp[0].id == real_stix_objs[3].id
264
265
def test_apply_common_filters10():
    """Filtering ``object_marking_refs`` on an absent marking id.

    No object references
    marking-definition--613f2e26-0000-4000-8000-b8e91df99dc9, so neither
    the raw dicts nor the parsed objects may produce a match.
    """
    for candidates in (stix_objs, real_stix_objs):
        matches = list(apply_common_filters(candidates, [filters[11]]))
        assert len(matches) == 0
273
274
def test_apply_common_filters11():
    """No object has ``description`` among its granular marking selectors.

    The filter must therefore return nothing for the raw dicts and for the
    parsed objects alike.
    """
    for candidates in (stix_objs, real_stix_objs):
        matches = list(apply_common_filters(candidates, [filters[12]]))
        assert len(matches) == 0
282
283
def test_apply_common_filters12():
    """Matching ``external_references.source_name`` is case sensitive.

    The data holds "cve" in lower case, so a filter for "CVE" is expected to
    return nothing for the raw dicts and for the parsed objects.
    """
    for candidates in (stix_objs, real_stix_objs):
        matches = list(apply_common_filters(candidates, [filters[13]]))
        assert len(matches) == 0
291
292
def test_apply_common_filters13():
    """Filtering ``objects`` by equality with a dict matches observed-data."""
    by_objects = [filters[14]]

    dict_matches = list(apply_common_filters(stix_objs, by_objects))
    assert dict_matches[0]["id"] == stix_objs[4]["id"]
    assert len(dict_matches) == 1
    # Regression guard: filtering must leave the nested file entry a plain
    # dict and not turn it into a File object (a deep bug found earlier).
    nested_file = dict_matches[0]["objects"]["0"]
    assert isinstance(nested_file, dict)

    parsed_matches = list(apply_common_filters(real_stix_objs, by_objects))
    assert parsed_matches[0].id == real_stix_objs[4].id
    assert len(parsed_matches) == 1
305
306
def test_apply_common_filters14():
    """The ``contains`` filter on ``objects`` finds a specific file object.

    Only the observed-data entry (index 4) holds the "HAL 9000.exe" file
    Cyber Observable Object.
    """
    contains_file = [filters[15]]

    dict_matches = list(apply_common_filters(stix_objs, contains_file))
    assert dict_matches[0]['id'] == stix_objs[4]['id']
    assert len(dict_matches) == 1

    parsed_matches = list(apply_common_filters(real_stix_objs, contains_file))
    assert parsed_matches[0].id == real_stix_objs[4].id
    assert len(parsed_matches) == 1
316
317
def test_apply_common_filters15():
    """The ``contains`` filter on ``labels`` finds the 'heartbleed' label.

    Only the vulnerability (index 3) carries that label.
    """
    contains_label = [filters[16]]

    dict_matches = list(apply_common_filters(stix_objs, contains_label))
    assert dict_matches[0]['id'] == stix_objs[3]['id']
    assert len(dict_matches) == 1

    parsed_matches = list(apply_common_filters(real_stix_objs, contains_label))
    assert parsed_matches[0].id == real_stix_objs[3].id
    assert len(parsed_matches) == 1
327
328
def test_datetime_filter_behavior():
    """Check that datetime objects and datetime strings filter alike.

    If a filter is initialized with a datetime object as its value, OR the
    STIX object property being filtered on is a datetime object, all
    resulting comparisons are done on the string representations of the
    datetime objects, as the Filter functionality will convert all datetime
    objects to their string forms using format_datetime().

    This test makes sure all datetime comparisons are carried out correctly.
    """
    created_str = "2016-02-14T00:00:00.000Z"
    vulnerability_id = "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef"

    filter_with_dt_obj = Filter("created", "=", parse_into_datetime(created_str, "millisecond"))
    filter_with_str = Filter("created", "=", created_str)

    # datetime property compared against a filter holding a datetime object
    matches = list(apply_common_filters(real_stix_objs, [filter_with_dt_obj]))
    assert len(matches) == 1
    assert matches[0]["id"] == vulnerability_id
    # the matched object itself must not have been altered
    assert isinstance(matches[0].created, STIXdatetime)

    # datetime string compared against a filter holding a string
    matches = list(apply_common_filters(stix_objs, [filter_with_str]))
    assert len(matches) == 1
    assert matches[0]["id"] == vulnerability_id

    # datetime property compared against a filter holding a string
    matches = list(apply_common_filters(real_stix_objs, [filter_with_str]))
    assert len(matches) == 1
    assert matches[0]["id"] == vulnerability_id
    # the matched object itself must not have been altered
    assert isinstance(matches[0].created, STIXdatetime)
357
358
def test_filters0(stix_objs2, real_stix_objs2):
    """Objects modified before 2017-01-28T13:49:53.935Z are returned.

    Two matches are expected, the first being the second fixture entry. The
    dict fixture is filtered with a string value, the parsed fixture with
    the equivalent datetime object.
    """
    cutoff = "2017-01-28T13:49:53.935Z"

    before_str = Filter("modified", "<", cutoff)
    dict_matches = list(apply_common_filters(stix_objs2, [before_str]))
    assert dict_matches[0]['id'] == stix_objs2[1]['id']
    assert len(dict_matches) == 2

    before_dt = Filter("modified", "<", parse_into_datetime(cutoff))
    parsed_matches = list(apply_common_filters(real_stix_objs2, [before_dt]))
    assert parsed_matches[0].id == real_stix_objs2[1].id
    assert len(parsed_matches) == 2
368
369
def test_filters1(stix_objs2, real_stix_objs2):
    """Objects modified after 2017-01-28T13:49:53.935Z are returned.

    One match is expected: the first fixture entry. The dict fixture is
    filtered with a string value, the parsed fixture with the equivalent
    datetime object.
    """
    cutoff = "2017-01-28T13:49:53.935Z"

    after_str = Filter("modified", ">", cutoff)
    dict_matches = list(apply_common_filters(stix_objs2, [after_str]))
    assert dict_matches[0]['id'] == stix_objs2[0]['id']
    assert len(dict_matches) == 1

    after_dt = Filter("modified", ">", parse_into_datetime(cutoff))
    parsed_matches = list(apply_common_filters(real_stix_objs2, [after_dt]))
    assert parsed_matches[0].id == real_stix_objs2[0].id
    assert len(parsed_matches) == 1
379
380
def test_filters2(stix_objs2, real_stix_objs2):
    """Objects modified on or after 2017-01-27T13:49:53.935Z are returned.

    Three matches are expected, the first being the first fixture entry. The
    dict fixture is filtered with a string value, the parsed fixture with
    the equivalent datetime object.
    """
    # "Return any object modified after or on 2017-01-27T13:49:53.935Z"
    resp = list(apply_common_filters(stix_objs2, [Filter("modified", ">=", "2017-01-27T13:49:53.935Z")]))
    assert resp[0]['id'] == stix_objs2[0]['id']
    assert len(resp) == 3

    # Same query, with the filter value given as a datetime object.
    resp = list(apply_common_filters(real_stix_objs2, [Filter("modified", ">=", parse_into_datetime("2017-01-27T13:49:53.935Z"))]))
    assert resp[0].id == real_stix_objs2[0].id
    assert len(resp) == 3
390
391
def test_filters3(stix_objs2, real_stix_objs2):
    """Objects modified on or before 2017-01-27T13:49:53.935Z are returned.

    Two matches are expected, the first being the second fixture entry. The
    dict fixture is filtered with a string value, the parsed fixture with
    the equivalent datetime object.
    """
    # "Return any object modified before or on 2017-01-27T13:49:53.935Z"
    resp = list(apply_common_filters(stix_objs2, [Filter("modified", "<=", "2017-01-27T13:49:53.935Z")]))
    assert resp[0]['id'] == stix_objs2[1]['id']
    assert len(resp) == 2

    # Same query, with the filter value given as a datetime object.
    fv = Filter("modified", "<=", parse_into_datetime("2017-01-27T13:49:53.935Z"))
    resp = list(apply_common_filters(real_stix_objs2, [fv]))
    assert resp[0].id == real_stix_objs2[1].id
    assert len(resp) == 2
403
404
def test_filters4():
    """A Filter with the unsupported operator '?' cannot be created."""
    expected_message = "Filter operator '?' not supported for specified property: 'modified'"

    with pytest.raises(ValueError) as operator_error:
        Filter("modified", "?", "2017-01-27T13:49:53.935Z")

    assert str(operator_error.value) == expected_message
413
414
def test_filters5(stix_objs2, real_stix_objs2):
    """Objects whose id is not indicator--...-000000000002 are returned.

    One match is expected in each fixture: its first entry.
    """
    excluded_id = "indicator--00000000-0000-4000-8000-000000000002"
    not_excluded = [Filter("id", "!=", excluded_id)]

    dict_matches = list(apply_common_filters(stix_objs2, not_excluded))
    assert dict_matches[0]['id'] == stix_objs2[0]['id']
    assert len(dict_matches) == 1

    parsed_matches = list(apply_common_filters(real_stix_objs2, not_excluded))
    assert parsed_matches[0].id == real_stix_objs2[0].id
    assert len(parsed_matches) == 1
424
425
def test_filters6(stix_objs2, real_stix_objs2):
    """Filtering works on ``name``, which is not a common property.

    Three matches are expected in each fixture, starting with its first
    entry.
    """
    by_name = [Filter("name", "=", "Malicious site hosting downloader")]

    dict_matches = list(apply_common_filters(stix_objs2, by_name))
    assert dict_matches[0]['id'] == stix_objs2[0]['id']
    assert len(dict_matches) == 3

    parsed_matches = list(apply_common_filters(real_stix_objs2, by_name))
    assert parsed_matches[0].id == real_stix_objs2[0].id
    assert len(parsed_matches) == 3
435
436
def test_filters7(stix_objs2, real_stix_objs2):
    """Filtering works on a deeply embedded property.

    An observed-data object with a PDF file extension is appended to each
    fixture, then filtered on ``objects.0.extensions.pdf-ext.version``. The
    appended object (position 3 of the extended list) is the only expected
    match.
    """
    pdf_observed_data = {
        "type": "observed-data",
        "id": "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf",
        "created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
        "created": "2016-04-06T19:58:16.000Z",
        "modified": "2016-04-06T19:58:16.000Z",
        "first_observed": "2015-12-21T19:00:00Z",
        "last_observed": "2015-12-21T19:00:00Z",
        "number_observed": 50,
        "objects": {
            "0": {
                "type": "file",
                "hashes": {
                    "SHA-256": "35a01331e9ad96f751278b891b6ea09699806faedfa237d40513d92ad1b7100f",
                },
                "extensions": {
                    "pdf-ext": {
                        "version": "1.7",
                        "document_info_dict": {
                            "Title": "Sample document",
                            "Author": "Adobe Systems Incorporated",
                            "Creator": "Adobe FrameMaker 5.5.3 for Power Macintosh",
                            "Producer": "Acrobat Distiller 3.01 for Power Macintosh",
                            "CreationDate": "20070412090123-02",
                        },
                        "pdfid0": "DFCE52BD827ECF765649852119D",
                        "pdfid1": "57A1E0F9ED2AE523E313C",
                    },
                },
            },
        },
    }

    # Work on copies so the fixtures themselves are left untouched.
    stix_objects = list(stix_objs2)
    stix_objects.append(pdf_observed_data)
    real_stix_objects = list(real_stix_objs2)
    real_stix_objects.append(parse(pdf_observed_data))

    version_path = "objects.0.extensions.pdf-ext.version"

    dict_matches = list(apply_common_filters(stix_objects, [Filter(version_path, ">", "1.2")]))
    assert dict_matches[0]['id'] == stix_objects[3]['id']
    assert len(dict_matches) == 1

    parsed_matches = list(apply_common_filters(real_stix_objects, [Filter(version_path, ">", "1.2")]))
    assert parsed_matches[0].id == real_stix_objects[3].id
    assert len(parsed_matches) == 1
482