1# -*- mode: python3; indent-tabs-mode: nil; tab-width: 4 -*- 2# This file is part of Scapy 3# See http://www.secdev.org/projects/scapy for more information 4# Copyright (C) Philippe Biondi <phil@secdev.org> 5# Copyright (C) Michael Farrell <micolous+git@gmail.com> 6# This program is published under a GPLv2 license 7 8""" 9Fields: basic data structures that make up parts of packets. 10""" 11 12from __future__ import absolute_import 13import calendar 14import collections 15import copy 16import inspect 17import math 18import socket 19import struct 20import time 21import warnings 22 23from types import MethodType 24from uuid import UUID 25 26from scapy.config import conf 27from scapy.dadict import DADict 28from scapy.volatile import RandBin, RandByte, RandEnumKeys, RandInt, \ 29 RandIP, RandIP6, RandLong, RandMAC, RandNum, RandShort, RandSInt, \ 30 RandSByte, RandTermString, RandUUID, VolatileValue, RandSShort, \ 31 RandSLong, RandFloat 32from scapy.data import EPOCH 33from scapy.error import log_runtime, Scapy_Exception 34from scapy.compat import bytes_hex, chb, orb, plain_str, raw, bytes_encode 35from scapy.pton_ntop import inet_ntop, inet_pton 36from scapy.utils import inet_aton, inet_ntoa, lhex, mac2str, str2mac, EDecimal 37from scapy.utils6 import in6_6to4ExtractAddr, in6_isaddr6to4, \ 38 in6_isaddrTeredo, in6_ptop, Net6, teredoAddrExtractInfo 39from scapy.base_classes import Gen, Net, BasePacket, Field_metaclass 40from scapy.error import warning 41import scapy.modules.six as six 42from scapy.modules.six.moves import range 43from scapy.modules.six import integer_types 44 45# Typing imports 46from scapy.compat import ( 47 Any, 48 AnyStr, 49 Callable, 50 Dict, 51 List, 52 Generic, 53 Optional, 54 Set, 55 Tuple, 56 Type, 57 TypeVar, 58 Union, 59 # func 60 cast, 61 TYPE_CHECKING, 62) 63 64if TYPE_CHECKING: 65 # Do not import on runtime ! 
(import loop) 66 from scapy.packet import Packet 67 68 69class RawVal: 70 r""" 71 A raw value that will not be processed by the field and inserted 72 as-is in the packet string. 73 74 Example:: 75 76 >>> a = IP(len=RawVal("####")) 77 >>> bytes(a) 78 b'F\x00####\x00\x01\x00\x005\xb5\x00\x00\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x00' 79 80 """ 81 def __init__(self, val=b""): 82 # type: (bytes) -> None 83 self.val = bytes_encode(val) 84 85 def __str__(self): 86 # type: () -> str 87 return str(self.val) 88 89 def __bytes__(self): 90 # type: () -> bytes 91 return self.val 92 93 def __len__(self): 94 # type: () -> int 95 return len(self.val) 96 97 def __repr__(self): 98 # type: () -> str 99 return "<RawVal [%r]>" % self.val 100 101 102class ObservableDict(Dict[int, str]): 103 """ 104 Helper class to specify a protocol extendable for runtime modifications 105 """ 106 def __init__(self, *args, **kw): 107 # type: (*Dict[int, str], **Any) -> None 108 self.observers = [] # type: List[_EnumField[Any]] 109 super(ObservableDict, self).__init__(*args, **kw) 110 111 def observe(self, observer): 112 # type: (_EnumField[Any]) -> None 113 self.observers.append(observer) 114 115 def __setitem__(self, key, value): 116 # type: (int, str) -> None 117 for o in self.observers: 118 o.notify_set(self, key, value) 119 super(ObservableDict, self).__setitem__(key, value) 120 121 def __delitem__(self, key): 122 # type: (int) -> None 123 for o in self.observers: 124 o.notify_del(self, key) 125 super(ObservableDict, self).__delitem__(key) 126 127 def update(self, anotherDict): # type: ignore 128 for k in anotherDict: 129 self[k] = anotherDict[k] 130 131 132############ 133# Fields # 134############ 135 136I = TypeVar('I') # Internal storage # noqa: E741 137M = TypeVar('M') # Machine storage 138 139 140@six.add_metaclass(Field_metaclass) 141class Field(Generic[I, M]): 142 """ 143 For more information on how this works, please refer to the 144 'Adding new protocols' chapter in the online 
documentation: 145 146 https://scapy.readthedocs.io/en/stable/build_dissect.html 147 """ 148 __slots__ = [ 149 "name", 150 "fmt", 151 "default", 152 "sz", 153 "owners", 154 "struct" 155 ] 156 islist = 0 157 ismutable = False 158 holds_packets = 0 159 160 def __init__(self, name, default, fmt="H"): 161 # type: (str, Any, str) -> None 162 if not isinstance(name, str): 163 raise ValueError("name should be a string") 164 self.name = name 165 if fmt[0] in "@=<>!": 166 self.fmt = fmt 167 else: 168 self.fmt = "!" + fmt 169 self.struct = struct.Struct(self.fmt) 170 self.default = self.any2i(None, default) 171 self.sz = struct.calcsize(self.fmt) # type: int 172 self.owners = [] # type: List[Type[Packet]] 173 174 def register_owner(self, cls): 175 # type: (Type[Packet]) -> None 176 self.owners.append(cls) 177 178 def i2len(self, 179 pkt, # type: Packet 180 x, # type: Any 181 ): 182 # type: (...) -> int 183 """Convert internal value to a length usable by a FieldLenField""" 184 if isinstance(x, RawVal): 185 return len(x) 186 return self.sz 187 188 def i2count(self, pkt, x): 189 # type: (Optional[Packet], I) -> int 190 """Convert internal value to a number of elements usable by a FieldLenField. 
191 Always 1 except for list fields""" 192 return 1 193 194 def h2i(self, pkt, x): 195 # type: (Optional[Packet], Any) -> I 196 """Convert human value to internal value""" 197 return cast(I, x) 198 199 def i2h(self, pkt, x): 200 # type: (Optional[Packet], I) -> Any 201 """Convert internal value to human value""" 202 return x 203 204 def m2i(self, pkt, x): 205 # type: (Optional[Packet], M) -> I 206 """Convert machine value to internal value""" 207 return cast(I, x) 208 209 def i2m(self, pkt, x): 210 # type: (Optional[Packet], Optional[I]) -> M 211 """Convert internal value to machine value""" 212 if x is None: 213 return cast(M, 0) 214 elif isinstance(x, str): 215 return cast(M, bytes_encode(x)) 216 return cast(M, x) 217 218 def any2i(self, pkt, x): 219 # type: (Optional[Packet], Any) -> Optional[I] 220 """Try to understand the most input values possible and make an internal value from them""" # noqa: E501 221 return self.h2i(pkt, x) 222 223 def i2repr(self, pkt, x): 224 # type: (Optional[Packet], I) -> str 225 """Convert internal value to a nice representation""" 226 return repr(self.i2h(pkt, x)) 227 228 def addfield(self, pkt, s, val): 229 # type: (Packet, bytes, Optional[I]) -> bytes 230 """Add an internal value to a string 231 232 Copy the network representation of field `val` (belonging to layer 233 `pkt`) to the raw string packet `s`, and return the new string packet. 234 """ 235 try: 236 return s + self.struct.pack(self.i2m(pkt, val)) 237 except struct.error as ex: 238 raise ValueError( 239 "Incorrect type of value for field %s:\n" % self.name + 240 "struct.error('%s')\n" % ex + 241 "To inject bytes into the field regardless of the type, " + 242 "use RawVal. See help(RawVal)" 243 ) 244 245 def getfield(self, pkt, s): 246 # type: (Packet, bytes) -> Tuple[bytes, I] 247 """Extract an internal value from a string 248 249 Extract from the raw packet `s` the field value belonging to layer 250 `pkt`. 
251 252 Returns a two-element list, 253 first the raw packet string after having removed the extracted field, 254 second the extracted field itself in internal representation. 255 """ 256 return s[self.sz:], self.m2i(pkt, self.struct.unpack(s[:self.sz])[0]) 257 258 def do_copy(self, x): 259 # type: (I) -> I 260 if hasattr(x, "copy"): 261 return x.copy() # type: ignore 262 if isinstance(x, list): 263 x = x[:] # type: ignore 264 for i in range(len(x)): 265 if isinstance(x[i], BasePacket): 266 x[i] = x[i].copy() 267 return x 268 269 def __repr__(self): 270 # type: () -> str 271 return "<%s (%s).%s>" % ( 272 self.__class__.__name__, 273 ",".join(x.__name__ for x in self.owners), 274 self.name 275 ) 276 277 def copy(self): 278 # type: () -> Field[I, M] 279 return copy.copy(self) 280 281 def randval(self): 282 # type: () -> VolatileValue 283 """Return a volatile object whose value is both random and suitable for this field""" # noqa: E501 284 fmtt = self.fmt[-1] 285 if fmtt in "BbHhIiQq": 286 return {"B": RandByte, "b": RandSByte, 287 "H": RandShort, "h": RandSShort, 288 "I": RandInt, "i": RandSInt, 289 "Q": RandLong, "q": RandSLong}[fmtt]() 290 elif fmtt == "s": 291 if self.fmt[0] in "0123456789": 292 value = int(self.fmt[:-1]) 293 else: 294 value = int(self.fmt[1:-1]) 295 return RandBin(value) 296 else: 297 warning("no random class for [%s] (fmt=%s).", self.name, self.fmt) 298 299 300class _FieldContainer(object): 301 """ 302 A field that acts as a container for another field 303 """ 304 def __getattr__(self, attr): 305 # type: (str) -> Any 306 return getattr(self.fld, attr) 307 308 309AnyField = Union[Field[Any, Any], _FieldContainer] 310 311 312class Emph(_FieldContainer): 313 """Empathize sub-layer for display""" 314 __slots__ = ["fld"] 315 316 def __init__(self, fld): 317 # type: (Any) -> None 318 self.fld = fld 319 320 def __eq__(self, other): 321 # type: (Any) -> bool 322 return bool(self.fld == other) 323 324 def __ne__(self, other): 325 # type: (Any) -> bool 
326 # Python 2.7 compat 327 return not self == other 328 329 # mypy doesn't support __hash__ = None 330 __hash__ = None # type: ignore 331 332 333class ActionField(_FieldContainer): 334 __slots__ = ["fld", "_action_method", "_privdata"] 335 336 def __init__(self, fld, action_method, **kargs): 337 # type: (Field[Any, Any], str, **Any) -> None 338 self.fld = fld 339 self._action_method = action_method 340 self._privdata = kargs 341 342 def any2i(self, pkt, val): 343 # type: (Optional[Packet], int) -> Any 344 getattr(pkt, self._action_method)(val, self.fld, **self._privdata) 345 return getattr(self.fld, "any2i")(pkt, val) 346 347 348class ConditionalField(_FieldContainer): 349 __slots__ = ["fld", "cond"] 350 351 def __init__(self, 352 fld, # type: Field[Any, Any] 353 cond # type: Callable[[Packet], bool] 354 ): 355 # type: (...) -> None 356 self.fld = fld 357 self.cond = cond 358 359 def _evalcond(self, pkt): 360 # type: (Packet) -> bool 361 return bool(self.cond(pkt)) 362 363 def any2i(self, pkt, x): 364 # type: (Optional[Packet], Any) -> Any 365 # BACKWARD COMPATIBILITY 366 # Note: we shouldn't need this function. (it's not correct) 367 # However, having i2h implemented (#2364), it changes the default 368 # behavior and broke all packets that wrongly use two ConditionalField 369 # with the same name. Those packets are the problem: they are wrongly 370 # built (they should either be re-using the same conditional field, or 371 # using a MultipleTypeField). 372 # But I don't want to dive into fixing all of them just yet, 373 # so for now, let's keep this this way, even though it's not correct. 
374 if type(self.fld) is Field: 375 return x 376 return self.fld.any2i(pkt, x) 377 378 def i2h(self, pkt, val): 379 # type: (Optional[Packet], Any) -> Any 380 if pkt and not self._evalcond(pkt): 381 return None 382 return self.fld.i2h(pkt, val) 383 384 def getfield(self, pkt, s): 385 # type: (Packet, bytes) -> Tuple[bytes, Any] 386 if self._evalcond(pkt): 387 return self.fld.getfield(pkt, s) 388 else: 389 return s, None 390 391 def addfield(self, pkt, s, val): 392 # type: (Packet, bytes, Any) -> bytes 393 if self._evalcond(pkt): 394 return self.fld.addfield(pkt, s, val) 395 else: 396 return s 397 398 def __getattr__(self, attr): 399 # type: (str) -> Any 400 return getattr(self.fld, attr) 401 402 403class MultipleTypeField(_FieldContainer): 404 """MultipleTypeField are used for fields that can be implemented by 405various Field subclasses, depending on conditions on the packet. 406 407It is initialized with `flds` and `dflt`. 408 409`dflt` is the default field type, to be used when none of the 410conditions matched the current packet. 411 412`flds` is a list of tuples (`fld`, `cond`), where `fld` if a field 413type, and `cond` a "condition" to determine if `fld` is the field type 414that should be used. 415 416`cond` is either: 417 418 - a callable `cond_pkt` that accepts one argument (the packet) and 419 returns True if `fld` should be used, False otherwise. 420 421 - a tuple (`cond_pkt`, `cond_pkt_val`), where `cond_pkt` is the same 422 as in the previous case and `cond_pkt_val` is a callable that 423 accepts two arguments (the packet, and the value to be set) and 424 returns True if `fld` should be used, False otherwise. 425 426See scapy.layers.l2.ARP (type "help(ARP)" in Scapy) for an example of 427use. 428 429 """ 430 431 __slots__ = ["flds", "dflt", "name", "default"] 432 433 def __init__(self, 434 flds, # type: List[Tuple[Field[Any, Any], Any]] 435 dflt # type: Field[Any, Any] 436 ): 437 # type: (...) 
-> None 438 self.flds = flds 439 self.dflt = dflt 440 self.default = None # So that we can detect changes in defaults 441 self.name = self.dflt.name 442 if any(x[0].name != self.name for x in self.flds): 443 warnings.warn( 444 "All fields should have the same name in a MultipleTypeField", 445 SyntaxWarning 446 ) 447 448 def _iterate_fields_cond(self, pkt, val, use_val): 449 # type: (Optional[Packet], Any, bool) -> Field[Any, Any] 450 """Internal function used by _find_fld_pkt & _find_fld_pkt_val""" 451 # Iterate through the fields 452 for fld, cond in self.flds: 453 if isinstance(cond, tuple): 454 if use_val: 455 if val is None: 456 val = self.dflt.default 457 if cond[1](pkt, val): 458 return fld 459 continue 460 else: 461 cond = cond[0] 462 if cond(pkt): 463 return fld 464 return self.dflt 465 466 def _find_fld_pkt(self, pkt): 467 # type: (Optional[Packet]) -> Field[Any, Any] 468 """Given a Packet instance `pkt`, returns the Field subclass to be 469used. If you know the value to be set (e.g., in .addfield()), use 470._find_fld_pkt_val() instead. 471 472 """ 473 return self._iterate_fields_cond(pkt, None, False) 474 475 def _find_fld_pkt_val(self, 476 pkt, # type: Optional[Packet] 477 val, # type: Any 478 ): 479 # type: (...) -> Tuple[Field[Any, Any], Any] 480 """Given a Packet instance `pkt` and the value `val` to be set, 481returns the Field subclass to be used, and the updated `val` if necessary. 482 483 """ 484 fld = self._iterate_fields_cond(pkt, val, True) 485 if val is None: 486 val = fld.default 487 return fld, val 488 489 def _find_fld(self): 490 # type: () -> Field[Any, Any] 491 """Returns the Field subclass to be used, depending on the Packet 492instance, or the default subclass. 493 494DEV: since the Packet instance is not provided, we have to use a hack 495to guess it. It should only be used if you cannot provide the current 496Packet instance (for example, because of the current Scapy API). 
497 498If you have the current Packet instance, use ._find_fld_pkt_val() (if 499the value to set is also known) of ._find_fld_pkt() instead. 500 501 """ 502 # Hack to preserve current Scapy API 503 # See https://stackoverflow.com/a/7272464/3223422 504 frame = inspect.currentframe().f_back.f_back # type: ignore 505 while frame is not None: 506 try: 507 pkt = frame.f_locals['self'] 508 except KeyError: 509 pass 510 else: 511 if isinstance(pkt, tuple(self.dflt.owners)): 512 if not pkt.default_fields: 513 # Packet not initialized 514 return self.dflt 515 return self._find_fld_pkt(pkt) 516 frame = frame.f_back 517 return self.dflt 518 519 def getfield(self, 520 pkt, # type: Packet 521 s, # type: bytes 522 ): 523 # type: (...) -> Tuple[bytes, Any] 524 return self._find_fld_pkt(pkt).getfield(pkt, s) 525 526 def addfield(self, pkt, s, val): 527 # type: (Packet, bytes, Any) -> bytes 528 fld, val = self._find_fld_pkt_val(pkt, val) 529 return fld.addfield(pkt, s, val) 530 531 def any2i(self, pkt, val): 532 # type: (Optional[Packet], Any) -> Any 533 fld, val = self._find_fld_pkt_val(pkt, val) 534 return fld.any2i(pkt, val) 535 536 def h2i(self, pkt, val): 537 # type: (Optional[Packet], Any) -> Any 538 fld, val = self._find_fld_pkt_val(pkt, val) 539 return fld.h2i(pkt, val) 540 541 def i2h(self, 542 pkt, # type: Packet 543 val, # type: Any 544 ): 545 # type: (...) 
-> Any 546 fld, val = self._find_fld_pkt_val(pkt, val) 547 return fld.i2h(pkt, val) 548 549 def i2m(self, pkt, val): 550 # type: (Optional[Packet], Optional[Any]) -> Any 551 fld, val = self._find_fld_pkt_val(pkt, val) 552 return fld.i2m(pkt, val) 553 554 def i2len(self, pkt, val): 555 # type: (Packet, Any) -> int 556 fld, val = self._find_fld_pkt_val(pkt, val) 557 return fld.i2len(pkt, val) 558 559 def i2repr(self, pkt, val): 560 # type: (Optional[Packet], Any) -> str 561 fld, val = self._find_fld_pkt_val(pkt, val) 562 return fld.i2repr(pkt, val) 563 564 def register_owner(self, cls): 565 # type: (Type[Packet]) -> None 566 for fld, _ in self.flds: 567 fld.owners.append(cls) 568 self.dflt.owners.append(cls) 569 570 @property 571 def fld(self): 572 # type: () -> Field[Any, Any] 573 return self._find_fld() 574 575 576class PadField(_FieldContainer): 577 """Add bytes after the proxified field so that it ends at the specified 578 alignment from its beginning""" 579 __slots__ = ["fld", "_align", "_padwith"] 580 581 def __init__(self, fld, align, padwith=None): 582 # type: (Field[Any, Any], int, Optional[bytes]) -> None 583 self.fld = fld 584 self._align = align 585 self._padwith = padwith or b"\x00" 586 587 def padlen(self, flen): 588 # type: (int) -> int 589 return -flen % self._align 590 591 def getfield(self, 592 pkt, # type: Packet 593 s, # type: bytes 594 ): 595 # type: (...) -> Tuple[bytes, Any] 596 remain, val = self.fld.getfield(pkt, s) 597 padlen = self.padlen(len(s) - len(remain)) 598 return remain[padlen:], val 599 600 def addfield(self, 601 pkt, # type: Packet 602 s, # type: bytes 603 val, # type: Any 604 ): 605 # type: (...) 
-> bytes 606 sval = self.fld.addfield(pkt, b"", val) 607 return s + sval + struct.pack( 608 "%is" % ( 609 self.padlen(len(sval)) 610 ), 611 self._padwith 612 ) 613 614 615class ReversePadField(PadField): 616 """Add bytes BEFORE the proxified field so that it starts at the specified 617 alignment from its beginning""" 618 619 def getfield(self, 620 pkt, # type: Packet 621 s, # type: bytes 622 ): 623 # type: (...) -> Tuple[bytes, Any] 624 # We need to get the length that has already been dissected 625 padlen = self.padlen(len(pkt.original) - len(s)) 626 remain, val = self.fld.getfield(pkt, s[padlen:]) 627 return remain, val 628 629 def addfield(self, 630 pkt, # type: Packet 631 s, # type: bytes 632 val, # type: Any 633 ): 634 # type: (...) -> bytes 635 sval = self.fld.addfield(pkt, b"", val) 636 return s + struct.pack("%is" % ( 637 self.padlen(len(s)) 638 ), self._padwith) + sval 639 640 641class FCSField(Field[int, int]): 642 """Special Field that gets its value from the end of the *packet* 643 (Note: not layer, but packet). 
644 645 Mostly used for FCS 646 """ 647 648 def getfield(self, pkt, s): 649 # type: (Packet, bytes) -> Tuple[bytes, int] 650 previous_post_dissect = pkt.post_dissect 651 val = self.m2i(pkt, struct.unpack(self.fmt, s[-self.sz:])[0]) 652 653 def _post_dissect(self, s): 654 # type: (Packet, bytes) -> bytes 655 # Reset packet to allow post_build 656 self.raw_packet_cache = None 657 self.post_dissect = previous_post_dissect # type: ignore 658 return previous_post_dissect(s) 659 pkt.post_dissect = MethodType(_post_dissect, pkt) # type: ignore 660 return s[:-self.sz], val 661 662 def addfield(self, pkt, s, val): 663 # type: (Packet, bytes, Optional[int]) -> bytes 664 previous_post_build = pkt.post_build 665 value = struct.pack(self.fmt, self.i2m(pkt, val)) 666 667 def _post_build(self, p, pay): 668 # type: (Packet, bytes, bytes) -> bytes 669 pay += value 670 self.post_build = previous_post_build # type: ignore 671 return previous_post_build(p, pay) 672 pkt.post_build = MethodType(_post_build, pkt) # type: ignore 673 return s 674 675 def i2repr(self, pkt, x): 676 # type: (Optional[Packet], int) -> str 677 return lhex(self.i2h(pkt, x)) 678 679 680class DestField(Field[str, bytes]): 681 __slots__ = ["defaultdst"] 682 # Each subclass must have its own bindings attribute 683 bindings = {} # type: Dict[Type[Packet], Tuple[str, Any]] 684 685 def __init__(self, name, default): 686 # type: (str, str) -> None 687 self.defaultdst = default 688 689 def dst_from_pkt(self, pkt): 690 # type: (Packet) -> str 691 for addr, condition in self.bindings.get(pkt.payload.__class__, []): 692 try: 693 if all(pkt.payload.getfieldval(field) == value 694 for field, value in six.iteritems(condition)): 695 return addr # type: ignore 696 except AttributeError: 697 pass 698 return self.defaultdst 699 700 @classmethod 701 def bind_addr(cls, layer, addr, **condition): 702 # type: (Type[Packet], str, **Any) -> None 703 cls.bindings.setdefault(layer, []).append( # type: ignore 704 (addr, condition) 705 ) 
706 707 708class MACField(Field[Optional[str], bytes]): 709 def __init__(self, name, default): 710 # type: (str, Optional[Any]) -> None 711 Field.__init__(self, name, default, "6s") 712 713 def i2m(self, pkt, x): 714 # type: (Optional[Packet], Optional[str]) -> bytes 715 if x is None: 716 return b"\0\0\0\0\0\0" 717 try: 718 y = mac2str(x) 719 except (struct.error, OverflowError): 720 y = bytes_encode(x) 721 return y 722 723 def m2i(self, pkt, x): 724 # type: (Optional[Packet], bytes) -> str 725 return str2mac(x) 726 727 def any2i(self, pkt, x): 728 # type: (Optional[Packet], Any) -> str 729 if isinstance(x, bytes) and len(x) == 6: 730 return self.m2i(pkt, x) 731 return cast(str, x) 732 733 def i2repr(self, pkt, x): 734 # type: (Optional[Packet], Optional[str]) -> str 735 x = self.i2h(pkt, x) 736 if x is None: 737 return repr(x) 738 if self in conf.resolve: 739 x = conf.manufdb._resolve_MAC(x) 740 return x 741 742 def randval(self): 743 # type: () -> RandMAC 744 return RandMAC() 745 746 747class IPField(Field[Union[str, Net], bytes]): 748 def __init__(self, name, default): 749 # type: (str, Optional[str]) -> None 750 Field.__init__(self, name, default, "4s") 751 752 def h2i(self, pkt, x): 753 # type: (Optional[Packet], Union[AnyStr, List[AnyStr]]) -> Any 754 if isinstance(x, bytes): 755 x = plain_str(x) # type: ignore 756 if isinstance(x, str): 757 try: 758 inet_aton(x) 759 except socket.error: 760 return Net(x) 761 elif isinstance(x, list): 762 return [self.h2i(pkt, n) for n in x] 763 return x 764 765 def i2h(self, pkt, x): 766 # type: (Optional[Packet], Optional[Union[str, Net]]) -> str 767 return cast(str, x) 768 769 def resolve(self, x): 770 # type: (str) -> str 771 if self in conf.resolve: 772 try: 773 ret = socket.gethostbyaddr(x)[0] 774 except Exception: 775 pass 776 else: 777 if ret: 778 return ret 779 return x 780 781 def i2m(self, pkt, x): 782 # type: (Optional[Packet], Optional[Union[str, Net]]) -> bytes 783 if x is None: 784 return b'\x00\x00\x00\x00' 
785 return inet_aton(plain_str(x)) 786 787 def m2i(self, pkt, x): 788 # type: (Optional[Packet], bytes) -> str 789 return inet_ntoa(x) 790 791 def any2i(self, pkt, x): 792 # type: (Optional[Packet], Any) -> Any 793 return self.h2i(pkt, x) 794 795 def i2repr(self, pkt, x): 796 # type: (Optional[Packet], Union[str, Net]) -> str 797 r = self.resolve(self.i2h(pkt, x)) 798 return r if isinstance(r, str) else repr(r) 799 800 def randval(self): 801 # type: () -> RandIP 802 return RandIP() 803 804 805class SourceIPField(IPField): 806 __slots__ = ["dstname"] 807 808 def __init__(self, name, dstname): 809 # type: (str, Optional[str]) -> None 810 IPField.__init__(self, name, None) 811 self.dstname = dstname 812 813 def __findaddr(self, pkt): 814 # type: (Packet) -> str 815 if conf.route is None: 816 # unused import, only to initialize conf.route 817 import scapy.route # noqa: F401 818 dst = ("0.0.0.0" if self.dstname is None 819 else getattr(pkt, self.dstname) or "0.0.0.0") 820 if isinstance(dst, (Gen, list)): 821 r = { 822 conf.route.route(str(daddr)) 823 for daddr in dst 824 } # type: Set[Tuple[str, str, str]] 825 if len(r) > 1: 826 warning("More than one possible route for %r" % (dst,)) 827 return min(r)[1] 828 return conf.route.route(dst)[1] 829 830 def i2m(self, pkt, x): 831 # type: (Optional[Packet], Optional[Union[str, Net]]) -> bytes 832 if x is None and pkt is not None: 833 x = self.__findaddr(pkt) 834 return super(SourceIPField, self).i2m(pkt, x) 835 836 def i2h(self, pkt, x): 837 # type: (Optional[Packet], Optional[Union[str, Net]]) -> str 838 if x is None and pkt is not None: 839 x = self.__findaddr(pkt) 840 return super(SourceIPField, self).i2h(pkt, x) 841 842 843class IP6Field(Field[Optional[Union[str, Net6]], bytes]): 844 def __init__(self, name, default): 845 # type: (str, Optional[str]) -> None 846 Field.__init__(self, name, default, "16s") 847 848 def h2i(self, pkt, x): 849 # type: (Optional[Packet], Optional[str]) -> str 850 if isinstance(x, bytes): 851 x = 
plain_str(x) 852 if isinstance(x, str): 853 try: 854 x = in6_ptop(x) 855 except socket.error: 856 return Net6(x) # type: ignore 857 elif isinstance(x, list): 858 x = [self.h2i(pkt, n) for n in x] 859 return x # type: ignore 860 861 def i2h(self, pkt, x): 862 # type: (Optional[Packet], Optional[Union[str, Net6]]) -> str 863 return cast(str, x) 864 865 def i2m(self, pkt, x): 866 # type: (Optional[Packet], Optional[Union[str, Net6]]) -> bytes 867 if x is None: 868 x = "::" 869 return inet_pton(socket.AF_INET6, plain_str(x)) 870 871 def m2i(self, pkt, x): 872 # type: (Optional[Packet], bytes) -> str 873 return inet_ntop(socket.AF_INET6, x) 874 875 def any2i(self, pkt, x): 876 # type: (Optional[Packet], Optional[str]) -> str 877 return self.h2i(pkt, x) 878 879 def i2repr(self, pkt, x): 880 # type: (Optional[Packet], Optional[Union[str, Net6]]) -> str 881 if x is None: 882 return self.i2h(pkt, x) 883 elif not isinstance(x, Net6) and not isinstance(x, list): 884 if in6_isaddrTeredo(x): # print Teredo info 885 server, _, maddr, mport = teredoAddrExtractInfo(x) 886 return "%s [Teredo srv: %s cli: %s:%s]" % (self.i2h(pkt, x), server, maddr, mport) # noqa: E501 887 elif in6_isaddr6to4(x): # print encapsulated address 888 vaddr = in6_6to4ExtractAddr(x) 889 return "%s [6to4 GW: %s]" % (self.i2h(pkt, x), vaddr) 890 r = self.i2h(pkt, x) # No specific information to return 891 return r if isinstance(r, str) else repr(r) 892 893 def randval(self): 894 # type: () -> RandIP6 895 return RandIP6() 896 897 898class SourceIP6Field(IP6Field): 899 __slots__ = ["dstname"] 900 901 def __init__(self, name, dstname): 902 # type: (str, str) -> None 903 IP6Field.__init__(self, name, None) 904 self.dstname = dstname 905 906 def i2m(self, pkt, x): 907 # type: (Optional[Packet], Optional[Union[str, Net6]]) -> bytes 908 if x is None: 909 dst = ("::" if self.dstname is None else 910 getattr(pkt, self.dstname) or "::") 911 iff, x, nh = conf.route6.route(dst) 912 return super(SourceIP6Field, 
self).i2m(pkt, x) 913 914 def i2h(self, pkt, x): 915 # type: (Optional[Packet], Optional[Union[str, Net6]]) -> str 916 if x is None: 917 if conf.route6 is None: 918 # unused import, only to initialize conf.route6 919 import scapy.route6 # noqa: F401 920 dst = ("::" if self.dstname is None else getattr(pkt, self.dstname)) # noqa: E501 921 if isinstance(dst, (Gen, list)): 922 r = {conf.route6.route(str(daddr)) 923 for daddr in dst} 924 if len(r) > 1: 925 warning("More than one possible route for %r" % (dst,)) 926 x = min(r)[1] 927 else: 928 x = conf.route6.route(dst)[1] 929 return super(SourceIP6Field, self).i2h(pkt, x) 930 931 932class DestIP6Field(IP6Field, DestField): 933 bindings = {} # type: Dict[Type[Packet], Tuple[str, Any]] 934 935 def __init__(self, name, default): 936 # type: (str, str) -> None 937 IP6Field.__init__(self, name, None) 938 DestField.__init__(self, name, default) 939 940 def i2m(self, pkt, x): 941 # type: (Optional[Packet], Optional[Union[str, Net6]]) -> bytes 942 if x is None and pkt is not None: 943 x = self.dst_from_pkt(pkt) 944 return IP6Field.i2m(self, pkt, x) 945 946 def i2h(self, pkt, x): 947 # type: (Optional[Packet], Optional[Union[str, Net6]]) -> str 948 if x is None and pkt is not None: 949 x = self.dst_from_pkt(pkt) 950 return super(DestIP6Field, self).i2h(pkt, x) 951 952 953class ByteField(Field[int, int]): 954 def __init__(self, name, default): 955 # type: (str, Optional[int]) -> None 956 Field.__init__(self, name, default, "B") 957 958 959class XByteField(ByteField): 960 def i2repr(self, pkt, x): 961 # type: (Optional[Packet], int) -> str 962 return lhex(self.i2h(pkt, x)) 963 964 965# XXX Unused field: at least add some tests 966class OByteField(ByteField): 967 def i2repr(self, pkt, x): 968 # type: (Optional[Packet], int) -> str 969 return "%03o" % self.i2h(pkt, x) 970 971 972class ThreeBytesField(Field[int, int]): 973 def __init__(self, name, default): 974 # type: (str, int) -> None 975 Field.__init__(self, name, default, "!I") 
976 977 def addfield(self, pkt, s, val): 978 # type: (Packet, bytes, Optional[int]) -> bytes 979 return s + struct.pack(self.fmt, self.i2m(pkt, val))[1:4] 980 981 def getfield(self, pkt, s): 982 # type: (Packet, bytes) -> Tuple[bytes, int] 983 return s[3:], self.m2i(pkt, struct.unpack(self.fmt, b"\x00" + s[:3])[0]) # noqa: E501 984 985 986class X3BytesField(ThreeBytesField, XByteField): 987 def i2repr(self, pkt, x): 988 # type: (Optional[Packet], int) -> str 989 return XByteField.i2repr(self, pkt, x) 990 991 992class LEThreeBytesField(ByteField): 993 def __init__(self, name, default): 994 # type: (str, Optional[int]) -> None 995 Field.__init__(self, name, default, "<I") 996 997 def addfield(self, pkt, s, val): 998 # type: (Packet, bytes, Optional[int]) -> bytes 999 return s + struct.pack(self.fmt, self.i2m(pkt, val))[:3] 1000 1001 def getfield(self, pkt, s): 1002 # type: (Optional[Packet], bytes) -> Tuple[bytes, int] 1003 return s[3:], self.m2i(pkt, struct.unpack(self.fmt, s[:3] + b"\x00")[0]) # noqa: E501 1004 1005 1006class LEX3BytesField(LEThreeBytesField, XByteField): 1007 def i2repr(self, pkt, x): 1008 # type: (Optional[Packet], int) -> str 1009 return XByteField.i2repr(self, pkt, x) 1010 1011 1012class NBytesField(Field[int, List[int]]): 1013 def __init__(self, name, default, sz): 1014 # type: (str, Optional[int], int) -> None 1015 Field.__init__(self, name, default, "<" + "B" * sz) 1016 1017 def i2m(self, pkt, x): 1018 # type: (Optional[Packet], Optional[int]) -> List[int] 1019 if x is None: 1020 return [] 1021 x2m = list() 1022 for _ in range(self.sz): 1023 x2m.append(x % 256) 1024 x //= 256 1025 return x2m[::-1] 1026 1027 def m2i(self, pkt, x): 1028 # type: (Optional[Packet], Union[List[int], int]) -> int 1029 if isinstance(x, int): 1030 return x 1031 # x can be a tuple when coming from struct.unpack (from getfield) 1032 if isinstance(x, (list, tuple)): 1033 return sum(d * (256 ** i) for i, d in enumerate(x)) 1034 return 0 1035 1036 def i2repr(self, pkt, 
x): 1037 # type: (Optional[Packet], int) -> str 1038 if isinstance(x, integer_types): 1039 return '%i' % x 1040 return super(NBytesField, self).i2repr(pkt, x) 1041 1042 def addfield(self, pkt, s, val): 1043 # type: (Optional[Packet], bytes, Optional[int]) -> bytes 1044 return s + self.struct.pack(*self.i2m(pkt, val)) 1045 1046 def getfield(self, pkt, s): 1047 # type: (Optional[Packet], bytes) -> Tuple[bytes, int] 1048 return (s[self.sz:], 1049 self.m2i(pkt, self.struct.unpack(s[:self.sz]))) # type: ignore 1050 1051 1052class XNBytesField(NBytesField): 1053 def i2repr(self, pkt, x): 1054 # type: (Optional[Packet], int) -> str 1055 if isinstance(x, integer_types): 1056 return '0x%x' % x 1057 # x can be a tuple when coming from struct.unpack (from getfield) 1058 if isinstance(x, (list, tuple)): 1059 return "0x" + "".join("%02x" % b for b in x) 1060 return super(XNBytesField, self).i2repr(pkt, x) 1061 1062 1063class SignedByteField(Field[int, int]): 1064 def __init__(self, name, default): 1065 # type: (str, Optional[int]) -> None 1066 Field.__init__(self, name, default, "b") 1067 1068 1069class FieldValueRangeException(Scapy_Exception): 1070 pass 1071 1072 1073class FieldAttributeException(Scapy_Exception): 1074 pass 1075 1076 1077class YesNoByteField(ByteField): 1078 """ 1079 A byte based flag field that shows representation of its number 1080 based on a given association 1081 1082 In its default configuration the following representation is generated: 1083 x == 0 : 'no' 1084 x != 0 : 'yes' 1085 1086 In more sophisticated use-cases (e.g. yes/no/invalid) one can use the 1087 config attribute to configure. 1088 Key-value, key-range and key-value-set associations that will be used to 1089 generate the values representation. 1090 1091 - A range is given by a tuple (<first-val>, <last-value>) including the 1092 last value. 1093 - A single-value tuple is treated as scalar. 
1094 - A list defines a set of (probably non consecutive) values that should be 1095 associated to a given key. 1096 1097 All values not associated with a key will be shown as number of type 1098 unsigned byte. 1099 1100 **For instance**:: 1101 1102 config = { 1103 'no' : 0, 1104 'foo' : (1,22), 1105 'yes' : 23, 1106 'bar' : [24,25, 42, 48, 87, 253] 1107 } 1108 1109 Generates the following representations:: 1110 1111 x == 0 : 'no' 1112 x == 15: 'foo' 1113 x == 23: 'yes' 1114 x == 42: 'bar' 1115 x == 43: 43 1116 1117 Another example, using the config attribute one could also revert 1118 the stock-yes-no-behavior:: 1119 1120 config = { 1121 'yes' : 0, 1122 'no' : (1,255) 1123 } 1124 1125 Will generate the following value representation:: 1126 1127 x == 0 : 'yes' 1128 x != 0 : 'no' 1129 1130 """ 1131 __slots__ = ['eval_fn'] 1132 1133 def _build_config_representation(self, config): 1134 # type: (Dict[str, Any]) -> None 1135 assoc_table = dict() 1136 for key in config: 1137 value_spec = config[key] 1138 1139 value_spec_type = type(value_spec) 1140 1141 if value_spec_type is int: 1142 if value_spec < 0 or value_spec > 255: 1143 raise FieldValueRangeException('given field value {} invalid - ' # noqa: E501 1144 'must be in range [0..255]'.format(value_spec)) # noqa: E501 1145 assoc_table[value_spec] = key 1146 1147 elif value_spec_type is list: 1148 for value in value_spec: 1149 if value < 0 or value > 255: 1150 raise FieldValueRangeException('given field value {} invalid - ' # noqa: E501 1151 'must be in range [0..255]'.format(value)) # noqa: E501 1152 assoc_table[value] = key 1153 1154 elif value_spec_type is tuple: 1155 value_spec_len = len(value_spec) 1156 if value_spec_len != 2: 1157 raise FieldAttributeException('invalid length {} of given config item tuple {} - must be ' # noqa: E501 1158 '(<start-range>, <end-range>).'.format(value_spec_len, value_spec)) # noqa: E501 1159 1160 value_range_start = value_spec[0] 1161 if value_range_start < 0 or value_range_start > 
255: 1162 raise FieldValueRangeException('given field value {} invalid - ' # noqa: E501 1163 'must be in range [0..255]'.format(value_range_start)) # noqa: E501 1164 1165 value_range_end = value_spec[1] 1166 if value_range_end < 0 or value_range_end > 255: 1167 raise FieldValueRangeException('given field value {} invalid - ' # noqa: E501 1168 'must be in range [0..255]'.format(value_range_end)) # noqa: E501 1169 1170 for value in range(value_range_start, value_range_end + 1): 1171 1172 assoc_table[value] = key 1173 1174 self.eval_fn = lambda x: assoc_table[x] if x in assoc_table else x 1175 1176 def __init__(self, name, default, config=None): 1177 # type: (str, int, Optional[Dict[str, Any]]) -> None 1178 1179 if not config: 1180 # this represents the common use case and therefore it is kept small # noqa: E501 1181 self.eval_fn = lambda x: 'no' if x == 0 else 'yes' 1182 else: 1183 self._build_config_representation(config) 1184 ByteField.__init__(self, name, default) 1185 1186 def i2repr(self, pkt, x): 1187 # type: (Optional[Packet], int) -> str 1188 return self.eval_fn(x) # type: ignore 1189 1190 1191class ShortField(Field[int, int]): 1192 def __init__(self, name, default): 1193 # type: (str, Optional[int]) -> None 1194 Field.__init__(self, name, default, "H") 1195 1196 1197class SignedShortField(Field[int, int]): 1198 def __init__(self, name, default): 1199 # type: (str, Optional[int]) -> None 1200 Field.__init__(self, name, default, "h") 1201 1202 1203class LEShortField(Field[int, int]): 1204 def __init__(self, name, default): 1205 # type: (str, Optional[int]) -> None 1206 Field.__init__(self, name, default, "<H") 1207 1208 1209class LESignedShortField(Field[int, int]): 1210 def __init__(self, name, default): 1211 # type: (str, Optional[int]) -> None 1212 Field.__init__(self, name, default, "<h") 1213 1214 1215class XShortField(ShortField): 1216 def i2repr(self, pkt, x): 1217 # type: (Optional[Packet], int) -> str 1218 return lhex(self.i2h(pkt, x)) 1219 1220 
1221class IntField(Field[int, int]): 1222 def __init__(self, name, default): 1223 # type: (str, Optional[int]) -> None 1224 Field.__init__(self, name, default, "I") 1225 1226 1227class SignedIntField(Field[int, int]): 1228 def __init__(self, name, default): 1229 # type: (str, int) -> None 1230 Field.__init__(self, name, default, "i") 1231 1232 1233class LEIntField(Field[int, int]): 1234 def __init__(self, name, default): 1235 # type: (str, Optional[int]) -> None 1236 Field.__init__(self, name, default, "<I") 1237 1238 1239class LESignedIntField(Field[int, int]): 1240 def __init__(self, name, default): 1241 # type: (str, int) -> None 1242 Field.__init__(self, name, default, "<i") 1243 1244 1245class XIntField(IntField): 1246 def i2repr(self, pkt, x): 1247 # type: (Optional[Packet], int) -> str 1248 return lhex(self.i2h(pkt, x)) 1249 1250 1251class XLEIntField(LEIntField, XIntField): 1252 def i2repr(self, pkt, x): 1253 # type: (Optional[Packet], int) -> str 1254 return XIntField.i2repr(self, pkt, x) 1255 1256 1257class XLEShortField(LEShortField, XShortField): 1258 def i2repr(self, pkt, x): 1259 # type: (Optional[Packet], int) -> str 1260 return XShortField.i2repr(self, pkt, x) 1261 1262 1263class LongField(Field[int, int]): 1264 def __init__(self, name, default): 1265 # type: (str, int) -> None 1266 Field.__init__(self, name, default, "Q") 1267 1268 1269class SignedLongField(Field[int, int]): 1270 def __init__(self, name, default): 1271 # type: (str, Optional[int]) -> None 1272 Field.__init__(self, name, default, "q") 1273 1274 1275class LELongField(LongField): 1276 def __init__(self, name, default): 1277 # type: (str, Optional[int]) -> None 1278 Field.__init__(self, name, default, "<Q") 1279 1280 1281class LESignedLongField(Field[int, int]): 1282 def __init__(self, name, default): 1283 # type: (str, Optional[Any]) -> None 1284 Field.__init__(self, name, default, "<q") 1285 1286 1287class XLongField(LongField): 1288 def i2repr(self, pkt, x): 1289 # type: 
(Optional[Packet], int) -> str 1290 return lhex(self.i2h(pkt, x)) 1291 1292 1293class XLELongField(LELongField, XLongField): 1294 def i2repr(self, pkt, x): 1295 # type: (Optional[Packet], int) -> str 1296 return XLongField.i2repr(self, pkt, x) 1297 1298 1299class IEEEFloatField(Field[int, int]): 1300 def __init__(self, name, default): 1301 # type: (str, Optional[int]) -> None 1302 Field.__init__(self, name, default, "f") 1303 1304 1305class IEEEDoubleField(Field[int, int]): 1306 def __init__(self, name, default): 1307 # type: (str, Optional[int]) -> None 1308 Field.__init__(self, name, default, "d") 1309 1310 1311class _StrField(Field[I, bytes]): 1312 __slots__ = ["remain"] 1313 1314 def __init__(self, name, default, fmt="H", remain=0): 1315 # type: (str, Optional[I], str, int) -> None 1316 Field.__init__(self, name, default, fmt) 1317 self.remain = remain 1318 1319 def i2len(self, pkt, x): 1320 # type: (Optional[Packet], Any) -> int 1321 if x is None: 1322 return 0 1323 return len(x) 1324 1325 def any2i(self, pkt, x): 1326 # type: (Optional[Packet], Any) -> I 1327 if isinstance(x, six.text_type): 1328 x = bytes_encode(x) 1329 return super(_StrField, self).any2i(pkt, x) # type: ignore 1330 1331 def i2repr(self, pkt, x): 1332 # type: (Optional[Packet], I) -> str 1333 if isinstance(x, bytes): 1334 return repr(plain_str(x)) 1335 return super(_StrField, self).i2repr(pkt, x) 1336 1337 def i2m(self, pkt, x): 1338 # type: (Optional[Packet], Optional[I]) -> bytes 1339 if x is None: 1340 return b"" 1341 if not isinstance(x, bytes): 1342 return bytes_encode(x) 1343 return x 1344 1345 def addfield(self, pkt, s, val): 1346 # type: (Packet, bytes, Optional[I]) -> bytes 1347 return s + self.i2m(pkt, val) 1348 1349 def getfield(self, pkt, s): 1350 # type: (Packet, bytes) -> Tuple[bytes, I] 1351 if self.remain == 0: 1352 return b"", self.m2i(pkt, s) 1353 else: 1354 return s[-self.remain:], self.m2i(pkt, s[:-self.remain]) 1355 1356 def randval(self): 1357 # type: () -> RandBin 1358 
return RandBin(RandNum(0, 1200)) 1359 1360 1361class StrField(_StrField[bytes]): 1362 pass 1363 1364 1365class StrFieldUtf16(StrField): 1366 def h2i(self, pkt, x): 1367 # type: (Optional[Packet], Optional[str]) -> bytes 1368 return plain_str(x).encode('utf-16')[2:] 1369 1370 def any2i(self, pkt, x): 1371 # type: (Optional[Packet], Optional[str]) -> bytes 1372 if isinstance(x, six.text_type): 1373 return self.h2i(pkt, x) 1374 return super(StrFieldUtf16, self).any2i(pkt, x) 1375 1376 def i2repr(self, pkt, x): 1377 # type: (Optional[Packet], bytes) -> str 1378 return plain_str(x) 1379 1380 def i2h(self, pkt, x): 1381 # type: (Optional[Packet], bytes) -> str 1382 return bytes_encode(x).decode('utf-16') 1383 1384 1385K = TypeVar('K', List[BasePacket], BasePacket) 1386 1387 1388class _PacketField(_StrField[K]): 1389 __slots__ = ["cls"] 1390 holds_packets = 1 1391 1392 def __init__(self, 1393 name, # type: str 1394 default, # type: Optional[K] 1395 pkt_cls, # type: Union[Callable[[bytes], Packet], Type[Packet]] # noqa: E501 1396 ): 1397 # type: (...) -> None 1398 super(_PacketField, self).__init__(name, default) 1399 self.cls = pkt_cls 1400 1401 def i2m(self, 1402 pkt, # type: Optional[Packet] 1403 i, # type: Any 1404 ): 1405 # type: (...) -> bytes 1406 if i is None: 1407 return b"" 1408 return raw(i) 1409 1410 def m2i(self, pkt, m): 1411 # type: (Optional[Packet], bytes) -> Packet 1412 return self.cls(m) 1413 1414 def getfield(self, 1415 pkt, # type: Packet 1416 s, # type: bytes 1417 ): 1418 # type: (...) 
-> Tuple[bytes, K] 1419 i = self.m2i(pkt, s) 1420 remain = b"" 1421 if conf.padding_layer in i: 1422 r = i[conf.padding_layer] 1423 del(r.underlayer.payload) 1424 remain = r.load 1425 return remain, i 1426 1427 def randval(self): 1428 # type: () -> K 1429 from scapy.packet import fuzz 1430 return fuzz(self.cls()) # type: ignore 1431 1432 1433class PacketField(_PacketField[BasePacket]): 1434 pass 1435 1436 1437class PacketLenField(PacketField): 1438 __slots__ = ["length_from"] 1439 1440 def __init__(self, 1441 name, # type: str 1442 default, # type: Packet 1443 cls, # type: Union[Callable[[bytes], Packet], Type[Packet]] # noqa: E501 1444 length_from=None # type: Optional[Callable[[Packet], int]] # noqa: E501 1445 ): 1446 # type: (...) -> None 1447 super(PacketLenField, self).__init__(name, default, cls) 1448 self.length_from = length_from or (lambda x: 0) 1449 1450 def getfield(self, 1451 pkt, # type: Packet 1452 s, # type: bytes 1453 ): 1454 # type: (...) -> Tuple[bytes, Packet] 1455 len_pkt = self.length_from(pkt) 1456 try: 1457 i = self.m2i(pkt, s[:len_pkt]) 1458 except Exception: 1459 if conf.debug_dissector: 1460 raise 1461 i = conf.raw_layer(load=s[:len_pkt]) 1462 return s[len_pkt:], i 1463 1464 1465class PacketListField(_PacketField[List[BasePacket]]): 1466 """PacketListField represents a series of Packet instances that might 1467 occur right in the middle of another Packet field list. 1468 This field type may also be used to indicate that a series of Packet 1469 instances have a sibling semantic instead of a parent/child relationship 1470 (i.e. a stack of layers). 
1471 """ 1472 __slots__ = ["count_from", "length_from", "next_cls_cb"] 1473 islist = 1 1474 1475 def __init__( 1476 self, 1477 name, # type: str 1478 default, # type: Optional[List[BasePacket]] 1479 pkt_cls=None, # type: Optional[Union[Callable[[bytes], Packet], Type[Packet]]] # noqa: E501 1480 count_from=None, # type: Optional[Callable[[Packet], int]] 1481 length_from=None, # type: Optional[Callable[[Packet], int]] 1482 next_cls_cb=None, # type: Optional[Callable[[Packet, List[BasePacket], Optional[Packet], bytes], Type[Packet]]] # noqa: E501 1483 ): 1484 # type: (...) -> None 1485 """ 1486 The number of Packet instances that are dissected by this field can 1487 be parametrized using one of three different mechanisms/parameters: 1488 1489 * count_from: a callback that returns the number of Packet 1490 instances to dissect. The callback prototype is:: 1491 1492 count_from(pkt:Packet) -> int 1493 1494 * length_from: a callback that returns the number of bytes that 1495 must be dissected by this field. The callback prototype is:: 1496 1497 length_from(pkt:Packet) -> int 1498 1499 * next_cls_cb: a callback that enables a Scapy developer to 1500 dynamically discover if another Packet instance should be 1501 dissected or not. See below for this callback prototype. 1502 1503 The bytes that are not consumed during the dissection of this field 1504 are passed to the next field of the current packet. 1505 1506 For the serialization of such a field, the list of Packets that are 1507 contained in a PacketListField can be heterogeneous and is 1508 unrestricted. 1509 1510 The type of the Packet instances that are dissected with this field is 1511 specified or discovered using one of the following mechanism: 1512 1513 * the pkt_cls parameter may contain a callable that returns an 1514 instance of the dissected Packet. This may either be a 1515 reference of a Packet subclass (e.g. 
DNSRROPT in layers/dns.py) 1516 to generate an homogeneous PacketListField or a function 1517 deciding the type of the Packet instance 1518 (e.g. _CDPGuessAddrRecord in contrib/cdp.py) 1519 1520 * the pkt_cls parameter may contain a class object with a defined 1521 ``dispatch_hook`` classmethod. That method must return a Packet 1522 instance. The ``dispatch_hook`` callmethod must implement the 1523 following prototype:: 1524 1525 dispatch_hook(cls, 1526 _pkt:Optional[Packet], 1527 *args, **kargs 1528 ) -> Type[Packet] 1529 1530 The _pkt parameter may contain a reference to the packet 1531 instance containing the PacketListField that is being 1532 dissected. 1533 1534 * the ``next_cls_cb`` parameter may contain a callable whose 1535 prototype is:: 1536 1537 cbk(pkt:Packet, 1538 lst:List[Packet], 1539 cur:Optional[Packet], 1540 remain:str 1541 ) -> Optional[Type[Packet]] 1542 1543 The pkt argument contains a reference to the Packet instance 1544 containing the PacketListField that is being dissected. 1545 The lst argument is the list of all Packet instances that were 1546 previously parsed during the current ``PacketListField`` 1547 dissection, saved for the very last Packet instance. 1548 The cur argument contains a reference to that very last parsed 1549 ``Packet`` instance. The remain argument contains the bytes 1550 that may still be consumed by the current PacketListField 1551 dissection operation. 1552 1553 This callback returns either the type of the next Packet to 1554 dissect or None to indicate that no more Packet are to be 1555 dissected. 1556 1557 These four arguments allows a variety of dynamic discovery of 1558 the number of Packet to dissect and of the type of each one of 1559 these Packets, including: type determination based on current 1560 Packet instances or its underlayers, continuation based on the 1561 previously parsed Packet instances within that PacketListField, 1562 continuation based on a look-ahead on the bytes to be 1563 dissected... 
1564 1565 The pkt_cls and next_cls_cb parameters are semantically exclusive, 1566 although one could specify both. If both are specified, pkt_cls is 1567 silently ignored. The same is true for count_from and next_cls_cb. 1568 1569 length_from and next_cls_cb are compatible and the dissection will 1570 end, whichever of the two stop conditions comes first. 1571 1572 :param name: the name of the field 1573 :param default: the default value of this field; generally an empty 1574 Python list 1575 :param pkt_cls: either a callable returning a Packet instance or a 1576 class object defining a ``dispatch_hook`` class method 1577 :param count_from: a callback returning the number of Packet 1578 instances to dissect. 1579 :param length_from: a callback returning the number of bytes to dissect 1580 :param next_cls_cb: a callback returning either None or the type of 1581 the next Packet to dissect. 1582 """ 1583 if default is None: 1584 default = [] # Create a new list for each instance 1585 super(PacketListField, self).__init__( 1586 name, 1587 default, 1588 pkt_cls # type: ignore 1589 ) 1590 self.count_from = count_from 1591 self.length_from = length_from 1592 self.next_cls_cb = next_cls_cb 1593 1594 def any2i(self, pkt, x): 1595 # type: (Optional[Packet], Any) -> List[BasePacket] 1596 if not isinstance(x, list): 1597 return [x] 1598 else: 1599 return x 1600 1601 def i2count(self, 1602 pkt, # type: Optional[Packet] 1603 val, # type: List[BasePacket] 1604 ): 1605 # type: (...) -> int 1606 if isinstance(val, list): 1607 return len(val) 1608 return 1 1609 1610 def i2len(self, 1611 pkt, # type: Optional[Packet] 1612 val, # type: List[Packet] 1613 ): 1614 # type: (...) 
-> int 1615 return sum(len(p) for p in val) 1616 1617 def getfield(self, pkt, s): 1618 # type: (Packet, bytes) -> Tuple[bytes, List[BasePacket]] 1619 c = len_pkt = cls = None 1620 if self.length_from is not None: 1621 len_pkt = self.length_from(pkt) 1622 elif self.count_from is not None: 1623 c = self.count_from(pkt) 1624 if self.next_cls_cb is not None: 1625 cls = self.next_cls_cb(pkt, [], None, s) 1626 c = 1 1627 if cls is None: 1628 c = 0 1629 1630 lst = [] # type: List[BasePacket] 1631 ret = b"" 1632 remain = s 1633 if len_pkt is not None: 1634 remain, ret = s[:len_pkt], s[len_pkt:] 1635 while remain: 1636 if c is not None: 1637 if c <= 0: 1638 break 1639 c -= 1 1640 try: 1641 if cls is not None: 1642 p = cls(remain) 1643 else: 1644 p = self.m2i(pkt, remain) 1645 except Exception: 1646 if conf.debug_dissector: 1647 raise 1648 p = conf.raw_layer(load=remain) 1649 remain = b"" 1650 else: 1651 if conf.padding_layer in p: 1652 pad = p[conf.padding_layer] 1653 remain = pad.load 1654 del(pad.underlayer.payload) 1655 if self.next_cls_cb is not None: 1656 cls = self.next_cls_cb(pkt, lst, p, remain) 1657 if cls is not None: 1658 c = 0 if c is None else c 1659 c += 1 1660 else: 1661 remain = b"" 1662 lst.append(p) 1663 return remain + ret, lst 1664 1665 def addfield(self, pkt, s, val): 1666 # type: (Packet, bytes, Any) -> bytes 1667 return s + b"".join(bytes_encode(v) for v in val) 1668 1669 1670class StrFixedLenField(StrField): 1671 __slots__ = ["length_from"] 1672 1673 def __init__( 1674 self, 1675 name, # type: str 1676 default, # type: Optional[bytes] 1677 length=None, # type: Optional[int] 1678 length_from=None, # type: Optional[Callable[[Packet], int]] # noqa: E501 1679 ): 1680 # type: (...) 
-> None 1681 super(StrFixedLenField, self).__init__(name, default) 1682 self.length_from = length_from or (lambda x: 0) 1683 if length is not None: 1684 self.length_from = lambda x, length=length: length # type: ignore 1685 1686 def i2repr(self, 1687 pkt, # type: Optional[Packet] 1688 v, # type: bytes 1689 ): 1690 # type: (...) -> str 1691 if isinstance(v, bytes): 1692 v = v.rstrip(b"\0") 1693 return super(StrFixedLenField, self).i2repr(pkt, v) 1694 1695 def getfield(self, pkt, s): 1696 # type: (Packet, bytes) -> Tuple[bytes, bytes] 1697 len_pkt = self.length_from(pkt) 1698 return s[len_pkt:], self.m2i(pkt, s[:len_pkt]) 1699 1700 def addfield(self, pkt, s, val): 1701 # type: (Packet, bytes, Optional[bytes]) -> bytes 1702 len_pkt = self.length_from(pkt) 1703 if len_pkt is None: 1704 return s + self.i2m(pkt, val) 1705 return s + struct.pack("%is" % len_pkt, self.i2m(pkt, val)) 1706 1707 def randval(self): 1708 # type: () -> RandBin 1709 try: 1710 len_pkt = self.length_from(None) # type: ignore 1711 except Exception: 1712 len_pkt = RandNum(0, 200) 1713 return RandBin(len_pkt) 1714 1715 1716class StrFixedLenEnumField(StrFixedLenField): 1717 __slots__ = ["enum"] 1718 1719 def __init__( 1720 self, 1721 name, # type: str 1722 default, # type: bytes 1723 length=None, # type: Optional[int] 1724 enum=None, # type: Optional[Dict[str, str]] 1725 length_from=None # type: Optional[Callable[[Optional[Packet]], int]] # noqa: E501 1726 ): 1727 # type: (...) 
-> None 1728 StrFixedLenField.__init__(self, name, default, length=length, length_from=length_from) # noqa: E501 1729 self.enum = enum 1730 1731 def i2repr(self, pkt, w): 1732 # type: (Optional[Packet], bytes) -> str 1733 v = plain_str(w) 1734 r = v.rstrip("\0") 1735 rr = repr(r) 1736 if self.enum: 1737 if v in self.enum: 1738 rr = "%s (%s)" % (rr, self.enum[v]) 1739 elif r in self.enum: 1740 rr = "%s (%s)" % (rr, self.enum[r]) 1741 return rr 1742 1743 1744class NetBIOSNameField(StrFixedLenField): 1745 def __init__(self, name, default, length=31): 1746 # type: (str, bytes, int) -> None 1747 StrFixedLenField.__init__(self, name, default, length) 1748 1749 def i2m(self, pkt, y): 1750 # type: (Optional[Packet], Optional[bytes]) -> bytes 1751 if pkt: 1752 len_pkt = self.length_from(pkt) // 2 1753 else: 1754 len_pkt = 0 1755 x = bytes_encode(y or b"") # type: bytes 1756 x += b" " * len_pkt 1757 x = x[:len_pkt] 1758 x = b"".join( 1759 chb(0x41 + (orb(b) >> 4)) + 1760 chb(0x41 + (orb(b) & 0xf)) 1761 for b in x 1762 ) # noqa: E501 1763 return b" " + x 1764 1765 def m2i(self, pkt, x): 1766 # type: (Optional[Packet], bytes) -> bytes 1767 x = x.strip(b"\x00").strip(b" ") 1768 return b"".join(map( 1769 lambda x, y: chb( 1770 (((orb(x) - 1) & 0xf) << 4) + ((orb(y) - 1) & 0xf) 1771 ), 1772 x[::2], x[1::2] 1773 )) 1774 1775 1776class StrLenField(StrField): 1777 __slots__ = ["length_from", "max_length"] 1778 1779 def __init__( 1780 self, 1781 name, # type: str 1782 default, # type: bytes 1783 length_from=None, # type: Optional[Callable[[Packet], int]] 1784 max_length=None, # type: Optional[Any] 1785 ): 1786 # type: (...) 
-> None 1787 super(StrLenField, self).__init__(name, default) 1788 self.length_from = length_from 1789 self.max_length = max_length 1790 1791 def getfield(self, pkt, s): 1792 # type: (Any, bytes) -> Tuple[bytes, bytes] 1793 len_pkt = (self.length_from or (lambda x: 0))(pkt) 1794 return s[len_pkt:], self.m2i(pkt, s[:len_pkt]) 1795 1796 def randval(self): 1797 # type: () -> RandBin 1798 return RandBin(RandNum(0, self.max_length or 1200)) 1799 1800 1801class XStrField(StrField): 1802 """ 1803 StrField which value is printed as hexadecimal. 1804 """ 1805 1806 def i2repr(self, pkt, x): 1807 # type: (Optional[Packet], bytes) -> str 1808 if x is None: 1809 return repr(x) 1810 return bytes_hex(x).decode() 1811 1812 1813class _XStrLenField: 1814 def i2repr(self, pkt, x): 1815 # type: (Optional[Packet], bytes) -> str 1816 if not x: 1817 return repr(x) 1818 return bytes_hex( 1819 x[:(self.length_from or (lambda x: 0))(pkt)] # type: ignore 1820 ).decode() 1821 1822 1823class XStrLenField(_XStrLenField, StrLenField): 1824 """ 1825 StrLenField which value is printed as hexadecimal. 1826 """ 1827 1828 1829class XStrFixedLenField(_XStrLenField, StrFixedLenField): 1830 """ 1831 StrFixedLenField which value is printed as hexadecimal. 
1832 """ 1833 1834 1835class XLEStrLenField(XStrLenField): 1836 def i2m(self, pkt, x): 1837 # type: (Optional[Packet], Optional[bytes]) -> bytes 1838 if not x: 1839 return b"" 1840 return x[:: -1] 1841 1842 def m2i(self, pkt, x): 1843 # type: (Optional[Packet], bytes) -> bytes 1844 return x[:: -1] 1845 1846 1847class StrLenFieldUtf16(StrLenField): 1848 def h2i(self, pkt, x): 1849 # type: (Optional[Packet], Optional[str]) -> bytes 1850 return plain_str(x).encode('utf-16')[2:] 1851 1852 def any2i(self, pkt, x): 1853 # type: (Optional[Packet], Any) -> bytes 1854 if isinstance(x, six.text_type): 1855 return self.h2i(pkt, x) 1856 return super(StrLenFieldUtf16, self).any2i(pkt, x) 1857 1858 def i2repr(self, pkt, x): 1859 # type: (Optional[Packet], bytes) -> str 1860 return plain_str(x) 1861 1862 def i2h(self, 1863 pkt, # type: Optional[Packet] 1864 x, # type: bytes 1865 ): 1866 # type: (...) -> str 1867 return bytes_encode(x).decode('utf-16') 1868 1869 1870class BoundStrLenField(StrLenField): 1871 __slots__ = ["minlen", "maxlen"] 1872 1873 def __init__( 1874 self, 1875 name, # type: str 1876 default, # type: bytes 1877 minlen=0, # type: int 1878 maxlen=255, # type: int 1879 length_from=None # type: Optional[Callable[[Packet], int]] 1880 ): 1881 # type: (...) -> None 1882 StrLenField.__init__(self, name, default, length_from=length_from) 1883 self.minlen = minlen 1884 self.maxlen = maxlen 1885 1886 def randval(self): 1887 # type: () -> RandBin 1888 return RandBin(RandNum(self.minlen, self.maxlen)) 1889 1890 1891class FieldListField(Field[List[Any], List[Any]]): 1892 __slots__ = ["field", "count_from", "length_from"] 1893 islist = 1 1894 1895 def __init__( 1896 self, 1897 name, # type: str 1898 default, # type: Optional[List[AnyField]] 1899 field, # type: AnyField 1900 length_from=None, # type: Optional[Callable[[Packet], int]] 1901 count_from=None, # type: Optional[Callable[[Packet], int]] 1902 ): 1903 # type: (...) 
-> None 1904 if default is None: 1905 default = [] # Create a new list for each instance 1906 self.field = field 1907 Field.__init__(self, name, default) 1908 self.count_from = count_from 1909 self.length_from = length_from 1910 1911 def i2count(self, pkt, val): 1912 # type: (Optional[Packet], List[Any]) -> int 1913 if isinstance(val, list): 1914 return len(val) 1915 return 1 1916 1917 def i2len(self, pkt, val): 1918 # type: (Packet, List[Any]) -> int 1919 return int(sum(self.field.i2len(pkt, v) for v in val)) 1920 1921 def any2i(self, pkt, x): 1922 # type: (Optional[Packet], List[Any]) -> List[Any] 1923 if not isinstance(x, list): 1924 return [self.field.any2i(pkt, x)] 1925 else: 1926 return [self.field.any2i(pkt, e) for e in x] 1927 1928 def i2repr(self, 1929 pkt, # type: Optional[Packet] 1930 x, # type: List[Any] 1931 ): 1932 # type: (...) -> str 1933 return "[%s]" % ", ".join(self.field.i2repr(pkt, v) for v in x) 1934 1935 def addfield(self, 1936 pkt, # type: Packet 1937 s, # type: bytes 1938 val, # type: Optional[List[Any]] 1939 ): 1940 # type: (...) -> bytes 1941 val = self.i2m(pkt, val) 1942 for v in val: 1943 s = self.field.addfield(pkt, s, v) 1944 return s 1945 1946 def getfield(self, 1947 pkt, # type: Packet 1948 s, # type: bytes 1949 ): 1950 # type: (...) 
-> Any 1951 c = len_pkt = None 1952 if self.length_from is not None: 1953 len_pkt = self.length_from(pkt) 1954 elif self.count_from is not None: 1955 c = self.count_from(pkt) 1956 1957 val = [] 1958 ret = b"" 1959 if len_pkt is not None: 1960 s, ret = s[:len_pkt], s[len_pkt:] 1961 1962 while s: 1963 if c is not None: 1964 if c <= 0: 1965 break 1966 c -= 1 1967 s, v = self.field.getfield(pkt, s) 1968 val.append(v) 1969 return s + ret, val 1970 1971 1972class FieldLenField(Field[int, int]): 1973 __slots__ = ["length_of", "count_of", "adjust"] 1974 1975 def __init__( 1976 self, 1977 name, # type: str 1978 default, # type: Optional[Any] 1979 length_of=None, # type: Optional[str] 1980 fmt="H", # type: str 1981 count_of=None, # type: Optional[str] 1982 adjust=lambda pkt, x: x, # type: Callable[[Packet, int], int] 1983 ): 1984 # type: (...) -> None 1985 Field.__init__(self, name, default, fmt) 1986 self.length_of = length_of 1987 self.count_of = count_of 1988 self.adjust = adjust 1989 1990 def i2m(self, pkt, x): 1991 # type: (Optional[Packet], Optional[int]) -> int 1992 if x is None and pkt is not None: 1993 if self.length_of is not None: 1994 fld, fval = pkt.getfield_and_val(self.length_of) 1995 f = fld.i2len(pkt, fval) 1996 elif self.count_of is not None: 1997 fld, fval = pkt.getfield_and_val(self.count_of) 1998 f = fld.i2count(pkt, fval) 1999 else: 2000 raise ValueError( 2001 "Field should have either length_of or count_of" 2002 ) 2003 x = self.adjust(pkt, f) 2004 elif x is None: 2005 x = 0 2006 return x 2007 2008 2009class StrNullField(StrField): 2010 def addfield(self, pkt, s, val): 2011 # type: (Packet, bytes, Optional[bytes]) -> bytes 2012 return s + self.i2m(pkt, val) + b"\x00" 2013 2014 def getfield(self, 2015 pkt, # type: Packet 2016 s, # type: bytes 2017 ): 2018 # type: (...) 
-> Tuple[bytes, bytes] 2019 len_str = s.find(b"\x00") 2020 if len_str < 0: 2021 # \x00 not found: return empty 2022 return b"", s 2023 return s[len_str + 1:], self.m2i(pkt, s[:len_str]) 2024 2025 def randval(self): 2026 # type: () -> RandTermString 2027 return RandTermString(RandNum(0, 1200), b"\x00") 2028 2029 2030class StrStopField(StrField): 2031 __slots__ = ["stop", "additional"] 2032 2033 def __init__(self, name, default, stop, additional=0): 2034 # type: (str, str, bytes, int) -> None 2035 Field.__init__(self, name, default) 2036 self.stop = stop 2037 self.additional = additional 2038 2039 def getfield(self, pkt, s): 2040 # type: (Optional[Packet], bytes) -> Tuple[bytes, bytes] 2041 len_str = s.find(self.stop) 2042 if len_str < 0: 2043 return b"", s 2044# raise Scapy_Exception,"StrStopField: stop value [%s] not found" %stop # noqa: E501 2045 len_str += len(self.stop) + self.additional 2046 return s[len_str:], s[:len_str] 2047 2048 def randval(self): 2049 # type: () -> RandTermString 2050 return RandTermString(RandNum(0, 1200), self.stop) 2051 2052 2053class LenField(Field[int, int]): 2054 """ 2055 If None, will be filled with the size of the payload 2056 """ 2057 __slots__ = ["adjust"] 2058 2059 def __init__(self, name, default, fmt="H", adjust=lambda x: x): 2060 # type: (str, Optional[Any], str, Callable[[int], int]) -> None 2061 Field.__init__(self, name, default, fmt) 2062 self.adjust = adjust 2063 2064 def i2m(self, 2065 pkt, # type: Optional[Packet] 2066 x, # type: Optional[int] 2067 ): 2068 # type: (...) 
-> int 2069 if x is None: 2070 x = 0 2071 if pkt is not None: 2072 x = self.adjust(len(pkt.payload)) 2073 return x 2074 2075 2076class BCDFloatField(Field[float, int]): 2077 def i2m(self, pkt, x): 2078 # type: (Optional[Packet], Optional[float]) -> int 2079 if x is None: 2080 return 0 2081 return int(256 * x) 2082 2083 def m2i(self, pkt, x): 2084 # type: (Optional[Packet], int) -> float 2085 return x / 256.0 2086 2087 2088class _BitField(Field[I, int]): 2089 """ 2090 Field to handle bits. 2091 2092 :param name: name of the field 2093 :param default: default value 2094 :param size: size (in bits). If negative, Low endian 2095 :param tot_size: size of the total group of bits (in bytes) the bitfield 2096 is in. If negative, Low endian. 2097 :param end_tot_size: same but for the BitField ending a group. 2098 2099 Example - normal usage:: 2100 2101 0 1 2 3 2102 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2103 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 2104 | A | B | C | 2105 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 2106 2107 Fig. 
TestPacket 2108 2109 class TestPacket(Packet): 2110 fields_desc = [ 2111 BitField("a", 0, 14), 2112 BitField("b", 0, 16), 2113 BitField("c", 0, 2), 2114 ] 2115 2116 Example - Low endian stored as 16 bits on the network:: 2117 2118 x x x x x x x x x x x x x x x x 2119 a [b] [ c ] [ a ] 2120 2121 Will first get reversed during dissecion: 2122 2123 x x x x x x x x x x x x x x x x 2124 [ a ] [b] [ c ] 2125 2126 class TestPacket(Packet): 2127 fields_desc = [ 2128 BitField("a", 0, 9, tot_size=-2), 2129 BitField("b", 0, 2), 2130 BitField("c", 0, 5, end_tot_size=-2) 2131 ] 2132 2133 """ 2134 __slots__ = ["rev", "size", "tot_size", "end_tot_size"] 2135 2136 def __init__(self, name, default, size, 2137 tot_size=0, end_tot_size=0): 2138 # type: (str, I, int, int, int) -> None 2139 Field.__init__(self, name, default) 2140 if callable(size): 2141 size = size(self) 2142 self.rev = size < 0 or tot_size < 0 or end_tot_size < 0 2143 self.size = abs(size) 2144 if not tot_size: 2145 tot_size = self.size // 8 2146 self.tot_size = abs(tot_size) 2147 if not end_tot_size: 2148 end_tot_size = self.size // 8 2149 self.end_tot_size = abs(end_tot_size) 2150 # Fields always have a round sz except BitField 2151 # so to keep it simple, we'll ignore it here. 2152 self.sz = self.size / 8. # type: ignore 2153 2154 # We need to # type: ignore a few things because of how special 2155 # BitField is 2156 def addfield(self, # type: ignore 2157 pkt, # type: Packet 2158 s, # type: Union[Tuple[bytes, int, int], bytes] 2159 ival, # type: I 2160 ): 2161 # type: (...) 
-> Union[Tuple[bytes, int, int], bytes] 2162 val = self.i2m(pkt, ival) 2163 if isinstance(s, tuple): 2164 s, bitsdone, v = s 2165 else: 2166 bitsdone = 0 2167 v = 0 2168 v <<= self.size 2169 v |= val & ((1 << self.size) - 1) 2170 bitsdone += self.size 2171 while bitsdone >= 8: 2172 bitsdone -= 8 2173 s = s + struct.pack("!B", v >> bitsdone) 2174 v &= (1 << bitsdone) - 1 2175 if bitsdone: 2176 return s, bitsdone, v 2177 else: 2178 # Apply LE if necessary 2179 if self.rev and self.end_tot_size > 1: 2180 s = s[:-self.end_tot_size] + s[-self.end_tot_size:][::-1] 2181 return s 2182 2183 def getfield(self, # type: ignore 2184 pkt, # type: Packet 2185 s, # type: Union[Tuple[bytes, int], bytes] 2186 ): 2187 # type: (...) -> Union[Tuple[Tuple[bytes, int], I], Tuple[bytes, I]] # noqa: E501 2188 if isinstance(s, tuple): 2189 s, bn = s 2190 else: 2191 bn = 0 2192 # Apply LE if necessary 2193 if self.rev and self.tot_size > 1: 2194 s = s[:self.tot_size][::-1] + s[self.tot_size:] 2195 2196 # we don't want to process all the string 2197 nb_bytes = (self.size + bn - 1) // 8 + 1 2198 w = s[:nb_bytes] 2199 2200 # split the substring byte by byte 2201 _bytes = struct.unpack('!%dB' % nb_bytes, w) 2202 2203 b = 0 2204 for c in range(nb_bytes): 2205 b |= int(_bytes[c]) << (nb_bytes - c - 1) * 8 2206 2207 # get rid of high order bits 2208 b &= (1 << (nb_bytes * 8 - bn)) - 1 2209 2210 # remove low order bits 2211 b = b >> (nb_bytes * 8 - self.size - bn) 2212 2213 bn += self.size 2214 s = s[bn // 8:] 2215 bn = bn % 8 2216 b2 = self.m2i(pkt, b) 2217 if bn: 2218 return (s, bn), b2 2219 else: 2220 return s, b2 2221 2222 def randval(self): 2223 # type: () -> RandNum 2224 return RandNum(0, 2**self.size - 1) 2225 2226 def i2len(self, pkt, x): # type: ignore 2227 # type: (Optional[Packet], Optional[float]) -> float 2228 return float(self.size) / 8 2229 2230 2231class BitField(_BitField[int]): 2232 __doc__ = _BitField.__doc__ 2233 2234 2235class BitFixedLenField(BitField): 2236 __slots__ = 
class BitFieldLenField(BitField):
    """
    A BitField that stores ``length_of``, ``count_of`` and ``adjust`` and
    computes its machine value with ``FieldLenField.i2m``.
    """
    __slots__ = ["length_of", "count_of", "adjust"]

    def __init__(self,
                 name,  # type: str
                 default,  # type: int
                 size,  # type: int
                 length_of=None,  # type: Optional[Union[Callable[[Optional[Packet]], int], str]]  # noqa: E501
                 count_of=None,  # type: Optional[str]
                 adjust=lambda pkt, x: x,  # type: Callable[[Optional[Packet], int], int]  # noqa: E501
                 ):
        # type: (...) -> None
        """
        :param name: field's name
        :param default: default value for the field
        :param size: width of the field, in bits
        :param length_of: stored as-is on the instance
        :param count_of: stored as-is on the instance
        :param adjust: stored as-is on the instance; defaults to a callable
            returning its second argument unchanged
        """
        super(BitFieldLenField, self).__init__(name, default, size)
        self.adjust = adjust
        self.count_of = count_of
        self.length_of = length_of

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[Any]) -> int
        """Delegate the conversion to ``FieldLenField.i2m``."""
        # On Python 2 the class attribute is an unbound method that would
        # reject a non-FieldLenField ``self``: unwrap the plain function.
        borrowed = FieldLenField.i2m.__func__ if six.PY2 else FieldLenField.i2m
        return borrowed(self, pkt, x)  # type: ignore
2319 """ 2320 if isinstance(enum, ObservableDict): 2321 cast(ObservableDict, enum).observe(self) 2322 2323 if isinstance(enum, tuple): 2324 self.i2s_cb = enum[0] # type: Optional[Callable[[I], str]] 2325 self.s2i_cb = enum[1] # type: Optional[Callable[[str], I]] 2326 self.i2s = None # type: Optional[Dict[I, str]] 2327 self.s2i = None # type: Optional[Dict[str, I]] 2328 else: 2329 i2s = self.i2s = {} 2330 s2i = self.s2i = {} 2331 self.i2s_cb = None 2332 self.s2i_cb = None 2333 if isinstance(enum, list): 2334 keys = list(range(len(enum))) 2335 elif isinstance(enum, DADict): 2336 keys = enum.keys() 2337 else: 2338 keys = list(enum) 2339 if any(isinstance(x, str) for x in keys): 2340 i2s, s2i = s2i, i2s # type: ignore 2341 for k in keys: 2342 value = cast(str, enum[k]) 2343 i2s[k] = value 2344 s2i[value] = k 2345 Field.__init__(self, name, default, fmt) 2346 2347 def any2i_one(self, pkt, x): 2348 # type: (Optional[Packet], Any) -> I 2349 if isinstance(x, str): 2350 if self.s2i: 2351 try: 2352 x = self.s2i[x] 2353 except KeyError: 2354 pass 2355 elif self.s2i_cb: 2356 x = self.s2i_cb(x) 2357 return cast(I, x) 2358 2359 def i2repr_one(self, pkt, x): 2360 # type: (Optional[Packet], I) -> str 2361 if self not in conf.noenum and not isinstance(x, VolatileValue): 2362 if self.i2s: 2363 try: 2364 return self.i2s[x] 2365 except KeyError: 2366 pass 2367 elif self.i2s_cb: 2368 ret = self.i2s_cb(x) 2369 if ret is not None: 2370 return ret 2371 return repr(x) 2372 2373 def any2i(self, pkt, x): 2374 # type: (Optional[Packet], Any) -> Union[I, List[I]] 2375 if isinstance(x, list): 2376 return [self.any2i_one(pkt, z) for z in x] 2377 else: 2378 return self.any2i_one(pkt, x) 2379 2380 def i2repr(self, pkt, x): # type: ignore 2381 # type: (Optional[Packet], Any) -> Union[List[str], str] 2382 if isinstance(x, list): 2383 return [self.i2repr_one(pkt, z) for z in x] 2384 else: 2385 return self.i2repr_one(pkt, x) 2386 2387 def notify_set(self, enum, key, value): 2388 # type: 
class BitEnumField(_BitField[Union[List[int], int]], _EnumField[int]):
    """
    An enum field that is ``size`` bits wide.

    Value <-> name conversions come from ``_EnumField``; building and
    dissecting come from ``_BitField``.
    """
    __slots__ = EnumField.__slots__

    def __init__(self, name, default, size, enum,
                 tot_size=0, end_tot_size=0):
        # type: (str, Optional[int], int, Dict[int, str], int, int) -> None
        """
        :param name: field's name
        :param default: default value for the field
        :param size: width of the field, in bits (negative: reversed)
        :param enum: mapping handed over to ``_EnumField.__init__``
        :param tot_size: forwarded to ``_BitField.__init__``
        :param end_tot_size: forwarded to ``_BitField.__init__``
        """
        _EnumField.__init__(self, name, default, enum)
        # Let _BitField initialise every slot it owns (rev, size, tot_size,
        # end_tot_size) and sz. Setting only rev/size/sz by hand left
        # tot_size/end_tot_size unset, so addfield()/getfield() raised
        # AttributeError as soon as a negative size turned ``rev`` on.
        _BitField.__init__(self, name, default, size,
                           tot_size=tot_size, end_tot_size=end_tot_size)

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> Union[List[int], int]
        """Use the enum conversion (names or lists of names accepted)."""
        return _EnumField.any2i(self, pkt, x)

    def i2repr(self,
               pkt,  # type: Optional[Packet]
               x,  # type: Union[List[int], int]
               ):
        # type: (...) -> Any
        """Use the enum representation."""
        return _EnumField.i2repr(self, pkt, x)
