1# Copyright (c) 2020, The MITRE Corporation. All rights reserved. 2# See LICENSE.txt for complete terms. 3 4import unittest 5 6from mixbox.vendor.six import u 7 8from cybox.objects.network_flow_object import ( 9 BidirectionalRecord, IPFIXDataSet, IPFIXTemplateRecord, IPFIXDataRecord, 10 IPFIXMessage, IPFIXSet, IPFIXMessageHeader, FlowDataRecord, 11 FlowCollectionElement, OptionsDataRecord, OptionCollectionElement, 12 IPFIXSetHeader, IPFIXOptionsTemplateRecord, 13 IPFIXOptionsTemplateRecordHeader, IPFIXOptionsTemplateSet, 14 IPFIXTemplateRecordFieldSpecifiers, IPFIXTemplateRecordHeader, 15 IPFIXTemplateSet, NetflowV5Packet, NetflowV5FlowHeader, 16 NetflowV5FlowRecord, NetflowV9DataFlowSet, NetflowV9PacketHeader, 17 NetflowV9ExportPacket, NetflowV9OptionsTemplateFlowSet, NetflowV9Field, 18 NetflowV9FlowSet, NetflowV9DataRecord, NetflowV9TemplateFlowSet, 19 NetflowV9TemplateRecord, NetflowV9OptionsTemplateRecord, NetworkFlow, 20 NetworkFlowLabel, NetworkLayerInfo, UnidirectionalRecord, SiLKRecord, 21 SiLKSensorInfo, SiLKFlowAttributes, SiLKAddress, SiLKSensorClass, 22 SiLKSensorDirection, YAFReverseFlow, YAFRecord, YAFFlow, YAFTCPFlow 23) 24from cybox.test import EntityTestCase 25from cybox.test.objects import ObjectTestCase 26from cybox.test.objects.address_test import TestAddress 27from cybox.test.objects.network_packet_test import TestTCPFlags 28from cybox.test.objects.socket_address_test import TestSocketAddress 29 30 31class TestNetworkFlowLabel(EntityTestCase, unittest.TestCase): 32 klass = NetworkFlowLabel 33 34 _full_dict = { 35 'ingress_interface_index': 2, 36 'egress_interface_index': 3, 37 'src_socket_address': TestSocketAddress._full_dict, 38 'dest_socket_address': TestSocketAddress._full_dict, 39 'ip_protocol': 'TCP', 40 } 41 42 43class TestNetworkLayerInfo(EntityTestCase, unittest.TestCase): 44 klass = NetworkLayerInfo 45 46 _full_dict = { 47 'src_socket_address': TestSocketAddress._full_dict, 48 'dest_socket_address': TestSocketAddress._full_dict, 49 'ip_protocol': 'TCP', 50 } 51 52 53class TestYAFTCPFlow(EntityTestCase, unittest.TestCase): 54 klass = YAFTCPFlow 55 56 _full_dict = { 57 'tcp_sequence_number': 5, 58 'initial_tcp_flags': TestTCPFlags._full_dict, 59 'union_tcp_flags': u('0C'), 60 } 61 62 63class TestYAFFlow(EntityTestCase, unittest.TestCase): 64 klass = YAFFlow 65 66 _full_dict = { 67 'flow_start_milliseconds': 1582060813, 68 'flow_end_milliseconds': 1582060900, 69 'octet_total_count': 144, 70 'packet_total_count': 100, 71 'flow_end_reason': u('0C'), 72 'silk_app_label': 53, 73 'payload_entropy': 34, 74 'ml_app_label': u('0C'), 75 'tcp_flow': TestYAFTCPFlow._full_dict, 76 'vlan_id_mac_addr': TestAddress._full_dict, 77 'passive_os_fingerprinting': { 78 'description': u("The best platform"), 79 'identifier': [ 80 u("value1") 81 ] 82 }, 83 'first_packet_banner': u('00'), 84 'second_packet_banner': u('0A'), 85 'n_bytes_payload': u('EC'), 86 } 87 88 89class TestYAFReverseFlow(EntityTestCase, unittest.TestCase): 90 klass = YAFReverseFlow 91 92 _full_dict = { 93 'reverse_octet_total_count': 144, 94 'reverse_packet_total_count': 100, 95 'reverse_payload_entropy': 25, 96 'reverse_flow_delta_milliseconds': 1356, 97 'tcp_reverse_flow': TestYAFTCPFlow._full_dict, 98 'reverse_vlan_id_mac_addr': TestAddress._full_dict, 99 'reverse_passive_os_fingerprinting': { 100 'description': u("The best platform"), 101 'identifier': [ 102 u("value1") 103 ] 104 }, 105 'reverse_first_packet': u('00'), 106 'reverse_n_bytes_payload': u('EC'), 107 } 108 109 110class TestYAFRecord(EntityTestCase, unittest.TestCase): 111 klass = YAFRecord 112 113 _full_dict = { 114 'flow': TestYAFFlow._full_dict, 115 'reverse_flow': TestYAFReverseFlow._full_dict, 116 } 117 118 119class TestIPFIXSetHeader(EntityTestCase, unittest.TestCase): 120 klass = IPFIXSetHeader 121 122 _full_dict = { 123 'set_id': 35, 124 'length': 100, 125 } 126 127 128class TestIPFIXDataRecord(EntityTestCase, unittest.TestCase): 129 klass = IPFIXDataRecord 130 131 _full_dict = { 132 'field_value': [ 133 u('value1'), 134 u('value2'), 135 ] 136 } 137 138 139class TestIPFIXMessageHeader(EntityTestCase, unittest.TestCase): 140 klass = IPFIXMessageHeader 141 142 _full_dict = { 143 'version': u('A0'), 144 'byte_length': u('FEAC'), 145 'export_timestamp': 1582060823, 146 'sequence_number': 19382, 147 'observation_domain_id': 245, 148 } 149 150 151class TestIPFIXTemplateRecordFieldSpecifiers(EntityTestCase, unittest.TestCase): 152 klass = IPFIXTemplateRecordFieldSpecifiers 153 154 _full_dict = { 155 'enterprise_bit': True, 156 'information_element_id': 'Something', 157 'field_length': '128395', 158 'enterprise_number': '1.3.6.1.4.1', 159 } 160 161 162class TestIPFIXTemplateRecordHeader(EntityTestCase, unittest.TestCase): 163 klass = IPFIXTemplateRecordHeader 164 165 _full_dict = { 166 'template_id': 556, 167 'field_count': 'FF452' 168 } 169 170 171class TestIPFIXOptionsTemplateRecordHeader(EntityTestCase, unittest.TestCase): 172 klass = IPFIXOptionsTemplateRecordHeader 173 174 _full_dict = { 175 'template_id': 160, 176 'field_count': 'BF45', 177 'scope_field_count': 15, 178 } 179 180 181class TestIPFIXDataSet(EntityTestCase, unittest.TestCase): 182 klass = IPFIXDataSet 183 184 _full_dict = { 185 'set_header': TestIPFIXSetHeader._full_dict, 186 'data_record': [ 187 TestIPFIXDataRecord._full_dict 188 ], 189 'padding': u('0C'), 190 } 191 192 193class TestIPFIXOptionsTemplateRecord(EntityTestCase, unittest.TestCase): 194 klass = IPFIXOptionsTemplateRecord 195 196 _full_dict = { 197 'options_template_record_header': TestIPFIXOptionsTemplateRecordHeader._full_dict, 198 'field_specifier': [ 199 TestIPFIXTemplateRecordFieldSpecifiers._full_dict 200 ] 201 } 202 203 204class TestIPFIXOptionsTemplateSet(EntityTestCase, unittest.TestCase): 205 klass = IPFIXOptionsTemplateSet 206 207 _full_dict = { 208 'set_header': TestIPFIXSetHeader._full_dict, 209 'options_template_record': [ 210 TestIPFIXOptionsTemplateRecord._full_dict 211 ], 212 'padding': u('C0') 213 } 214 215 216class TestIPFIXTemplateRecord(EntityTestCase, unittest.TestCase): 217 klass = IPFIXTemplateRecord 218 219 _full_dict = { 220 'template_record_header': TestIPFIXTemplateRecordHeader._full_dict, 221 'field_specifier': [ 222 TestIPFIXTemplateRecordFieldSpecifiers._full_dict 223 ] 224 } 225 226 227class TestIPFIXTemplateSet(EntityTestCase, unittest.TestCase): 228 klass = IPFIXTemplateSet 229 230 _full_dict = { 231 'set_header': TestIPFIXSetHeader._full_dict, 232 'template_record': [ 233 TestIPFIXTemplateRecord._full_dict 234 ], 235 'padding': u('FE') 236 } 237 238 239class TestIPFIXSet(EntityTestCase, unittest.TestCase): 240 klass = IPFIXSet 241 242 _full_dict = { 243 'template_set': TestIPFIXTemplateSet._full_dict, 244 'options_template_set': TestIPFIXOptionsTemplateSet._full_dict, 245 'data_set': TestIPFIXDataSet._full_dict, 246 } 247 248 249class TestIPFIXMessage(EntityTestCase, unittest.TestCase): 250 klass = IPFIXMessage 251 252 _full_dict = { 253 'message_header': TestIPFIXMessageHeader._full_dict, 254 'set': [ 255 TestIPFIXSet._full_dict 256 ] 257 } 258 259 260class TestNetflowV5FlowRecord(EntityTestCase, unittest.TestCase): 261 klass = NetflowV5FlowRecord 262 263 _full_dict = { 264 'nexthop_ipv4_addr': TestAddress._full_dict, 265 'packet_count': 50, 266 'byte_count': 123448, 267 'sysuptime_start': 1582060823, 268 'sysuptime_end': 1582060824, 269 'padding1': u('0'), 270 'tcp_flags': u('02'), 271 'src_autonomous_system': 2, 272 'dest_autonomous_system': 1, 273 'src_ip_mask_bit_count': u('0'), 274 'dest_ip_mask_bit_count': u('0'), 275 'padding2': u('0'), 276 } 277 278 279class TestNetflowV5FlowHeader(EntityTestCase, unittest.TestCase): 280 klass = NetflowV5FlowHeader 281 282 _full_dict = { 283 'version': u('05'), 284 'count': 2, 285 'sys_up_time': 1582060823, 286 'unix_secs': 345, 287 'unix_nsecs': 348, 288 'flow_sequence': 55, 289 'engine_type': u('Some Engine'), 290 'engine_id': 4523, 291 'sampling_interval': u('CE'), 292 } 293 294 295class TestNetflowV5Packet(EntityTestCase, unittest.TestCase): 296 klass = NetflowV5Packet 297 298 _full_dict = { 299 'flow_header': TestNetflowV5FlowHeader._full_dict, 300 'flow_record': [ 301 TestNetflowV5FlowRecord._full_dict 302 ] 303 } 304 305 306class TestOptionCollectionElement(EntityTestCase, unittest.TestCase): 307 klass = OptionCollectionElement 308 309 _full_dict = { 310 'option_record_field_value': [ 311 u('Some Value2') 312 ] 313 } 314 315 316class TestFlowCollectionElement(EntityTestCase, unittest.TestCase): 317 klass = FlowCollectionElement 318 319 _full_dict = { 320 'flow_record_field_value': [ 321 u('Some String') 322 ] 323 } 324 325 326class TestFlowDataRecord(EntityTestCase, unittest.TestCase): 327 klass = FlowDataRecord 328 329 _full_dict = { 330 'flow_record_collection_element': [ 331 TestFlowCollectionElement._full_dict 332 ] 333 } 334 335 336class TestOptionsDataRecord(EntityTestCase, unittest.TestCase): 337 klass = OptionsDataRecord 338 339 _full_dict = { 340 'scope_field_value': u('Some Value'), 341 'option_record_collection_element': [ 342 TestOptionCollectionElement._full_dict 343 ] 344 } 345 346 347class TestNetflowV9PacketHeader(EntityTestCase, unittest.TestCase): 348 klass = NetflowV9PacketHeader 349 350 _full_dict = { 351 'version': u('09'), 352 'record_count': 20, 353 'sys_up_time': 1582060823, 354 'unix_secs': 34, 355 'sequence_number': 553242, 356 'source_id': u('312E'), 357 } 358 359 360class TestNetflowV9DataRecord(EntityTestCase, unittest.TestCase): 361 klass = NetflowV9DataRecord 362 363 _full_dict = { 364 'flow_data_record': [ 365 TestFlowDataRecord._full_dict 366 ], 367 'options_data_record': [ 368 TestOptionsDataRecord._full_dict 369 ], 370 } 371 372 373class TestNetflowV9DataFlowSet(EntityTestCase, unittest.TestCase): 374 klass = NetflowV9DataFlowSet 375 376 _full_dict = { 377 'flow_set_id_template_id': 2, 378 'length': 10, 379 'data_record': [ 380 TestNetflowV9DataRecord._full_dict 381 ], 382 'padding': u('DE'), 383 } 384 385 386class TestNetflowV9TemplateRecord(EntityTestCase, unittest.TestCase): 387 klass = NetflowV9TemplateRecord 388 389 _full_dict = { 390 'template_id': 2345, 391 'field_count': 110, 392 'field_type': u(NetflowV9Field.TERM_IN_BYTES), 393 'field_length': u('FF'), 394 } 395 396 397class TestNetflowV9TemplateFlowSet(EntityTestCase, unittest.TestCase): 398 klass = NetflowV9TemplateFlowSet 399 400 _full_dict = { 401 'flow_set_id': u('AA'), 402 'length': 100, 403 'template_record': [ 404 TestNetflowV9TemplateRecord._full_dict 405 ], 406 } 407 408 409class TestNetflowV9OptionsTemplateRecord(EntityTestCase, unittest.TestCase): 410 klass = NetflowV9OptionsTemplateRecord 411 412 _full_dict = { 413 'template_id': 341, 414 'option_scope_length': u('A4'), 415 'option_length': u('2'), 416 'scope_field_type': u('Scope Type'), 417 'scope_field_length': u('E0'), 418 'option_field_type': u('IN_BYTES(1)'), 419 'option_field_length': u('B0'), 420 } 421 422 423class TestNetflowV9OptionsTemplateFlowSet(EntityTestCase, unittest.TestCase): 424 klass = NetflowV9OptionsTemplateFlowSet 425 426 _full_dict = { 427 'flow_set_id': u('01'), 428 'length': 10, 429 'options_template_record': [ 430 TestNetflowV9OptionsTemplateRecord._full_dict 431 ], 432 'padding': u('AB'), 433 } 434 435 436class TestNetflowV9FlowSet(EntityTestCase, unittest.TestCase): 437 klass = NetflowV9FlowSet 438 439 _full_dict = { 440 'template_flow_set': TestNetflowV9TemplateFlowSet._full_dict, 441 'options_template_flow_set': TestNetflowV9OptionsTemplateFlowSet._full_dict, 442 'data_flow_set': TestNetflowV9DataFlowSet._full_dict, 443 } 444 445 446class TestNetflowV9ExportPacket(EntityTestCase, unittest.TestCase): 447 klass = NetflowV9ExportPacket 448 449 _full_dict = { 450 'packet_header': TestNetflowV9PacketHeader._full_dict, 451 'flow_set': [ 452 TestNetflowV9FlowSet._full_dict 453 ] 454 } 455 456 457class TestSiLKSensorInfo(EntityTestCase, unittest.TestCase): 458 klass = SiLKSensorInfo 459 460 _full_dict = { 461 'sensor_id': u('2349'), 462 'class': u(SiLKSensorClass.TERM_ALL), 463 'type': u(SiLKSensorDirection.TERM_IN), 464 } 465 466 467class TestSiLKRecord(EntityTestCase, unittest.TestCase): 468 klass = SiLKRecord 469 470 _full_dict = { 471 'packet_count': 55, 472 'byte_count': 12800, 473 'tcp_flags': u('B9'), 474 'start_time': 1582060823, 475 'duration': 100, 476 'end_time': 1582060923, 477 'sensor_info': TestSiLKSensorInfo._full_dict, 478 'icmp_type': 6, 479 'icmp_code': 2, 480 'router_next_hop_ip': TestAddress._full_dict, 481 'initial_tcp_flags': TestTCPFlags._full_dict, 482 'session_tcp_flags': u('3'), 483 'flow_attributes': u(SiLKFlowAttributes.TERM_F), 484 'flow_application': u('My Flow Application'), 485 'src_ip_type': u(SiLKAddress.TERM_INTERNAL), 486 'dest_ip_type': u(SiLKAddress.TERM_EXTERNAL), 487 'src_country_code': u('US'), 488 'dest_country_code': u('US'), 489 'src_mapname': u('some value1'), 490 'dest_mapname': u('some value2'), 491 } 492 493 494class TestUnidirectionalRecord(EntityTestCase, unittest.TestCase): 495 klass = UnidirectionalRecord 496 497 _full_dict = { 498 'ipfix_message': TestIPFIXMessage._full_dict, 499 'netflowv9_export_packet': TestNetflowV9ExportPacket._full_dict, 500 'netflowv5_packet': TestNetflowV5Packet._full_dict, 501 'silk_record': TestSiLKRecord._full_dict, 502 } 503 504 505class TestBidirectionalRecord(EntityTestCase, unittest.TestCase): 506 klass = BidirectionalRecord 507 508 _full_dict = { 509 'yaf_record': TestYAFRecord._full_dict 510 } 511 512 513class TestNetworkFlow(ObjectTestCase, unittest.TestCase): 514 object_type = "NetworkFlowObjectType" 515 klass = NetworkFlow 516 517 _full_dict = { 518 'network_flow_label': TestNetworkFlowLabel._full_dict, 519 'unidirectional_flow_record': TestUnidirectionalRecord._full_dict, 520 'bidirectional_flow_record': TestBidirectionalRecord._full_dict, 521 'xsi:type': object_type, 522 } 523 524 525if __name__ == "__main__": 526 unittest.main() 527