1import pytest 2 3from stix2 import parse 4from stix2.datastore.filters import Filter, apply_common_filters 5from stix2.utils import STIXdatetime, parse_into_datetime 6 7from .constants import OBSERVED_DATA_ID 8 9stix_objs = [ 10 { 11 "created": "2017-01-27T13:49:53.997Z", 12 "description": "\n\nTITLE:\n\tPoison Ivy", 13 "id": "malware--fdd60b30-b67c-41e3-b0b9-f01faf20d111", 14 "spec_version": "2.1", 15 "malware_types": [ 16 "remote-access-trojan", 17 ], 18 "modified": "2017-01-27T13:49:53.997Z", 19 "is_family": False, 20 "name": "Poison Ivy", 21 "type": "malware", 22 }, 23 { 24 "created": "2014-05-08T09:00:00.000Z", 25 "id": "indicator--a932fcc6-e032-476c-826f-cb970a5a1ade", 26 "indicator_types": [ 27 "file-hash-watchlist", 28 ], 29 "modified": "2014-05-08T09:00:00.000Z", 30 "name": "File hash for Poison Ivy variant", 31 "pattern": "[file:hashes.'SHA-256' = 'ef537f25c895bfa782526529a9b63d97aa631564d5d789c2b765448c8635fb6c']", 32 "pattern_type": "stix", 33 "spec_version": "2.1", 34 "type": "indicator", 35 "valid_from": "2014-05-08T09:00:00.000000Z", 36 }, 37 { 38 "created": "2014-05-08T09:00:00.000Z", 39 "granular_markings": [ 40 { 41 "marking_ref": "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed", 42 "selectors": [ 43 "relationship_type", 44 ], 45 }, 46 ], 47 "id": "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463", 48 "modified": "2014-05-08T09:00:00.000Z", 49 "object_marking_refs": [ 50 "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9", 51 ], 52 "relationship_type": "indicates", 53 "revoked": True, 54 "source_ref": "indicator--a932fcc6-e032-476c-826f-cb970a5a1ade", 55 "spec_version": "2.1", 56 "target_ref": "malware--fdd60b30-b67c-41e3-b0b9-f01faf20d111", 57 "type": "relationship", 58 }, 59 { 60 "id": "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef", 61 "spec_version": "2.1", 62 "created": "2016-02-14T00:00:00.000Z", 63 "created_by_ref": "identity--f1350682-3290-4e0d-be58-69e290537647", 64 "modified": "2016-02-14T00:00:00.000Z", 65 "type": "vulnerability", 66 "name": "CVE-2014-0160", 67 "description": "The (1) TLS...", 68 "external_references": [ 69 { 70 "source_name": "cve", 71 "external_id": "CVE-2014-0160", 72 }, 73 ], 74 "labels": ["heartbleed", "has-logo"], 75 }, 76 { 77 "type": "observed-data", 78 "spec_version": "2.1", 79 "id": OBSERVED_DATA_ID, 80 "created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", 81 "created": "2016-04-06T19:58:16.000Z", 82 "modified": "2016-04-06T19:58:16.000Z", 83 "first_observed": "2015-12-21T19:00:00Z", 84 "last_observed": "2015-12-21T19:00:00Z", 85 "number_observed": 1, 86 "objects": { 87 "0": { 88 "type": "file", 89 "spec_version": "2.1", 90 "id": "file--42a7175a-42cc-508f-8fa7-23b330aff876", 91 "name": "HAL 9000.exe", 92 "defanged": False, 93 }, 94 }, 95 96 }, 97] 98 99 100filters = [ 101 Filter("type", "!=", "relationship"), 102 Filter("id", "=", "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"), 103 Filter("malware_types", "in", "remote-access-trojan"), 104 Filter("created", ">", "2015-01-01T01:00:00.000Z"), 105 Filter("revoked", "=", True), 106 Filter("revoked", "!=", True), 107 Filter("object_marking_refs", "=", "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"), 108 Filter("granular_markings.selectors", "in", "relationship_type"), 109 Filter("granular_markings.marking_ref", "=", "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed"), 110 Filter("external_references.external_id", "in", "CVE-2014-0160,CVE-2017-6608"), 111 Filter("created_by_ref", "=", "identity--f1350682-3290-4e0d-be58-69e290537647"), 112 Filter("object_marking_refs", "=", "marking-definition--613f2e26-0000-4000-8000-b8e91df99dc9"), 113 Filter("granular_markings.selectors", "in", "description"), 114 Filter("external_references.source_name", "=", "CVE"), 115 Filter( 116 "objects", "=", 117 {"0": {"type": "file", "id": "file--42a7175a-42cc-508f-8fa7-23b330aff876", "name": "HAL 9000.exe", "spec_version": "2.1", "defanged": False}}, 118 ), 119 Filter( 120 "objects", "contains", 121 {"type": "file", "id": "file--42a7175a-42cc-508f-8fa7-23b330aff876", "name": "HAL 9000.exe", "spec_version": "2.1", "defanged": False}, 122 ), 123 Filter("labels", "contains", "heartbleed"), 124] 125 126# same as above objects but converted to real Python STIX2 objects 127# to test filters against true Python STIX2 objects 128real_stix_objs = [parse(stix_obj) for stix_obj in stix_objs] 129 130 131def test_filter_ops_check(): 132 # invalid filters - non supported operators 133 134 with pytest.raises(ValueError) as excinfo: 135 # create Filter that has an operator that is not allowed 136 Filter('modified', '*', 'not supported operator') 137 assert str(excinfo.value) == "Filter operator '*' not supported for specified property: 'modified'" 138 139 with pytest.raises(ValueError) as excinfo: 140 Filter("type", "%", "4") 141 assert "Filter operator '%' not supported for specified property" in str(excinfo.value) 142 143 144def test_filter_value_type_check(): 145 # invalid filters - non supported value types 146 147 with pytest.raises(TypeError) as excinfo: 148 Filter('created', '=', object()) 149 # On Python 2, the type of object() is `<type 'object'>` On Python 3, it's `<class 'object'>`. 150 assert any([s in str(excinfo.value) for s in ["<type 'object'>", "'<class 'object'>'"]]) 151 assert "is not supported. The type must be a Python immutable type or dictionary" in str(excinfo.value) 152 153 with pytest.raises(TypeError) as excinfo: 154 Filter("type", "=", complex(2, -1)) 155 assert any([s in str(excinfo.value) for s in ["<type 'complex'>", "'<class 'complex'>'"]]) 156 assert "is not supported. The type must be a Python immutable type or dictionary" in str(excinfo.value) 157 158 with pytest.raises(TypeError) as excinfo: 159 Filter("type", "=", set([16, 23])) 160 assert any([s in str(excinfo.value) for s in ["<type 'set'>", "'<class 'set'>'"]]) 161 assert "is not supported. The type must be a Python immutable type or dictionary" in str(excinfo.value) 162 163 164def test_filter_type_underscore_check(): 165 # check that Filters where property="type", value (name) doesnt have underscores 166 with pytest.raises(ValueError) as excinfo: 167 Filter("type", "=", "oh_underscore") 168 assert "Filter for property 'type' cannot have its value 'oh_underscore'" in str(excinfo.value) 169 170 171def test_apply_common_filters0(): 172 # "Return any object whose type is not relationship" 173 resp = list(apply_common_filters(stix_objs, [filters[0]])) 174 ids = [r['id'] for r in resp] 175 assert stix_objs[0]['id'] in ids 176 assert stix_objs[1]['id'] in ids 177 assert stix_objs[3]['id'] in ids 178 assert len(ids) == 4 179 180 resp = list(apply_common_filters(real_stix_objs, [filters[0]])) 181 ids = [r.id for r in resp] 182 assert real_stix_objs[0].id in ids 183 assert real_stix_objs[1].id in ids 184 assert real_stix_objs[3].id in ids 185 assert len(ids) == 4 186 187 188def test_apply_common_filters1(): 189 # "Return any object that matched id relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463" 190 resp = list(apply_common_filters(stix_objs, [filters[1]])) 191 assert resp[0]['id'] == stix_objs[2]['id'] 192 assert len(resp) == 1 193 194 resp = list(apply_common_filters(real_stix_objs, [filters[1]])) 195 assert resp[0].id == real_stix_objs[2].id 196 assert len(resp) == 1 197 198 199def test_apply_common_filters2(): 200 # "Return any object that contains remote-access-trojan in labels" 201 resp = list(apply_common_filters(stix_objs, [filters[2]])) 202 assert resp[0]['id'] == stix_objs[0]['id'] 203 assert len(resp) == 1 204 205 resp = list(apply_common_filters(real_stix_objs, [filters[2]])) 206 assert resp[0].id == real_stix_objs[0].id 207 assert len(resp) == 1 208 209 210def test_apply_common_filters3(): 211 # "Return any object created after 2015-01-01T01:00:00.000Z" 212 resp = list(apply_common_filters(stix_objs, [filters[3]])) 213 assert resp[0]['id'] == stix_objs[0]['id'] 214 assert len(resp) == 3 215 216 resp = list(apply_common_filters(real_stix_objs, [filters[3]])) 217 assert len(resp) == 3 218 assert resp[0].id == real_stix_objs[0].id 219 220 221def test_apply_common_filters4(): 222 # "Return any revoked object" 223 resp = list(apply_common_filters(stix_objs, [filters[4]])) 224 assert resp[0]['id'] == stix_objs[2]['id'] 225 assert len(resp) == 1 226 227 resp = list(apply_common_filters(real_stix_objs, [filters[4]])) 228 assert resp[0].id == real_stix_objs[2].id 229 assert len(resp) == 1 230 231 232def test_apply_common_filters5(): 233 # "Return any object whose not revoked" 234 resp = list(apply_common_filters(stix_objs, [filters[5]])) 235 assert len(resp) == 0 236 237 resp = list(apply_common_filters(real_stix_objs, [filters[5]])) 238 assert len(resp) == 4 239 240 241def test_apply_common_filters6(): 242 # "Return any object that matches marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9 in object_marking_refs" 243 resp = list(apply_common_filters(stix_objs, [filters[6]])) 244 assert resp[0]['id'] == stix_objs[2]['id'] 245 assert len(resp) == 1 246 247 resp = list(apply_common_filters(real_stix_objs, [filters[6]])) 248 assert resp[0].id == real_stix_objs[2].id 249 assert len(resp) == 1 250 251 252def test_apply_common_filters7(): 253 # "Return any object that contains relationship_type in their selectors AND 254 # also has marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed in marking_ref" 255 resp = list(apply_common_filters(stix_objs, [filters[7], filters[8]])) 256 assert resp[0]['id'] == stix_objs[2]['id'] 257 assert len(resp) == 1 258 259 resp = list(apply_common_filters(real_stix_objs, [filters[7], filters[8]])) 260 assert resp[0].id == real_stix_objs[2].id 261 assert len(resp) == 1 262 263 264def test_apply_common_filters8(): 265 # "Return any object that contains CVE-2014-0160,CVE-2017-6608 in their external_id" 266 resp = list(apply_common_filters(stix_objs, [filters[9]])) 267 assert resp[0]['id'] == stix_objs[3]['id'] 268 assert len(resp) == 1 269 270 resp = list(apply_common_filters(real_stix_objs, [filters[9]])) 271 assert resp[0].id == real_stix_objs[3].id 272 assert len(resp) == 1 273 274 275def test_apply_common_filters9(): 276 # "Return any object that matches created_by_ref identity--00000000-0000-0000-0000-b8e91df99dc9" 277 resp = list(apply_common_filters(stix_objs, [filters[10]])) 278 assert len(resp) == 1 279 280 resp = list(apply_common_filters(real_stix_objs, [filters[10]])) 281 assert len(resp) == 1 282 283 284def test_apply_common_filters10(): 285 # "Return any object that matches marking-definition--613f2e26-0000-4000-8000-b8e91df99dc9 in object_marking_refs" (None) 286 resp = list(apply_common_filters(stix_objs, [filters[11]])) 287 assert len(resp) == 0 288 289 resp = list(apply_common_filters(real_stix_objs, [filters[11]])) 290 assert len(resp) == 0 291 292 293def test_apply_common_filters11(): 294 # "Return any object that contains description in its selectors" (None) 295 resp = list(apply_common_filters(stix_objs, [filters[12]])) 296 assert len(resp) == 0 297 298 resp = list(apply_common_filters(real_stix_objs, [filters[12]])) 299 assert len(resp) == 0 300 301 302def test_apply_common_filters12(): 303 # "Return any object that matches CVE in source_name" (None, case sensitive) 304 resp = list(apply_common_filters(stix_objs, [filters[13]])) 305 assert len(resp) == 0 306 307 resp = list(apply_common_filters(real_stix_objs, [filters[13]])) 308 assert len(resp) == 0 309 310 311def test_apply_common_filters13(): 312 # Return any object that matches file object in "objects" 313 resp = list(apply_common_filters(stix_objs, [filters[14]])) 314 assert resp[0]["id"] == stix_objs[4]["id"] 315 assert len(resp) == 1 316 # important additional check to make sure original File dict was 317 # not converted to File object. (this was a deep bug found) 318 assert isinstance(resp[0]["objects"]["0"], dict) 319 320 resp = list(apply_common_filters(real_stix_objs, [filters[14]])) 321 assert resp[0].id == real_stix_objs[4].id 322 assert len(resp) == 1 323 324 325def test_apply_common_filters14(): 326 # Return any object that contains a specific File Cyber Observable Object 327 resp = list(apply_common_filters(stix_objs, [filters[15]])) 328 assert resp[0]['id'] == stix_objs[4]['id'] 329 assert len(resp) == 1 330 331 resp = list(apply_common_filters(real_stix_objs, [filters[15]])) 332 assert resp[0].id == real_stix_objs[4].id 333 assert len(resp) == 1 334 335 336def test_apply_common_filters15(): 337 # Return any object that contains 'heartbleed' in "labels" 338 resp = list(apply_common_filters(stix_objs, [filters[16]])) 339 assert resp[0]['id'] == stix_objs[3]['id'] 340 assert len(resp) == 1 341 342 resp = list(apply_common_filters(real_stix_objs, [filters[16]])) 343 assert resp[0].id == real_stix_objs[3].id 344 assert len(resp) == 1 345 346 347def test_datetime_filter_behavior(): 348 """if a filter is initialized with its value being a datetime object 349 OR the STIX object property being filtered on is a datetime object, all 350 resulting comparisons executed are done on the string representations 351 of the datetime objects, as the Filter functionality will convert 352 all datetime objects to there string forms using format_datetim() 353 354 This test makes sure all datetime comparisons are carried out correctly 355 """ 356 filter_with_dt_obj = Filter("created", "=", parse_into_datetime("2016-02-14T00:00:00.000Z", "millisecond")) 357 filter_with_str = Filter("created", "=", "2016-02-14T00:00:00.000Z") 358 359 # compare datetime obj to filter w/ datetime obj 360 resp = list(apply_common_filters(real_stix_objs, [filter_with_dt_obj])) 361 assert len(resp) == 1 362 assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef" 363 assert isinstance(resp[0].created, STIXdatetime) # make sure original object not altered 364 365 # compare datetime string to filter w/ str 366 resp = list(apply_common_filters(stix_objs, [filter_with_str])) 367 assert len(resp) == 1 368 assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef" 369 370 # compare datetime obj to filter w/ str 371 resp = list(apply_common_filters(real_stix_objs, [filter_with_str])) 372 assert len(resp) == 1 373 assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef" 374 assert isinstance(resp[0].created, STIXdatetime) # make sure original object not altered 375 376 377def test_filters0(stix_objs2, real_stix_objs2): 378 # "Return any object modified before 2017-01-28T13:49:53.935Z" 379 resp = list(apply_common_filters(stix_objs2, [Filter("modified", "<", "2017-01-28T13:49:53.935Z")])) 380 assert resp[0]['id'] == stix_objs2[1]['id'] 381 assert len(resp) == 2 382 383 resp = list(apply_common_filters(real_stix_objs2, [Filter("modified", "<", parse_into_datetime("2017-01-28T13:49:53.935Z"))])) 384 assert resp[0].id == real_stix_objs2[1].id 385 assert len(resp) == 2 386 387 388def test_filters1(stix_objs2, real_stix_objs2): 389 # "Return any object modified after 2017-01-28T13:49:53.935Z" 390 resp = list(apply_common_filters(stix_objs2, [Filter("modified", ">", "2017-01-28T13:49:53.935Z")])) 391 assert resp[0]['id'] == stix_objs2[0]['id'] 392 assert len(resp) == 1 393 394 resp = list(apply_common_filters(real_stix_objs2, [Filter("modified", ">", parse_into_datetime("2017-01-28T13:49:53.935Z"))])) 395 assert resp[0].id == real_stix_objs2[0].id 396 assert len(resp) == 1 397 398 399def test_filters2(stix_objs2, real_stix_objs2): 400 # "Return any object modified after or on 2017-01-28T13:49:53.935Z" 401 resp = list(apply_common_filters(stix_objs2, [Filter("modified", ">=", "2017-01-27T13:49:53.935Z")])) 402 assert resp[0]['id'] == stix_objs2[0]['id'] 403 assert len(resp) == 3 404 405 resp = list(apply_common_filters(real_stix_objs2, [Filter("modified", ">=", parse_into_datetime("2017-01-27T13:49:53.935Z"))])) 406 assert resp[0].id == real_stix_objs2[0].id 407 assert len(resp) == 3 408 409 410def test_filters3(stix_objs2, real_stix_objs2): 411 # "Return any object modified before or on 2017-01-28T13:49:53.935Z" 412 resp = list(apply_common_filters(stix_objs2, [Filter("modified", "<=", "2017-01-27T13:49:53.935Z")])) 413 assert resp[0]['id'] == stix_objs2[1]['id'] 414 assert len(resp) == 2 415 416 # "Return any object modified before or on 2017-01-28T13:49:53.935Z" 417 fv = Filter("modified", "<=", parse_into_datetime("2017-01-27T13:49:53.935Z")) 418 resp = list(apply_common_filters(real_stix_objs2, [fv])) 419 assert resp[0].id == real_stix_objs2[1].id 420 assert len(resp) == 2 421 422 423def test_filters4(): 424 # Assert invalid Filter cannot be created 425 with pytest.raises(ValueError) as excinfo: 426 Filter("modified", "?", "2017-01-27T13:49:53.935Z") 427 assert str(excinfo.value) == ( 428 "Filter operator '?' not supported " 429 "for specified property: 'modified'" 430 ) 431 432 433def test_filters5(stix_objs2, real_stix_objs2): 434 # "Return any object whose id is not indicator--00000000-0000-4000-8000-000000000002" 435 resp = list(apply_common_filters(stix_objs2, [Filter("id", "!=", "indicator--00000000-0000-4000-8000-000000000002")])) 436 assert resp[0]['id'] == stix_objs2[0]['id'] 437 assert len(resp) == 1 438 439 resp = list(apply_common_filters(real_stix_objs2, [Filter("id", "!=", "indicator--00000000-0000-4000-8000-000000000002")])) 440 assert resp[0].id == real_stix_objs2[0].id 441 assert len(resp) == 1 442 443 444def test_filters6(stix_objs2, real_stix_objs2): 445 # Test filtering on non-common property 446 resp = list(apply_common_filters(stix_objs2, [Filter("name", "=", "Malicious site hosting downloader")])) 447 assert resp[0]['id'] == stix_objs2[0]['id'] 448 assert len(resp) == 3 449 450 resp = list(apply_common_filters(real_stix_objs2, [Filter("name", "=", "Malicious site hosting downloader")])) 451 assert resp[0].id == real_stix_objs2[0].id 452 assert len(resp) == 3 453 454 455def test_filters7(stix_objs2, real_stix_objs2): 456 # Test filtering on embedded property 457 obsvd_data_obj = { 458 "type": "observed-data", 459 "spec_version": "2.1", 460 "id": OBSERVED_DATA_ID, 461 "created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", 462 "created": "2016-04-06T19:58:16.000Z", 463 "modified": "2016-04-06T19:58:16.000Z", 464 "first_observed": "2015-12-21T19:00:00Z", 465 "last_observed": "2015-12-21T19:00:00Z", 466 "number_observed": 50, 467 "objects": { 468 "0": { 469 "type": "file", 470 "hashes": { 471 "SHA-256": "35a01331e9ad96f751278b891b6ea09699806faedfa237d40513d92ad1b7100f", 472 }, 473 "extensions": { 474 "pdf-ext": { 475 "version": "1.7", 476 "document_info_dict": { 477 "Title": "Sample document", 478 "Author": "Adobe Systems Incorporated", 479 "Creator": "Adobe FrameMaker 5.5.3 for Power Macintosh", 480 "Producer": "Acrobat Distiller 3.01 for Power Macintosh", 481 "CreationDate": "20070412090123-02", 482 }, 483 "pdfid0": "DFCE52BD827ECF765649852119D", 484 "pdfid1": "57A1E0F9ED2AE523E313C", 485 }, 486 }, 487 }, 488 }, 489 } 490 491 stix_objects = list(stix_objs2) + [obsvd_data_obj] 492 real_stix_objects = list(real_stix_objs2) + [parse(obsvd_data_obj)] 493 494 resp = list(apply_common_filters(stix_objects, [Filter("objects.0.extensions.pdf-ext.version", ">", "1.2")])) 495 assert resp[0]['id'] == stix_objects[3]['id'] 496 assert len(resp) == 1 497 498 resp = list(apply_common_filters(real_stix_objects, [Filter("objects.0.extensions.pdf-ext.version", ">", "1.2")])) 499 assert resp[0].id == real_stix_objects[3].id 500 assert len(resp) == 1 501