class BitMultiEnumField(_BitField[Union[List[int], int]],
                        _MultiEnumField[int]):
    """
    A multi-enum field that is ``size`` bits wide.

    Value <-> name conversions come from ``_MultiEnumField``; building and
    dissecting come from ``_BitField``.
    """
    __slots__ = EnumField.__slots__ + MultiEnumField.__slots__

    def __init__(
            self,
            name,  # type: str
            default,  # type: int
            size,  # type: int
            enum,  # type: Dict[int, Dict[int, str]]
            depends_on  # type: Callable[[Optional[Packet]], int]
    ):
        # type: (...) -> None
        """
        :param name: field's name
        :param default: default value for the field
        :param size: width of the field, in bits (negative: reversed)
        :param enum: one value -> name mapping per possible result of
            ``depends_on``
        :param depends_on: callable receiving the packet and returning the
            key that selects the mapping inside ``enum``
        """
        _MultiEnumField.__init__(self, name, default, enum, depends_on)
        # Let _BitField initialise every slot it owns (rev, size, tot_size,
        # end_tot_size) and sz. Setting only rev/size/sz by hand left
        # tot_size/end_tot_size unset, so addfield()/getfield() raised
        # AttributeError as soon as a negative size turned ``rev`` on.
        _BitField.__init__(self, name, default, size)

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> Union[List[int], int]
        """Use the multi-enum conversion."""
        return _MultiEnumField.any2i(self, pkt, x)

    def i2repr(  # type: ignore
            self,
            pkt,  # type: Optional[Packet]
            x  # type: Union[List[int], int]
    ):
        # type: (...) -> Union[str, List[str]]
        """Use the multi-enum representation."""
        return _MultiEnumField.i2repr(self, pkt, x)
