"""Tests for the stix2 TAXII collection source, sink and store.

The datastore classes are exercised against ``MockTAXIICollectionEndpoint``,
a ``taxii2client.v21.Collection`` subclass that keeps its objects and
manifests in plain Python lists.
"""
import json

from medallion.filters.basic_filter import BasicFilter
import pytest
from requests.models import Response
import six
from taxii2client.common import _filter_kwargs_to_query_params
from taxii2client.v21 import Collection

import stix2
from stix2.datastore import DataSourceError
from stix2.datastore.filters import Filter
from stix2.utils import get_timestamp

COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/'


def _raise_http_404():
    """Raise the ``requests`` HTTP error produced by a bare 404 response."""
    resp = Response()
    resp.status_code = 404
    resp.raise_for_status()


class MockTAXIICollectionEndpoint(Collection):
    """In-memory mock of a ``taxii2client.v21.Collection``.

    Objects and their manifest entries are stored in ``self.objects`` and
    ``self.manifests`` instead of being sent to a TAXII server.
    """

    def __init__(self, url, collection_info):
        """Create the mock collection with empty object/manifest stores.

        Args:
            url: collection URL forwarded to ``Collection.__init__``.
            collection_info: dict of collection metadata forwarded to
                ``Collection.__init__`` as ``collection_info``.
        """
        super(MockTAXIICollectionEndpoint, self).__init__(
            url, collection_info=collection_info,
        )
        self.objects = []
        self.manifests = []

    def add_objects(self, bundle):
        """Store every object of ``bundle`` and record a manifest for it.

        Args:
            bundle: a bundle given either as a JSON string or as a mapping
                with an optional ``"objects"`` list.
        """
        self._verify_can_write()
        if isinstance(bundle, six.string_types):
            bundle = json.loads(bundle)
        # ``stix_obj`` rather than ``object`` so the builtin is not shadowed.
        for stix_obj in bundle.get("objects", []):
            self.objects.append(stix_obj)
            self.manifests.append(
                {
                    "date_added": get_timestamp(),
                    "id": stix_obj["id"],
                    "media_type": "application/stix+json;version=2.1",
                    # Fall back from "modified" to "created" to "now".
                    "version": stix_obj.get("modified", stix_obj.get("created", get_timestamp())),
                },
            )

    def get_objects(self, **filter_kwargs):
        """Return a bundle of the stored objects matching ``filter_kwargs``.

        Raises:
            requests.exceptions.HTTPError: a 404 error when nothing matches.
        """
        self._verify_can_read()
        query_params = _filter_kwargs_to_query_params(filter_kwargs)
        assert isinstance(query_params, dict)
        full_filter = BasicFilter(query_params)
        # Only the first element of the returned sequence is used here.
        # NOTE(review): 100 is presumably the result limit -- confirm
        # against medallion's BasicFilter.process_filter signature.
        matches = full_filter.process_filter(
            self.objects,
            ("id", "type", "version"),
            self.manifests,
            100,
        )[0]
        if not matches:
            _raise_http_404()
        return stix2.v21.Bundle(objects=matches)

    def get_object(self, id, **filter_kwargs):
        """Return a bundle of the stored versions of the object ``id``.

        Args:
            id: identifier compared against each stored object's ``"id"``.
            **filter_kwargs: extra filters; only ``"version"`` is passed as
                an allowed filter to ``process_filter``.

        Raises:
            requests.exceptions.HTTPError: a 404 error when nothing matches.
        """
        self._verify_can_read()
        query_params = _filter_kwargs_to_query_params(filter_kwargs)
        assert isinstance(query_params, dict)
        full_filter = BasicFilter(query_params)

        # In this endpoint we must first filter objects by id beforehand.
        candidates = [obj for obj in self.objects if obj["id"] == id]
        matches = []
        if candidates:
            matches = full_filter.process_filter(
                candidates,
                ("version",),
                self.manifests,
                100,
            )[0]
        if not matches:
            _raise_http_404()
        return stix2.v21.Bundle(objects=matches)


def _build_mock_collection(title, can_read, can_write, objects, manifests):
    """Build a mock collection pre-loaded with ``objects`` and ``manifests``."""
    mock = MockTAXIICollectionEndpoint(
        COLLECTION_URL, {
            "id": "91a7b528-80eb-42ed-a74d-c6fbd5a26116",
            "title": title,
            "description": "This collection is a dropbox for submitting indicators",
            "can_read": can_read,
            "can_write": can_write,
            "media_types": [
                "application/vnd.oasis.stix+json; version=2.0",
            ],
        },
    )

    mock.objects.extend(objects)
    mock.manifests.extend(manifests)
    return mock


def _make_threat_actor(**extra_props):
    """Create the "Teddy Bear" threat actor shared by the add tests.

    Any ``extra_props`` are passed through to ``stix2.v21.ThreatActor``.
    """
    return stix2.v21.ThreatActor(
        name="Teddy Bear",
        threat_actor_types=["nation-state"],
        sophistication="innovator",
        resource_level="government",
        goals=[
            "compromising environment NGOs",
            "water-hole attacks geared towards energy sector",
        ],
        **extra_props
    )


