1# $Id: ip6.py 87 2013-03-05 19:41:04Z andrewflnr@gmail.com $ 2# -*- coding: utf-8 -*- 3"""Internet Protocol, version 6.""" 4from __future__ import print_function 5from __future__ import absolute_import 6 7from . import dpkt 8from . import ip 9from . import tcp 10from .compat import compat_ord 11import struct 12 13# The allowed extension headers and their classes (in order according to RFC). 14EXT_HDRS = [ip.IP_PROTO_HOPOPTS, ip.IP_PROTO_ROUTING, ip.IP_PROTO_FRAGMENT, ip.IP_PROTO_AH, ip.IP_PROTO_ESP, 15 ip.IP_PROTO_DSTOPTS] 16# EXT_HDRS_CLS - classes is below - after all the used classes are defined. 17 18 19class IP6(dpkt.Packet): 20 """Internet Protocol, version 6. 21 22 TODO: Longer class information.... 23 24 Attributes: 25 __hdr__: Header fields of IPv6. 26 TODO. 27 """ 28 29 __hdr__ = ( 30 ('_v_fc_flow', 'I', 0x60000000), 31 ('plen', 'H', 0), # payload length (not including header) 32 ('nxt', 'B', 0), # next header protocol 33 ('hlim', 'B', 0), # hop limit 34 ('src', '16s', b''), 35 ('dst', '16s', b'') 36 ) 37 _protosw = ip.IP._protosw 38 39 @property 40 def v(self): 41 return self._v_fc_flow >> 28 42 43 @v.setter 44 def v(self, v): 45 self._v_fc_flow = (self._v_fc_flow & ~0xf0000000) | (v << 28) 46 47 @property 48 def fc(self): 49 return (self._v_fc_flow >> 20) & 0xff 50 51 @fc.setter 52 def fc(self, v): 53 self._v_fc_flow = (self._v_fc_flow & ~0xff00000) | (v << 20) 54 55 @property 56 def flow(self): 57 return self._v_fc_flow & 0xfffff 58 59 @flow.setter 60 def flow(self, v): 61 self._v_fc_flow = (self._v_fc_flow & ~0xfffff) | (v & 0xfffff) 62 63 def unpack(self, buf): 64 dpkt.Packet.unpack(self, buf) 65 self.extension_hdrs = {} 66 67 # NOTE: self.extension_hdrs is not accurate, as it doesn't support duplicate header types. 68 # According to RFC-1883 "Each extension header should occur at most once, except for the 69 # Destination Options header which should occur at most twice". 70 # Secondly, the .headers_str() method attempts to pack the extension headers in order as 71 # defined in the RFC, however it doesn't adjust the next header (nxt) pointer accordingly. 72 # Here we introduce the new field .all_extension_headers; it allows duplicate types and 73 # keeps the original order. 74 self.all_extension_headers = [] 75 76 if self.plen: 77 buf = self.data[:self.plen] 78 else: # due to jumbo payload or TSO 79 buf = self.data 80 81 next_ext_hdr = self.nxt 82 83 while next_ext_hdr in EXT_HDRS: 84 ext = EXT_HDRS_CLS[next_ext_hdr](buf) 85 self.extension_hdrs[next_ext_hdr] = ext 86 self.all_extension_headers.append(ext) 87 buf = buf[ext.length:] 88 next_ext_hdr = getattr(ext, 'nxt', None) 89 90 # set the payload protocol id 91 if next_ext_hdr is not None: 92 self.p = next_ext_hdr 93 94 try: 95 self.data = self._protosw[next_ext_hdr](buf) 96 setattr(self, self.data.__class__.__name__.lower(), self.data) 97 except (KeyError, dpkt.UnpackError): 98 self.data = buf 99 100 def headers_str(self): 101 nxt = self.nxt 102 # If all_extension_headers is available, return the headers as they originally appeared 103 if hasattr(self, 'all_extension_headers') and self.all_extension_headers: 104 # get the nxt header from the last one 105 nxt = self.all_extension_headers[-1].nxt 106 return nxt, b''.join(bytes(ext) for ext in self.all_extension_headers) 107 108 # Output extension headers in order defined in RFC1883 (except dest opts) 109 header_str = b"" 110 if hasattr(self, 'extension_hdrs'): 111 for hdr in EXT_HDRS: 112 if hdr in self.extension_hdrs: 113 nxt = self.extension_hdrs[hdr].nxt 114 header_str += bytes(self.extension_hdrs[hdr]) 115 return nxt, header_str 116 117 def __bytes__(self): 118 self.p, hdr_str = self.headers_str() 119 if (self.p == 6 or self.p == 17 or self.p == 58) and not self.data.sum: 120 # XXX - set TCP, UDP, and ICMPv6 checksums 121 p = bytes(self.data) 122 s = struct.pack('>16s16sxBH', self.src, self.dst, self.p, len(p)) 123 s = dpkt.in_cksum_add(0, s) 124 s = dpkt.in_cksum_add(s, p) 125 self.data.sum = dpkt.in_cksum_done(s) 126 127 return self.pack_hdr() + hdr_str + bytes(self.data) 128 129 def __len__(self): 130 baselen = self.__hdr_len__ + len(self.data) 131 if hasattr(self, 'all_extension_headers') and self.all_extension_headers: 132 return baselen + sum(len(hh) for hh in self.all_extension_headers) 133 elif hasattr(self, 'extension_hdrs') and self.extension_hdrs: 134 return baselen + sum(len(hh) for hh in self.extension_hdrs.values()) 135 return baselen 136 137 @classmethod 138 def set_proto(cls, p, pktclass): 139 cls._protosw[p] = pktclass 140 141 @classmethod 142 def get_proto(cls, p): 143 return cls._protosw[p] 144 145 146class IP6ExtensionHeader(dpkt.Packet): 147 """ 148 An extension header is very similar to a 'sub-packet'. 149 We just want to re-use all the hdr unpacking etc. 150 """ 151 pass 152 153 154class IP6OptsHeader(IP6ExtensionHeader): 155 __hdr__ = ( 156 ('nxt', 'B', 0), # next extension header protocol 157 ('len', 'B', 0) # option data length in 8 octect units (ignoring first 8 octets) so, len 0 == 64bit header 158 ) 159 160 def unpack(self, buf): 161 dpkt.Packet.unpack(self, buf) 162 self.length = (self.len + 1) * 8 163 options = [] 164 165 index = 0 166 167 while index < self.length - 2: 168 try: 169 opt_type = compat_ord(self.data[index]) 170 171 # PAD1 option 172 if opt_type == 0: 173 index += 1 174 continue 175 176 opt_length = compat_ord(self.data[index + 1]) 177 178 if opt_type == 1: # PADN option 179 # PADN uses opt_length bytes in total 180 index += opt_length + 2 181 continue 182 183 options.append({ 184 'type': opt_type, 185 'opt_length': opt_length, 186 'data': self.data[index + 2:index + 2 + opt_length] 187 }) 188 189 # add the two chars and the option_length, to move to the next option 190 index += opt_length + 2 191 192 except IndexError: 193 raise dpkt.NeedData 194 195 self.options = options 196 self.data = buf[2:self.length] # keep raw data with all pad options, but not the following data 197 198 199class IP6HopOptsHeader(IP6OptsHeader): 200 pass 201 202 203class IP6DstOptsHeader(IP6OptsHeader): 204 pass 205 206 207class IP6RoutingHeader(IP6ExtensionHeader): 208 __hdr__ = ( 209 ('nxt', 'B', 0), # next extension header protocol 210 ('len', 'B', 0), # extension data length in 8 octect units (ignoring first 8 octets) (<= 46 for type 0) 211 ('type', 'B', 0), # routing type (currently, only 0 is used) 212 ('segs_left', 'B', 0), # remaining segments in route, until destination (<= 23) 213 ('rsvd_sl_bits', 'I', 0), # reserved (1 byte), strict/loose bitmap for addresses 214 ) 215 216 @property 217 def sl_bits(self): 218 return self.rsvd_sl_bits & 0xffffff 219 220 @sl_bits.setter 221 def sl_bits(self, v): 222 self.rsvd_sl_bits = (self.rsvd_sl_bits & ~0xfffff) | (v & 0xfffff) 223 224 def unpack(self, buf): 225 hdr_size = 8 226 addr_size = 16 227 228 dpkt.Packet.unpack(self, buf) 229 230 addresses = [] 231 num_addresses = self.len // 2 232 buf = buf[hdr_size:hdr_size + num_addresses * addr_size] 233 234 for i in range(num_addresses): 235 addresses.append(buf[i * addr_size: i * addr_size + addr_size]) 236 237 self.data = buf 238 self.addresses = addresses 239 self.length = self.len * 8 + 8 240 241 242class IP6FragmentHeader(IP6ExtensionHeader): 243 __hdr__ = ( 244 ('nxt', 'B', 0), # next extension header protocol 245 ('resv', 'B', 0), # reserved, set to 0 246 ('frag_off_resv_m', 'H', 0), # frag offset (13 bits), reserved zero (2 bits), More frags flag 247 ('id', 'I', 0) # fragments id 248 ) 249 250 def unpack(self, buf): 251 dpkt.Packet.unpack(self, buf) 252 self.length = self.__hdr_len__ 253 self.data = b'' 254 255 @property 256 def frag_off(self): 257 return self.frag_off_resv_m >> 3 258 259 @frag_off.setter 260 def frag_off(self, v): 261 self.frag_off_resv_m = (self.frag_off_resv_m & ~0xfff8) | (v << 3) 262 263 @property 264 def m_flag(self): 265 return self.frag_off_resv_m & 1 266 267 @m_flag.setter 268 def m_flag(self, v): 269 self.frag_off_resv_m = (self.frag_off_resv_m & 0xfffe) | (v & 1) 270 271 272class IP6AHHeader(IP6ExtensionHeader): 273 __hdr__ = ( 274 ('nxt', 'B', 0), # next extension header protocol 275 ('len', 'B', 0), # length of header in 4 octet units (ignoring first 2 units) 276 ('resv', 'H', 0), # reserved, 2 bytes of 0 277 ('spi', 'I', 0), # SPI security parameter index 278 ('seq', 'I', 0) # sequence no. 279 ) 280 281 def unpack(self, buf): 282 dpkt.Packet.unpack(self, buf) 283 self.length = (self.len + 2) * 4 284 self.auth_data = self.data[:(self.len - 1) * 4] 285 286 287class IP6ESPHeader(IP6ExtensionHeader): 288 __hdr__ = ( 289 ('spi', 'I', 0), 290 ('seq', 'I', 0) 291 ) 292 293 def unpack(self, buf): 294 dpkt.Packet.unpack(self, buf) 295 self.length = self.__hdr_len__ + len(self.data) 296 297 298EXT_HDRS_CLS = {ip.IP_PROTO_HOPOPTS: IP6HopOptsHeader, 299 ip.IP_PROTO_ROUTING: IP6RoutingHeader, 300 ip.IP_PROTO_FRAGMENT: IP6FragmentHeader, 301 ip.IP_PROTO_ESP: IP6ESPHeader, 302 ip.IP_PROTO_AH: IP6AHHeader, 303 ip.IP_PROTO_DSTOPTS: IP6DstOptsHeader} 304 305# Unit tests 306 307 308def test_ipg(): 309 s = (b'\x60\x00\x00\x00\x00\x28\x06\x40\xfe\x80\x00\x00\x00\x00\x00\x00\x02\x11\x24\xff\xfe\x8c' 310 b'\x11\xde\xfe\x80\x00\x00\x00\x00\x00\x00\x02\xb0\xd0\xff\xfe\xe1\x80\x72\xcd\xca\x00\x16' 311 b'\x04\x84\x46\xd5\x00\x00\x00\x00\xa0\x02\xff\xff\xf8\x09\x00\x00\x02\x04\x05\xa0\x01\x03' 312 b'\x03\x00\x01\x01\x08\x0a\x7d\x18\x35\x3f\x00\x00\x00\x00') 313 _ip = IP6(s) 314 315 # basic properties 316 assert _ip.v == 6 317 assert _ip.fc == 0 318 assert _ip.flow == 0 319 320 _ip.data.sum = 0 321 s2 = bytes(_ip) 322 assert s == s2 323 324 325def test_dict(): 326 s = (b'\x60\x00\x00\x00\x00\x28\x06\x40\xfe\x80\x00\x00\x00\x00\x00\x00\x02\x11\x24\xff\xfe\x8c' 327 b'\x11\xde\xfe\x80\x00\x00\x00\x00\x00\x00\x02\xb0\xd0\xff\xfe\xe1\x80\x72\xcd\xca\x00\x16' 328 b'\x04\x84\x46\xd5\x00\x00\x00\x00\xa0\x02\xff\xff\xf8\x09\x00\x00\x02\x04\x05\xa0\x01\x03' 329 b'\x03\x00\x01\x01\x08\x0a\x7d\x18\x35\x3f\x00\x00\x00\x00') 330 _ip = IP6(s) 331 d = dict(_ip) 332 333 # basic properties 334 assert d['src'] == b'\xfe\x80\x00\x00\x00\x00\x00\x00\x02\x11\x24\xff\xfe\x8c\x11\xde' 335 assert d['dst'] == b'\xfe\x80\x00\x00\x00\x00\x00\x00\x02\xb0\xd0\xff\xfe\xe1\x80\x72' 336 337 338def test_ip6_routing_header(): 339 s = (b'\x60\x00\x00\x00\x00\x3c\x2b\x40\x20\x48\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 340 b'\xde\xca\x20\x47\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xca\xfe\x06\x04\x00\x02' 341 b'\x00\x00\x00\x00\x20\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca\x20\x22' 342 b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca\x00\x14\x00\x50\x00\x00\x00\x00' 343 b'\x00\x00\x00\x00\x50\x02\x20\x00\x91\x7f\x00\x00') 344 _ip = IP6(s) 345 s2 = bytes(_ip) 346 # 43 is Routing header id 347 assert len(_ip.extension_hdrs[43].addresses) == 2 348 assert _ip.tcp 349 assert s == s2 350 351 352def test_ip6_fragment_header(): 353 s = b'\x06\xee\xff\xf9\x00\x00\xff\xff' 354 fh = IP6FragmentHeader(s) 355 # s2 = str(fh) variable 's2' is not used 356 assert fh.nxt == 6 357 assert fh.id == 65535 358 assert fh.frag_off == 8191 359 assert fh.m_flag == 1 360 361 # test packing 362 fh.frag_off_resv_m = 0 363 fh.frag_off = 8191 364 fh.m_flag = 1 365 assert bytes(fh) == s 366 367 # IP6 with fragment header 368 s = (b'\x60\x00\x00\x00\x00\x10\x2c\x00\x02\x22\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 369 b'\x00\x02\x03\x33\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x03\x29\x00\x00\x01' 370 b'\x00\x00\x00\x00\x60\x00\x00\x00\x00\x10\x2c\x00') 371 _ip = IP6(s) 372 assert bytes(_ip) == s 373 374 375def test_ip6_options_header(): 376 s = (b'\x3b\x04\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 377 b'\x00\x00\x01\x00\xc2\x04\x00\x00\x00\x00\x05\x02\x00\x00\x01\x02\x00\x00') 378 options = IP6OptsHeader(s).options 379 assert len(options) == 3 380 assert bytes(IP6OptsHeader(s)) == s 381 382 383def test_ip6_ah_header(): 384 s = b'\x3b\x04\x00\x00\x02\x02\x02\x02\x01\x01\x01\x01\x78\x78\x78\x78\x78\x78\x78\x78' 385 ah = IP6AHHeader(s) 386 assert ah.length == 24 387 assert ah.auth_data == b'xxxxxxxx' 388 assert ah.spi == 0x2020202 389 assert ah.seq == 0x1010101 390 assert bytes(ah) == s 391 392 393def test_ip6_esp_header(): 394 s = (b'\x00\x00\x01\x00\x00\x00\x00\x44\xe2\x4f\x9e\x68\xf3\xcd\xb1\x5f\x61\x65\x42\x8b\x78\x0b' 395 b'\x4a\xfd\x13\xf0\x15\x98\xf5\x55\x16\xa8\x12\xb3\xb8\x4d\xbc\x16\xb2\x14\xbe\x3d\xf9\x96' 396 b'\xd4\xa0\x39\x1f\x85\x74\x25\x81\x83\xa6\x0d\x99\xb6\xba\xa3\xcc\xb6\xe0\x9a\x78\xee\xf2' 397 b'\xaf\x9a') 398 esp = IP6ESPHeader(s) 399 assert esp.length == 68 400 assert esp.spi == 256 401 assert bytes(esp) == s 402 403 404def test_ip6_extension_headers(): 405 p = (b'\x60\x00\x00\x00\x00\x3c\x2b\x40\x20\x48\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 406 b'\xde\xca\x20\x47\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xca\xfe\x06\x04\x00\x02' 407 b'\x00\x00\x00\x00\x20\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca\x20\x22' 408 b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca\x00\x14\x00\x50\x00\x00\x00\x00' 409 b'\x00\x00\x00\x00\x50\x02\x20\x00\x91\x7f\x00\x00') 410 _ip = IP6(p) 411 o = (b'\x3b\x04\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 412 b'\x00\x00\x01\x00\xc2\x04\x00\x00\x00\x00\x05\x02\x00\x00\x01\x02\x00\x00') 413 _ip.extension_hdrs[0] = IP6HopOptsHeader(o) 414 fh = b'\x06\xee\xff\xfb\x00\x00\xff\xff' 415 _ip.extension_hdrs[44] = IP6FragmentHeader(fh) 416 ah = b'\x3b\x04\x00\x00\x02\x02\x02\x02\x01\x01\x01\x01\x78\x78\x78\x78\x78\x78\x78\x78' 417 _ip.extension_hdrs[51] = IP6AHHeader(ah) 418 do = b'\x3b\x02\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 419 _ip.extension_hdrs[60] = IP6DstOptsHeader(do) 420 assert len(_ip.extension_hdrs) == 5 421 422 # this is a legacy unit test predating the addition of .all_extension_headers 423 # this way of adding extension headers does not update .all_extension_headers 424 # so we need to kick .all_extension_headers to force the __len__() method pick up 425 # the updated legacy attribute and calculate the len correctly 426 del _ip.all_extension_headers 427 assert len(_ip) == len(p) + len(o) + len(fh) + len(ah) + len(do) 428 429 430def test_ip6_all_extension_headers(): # https://github.com/kbandla/dpkt/pull/403 431 s = (b'\x60\x00\x00\x00\x00\x47\x3c\x40\xfe\xd0\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01' 432 b'\x00\x02\xfe\xd0\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x01\x3c\x00\x01\x04' 433 b'\x00\x00\x00\x00\x3c\x00\x01\x04\x00\x00\x00\x00\x2c\x00\x01\x04\x00\x00\x00\x00\x2c\x00' 434 b'\x00\x00\x00\x00\x00\x00\x3c\x00\x00\x00\x00\x00\x00\x00\x2c\x00\x01\x04\x00\x00\x00\x00' 435 b'\x3a\x00\x00\x00\x00\x00\x00\x00\x80\x00\xd8\xe5\x0c\x1a\x00\x00\x50\x61\x79\x4c\x6f\x61' 436 b'\x64') 437 _ip = IP6(s) 438 assert _ip.p == 58 # ICMPv6 439 hdrs = _ip.all_extension_headers 440 assert len(hdrs) == 7 441 assert isinstance(hdrs[0], IP6DstOptsHeader) 442 assert isinstance(hdrs[3], IP6FragmentHeader) 443 assert isinstance(hdrs[5], IP6DstOptsHeader) 444 assert bytes(_ip) == s 445 assert len(_ip) == len(s) 446 447 448def test_ip6_gen_tcp_ack(): 449 t = tcp.TCP() 450 t.win = 8192 451 t.dport = 80 452 t.sport = 4711 453 t.flags = tcp.TH_ACK 454 t.seq = 22 455 t.ack = 33 456 ipp = IP6() 457 ipp.src = b'\xfd\x00\x00\x00\x00\x00\x00\x00\xc8\xba\x88\x88\x00\xaa\xbb\x01' 458 ipp.dst = b'\x00d\xff\x9b\x00\x00\x00\x00\x00\x00\x00\x00\xc1\n@*' 459 ipp.hlim = 64 460 ipp.nxt = ip.IP_PROTO_TCP 461 ipp.data = t 462 ipp.plen = ipp.data.ulen = len(ipp.data) 463 464 assert len(bytes(ipp)) == 60 465 assert ipp.p == ip.IP_PROTO_TCP 466 467 # Second part of testing - with ext headers. 468 ipp.p = 0 469 o = (b'\x3b\x04\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 470 b'\x00\x00\x01\x00\xc2\x04\x00\x00\x00\x00\x05\x02\x00\x00\x01\x02\x00\x00') 471 ipp.extension_hdrs = {} 472 ipp.extension_hdrs[0] = IP6HopOptsHeader(o) 473 ipp.extension_hdrs[0].nxt = ip.IP_PROTO_TCP 474 ipp.nxt = ip.proto = ip.IP_PROTO_HOPOPTS 475 _p, exthdrs = ipp.headers_str() 476 ipp.plen = len(exthdrs) + len(ipp.data) 477 478 assert bytes(ipp) 479 480 assert ipp.p == ip.IP_PROTO_TCP 481 assert ipp.nxt == ip.IP_PROTO_HOPOPTS 482 483 484def test_ip6_opts(): 485 import pytest 486 # https://github.com/kbandla/dpkt/issues/477 487 s = (b'\x52\x54\x00\xf3\x83\x6f\x52\x54\x00\x86\x33\xd9\x86\xdd\x60\x00\x00\x00\x05\x08\x3a\xff' 488 b'\xfd\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\xfd\x00\x00\x00\x00\x00' 489 b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x02\x00\xd2\xf3\x00\x00\x05\x00\x00\x00\x00\x00' 490 b'\x00\x00\x00\x00\x00\x01\xfd\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02' 491 b'\x00\x50\xd4\x34\x1a\x48\x24\x50\x6d\x8d\xb3\xc2\x80\x10\x01\xf6\x46\xe8\x00\x00\x01\x01' 492 b'\x08\x0a\xd7\x9d\x6b\x8a\x3a\xd1\xf4\x58\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61' 493 b'\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61' 494 b'\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61' 495 b'\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x0a') 496 497 from dpkt.ethernet import Ethernet 498 499 assert Ethernet(s) 500 assert Ethernet(s).ip6 501 assert Ethernet(s).ip6.icmp6 502 assert Ethernet(s).ip6.icmp6.data 503 504 with pytest.raises(dpkt.NeedData): 505 IP6(Ethernet(s).ip6.icmp6.data) # should raise NeedData 506 507 from binascii import unhexlify 508 buf_ip6_opts = unhexlify( 509 '00' # nxt 510 '00' # len 511 512 '000000000000' # only padding 513 ) 514 ip6opt = IP6OptsHeader(buf_ip6_opts) 515 assert ip6opt.options == [] 516 assert ip6opt.data == b'\x00' * 6 517 518 519def test_ip6_routing_properties(): 520 ip6rh = IP6RoutingHeader() 521 assert ip6rh.sl_bits == 0 522 ip6rh.sl_bits = 1024 523 assert ip6rh.sl_bits == 1024 524 525 526def test_ip6_fragment_properties(): 527 ip6fh = IP6FragmentHeader() 528 assert ip6fh.frag_off == 0 529 ip6fh.frag_off = 12345 530 assert ip6fh.frag_off == 12345 531 532 assert ip6fh.m_flag == 0 533 ip6fh.m_flag = 1 534 assert ip6fh.m_flag == 1 535 536 537def test_ip6_properties(): 538 ip6 = IP6() 539 540 assert ip6.v == 6 541 ip6.v = 10 542 assert ip6.v == 10 543 544 assert ip6.fc == 0 545 ip6.fc = 5 546 assert ip6.fc == 5 547 548 assert ip6.flow == 0 549 ip6.flow = 4 550 assert ip6.flow == 4 551 552 553def test_proto_accessors(): 554 class Proto: 555 pass 556 557 assert 'PROTO' not in IP6._protosw 558 IP6.set_proto('PROTO', Proto) 559 assert IP6.get_proto('PROTO') == Proto 560