class FlagValueIter(object):
    """
    Iterator over the names of the bits that are set in a flag value,
    starting from the least significant bit.

    The wrapped object must support ``int()`` and expose ``names``, a
    sequence indexed by bit position.
    """

    __slots__ = ["flagvalue", "cursor"]

    def __init__(self, flagvalue):
        # type: (FlagValue) -> None
        self.cursor = 0  # position of the next bit to examine
        self.flagvalue = flagvalue

    def __iter__(self):
        # type: () -> FlagValueIter
        return self

    def __next__(self):
        # type: () -> str
        """Return the name of the next set bit, or raise StopIteration."""
        # The value is read again on every call; bits below the cursor
        # have already been handled.
        remaining = int(self.flagvalue) >> self.cursor
        while remaining:
            bit_index = self.cursor
            self.cursor = bit_index + 1
            if remaining & 1:
                return self.flagvalue.names[bit_index]
            remaining >>= 1
        raise StopIteration

    # Python 2 iterator protocol
    next = __next__
2730 # type: (Any) -> bool 2731 return self.value >= self._fixvalue(other) 2732 2733 def __ne__(self, other): 2734 # type: (Any) -> bool 2735 return self.value != self._fixvalue(other) 2736 2737 def __and__(self, other): 2738 # type: (int) -> FlagValue 2739 return self.__class__(self.value & self._fixvalue(other), self.names) 2740 __rand__ = __and__ 2741 2742 def __or__(self, other): 2743 # type: (int) -> FlagValue 2744 return self.__class__(self.value | self._fixvalue(other), self.names) 2745 __ror__ = __or__ 2746 2747 def __lshift__(self, other): 2748 # type: (int) -> int 2749 return self.value << self._fixvalue(other) 2750 2751 def __rshift__(self, other): 2752 # type: (int) -> int 2753 return self.value >> self._fixvalue(other) 2754 2755 def __nonzero__(self): 2756 # type: () -> bool 2757 return bool(self.value) 2758 __bool__ = __nonzero__ 2759 2760 def flagrepr(self): 2761 # type: () -> str 2762 warnings.warn( 2763 "obj.flagrepr() is obsolete. Use str(obj) instead.", 2764 DeprecationWarning 2765 ) 2766 return str(self) 2767 2768 def __str__(self): 2769 # type: () -> str 2770 i = 0 2771 r = [] 2772 x = int(self) 2773 while x: 2774 if x & 1: 2775 try: 2776 name = self.names[i] 2777 except IndexError: 2778 name = "?" 
class FlagsField(_BitField[Optional[Union[int, FlagValue]]]):
    """ Handle Flag type field

   Make sure all your flags have a label

   Example (list):
       >>> from scapy.packet import Packet
       >>> class FlagsTest(Packet):
               fields_desc = [FlagsField("flags", 0, 8, ["f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7"])]  # noqa: E501
       >>> FlagsTest(flags=9).show2()
       ###[ FlagsTest ]###
         flags     = f0+f3

    Example (str):
       >>> from scapy.packet import Packet
       >>> class TCPTest(Packet):
               fields_desc = [
                   BitField("reserved", 0, 7),
                   FlagsField("flags", 0x2, 9, "FSRPAUECN")
               ]
       >>> TCPTest(flags=3).show2()
       ###[ TCPTest ]###
         reserved  = 0
         flags     = FS

    Example (dict):
       >>> from scapy.packet import Packet
       >>> class FlagsTest2(Packet):
               fields_desc = [
                   FlagsField("flags", 0x2, 16, {
                       1: "1",  # bit 1 (mask 0x0002)
                       8: "2"  # bit 8 (mask 0x0100)
                   })
               ]

   :param name: field's name
   :param default: default value for the field
   :param size: number of bits in the field (in bits)
   :param names: (list or str or dict) label for each flag
       If it's a str or a list, the least Significant Bit tag's name
       is written first.
       If it's a dict, keys are 0-based bit positions (0 is the least
       significant bit); positions without an entry are labelled
       ``bit_<position>``.
   """
    ismutable = True
    __slots__ = ["names"]

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[Union[int, FlagValue]]
                 size,  # type: int
                 names  # type: Union[List[str], str, Dict[int, str]]
                 ):
        # type: (...) -> None
        # Convert the dict to a list
        if isinstance(names, dict):
            # size may be negative (reversed field): the number of labels
            # is its magnitude. range(size) would be empty in that case
            # and the assignments below would raise IndexError.
            tmp = ["bit_%d" % i for i in range(abs(size))]
            for i, v in six.viewitems(names):
                tmp[i] = v
            names = tmp
        # Store the names as str or list
        self.names = names
        super(FlagsField, self).__init__(name, default, size)

    def _fixup_val(self, x):
        # type: (Any) -> Optional[FlagValue]
        """Returns a FlagValue instance when needed. Internal method, to be
        used in *2i() and i2*() methods.

        FlagValue and VolatileValue instances, as well as None, are
        returned unchanged.
        """
        if isinstance(x, (FlagValue, VolatileValue)):
            return x  # type: ignore
        if x is None:
            return None
        return FlagValue(x, self.names)

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> Optional[FlagValue]
        return self._fixup_val(super(FlagsField, self).any2i(pkt, x))

    def m2i(self, pkt, x):
        # type: (Optional[Packet], int) -> Optional[FlagValue]
        return self._fixup_val(super(FlagsField, self).m2i(pkt, x))

    def i2h(self, pkt, x):
        # type: (Optional[Packet], Any) -> Optional[FlagValue]
        return self._fixup_val(super(FlagsField, self).i2h(pkt, x))

    def i2repr(self,
               pkt,  # type: Optional[Packet]
               x,  # type: Any
               ):
        # type: (...) -> str
        """Textual form of the flags; lists and tuples are rendered
        element by element, keeping their container type."""
        if isinstance(x, (list, tuple)):
            return repr(type(x)(
                "None" if v is None else str(self._fixup_val(v)) for v in x
            ))
        return "None" if x is None else str(self._fixup_val(x))