@pytest.fixture
def collection(stix_objs1, stix_objs1_manifests):
    """Readable and writable mock collection loaded with ``stix_objs1``."""
    return _build_mock_collection(
        "Writable Collection", True, True, stix_objs1, stix_objs1_manifests,
    )


@pytest.fixture
def collection_no_rw_access(stix_objs1, stix_objs1_manifests):
    """Mock collection loaded with ``stix_objs1`` but not readable or writable."""
    return _build_mock_collection(
        "Not writeable or readable Collection", False, False,
        stix_objs1, stix_objs1_manifests,
    )


def test_ds_taxii(collection):
    """A source built on the mock collection exposes that collection."""
    ds = stix2.TAXIICollectionSource(collection)
    assert ds.collection is not None


def test_add_stix2_object(collection):
    """Adding a STIX object to a sink must not raise."""
    tc_sink = stix2.TAXIICollectionSink(collection)

    tc_sink.add(_make_threat_actor())


def test_add_stix2_with_custom_object(collection):
    """Adding an object with a custom property to a store must not raise."""
    tc_store = stix2.TAXIICollectionStore(collection, allow_custom=True)

    # "foo" is not a ThreatActor property, hence allow_custom=True
    ta = _make_threat_actor(foo="bar", allow_custom=True)

    tc_store.add(ta)


def test_add_list_object(collection, indicator):
    """Adding a list of STIX objects to a sink must not raise."""
    tc_sink = stix2.TAXIICollectionSink(collection)

    tc_sink.add([_make_threat_actor(), indicator])


def test_get_object_found(collection):
    """Querying by the id of a stored indicator yields a non-empty result."""
    tc_source = stix2.TAXIICollectionSource(collection)
    result = tc_source.query([
        stix2.Filter("id", "=", "indicator--00000000-0000-4000-8000-000000000001"),
    ])
    assert result


def test_get_object_not_found(collection):
    """``get()`` with an id that is not stored returns None."""
    tc_source = stix2.TAXIICollectionSource(collection)
    result = tc_source.get("indicator--00000000-0000-4000-8000-000000000012")
    assert result is None


def test_add_stix2_bundle_object(collection):
    """Adding a Bundle object to a sink must not raise."""
    tc_sink = stix2.TAXIICollectionSink(collection)

    tc_sink.add(stix2.v21.Bundle(objects=[_make_threat_actor()]))


def test_add_str_object(collection):
    """Adding a STIX object given as a JSON string must not raise."""
    tc_sink = stix2.TAXIICollectionSink(collection)

    # create new STIX threat-actor
    ta = """{
        "type": "threat-actor",
        "spec_version": "2.1",
        "id": "threat-actor--eddff64f-feb1-4469-b07c-499a73c96415",
        "created": "2018-04-23T16:40:50.847Z",
        "modified": "2018-04-23T16:40:50.847Z",
        "name": "Teddy Bear",
        "threat_actor_types": [
            "nation-state"
        ],
        "goals": [
            "compromising environment NGOs",
            "water-hole attacks geared towards energy sector"
        ],
        "sophistication": "innovator",
        "resource_level": "government"
    }"""

    tc_sink.add(ta)


def test_add_dict_object(collection):
    """Adding a STIX object given as a dict must not raise."""
    tc_sink = stix2.TAXIICollectionSink(collection)

    ta = {
        "type": "threat-actor",
        "spec_version": "2.1",
        "id": "threat-actor--eddff64f-feb1-4469-b07c-499a73c96415",
        "created": "2018-04-23T16:40:50.847Z",
        "modified": "2018-04-23T16:40:50.847Z",
        "name": "Teddy Bear",
        "goals": [
            "compromising environment NGOs",
            "water-hole attacks geared towards energy sector",
        ],
        "sophistication": "innovator",
        "resource_level": "government",
        "threat_actor_types": [
            "nation-state",
        ],
    }

    tc_sink.add(ta)


def test_add_dict_bundle_object(collection):
    """Adding a bundle given as a dict must not raise."""
    tc_sink = stix2.TAXIICollectionSink(collection)

    ta = {
        "type": "bundle",
        "id": "bundle--860ccc8d-56c9-4fda-9384-84276fb52fb1",
        "objects": [
            {
                "type": "threat-actor",
                "spec_version": "2.1",
                "id": "threat-actor--dc5a2f41-f76e-425a-81fe-33afc7aabd75",
                "created": "2018-04-23T18:45:11.390Z",
                "modified": "2018-04-23T18:45:11.390Z",
                "name": "Teddy Bear",
                "goals": [
                    "compromising environment NGOs",
                    "water-hole attacks geared towards energy sector",
                ],
                "sophistication": "innovator",
                "resource_level": "government",
                "threat_actor_types": [
                    "nation-state",
                ],
            },
        ],
    }

    tc_sink.add(ta)


