1# Copyright (c) 2020, The MITRE Corporation. All rights reserved.
2# See LICENSE.txt for complete terms.
3
4import unittest
5
6from mixbox.vendor.six import u
7
8from cybox.objects.network_flow_object import (
9    BidirectionalRecord, IPFIXDataSet, IPFIXTemplateRecord, IPFIXDataRecord,
10    IPFIXMessage, IPFIXSet, IPFIXMessageHeader, FlowDataRecord,
11    FlowCollectionElement, OptionsDataRecord, OptionCollectionElement,
12    IPFIXSetHeader, IPFIXOptionsTemplateRecord,
13    IPFIXOptionsTemplateRecordHeader, IPFIXOptionsTemplateSet,
14    IPFIXTemplateRecordFieldSpecifiers, IPFIXTemplateRecordHeader,
15    IPFIXTemplateSet, NetflowV5Packet, NetflowV5FlowHeader,
16    NetflowV5FlowRecord, NetflowV9DataFlowSet, NetflowV9PacketHeader,
17    NetflowV9ExportPacket, NetflowV9OptionsTemplateFlowSet, NetflowV9Field,
18    NetflowV9FlowSet, NetflowV9DataRecord, NetflowV9TemplateFlowSet,
19    NetflowV9TemplateRecord, NetflowV9OptionsTemplateRecord, NetworkFlow,
20    NetworkFlowLabel, NetworkLayerInfo, UnidirectionalRecord, SiLKRecord,
21    SiLKSensorInfo, SiLKFlowAttributes, SiLKAddress, SiLKSensorClass,
22    SiLKSensorDirection, YAFReverseFlow, YAFRecord, YAFFlow, YAFTCPFlow
23)
24from cybox.test import EntityTestCase
25from cybox.test.objects import ObjectTestCase
26from cybox.test.objects.address_test import TestAddress
27from cybox.test.objects.network_packet_test import TestTCPFlags
28from cybox.test.objects.socket_address_test import TestSocketAddress
29
30
31class TestNetworkFlowLabel(EntityTestCase, unittest.TestCase):
32    klass = NetworkFlowLabel
33
34    _full_dict = {
35        'ingress_interface_index': 2,
36        'egress_interface_index': 3,
37        'src_socket_address': TestSocketAddress._full_dict,
38        'dest_socket_address': TestSocketAddress._full_dict,
39        'ip_protocol': 'TCP',
40    }
41
42
43class TestNetworkLayerInfo(EntityTestCase, unittest.TestCase):
44    klass = NetworkLayerInfo
45
46    _full_dict = {
47        'src_socket_address': TestSocketAddress._full_dict,
48        'dest_socket_address': TestSocketAddress._full_dict,
49        'ip_protocol': 'TCP',
50    }
51
52
53class TestYAFTCPFlow(EntityTestCase, unittest.TestCase):
54    klass = YAFTCPFlow
55
56    _full_dict = {
57        'tcp_sequence_number': 5,
58        'initial_tcp_flags': TestTCPFlags._full_dict,
59        'union_tcp_flags': u('0C'),
60    }
61
62
63class TestYAFFlow(EntityTestCase, unittest.TestCase):
64    klass = YAFFlow
65
66    _full_dict = {
67        'flow_start_milliseconds': 1582060813,
68        'flow_end_milliseconds': 1582060900,
69        'octet_total_count': 144,
70        'packet_total_count': 100,
71        'flow_end_reason': u('0C'),
72        'silk_app_label': 53,
73        'payload_entropy': 34,
74        'ml_app_label': u('0C'),
75        'tcp_flow': TestYAFTCPFlow._full_dict,
76        'vlan_id_mac_addr': TestAddress._full_dict,
77        'passive_os_fingerprinting': {
78            'description': u("The best platform"),
79            'identifier': [
80                u("value1")
81            ]
82        },
83        'first_packet_banner': u('00'),
84        'second_packet_banner': u('0A'),
85        'n_bytes_payload': u('EC'),
86    }
87
88
89class TestYAFReverseFlow(EntityTestCase, unittest.TestCase):
90    klass = YAFReverseFlow
91
92    _full_dict = {
93        'reverse_octet_total_count': 144,
94        'reverse_packet_total_count': 100,
95        'reverse_payload_entropy': 25,
96        'reverse_flow_delta_milliseconds': 1356,
97        'tcp_reverse_flow': TestYAFTCPFlow._full_dict,
98        'reverse_vlan_id_mac_addr': TestAddress._full_dict,
99        'reverse_passive_os_fingerprinting': {
100            'description': u("The best platform"),
101            'identifier': [
102                u("value1")
103            ]
104        },
105        'reverse_first_packet': u('00'),
106        'reverse_n_bytes_payload': u('EC'),
107    }
108
109
110class TestYAFRecord(EntityTestCase, unittest.TestCase):
111    klass = YAFRecord
112
113    _full_dict = {
114        'flow': TestYAFFlow._full_dict,
115        'reverse_flow': TestYAFReverseFlow._full_dict,
116    }
117
118
119class TestIPFIXSetHeader(EntityTestCase, unittest.TestCase):
120    klass = IPFIXSetHeader
121
122    _full_dict = {
123        'set_id': 35,
124        'length': 100,
125    }
126
127
128class TestIPFIXDataRecord(EntityTestCase, unittest.TestCase):
129    klass = IPFIXDataRecord
130
131    _full_dict = {
132        'field_value': [
133            u('value1'),
134            u('value2'),
135        ]
136    }
137
138
139class TestIPFIXMessageHeader(EntityTestCase, unittest.TestCase):
140    klass = IPFIXMessageHeader
141
142    _full_dict = {
143        'version': u('A0'),
144        'byte_length': u('FEAC'),
145        'export_timestamp': 1582060823,
146        'sequence_number': 19382,
147        'observation_domain_id': 245,
148    }
149
150
151class TestIPFIXTemplateRecordFieldSpecifiers(EntityTestCase, unittest.TestCase):
152    klass = IPFIXTemplateRecordFieldSpecifiers
153
154    _full_dict = {
155        'enterprise_bit': True,
156        'information_element_id': 'Something',
157        'field_length': '128395',
158        'enterprise_number': '1.3.6.1.4.1',
159    }
160
161
162class TestIPFIXTemplateRecordHeader(EntityTestCase, unittest.TestCase):
163    klass = IPFIXTemplateRecordHeader
164
165    _full_dict = {
166        'template_id': 556,
167        'field_count': 'FF452'
168    }
169
170
171class TestIPFIXOptionsTemplateRecordHeader(EntityTestCase, unittest.TestCase):
172    klass = IPFIXOptionsTemplateRecordHeader
173
174    _full_dict = {
175        'template_id': 160,
176        'field_count': 'BF45',
177        'scope_field_count': 15,
178    }
179
180
181class TestIPFIXDataSet(EntityTestCase, unittest.TestCase):
182    klass = IPFIXDataSet
183
184    _full_dict = {
185        'set_header': TestIPFIXSetHeader._full_dict,
186        'data_record': [
187            TestIPFIXDataRecord._full_dict
188        ],
189        'padding': u('0C'),
190    }
191
192
193class TestIPFIXOptionsTemplateRecord(EntityTestCase, unittest.TestCase):
194    klass = IPFIXOptionsTemplateRecord
195
196    _full_dict = {
197        'options_template_record_header': TestIPFIXOptionsTemplateRecordHeader._full_dict,
198        'field_specifier': [
199            TestIPFIXTemplateRecordFieldSpecifiers._full_dict
200        ]
201    }
202
203
204class TestIPFIXOptionsTemplateSet(EntityTestCase, unittest.TestCase):
205    klass = IPFIXOptionsTemplateSet
206
207    _full_dict = {
208        'set_header': TestIPFIXSetHeader._full_dict,
209        'options_template_record': [
210            TestIPFIXOptionsTemplateRecord._full_dict
211        ],
212        'padding': u('C0')
213    }
214
215
216class TestIPFIXTemplateRecord(EntityTestCase, unittest.TestCase):
217    klass = IPFIXTemplateRecord
218
219    _full_dict = {
220        'template_record_header': TestIPFIXTemplateRecordHeader._full_dict,
221        'field_specifier': [
222            TestIPFIXTemplateRecordFieldSpecifiers._full_dict
223        ]
224    }
225
226
227class TestIPFIXTemplateSet(EntityTestCase, unittest.TestCase):
228    klass = IPFIXTemplateSet
229
230    _full_dict = {
231        'set_header': TestIPFIXSetHeader._full_dict,
232        'template_record': [
233            TestIPFIXTemplateRecord._full_dict
234        ],
235        'padding': u('FE')
236    }
237
238
239class TestIPFIXSet(EntityTestCase, unittest.TestCase):
240    klass = IPFIXSet
241
242    _full_dict = {
243        'template_set': TestIPFIXTemplateSet._full_dict,
244        'options_template_set': TestIPFIXOptionsTemplateSet._full_dict,
245        'data_set': TestIPFIXDataSet._full_dict,
246    }
247
248
249class TestIPFIXMessage(EntityTestCase, unittest.TestCase):
250    klass = IPFIXMessage
251
252    _full_dict = {
253        'message_header': TestIPFIXMessageHeader._full_dict,
254        'set': [
255            TestIPFIXSet._full_dict
256        ]
257    }
258
259
260class TestNetflowV5FlowRecord(EntityTestCase, unittest.TestCase):
261    klass = NetflowV5FlowRecord
262
263    _full_dict = {
264        'nexthop_ipv4_addr': TestAddress._full_dict,
265        'packet_count': 50,
266        'byte_count': 123448,
267        'sysuptime_start': 1582060823,
268        'sysuptime_end': 1582060824,
269        'padding1': u('0'),
270        'tcp_flags': u('02'),
271        'src_autonomous_system': 2,
272        'dest_autonomous_system': 1,
273        'src_ip_mask_bit_count': u('0'),
274        'dest_ip_mask_bit_count': u('0'),
275        'padding2': u('0'),
276    }
277
278
279class TestNetflowV5FlowHeader(EntityTestCase, unittest.TestCase):
280    klass = NetflowV5FlowHeader
281
282    _full_dict = {
283        'version': u('05'),
284        'count': 2,
285        'sys_up_time': 1582060823,
286        'unix_secs': 345,
287        'unix_nsecs': 348,
288        'flow_sequence': 55,
289        'engine_type': u('Some Engine'),
290        'engine_id': 4523,
291        'sampling_interval': u('CE'),
292    }
293
294
295class TestNetflowV5Packet(EntityTestCase, unittest.TestCase):
296    klass = NetflowV5Packet
297
298    _full_dict = {
299        'flow_header': TestNetflowV5FlowHeader._full_dict,
300        'flow_record': [
301            TestNetflowV5FlowRecord._full_dict
302        ]
303    }
304
305
306class TestOptionCollectionElement(EntityTestCase, unittest.TestCase):
307    klass = OptionCollectionElement
308
309    _full_dict = {
310        'option_record_field_value': [
311            u('Some Value2')
312        ]
313    }
314
315
316class TestFlowCollectionElement(EntityTestCase, unittest.TestCase):
317    klass = FlowCollectionElement
318
319    _full_dict = {
320        'flow_record_field_value': [
321            u('Some String')
322        ]
323    }
324
325
326class TestFlowDataRecord(EntityTestCase, unittest.TestCase):
327    klass = FlowDataRecord
328
329    _full_dict = {
330        'flow_record_collection_element': [
331            TestFlowCollectionElement._full_dict
332        ]
333    }
334
335
336class TestOptionsDataRecord(EntityTestCase, unittest.TestCase):
337    klass = OptionsDataRecord
338
339    _full_dict = {
340        'scope_field_value': u('Some Value'),
341        'option_record_collection_element': [
342            TestOptionCollectionElement._full_dict
343        ]
344    }
345
346
347class TestNetflowV9PacketHeader(EntityTestCase, unittest.TestCase):
348    klass = NetflowV9PacketHeader
349
350    _full_dict = {
351        'version': u('09'),
352        'record_count': 20,
353        'sys_up_time': 1582060823,
354        'unix_secs': 34,
355        'sequence_number': 553242,
356        'source_id': u('312E'),
357    }
358
359
360class TestNetflowV9DataRecord(EntityTestCase, unittest.TestCase):
361    klass = NetflowV9DataRecord
362
363    _full_dict = {
364        'flow_data_record': [
365            TestFlowDataRecord._full_dict
366        ],
367        'options_data_record': [
368            TestOptionsDataRecord._full_dict
369        ],
370    }
371
372
373class TestNetflowV9DataFlowSet(EntityTestCase, unittest.TestCase):
374    klass = NetflowV9DataFlowSet
375
376    _full_dict = {
377        'flow_set_id_template_id': 2,
378        'length': 10,
379        'data_record': [
380            TestNetflowV9DataRecord._full_dict
381        ],
382        'padding': u('DE'),
383    }
384
385
386class TestNetflowV9TemplateRecord(EntityTestCase, unittest.TestCase):
387    klass = NetflowV9TemplateRecord
388
389    _full_dict = {
390        'template_id': 2345,
391        'field_count': 110,
392        'field_type': u(NetflowV9Field.TERM_IN_BYTES),
393        'field_length': u('FF'),
394    }
395
396
397class TestNetflowV9TemplateFlowSet(EntityTestCase, unittest.TestCase):
398    klass = NetflowV9TemplateFlowSet
399
400    _full_dict = {
401        'flow_set_id': u('AA'),
402        'length': 100,
403        'template_record': [
404            TestNetflowV9TemplateRecord._full_dict
405        ],
406    }
407
408
409class TestNetflowV9OptionsTemplateRecord(EntityTestCase, unittest.TestCase):
410    klass = NetflowV9OptionsTemplateRecord
411
412    _full_dict = {
413        'template_id': 341,
414        'option_scope_length': u('A4'),
415        'option_length': u('2'),
416        'scope_field_type': u('Scope Type'),
417        'scope_field_length': u('E0'),
418        'option_field_type': u('IN_BYTES(1)'),
419        'option_field_length': u('B0'),
420    }
421
422
423class TestNetflowV9OptionsTemplateFlowSet(EntityTestCase, unittest.TestCase):
424    klass = NetflowV9OptionsTemplateFlowSet
425
426    _full_dict = {
427        'flow_set_id': u('01'),
428        'length': 10,
429        'options_template_record': [
430            TestNetflowV9OptionsTemplateRecord._full_dict
431        ],
432        'padding': u('AB'),
433    }
434
435
436class TestNetflowV9FlowSet(EntityTestCase, unittest.TestCase):
437    klass = NetflowV9FlowSet
438
439    _full_dict = {
440        'template_flow_set': TestNetflowV9TemplateFlowSet._full_dict,
441        'options_template_flow_set': TestNetflowV9OptionsTemplateFlowSet._full_dict,
442        'data_flow_set': TestNetflowV9DataFlowSet._full_dict,
443    }
444
445
446class TestNetflowV9ExportPacket(EntityTestCase, unittest.TestCase):
447    klass = NetflowV9ExportPacket
448
449    _full_dict = {
450        'packet_header': TestNetflowV9PacketHeader._full_dict,
451        'flow_set': [
452            TestNetflowV9FlowSet._full_dict
453        ]
454    }
455
456
457class TestSiLKSensorInfo(EntityTestCase, unittest.TestCase):
458    klass = SiLKSensorInfo
459
460    _full_dict = {
461        'sensor_id': u('2349'),
462        'class': u(SiLKSensorClass.TERM_ALL),
463        'type': u(SiLKSensorDirection.TERM_IN),
464    }
465
466
467class TestSiLKRecord(EntityTestCase, unittest.TestCase):
468    klass = SiLKRecord
469
470    _full_dict = {
471        'packet_count': 55,
472        'byte_count': 12800,
473        'tcp_flags': u('B9'),
474        'start_time': 1582060823,
475        'duration': 100,
476        'end_time': 1582060923,
477        'sensor_info': TestSiLKSensorInfo._full_dict,
478        'icmp_type': 6,
479        'icmp_code': 2,
480        'router_next_hop_ip': TestAddress._full_dict,
481        'initial_tcp_flags': TestTCPFlags._full_dict,
482        'session_tcp_flags': u('3'),
483        'flow_attributes': u(SiLKFlowAttributes.TERM_F),
484        'flow_application': u('My Flow Application'),
485        'src_ip_type': u(SiLKAddress.TERM_INTERNAL),
486        'dest_ip_type': u(SiLKAddress.TERM_EXTERNAL),
487        'src_country_code': u('US'),
488        'dest_country_code': u('US'),
489        'src_mapname': u('some value1'),
490        'dest_mapname': u('some value2'),
491    }
492
493
494class TestUnidirectionalRecord(EntityTestCase, unittest.TestCase):
495    klass = UnidirectionalRecord
496
497    _full_dict = {
498        'ipfix_message': TestIPFIXMessage._full_dict,
499        'netflowv9_export_packet': TestNetflowV9ExportPacket._full_dict,
500        'netflowv5_packet': TestNetflowV5Packet._full_dict,
501        'silk_record': TestSiLKRecord._full_dict,
502    }
503
504
505class TestBidirectionalRecord(EntityTestCase, unittest.TestCase):
506    klass = BidirectionalRecord
507
508    _full_dict = {
509        'yaf_record': TestYAFRecord._full_dict
510    }
511
512
513class TestNetworkFlow(ObjectTestCase, unittest.TestCase):
514    object_type = "NetworkFlowObjectType"
515    klass = NetworkFlow
516
517    _full_dict = {
518        'network_flow_label': TestNetworkFlowLabel._full_dict,
519        'unidirectional_flow_record': TestUnidirectionalRecord._full_dict,
520        'bidirectional_flow_record': TestBidirectionalRecord._full_dict,
521        'xsi:type': object_type,
522    }
523
524
525if __name__ == "__main__":
526    unittest.main()
527