class FixedPointField(BitField):
    """
    A BitField holding a fixed-point number: the ``frac_bits`` lowest bits
    are the fractional part, the bits above them the integer part.
    """
    __slots__ = ['frac_bits']

    def __init__(self, name, default, size, frac_bits=16):
        # type: (str, int, int, int) -> None
        """
        :param name: field's name
        :param default: default value for the field
        :param size: total width of the field, in bits
        :param frac_bits: number of bits used by the fractional part
        """
        # must be set first: it is assigned before the parent initialiser
        # runs, as any2i() below depends on it
        self.frac_bits = frac_bits
        super(FixedPointField, self).__init__(name, default, size)

    def any2i(self, pkt, val):
        # type: (Optional[Packet], Optional[float]) -> Optional[int]
        """Encode a number as a fixed-point integer (None is kept)."""
        if val is None:
            return val
        whole = int(val)
        fraction = int((val - whole) * 2**self.frac_bits)
        return (whole << self.frac_bits) | fraction

    def i2h(self, pkt, val):
        # type: (Optional[Packet], int) -> EDecimal
        """Decode a fixed-point integer into a decimal number."""
        # A bit of trickery to get precise floats
        scale = 2.0**self.frac_bits
        frac_mask = (1 << self.frac_bits) - 1
        frac_part = EDecimal(val & frac_mask)
        frac_part /= scale  # type: ignore
        digits = int(math.log10(scale))
        return (val >> self.frac_bits) + frac_part.normalize(digits)

    def i2repr(self, pkt, val):
        # type: (Optional[Packet], int) -> str
        return str(self.i2h(pkt, val))
