1import pytest 2 3from stix2 import parse 4from stix2.datastore.filters import Filter, apply_common_filters 5from stix2.utils import STIXdatetime, parse_into_datetime 6 7stix_objs = [ 8 { 9 "created": "2017-01-27T13:49:53.997Z", 10 "description": "\n\nTITLE:\n\tPoison Ivy", 11 "id": "malware--fdd60b30-b67c-41e3-b0b9-f01faf20d111", 12 "labels": [ 13 "remote-access-trojan", 14 ], 15 "modified": "2017-01-27T13:49:53.997Z", 16 "name": "Poison Ivy", 17 "type": "malware", 18 }, 19 { 20 "created": "2014-05-08T09:00:00.000Z", 21 "id": "indicator--a932fcc6-e032-476c-826f-cb970a5a1ade", 22 "labels": [ 23 "file-hash-watchlist", 24 ], 25 "modified": "2014-05-08T09:00:00.000Z", 26 "name": "File hash for Poison Ivy variant", 27 "pattern": "[file:hashes.'SHA-256' = 'ef537f25c895bfa782526529a9b63d97aa631564d5d789c2b765448c8635fb6c']", 28 "type": "indicator", 29 "valid_from": "2014-05-08T09:00:00.000000Z", 30 }, 31 { 32 "created": "2014-05-08T09:00:00.000Z", 33 "granular_markings": [ 34 { 35 "marking_ref": "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed", 36 "selectors": [ 37 "relationship_type", 38 ], 39 }, 40 ], 41 "id": "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463", 42 "modified": "2014-05-08T09:00:00.000Z", 43 "object_marking_refs": [ 44 "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9", 45 ], 46 "relationship_type": "indicates", 47 "revoked": True, 48 "source_ref": "indicator--a932fcc6-e032-476c-826f-cb970a5a1ade", 49 "target_ref": "malware--fdd60b30-b67c-41e3-b0b9-f01faf20d111", 50 "type": "relationship", 51 }, 52 { 53 "id": "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef", 54 "created": "2016-02-14T00:00:00.000Z", 55 "created_by_ref": "identity--f1350682-3290-4e0d-be58-69e290537647", 56 "modified": "2016-02-14T00:00:00.000Z", 57 "type": "vulnerability", 58 "name": "CVE-2014-0160", 59 "description": "The (1) TLS...", 60 "external_references": [ 61 { 62 "source_name": "cve", 63 "external_id": "CVE-2014-0160", 64 }, 65 ], 66 "labels": ["heartbleed", "has-logo"], 67 }, 68 { 69 "type": "observed-data", 70 "id": "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf", 71 "created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", 72 "created": "2016-04-06T19:58:16.000Z", 73 "modified": "2016-04-06T19:58:16.000Z", 74 "first_observed": "2015-12-21T19:00:00Z", 75 "last_observed": "2015-12-21T19:00:00Z", 76 "number_observed": 1, 77 "objects": { 78 "0": { 79 "type": "file", 80 "name": "HAL 9000.exe", 81 }, 82 }, 83 84 }, 85] 86 87 88filters = [ 89 Filter("type", "!=", "relationship"), 90 Filter("id", "=", "relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463"), 91 Filter("labels", "in", "remote-access-trojan"), 92 Filter("created", ">", "2015-01-01T01:00:00.000Z"), 93 Filter("revoked", "=", True), 94 Filter("revoked", "!=", True), 95 Filter("object_marking_refs", "=", "marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9"), 96 Filter("granular_markings.selectors", "in", "relationship_type"), 97 Filter("granular_markings.marking_ref", "=", "marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed"), 98 Filter("external_references.external_id", "in", "CVE-2014-0160,CVE-2017-6608"), 99 Filter("created_by_ref", "=", "identity--f1350682-3290-4e0d-be58-69e290537647"), 100 Filter("object_marking_refs", "=", "marking-definition--613f2e26-0000-4000-8000-b8e91df99dc9"), 101 Filter("granular_markings.selectors", "in", "description"), 102 Filter("external_references.source_name", "=", "CVE"), 103 Filter("objects", "=", {"0": {"type": "file", "name": "HAL 9000.exe"}}), 104 Filter("objects", "contains", {"type": "file", "name": "HAL 9000.exe"}), 105 Filter("labels", "contains", "heartbleed"), 106] 107 108# same as above objects but converted to real Python STIX2 objects 109# to test filters against true Python STIX2 objects 110real_stix_objs = [parse(stix_obj) for stix_obj in stix_objs] 111 112 113def test_filter_ops_check(): 114 # invalid filters - non supported operators 115 116 with pytest.raises(ValueError) as excinfo: 117 # create Filter that has an operator that is not allowed 118 Filter('modified', '*', 'not supported operator') 119 assert str(excinfo.value) == "Filter operator '*' not supported for specified property: 'modified'" 120 121 with pytest.raises(ValueError) as excinfo: 122 Filter("type", "%", "4") 123 assert "Filter operator '%' not supported for specified property" in str(excinfo.value) 124 125 126def test_filter_value_type_check(): 127 # invalid filters - non supported value types 128 129 with pytest.raises(TypeError) as excinfo: 130 Filter('created', '=', object()) 131 # On Python 2, the type of object() is `<type 'object'>` On Python 3, it's `<class 'object'>`. 132 assert any([s in str(excinfo.value) for s in ["<type 'object'>", "'<class 'object'>'"]]) 133 assert "is not supported. The type must be a Python immutable type or dictionary" in str(excinfo.value) 134 135 with pytest.raises(TypeError) as excinfo: 136 Filter("type", "=", complex(2, -1)) 137 assert any([s in str(excinfo.value) for s in ["<type 'complex'>", "'<class 'complex'>'"]]) 138 assert "is not supported. The type must be a Python immutable type or dictionary" in str(excinfo.value) 139 140 with pytest.raises(TypeError) as excinfo: 141 Filter("type", "=", set([16, 23])) 142 assert any([s in str(excinfo.value) for s in ["<type 'set'>", "'<class 'set'>'"]]) 143 assert "is not supported. The type must be a Python immutable type or dictionary" in str(excinfo.value) 144 145 146def test_filter_type_underscore_check(): 147 # check that Filters where property="type", value (name) doesnt have underscores 148 with pytest.raises(ValueError) as excinfo: 149 Filter("type", "=", "oh_underscore") 150 assert "Filter for property 'type' cannot have its value 'oh_underscore'" in str(excinfo.value) 151 152 153def test_apply_common_filters0(): 154 # "Return any object whose type is not relationship" 155 resp = list(apply_common_filters(stix_objs, [filters[0]])) 156 ids = [r['id'] for r in resp] 157 assert stix_objs[0]['id'] in ids 158 assert stix_objs[1]['id'] in ids 159 assert stix_objs[3]['id'] in ids 160 assert len(ids) == 4 161 162 resp = list(apply_common_filters(real_stix_objs, [filters[0]])) 163 ids = [r.id for r in resp] 164 assert real_stix_objs[0].id in ids 165 assert real_stix_objs[1].id in ids 166 assert real_stix_objs[3].id in ids 167 assert len(ids) == 4 168 169 170def test_apply_common_filters1(): 171 # "Return any object that matched id relationship--2f9a9aa9-108a-4333-83e2-4fb25add0463" 172 resp = list(apply_common_filters(stix_objs, [filters[1]])) 173 assert resp[0]['id'] == stix_objs[2]['id'] 174 assert len(resp) == 1 175 176 resp = list(apply_common_filters(real_stix_objs, [filters[1]])) 177 assert resp[0].id == real_stix_objs[2].id 178 assert len(resp) == 1 179 180 181def test_apply_common_filters2(): 182 # "Return any object that contains remote-access-trojan in labels" 183 resp = list(apply_common_filters(stix_objs, [filters[2]])) 184 assert resp[0]['id'] == stix_objs[0]['id'] 185 assert len(resp) == 1 186 187 resp = list(apply_common_filters(real_stix_objs, [filters[2]])) 188 assert resp[0].id == real_stix_objs[0].id 189 assert len(resp) == 1 190 191 192def test_apply_common_filters3(): 193 # "Return any object created after 2015-01-01T01:00:00.000Z" 194 resp = list(apply_common_filters(stix_objs, [filters[3]])) 195 assert resp[0]['id'] == stix_objs[0]['id'] 196 assert len(resp) == 3 197 198 resp = list(apply_common_filters(real_stix_objs, [filters[3]])) 199 assert len(resp) == 3 200 assert resp[0].id == real_stix_objs[0].id 201 202 203def test_apply_common_filters4(): 204 # "Return any revoked object" 205 resp = list(apply_common_filters(stix_objs, [filters[4]])) 206 assert resp[0]['id'] == stix_objs[2]['id'] 207 assert len(resp) == 1 208 209 resp = list(apply_common_filters(real_stix_objs, [filters[4]])) 210 assert resp[0].id == real_stix_objs[2].id 211 assert len(resp) == 1 212 213 214def test_apply_common_filters5(): 215 # "Return any object whose not revoked" 216 resp = list(apply_common_filters(stix_objs, [filters[5]])) 217 assert len(resp) == 0 218 219 resp = list(apply_common_filters(real_stix_objs, [filters[5]])) 220 assert len(resp) == 4 221 222 223def test_apply_common_filters6(): 224 # "Return any object that matches marking-definition--613f2e26-407d-48c7-9eca-b8e91df99dc9 in object_marking_refs" 225 resp = list(apply_common_filters(stix_objs, [filters[6]])) 226 assert resp[0]['id'] == stix_objs[2]['id'] 227 assert len(resp) == 1 228 229 resp = list(apply_common_filters(real_stix_objs, [filters[6]])) 230 assert resp[0].id == real_stix_objs[2].id 231 assert len(resp) == 1 232 233 234def test_apply_common_filters7(): 235 # "Return any object that contains relationship_type in their selectors AND 236 # also has marking-definition--5e57c739-391a-4eb3-b6be-7d15ca92d5ed in marking_ref" 237 resp = list(apply_common_filters(stix_objs, [filters[7], filters[8]])) 238 assert resp[0]['id'] == stix_objs[2]['id'] 239 assert len(resp) == 1 240 241 resp = list(apply_common_filters(real_stix_objs, [filters[7], filters[8]])) 242 assert resp[0].id == real_stix_objs[2].id 243 assert len(resp) == 1 244 245 246def test_apply_common_filters8(): 247 # "Return any object that contains CVE-2014-0160,CVE-2017-6608 in their external_id" 248 resp = list(apply_common_filters(stix_objs, [filters[9]])) 249 assert resp[0]['id'] == stix_objs[3]['id'] 250 assert len(resp) == 1 251 252 resp = list(apply_common_filters(real_stix_objs, [filters[9]])) 253 assert resp[0].id == real_stix_objs[3].id 254 assert len(resp) == 1 255 256 257def test_apply_common_filters9(): 258 # "Return any object that matches created_by_ref identity--f1350682-3290-4e0d-be58-69e290537647" 259 resp = list(apply_common_filters(stix_objs, [filters[10]])) 260 assert len(resp) == 1 261 262 resp = list(apply_common_filters(real_stix_objs, [filters[10]])) 263 assert len(resp) == 1 264 265 266def test_apply_common_filters10(): 267 # "Return any object that matches marking-definition--613f2e26-0000-4000-8000-b8e91df99dc9 in object_marking_refs" (None) 268 resp = list(apply_common_filters(stix_objs, [filters[11]])) 269 assert len(resp) == 0 270 271 resp = list(apply_common_filters(real_stix_objs, [filters[11]])) 272 assert len(resp) == 0 273 274 275def test_apply_common_filters11(): 276 # "Return any object that contains description in its selectors" (None) 277 resp = list(apply_common_filters(stix_objs, [filters[12]])) 278 assert len(resp) == 0 279 280 resp = list(apply_common_filters(real_stix_objs, [filters[12]])) 281 assert len(resp) == 0 282 283 284def test_apply_common_filters12(): 285 # "Return any object that matches CVE in source_name" (None, case sensitive) 286 resp = list(apply_common_filters(stix_objs, [filters[13]])) 287 assert len(resp) == 0 288 289 resp = list(apply_common_filters(real_stix_objs, [filters[13]])) 290 assert len(resp) == 0 291 292 293def test_apply_common_filters13(): 294 # Return any object that matches file object in "objects" 295 resp = list(apply_common_filters(stix_objs, [filters[14]])) 296 assert resp[0]["id"] == stix_objs[4]["id"] 297 assert len(resp) == 1 298 # important additional check to make sure original File dict was 299 # not converted to File object. (this was a deep bug found) 300 assert isinstance(resp[0]["objects"]["0"], dict) 301 302 resp = list(apply_common_filters(real_stix_objs, [filters[14]])) 303 assert resp[0].id == real_stix_objs[4].id 304 assert len(resp) == 1 305 306 307def test_apply_common_filters14(): 308 # Return any object that contains a specific File Cyber Observable Object 309 resp = list(apply_common_filters(stix_objs, [filters[15]])) 310 assert resp[0]['id'] == stix_objs[4]['id'] 311 assert len(resp) == 1 312 313 resp = list(apply_common_filters(real_stix_objs, [filters[15]])) 314 assert resp[0].id == real_stix_objs[4].id 315 assert len(resp) == 1 316 317 318def test_apply_common_filters15(): 319 # Return any object that contains 'heartbleed' in "labels" 320 resp = list(apply_common_filters(stix_objs, [filters[16]])) 321 assert resp[0]['id'] == stix_objs[3]['id'] 322 assert len(resp) == 1 323 324 resp = list(apply_common_filters(real_stix_objs, [filters[16]])) 325 assert resp[0].id == real_stix_objs[3].id 326 assert len(resp) == 1 327 328 329def test_datetime_filter_behavior(): 330 """if a filter is initialized with its value being a datetime object 331 OR the STIX object property being filtered on is a datetime object, all 332 resulting comparisons executed are done on the string representations 333 of the datetime objects, as the Filter functionality will convert 334 all datetime objects to there string forms using format_datetim() 335 336 This test makes sure all datetime comparisons are carried out correctly 337 """ 338 filter_with_dt_obj = Filter("created", "=", parse_into_datetime("2016-02-14T00:00:00.000Z", "millisecond")) 339 filter_with_str = Filter("created", "=", "2016-02-14T00:00:00.000Z") 340 341 # compare datetime obj to filter w/ datetime obj 342 resp = list(apply_common_filters(real_stix_objs, [filter_with_dt_obj])) 343 assert len(resp) == 1 344 assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef" 345 assert isinstance(resp[0].created, STIXdatetime) # make sure original object not altered 346 347 # compare datetime string to filter w/ str 348 resp = list(apply_common_filters(stix_objs, [filter_with_str])) 349 assert len(resp) == 1 350 assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef" 351 352 # compare datetime obj to filter w/ str 353 resp = list(apply_common_filters(real_stix_objs, [filter_with_str])) 354 assert len(resp) == 1 355 assert resp[0]["id"] == "vulnerability--ee916c28-c7a4-4d0d-ad56-a8d357f89fef" 356 assert isinstance(resp[0].created, STIXdatetime) # make sure original object not altered 357 358 359def test_filters0(stix_objs2, real_stix_objs2): 360 # "Return any object modified before 2017-01-28T13:49:53.935Z" 361 resp = list(apply_common_filters(stix_objs2, [Filter("modified", "<", "2017-01-28T13:49:53.935Z")])) 362 assert resp[0]['id'] == stix_objs2[1]['id'] 363 assert len(resp) == 2 364 365 resp = list(apply_common_filters(real_stix_objs2, [Filter("modified", "<", parse_into_datetime("2017-01-28T13:49:53.935Z"))])) 366 assert resp[0].id == real_stix_objs2[1].id 367 assert len(resp) == 2 368 369 370def test_filters1(stix_objs2, real_stix_objs2): 371 # "Return any object modified after 2017-01-28T13:49:53.935Z" 372 resp = list(apply_common_filters(stix_objs2, [Filter("modified", ">", "2017-01-28T13:49:53.935Z")])) 373 assert resp[0]['id'] == stix_objs2[0]['id'] 374 assert len(resp) == 1 375 376 resp = list(apply_common_filters(real_stix_objs2, [Filter("modified", ">", parse_into_datetime("2017-01-28T13:49:53.935Z"))])) 377 assert resp[0].id == real_stix_objs2[0].id 378 assert len(resp) == 1 379 380 381def test_filters2(stix_objs2, real_stix_objs2): 382 # "Return any object modified after or on 2017-01-28T13:49:53.935Z" 383 resp = list(apply_common_filters(stix_objs2, [Filter("modified", ">=", "2017-01-27T13:49:53.935Z")])) 384 assert resp[0]['id'] == stix_objs2[0]['id'] 385 assert len(resp) == 3 386 387 resp = list(apply_common_filters(real_stix_objs2, [Filter("modified", ">=", parse_into_datetime("2017-01-27T13:49:53.935Z"))])) 388 assert resp[0].id == real_stix_objs2[0].id 389 assert len(resp) == 3 390 391 392def test_filters3(stix_objs2, real_stix_objs2): 393 # "Return any object modified before or on 2017-01-28T13:49:53.935Z" 394 resp = list(apply_common_filters(stix_objs2, [Filter("modified", "<=", "2017-01-27T13:49:53.935Z")])) 395 assert resp[0]['id'] == stix_objs2[1]['id'] 396 assert len(resp) == 2 397 398 # "Return any object modified before or on 2017-01-28T13:49:53.935Z" 399 fv = Filter("modified", "<=", parse_into_datetime("2017-01-27T13:49:53.935Z")) 400 resp = list(apply_common_filters(real_stix_objs2, [fv])) 401 assert resp[0].id == real_stix_objs2[1].id 402 assert len(resp) == 2 403 404 405def test_filters4(): 406 # Assert invalid Filter cannot be created 407 with pytest.raises(ValueError) as excinfo: 408 Filter("modified", "?", "2017-01-27T13:49:53.935Z") 409 assert str(excinfo.value) == ( 410 "Filter operator '?' not supported " 411 "for specified property: 'modified'" 412 ) 413 414 415def test_filters5(stix_objs2, real_stix_objs2): 416 # "Return any object whose id is not indicator--00000000-0000-4000-8000-000000000002" 417 resp = list(apply_common_filters(stix_objs2, [Filter("id", "!=", "indicator--00000000-0000-4000-8000-000000000002")])) 418 assert resp[0]['id'] == stix_objs2[0]['id'] 419 assert len(resp) == 1 420 421 resp = list(apply_common_filters(real_stix_objs2, [Filter("id", "!=", "indicator--00000000-0000-4000-8000-000000000002")])) 422 assert resp[0].id == real_stix_objs2[0].id 423 assert len(resp) == 1 424 425 426def test_filters6(stix_objs2, real_stix_objs2): 427 # Test filtering on non-common property 428 resp = list(apply_common_filters(stix_objs2, [Filter("name", "=", "Malicious site hosting downloader")])) 429 assert resp[0]['id'] == stix_objs2[0]['id'] 430 assert len(resp) == 3 431 432 resp = list(apply_common_filters(real_stix_objs2, [Filter("name", "=", "Malicious site hosting downloader")])) 433 assert resp[0].id == real_stix_objs2[0].id 434 assert len(resp) == 3 435 436 437def test_filters7(stix_objs2, real_stix_objs2): 438 # Test filtering on embedded property 439 obsvd_data_obj = { 440 "type": "observed-data", 441 "id": "observed-data--b67d30ff-02ac-498a-92f9-32f845f448cf", 442 "created_by_ref": "identity--f431f809-377b-45e0-aa1c-6a4751cae5ff", 443 "created": "2016-04-06T19:58:16.000Z", 444 "modified": "2016-04-06T19:58:16.000Z", 445 "first_observed": "2015-12-21T19:00:00Z", 446 "last_observed": "2015-12-21T19:00:00Z", 447 "number_observed": 50, 448 "objects": { 449 "0": { 450 "type": "file", 451 "hashes": { 452 "SHA-256": "35a01331e9ad96f751278b891b6ea09699806faedfa237d40513d92ad1b7100f", 453 }, 454 "extensions": { 455 "pdf-ext": { 456 "version": "1.7", 457 "document_info_dict": { 458 "Title": "Sample document", 459 "Author": "Adobe Systems Incorporated", 460 "Creator": "Adobe FrameMaker 5.5.3 for Power Macintosh", 461 "Producer": "Acrobat Distiller 3.01 for Power Macintosh", 462 "CreationDate": "20070412090123-02", 463 }, 464 "pdfid0": "DFCE52BD827ECF765649852119D", 465 "pdfid1": "57A1E0F9ED2AE523E313C", 466 }, 467 }, 468 }, 469 }, 470 } 471 472 stix_objects = list(stix_objs2) + [obsvd_data_obj] 473 real_stix_objects = list(real_stix_objs2) + [parse(obsvd_data_obj)] 474 475 resp = list(apply_common_filters(stix_objects, [Filter("objects.0.extensions.pdf-ext.version", ">", "1.2")])) 476 assert resp[0]['id'] == stix_objects[3]['id'] 477 assert len(resp) == 1 478 479 resp = list(apply_common_filters(real_stix_objects, [Filter("objects.0.extensions.pdf-ext.version", ">", "1.2")])) 480 assert resp[0].id == real_stix_objects[3].id 481 assert len(resp) == 1 482