1import pytest
2
3from stix2 import parse
4from stix2.datastore.filters import Filter, apply_common_filters
5from stix2.utils import STIXdatetime, parse_into_datetime
6
7from .constants import OBSERVED_DATA_ID
8
# Raw (dict-form) STIX 2.1 fixtures shared by the tests in this module.
# Tests address entries by list position, so the order here matters.
stix_objs = [
    # [0] malware
    {
        "created": "2017-01-27T13:49:53.997Z",
        "description": "\n\nTITLE:\n\tPoison Ivy",
        "id": "malware--fdd60b30-b67c-41e3-b0b9-f01faf20d111",
        "spec_version": "2.1",
        "malware_types": [
            "remote-access-trojan",
        ],
        "modified": "2017-01-27T13:49:53.997Z",
        "is_family": False,
        "name": "Poison Ivy",
        "type": "malware",
    },
    # [1] indicator
    {
        "created": "2014-05-08T09:00:00.000Z",
        "id": "indicator--a932fcc6-e032-476c-826f-cb970a5a1ade",
        "indicator_types": [
            "file-hash-watchlist",
        ],
        "modified": "2014-05-08T09:00:00.000Z",
        "name": "File hash for Poison Ivy variant",
        "pattern": "[file:hashes.'SHA-256' = 'ef537f25c895bfa782526529a9b63d97aa631564d5d789c2b765448c8635fb6c']",
        "pattern_type": "stix",
        "spec_version": "2.1",
        "type": "indicator",
        "valid_from": "2014-05-08T09:00:00.000000Z",
    },
    # [2] relationship -- the only entry with "revoked", "granular_markings"
    # and "object_marking_refs" set
    {
        "created": "2014-05-08T09:00:00.000Z",
        "granular_markings": [
            {
                "marking_ref": "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed",
                "selectors": [
                    "relationship_type",
                ],
            },
        ],
        "id": "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463",
        "modified": "2014-05-08T09:00:00.000Z",
        "object_marking_refs": [
            "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9",
        ],
        "relationship_type": "indicates",
        "revoked": True,
        "source_ref": "indicator--a932fcc6-e032-476c-826f-cb970a5a1ade",
        "spec_version": "2.1",
        "target_ref": "malware--fdd60b30-b67c-41e3-b0b9-f01faf20d111",
        "type": "relationship",
    },
    # [3] vulnerability -- the only entry with "external_references" and "labels"
    {
        "id": "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef",
        "spec_version": "2.1",
        "created": "2016-02-14T00:00:00.000Z",
        "created_by_ref": "identity--f1350682-3290-4e0d-be58-69e290537647",
        "modified": "2016-02-14T00:00:00.000Z",
        "type": "vulnerability",
        "name": "CVE-2014-0160",
        "description": "The (1) TLS...",
        "external_references": [
            {
                "source_name": "cve",
                "external_id": "CVE-2014-0160",
            },
        ],
        "labels": ["heartbleed", "has-logo"],
    },
    # [4] observed-data holding a single embedded file object under key "0"
    {
        "type": "observed-data",
        "spec_version": "2.1",
        "id": OBSERVED_DATA_ID,
        "created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
        "created": "2016-04-06T19:58:16.000Z",
        "modified": "2016-04-06T19:58:16.000Z",
        "first_observed": "2015-12-21T19:00:00Z",
        "last_observed": "2015-12-21T19:00:00Z",
        "number_observed": 1,
        "objects": {
            "0": {
                "type": "file",
                "spec_version": "2.1",
                "id": "file--42a7175a-42cc-508f-8fa7-23b330aff876",
                "name": "HAL 9000.exe",
                "defanged": False,
            },
        },

    },
]
98
99
# Filters exercised by the test_apply_common_filters* tests below.
# Tests pick entries by list position, so the order here matters.
filters = [
    # [0] used by test_apply_common_filters0
    Filter("type", "!=", "relationship"),
    # [1] used by test_apply_common_filters1
    Filter("id", "=", "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"),
    # [2] used by test_apply_common_filters2
    Filter("malware_types", "in", "remote-access-trojan"),
    # [3] used by test_apply_common_filters3
    Filter("created", ">", "2015-01-01T01:00:00.000Z"),
    # [4] used by test_apply_common_filters4
    Filter("revoked", "=", True),
    # [5] used by test_apply_common_filters5
    Filter("revoked", "!=", True),
    # [6] used by test_apply_common_filters6
    Filter("object_marking_refs", "=", "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"),
    # [7] and [8] are applied together by test_apply_common_filters7
    Filter("granular_markings.selectors", "in", "relationship_type"),
    Filter("granular_markings.marking_ref", "=", "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed"),
    # [9] used by test_apply_common_filters8
    Filter("external_references.external_id", "in", "CVE-2014-0160,CVE-2017-6608"),
    # [10] used by test_apply_common_filters9
    Filter("created_by_ref", "=", "identity--f1350682-3290-4e0d-be58-69e290537647"),
    # [11] used by test_apply_common_filters10 (marking id not present in stix_objs)
    Filter("object_marking_refs", "=", "marking-definition--613f2e26-0000-4000-8000-b8e91df99dc9"),
    # [12] used by test_apply_common_filters11 (selector not present in stix_objs)
    Filter("granular_markings.selectors", "in", "description"),
    # [13] used by test_apply_common_filters12 (fixture value is lowercase "cve")
    Filter("external_references.source_name", "=", "CVE"),
    # [14] used by test_apply_common_filters13: whole "objects" mapping
    Filter(
        "objects", "=",
        {"0": {"type": "file", "id": "file--42a7175a-42cc-508f-8fa7-23b330aff876", "name": "HAL 9000.exe", "spec_version": "2.1", "defanged": False}},
    ),
    # [15] used by test_apply_common_filters14: a single embedded file object
    Filter(
        "objects", "contains",
        {"type": "file", "id": "file--42a7175a-42cc-508f-8fa7-23b330aff876", "name": "HAL 9000.exe", "spec_version": "2.1", "defanged": False},
    ),
    # [16] used by test_apply_common_filters15
    Filter("labels", "contains", "heartbleed"),
]
125
# Parsed counterparts of the dicts in ``stix_objs`` (same order), so that
# every filter can also be exercised against real python-stix2 objects.
real_stix_objs = list(map(parse, stix_objs))
129
130
def test_filter_ops_check():
    # Building a Filter with an operator outside the supported set is
    # expected to raise ValueError.

    # '*' operator: the complete error message is checked
    with pytest.raises(ValueError) as star_error:
        Filter('modified', '*', 'not supported operator')
    full_message = "Filter operator '*' not supported for specified property: 'modified'"
    assert str(star_error.value) == full_message

    # '%' operator: only the leading part of the message is checked
    with pytest.raises(ValueError) as percent_error:
        Filter("type", "%", "4")
    message_fragment = "Filter operator '%' not supported for specified property"
    assert message_fragment in str(percent_error.value)