-> None 3071 self.wordbytes = wordbytes 3072 self.maxbytes = maxbytes 3073 self.aton = aton 3074 self.ntoa = ntoa 3075 Field.__init__(self, name, default, "%is" % self.maxbytes) 3076 if length_from is None: 3077 length_from = lambda x: 0 3078 self.length_from = length_from 3079 3080 def _numbytes(self, pfxlen): 3081 # type: (int) -> int 3082 wbits = self.wordbytes * 8 3083 return ((pfxlen + (wbits - 1)) // wbits) * self.wordbytes 3084 3085 def h2i(self, pkt, x): 3086 # type: (Optional[Packet], str) -> Tuple[str, int] 3087 # "fc00:1::1/64" -> ("fc00:1::1", 64) 3088 [pfx, pfxlen] = x.split('/') 3089 self.aton(pfx) # check for validity 3090 return (pfx, int(pfxlen)) 3091 3092 def i2h(self, pkt, x): 3093 # type: (Optional[Packet], Tuple[str, int]) -> str 3094 # ("fc00:1::1", 64) -> "fc00:1::1/64" 3095 (pfx, pfxlen) = x 3096 return "%s/%i" % (pfx, pfxlen) 3097 3098 def i2m(self, 3099 pkt, # type: Optional[Packet] 3100 x # type: Optional[Tuple[str, int]] 3101 ): 3102 # type: (...) -> Tuple[bytes, int] 3103 # ("fc00:1::1", 64) -> (b"\xfc\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01", 64) # noqa: E501 3104 if x is None: 3105 pfx, pfxlen = "", 0 3106 else: 3107 (pfx, pfxlen) = x 3108 s = self.aton(pfx) 3109 return (s[:self._numbytes(pfxlen)], pfxlen) 3110 3111 def m2i(self, pkt, x): 3112 # type: (Optional[Packet], Tuple[bytes, int]) -> Tuple[str, int] 3113 # (b"\xfc\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01", 64) -> ("fc00:1::1", 64) # noqa: E501 3114 (s, pfxlen) = x 3115 3116 if len(s) < self.maxbytes: 3117 s = s + (b"\0" * (self.maxbytes - len(s))) 3118 return (self.ntoa(s), pfxlen) 3119 3120 def any2i(self, pkt, x): 3121 # type: (Optional[Packet], Optional[Any]) -> Tuple[str, int] 3122 if x is None: 3123 return (self.ntoa(b"\0" * self.maxbytes), 1) 3124 3125 return self.h2i(pkt, x) 3126 3127 def i2len(self, pkt, x): 3128 # type: (Packet, Tuple[str, int]) -> int 3129 (_, pfxlen) = x 3130 return pfxlen 3131 3132 def addfield(self, pkt, s, 
class IPPrefixField(_IPPrefixFieldBase):
    """
    Prefix field whose addresses are at most 4 bytes long and are converted
    with ``inet_aton`` / ``inet_ntoa``.
    """
    def __init__(
            self,
            name,  # type: str
            default,  # type: Tuple[str, int]
            wordbytes=1,  # type: int
            length_from=None  # type: Optional[Callable[[Packet], int]]
    ):
        # type: (...) -> None
        """
        :param name: field's name
        :param default: default value for the field
        :param wordbytes: forwarded to ``_IPPrefixFieldBase.__init__``
        :param length_from: forwarded to ``_IPPrefixFieldBase.__init__``
        """
        _IPPrefixFieldBase.__init__(
            self,
            name,
            default,
            wordbytes=wordbytes,
            maxbytes=4,
            aton=inet_aton,
            ntoa=inet_ntoa,
            length_from=length_from,
        )
