1# $Id: ip6.py 87 2013-03-05 19:41:04Z andrewflnr@gmail.com $
2# -*- coding: utf-8 -*-
3"""Internet Protocol, version 6."""
4from __future__ import print_function
5from __future__ import absolute_import
6
7from . import dpkt
8from . import ip
9from . import tcp
10from .compat import compat_ord
11import struct
12
13# The allowed extension headers and their classes (in order according to RFC).
14EXT_HDRS = [ip.IP_PROTO_HOPOPTS, ip.IP_PROTO_ROUTING, ip.IP_PROTO_FRAGMENT, ip.IP_PROTO_AH, ip.IP_PROTO_ESP,
15            ip.IP_PROTO_DSTOPTS]
16# EXT_HDRS_CLS - classes is below - after all the used classes are defined.
17
18
19class IP6(dpkt.Packet):
20    """Internet Protocol, version 6.
21
22    TODO: Longer class information....
23
24    Attributes:
25        __hdr__: Header fields of IPv6.
26        TODO.
27    """
28
29    __hdr__ = (
30        ('_v_fc_flow', 'I', 0x60000000),
31        ('plen', 'H', 0),  # payload length (not including header)
32        ('nxt', 'B', 0),  # next header protocol
33        ('hlim', 'B', 0),  # hop limit
34        ('src', '16s', b''),
35        ('dst', '16s', b'')
36    )
37    _protosw = ip.IP._protosw
38
39    @property
40    def v(self):
41        return self._v_fc_flow >> 28
42
43    @v.setter
44    def v(self, v):
45        self._v_fc_flow = (self._v_fc_flow & ~0xf0000000) | (v << 28)
46
47    @property
48    def fc(self):
49        return (self._v_fc_flow >> 20) & 0xff
50
51    @fc.setter
52    def fc(self, v):
53        self._v_fc_flow = (self._v_fc_flow & ~0xff00000) | (v << 20)
54
55    @property
56    def flow(self):
57        return self._v_fc_flow & 0xfffff
58
59    @flow.setter
60    def flow(self, v):
61        self._v_fc_flow = (self._v_fc_flow & ~0xfffff) | (v & 0xfffff)
62
63    def unpack(self, buf):
64        dpkt.Packet.unpack(self, buf)
65        self.extension_hdrs = {}
66
67        # NOTE: self.extension_hdrs is not accurate, as it doesn't support duplicate header types.
68        # According to RFC-1883 "Each extension header should occur at most once, except for the
69        # Destination Options header which should occur at most twice".
70        # Secondly, the .headers_str() method attempts to pack the extension headers in order as
71        # defined in the RFC, however it doesn't adjust the next header (nxt) pointer accordingly.
72        # Here we introduce the new field .all_extension_headers; it allows duplicate types and
73        # keeps the original order.
74        self.all_extension_headers = []
75
76        if self.plen:
77            buf = self.data[:self.plen]
78        else:  # due to jumbo payload or TSO
79            buf = self.data
80
81        next_ext_hdr = self.nxt
82
83        while next_ext_hdr in EXT_HDRS:
84            ext = EXT_HDRS_CLS[next_ext_hdr](buf)
85            self.extension_hdrs[next_ext_hdr] = ext
86            self.all_extension_headers.append(ext)
87            buf = buf[ext.length:]
88            next_ext_hdr = getattr(ext, 'nxt', None)
89
90        # set the payload protocol id
91        if next_ext_hdr is not None:
92            self.p = next_ext_hdr
93
94        try:
95            self.data = self._protosw[next_ext_hdr](buf)
96            setattr(self, self.data.__class__.__name__.lower(), self.data)
97        except (KeyError, dpkt.UnpackError):
98            self.data = buf
99
100    def headers_str(self):
101        nxt = self.nxt
102        # If all_extension_headers is available, return the headers as they originally appeared
103        if hasattr(self, 'all_extension_headers') and self.all_extension_headers:
104            # get the nxt header from the last one
105            nxt = self.all_extension_headers[-1].nxt
106            return nxt, b''.join(bytes(ext) for ext in self.all_extension_headers)
107
108        # Output extension headers in order defined in RFC1883 (except dest opts)
109        header_str = b""
110        if hasattr(self, 'extension_hdrs'):
111            for hdr in EXT_HDRS:
112                if hdr in self.extension_hdrs:
113                    nxt = self.extension_hdrs[hdr].nxt
114                    header_str += bytes(self.extension_hdrs[hdr])
115        return nxt, header_str
116
117    def __bytes__(self):
118        self.p, hdr_str = self.headers_str()
119        if (self.p == 6 or self.p == 17 or self.p == 58) and not self.data.sum:
120            # XXX - set TCP, UDP, and ICMPv6 checksums
121            p = bytes(self.data)
122            s = struct.pack('>16s16sxBH', self.src, self.dst, self.p, len(p))
123            s = dpkt.in_cksum_add(0, s)
124            s = dpkt.in_cksum_add(s, p)
125            self.data.sum = dpkt.in_cksum_done(s)
126
127        return self.pack_hdr() + hdr_str + bytes(self.data)
128
129    def __len__(self):
130        baselen = self.__hdr_len__ + len(self.data)
131        if hasattr(self, 'all_extension_headers') and self.all_extension_headers:
132            return baselen + sum(len(hh) for hh in self.all_extension_headers)
133        elif hasattr(self, 'extension_hdrs') and self.extension_hdrs:
134            return baselen + sum(len(hh) for hh in self.extension_hdrs.values())
135        return baselen
136
137    @classmethod
138    def set_proto(cls, p, pktclass):
139        cls._protosw[p] = pktclass
140
141    @classmethod
142    def get_proto(cls, p):
143        return cls._protosw[p]
144
145
146class IP6ExtensionHeader(dpkt.Packet):
147    """
148    An extension header is very similar to a 'sub-packet'.
149    We just want to re-use all the hdr unpacking etc.
150    """
151    pass
152
153
154class IP6OptsHeader(IP6ExtensionHeader):
155    __hdr__ = (
156        ('nxt', 'B', 0),  # next extension header protocol
157        ('len', 'B', 0)  # option data length in 8 octect units (ignoring first 8 octets) so, len 0 == 64bit header
158    )
159
160    def unpack(self, buf):
161        dpkt.Packet.unpack(self, buf)
162        self.length = (self.len + 1) * 8
163        options = []
164
165        index = 0
166
167        while index < self.length - 2:
168            try:
169                opt_type = compat_ord(self.data[index])
170
171                # PAD1 option
172                if opt_type == 0:
173                    index += 1
174                    continue
175
176                opt_length = compat_ord(self.data[index + 1])
177
178                if opt_type == 1:  # PADN option
179                    # PADN uses opt_length bytes in total
180                    index += opt_length + 2
181                    continue
182
183                options.append({
184                    'type': opt_type,
185                    'opt_length': opt_length,
186                    'data': self.data[index + 2:index + 2 + opt_length]
187                })
188
189                # add the two chars and the option_length, to move to the next option
190                index += opt_length + 2
191
192            except IndexError:
193                raise dpkt.NeedData
194
195        self.options = options
196        self.data = buf[2:self.length]  # keep raw data with all pad options, but not the following data
197
198
199class IP6HopOptsHeader(IP6OptsHeader):
200    pass
201
202
203class IP6DstOptsHeader(IP6OptsHeader):
204    pass
205
206
207class IP6RoutingHeader(IP6ExtensionHeader):
208    __hdr__ = (
209        ('nxt', 'B', 0),  # next extension header protocol
210        ('len', 'B', 0),  # extension data length in 8 octect units (ignoring first 8 octets) (<= 46 for type 0)
211        ('type', 'B', 0),  # routing type (currently, only 0 is used)
212        ('segs_left', 'B', 0),  # remaining segments in route, until destination (<= 23)
213        ('rsvd_sl_bits', 'I', 0),  # reserved (1 byte), strict/loose bitmap for addresses
214    )
215
216    @property
217    def sl_bits(self):
218        return self.rsvd_sl_bits & 0xffffff
219
220    @sl_bits.setter
221    def sl_bits(self, v):
222        self.rsvd_sl_bits = (self.rsvd_sl_bits & ~0xfffff) | (v & 0xfffff)
223
224    def unpack(self, buf):
225        hdr_size = 8
226        addr_size = 16
227
228        dpkt.Packet.unpack(self, buf)
229
230        addresses = []
231        num_addresses = self.len // 2
232        buf = buf[hdr_size:hdr_size + num_addresses * addr_size]
233
234        for i in range(num_addresses):
235            addresses.append(buf[i * addr_size: i * addr_size + addr_size])
236
237        self.data = buf
238        self.addresses = addresses
239        self.length = self.len * 8 + 8
240
241
242class IP6FragmentHeader(IP6ExtensionHeader):
243    __hdr__ = (
244        ('nxt', 'B', 0),  # next extension header protocol
245        ('resv', 'B', 0),  # reserved, set to 0
246        ('frag_off_resv_m', 'H', 0),  # frag offset (13 bits), reserved zero (2 bits), More frags flag
247        ('id', 'I', 0)  # fragments id
248    )
249
250    def unpack(self, buf):
251        dpkt.Packet.unpack(self, buf)
252        self.length = self.__hdr_len__
253        self.data = b''
254
255    @property
256    def frag_off(self):
257        return self.frag_off_resv_m >> 3
258
259    @frag_off.setter
260    def frag_off(self, v):
261        self.frag_off_resv_m = (self.frag_off_resv_m & ~0xfff8) | (v << 3)
262
263    @property
264    def m_flag(self):
265        return self.frag_off_resv_m & 1
266
267    @m_flag.setter
268    def m_flag(self, v):
269        self.frag_off_resv_m = (self.frag_off_resv_m & 0xfffe) | (v & 1)
270
271
272class IP6AHHeader(IP6ExtensionHeader):
273    __hdr__ = (
274        ('nxt', 'B', 0),  # next extension header protocol
275        ('len', 'B', 0),  # length of header in 4 octet units (ignoring first 2 units)
276        ('resv', 'H', 0),  # reserved, 2 bytes of 0
277        ('spi', 'I', 0),  # SPI security parameter index
278        ('seq', 'I', 0)  # sequence no.
279    )
280
281    def unpack(self, buf):
282        dpkt.Packet.unpack(self, buf)
283        self.length = (self.len + 2) * 4
284        self.auth_data = self.data[:(self.len - 1) * 4]
285
286
287class IP6ESPHeader(IP6ExtensionHeader):
288    __hdr__ = (
289        ('spi', 'I', 0),
290        ('seq', 'I', 0)
291    )
292
293    def unpack(self, buf):
294        dpkt.Packet.unpack(self, buf)
295        self.length = self.__hdr_len__ + len(self.data)
296
297
298EXT_HDRS_CLS = {ip.IP_PROTO_HOPOPTS: IP6HopOptsHeader,
299                ip.IP_PROTO_ROUTING: IP6RoutingHeader,
300                ip.IP_PROTO_FRAGMENT: IP6FragmentHeader,
301                ip.IP_PROTO_ESP: IP6ESPHeader,
302                ip.IP_PROTO_AH: IP6AHHeader,
303                ip.IP_PROTO_DSTOPTS: IP6DstOptsHeader}
304
305# Unit tests
306
307
308def test_ipg():
309    s = (b'\x60\x00\x00\x00\x00\x28\x06\x40\xfe\x80\x00\x00\x00\x00\x00\x00\x02\x11\x24\xff\xfe\x8c'
310         b'\x11\xde\xfe\x80\x00\x00\x00\x00\x00\x00\x02\xb0\xd0\xff\xfe\xe1\x80\x72\xcd\xca\x00\x16'
311         b'\x04\x84\x46\xd5\x00\x00\x00\x00\xa0\x02\xff\xff\xf8\x09\x00\x00\x02\x04\x05\xa0\x01\x03'
312         b'\x03\x00\x01\x01\x08\x0a\x7d\x18\x35\x3f\x00\x00\x00\x00')
313    _ip = IP6(s)
314
315    # basic properties
316    assert _ip.v == 6
317    assert _ip.fc == 0
318    assert _ip.flow == 0
319
320    _ip.data.sum = 0
321    s2 = bytes(_ip)
322    assert s == s2
323
324
325def test_dict():
326    s = (b'\x60\x00\x00\x00\x00\x28\x06\x40\xfe\x80\x00\x00\x00\x00\x00\x00\x02\x11\x24\xff\xfe\x8c'
327         b'\x11\xde\xfe\x80\x00\x00\x00\x00\x00\x00\x02\xb0\xd0\xff\xfe\xe1\x80\x72\xcd\xca\x00\x16'
328         b'\x04\x84\x46\xd5\x00\x00\x00\x00\xa0\x02\xff\xff\xf8\x09\x00\x00\x02\x04\x05\xa0\x01\x03'
329         b'\x03\x00\x01\x01\x08\x0a\x7d\x18\x35\x3f\x00\x00\x00\x00')
330    _ip = IP6(s)
331    d = dict(_ip)
332
333    # basic properties
334    assert d['src'] == b'\xfe\x80\x00\x00\x00\x00\x00\x00\x02\x11\x24\xff\xfe\x8c\x11\xde'
335    assert d['dst'] == b'\xfe\x80\x00\x00\x00\x00\x00\x00\x02\xb0\xd0\xff\xfe\xe1\x80\x72'
336
337
338def test_ip6_routing_header():
339    s = (b'\x60\x00\x00\x00\x00\x3c\x2b\x40\x20\x48\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
340         b'\xde\xca\x20\x47\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xca\xfe\x06\x04\x00\x02'
341         b'\x00\x00\x00\x00\x20\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca\x20\x22'
342         b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca\x00\x14\x00\x50\x00\x00\x00\x00'
343         b'\x00\x00\x00\x00\x50\x02\x20\x00\x91\x7f\x00\x00')
344    _ip = IP6(s)
345    s2 = bytes(_ip)
346    # 43 is Routing header id
347    assert len(_ip.extension_hdrs[43].addresses) == 2
348    assert _ip.tcp
349    assert s == s2
350
351
352def test_ip6_fragment_header():
353    s = b'\x06\xee\xff\xf9\x00\x00\xff\xff'
354    fh = IP6FragmentHeader(s)
355    # s2 = str(fh) variable 's2' is not used
356    assert fh.nxt == 6
357    assert fh.id == 65535
358    assert fh.frag_off == 8191
359    assert fh.m_flag == 1
360
361    # test packing
362    fh.frag_off_resv_m = 0
363    fh.frag_off = 8191
364    fh.m_flag = 1
365    assert bytes(fh) == s
366
367    # IP6 with fragment header
368    s = (b'\x60\x00\x00\x00\x00\x10\x2c\x00\x02\x22\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
369         b'\x00\x02\x03\x33\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x03\x29\x00\x00\x01'
370         b'\x00\x00\x00\x00\x60\x00\x00\x00\x00\x10\x2c\x00')
371    _ip = IP6(s)
372    assert bytes(_ip) == s
373
374
375def test_ip6_options_header():
376    s = (b'\x3b\x04\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
377         b'\x00\x00\x01\x00\xc2\x04\x00\x00\x00\x00\x05\x02\x00\x00\x01\x02\x00\x00')
378    options = IP6OptsHeader(s).options
379    assert len(options) == 3
380    assert bytes(IP6OptsHeader(s)) == s
381
382
383def test_ip6_ah_header():
384    s = b'\x3b\x04\x00\x00\x02\x02\x02\x02\x01\x01\x01\x01\x78\x78\x78\x78\x78\x78\x78\x78'
385    ah = IP6AHHeader(s)
386    assert ah.length == 24
387    assert ah.auth_data == b'xxxxxxxx'
388    assert ah.spi == 0x2020202
389    assert ah.seq == 0x1010101
390    assert bytes(ah) == s
391
392
393def test_ip6_esp_header():
394    s = (b'\x00\x00\x01\x00\x00\x00\x00\x44\xe2\x4f\x9e\x68\xf3\xcd\xb1\x5f\x61\x65\x42\x8b\x78\x0b'
395         b'\x4a\xfd\x13\xf0\x15\x98\xf5\x55\x16\xa8\x12\xb3\xb8\x4d\xbc\x16\xb2\x14\xbe\x3d\xf9\x96'
396         b'\xd4\xa0\x39\x1f\x85\x74\x25\x81\x83\xa6\x0d\x99\xb6\xba\xa3\xcc\xb6\xe0\x9a\x78\xee\xf2'
397         b'\xaf\x9a')
398    esp = IP6ESPHeader(s)
399    assert esp.length == 68
400    assert esp.spi == 256
401    assert bytes(esp) == s
402
403
404def test_ip6_extension_headers():
405    p = (b'\x60\x00\x00\x00\x00\x3c\x2b\x40\x20\x48\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
406         b'\xde\xca\x20\x47\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xca\xfe\x06\x04\x00\x02'
407         b'\x00\x00\x00\x00\x20\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca\x20\x22'
408         b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xde\xca\x00\x14\x00\x50\x00\x00\x00\x00'
409         b'\x00\x00\x00\x00\x50\x02\x20\x00\x91\x7f\x00\x00')
410    _ip = IP6(p)
411    o = (b'\x3b\x04\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
412         b'\x00\x00\x01\x00\xc2\x04\x00\x00\x00\x00\x05\x02\x00\x00\x01\x02\x00\x00')
413    _ip.extension_hdrs[0] = IP6HopOptsHeader(o)
414    fh = b'\x06\xee\xff\xfb\x00\x00\xff\xff'
415    _ip.extension_hdrs[44] = IP6FragmentHeader(fh)
416    ah = b'\x3b\x04\x00\x00\x02\x02\x02\x02\x01\x01\x01\x01\x78\x78\x78\x78\x78\x78\x78\x78'
417    _ip.extension_hdrs[51] = IP6AHHeader(ah)
418    do = b'\x3b\x02\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
419    _ip.extension_hdrs[60] = IP6DstOptsHeader(do)
420    assert len(_ip.extension_hdrs) == 5
421
422    # this is a legacy unit test predating the addition of .all_extension_headers
423    # this way of adding extension headers does not update .all_extension_headers
424    # so we need to kick .all_extension_headers to force the __len__() method pick up
425    # the updated legacy attribute and calculate the len correctly
426    del _ip.all_extension_headers
427    assert len(_ip) == len(p) + len(o) + len(fh) + len(ah) + len(do)
428
429
430def test_ip6_all_extension_headers():  # https://github.com/kbandla/dpkt/pull/403
431    s = (b'\x60\x00\x00\x00\x00\x47\x3c\x40\xfe\xd0\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01'
432         b'\x00\x02\xfe\xd0\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x01\x3c\x00\x01\x04'
433         b'\x00\x00\x00\x00\x3c\x00\x01\x04\x00\x00\x00\x00\x2c\x00\x01\x04\x00\x00\x00\x00\x2c\x00'
434         b'\x00\x00\x00\x00\x00\x00\x3c\x00\x00\x00\x00\x00\x00\x00\x2c\x00\x01\x04\x00\x00\x00\x00'
435         b'\x3a\x00\x00\x00\x00\x00\x00\x00\x80\x00\xd8\xe5\x0c\x1a\x00\x00\x50\x61\x79\x4c\x6f\x61'
436         b'\x64')
437    _ip = IP6(s)
438    assert _ip.p == 58  # ICMPv6
439    hdrs = _ip.all_extension_headers
440    assert len(hdrs) == 7
441    assert isinstance(hdrs[0], IP6DstOptsHeader)
442    assert isinstance(hdrs[3], IP6FragmentHeader)
443    assert isinstance(hdrs[5], IP6DstOptsHeader)
444    assert bytes(_ip) == s
445    assert len(_ip) == len(s)
446
447
448def test_ip6_gen_tcp_ack():
449    t = tcp.TCP()
450    t.win = 8192
451    t.dport = 80
452    t.sport = 4711
453    t.flags = tcp.TH_ACK
454    t.seq = 22
455    t.ack = 33
456    ipp = IP6()
457    ipp.src = b'\xfd\x00\x00\x00\x00\x00\x00\x00\xc8\xba\x88\x88\x00\xaa\xbb\x01'
458    ipp.dst = b'\x00d\xff\x9b\x00\x00\x00\x00\x00\x00\x00\x00\xc1\n@*'
459    ipp.hlim = 64
460    ipp.nxt = ip.IP_PROTO_TCP
461    ipp.data = t
462    ipp.plen = ipp.data.ulen = len(ipp.data)
463
464    assert len(bytes(ipp)) == 60
465    assert ipp.p == ip.IP_PROTO_TCP
466
467    # Second part of testing - with ext headers.
468    ipp.p = 0
469    o = (b'\x3b\x04\x01\x02\x00\x00\xc9\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
470         b'\x00\x00\x01\x00\xc2\x04\x00\x00\x00\x00\x05\x02\x00\x00\x01\x02\x00\x00')
471    ipp.extension_hdrs = {}
472    ipp.extension_hdrs[0] = IP6HopOptsHeader(o)
473    ipp.extension_hdrs[0].nxt = ip.IP_PROTO_TCP
474    ipp.nxt = ip.proto = ip.IP_PROTO_HOPOPTS
475    _p, exthdrs = ipp.headers_str()
476    ipp.plen = len(exthdrs) + len(ipp.data)
477
478    assert bytes(ipp)
479
480    assert ipp.p == ip.IP_PROTO_TCP
481    assert ipp.nxt == ip.IP_PROTO_HOPOPTS
482
483
484def test_ip6_opts():
485    import pytest
486    # https://github.com/kbandla/dpkt/issues/477
487    s = (b'\x52\x54\x00\xf3\x83\x6f\x52\x54\x00\x86\x33\xd9\x86\xdd\x60\x00\x00\x00\x05\x08\x3a\xff'
488         b'\xfd\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\xfd\x00\x00\x00\x00\x00'
489         b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x02\x00\xd2\xf3\x00\x00\x05\x00\x00\x00\x00\x00'
490         b'\x00\x00\x00\x00\x00\x01\xfd\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02'
491         b'\x00\x50\xd4\x34\x1a\x48\x24\x50\x6d\x8d\xb3\xc2\x80\x10\x01\xf6\x46\xe8\x00\x00\x01\x01'
492         b'\x08\x0a\xd7\x9d\x6b\x8a\x3a\xd1\xf4\x58\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61'
493         b'\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61'
494         b'\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61'
495         b'\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x61\x0a')
496
497    from dpkt.ethernet import Ethernet
498
499    assert Ethernet(s)
500    assert Ethernet(s).ip6
501    assert Ethernet(s).ip6.icmp6
502    assert Ethernet(s).ip6.icmp6.data
503
504    with pytest.raises(dpkt.NeedData):
505        IP6(Ethernet(s).ip6.icmp6.data)  # should raise NeedData
506
507    from binascii import unhexlify
508    buf_ip6_opts = unhexlify(
509        '00'  # nxt
510        '00'  # len
511
512        '000000000000'  # only padding
513    )
514    ip6opt = IP6OptsHeader(buf_ip6_opts)
515    assert ip6opt.options == []
516    assert ip6opt.data == b'\x00' * 6
517
518
519def test_ip6_routing_properties():
520    ip6rh = IP6RoutingHeader()
521    assert ip6rh.sl_bits == 0
522    ip6rh.sl_bits = 1024
523    assert ip6rh.sl_bits == 1024
524
525
526def test_ip6_fragment_properties():
527    ip6fh = IP6FragmentHeader()
528    assert ip6fh.frag_off == 0
529    ip6fh.frag_off = 12345
530    assert ip6fh.frag_off == 12345
531
532    assert ip6fh.m_flag == 0
533    ip6fh.m_flag = 1
534    assert ip6fh.m_flag == 1
535
536
537def test_ip6_properties():
538    ip6 = IP6()
539
540    assert ip6.v == 6
541    ip6.v = 10
542    assert ip6.v == 10
543
544    assert ip6.fc == 0
545    ip6.fc = 5
546    assert ip6.fc == 5
547
548    assert ip6.flow == 0
549    ip6.flow = 4
550    assert ip6.flow == 4
551
552
553def test_proto_accessors():
554    class Proto:
555        pass
556
557    assert 'PROTO' not in IP6._protosw
558    IP6.set_proto('PROTO', Proto)
559    assert IP6.get_proto('PROTO') == Proto
560