#
# Wireshark tests
# By Gerald Combs <gerald@wireshark.org>
#
# Ported from a set of Bash scripts which were copyright 2005 Ulf Lamping
#
# SPDX-License-Identifier: GPL-2.0-or-later
#
'''File format conversion tests'''

import os.path
import subprocesstest
import unittest
import fixtures

# tshark arguments that print the frame number and timestamps as tab-separated
# fields. Shared by every timestamp test below via the helper functions.
time_output_args = ('-Tfields', '-e', 'frame.number', '-e', 'frame.time_epoch', '-e', 'frame.time_delta')

# Microsecond pcap, direct read was used to generate the baseline:
# tshark -Tfields -e frame.number -e frame.time_epoch -e frame.time_delta \
#   -r captures/dhcp.pcap > baseline/ff-ts-usec-pcap-direct.txt
baseline_file = 'ff-ts-usec-pcap-direct.txt'


def _stdin_cmdline(cmd_tshark, capture_path):
    '''Return a shell command line that pipes capture_path into tshark's stdin.

    The result is a single string because the "<" redirection has to be
    interpreted by a shell; run it with shell=True.
    '''
    # NOTE(review): neither path is quoted, so this presumably breaks on paths
    # containing spaces or shell metacharacters -- same as the original code.
    return ' '.join((cmd_tshark, '-r', '-') + time_output_args + ('<', capture_path))


def _direct_cmd(cmd_tshark, capture_path):
    '''Return the argument tuple that makes tshark read capture_path directly.'''
    return (cmd_tshark, '-r', capture_path) + time_output_args


@fixtures.fixture(scope='session')
def fileformats_baseline_str(dirs):
    '''Contents of the timestamp baseline file, read from dirs.baseline_dir.'''
    with open(os.path.join(dirs.baseline_dir, baseline_file), 'r') as f:
        return f.read()


@fixtures.mark_usefixtures('test_env')
@fixtures.uses_fixtures
class case_fileformat_pcap(subprocesstest.SubprocessTestCase):
    '''Compare tshark timestamp output for pcap inputs against the baseline.'''

    def test_pcap_usec_stdin(self, cmd_tshark, capture_file, fileformats_baseline_str):
        '''Microsecond pcap direct vs microsecond pcap stdin'''
        capture_proc = self.assertRun(
            _stdin_cmdline(cmd_tshark, capture_file('dhcp.pcap')),
            shell=True)
        self.assertTrue(self.diffOutput(capture_proc.stdout_str, fileformats_baseline_str, 'tshark', baseline_file))

    def test_pcap_nsec_stdin(self, cmd_tshark, capture_file, fileformats_baseline_str):
        '''Microsecond pcap direct vs nanosecond pcap stdin'''
        capture_proc = self.assertRun(
            _stdin_cmdline(cmd_tshark, capture_file('dhcp-nanosecond.pcap')),
            shell=True)
        self.assertTrue(self.diffOutput(capture_proc.stdout_str, fileformats_baseline_str, 'tshark', baseline_file))

    def test_pcap_nsec_direct(self, cmd_tshark, capture_file, fileformats_baseline_str):
        '''Microsecond pcap direct vs nanosecond pcap direct'''
        capture_proc = self.assertRun(
            _direct_cmd(cmd_tshark, capture_file('dhcp-nanosecond.pcap')))
        self.assertTrue(self.diffOutput(capture_proc.stdout_str, fileformats_baseline_str, 'tshark', baseline_file))


@fixtures.mark_usefixtures('test_env')
@fixtures.uses_fixtures
class case_fileformat_pcapng(subprocesstest.SubprocessTestCase):
    '''Compare tshark timestamp output for pcapng inputs against the baseline.'''

    def test_pcapng_usec_stdin(self, cmd_tshark, capture_file, fileformats_baseline_str):
        '''Microsecond pcap direct vs microsecond pcapng stdin'''
        # The helper keeps 'frame.time_delta' and '<' as separate words; the
        # previous inline tuple lacked a comma and fused them into one token.
        capture_proc = self.assertRun(
            _stdin_cmdline(cmd_tshark, capture_file('dhcp.pcapng')),
            shell=True)
        self.assertTrue(self.diffOutput(capture_proc.stdout_str, fileformats_baseline_str, 'tshark', baseline_file))

    def test_pcapng_usec_direct(self, cmd_tshark, capture_file, fileformats_baseline_str):
        '''Microsecond pcap direct vs microsecond pcapng direct'''
        capture_proc = self.assertRun(
            _direct_cmd(cmd_tshark, capture_file('dhcp.pcapng')))
        self.assertTrue(self.diffOutput(capture_proc.stdout_str, fileformats_baseline_str, 'tshark', baseline_file))

    def test_pcapng_nsec_stdin(self, cmd_tshark, capture_file, fileformats_baseline_str):
        '''Microsecond pcap direct vs nanosecond pcapng stdin'''
        # See test_pcapng_usec_stdin: the same missing-comma defect was here.
        capture_proc = self.assertRun(
            _stdin_cmdline(cmd_tshark, capture_file('dhcp-nanosecond.pcapng')),
            shell=True)
        self.assertTrue(self.diffOutput(capture_proc.stdout_str, fileformats_baseline_str, 'tshark', baseline_file))

    def test_pcapng_nsec_direct(self, cmd_tshark, capture_file, fileformats_baseline_str):
        '''Microsecond pcap direct vs nanosecond pcapng direct'''
        capture_proc = self.assertRun(
            _direct_cmd(cmd_tshark, capture_file('dhcp-nanosecond.pcapng')))
        self.assertTrue(self.diffOutput(capture_proc.stdout_str, fileformats_baseline_str, 'tshark', baseline_file))