142
143
def test_filter_value_type_check():
    # Filter values of an unsupported type are expected to raise TypeError,
    # with a message naming the offending type.
    unsupported_text = "is not supported. The type must be a Python immutable type or dictionary"

    # (property, rejected value, accepted spellings of the type in the message)
    # The type is rendered as `<type '...'>` on Python 2 and `<class '...'>`
    # on Python 3, so either spelling is accepted.
    rejected_cases = (
        ('created', object(), ("<type 'object'>", "'<class 'object'>'")),
        ("type", complex(2, -1), ("<type 'complex'>", "'<class 'complex'>'")),
        ("type", {16, 23}, ("<type 'set'>", "'<class 'set'>'")),
    )

    for prop_name, bad_value, type_spellings in rejected_cases:
        with pytest.raises(TypeError) as excinfo:
            Filter(prop_name, '=', bad_value)
        message = str(excinfo.value)
        assert any(spelling in message for spelling in type_spellings)
        assert unsupported_text in message
162
163
def test_filter_type_underscore_check():
    # A Filter on the "type" property is expected to reject a value that
    # contains an underscore.
    with pytest.raises(ValueError) as underscore_error:
        Filter("type", "=", "oh_underscore")

    expected_fragment = "Filter for property 'type' cannot have its value 'oh_underscore'"
    assert expected_fragment in str(underscore_error.value)