class SecondsIntField(Field[float, int]):
    """
    Unsigned 32-bit field (struct format "I") holding a duration.

    The ``use_msec`` / ``use_micro`` / ``use_nano`` flags only affect
    :meth:`i2repr`, which divides the stored value accordingly so that the
    representation is always expressed in seconds.
    """
    __slots__ = ["use_msec", "use_micro", "use_nano"]

    # Do not change the order of the keywords in here
    # Netflow heavily rely on this
    def __init__(self, name, default,
                 use_msec=False,
                 use_micro=False,
                 use_nano=False):
        # type: (str, int, bool, bool, bool) -> None
        Field.__init__(self, name, default, "I")
        self.use_msec = use_msec
        self.use_micro = use_micro
        self.use_nano = use_nano

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], Optional[float]) -> str
        """Return ``"<value> sec"``; ``None`` is rendered as 0.

        The first flag that is set (msec, then micro, then nano) selects
        the divisor; with no flag the value is shown unchanged.
        """
        if x is None:
            y = 0  # type: Union[int, float]
        elif self.use_msec:
            y = x / 1e3
        elif self.use_micro:
            y = x / 1e6
        elif self.use_nano:
            y = x / 1e9
        else:
            y = x
        return "%s sec" % y


class _ScalingField(object):
    """
    Mixin implementing a linear conversion between the internal (physical)
    value and the machine value::

        machine  = (internal - offset) / scaling
        internal = machine * scaling + offset

    It is combined with a concrete ``Field`` class by the subclasses below
    and calls ``Field.__init__`` itself.
    """
    def __init__(self,
                 name,  # type: str
                 default,  # type: float
                 scaling=1,  # type: Union[int, float]
                 unit="",  # type: str
                 offset=0,  # type: Union[int, float]
                 ndigits=3,  # type: int
                 fmt="B",  # type: str
                 ):
        # type: (...) -> None
        self.scaling = scaling
        self.unit = unit
        self.offset = offset
        self.ndigits = ndigits
        Field.__init__(self, name, default, fmt)  # type: ignore

    def i2m(self,
            pkt,  # type: Optional[Packet]
            x  # type: Optional[Union[int, float]]
            ):
        # type: (...) -> Union[int, float]
        """Convert an internal value to its machine value.

        ``None`` is treated as 0. Unless the struct format ends with "f",
        a float result is rounded to the nearest integer so it can be
        packed with an integer format.
        """
        if x is None:
            x = 0
        x = (x - self.offset) / self.scaling
        if isinstance(x, float) and self.fmt[-1] != "f":  # type: ignore
            x = int(round(x))
        return x

    def m2i(self, pkt, x):
        # type: (Optional[Packet], Union[int, float]) -> Union[int, float]
        """Convert a machine value to its internal value.

        Unless the struct format ends with "f", a float result is rounded
        to ``ndigits`` fractional digits.
        """
        x = x * self.scaling + self.offset
        if isinstance(x, float) and self.fmt[-1] != "f":  # type: ignore
            x = round(x, self.ndigits)
        return x

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> Union[int, float]
        """Accept either a number or a raw (str/bytes) machine value.

        A str/bytes value is unpacked with the field's struct format and
        converted through :meth:`m2i`.

        :raises ValueError: if the resulting value is not an int or float
        """
        if isinstance(x, (str, bytes)):
            x = struct.unpack(self.fmt, bytes_encode(x))[0]  # type: ignore
            x = self.m2i(pkt, x)
        if not isinstance(x, (int, float)):
            raise ValueError("Unknown type")
        return x

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], Union[int, float]) -> str
        """Return the human value followed by a space and the unit."""
        return "%s %s" % (
            self.i2h(pkt, x),  # type: ignore
            self.unit
        )

    def randval(self):
        # type: () -> RandFloat
        """Return a RandFloat spanning the scaled range of the base field.

        The bounds of the base field's random value are converted with the
        scaling and offset; ``min``/``max`` are applied afterwards because
        a negative scaling swaps them.
        """
        value = Field.randval(self)  # type: ignore
        if value is not None:
            min_val = round(value.min * self.scaling + self.offset,
                            self.ndigits)
            max_val = round(value.max * self.scaling + self.offset,
                            self.ndigits)

            return RandFloat(min(min_val, max_val), max(min_val, max_val))