@fixtures.fixture
def check_pcapng_dsb_fields(request, cmd_tshark):
    '''Factory that checks whether the DSB within the capture file matches.

    The returned callable takes (outfile, fields), where fields is an iterable
    of (secrets_type, secrets_length, secrets_data) tuples in the expected
    order; secrets_data must iterate as integers (e.g. bytes). It runs tshark
    on outfile and asserts, on the requesting test instance, that the reported
    DSB fields equal the expected ones.
    '''
    self = request.instance

    def check_dsb_fields_real(outfile, fields):
        proc = self.assertRun((cmd_tshark,
            '-r', outfile,
            '-Xread_format:MIME Files Format',
            '-Tfields',
            '-e', 'pcapng.dsb.secrets_type',
            '-e', 'pcapng.dsb.secrets_length',
            '-e', 'pcapng.dsb.secrets_data',
            '-Y', 'pcapng.dsb.secrets_data'
        ))
        # Convert "t1,t2 l1,l2 v1,v2" -> [(t1, l1, v1), (t2, l2, v2)]
        output = proc.stdout_str.strip()
        actual = list(zip(*[x.split(",") for x in output.split('\t')]))

        def format_field(field):
            # Render one expected tuple the way it is compared against the
            # tshark output: hex type, decimal length, hex-encoded data.
            secrets_type, length, data = field
            data_hex = ''.join('%02x' % c for c in data)
            return ('0x%08x' % secrets_type, str(length), data_hex)

        fields = [format_field(field) for field in fields]
        self.assertEqual(fields, actual)
    return check_dsb_fields_real