169
170
def test_apply_common_filters0():
    # filters[0]: "Return any object whose type is not relationship".
    # Every fixture except the relationship (index 2) must come back; all
    # four expected ids are checked explicitly, and the relationship id is
    # checked to be absent, so the count alone is not what carries the test.
    not_relationship = filters[0]
    kept_indexes = (0, 1, 3, 4)
    dropped_index = 2

    # plain dict objects
    resp = list(apply_common_filters(stix_objs, [not_relationship]))
    ids = [r['id'] for r in resp]
    for idx in kept_indexes:
        assert stix_objs[idx]['id'] in ids
    assert stix_objs[dropped_index]['id'] not in ids
    assert len(ids) == 4

    # parsed python-stix2 objects
    resp = list(apply_common_filters(real_stix_objs, [not_relationship]))
    ids = [r.id for r in resp]
    for idx in kept_indexes:
        assert real_stix_objs[idx].id in ids
    assert real_stix_objs[dropped_index].id not in ids
    assert len(ids) == 4
186
187
def test_apply_common_filters1():
    # filters[1]: exact match on id
    # relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463; only the
    # relationship fixture (index 2) is expected back.
    by_id = filters[1]

    dict_matches = list(apply_common_filters(stix_objs, [by_id]))
    assert dict_matches[0]['id'] == stix_objs[2]['id']
    assert len(dict_matches) == 1

    parsed_matches = list(apply_common_filters(real_stix_objs, [by_id]))
    assert parsed_matches[0].id == real_stix_objs[2].id
    assert len(parsed_matches) == 1
197
198
def test_apply_common_filters2():
    # filters[2]: "remote-access-trojan" matched against "malware_types";
    # only the malware fixture (index 0) is expected back.
    by_malware_type = filters[2]

    dict_matches = list(apply_common_filters(stix_objs, [by_malware_type]))
    assert dict_matches[0]['id'] == stix_objs[0]['id']
    assert len(dict_matches) == 1

    parsed_matches = list(apply_common_filters(real_stix_objs, [by_malware_type]))
    assert parsed_matches[0].id == real_stix_objs[0].id
    assert len(parsed_matches) == 1
208
209
def test_apply_common_filters3():
    # filters[3]: objects created after 2015-01-01T01:00:00.000Z.
    # Three matches are expected, the first being the malware fixture.
    created_after = filters[3]

    dict_matches = list(apply_common_filters(stix_objs, [created_after]))
    assert dict_matches[0]['id'] == stix_objs[0]['id']
    assert len(dict_matches) == 3

    parsed_matches = list(apply_common_filters(real_stix_objs, [created_after]))
    assert len(parsed_matches) == 3
    assert parsed_matches[0].id == real_stix_objs[0].id
219
220
def test_apply_common_filters4():
    # filters[4]: revoked objects; only the relationship fixture (index 2)
    # is expected back.
    is_revoked = filters[4]

    dict_matches = list(apply_common_filters(stix_objs, [is_revoked]))
    assert dict_matches[0]['id'] == stix_objs[2]['id']
    assert len(dict_matches) == 1

    parsed_matches = list(apply_common_filters(real_stix_objs, [is_revoked]))
    assert parsed_matches[0].id == real_stix_objs[2].id
    assert len(parsed_matches) == 1
