1import json
2
3from medallion.filters.basic_filter import BasicFilter
4import pytest
5from requests.models import Response
6import six
7from taxii2client.common import _filter_kwargs_to_query_params
8from taxii2client.v21 import Collection
9
10import stix2
11from stix2.datastore import DataSourceError
12from stix2.datastore.filters import Filter
13from stix2.utils import get_timestamp
14
15COLLECTION_URL = 'https://example.com/api1/collections/91a7b528-80eb-42ed-a74d-c6fbd5a26116/'
16
17
18class MockTAXIICollectionEndpoint(Collection):
19    """Mock for taxii2_client.TAXIIClient"""
20
21    def __init__(self, url, collection_info):
22        super(MockTAXIICollectionEndpoint, self).__init__(
23            url, collection_info=collection_info,
24        )
25        self.objects = []
26        self.manifests = []
27
28    def add_objects(self, bundle):
29        self._verify_can_write()
30        if isinstance(bundle, six.string_types):
31            bundle = json.loads(bundle)
32        for object in bundle.get("objects", []):
33            self.objects.append(object)
34            self.manifests.append(
35                {
36                    "date_added": get_timestamp(),
37                    "id": object["id"],
38                    "media_type": "application/stix+json;version=2.1",
39                    "version": object.get("modified", object.get("created", get_timestamp())),
40                },
41            )
42
43    def get_objects(self, **filter_kwargs):
44        self._verify_can_read()
45        query_params = _filter_kwargs_to_query_params(filter_kwargs)
46        assert isinstance(query_params, dict)
47        full_filter = BasicFilter(query_params)
48        objs = full_filter.process_filter(
49            self.objects,
50            ("id", "type", "version"),
51            self.manifests,
52            100,
53        )[0]
54        if objs:
55            return stix2.v21.Bundle(objects=objs)
56        else:
57            resp = Response()
58            resp.status_code = 404
59            resp.raise_for_status()
60
61    def get_object(self, id, **filter_kwargs):
62        self._verify_can_read()
63        query_params = _filter_kwargs_to_query_params(filter_kwargs)
64        assert isinstance(query_params, dict)
65        full_filter = BasicFilter(query_params)
66
67        # In this endpoint we must first filter objects by id beforehand.
68        objects = [x for x in self.objects if x["id"] == id]
69        if objects:
70            filtered_objects = full_filter.process_filter(
71                objects,
72                ("version",),
73                self.manifests,
74                100,
75            )[0]
76        else:
77            filtered_objects = []
78        if filtered_objects:
79            return stix2.v21.Bundle(objects=filtered_objects)
80        else:
81            resp = Response()
82            resp.status_code = 404
83            resp.raise_for_status()
84
85
86@pytest.fixture
87def collection(stix_objs1, stix_objs1_manifests):
88    mock = MockTAXIICollectionEndpoint(
89        COLLECTION_URL, {
90            "id": "91a7b528-80eb-42ed-a74d-c6fbd5a26116",
91            "title": "Writable Collection",
92            "description": "This collection is a dropbox for submitting indicators",
93            "can_read": True,
94            "can_write": True,
95            "media_types": [
96                "application/vnd.oasis.stix+json; version=2.0",
97            ],
98        },
99    )
100
101    mock.objects.extend(stix_objs1)
102    mock.manifests.extend(stix_objs1_manifests)
103    return mock
104
105
106@pytest.fixture
107def collection_no_rw_access(stix_objs1, stix_objs1_manifests):
108    mock = MockTAXIICollectionEndpoint(
109        COLLECTION_URL, {
110            "id": "91a7b528-80eb-42ed-a74d-c6fbd5a26116",
111            "title": "Not writeable or readable Collection",
112            "description": "This collection is a dropbox for submitting indicators",
113            "can_read": False,
114            "can_write": False,
115            "media_types": [
116                "application/vnd.oasis.stix+json; version=2.0",
117            ],
118        },
119    )
120
121    mock.objects.extend(stix_objs1)
122    mock.manifests.extend(stix_objs1_manifests)
123    return mock
124
125
126def test_ds_taxii(collection):
127    ds = stix2.TAXIICollectionSource(collection)
128    assert ds.collection is not None
129
130
131def test_add_stix2_object(collection):
132    tc_sink = stix2.TAXIICollectionSink(collection)
133
134    # create new STIX threat-actor
135    ta = stix2.v21.ThreatActor(
136        name="Teddy Bear",
137        threat_actor_types=["nation-state"],
138        sophistication="innovator",
139        resource_level="government",
140        goals=[
141            "compromising environment NGOs",
142            "water-hole attacks geared towards energy sector",
143        ],
144    )
145
146    tc_sink.add(ta)
147
148
149def test_add_stix2_with_custom_object(collection):
150    tc_sink = stix2.TAXIICollectionStore(collection, allow_custom=True)
151
152    # create new STIX threat-actor
153    ta = stix2.v21.ThreatActor(
154        name="Teddy Bear",
155        threat_actor_types=["nation-state"],
156        sophistication="innovator",
157        resource_level="government",
158        goals=[
159            "compromising environment NGOs",
160            "water-hole attacks geared towards energy sector",
161        ],
162        foo="bar",
163        allow_custom=True,
164    )
165
166    tc_sink.add(ta)
167
168
169def test_add_list_object(collection, indicator):
170    tc_sink = stix2.TAXIICollectionSink(collection)
171
172    # create new STIX threat-actor
173    ta = stix2.v21.ThreatActor(
174        name="Teddy Bear",
175        threat_actor_types=["nation-state"],
176        sophistication="innovator",
177        resource_level="government",
178        goals=[
179            "compromising environment NGOs",
180            "water-hole attacks geared towards energy sector",
181        ],
182    )
183
184    tc_sink.add([ta, indicator])
185
186
187def test_get_object_found(collection):
188    tc_source = stix2.TAXIICollectionSource(collection)
189    result = tc_source.query([
190        stix2.Filter("id", "=", "indicator--00000000-0000-4000-8000-000000000001"),
191    ])
192    assert result
193
194
195def test_get_object_not_found(collection):
196    tc_source = stix2.TAXIICollectionSource(collection)
197    result = tc_source.get("indicator--00000000-0000-4000-8000-000000000012")
198    assert result is None
199
200
201def test_add_stix2_bundle_object(collection):
202    tc_sink = stix2.TAXIICollectionSink(collection)
203
204    # create new STIX threat-actor
205    ta = stix2.v21.ThreatActor(
206        name="Teddy Bear",
207        threat_actor_types=["nation-state"],
208        sophistication="innovator",
209        resource_level="government",
210        goals=[
211            "compromising environment NGOs",
212            "water-hole attacks geared towards energy sector",
213        ],
214    )
215
216    tc_sink.add(stix2.v21.Bundle(objects=[ta]))
217
218
219def test_add_str_object(collection):
220    tc_sink = stix2.TAXIICollectionSink(collection)
221
222    # create new STIX threat-actor
223    ta = """{
224        "type": "threat-actor",
225        "spec_version": "2.1",
226        "id": "threat-actor--eddff64f-feb1-4469-b07c-499a73c96415",
227        "created": "2018-04-23T16:40:50.847Z",
228        "modified": "2018-04-23T16:40:50.847Z",
229        "name": "Teddy Bear",
230        "threat_actor_types": [
231            "nation-state"
232        ],
233        "goals": [
234            "compromising environment NGOs",
235            "water-hole attacks geared towards energy sector"
236        ],
237        "sophistication": "innovator",
238        "resource_level": "government"
239    }"""
240
241    tc_sink.add(ta)
242
243
244def test_add_dict_object(collection):
245    tc_sink = stix2.TAXIICollectionSink(collection)
246
247    ta = {
248        "type": "threat-actor",
249        "spec_version": "2.1",
250        "id": "threat-actor--eddff64f-feb1-4469-b07c-499a73c96415",
251        "created": "2018-04-23T16:40:50.847Z",
252        "modified": "2018-04-23T16:40:50.847Z",
253        "name": "Teddy Bear",
254        "goals": [
255            "compromising environment NGOs",
256            "water-hole attacks geared towards energy sector",
257        ],
258        "sophistication": "innovator",
259        "resource_level": "government",
260        "threat_actor_types": [
261            "nation-state",
262        ],
263    }
264
265    tc_sink.add(ta)
266
267
268def test_add_dict_bundle_object(collection):
269    tc_sink = stix2.TAXIICollectionSink(collection)
270
271    ta = {
272        "type": "bundle",
273        "id": "bundle--860ccc8d-56c9-4fda-9384-84276fb52fb1",
274        "objects": [
275            {
276                "type": "threat-actor",
277                "spec_version": "2.1",
278                "id": "threat-actor--dc5a2f41-f76e-425a-81fe-33afc7aabd75",
279                "created": "2018-04-23T18:45:11.390Z",
280                "modified": "2018-04-23T18:45:11.390Z",
281                "name": "Teddy Bear",
282                "goals": [
283                    "compromising environment NGOs",
284                    "water-hole attacks geared towards energy sector",
285                ],
286                "sophistication": "innovator",
287                "resource_level": "government",
288                "threat_actor_types": [
289                    "nation-state",
290                ],
291            },
292        ],
293    }
294
295    tc_sink.add(ta)
296
297
298def test_get_stix2_object(collection):
299    tc_sink = stix2.TAXIICollectionSource(collection)
300
301    objects = tc_sink.get("indicator--00000000-0000-4000-8000-000000000001")
302
303    assert objects
304
305
306def test_parse_taxii_filters(collection):
307    query = [
308        Filter("added_after", "=", "2016-02-01T00:00:01.000Z"),
309        Filter("id", "=", "taxii stix object ID"),
310        Filter("type", "=", "taxii stix object ID"),
311        Filter("version", "=", "first"),
312        Filter("created_by_ref", "=", "Bane"),
313    ]
314
315    taxii_filters_expected = [
316        Filter("added_after", "=", "2016-02-01T00:00:01.000Z"),
317        Filter("id", "=", "taxii stix object ID"),
318        Filter("type", "=", "taxii stix object ID"),
319        Filter("version", "=", "first"),
320    ]
321
322    ds = stix2.TAXIICollectionSource(collection)
323
324    taxii_filters = ds._parse_taxii_filters(query)
325
326    assert taxii_filters == taxii_filters_expected
327
328
329def test_add_get_remove_filter(collection):
330    ds = stix2.TAXIICollectionSource(collection)
331
332    # First 3 filters are valid, remaining properties are erroneous in some way
333    valid_filters = [
334        Filter('type', '=', 'malware'),
335        Filter('id', '!=', 'stix object id'),
336        Filter('threat_actor_types', 'in', ["heartbleed", "malicious-activity"]),
337    ]
338
339    assert len(ds.filters) == 0
340
341    ds.filters.add(valid_filters[0])
342    assert len(ds.filters) == 1
343
344    # Addin the same filter again will have no effect since `filters` acts
345    # like a set
346    ds.filters.add(valid_filters[0])
347    assert len(ds.filters) == 1
348
349    ds.filters.add(valid_filters[1])
350    assert len(ds.filters) == 2
351
352    ds.filters.add(valid_filters[2])
353    assert len(ds.filters) == 3
354
355    assert valid_filters == [f for f in ds.filters]
356
357    # remove
358    ds.filters.remove(valid_filters[0])
359
360    assert len(ds.filters) == 2
361
362    ds.filters.add(valid_filters)
363
364
365def test_get_all_versions(collection):
366    ds = stix2.TAXIICollectionStore(collection)
367
368    indicators = ds.all_versions('indicator--00000000-0000-4000-8000-000000000001')
369    # There are 3 indicators but 2 share the same 'modified' timestamp
370    assert len(indicators) == 2
371
372
373def test_can_read_error(collection_no_rw_access):
374    """create a TAXIICOllectionSource with a taxii2client.Collection
375    instance that does not have read access, check ValueError exception is raised"""
376
377    with pytest.raises(DataSourceError) as excinfo:
378        stix2.TAXIICollectionSource(collection_no_rw_access)
379    assert "Collection object provided does not have read access" in str(excinfo.value)
380
381
382def test_can_write_error(collection_no_rw_access):
383    """create a TAXIICOllectionSink with a taxii2client.Collection
384    instance that does not have write access, check ValueError exception is raised"""
385
386    with pytest.raises(DataSourceError) as excinfo:
387        stix2.TAXIICollectionSink(collection_no_rw_access)
388    assert "Collection object provided does not have write access" in str(excinfo.value)
389
390
391def test_get_404():
392    """a TAXIICollectionSource.get() call that receives an HTTP 404 response
393    code from the taxii2client should be be returned as None.
394
395    TAXII spec states that a TAXII server can return a 404 for nonexistent
396    resources or lack of access. Decided that None is acceptable reponse
397    to imply that state of the TAXII endpoint.
398    """
399
400    class TAXIICollection404():
401        can_read = True
402
403        def get_object(self, id, version=None):
404            resp = Response()
405            resp.status_code = 404
406            resp.raise_for_status()
407
408    ds = stix2.TAXIICollectionSource(TAXIICollection404())
409
410    # this will raise 404 from mock TAXII Client but TAXIICollectionStore
411    # should handle gracefully and return None
412    stix_obj = ds.get("indicator--1")
413    assert stix_obj is None
414
415
416def test_all_versions_404(collection):
417    """ a TAXIICollectionSource.all_version() call that recieves an HTTP 404
418    response code from the taxii2client should be returned as an exception"""
419
420    ds = stix2.TAXIICollectionStore(collection)
421
422    with pytest.raises(DataSourceError) as excinfo:
423        ds.all_versions("indicator--1")
424    assert "are either not found or access is denied" in str(excinfo.value)
425    assert "404" in str(excinfo.value)
426
427
428def test_query_404(collection):
429    """ a TAXIICollectionSource.query() call that recieves an HTTP 404
430    response code from the taxii2client should be returned as an exception"""
431
432    ds = stix2.TAXIICollectionStore(collection)
433    query = [Filter("type", "=", "malware")]
434
435    with pytest.raises(DataSourceError) as excinfo:
436        ds.query(query=query)
437    assert "are either not found or access is denied" in str(excinfo.value)
438    assert "404" in str(excinfo.value)
439