@fixtures.mark_usefixtures('base_env')
@fixtures.uses_fixtures
class case_fileformat_pcapng_dsb(subprocesstest.SubprocessTestCase):
    '''Tests for DSBs in pcapng files: preservation on rewrite and injection.'''

    def test_pcapng_dsb_1(self, cmd_tshark, dirs, capture_file, check_pcapng_dsb_fields):
        '''Check that DSBs are preserved while rewriting files.'''
        dsb_keys1 = os.path.join(dirs.key_dir, 'tls12-dsb-1.keys')
        dsb_keys2 = os.path.join(dirs.key_dir, 'tls12-dsb-2.keys')
        outfile = self.filename_from_id('tls12-dsb-same.pcapng')
        self.assertRun((cmd_tshark,
            '-r', capture_file('tls12-dsb.pcapng'),
            '-w', outfile,
        ))
        # The key files are read in text mode and re-encoded as UTF-8 before
        # being compared with the DSB payloads.
        with open(dsb_keys1, 'r') as f:
            dsb1_contents = f.read().encode('utf8')
        with open(dsb_keys2, 'r') as f:
            dsb2_contents = f.read().encode('utf8')
        check_pcapng_dsb_fields(outfile, (
            (0x544c534b, len(dsb1_contents), dsb1_contents),
            (0x544c534b, len(dsb2_contents), dsb2_contents),
        ))

    def test_pcapng_dsb_2(self, cmd_editcap, dirs, capture_file, check_pcapng_dsb_fields):
        '''Insert a single DSB into a pcapng file.'''
        key_file = os.path.join(dirs.key_dir, 'dhe1_keylog.dat')
        outfile = self.filename_from_id('dhe1-dsb.pcapng')
        self.assertRun((cmd_editcap,
            '--inject-secrets', 'tls,%s' % key_file,
            capture_file('dhe1.pcapng.gz'), outfile
        ))
        with open(key_file, 'rb') as f:
            keylog_contents = f.read()
        check_pcapng_dsb_fields(outfile, (
            (0x544c534b, len(keylog_contents), keylog_contents),
        ))

    def test_pcapng_dsb_3(self, cmd_editcap, dirs, capture_file, check_pcapng_dsb_fields):
        '''Insert two DSBs into a pcapng file.'''
        key_file1 = os.path.join(dirs.key_dir, 'dhe1_keylog.dat')
        key_file2 = os.path.join(dirs.key_dir, 'http2-data-reassembly.keys')
        outfile = self.filename_from_id('dhe1-dsb.pcapng')
        self.assertRun((cmd_editcap,
            '--inject-secrets', 'tls,%s' % key_file1,
            '--inject-secrets', 'tls,%s' % key_file2,
            capture_file('dhe1.pcapng.gz'), outfile
        ))
        with open(key_file1, 'rb') as f:
            keylog1_contents = f.read()
        with open(key_file2, 'rb') as f:
            keylog2_contents = f.read()
        check_pcapng_dsb_fields(outfile, (
            (0x544c534b, len(keylog1_contents), keylog1_contents),
            (0x544c534b, len(keylog2_contents), keylog2_contents),
        ))

    def test_pcapng_dsb_4(self, cmd_editcap, dirs, capture_file, check_pcapng_dsb_fields):
        '''Insert a single DSB into a pcapng file with existing DSBs.'''
        dsb_keys1 = os.path.join(dirs.key_dir, 'tls12-dsb-1.keys')
        dsb_keys2 = os.path.join(dirs.key_dir, 'tls12-dsb-2.keys')
        key_file = os.path.join(dirs.key_dir, 'dhe1_keylog.dat')
        outfile = self.filename_from_id('tls12-dsb-extra.pcapng')
        self.assertRun((cmd_editcap,
            '--inject-secrets', 'tls,%s' % key_file,
            capture_file('tls12-dsb.pcapng'), outfile
        ))
        with open(dsb_keys1, 'r') as f:
            dsb1_contents = f.read().encode('utf8')
        with open(dsb_keys2, 'r') as f:
            dsb2_contents = f.read().encode('utf8')
        with open(key_file, 'rb') as f:
            keylog_contents = f.read()
        # New DSBs are inserted before the first record. Due to the current
        # implementation, this is inserted before other (existing) DSBs. This
        # might change in the future if it is deemed more logical.
        check_pcapng_dsb_fields(outfile, (
            (0x544c534b, len(keylog_contents), keylog_contents),
            (0x544c534b, len(dsb1_contents), dsb1_contents),
            (0x544c534b, len(dsb2_contents), dsb2_contents),
        ))

    def test_pcapng_dsb_bad_key(self, cmd_editcap, dirs, capture_file, check_pcapng_dsb_fields):
        '''Insertion of a RSA key file is not very effective.'''
        rsa_keyfile = os.path.join(dirs.key_dir, 'rsasnakeoil2.key')
        p12_keyfile = os.path.join(dirs.key_dir, 'key.p12')
        outfile = self.filename_from_id('rsasnakeoil2-dsb.pcapng')
        proc = self.assertRun((cmd_editcap,
            '--inject-secrets', 'tls,%s' % rsa_keyfile,
            '--inject-secrets', 'tls,%s' % p12_keyfile,
            capture_file('rsasnakeoil2.pcap'), outfile
        ))
        # Expect one 'unsupported private key file' message on stderr per
        # injected file; the contents are still expected in the DSBs below.
        self.assertEqual(proc.stderr_str.count('unsupported private key file'), 2)
        with open(rsa_keyfile, 'rb') as f:
            dsb1_contents = f.read()
        with open(p12_keyfile, 'rb') as f:
            dsb2_contents = f.read()
        check_pcapng_dsb_fields(outfile, (
            (0x544c534b, len(dsb1_contents), dsb1_contents),
            (0x544c534b, len(dsb2_contents), dsb2_contents),
        ))


@fixtures.mark_usefixtures('test_env')
@fixtures.uses_fixtures
class case_fileformat_mime(subprocesstest.SubprocessTestCase):
    '''Tests that read capture files through the "MIME Files Format" reader.'''

    def test_mime_pcapng_gz(self, cmd_tshark, capture_file):
        '''Test that the full uncompressed contents is shown.'''
        proc = self.assertRun((cmd_tshark,
            '-r', capture_file('icmp.pcapng.gz'),
            '-Xread_format:MIME Files Format',
            '-Tfields',
            '-e', 'frame.len',
            '-e', 'pcapng.block.length',
            '-e', 'pcapng.block.length_trailer',
        ))
        self.assertEqual(proc.stdout_str.strip(), '480\t128,88,132,132\t128,88,132,132')