230
231
def test_apply_common_filters5():
    # filters[5]: objects that are not revoked.
    # The two collections are expected to give different counts: none of
    # the non-relationship dicts has a "revoked" key, and zero dicts are
    # expected back, while four parsed objects are expected back.
    # NOTE(review): presumably parsing fills in a default "revoked" value
    # and the filter skips objects lacking the property -- confirm in stix2.
    not_revoked = filters[5]

    dict_matches = list(apply_common_filters(stix_objs, [not_revoked]))
    assert len(dict_matches) == 0

    parsed_matches = list(apply_common_filters(real_stix_objs, [not_revoked]))
    assert len(parsed_matches) == 4
239
240
def test_apply_common_filters6():
    # filters[6]: marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9
    # matched against "object_marking_refs"; only the relationship fixture
    # (index 2) is expected back.
    by_object_marking = filters[6]

    dict_matches = list(apply_common_filters(stix_objs, [by_object_marking]))
    assert dict_matches[0]['id'] == stix_objs[2]['id']
    assert len(dict_matches) == 1

    parsed_matches = list(apply_common_filters(real_stix_objs, [by_object_marking]))
    assert parsed_matches[0].id == real_stix_objs[2].id
    assert len(parsed_matches) == 1
250
251
def test_apply_common_filters7():
    # filters[7] and filters[8] applied together: granular-marking selectors
    # containing "relationship_type" AND marking_ref equal to
    # marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed.
    # Only the relationship fixture (index 2) is expected back.
    by_selector = filters[7]
    by_marking_ref = filters[8]

    dict_matches = list(apply_common_filters(stix_objs, [by_selector, by_marking_ref]))
    assert dict_matches[0]['id'] == stix_objs[2]['id']
    assert len(dict_matches) == 1

    parsed_matches = list(apply_common_filters(real_stix_objs, [by_selector, by_marking_ref]))
    assert parsed_matches[0].id == real_stix_objs[2].id
    assert len(parsed_matches) == 1
262
263
def test_apply_common_filters8():
    # filters[9]: external_id tested with "in" against the value
    # "CVE-2014-0160,CVE-2017-6608"; only the vulnerability fixture
    # (index 3) is expected back.
    by_external_id = filters[9]

    dict_matches = list(apply_common_filters(stix_objs, [by_external_id]))
    assert dict_matches[0]['id'] == stix_objs[3]['id']
    assert len(dict_matches) == 1

    parsed_matches = list(apply_common_filters(real_stix_objs, [by_external_id]))
    assert parsed_matches[0].id == real_stix_objs[3].id
    assert len(parsed_matches) == 1
273
274
def test_apply_common_filters9():
    # filters[10]: created_by_ref equal to
    # identity--f1350682-3290-4e0d-be58-69e290537647; exactly one match is
    # expected in each collection.
    by_creator = filters[10]

    dict_matches = list(apply_common_filters(stix_objs, [by_creator]))
    assert len(dict_matches) == 1

    parsed_matches = list(apply_common_filters(real_stix_objs, [by_creator]))
    assert len(parsed_matches) == 1
282
283
def test_apply_common_filters10():
    # filters[11]: marking-definition--613f2e26-0000-4000-8000-b8e91df99dc9
    # matched against "object_marking_refs"; no fixture carries that
    # marking, so nothing is expected back.
    by_unknown_marking = filters[11]

    dict_matches = list(apply_common_filters(stix_objs, [by_unknown_marking]))
    assert len(dict_matches) == 0

    parsed_matches = list(apply_common_filters(real_stix_objs, [by_unknown_marking]))
    assert len(parsed_matches) == 0
291
292
def test_apply_common_filters11():
    # filters[12]: granular-marking selectors containing "description";
    # no fixture has such a selector, so nothing is expected back.
    by_unknown_selector = filters[12]

    dict_matches = list(apply_common_filters(stix_objs, [by_unknown_selector]))
    assert len(dict_matches) == 0

    parsed_matches = list(apply_common_filters(real_stix_objs, [by_unknown_selector]))
    assert len(parsed_matches) == 0