class ScalingField(_ScalingField,
                   Field[Union[int, float], Union[int, float]]):
    r""" Handle physical values which are scaled and/or offset for communication

    Example:
        >>> from scapy.packet import Packet
        >>> class ScalingFieldTest(Packet):
                fields_desc = [ScalingField('data', 0, scaling=0.1, offset=-1, unit='mV')]  # noqa: E501
        >>> ScalingFieldTest(data=10).show2()
        ###[ ScalingFieldTest ]###
          data= 10.0 mV
        >>> hexdump(ScalingFieldTest(data=10))
        0000  6E                                               n
        >>> hexdump(ScalingFieldTest(data=b"\x6D"))
        0000  6D                                               m
        >>> ScalingFieldTest(data=b"\x6D").show2()
        ###[ ScalingFieldTest ]###
          data= 9.9 mV

        bytes(ScalingFieldTest(...)) will produce 0x6E in this example.
        0x6E is 110 (decimal). This is calculated through the scaling factor
        and the offset. "data" was set to 10, which means, we want to transfer
        the physical value 10 mV. To calculate the value, which has to be
        sent on the bus, the offset has to be subtracted and the scaling has
        to be applied by division through the scaling factor.
        bytes = (data - offset) / scaling
        bytes = ( 10 - (-1) ) / 0.1
        bytes = 110 = 0x6E

        If you want to force a certain internal value, you can assign a byte-
        string to the field (data=b"\x6D"). If a string or a bytes object is
        given to the field, no internal value conversion will be applied

    :param name: field's name
    :param default: default value for the field
    :param scaling: scaling factor for the internal value conversion
    :param unit: string for the unit representation of the internal value
    :param offset: value to offset the internal value during conversion
    :param ndigits: number of fractional digits for the internal conversion
    :param fmt: struct.pack format used to parse and serialize the internal value from and to machine representation # noqa: E501
    """