def test_get_stix2_object(collection):
    """``get()`` with the id of a stored indicator returns a truthy result."""
    tc_source = stix2.TAXIICollectionSource(collection)

    objects = tc_source.get("indicator--00000000-0000-4000-8000-000000000001")

    assert objects


def test_parse_taxii_filters(collection):
    """``_parse_taxii_filters`` keeps only the TAXII-level filters.

    Of the five filters given, the ``created_by_ref`` one is expected to be
    dropped while ``added_after``, ``id``, ``type`` and ``version`` are kept.
    """
    query = [
        Filter("added_after", "=", "2016-02-01T00:00:01.000Z"),
        Filter("id", "=", "taxii stix object ID"),
        Filter("type", "=", "taxii stix object ID"),
        Filter("version", "=", "first"),
        Filter("created_by_ref", "=", "Bane"),
    ]

    taxii_filters_expected = [
        Filter("added_after", "=", "2016-02-01T00:00:01.000Z"),
        Filter("id", "=", "taxii stix object ID"),
        Filter("type", "=", "taxii stix object ID"),
        Filter("version", "=", "first"),
    ]

    ds = stix2.TAXIICollectionSource(collection)

    taxii_filters = ds._parse_taxii_filters(query)

    assert taxii_filters == taxii_filters_expected


def test_add_get_remove_filter(collection):
    """Filters can be added to, iterated over and removed from a source."""
    ds = stix2.TAXIICollectionSource(collection)

    # All three filters are valid
    valid_filters = [
        Filter('type', '=', 'malware'),
        Filter('id', '!=', 'stix object id'),
        Filter('threat_actor_types', 'in', ["heartbleed", "malicious-activity"]),
    ]

    assert len(ds.filters) == 0

    ds.filters.add(valid_filters[0])
    assert len(ds.filters) == 1

    # Adding the same filter again will have no effect since `filters` acts
    # like a set
    ds.filters.add(valid_filters[0])
    assert len(ds.filters) == 1

    ds.filters.add(valid_filters[1])
    assert len(ds.filters) == 2

    ds.filters.add(valid_filters[2])
    assert len(ds.filters) == 3

    assert valid_filters == list(ds.filters)

    # remove
    ds.filters.remove(valid_filters[0])

    assert len(ds.filters) == 2

    # Adding a whole list only restores the removed filter; the other two
    # are already present.
    ds.filters.add(valid_filters)
    assert len(ds.filters) == 3


def test_get_all_versions(collection):
    """``all_versions()`` returns one entry per distinct version."""
    ds = stix2.TAXIICollectionStore(collection)

    indicators = ds.all_versions('indicator--00000000-0000-4000-8000-000000000001')
    # There are 3 indicators but 2 share the same 'modified' timestamp
    assert len(indicators) == 2


def test_can_read_error(collection_no_rw_access):
    """create a TAXIICollectionSource with a taxii2client.Collection
    instance that does not have read access, check DataSourceError exception is raised"""

    with pytest.raises(DataSourceError) as excinfo:
        stix2.TAXIICollectionSource(collection_no_rw_access)
    assert "Collection object provided does not have read access" in str(excinfo.value)


def test_can_write_error(collection_no_rw_access):
    """create a TAXIICollectionSink with a taxii2client.Collection
    instance that does not have write access, check DataSourceError exception is raised"""

    with pytest.raises(DataSourceError) as excinfo:
        stix2.TAXIICollectionSink(collection_no_rw_access)
    assert "Collection object provided does not have write access" in str(excinfo.value)


def test_get_404():
    """a TAXIICollectionSource.get() call that receives an HTTP 404 response
    code from the taxii2client should return None.

    TAXII spec states that a TAXII server can return a 404 for nonexistent
    resources or lack of access. Decided that None is an acceptable response
    to imply that state of the TAXII endpoint.
    """

    class TAXIICollection404(object):
        """Minimal readable collection whose ``get_object`` always raises 404."""
        can_read = True

        def get_object(self, id, version=None):
            resp = Response()
            resp.status_code = 404
            resp.raise_for_status()

    ds = stix2.TAXIICollectionSource(TAXIICollection404())

    # this will raise 404 from mock TAXII Client but TAXIICollectionSource
    # should handle gracefully and return None
    stix_obj = ds.get("indicator--1")
    assert stix_obj is None


def test_all_versions_404(collection):
    """ a TAXIICollectionStore.all_versions() call that receives an HTTP 404
    response code from the taxii2client should be returned as an exception"""

    ds = stix2.TAXIICollectionStore(collection)

    with pytest.raises(DataSourceError) as excinfo:
        ds.all_versions("indicator--1")
    assert "are either not found or access is denied" in str(excinfo.value)
    assert "404" in str(excinfo.value)


def test_query_404(collection):
    """ a TAXIICollectionStore.query() call that receives an HTTP 404
    response code from the taxii2client should be returned as an exception"""

    ds = stix2.TAXIICollectionStore(collection)
    query = [Filter("type", "=", "malware")]

    with pytest.raises(DataSourceError) as excinfo:
        ds.query(query=query)
    assert "are either not found or access is denied" in str(excinfo.value)
    assert "404" in str(excinfo.value)