300
301
def test_apply_common_filters12():
    # filters[13]: source_name equal to uppercase "CVE". The fixture stores
    # lowercase "cve" and the comparison is case sensitive, so nothing is
    # expected back.
    by_uppercase_source = filters[13]

    dict_matches = list(apply_common_filters(stix_objs, [by_uppercase_source]))
    assert len(dict_matches) == 0

    parsed_matches = list(apply_common_filters(real_stix_objs, [by_uppercase_source]))
    assert len(parsed_matches) == 0
309
310
def test_apply_common_filters13():
    # filters[14]: "objects" equal to the given mapping; only the
    # observed-data fixture (index 4) is expected back.
    by_objects_equal = filters[14]

    dict_matches = list(apply_common_filters(stix_objs, [by_objects_equal]))
    first_match = dict_matches[0]
    assert first_match["id"] == stix_objs[4]["id"]
    assert len(dict_matches) == 1
    # Regression guard: filtering must leave the embedded file as a plain
    # dict rather than converting it to a File object (a deep bug was once
    # found here).
    assert isinstance(first_match["objects"]["0"], dict)

    parsed_matches = list(apply_common_filters(real_stix_objs, [by_objects_equal]))
    assert parsed_matches[0].id == real_stix_objs[4].id
    assert len(parsed_matches) == 1
323
324
def test_apply_common_filters14():
    # filters[15]: "objects" contains a specific File Cyber Observable
    # Object; only the observed-data fixture (index 4) is expected back.
    by_objects_contains = filters[15]

    dict_matches = list(apply_common_filters(stix_objs, [by_objects_contains]))
    assert dict_matches[0]['id'] == stix_objs[4]['id']
    assert len(dict_matches) == 1

    parsed_matches = list(apply_common_filters(real_stix_objs, [by_objects_contains]))
    assert parsed_matches[0].id == real_stix_objs[4].id
    assert len(parsed_matches) == 1
334
335
def test_apply_common_filters15():
    # filters[16]: "labels" contains 'heartbleed'; only the vulnerability
    # fixture (index 3) is expected back.
    by_label = filters[16]

    dict_matches = list(apply_common_filters(stix_objs, [by_label]))
    assert dict_matches[0]['id'] == stix_objs[3]['id']
    assert len(dict_matches) == 1

    parsed_matches = list(apply_common_filters(real_stix_objs, [by_label]))
    assert parsed_matches[0].id == real_stix_objs[3].id
    assert len(parsed_matches) == 1
345
346
def test_datetime_filter_behavior():
    """Check datetime comparisons for every str/datetime combination.

    If a filter is initialized with a datetime object as its value, OR the
    STIX object property being filtered on is a datetime object, the
    resulting comparisons are done on the string representations of the
    datetime objects, as the Filter functionality converts all datetime
    objects to their string forms using format_datetime().

    This test makes sure all such comparisons are carried out correctly and
    that the matched objects are left unaltered.
    """
    timestamp = "2016-02-14T00:00:00.000Z"
    vulnerability_id = "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef"

    dt_filter = Filter("created", "=", parse_into_datetime(timestamp, "millisecond"))
    str_filter = Filter("created", "=", timestamp)

    # datetime property vs. filter holding a datetime
    matches = list(apply_common_filters(real_stix_objs, [dt_filter]))
    assert len(matches) == 1
    assert matches[0]["id"] == vulnerability_id
    # the matched object must still carry a real datetime
    assert isinstance(matches[0].created, STIXdatetime)

    # string property vs. filter holding a string
    matches = list(apply_common_filters(stix_objs, [str_filter]))
    assert len(matches) == 1
    assert matches[0]["id"] == vulnerability_id

    # datetime property vs. filter holding a string
    matches = list(apply_common_filters(real_stix_objs, [str_filter]))
    assert len(matches) == 1
    assert matches[0]["id"] == vulnerability_id
    # the matched object must still carry a real datetime
    assert isinstance(matches[0].created, STIXdatetime)
