1# This file is part of Scapy
2# See http://www.secdev.org/projects/scapy for more information
3# Copyright (C) Philippe Biondi <phil@secdev.org>
4# This program is published under a GPLv2 license
5
6# Copyright (C) 2005  Guillaume Valadon <guedou@hongo.wide.ad.jp>
7#                     Arnaud Ebalard <arnaud.ebalard@eads.net>
8
9"""
10Utility functions for IPv6.
11"""
12from __future__ import absolute_import
13import socket
14import struct
15import time
16
17from scapy.config import conf
18from scapy.base_classes import Net
19from scapy.data import IPV6_ADDR_GLOBAL, IPV6_ADDR_LINKLOCAL, \
20    IPV6_ADDR_SITELOCAL, IPV6_ADDR_LOOPBACK, IPV6_ADDR_UNICAST,\
21    IPV6_ADDR_MULTICAST, IPV6_ADDR_6TO4, IPV6_ADDR_UNSPECIFIED
22from scapy.utils import strxor
23from scapy.compat import orb, chb
24from scapy.pton_ntop import inet_pton, inet_ntop
25from scapy.volatile import RandMAC, RandBin
26from scapy.error import warning, Scapy_Exception
27from functools import reduce, cmp_to_key
28
29from scapy.compat import (
30    Iterator,
31    List,
32    Optional,
33    Tuple,
34    Union,
35    cast,
36)
37
38
39def construct_source_candidate_set(
40        addr,  # type: str
41        plen,  # type: int
42        laddr  # type: Iterator[Tuple[str, int, str]]
43):
44    # type: (...) -> List[str]
45    """
46    Given all addresses assigned to a specific interface ('laddr' parameter),
47    this function returns the "candidate set" associated with 'addr/plen'.
48
49    Basically, the function filters all interface addresses to keep only those
50    that have the same scope as provided prefix.
51
52    This is on this list of addresses that the source selection mechanism
53    will then be performed to select the best source address associated
54    with some specific destination that uses this prefix.
55    """
56    def cset_sort(x, y):
57        # type: (str, str) -> int
58        x_global = 0
59        if in6_isgladdr(x):
60            x_global = 1
61        y_global = 0
62        if in6_isgladdr(y):
63            y_global = 1
64        res = y_global - x_global
65        if res != 0 or y_global != 1:
66            return res
67        # two global addresses: if one is native, it wins.
68        if not in6_isaddr6to4(x):
69            return -1
70        return -res
71
72    cset = iter([])  # type: Iterator[Tuple[str, int, str]]
73    if in6_isgladdr(addr) or in6_isuladdr(addr):
74        cset = (x for x in laddr if x[1] == IPV6_ADDR_GLOBAL)
75    elif in6_islladdr(addr):
76        cset = (x for x in laddr if x[1] == IPV6_ADDR_LINKLOCAL)
77    elif in6_issladdr(addr):
78        cset = (x for x in laddr if x[1] == IPV6_ADDR_SITELOCAL)
79    elif in6_ismaddr(addr):
80        if in6_ismnladdr(addr):
81            cset = (x for x in [('::1', 16, conf.loopback_name)])
82        elif in6_ismgladdr(addr):
83            cset = (x for x in laddr if x[1] == IPV6_ADDR_GLOBAL)
84        elif in6_ismlladdr(addr):
85            cset = (x for x in laddr if x[1] == IPV6_ADDR_LINKLOCAL)
86        elif in6_ismsladdr(addr):
87            cset = (x for x in laddr if x[1] == IPV6_ADDR_SITELOCAL)
88    elif addr == '::' and plen == 0:
89        cset = (x for x in laddr if x[1] == IPV6_ADDR_GLOBAL)
90    addrs = [x[0] for x in cset]
91    # TODO convert the cmd use into a key
92    addrs.sort(key=cmp_to_key(cset_sort))  # Sort with global addresses first
93    return addrs
94
95
96def get_source_addr_from_candidate_set(dst, candidate_set):
97    # type: (str, List[str]) -> str
98    """
99    This function implement a limited version of source address selection
100    algorithm defined in section 5 of RFC 3484. The format is very different
101    from that described in the document because it operates on a set
102    of candidate source address for some specific route.
103    """
104
105    def scope_cmp(a, b):
106        # type: (str, str) -> int
107        """
108        Given two addresses, returns -1, 0 or 1 based on comparison of
109        their scope
110        """
111        scope_mapper = {IPV6_ADDR_GLOBAL: 4,
112                        IPV6_ADDR_SITELOCAL: 3,
113                        IPV6_ADDR_LINKLOCAL: 2,
114                        IPV6_ADDR_LOOPBACK: 1}
115        sa = in6_getscope(a)
116        if sa == -1:
117            sa = IPV6_ADDR_LOOPBACK
118        sb = in6_getscope(b)
119        if sb == -1:
120            sb = IPV6_ADDR_LOOPBACK
121
122        sa = scope_mapper[sa]
123        sb = scope_mapper[sb]
124
125        if sa == sb:
126            return 0
127        if sa > sb:
128            return 1
129        return -1
130
131    def rfc3484_cmp(source_a, source_b):
132        # type: (str, str) -> int
133        """
134        The function implements a limited version of the rules from Source
135        Address selection algorithm defined section of RFC 3484.
136        """
137
138        # Rule 1: Prefer same address
139        if source_a == dst:
140            return 1
141        if source_b == dst:
142            return 1
143
144        # Rule 2: Prefer appropriate scope
145        tmp = scope_cmp(source_a, source_b)
146        if tmp == -1:
147            if scope_cmp(source_a, dst) == -1:
148                return 1
149            else:
150                return -1
151        elif tmp == 1:
152            if scope_cmp(source_b, dst) == -1:
153                return 1
154            else:
155                return -1
156
157        # Rule 3: cannot be easily implemented
158        # Rule 4: cannot be easily implemented
159        # Rule 5: does not make sense here
160        # Rule 6: cannot be implemented
161        # Rule 7: cannot be implemented
162
163        # Rule 8: Longest prefix match
164        tmp1 = in6_get_common_plen(source_a, dst)
165        tmp2 = in6_get_common_plen(source_b, dst)
166        if tmp1 > tmp2:
167            return 1
168        elif tmp2 > tmp1:
169            return -1
170        return 0
171
172    if not candidate_set:
173        # Should not happen
174        return ""
175
176    candidate_set.sort(key=cmp_to_key(rfc3484_cmp), reverse=True)
177
178    return candidate_set[0]
179
180
181# Think before modify it : for instance, FE::1 does exist and is unicast
182# there are many others like that.
183# TODO : integrate Unique Local Addresses
184def in6_getAddrType(addr):
185    # type: (str) -> int
186    naddr = inet_pton(socket.AF_INET6, addr)
187    paddr = inet_ntop(socket.AF_INET6, naddr)  # normalize
188    addrType = 0
189    # _Assignable_ Global Unicast Address space
190    # is defined in RFC 3513 as those in 2000::/3
191    if ((orb(naddr[0]) & 0xE0) == 0x20):
192        addrType = (IPV6_ADDR_UNICAST | IPV6_ADDR_GLOBAL)
193        if naddr[:2] == b' \x02':  # Mark 6to4 @
194            addrType |= IPV6_ADDR_6TO4
195    elif orb(naddr[0]) == 0xff:  # multicast
196        addrScope = paddr[3]
197        if addrScope == '2':
198            addrType = (IPV6_ADDR_LINKLOCAL | IPV6_ADDR_MULTICAST)
199        elif addrScope == 'e':
200            addrType = (IPV6_ADDR_GLOBAL | IPV6_ADDR_MULTICAST)
201        else:
202            addrType = (IPV6_ADDR_GLOBAL | IPV6_ADDR_MULTICAST)
203    elif ((orb(naddr[0]) == 0xfe) and ((int(paddr[2], 16) & 0xC) == 0x8)):
204        addrType = (IPV6_ADDR_UNICAST | IPV6_ADDR_LINKLOCAL)
205    elif paddr == "::1":
206        addrType = IPV6_ADDR_LOOPBACK
207    elif paddr == "::":
208        addrType = IPV6_ADDR_UNSPECIFIED
209    else:
210        # Everything else is global unicast (RFC 3513)
211        # Even old deprecated (RFC3879) Site-Local addresses
212        addrType = (IPV6_ADDR_GLOBAL | IPV6_ADDR_UNICAST)
213
214    return addrType
215
216
217def in6_mactoifaceid(mac, ulbit=None):
218    # type: (str, Optional[int]) -> str
219    """
220    Compute the interface ID in modified EUI-64 format associated
221    to the Ethernet address provided as input.
222    value taken by U/L bit in the interface identifier is basically
223    the reversed value of that in given MAC address it can be forced
224    to a specific value by using optional 'ulbit' parameter.
225    """
226    if len(mac) != 17:
227        raise ValueError("Invalid MAC")
228    m = "".join(mac.split(':'))
229    if len(m) != 12:
230        raise ValueError("Invalid MAC")
231    first = int(m[0:2], 16)
232    if ulbit is None or not (ulbit == 0 or ulbit == 1):
233        ulbit = [1, 0, 0][first & 0x02]
234    ulbit *= 2
235    first_b = "%.02x" % ((first & 0xFD) | ulbit)
236    eui64 = first_b + m[2:4] + ":" + m[4:6] + "FF:FE" + m[6:8] + ":" + m[8:12]
237    return eui64.upper()
238
239
240def in6_ifaceidtomac(ifaceid_s):
241    # type: (str) -> Optional[str]
242    """
243    Extract the mac address from provided iface ID. Iface ID is provided
244    in printable format ("XXXX:XXFF:FEXX:XXXX", eventually compressed). None
245    is returned on error.
246    """
247    try:
248        # Set ifaceid to a binary form
249        ifaceid = inet_pton(socket.AF_INET6, "::" + ifaceid_s)[8:16]
250    except Exception:
251        return None
252
253    if ifaceid[3:5] != b'\xff\xfe':  # Check for burned-in MAC address
254        return None
255
256    # Unpacking and converting first byte of faceid to MAC address equivalent
257    first = struct.unpack("B", ifaceid[:1])[0]
258    ulbit = 2 * [1, '-', 0][first & 0x02]
259    first = struct.pack("B", ((first & 0xFD) | ulbit))
260    # Split into two vars to remove the \xff\xfe bytes
261    oui = first + ifaceid[1:3]
262    end = ifaceid[5:]
263    # Convert and reconstruct into a MAC Address
264    mac_bytes = ["%.02x" % orb(x) for x in list(oui + end)]
265    return ":".join(mac_bytes)
266
267
268def in6_addrtomac(addr):
269    # type: (str) -> Optional[str]
270    """
271    Extract the mac address from provided address. None is returned
272    on error.
273    """
274    mask = inet_pton(socket.AF_INET6, "::ffff:ffff:ffff:ffff")
275    x = in6_and(mask, inet_pton(socket.AF_INET6, addr))
276    ifaceid = inet_ntop(socket.AF_INET6, x)[2:]
277    return in6_ifaceidtomac(ifaceid)
278
279
280def in6_addrtovendor(addr):
281    # type: (str) -> Optional[str]
282    """
283    Extract the MAC address from a modified EUI-64 constructed IPv6
284    address provided and use the IANA oui.txt file to get the vendor.
285    The database used for the conversion is the one loaded by Scapy
286    from a Wireshark installation if discovered in a well-known
287    location. None is returned on error, "UNKNOWN" if the vendor is
288    unknown.
289    """
290    mac = in6_addrtomac(addr)
291    if mac is None or not conf.manufdb:
292        return None
293
294    res = conf.manufdb._get_manuf(mac)
295    if len(res) == 17 and res.count(':') != 5:  # Mac address, i.e. unknown
296        res = "UNKNOWN"
297
298    return res
299
300
301def in6_getLinkScopedMcastAddr(addr, grpid=None, scope=2):
302    # type: (str, Optional[Union[bytes, str, int]], int) -> Optional[str]
303    """
304    Generate a Link-Scoped Multicast Address as described in RFC 4489.
305    Returned value is in printable notation.
306
307    'addr' parameter specifies the link-local address to use for generating
308    Link-scoped multicast address IID.
309
310    By default, the function returns a ::/96 prefix (aka last 32 bits of
311    returned address are null). If a group id is provided through 'grpid'
312    parameter, last 32 bits of the address are set to that value (accepted
313    formats : b'\x12\x34\x56\x78' or '12345678' or 0x12345678 or 305419896).
314
315    By default, generated address scope is Link-Local (2). That value can
316    be modified by passing a specific 'scope' value as an argument of the
317    function. RFC 4489 only authorizes scope values <= 2. Enforcement
318    is performed by the function (None will be returned).
319
320    If no link-local address can be used to generate the Link-Scoped IPv6
321    Multicast address, or if another error occurs, None is returned.
322    """
323    if scope not in [0, 1, 2]:
324        return None
325    try:
326        if not in6_islladdr(addr):
327            return None
328        baddr = inet_pton(socket.AF_INET6, addr)
329    except Exception:
330        warning("in6_getLinkScopedMcastPrefix(): Invalid address provided")
331        return None
332
333    iid = baddr[8:]
334
335    if grpid is None:
336        b_grpid = b'\x00\x00\x00\x00'
337    else:
338        b_grpid = b''
339        # Is either bytes, str or int
340        if isinstance(grpid, (str, bytes)):
341            try:
342                if isinstance(grpid, str) and len(grpid) == 8:
343                    i_grpid = int(grpid, 16) & 0xffffffff
344                elif isinstance(grpid, bytes) and len(grpid) == 4:
345                    i_grpid = struct.unpack("!I", grpid)[0]
346                else:
347                    raise ValueError
348            except Exception:
349                warning(
350                    "in6_getLinkScopedMcastPrefix(): Invalid group id "
351                    "provided"
352                )
353                return None
354        elif isinstance(grpid, int):
355            i_grpid = grpid
356        else:
357            warning(
358                "in6_getLinkScopedMcastPrefix(): Invalid group id "
359                "provided"
360            )
361            return None
362        b_grpid = struct.pack("!I", i_grpid)
363
364    flgscope = struct.pack("B", 0xff & ((0x3 << 4) | scope))
365    plen = b'\xff'
366    res = b'\x00'
367    a = b'\xff' + flgscope + res + plen + iid + b_grpid
368
369    return inet_ntop(socket.AF_INET6, a)
370
371
372def in6_get6to4Prefix(addr):
373    # type: (str) -> Optional[str]
374    """
375    Returns the /48 6to4 prefix associated with provided IPv4 address
376    On error, None is returned. No check is performed on public/private
377    status of the address
378    """
379    try:
380        baddr = inet_pton(socket.AF_INET, addr)
381        return inet_ntop(socket.AF_INET6, b'\x20\x02' + baddr + b'\x00' * 10)
382    except Exception:
383        return None
384
385
386def in6_6to4ExtractAddr(addr):
387    # type: (str) -> Optional[str]
388    """
389    Extract IPv4 address embedded in 6to4 address. Passed address must be
390    a 6to4 address. None is returned on error.
391    """
392    try:
393        baddr = inet_pton(socket.AF_INET6, addr)
394    except Exception:
395        return None
396    if baddr[:2] != b" \x02":
397        return None
398    return inet_ntop(socket.AF_INET, baddr[2:6])
399
400
401def in6_getLocalUniquePrefix():
402    # type: () -> str
403    """
404    Returns a pseudo-randomly generated Local Unique prefix. Function
405    follows recommendation of Section 3.2.2 of RFC 4193 for prefix
406    generation.
407    """
408    # Extracted from RFC 1305 (NTP) :
409    # NTP timestamps are represented as a 64-bit unsigned fixed-point number,
410    # in seconds relative to 0h on 1 January 1900. The integer part is in the
411    # first 32 bits and the fraction part in the last 32 bits.
412
413    # epoch = (1900, 1, 1, 0, 0, 0, 5, 1, 0)
414    # x = time.time()
415    # from time import gmtime, strftime, gmtime, mktime
416    # delta = mktime(gmtime(0)) - mktime(self.epoch)
417    # x = x-delta
418
419    tod = time.time()  # time of day. Will bother with epoch later
420    i = int(tod)
421    j = int((tod - i) * (2**32))
422    btod = struct.pack("!II", i, j)
423    mac = RandMAC()
424    # construct modified EUI-64 ID
425    eui64 = inet_pton(socket.AF_INET6, '::' + in6_mactoifaceid(mac))[8:]
426    import hashlib
427    globalid = hashlib.sha1(btod + eui64).digest()[:5]
428    return inet_ntop(socket.AF_INET6, b'\xfd' + globalid + b'\x00' * 10)
429
430
431def in6_getRandomizedIfaceId(ifaceid, previous=None):
432    # type: (str, Optional[str]) -> Tuple[str, str]
433    """
434    Implements the interface ID generation algorithm described in RFC 3041.
435    The function takes the Modified EUI-64 interface identifier generated
436    as described in RFC 4291 and an optional previous history value (the
437    first element of the output of this function). If no previous interface
438    identifier is provided, a random one is generated. The function returns
439    a tuple containing the randomized interface identifier and the history
440    value (for possible future use). Input and output values are provided in
441    a "printable" format as depicted below.
442
443    ex::
444        >>> in6_getRandomizedIfaceId('20b:93ff:feeb:2d3')
445        ('4c61:76ff:f46a:a5f3', 'd006:d540:db11:b092')
446        >>> in6_getRandomizedIfaceId('20b:93ff:feeb:2d3',
447                                     previous='d006:d540:db11:b092')
448        ('fe97:46fe:9871:bd38', 'eeed:d79c:2e3f:62e')
449    """
450
451    s = b""
452    if previous is None:
453        b_previous = bytes(RandBin(8))
454    else:
455        b_previous = inet_pton(socket.AF_INET6, "::" + previous)[8:]
456    s = inet_pton(socket.AF_INET6, "::" + ifaceid)[8:] + b_previous
457    import hashlib
458    s = hashlib.md5(s).digest()
459    s1, s2 = s[:8], s[8:]
460    s1 = chb(orb(s1[0]) & (~0x04)) + s1[1:]  # set bit 6 to 0
461    bs1 = inet_ntop(socket.AF_INET6, b"\xff" * 8 + s1)[20:]
462    bs2 = inet_ntop(socket.AF_INET6, b"\xff" * 8 + s2)[20:]
463    return (bs1, bs2)
464
465
466_rfc1924map = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E',  # noqa: E501
467               'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T',  # noqa: E501
468               'U', 'V', 'W', 'X', 'Y', 'Z', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i',  # noqa: E501
469               'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x',  # noqa: E501
470               'y', 'z', '!', '#', '$', '%', '&', '(', ')', '*', '+', '-', ';', '<', '=',  # noqa: E501
471               '>', '?', '@', '^', '_', '`', '{', '|', '}', '~']
472
473
474def in6_ctop(addr):
475    # type: (str) -> Optional[str]
476    """
477    Convert an IPv6 address in Compact Representation Notation
478    (RFC 1924) to printable representation ;-)
479    Returns None on error.
480    """
481    if len(addr) != 20 or not reduce(lambda x, y: x and y,
482                                     [x in _rfc1924map for x in addr]):
483        return None
484    i = 0
485    for c in addr:
486        j = _rfc1924map.index(c)
487        i = 85 * i + j
488    res = []
489    for j in range(4):
490        res.append(struct.pack("!I", i % 2**32))
491        i = i // (2**32)
492    res.reverse()
493    return inet_ntop(socket.AF_INET6, b"".join(res))
494
495
496def in6_ptoc(addr):
497    # type: (str) -> Optional[str]
498    """
499    Converts an IPv6 address in printable representation to RFC
500    1924 Compact Representation ;-)
501    Returns None on error.
502    """
503    try:
504        d = struct.unpack("!IIII", inet_pton(socket.AF_INET6, addr))
505    except Exception:
506        return None
507    rem = 0
508    m = [2**96, 2**64, 2**32, 1]
509    for i in range(4):
510        rem += d[i] * m[i]
511    res = []  # type: List[str]
512    while rem:
513        res.append(_rfc1924map[rem % 85])
514        rem = rem // 85
515    res.reverse()
516    return "".join(res)
517
518
519def in6_isaddr6to4(x):
520    # type: (str) -> bool
521    """
522    Return True if provided address (in printable format) is a 6to4
523    address (being in 2002::/16).
524    """
525    bx = inet_pton(socket.AF_INET6, x)
526    return bx[:2] == b' \x02'
527
528
529conf.teredoPrefix = "2001::"  # old one was 3ffe:831f (it is a /32)
530conf.teredoServerPort = 3544
531
532
533def in6_isaddrTeredo(x):
534    # type: (str) -> bool
535    """
536    Return True if provided address is a Teredo, meaning it is under
537    the /32 conf.teredoPrefix prefix value (by default, 2001::).
538    Otherwise, False is returned. Address must be passed in printable
539    format.
540    """
541    our = inet_pton(socket.AF_INET6, x)[0:4]
542    teredoPrefix = inet_pton(socket.AF_INET6, conf.teredoPrefix)[0:4]
543    return teredoPrefix == our
544
545
546def teredoAddrExtractInfo(x):
547    # type: (str) -> Tuple[str, int, str, int]
548    """
549    Extract information from a Teredo address. Return value is
550    a 4-tuple made of IPv4 address of Teredo server, flag value (int),
551    mapped address (non obfuscated) and mapped port (non obfuscated).
552    No specific checks are performed on passed address.
553    """
554    addr = inet_pton(socket.AF_INET6, x)
555    server = inet_ntop(socket.AF_INET, addr[4:8])
556    flag = struct.unpack("!H", addr[8:10])[0]  # type: int
557    mappedport = struct.unpack("!H", strxor(addr[10:12], b'\xff' * 2))[0]
558    mappedaddr = inet_ntop(socket.AF_INET, strxor(addr[12:16], b'\xff' * 4))
559    return server, flag, mappedaddr, mappedport
560
561
562def in6_iseui64(x):
563    # type: (str) -> bool
564    """
565    Return True if provided address has an interface identifier part
566    created in modified EUI-64 format (meaning it matches ``*::*:*ff:fe*:*``).
567    Otherwise, False is returned. Address must be passed in printable
568    format.
569    """
570    eui64 = inet_pton(socket.AF_INET6, '::ff:fe00:0')
571    bx = in6_and(inet_pton(socket.AF_INET6, x), eui64)
572    return bx == eui64
573
574
575def in6_isanycast(x):  # RFC 2526
576    # type: (str) -> bool
577    if in6_iseui64(x):
578        s = '::fdff:ffff:ffff:ff80'
579        packed_x = inet_pton(socket.AF_INET6, x)
580        packed_s = inet_pton(socket.AF_INET6, s)
581        x_and_s = in6_and(packed_x, packed_s)
582        return x_and_s == packed_s
583    else:
584        # not EUI-64
585        # |              n bits             |    121-n bits    |   7 bits   |
586        # +---------------------------------+------------------+------------+
587        # |           subnet prefix         | 1111111...111111 | anycast ID |
588        # +---------------------------------+------------------+------------+
589        #                                   |   interface identifier field  |
590        warning('in6_isanycast(): TODO not EUI-64')
591        return False
592
593
594def _in6_bitops(xa1, xa2, operator=0):
595    # type: (bytes, bytes, int) -> bytes
596    a1 = struct.unpack('4I', xa1)
597    a2 = struct.unpack('4I', xa2)
598    fop = [lambda x, y: x | y,
599           lambda x, y: x & y,
600           lambda x, y: x ^ y
601           ]
602    ret = map(fop[operator % len(fop)], a1, a2)
603    return b"".join(struct.pack('I', x) for x in ret)
604
605
606def in6_or(a1, a2):
607    # type: (bytes, bytes) -> bytes
608    """
609    Provides a bit to bit OR of provided addresses. They must be
610    passed in network format. Return value is also an IPv6 address
611    in network format.
612    """
613    return _in6_bitops(a1, a2, 0)
614
615
616def in6_and(a1, a2):
617    # type: (bytes, bytes) -> bytes
618    """
619    Provides a bit to bit AND of provided addresses. They must be
620    passed in network format. Return value is also an IPv6 address
621    in network format.
622    """
623    return _in6_bitops(a1, a2, 1)
624
625
626def in6_xor(a1, a2):
627    # type: (bytes, bytes) -> bytes
628    """
629    Provides a bit to bit XOR of provided addresses. They must be
630    passed in network format. Return value is also an IPv6 address
631    in network format.
632    """
633    return _in6_bitops(a1, a2, 2)
634
635
636def in6_cidr2mask(m):
637    # type: (int) -> bytes
638    """
639    Return the mask (bitstring) associated with provided length
640    value. For instance if function is called on 48, return value is
641    b'\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'.
642
643    """
644    if m > 128 or m < 0:
645        raise Scapy_Exception("value provided to in6_cidr2mask outside [0, 128] domain (%d)" % m)  # noqa: E501
646
647    t = []
648    for i in range(0, 4):
649        t.append(max(0, 2**32 - 2**(32 - min(32, m))))
650        m -= 32
651
652    return b"".join(struct.pack('!I', x) for x in t)
653
654
655def in6_getnsma(a):
656    # type: (bytes) -> bytes
657    """
658    Return link-local solicited-node multicast address for given
659    address. Passed address must be provided in network format.
660    Returned value is also in network format.
661    """
662
663    r = in6_and(a, inet_pton(socket.AF_INET6, '::ff:ffff'))
664    r = in6_or(inet_pton(socket.AF_INET6, 'ff02::1:ff00:0'), r)
665    return r
666
667
668def in6_getnsmac(a):
669    # type: (bytes) -> str
670    """
671    Return the multicast mac address associated with provided
672    IPv6 address. Passed address must be in network format.
673    """
674
675    ba = struct.unpack('16B', a)[-4:]
676    mac = '33:33:'
677    mac += ':'.join("%.2x" % x for x in ba)
678    return mac
679
680
681def in6_getha(prefix):
682    # type: (str) -> str
683    """
684    Return the anycast address associated with all home agents on a given
685    subnet.
686    """
687    r = in6_and(inet_pton(socket.AF_INET6, prefix), in6_cidr2mask(64))
688    r = in6_or(r, inet_pton(socket.AF_INET6, '::fdff:ffff:ffff:fffe'))
689    return inet_ntop(socket.AF_INET6, r)
690
691
692def in6_ptop(str):
693    # type: (str) -> str
694    """
695    Normalizes IPv6 addresses provided in printable format, returning the
696    same address in printable format. (2001:0db8:0:0::1 -> 2001:db8::1)
697    """
698    return inet_ntop(socket.AF_INET6, inet_pton(socket.AF_INET6, str))
699
700
701def in6_isincluded(addr, prefix, plen):
702    # type: (str, str, int) -> bool
703    """
704    Returns True when 'addr' belongs to prefix/plen. False otherwise.
705    """
706    temp = inet_pton(socket.AF_INET6, addr)
707    pref = in6_cidr2mask(plen)
708    zero = inet_pton(socket.AF_INET6, prefix)
709    return zero == in6_and(temp, pref)
710
711
712def in6_isllsnmaddr(str):
713    # type: (str) -> bool
714    """
715    Return True if provided address is a link-local solicited node
716    multicast address, i.e. belongs to ff02::1:ff00:0/104. False is
717    returned otherwise.
718    """
719    temp = in6_and(b"\xff" * 13 + b"\x00" * 3, inet_pton(socket.AF_INET6, str))
720    temp2 = b'\xff\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\xff\x00\x00\x00'
721    return temp == temp2
722
723
724def in6_isdocaddr(str):
725    # type: (str) -> bool
726    """
727    Returns True if provided address in printable format belongs to
728    2001:db8::/32 address space reserved for documentation (as defined
729    in RFC 3849).
730    """
731    return in6_isincluded(str, '2001:db8::', 32)
732
733
734def in6_islladdr(str):
735    # type: (str) -> bool
736    """
737    Returns True if provided address in printable format belongs to
738    _allocated_ link-local unicast address space (fe80::/10)
739    """
740    return in6_isincluded(str, 'fe80::', 10)
741
742
743def in6_issladdr(str):
744    # type: (str) -> bool
745    """
746    Returns True if provided address in printable format belongs to
747    _allocated_ site-local address space (fec0::/10). This prefix has
748    been deprecated, address being now reserved by IANA. Function
749    will remain for historic reasons.
750    """
751    return in6_isincluded(str, 'fec0::', 10)
752
753
754def in6_isuladdr(str):
755    # type: (str) -> bool
756    """
757    Returns True if provided address in printable format belongs to
758    Unique local address space (fc00::/7).
759    """
760    return in6_isincluded(str, 'fc00::', 7)
761
762# TODO : we should see the status of Unique Local addresses against
763#        global address space.
764#        Up-to-date information is available through RFC 3587.
765#        We should review function behavior based on its content.
766
767
768def in6_isgladdr(str):
769    # type: (str) -> bool
770    """
771    Returns True if provided address in printable format belongs to
772    _allocated_ global address space (2000::/3). Please note that,
773    Unique Local addresses (FC00::/7) are not part of global address
774    space, and won't match.
775    """
776    return in6_isincluded(str, '2000::', 3)
777
778
779def in6_ismaddr(str):
780    # type: (str) -> bool
781    """
782    Returns True if provided address in printable format belongs to
783    allocated Multicast address space (ff00::/8).
784    """
785    return in6_isincluded(str, 'ff00::', 8)
786
787
788def in6_ismnladdr(str):
789    # type: (str) -> bool
790    """
791    Returns True if address belongs to node-local multicast address
792    space (ff01::/16) as defined in RFC
793    """
794    return in6_isincluded(str, 'ff01::', 16)
795
796
797def in6_ismgladdr(str):
798    # type: (str) -> bool
799    """
800    Returns True if address belongs to global multicast address
801    space (ff0e::/16).
802    """
803    return in6_isincluded(str, 'ff0e::', 16)
804
805
806def in6_ismlladdr(str):
807    # type: (str) -> bool
808    """
809    Returns True if address belongs to link-local multicast address
810    space (ff02::/16)
811    """
812    return in6_isincluded(str, 'ff02::', 16)
813
814
815def in6_ismsladdr(str):
816    # type: (str) -> bool
817    """
818    Returns True if address belongs to site-local multicast address
819    space (ff05::/16). Site local address space has been deprecated.
820    Function remains for historic reasons.
821    """
822    return in6_isincluded(str, 'ff05::', 16)
823
824
825def in6_isaddrllallnodes(str):
826    # type: (str) -> bool
827    """
828    Returns True if address is the link-local all-nodes multicast
829    address (ff02::1).
830    """
831    return (inet_pton(socket.AF_INET6, "ff02::1") ==
832            inet_pton(socket.AF_INET6, str))
833
834
835def in6_isaddrllallservers(str):
836    # type: (str) -> bool
837    """
838    Returns True if address is the link-local all-servers multicast
839    address (ff02::2).
840    """
841    return (inet_pton(socket.AF_INET6, "ff02::2") ==
842            inet_pton(socket.AF_INET6, str))
843
844
845def in6_getscope(addr):
846    # type: (str) -> int
847    """
848    Returns the scope of the address.
849    """
850    if in6_isgladdr(addr) or in6_isuladdr(addr):
851        scope = IPV6_ADDR_GLOBAL
852    elif in6_islladdr(addr):
853        scope = IPV6_ADDR_LINKLOCAL
854    elif in6_issladdr(addr):
855        scope = IPV6_ADDR_SITELOCAL
856    elif in6_ismaddr(addr):
857        if in6_ismgladdr(addr):
858            scope = IPV6_ADDR_GLOBAL
859        elif in6_ismlladdr(addr):
860            scope = IPV6_ADDR_LINKLOCAL
861        elif in6_ismsladdr(addr):
862            scope = IPV6_ADDR_SITELOCAL
863        elif in6_ismnladdr(addr):
864            scope = IPV6_ADDR_LOOPBACK
865        else:
866            scope = -1
867    elif addr == '::1':
868        scope = IPV6_ADDR_LOOPBACK
869    else:
870        scope = -1
871    return scope
872
873
874def in6_get_common_plen(a, b):
875    # type: (str, str) -> int
876    """
877    Return common prefix length of IPv6 addresses a and b.
878    """
879    def matching_bits(byte1, byte2):
880        # type: (int, int) -> int
881        for i in range(8):
882            cur_mask = 0x80 >> i
883            if (byte1 & cur_mask) != (byte2 & cur_mask):
884                return i
885        return 8
886
887    tmpA = inet_pton(socket.AF_INET6, a)
888    tmpB = inet_pton(socket.AF_INET6, b)
889    for i in range(16):
890        mbits = matching_bits(orb(tmpA[i]), orb(tmpB[i]))
891        if mbits != 8:
892            return 8 * i + mbits
893    return 128
894
895
896def in6_isvalid(address):
897    # type: (str) -> bool
898    """Return True if 'address' is a valid IPv6 address string, False
899       otherwise."""
900
901    try:
902        inet_pton(socket.AF_INET6, address)
903        return True
904    except Exception:
905        return False
906
907
908class Net6(Net):  # syntax ex. 2011:db8::/126
909    """Network object from an IP address or hostname and mask"""
910    name = "Net6"  # type: str
911    family = socket.AF_INET6  # type: int
912    max_mask = 128  # type: int
913
914    @classmethod
915    def ip2int(cls, addr):
916        # type: (str) -> int
917        val1, val2 = struct.unpack(
918            '!QQ', inet_pton(socket.AF_INET6, cls.name2addr(addr))
919        )
920        return cast(int, (val1 << 64) + val2)
921
922    @staticmethod
923    def int2ip(val):
924        # type: (int) -> str
925        return inet_ntop(
926            socket.AF_INET6,
927            struct.pack('!QQ', val >> 64, val & 0xffffffffffffffff),
928        )
929