class BitScalingField(_ScalingField, BitField):  # type: ignore
    """
    A ScalingField that is a BitField
    """
    def __init__(self, name, default, size, *args, **kwargs):
        # type: (str, int, int, *Any, **Any) -> None
        # Store the scaling parameters first, then let BitField set up
        # the bit-level layout for ``size`` bits.
        _ScalingField.__init__(self, name, default, *args, **kwargs)
        BitField.__init__(self, name, default, size)  # type: ignore


class OUIField(X3BytesField):
    """
    A field designed to carry a OUI (3 bytes)
    """
    def i2repr(self, pkt, val):
        # type: (Optional[Packet], int) -> str
        """Render the OUI as ``xx:xx:xx``, with the manufacturer if known.

        When ``conf.manufdb`` is available and resolves the OUI to something
        other than the OUI itself, the result is ``"<manuf> (<oui>)"``.
        """
        # Pack as 32 bits and drop the leading byte to keep the 3 OUI bytes
        by_val = struct.pack("!I", val or 0)[1:]
        # Pad to a full 6-byte MAC, then keep only the "xx:xx:xx" prefix
        oui = str2mac(by_val + b"\0" * 3)[:8]
        if conf.manufdb:
            fancy = conf.manufdb._get_manuf(oui)
            if fancy != oui:
                return "%s (%s)" % (fancy, oui)
        return oui


class UUIDField(Field[UUID, bytes]):
    """Field for UUID storage, wrapping Python's uuid.UUID type.

    The internal storage format of this field is ``uuid.UUID`` from the Python
    standard library.

    There are three formats (``uuid_fmt``) for this field type:

    * ``FORMAT_BE`` (default): the UUID is six fields in big-endian byte order,
      per RFC 4122.

      This format is used by DHCPv6 (RFC 6355) and most network protocols.

    * ``FORMAT_LE``: the UUID is six fields, with ``time_low``, ``time_mid``
      and ``time_high_version`` in little-endian byte order. This *doesn't*
      change the arrangement of the fields from RFC 4122.

      This format is used by Microsoft's COM/OLE libraries.

    * ``FORMAT_REV``: the UUID is a single 128-bit integer in little-endian
      byte order. This *changes the arrangement* of the fields.

      This format is used by Bluetooth Low Energy.

    Note: You should use the constants here.

    The "human encoding" of this field supports a number of different input
    formats, and wraps Python's ``uuid.UUID`` library appropriately:

    * Given a bytearray, bytes or str of 16 bytes, this class decodes UUIDs in
      wire format.

    * Given a bytearray, bytes or str of other lengths, this delegates to
      ``uuid.UUID`` the Python standard library. This supports a number of
      different encoding options -- see the Python standard library
      documentation for more details.

    * Given an int or long, presumed to be a 128-bit integer to pass to
      ``uuid.UUID``.

    * Given a tuple:

      * Tuples of 11 integers are treated as having the last 6 integers forming
        the ``node`` field, and are merged before being passed as a tuple of 6
        integers to ``uuid.UUID``.

      * Otherwise, the tuple is passed as the ``fields`` parameter to
        ``uuid.UUID`` directly without modification.

        ``uuid.UUID`` expects a tuple of 6 integers.

    Other types (such as ``uuid.UUID``) are passed through.
    """

    __slots__ = ["uuid_fmt"]

    FORMAT_BE = 0
    FORMAT_LE = 1
    FORMAT_REV = 2

    # Change this when we get new formats
    FORMATS = (FORMAT_BE, FORMAT_LE, FORMAT_REV)

    def __init__(self, name, default, uuid_fmt=FORMAT_BE):
        # type: (str, Optional[int], int) -> None
        self.uuid_fmt = uuid_fmt
        self._check_uuid_fmt()
        Field.__init__(self, name, default, "16s")

    def _check_uuid_fmt(self):
        # type: () -> None
        """Checks .uuid_fmt, and raises an exception if it is not valid."""
        if self.uuid_fmt not in UUIDField.FORMATS:
            raise FieldValueRangeException(
                "Unsupported uuid_fmt ({})".format(self.uuid_fmt))

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[UUID]) -> bytes
        """Serialize a UUID to 16 bytes according to ``uuid_fmt``.

        ``None`` is serialized as 16 null bytes.
        """
        self._check_uuid_fmt()
        if x is None:
            return b'\0' * 16
        if self.uuid_fmt == UUIDField.FORMAT_BE:
            return x.bytes
        elif self.uuid_fmt == UUIDField.FORMAT_LE:
            return x.bytes_le
        elif self.uuid_fmt == UUIDField.FORMAT_REV:
            return x.bytes[::-1]
        else:
            raise FieldAttributeException("Unknown fmt")

    def m2i(self,
            pkt,  # type: Optional[Packet]
            x,  # type: bytes
            ):
        # type: (...) -> UUID
        """Build a UUID from 16 wire bytes according to ``uuid_fmt``."""
        self._check_uuid_fmt()
        if self.uuid_fmt == UUIDField.FORMAT_BE:
            return UUID(bytes=x)
        elif self.uuid_fmt == UUIDField.FORMAT_LE:
            return UUID(bytes_le=x)
        elif self.uuid_fmt == UUIDField.FORMAT_REV:
            return UUID(bytes=x[::-1])
        else:
            raise FieldAttributeException("Unknown fmt")

    def any2i(self,
              pkt,  # type: Optional[Packet]
              x  # type: Any  # noqa: E501
              ):
        # type: (...) -> Optional[UUID]
        """Convert any supported input (see the class docstring) to a UUID.

        Returns ``None`` for input types that are not recognized.
        """
        # Python's uuid doesn't handle bytearray, so convert to an immutable
        # type first.
        if isinstance(x, bytearray):
            x = bytes_encode(x)

        if isinstance(x, int):
            u = UUID(int=x)
        elif isinstance(x, tuple):
            if len(x) == 11:
                # For compatibility with dce_rpc: this packs into a tuple
                # where elements 5..10 (the last six) are the individual
                # bytes of the 48-bit node ID, most significant first.
                node = 0
                for i in x[5:]:
                    node = (node << 8) | i

                x = (x[0], x[1], x[2], x[3], x[4], node)

            u = UUID(fields=x)
        elif isinstance(x, (str, bytes)):
            if len(x) == 16:
                # Raw bytes
                u = self.m2i(pkt, bytes_encode(x))
            else:
                u = UUID(plain_str(x))
        elif isinstance(x, UUID):
            u = x
        else:
            return None
        return u

    @staticmethod
    def randval():
        # type: () -> RandUUID
        return RandUUID()


class BitExtendedField(Field[Optional[int], bytes]):
    """
    Bit Extended Field

    This type of field has a variable number of bytes. Each byte is defined
    as follows:
    - 7 bits of data
    - 1 bit as an extension bit:

      + 0 means it is last byte of the field ("stopping bit")
      + 1 means there is another byte after this one ("forwarding bit")

    To get the actual data, it is necessary to hop the binary data byte per
    byte and to check the extension bit until 0
    """

    __slots__ = ["extension_bit"]

    def __init__(self, name, default, extension_bit):
        # type: (str, Optional[Any], int) -> None
        Field.__init__(self, name, default, "B")
        self.extension_bit = extension_bit

    def prepare_byte(self, x):
        # type: (int) -> int
        """Return ``x`` with its extension bit moved to the LSB.

        Bits below the extension bit are shifted up by one; bits above it
        keep their position.
        """
        x = int(x)
        ext = self.extension_bit
        fx_bit = (x & (1 << ext)) >> ext
        lsb_bits = x & ((1 << ext) - 1)
        msb_bits = x >> (ext + 1)
        return (msb_bits << (ext + 1)) + (lsb_bits << 1) + fx_bit

    def str2extended(self, x=b""):
        # type: (bytes) -> Tuple[bytes, Optional[int]]
        """Decode one extended value from the start of ``x``.

        :returns: ``(remaining bytes, value)``, or ``(b"", None)`` when the
            data ends before a byte with a cleared extension bit is found
        """
        # For convenience, we reorder the byte so that the forwarding
        # bit is always the LSB. We then apply the same algorithm
        # whatever the real forwarding bit position
        bits = 0
        for i, c in enumerate(bytearray(x)):
            c = self.prepare_byte(c)
            # Each byte contributes 7 bits of data
            bits = bits << 7 | (c >> 1)
            if not c & 0b1:
                # "Stopping bit": this was the last byte of the field
                return x[i + 1:], bits
        # We reached the end of the data but there was no
        # "ending bit". This is not normal.
        return b"", None

    def extended2str(self, x):
        # type: (Optional[int]) -> bytes
        """Encode ``x`` as a sequence of extended bytes.

        ``None`` is encoded as an empty string. Bytes are generated from the
        least significant one (whose extension bit is 0) and emitted in
        reverse, so the stopping byte ends up last on the wire.
        """
        if x is None:
            return b""
        x = int(x)
        encoded = []  # byte values, least significant byte first
        first_byte = True
        fx_missing = True  # current byte has not received its extension bit
        bits = 0
        i = 0
        while x > 0 or fx_missing:
            if i == 8:
                # End of byte
                i = 0
                encoded.append(bits)
                bits = 0
                fx_missing = True
            else:
                if i % 8 == self.extension_bit:
                    # This is extension bit: left at 0 on the first
                    # generated byte (stopping bit), set to 1 on all others
                    if first_byte:
                        first_byte = False
                    else:
                        bits = bits | 0b1 << i
                    fx_missing = False
                else:
                    bits = bits | (x & 0b1) << i
                    x = x >> 1
                # Still some bits
                i = i + 1
        encoded.append(bits)

        return bytes(bytearray(encoded[::-1]))

    def i2m(self, pkt, x):
        # type: (Optional[Any], Optional[int]) -> bytes
        return self.extended2str(x)

    def m2i(self, pkt, x):
        # type: (Optional[Any], bytes) -> Optional[int]
        return self.str2extended(x)[1]

    def addfield(self, pkt, s, val):
        # type: (Optional[Packet], bytes, Optional[int]) -> bytes
        return s + self.i2m(pkt, val)

    def getfield(self, pkt, s):
        # type: (Optional[Any], bytes) -> Tuple[bytes, Optional[int]]
        return self.str2extended(s)


class LSBExtendedField(BitExtendedField):
    """A BitExtendedField with the extension bit on LSB"""
    def __init__(self, name, default):
        # type: (str, Optional[Any]) -> None
        BitExtendedField.__init__(self, name, default, extension_bit=0)


class MSBExtendedField(BitExtendedField):
    """A BitExtendedField with the extension bit on MSB"""
    def __init__(self, name, default):
        # type: (str, Optional[Any]) -> None
        BitExtendedField.__init__(self, name, default, extension_bit=7)