375
376
def test_filters0(stix_objs2, real_stix_objs2):
    # Objects modified before 2017-01-28T13:49:53.935Z: two matches are
    # expected, the first being the fixture at index 1.
    cutoff = "2017-01-28T13:49:53.935Z"

    # string cutoff against dict objects
    before_str = Filter("modified", "<", cutoff)
    dict_matches = list(apply_common_filters(stix_objs2, [before_str]))
    assert dict_matches[0]['id'] == stix_objs2[1]['id']
    assert len(dict_matches) == 2

    # datetime cutoff against parsed objects
    before_dt = Filter("modified", "<", parse_into_datetime(cutoff))
    parsed_matches = list(apply_common_filters(real_stix_objs2, [before_dt]))
    assert parsed_matches[0].id == real_stix_objs2[1].id
    assert len(parsed_matches) == 2
386
387
def test_filters1(stix_objs2, real_stix_objs2):
    # Objects modified after 2017-01-28T13:49:53.935Z: one match is
    # expected, the fixture at index 0.
    cutoff = "2017-01-28T13:49:53.935Z"

    # string cutoff against dict objects
    after_str = Filter("modified", ">", cutoff)
    dict_matches = list(apply_common_filters(stix_objs2, [after_str]))
    assert dict_matches[0]['id'] == stix_objs2[0]['id']
    assert len(dict_matches) == 1

    # datetime cutoff against parsed objects
    after_dt = Filter("modified", ">", parse_into_datetime(cutoff))
    parsed_matches = list(apply_common_filters(real_stix_objs2, [after_dt]))
    assert parsed_matches[0].id == real_stix_objs2[0].id
    assert len(parsed_matches) == 1
397
398
def test_filters2(stix_objs2, real_stix_objs2):
    # Objects modified on or after 2017-01-27T13:49:53.935Z: three matches
    # are expected, the first being the fixture at index 0.
    cutoff = "2017-01-27T13:49:53.935Z"

    # string cutoff against dict objects
    on_or_after_str = Filter("modified", ">=", cutoff)
    dict_matches = list(apply_common_filters(stix_objs2, [on_or_after_str]))
    assert dict_matches[0]['id'] == stix_objs2[0]['id']
    assert len(dict_matches) == 3

    # datetime cutoff against parsed objects
    on_or_after_dt = Filter("modified", ">=", parse_into_datetime(cutoff))
    parsed_matches = list(apply_common_filters(real_stix_objs2, [on_or_after_dt]))
    assert parsed_matches[0].id == real_stix_objs2[0].id
    assert len(parsed_matches) == 3
408
409
def test_filters3(stix_objs2, real_stix_objs2):
    # Objects modified on or before 2017-01-27T13:49:53.935Z: two matches
    # are expected, the first being the fixture at index 1.
    cutoff = "2017-01-27T13:49:53.935Z"

    # string cutoff against dict objects
    on_or_before_str = Filter("modified", "<=", cutoff)
    dict_matches = list(apply_common_filters(stix_objs2, [on_or_before_str]))
    assert dict_matches[0]['id'] == stix_objs2[1]['id']
    assert len(dict_matches) == 2

    # datetime cutoff against parsed objects
    on_or_before_dt = Filter("modified", "<=", parse_into_datetime(cutoff))
    parsed_matches = list(apply_common_filters(real_stix_objs2, [on_or_before_dt]))
    assert parsed_matches[0].id == real_stix_objs2[1].id
    assert len(parsed_matches) == 2
421
422
def test_filters4():
    # A Filter with the unsupported operator '?' must not be constructible;
    # the complete error message is checked.
    with pytest.raises(ValueError) as bad_operator:
        Filter("modified", "?", "2017-01-27T13:49:53.935Z")

    expected_message = "Filter operator '?' not supported for specified property: 'modified'"
    assert str(bad_operator.value) == expected_message
431
432
def test_filters5(stix_objs2, real_stix_objs2):
    # Objects whose id is not indicator--00000000-0000-4000-8000-000000000002:
    # one match is expected, the fixture at index 0.
    excluded_id = "indicator--00000000-0000-4000-8000-000000000002"

    dict_matches = list(apply_common_filters(stix_objs2, [Filter("id", "!=", excluded_id)]))
    assert dict_matches[0]['id'] == stix_objs2[0]['id']
    assert len(dict_matches) == 1

    parsed_matches = list(apply_common_filters(real_stix_objs2, [Filter("id", "!=", excluded_id)]))
    assert parsed_matches[0].id == real_stix_objs2[0].id
    assert len(parsed_matches) == 1
442
443
def test_filters6(stix_objs2, real_stix_objs2):
    # Filtering on a non-common property ("name"): three matches are
    # expected, the first being the fixture at index 0.
    wanted_name = "Malicious site hosting downloader"

    dict_matches = list(apply_common_filters(stix_objs2, [Filter("name", "=", wanted_name)]))
    assert dict_matches[0]['id'] == stix_objs2[0]['id']
    assert len(dict_matches) == 3

    parsed_matches = list(apply_common_filters(real_stix_objs2, [Filter("name", "=", wanted_name)]))
    assert parsed_matches[0].id == real_stix_objs2[0].id
    assert len(parsed_matches) == 3
453
454
def test_filters7(stix_objs2, real_stix_objs2):
    # Filtering on a deeply embedded property, addressed with a dotted path
    # into the observed-data object's "objects" mapping.

    # PDF metadata nested under the file object's "pdf-ext" extension
    pdf_extension = {
        "version": "1.7",
        "document_info_dict": {
            "Title": "Sample document",
            "Author": "Adobe Systems Incorporated",
            "Creator": "Adobe FrameMaker 5.5.3 for Power Macintosh",
            "Producer": "Acrobat Distiller 3.01 for Power Macintosh",
            "CreationDate": "20070412090123-02",
        },
        "pdfid0": "DFCE52BD827ECF765649852119D",
        "pdfid1": "57A1E0F9ED2AE523E313C",
    }
    embedded_file = {
        "type": "file",
        "hashes": {
            "SHA-256": "35a01331e9ad96f751278b891b6ea09699806faedfa237d40513d92ad1b7100f",
        },
        "extensions": {
            "pdf-ext": pdf_extension,
        },
    }
    observed_data = {
        "type": "observed-data",
        "spec_version": "2.1",
        "id": OBSERVED_DATA_ID,
        "created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff",
        "created": "2016-04-06T19:58:16.000Z",
        "modified": "2016-04-06T19:58:16.000Z",
        "first_observed": "2015-12-21T19:00:00Z",
        "last_observed": "2015-12-21T19:00:00Z",
        "number_observed": 50,
        "objects": {
            "0": embedded_file,
        },
    }

    # append the observed-data object to copies of both fixture collections;
    # the assertions below expect it to land at index 3
    dict_pool = list(stix_objs2) + [observed_data]
    parsed_pool = list(real_stix_objs2) + [parse(observed_data)]

    version_path = "objects.0.extensions.pdf-ext.version"

    dict_matches = list(apply_common_filters(dict_pool, [Filter(version_path, ">", "1.2")]))
    assert dict_matches[0]['id'] == dict_pool[3]['id']
    assert len(dict_matches) == 1

    parsed_matches = list(apply_common_filters(parsed_pool, [Filter(version_path, ">", "1.2")]))
    assert parsed_matches[0].id == parsed_pool[3].id
    assert len(parsed_matches) == 1
501