1# This file is part of Scapy 2# See http://www.secdev.org/projects/scapy for more information 3# Copyright (C) Philippe Biondi <phil@secdev.org> 4# This program is published under a GPLv2 license 5 6""" 7Packet class 8 9Provides: 10 - the default Packet classes 11 - binding mechanisms 12 - fuzz() method 13 - exploration methods: explore() / ls() 14""" 15 16from __future__ import absolute_import 17from __future__ import print_function 18from collections import defaultdict 19import re 20import time 21import itertools 22import copy 23import types 24import warnings 25 26from scapy.fields import ( 27 AnyField, 28 BitField, 29 ConditionalField, 30 Emph, 31 EnumField, 32 Field, 33 FlagsField, 34 MultiEnumField, 35 MultipleTypeField, 36 PacketListField, 37 RawVal, 38 StrField, 39) 40from scapy.config import conf, _version_checker 41from scapy.compat import raw, orb, bytes_encode 42from scapy.base_classes import BasePacket, Gen, SetGen, Packet_metaclass, \ 43 _CanvasDumpExtended 44from scapy.interfaces import _GlobInterfaceType 45from scapy.volatile import RandField, VolatileValue 46from scapy.utils import import_hexcap, tex_escape, colgen, issubtype, \ 47 pretty_list, EDecimal 48from scapy.error import Scapy_Exception, log_runtime, warning 49from scapy.extlib import PYX 50import scapy.modules.six as six 51 52# Typing imports 53from scapy.compat import ( 54 Any, 55 Callable, 56 Dict, 57 Iterator, 58 List, 59 NoReturn, 60 Optional, 61 Set, 62 Tuple, 63 Type, 64 TypeVar, 65 Union, 66 Sequence, 67 cast, 68) 69try: 70 import pyx 71except ImportError: 72 pass 73 74 75_T = TypeVar("_T", Dict[str, Any], Optional[Dict[str, Any]]) 76 77 78# six.with_metaclass typing is glitchy 79class Packet(six.with_metaclass(Packet_metaclass, # type: ignore 80 BasePacket, _CanvasDumpExtended)): 81 __slots__ = [ 82 "time", "sent_time", "name", 83 "default_fields", "fields", "fieldtype", 84 "overload_fields", "overloaded_fields", 85 "packetfields", 86 "original", "explicit", "raw_packet_cache", 87 
"raw_packet_cache_fields", "_pkt", "post_transforms", 88 # then payload and underlayer 89 "payload", "underlayer", 90 "name", 91 # used for sr() 92 "_answered", 93 # used when sniffing 94 "direction", "sniffed_on", 95 # handle snaplen Vs real length 96 "wirelen", 97 ] 98 name = None 99 fields_desc = [] # type: Sequence[AnyField] 100 deprecated_fields = {} # type: Dict[str, Tuple[str, str]] 101 overload_fields = {} # type: Dict[Type[Packet], Dict[str, Any]] 102 payload_guess = [] # type: List[Tuple[Dict[str, Any], Type[Packet]]] 103 show_indent = 1 104 show_summary = True 105 match_subclass = False 106 class_dont_cache = {} # type: Dict[Type[Packet], bool] 107 class_packetfields = {} # type: Dict[Type[Packet], Any] 108 class_default_fields = {} # type: Dict[Type[Packet], Dict[str, Any]] 109 class_default_fields_ref = {} # type: Dict[Type[Packet], List[str]] 110 class_fieldtype = {} # type: Dict[Type[Packet], Dict[str, AnyField]] # noqa: E501 111 112 @classmethod 113 def from_hexcap(cls): 114 # type: (Type[Packet]) -> Packet 115 return cls(import_hexcap()) 116 117 @classmethod 118 def upper_bonds(self): 119 # type: () -> None 120 for fval, upper in self.payload_guess: 121 print("%-20s %s" % (upper.__name__, ", ".join("%-12s" % ("%s=%r" % i) for i in six.iteritems(fval)))) # noqa: E501 122 123 @classmethod 124 def lower_bonds(self): 125 # type: () -> None 126 for lower, fval in six.iteritems(self._overload_fields): 127 print("%-20s %s" % (lower.__name__, ", ".join("%-12s" % ("%s=%r" % i) for i in six.iteritems(fval)))) # noqa: E501 128 129 def __init__(self, 130 _pkt=b"", # type: bytes 131 post_transform=None, # type: Any 132 _internal=0, # type: int 133 _underlayer=None, # type: Optional[Packet] 134 **fields # type: Any 135 ): 136 # type: (...) 
-> None 137 self.time = time.time() # type: Union[EDecimal, float] 138 self.sent_time = None # type: Union[EDecimal, float, None] 139 self.name = (self.__class__.__name__ 140 if self._name is None else 141 self._name) 142 self.default_fields = {} # type: Dict[str, Any] 143 self.overload_fields = self._overload_fields 144 self.overloaded_fields = {} # type: Dict[str, Any] 145 self.fields = {} # type: Dict[str, Any] 146 self.fieldtype = {} # type: Dict[str, AnyField] 147 self.packetfields = [] # type: List[AnyField] 148 self.payload = NoPayload() 149 self.init_fields() 150 self.underlayer = _underlayer 151 self.original = _pkt 152 self.explicit = 0 153 self.raw_packet_cache = None # type: Optional[bytes] 154 self.raw_packet_cache_fields = None # type: Optional[Dict[str, Any]] # noqa: E501 155 self.wirelen = None # type: Optional[int] 156 self.direction = None # type: Optional[int] 157 self.sniffed_on = None # type: Optional[_GlobInterfaceType] 158 if _pkt: 159 self.dissect(_pkt) 160 if not _internal: 161 self.dissection_done(self) 162 # We use this strange initialization so that the fields 163 # are initialized in their declaration order. 
164 # It is required to always support MultipleTypeField 165 for field in self.fields_desc: 166 fname = field.name 167 try: 168 value = fields.pop(fname) 169 except KeyError: 170 continue 171 self.fields[fname] = self.get_field(fname).any2i(self, value) 172 # The remaining fields are unknown 173 for fname in fields: 174 if fname in self.deprecated_fields: 175 # Resolve deprecated fields 176 value = fields[fname] 177 fname = self._resolve_alias(fname) 178 self.fields[fname] = self.get_field(fname).any2i(self, value) 179 continue 180 raise AttributeError(fname) 181 if isinstance(post_transform, list): 182 self.post_transforms = post_transform 183 elif post_transform is None: 184 self.post_transforms = [] 185 else: 186 self.post_transforms = [post_transform] 187 188 _PickleType = Tuple[ 189 Union[EDecimal, float], 190 Optional[Union[EDecimal, float, None]], 191 Optional[int], 192 Optional[_GlobInterfaceType], 193 Optional[int] 194 ] 195 196 def __reduce__(self): 197 # type: () -> Tuple[Type[Packet], Tuple[bytes], Packet._PickleType] 198 """Used by pickling methods""" 199 return (self.__class__, (self.build(),), ( 200 self.time, 201 self.sent_time, 202 self.direction, 203 self.sniffed_on, 204 self.wirelen, 205 )) 206 207 def __setstate__(self, state): 208 # type: (Packet._PickleType) -> Packet 209 """Rebuild state using pickable methods""" 210 self.time = state[0] 211 self.sent_time = state[1] 212 self.direction = state[2] 213 self.sniffed_on = state[3] 214 self.wirelen = state[4] 215 return self 216 217 def __deepcopy__(self, 218 memo, # type: Any 219 ): 220 # type: (...) 
-> Packet 221 """Used by copy.deepcopy""" 222 return self.copy() 223 224 def init_fields(self): 225 # type: () -> None 226 """ 227 Initialize each fields of the fields_desc dict 228 """ 229 230 if self.class_dont_cache.get(self.__class__, False): 231 self.do_init_fields(self.fields_desc) 232 else: 233 self.do_init_cached_fields() 234 235 def do_init_fields(self, 236 flist, # type: Sequence[AnyField] 237 ): 238 # type: (...) -> None 239 """ 240 Initialize each fields of the fields_desc dict 241 """ 242 default_fields = {} 243 for f in flist: 244 default_fields[f.name] = copy.deepcopy(f.default) 245 self.fieldtype[f.name] = f 246 if f.holds_packets: 247 self.packetfields.append(f) 248 # We set default_fields last to avoid race issues 249 self.default_fields = default_fields 250 251 def do_init_cached_fields(self): 252 # type: () -> None 253 """ 254 Initialize each fields of the fields_desc dict, or use the cached 255 fields information 256 """ 257 258 cls_name = self.__class__ 259 260 # Build the fields information 261 if Packet.class_default_fields.get(cls_name, None) is None: 262 self.prepare_cached_fields(self.fields_desc) 263 264 # Use fields information from cache 265 default_fields = Packet.class_default_fields.get(cls_name, None) 266 if default_fields: 267 self.default_fields = default_fields 268 self.fieldtype = Packet.class_fieldtype[cls_name] 269 self.packetfields = Packet.class_packetfields[cls_name] 270 271 # Deepcopy default references 272 for fname in Packet.class_default_fields_ref[cls_name]: 273 value = self.default_fields[fname] 274 try: 275 self.fields[fname] = value.copy() 276 except AttributeError: 277 # Python 2.7 - list only 278 self.fields[fname] = value[:] 279 280 def prepare_cached_fields(self, flist): 281 # type: (Sequence[AnyField]) -> None 282 """ 283 Prepare the cached fields of the fields_desc dict 284 """ 285 286 cls_name = self.__class__ 287 288 # Fields cache initialization 289 if not flist: 290 return 291 292 class_default_fields = 
dict() 293 class_default_fields_ref = list() 294 class_fieldtype = dict() 295 class_packetfields = list() 296 297 # Fields initialization 298 for f in flist: 299 if isinstance(f, MultipleTypeField): 300 # Abort 301 self.class_dont_cache[cls_name] = True 302 self.do_init_fields(self.fields_desc) 303 return 304 305 tmp_copy = copy.deepcopy(f.default) 306 class_default_fields[f.name] = tmp_copy 307 class_fieldtype[f.name] = f 308 if f.holds_packets: 309 class_packetfields.append(f) 310 311 # Remember references 312 if isinstance(f.default, (list, dict, set, RandField, Packet)): 313 class_default_fields_ref.append(f.name) 314 315 # Apply 316 Packet.class_default_fields_ref[cls_name] = class_default_fields_ref 317 Packet.class_fieldtype[cls_name] = class_fieldtype 318 Packet.class_packetfields[cls_name] = class_packetfields 319 # Last to avoid racing issues 320 Packet.class_default_fields[cls_name] = class_default_fields 321 322 def dissection_done(self, pkt): 323 # type: (Packet) -> None 324 """DEV: will be called after a dissection is completed""" 325 self.post_dissection(pkt) 326 self.payload.dissection_done(pkt) 327 328 def post_dissection(self, pkt): 329 # type: (Packet) -> None 330 """DEV: is called after the dissection of the whole packet""" 331 pass 332 333 def get_field(self, fld): 334 # type: (str) -> AnyField 335 """DEV: returns the field instance from the name of the field""" 336 return self.fieldtype[fld] 337 338 def add_payload(self, payload): 339 # type: (Union[Packet, bytes]) -> None 340 if payload is None: 341 return 342 elif not isinstance(self.payload, NoPayload): 343 self.payload.add_payload(payload) 344 else: 345 if isinstance(payload, Packet): 346 self.payload = payload 347 payload.add_underlayer(self) 348 for t in self.aliastypes: 349 if t in payload.overload_fields: 350 self.overloaded_fields = payload.overload_fields[t] 351 break 352 elif isinstance(payload, (bytes, str, bytearray, memoryview)): 353 self.payload = 
conf.raw_layer(load=bytes_encode(payload)) 354 else: 355 raise TypeError("payload must be 'Packet', 'bytes', 'str', 'bytearray', or 'memoryview', not [%s]" % repr(payload)) # noqa: E501 356 357 def remove_payload(self): 358 # type: () -> None 359 self.payload.remove_underlayer(self) 360 self.payload = NoPayload() 361 self.overloaded_fields = {} 362 363 def add_underlayer(self, underlayer): 364 # type: (Packet) -> None 365 self.underlayer = underlayer 366 367 def remove_underlayer(self, other): 368 # type: (Packet) -> None 369 self.underlayer = None 370 371 def copy(self): 372 # type: () -> Packet 373 """Returns a deep copy of the instance.""" 374 clone = self.__class__() 375 clone.fields = self.copy_fields_dict(self.fields) 376 clone.default_fields = self.copy_fields_dict(self.default_fields) 377 clone.overloaded_fields = self.overloaded_fields.copy() 378 clone.underlayer = self.underlayer 379 clone.explicit = self.explicit 380 clone.raw_packet_cache = self.raw_packet_cache 381 clone.raw_packet_cache_fields = self.copy_fields_dict( 382 self.raw_packet_cache_fields 383 ) 384 clone.wirelen = self.wirelen 385 clone.post_transforms = self.post_transforms[:] 386 clone.payload = self.payload.copy() 387 clone.payload.add_underlayer(clone) 388 clone.time = self.time 389 return clone 390 391 def _resolve_alias(self, attr): 392 # type: (str) -> str 393 new_attr, version = self.deprecated_fields[attr] 394 warnings.warn( 395 "%s has been deprecated in favor of %s since %s !" 
% ( 396 attr, new_attr, version 397 ), DeprecationWarning 398 ) 399 return new_attr 400 401 def getfieldval(self, attr): 402 # type: (str) -> Any 403 if self.deprecated_fields and attr in self.deprecated_fields: 404 attr = self._resolve_alias(attr) 405 if attr in self.fields: 406 return self.fields[attr] 407 if attr in self.overloaded_fields: 408 return self.overloaded_fields[attr] 409 if attr in self.default_fields: 410 return self.default_fields[attr] 411 return self.payload.getfieldval(attr) 412 413 def getfield_and_val(self, attr): 414 # type: (str) -> Tuple[AnyField, Any] 415 if self.deprecated_fields and attr in self.deprecated_fields: 416 attr = self._resolve_alias(attr) 417 if attr in self.fields: 418 return self.get_field(attr), self.fields[attr] 419 if attr in self.overloaded_fields: 420 return self.get_field(attr), self.overloaded_fields[attr] 421 if attr in self.default_fields: 422 return self.get_field(attr), self.default_fields[attr] 423 raise ValueError 424 425 def __getattr__(self, attr): 426 # type: (str) -> Any 427 try: 428 fld, v = self.getfield_and_val(attr) 429 except ValueError: 430 return self.payload.__getattr__(attr) 431 if fld is not None: 432 return fld.i2h(self, v) 433 return v 434 435 def setfieldval(self, attr, val): 436 # type: (str, Any) -> None 437 if self.deprecated_fields and attr in self.deprecated_fields: 438 attr = self._resolve_alias(attr) 439 if attr in self.default_fields: 440 fld = self.get_field(attr) 441 if fld is None: 442 any2i = lambda x, y: y # type: Callable[..., Any] 443 else: 444 any2i = fld.any2i 445 self.fields[attr] = any2i(self, val) 446 self.explicit = 0 447 self.raw_packet_cache = None 448 self.raw_packet_cache_fields = None 449 self.wirelen = None 450 elif attr == "payload": 451 self.remove_payload() 452 self.add_payload(val) 453 else: 454 self.payload.setfieldval(attr, val) 455 456 def __setattr__(self, attr, val): 457 # type: (str, Any) -> None 458 if attr in self.__all_slots__: 459 if attr == "sent_time": 
460 self.update_sent_time(val) 461 return object.__setattr__(self, attr, val) 462 try: 463 return self.setfieldval(attr, val) 464 except AttributeError: 465 pass 466 return object.__setattr__(self, attr, val) 467 468 def delfieldval(self, attr): 469 # type: (str) -> None 470 if attr in self.fields: 471 del(self.fields[attr]) 472 self.explicit = 0 # in case a default value must be explicit 473 self.raw_packet_cache = None 474 self.raw_packet_cache_fields = None 475 self.wirelen = None 476 elif attr in self.default_fields: 477 pass 478 elif attr == "payload": 479 self.remove_payload() 480 else: 481 self.payload.delfieldval(attr) 482 483 def __delattr__(self, attr): 484 # type: (str) -> None 485 if attr == "payload": 486 return self.remove_payload() 487 if attr in self.__all_slots__: 488 return object.__delattr__(self, attr) 489 try: 490 return self.delfieldval(attr) 491 except AttributeError: 492 pass 493 return object.__delattr__(self, attr) 494 495 def _superdir(self): 496 # type: () -> Set[str] 497 """ 498 Return a list of slots and methods, including those from subclasses. 499 """ 500 attrs = set() 501 cls = self.__class__ 502 if hasattr(cls, '__all_slots__'): 503 attrs.update(cls.__all_slots__) 504 for bcls in cls.__mro__: 505 if hasattr(bcls, '__dict__'): 506 attrs.update(bcls.__dict__) 507 return attrs 508 509 def __dir__(self): 510 # type: () -> List[str] 511 """ 512 Add fields to tab completion list. 
513 """ 514 return sorted(itertools.chain(self._superdir(), self.default_fields)) 515 516 def __repr__(self): 517 # type: () -> str 518 s = "" 519 ct = conf.color_theme 520 for f in self.fields_desc: 521 if isinstance(f, ConditionalField) and not f._evalcond(self): 522 continue 523 if f.name in self.fields: 524 fval = self.fields[f.name] 525 if isinstance(fval, (list, dict, set)) and len(fval) == 0: 526 continue 527 val = f.i2repr(self, fval) 528 elif f.name in self.overloaded_fields: 529 fover = self.overloaded_fields[f.name] 530 if isinstance(fover, (list, dict, set)) and len(fover) == 0: 531 continue 532 val = f.i2repr(self, fover) 533 else: 534 continue 535 if isinstance(f, Emph) or f in conf.emph: 536 ncol = ct.emph_field_name 537 vcol = ct.emph_field_value 538 else: 539 ncol = ct.field_name 540 vcol = ct.field_value 541 542 s += " %s%s%s" % (ncol(f.name), 543 ct.punct("="), 544 vcol(val)) 545 return "%s%s %s %s%s%s" % (ct.punct("<"), 546 ct.layer_name(self.__class__.__name__), 547 s, 548 ct.punct("|"), 549 repr(self.payload), 550 ct.punct(">")) 551 552 if six.PY2: 553 def __str__(self): 554 # type: () -> str 555 return self.build() 556 else: 557 def __str__(self): 558 # type: () -> str 559 warning("Calling str(pkt) on Python 3 makes no sense!") 560 return str(self.build()) 561 562 def __bytes__(self): 563 # type: () -> bytes 564 return self.build() 565 566 def __div__(self, other): 567 # type: (Any) -> Packet 568 if isinstance(other, Packet): 569 cloneA = self.copy() 570 cloneB = other.copy() 571 cloneA.add_payload(cloneB) 572 return cloneA 573 elif isinstance(other, (bytes, str, bytearray, memoryview)): 574 return self / conf.raw_layer(load=bytes_encode(other)) 575 else: 576 return other.__rdiv__(self) # type: ignore 577 __truediv__ = __div__ 578 579 def __rdiv__(self, other): 580 # type: (Any) -> Packet 581 if isinstance(other, (bytes, str, bytearray, memoryview)): 582 return conf.raw_layer(load=bytes_encode(other)) / self 583 else: 584 raise TypeError 585 
__rtruediv__ = __rdiv__ 586 587 def __mul__(self, other): 588 # type: (Any) -> List[Packet] 589 if isinstance(other, int): 590 return [self] * other 591 else: 592 raise TypeError 593 594 def __rmul__(self, other): 595 # type: (Any) -> List[Packet] 596 return self.__mul__(other) 597 598 def __nonzero__(self): 599 # type: () -> bool 600 return True 601 __bool__ = __nonzero__ 602 603 def __len__(self): 604 # type: () -> int 605 return len(self.__bytes__()) 606 607 def copy_field_value(self, fieldname, value): 608 # type: (str, Any) -> Any 609 return self.get_field(fieldname).do_copy(value) 610 611 def copy_fields_dict(self, fields): 612 # type: (_T) -> _T 613 if fields is None: 614 return None 615 return {fname: self.copy_field_value(fname, fval) 616 for fname, fval in six.iteritems(fields)} 617 618 def clear_cache(self): 619 # type: () -> None 620 """Clear the raw packet cache for the field and all its subfields""" 621 self.raw_packet_cache = None 622 for fld, fval in six.iteritems(self.fields): 623 fld = self.get_field(fld) 624 if fld.holds_packets: 625 if isinstance(fval, Packet): 626 fval.clear_cache() 627 elif isinstance(fval, list): 628 for fsubval in fval: 629 fsubval.clear_cache() 630 self.payload.clear_cache() 631 632 def self_build(self): 633 # type: () -> bytes 634 """ 635 Create the default layer regarding fields_desc dict 636 637 :param field_pos_list: 638 """ 639 if self.raw_packet_cache is not None: 640 for fname, fval in six.iteritems(self.raw_packet_cache_fields): 641 if self.getfieldval(fname) != fval: 642 self.raw_packet_cache = None 643 self.raw_packet_cache_fields = None 644 self.wirelen = None 645 break 646 if self.raw_packet_cache is not None: 647 return self.raw_packet_cache 648 p = b"" 649 for f in self.fields_desc: 650 val = self.getfieldval(f.name) 651 if isinstance(val, RawVal): 652 p += bytes(val) 653 else: 654 p = f.addfield(self, p, val) 655 return p 656 657 def do_build_payload(self): 658 # type: () -> bytes 659 """ 660 Create the 
default version of the payload layer 661 662 :return: a string of payload layer 663 """ 664 return self.payload.do_build() 665 666 def do_build(self): 667 # type: () -> bytes 668 """ 669 Create the default version of the layer 670 671 :return: a string of the packet with the payload 672 """ 673 if not self.explicit: 674 self = next(iter(self)) 675 pkt = self.self_build() 676 for t in self.post_transforms: 677 pkt = t(pkt) 678 pay = self.do_build_payload() 679 if self.raw_packet_cache is None: 680 return self.post_build(pkt, pay) 681 else: 682 return pkt + pay 683 684 def build_padding(self): 685 # type: () -> bytes 686 return self.payload.build_padding() 687 688 def build(self): 689 # type: () -> bytes 690 """ 691 Create the current layer 692 693 :return: string of the packet with the payload 694 """ 695 p = self.do_build() 696 p += self.build_padding() 697 p = self.build_done(p) 698 return p 699 700 def post_build(self, pkt, pay): 701 # type: (bytes, bytes) -> bytes 702 """ 703 DEV: called right after the current layer is build. 
704 705 :param str pkt: the current packet (build by self_buil function) 706 :param str pay: the packet payload (build by do_build_payload function) 707 :return: a string of the packet with the payload 708 """ 709 return pkt + pay 710 711 def build_done(self, p): 712 # type: (bytes) -> bytes 713 return self.payload.build_done(p) 714 715 def do_build_ps(self): 716 # type: () -> Tuple[bytes, List[Tuple[Packet, List[Tuple[Field[Any, Any], str, bytes]]]]] # noqa: E501 717 p = b"" 718 pl = [] 719 q = b"" 720 for f in self.fields_desc: 721 if isinstance(f, ConditionalField) and not f._evalcond(self): 722 continue 723 p = f.addfield(self, p, self.getfieldval(f.name)) 724 if isinstance(p, bytes): 725 r = p[len(q):] 726 q = p 727 else: 728 r = b"" 729 pl.append((f, f.i2repr(self, self.getfieldval(f.name)), r)) 730 731 pkt, lst = self.payload.build_ps(internal=1) 732 p += pkt 733 lst.append((self, pl)) 734 735 return p, lst 736 737 def build_ps(self, internal=0): 738 # type: (int) -> Tuple[bytes, List[Tuple[Packet, List[Tuple[Any, Any, bytes]]]]] # noqa: E501 739 p, lst = self.do_build_ps() 740# if not internal: 741# pkt = self 742# while pkt.haslayer(conf.padding_layer): 743# pkt = pkt.getlayer(conf.padding_layer) 744# lst.append( (pkt, [ ("loakjkjd", pkt.load, pkt.load) ] ) ) 745# p += pkt.load 746# pkt = pkt.payload 747 return p, lst 748 749 def canvas_dump(self, layer_shift=0, rebuild=1): 750 # type: (int, int) -> pyx.canvas.canvas 751 if PYX == 0: 752 raise ImportError("PyX and its dependencies must be installed") 753 canvas = pyx.canvas.canvas() 754 if rebuild: 755 _, t = self.__class__(raw(self)).build_ps() 756 else: 757 _, t = self.build_ps() 758 YTXTI = len(t) 759 for _, l in t: 760 YTXTI += len(l) 761 YTXT = float(YTXTI) 762 YDUMP = YTXT 763 764 XSTART = 1 765 XDSTART = 10 766 y = 0.0 767 yd = 0.0 768 XMUL = 0.55 769 YMUL = 0.4 770 771 backcolor = colgen(0.6, 0.8, 1.0, trans=pyx.color.rgb) 772 forecolor = colgen(0.2, 0.5, 0.8, trans=pyx.color.rgb) 773# 
backcolor=makecol(0.376, 0.729, 0.525, 1.0) 774 775 def hexstr(x): 776 # type: (bytes) -> str 777 return " ".join("%02x" % orb(c) for c in x) 778 779 def make_dump_txt(x, y, txt): 780 # type: (int, float, bytes) -> pyx.text.text 781 return pyx.text.text( 782 XDSTART + x * XMUL, 783 (YDUMP - y) * YMUL, 784 r"\tt{%s}" % hexstr(txt), 785 [pyx.text.size.Large] 786 ) 787 788 def make_box(o): 789 # type: (pyx.bbox.bbox) -> pyx.bbox.bbox 790 return pyx.box.rect( 791 o.left(), o.bottom(), o.width(), o.height(), 792 relcenter=(0.5, 0.5) 793 ) 794 795 def make_frame(lst): 796 # type: (List[Any]) -> pyx.path.path 797 if len(lst) == 1: 798 b = lst[0].bbox() 799 b.enlarge(pyx.unit.u_pt) 800 return b.path() 801 else: 802 fb = lst[0].bbox() 803 fb.enlarge(pyx.unit.u_pt) 804 lb = lst[-1].bbox() 805 lb.enlarge(pyx.unit.u_pt) 806 if len(lst) == 2 and fb.left() > lb.right(): 807 return pyx.path.path(pyx.path.moveto(fb.right(), fb.top()), 808 pyx.path.lineto(fb.left(), fb.top()), 809 pyx.path.lineto(fb.left(), fb.bottom()), # noqa: E501 810 pyx.path.lineto(fb.right(), fb.bottom()), # noqa: E501 811 pyx.path.moveto(lb.left(), lb.top()), 812 pyx.path.lineto(lb.right(), lb.top()), 813 pyx.path.lineto(lb.right(), lb.bottom()), # noqa: E501 814 pyx.path.lineto(lb.left(), lb.bottom())) # noqa: E501 815 else: 816 # XXX 817 gb = lst[1].bbox() 818 if gb != lb: 819 gb.enlarge(pyx.unit.u_pt) 820 kb = lst[-2].bbox() 821 if kb != gb and kb != lb: 822 kb.enlarge(pyx.unit.u_pt) 823 return pyx.path.path(pyx.path.moveto(fb.left(), fb.top()), 824 pyx.path.lineto(fb.right(), fb.top()), 825 pyx.path.lineto(fb.right(), kb.bottom()), # noqa: E501 826 pyx.path.lineto(lb.right(), kb.bottom()), # noqa: E501 827 pyx.path.lineto(lb.right(), lb.bottom()), # noqa: E501 828 pyx.path.lineto(lb.left(), lb.bottom()), # noqa: E501 829 pyx.path.lineto(lb.left(), gb.top()), 830 pyx.path.lineto(fb.left(), gb.top()), 831 pyx.path.closepath(),) 832 833 def make_dump(s, # type: bytes 834 shift=0, # type: int 835 y=0., # 
type: float 836 col=None, # type: pyx.color.color 837 bkcol=None, # type: pyx.color.color 838 large=16 # type: int 839 ): 840 # type: (...) -> Tuple[pyx.canvas.canvas, pyx.bbox.bbox, int, float] # noqa: E501 841 c = pyx.canvas.canvas() 842 tlist = [] 843 while s: 844 dmp, s = s[:large - shift], s[large - shift:] 845 txt = make_dump_txt(shift, y, dmp) 846 tlist.append(txt) 847 shift += len(dmp) 848 if shift >= 16: 849 shift = 0 850 y += 1 851 if col is None: 852 col = pyx.color.rgb.red 853 if bkcol is None: 854 bkcol = pyx.color.rgb.white 855 c.stroke(make_frame(tlist), [col, pyx.deco.filled([bkcol]), pyx.style.linewidth.Thick]) # noqa: E501 856 for txt in tlist: 857 c.insert(txt) 858 return c, tlist[-1].bbox(), shift, y 859 860 last_shift, last_y = 0, 0.0 861 while t: 862 bkcol = next(backcolor) 863 proto, fields = t.pop() 864 y += 0.5 865 pt = pyx.text.text( 866 XSTART, 867 (YTXT - y) * YMUL, 868 r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape( 869 str(proto.name) 870 ), 871 [pyx.text.size.Large] 872 ) 873 y += 1 874 ptbb = pt.bbox() 875 ptbb.enlarge(pyx.unit.u_pt * 2) 876 canvas.stroke(ptbb.path(), [pyx.color.rgb.black, pyx.deco.filled([bkcol])]) # noqa: E501 877 canvas.insert(pt) 878 for field, fval, fdump in fields: 879 col = next(forecolor) 880 ft = pyx.text.text(XSTART, (YTXT - y) * YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(field.name)) # noqa: E501 881 if isinstance(field, BitField): 882 fsize = '%sb' % field.size 883 else: 884 fsize = '%sB' % len(fdump) 885 if (hasattr(field, 'field') and 886 'LE' in field.field.__class__.__name__[:3] or 887 'LE' in field.__class__.__name__[:3]): 888 fsize = r'$\scriptstyle\langle$' + fsize 889 st = pyx.text.text(XSTART + 3.4, (YTXT - y) * YMUL, r"\font\cmbxfont=cmssbx10 scaled 600\cmbxfont{%s}" % fsize, [pyx.text.halign.boxright]) # noqa: E501 890 if isinstance(fval, str): 891 if len(fval) > 18: 892 fval = fval[:18] + "[...]" 893 else: 894 fval = "" 895 vt = pyx.text.text(XSTART + 3.5, (YTXT - y) * YMUL, 
r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(fval)) # noqa: E501 896 y += 1.0 897 if fdump: 898 dt, target, last_shift, last_y = make_dump(fdump, last_shift, last_y, col, bkcol) # noqa: E501 899 900 dtb = target 901 vtb = vt.bbox() 902 bxvt = make_box(vtb) 903 bxdt = make_box(dtb) 904 dtb.enlarge(pyx.unit.u_pt) 905 try: 906 if yd < 0: 907 cnx = pyx.connector.curve(bxvt, bxdt, absangle1=0, absangle2=-90) # noqa: E501 908 else: 909 cnx = pyx.connector.curve(bxvt, bxdt, absangle1=0, absangle2=90) # noqa: E501 910 except Exception: 911 pass 912 else: 913 canvas.stroke(cnx, [pyx.style.linewidth.thin, pyx.deco.earrow.small, col]) # noqa: E501 914 915 canvas.insert(dt) 916 917 canvas.insert(ft) 918 canvas.insert(st) 919 canvas.insert(vt) 920 last_y += layer_shift 921 922 return canvas 923 924 def extract_padding(self, s): 925 # type: (bytes) -> Tuple[bytes, Optional[bytes]] 926 """ 927 DEV: to be overloaded to extract current layer's padding. 928 929 :param str s: the current layer 930 :return: a couple of strings (actual layer, padding) 931 """ 932 return s, None 933 934 def post_dissect(self, s): 935 # type: (bytes) -> bytes 936 """DEV: is called right after the current layer has been dissected""" 937 return s 938 939 def pre_dissect(self, s): 940 # type: (bytes) -> bytes 941 """DEV: is called right before the current layer is dissected""" 942 return s 943 944 def do_dissect(self, s): 945 # type: (bytes) -> bytes 946 _raw = s 947 self.raw_packet_cache_fields = {} 948 for f in self.fields_desc: 949 if not s: 950 break 951 s, fval = f.getfield(self, s) 952 # Skip unused ConditionalField 953 if isinstance(f, ConditionalField) and fval is None: 954 continue 955 # We need to track fields with mutable values to discard 956 # .raw_packet_cache when needed. 
957 if f.islist or f.holds_packets or f.ismutable: 958 self.raw_packet_cache_fields[f.name] = f.do_copy(fval) 959 self.fields[f.name] = fval 960 self.raw_packet_cache = _raw[:-len(s)] if s else _raw 961 self.explicit = 1 962 return s 963 964 def do_dissect_payload(self, s): 965 # type: (bytes) -> None 966 """ 967 Perform the dissection of the layer's payload 968 969 :param str s: the raw layer 970 """ 971 if s: 972 cls = self.guess_payload_class(s) 973 try: 974 p = cls(s, _internal=1, _underlayer=self) 975 except KeyboardInterrupt: 976 raise 977 except Exception: 978 if conf.debug_dissector: 979 if issubtype(cls, Packet): 980 log_runtime.error("%s dissector failed", cls.__name__) 981 else: 982 log_runtime.error("%s.guess_payload_class() returned " 983 "[%s]", 984 self.__class__.__name__, repr(cls)) 985 if cls is not None: 986 raise 987 p = conf.raw_layer(s, _internal=1, _underlayer=self) 988 self.add_payload(p) 989 990 def dissect(self, s): 991 # type: (bytes) -> None 992 s = self.pre_dissect(s) 993 994 s = self.do_dissect(s) 995 996 s = self.post_dissect(s) 997 998 payl, pad = self.extract_padding(s) 999 self.do_dissect_payload(payl) 1000 if pad and conf.padding: 1001 self.add_payload(conf.padding_layer(pad)) 1002 1003 def guess_payload_class(self, payload): 1004 # type: (bytes) -> Type[Packet] 1005 """ 1006 DEV: Guesses the next payload class from layer bonds. 1007 Can be overloaded to use a different mechanism. 
1008 1009 :param str payload: the layer's payload 1010 :return: the payload class 1011 """ 1012 for t in self.aliastypes: 1013 for fval, cls in t.payload_guess: 1014 try: 1015 if all(v == self.getfieldval(k) 1016 for k, v in six.iteritems(fval)): 1017 return cls # type: ignore 1018 except AttributeError: 1019 pass 1020 return self.default_payload_class(payload) 1021 1022 def default_payload_class(self, payload): 1023 # type: (bytes) -> Type[Packet] 1024 """ 1025 DEV: Returns the default payload class if nothing has been found by the 1026 guess_payload_class() method. 1027 1028 :param str payload: the layer's payload 1029 :return: the default payload class define inside the configuration file 1030 """ 1031 return conf.raw_layer 1032 1033 def hide_defaults(self): 1034 # type: () -> None 1035 """Removes fields' values that are the same as default values.""" 1036 # use list(): self.fields is modified in the loop 1037 for k, v in list(six.iteritems(self.fields)): 1038 v = self.fields[k] 1039 if k in self.default_fields: 1040 if self.default_fields[k] == v: 1041 del self.fields[k] 1042 self.payload.hide_defaults() 1043 1044 def update_sent_time(self, time): 1045 # type: (Optional[float]) -> None 1046 """Use by clone_with to share the sent_time value""" 1047 pass 1048 1049 def clone_with(self, payload=None, share_time=False, **kargs): 1050 # type: (Optional[Any], bool, **Any) -> Any 1051 pkt = self.__class__() 1052 pkt.explicit = 1 1053 pkt.fields = kargs 1054 pkt.default_fields = self.copy_fields_dict(self.default_fields) 1055 pkt.overloaded_fields = self.overloaded_fields.copy() 1056 pkt.time = self.time 1057 pkt.underlayer = self.underlayer 1058 pkt.post_transforms = self.post_transforms 1059 pkt.raw_packet_cache = self.raw_packet_cache 1060 pkt.raw_packet_cache_fields = self.copy_fields_dict( 1061 self.raw_packet_cache_fields 1062 ) 1063 pkt.wirelen = self.wirelen 1064 if payload is not None: 1065 pkt.add_payload(payload) 1066 if share_time: 1067 # This binds the 
subpacket .sent_time to this layer 1068 def _up_time(x, parent=self): 1069 # type: (float, Packet) -> None 1070 parent.sent_time = x 1071 pkt.update_sent_time = _up_time # type: ignore 1072 return pkt 1073 1074 def __iter__(self): 1075 # type: () -> Iterator[Packet] 1076 """Iterates through all sub-packets generated by this Packet.""" 1077 # We use __iterlen__ as low as possible, to lower processing time 1078 def loop(todo, done, self=self): 1079 # type: (List[str], Dict[str, Any], Any) -> Iterator[Packet] 1080 if todo: 1081 eltname = todo.pop() 1082 elt = self.getfieldval(eltname) 1083 if not isinstance(elt, Gen): 1084 if self.get_field(eltname).islist: 1085 elt = SetGen([elt]) 1086 else: 1087 elt = SetGen(elt) 1088 for e in elt: 1089 done[eltname] = e 1090 for x in loop(todo[:], done): 1091 yield x 1092 else: 1093 if isinstance(self.payload, NoPayload): 1094 payloads = SetGen([None]) # type: SetGen[Packet] 1095 else: 1096 payloads = self.payload 1097 share_time = False 1098 if self.fields == done and payloads.__iterlen__() == 1: 1099 # In this case, the packets are identical. 
def iterpayloads(self):
    # type: () -> Iterator[Packet]
    """Walk the payload chain, starting with this layer.

    Yields ``self`` first, then each successive ``.payload`` for as long
    as that payload is truthy. Useful for DNS or 802.11 for instance.
    """
    layer = self
    while True:
        yield layer
        upper = layer.payload
        if not upper:
            # A falsy payload marks the end of the chain.
            return
        layer = upper
def haslayer(self, cls, _subclass=None):
    # type: (Union[Type[Packet], str], Optional[bool]) -> int
    """
    Tell whether this packet contains a layer matching cls.
    Superseded by the "cls in self" syntax.

    :param cls: a layer class or a layer name; None matches any layer
    :param _subclass: when true, classes are compared with issubtype()
        instead of strict equality. Defaults to self.match_subclass.
    :return: a true value if this layer, a packet stored in one of its
        packet fields, or one of its payloads matches
    """
    own_class = self.__class__
    if _subclass is None:
        _subclass = self.match_subclass or None
    if cls is None:
        return True
    if _subclass:
        same_layer = issubtype(own_class, cls)
    else:
        same_layer = bool(own_class == cls)
    if same_layer or cls in (own_class.__name__, self._name):
        return True
    # Look inside the packets held by this layer's packet fields
    for pfield in self.packetfields:
        inner = self.getfieldval(pfield.name)
        if inner is None:
            continue
        if pfield.islist:
            candidates = inner
        else:
            candidates = SetGen(inner, _iterpacket=0)
        for candidate in candidates:
            if not isinstance(candidate, Packet):
                continue
            found = candidate.haslayer(cls, _subclass=_subclass)
            if found:
                return found
    # Nothing here: let the payload answer
    return self.payload.haslayer(cls, _subclass=_subclass)
def firstlayer(self):
    # type: () -> Packet
    """Return the bottom-most layer, reached by following ``underlayer``
    links until one is None."""
    layer = self
    while True:
        below = layer.underlayer
        if below is None:
            return layer
        layer = below
def display(self, *args, **kargs):  # Deprecated. Use show()
    # type: (*Any, **Any) -> None
    """Deprecated alias of show().

    All arguments are forwarded unchanged to show(); whatever show()
    returns is discarded, so this method always returns None.
    """
    show = self.show
    show(*args, **kargs)
1387 1388 :param dump: determine if it prints or returns the string value 1389 :param int indent: the size of indentation for each layer 1390 :param str lvl: additional information about the layer lvl 1391 :param str label_lvl: additional information about the layer fields 1392 :param first_call: determine if the current function is the first 1393 :return: return a hierarchical view if dump, else print it 1394 """ 1395 1396 if dump: 1397 from scapy.themes import AnsiColorTheme 1398 ct = AnsiColorTheme() # No color for dump output 1399 else: 1400 ct = conf.color_theme 1401 s = "%s%s %s %s \n" % (label_lvl, 1402 ct.punct("###["), 1403 ct.layer_name(self.name), 1404 ct.punct("]###")) 1405 for f in self.fields_desc: 1406 if isinstance(f, ConditionalField) and not f._evalcond(self): 1407 continue 1408 if isinstance(f, Emph) or f in conf.emph: 1409 ncol = ct.emph_field_name 1410 vcol = ct.emph_field_value 1411 else: 1412 ncol = ct.field_name 1413 vcol = ct.field_value 1414 fvalue = self.getfieldval(f.name) 1415 if isinstance(fvalue, Packet) or (f.islist and f.holds_packets and isinstance(fvalue, list)): # noqa: E501 1416 pad = max(0, 10 - len(f.name)) * " " 1417 s += "%s \\%s%s\\\n" % (label_lvl + lvl, ncol(f.name), pad) 1418 fvalue_gen = SetGen( 1419 fvalue, 1420 _iterpacket=0 1421 ) # type: SetGen[Packet] 1422 for fvalue in fvalue_gen: 1423 s += fvalue._show_or_dump(dump=dump, indent=indent, label_lvl=label_lvl + lvl + " |", first_call=False) # noqa: E501 1424 else: 1425 pad = max(0, 10 - len(f.name)) * " " 1426 begn = "%s %s%s%s " % (label_lvl + lvl, 1427 ncol(f.name), 1428 pad, 1429 ct.punct("="),) 1430 reprval = f.i2repr(self, fvalue) 1431 if isinstance(reprval, str): 1432 reprval = reprval.replace("\n", "\n" + " " * (len(label_lvl) + # noqa: E501 1433 len(lvl) + 1434 len(f.name) + 1435 4)) 1436 s += "%s%s\n" % (begn, vcol(reprval)) 1437 if self.payload: 1438 s += self.payload._show_or_dump( # type: ignore 1439 dump=dump, 1440 indent=indent, 1441 lvl=lvl + (" " * 
def sprintf(self, fmt, relax=1):
    # type: (str, int) -> str
    """
    sprintf(format, [relax=1]) -> str

    Where format is a string that can include directives. A directive
    begins and ends by % and has the following format:
    ``%[fmt[r],][cls[:nb].]field%``

    :param fmt: is a classic printf directive, "r" can be appended for raw
      substitution:
      (ex: IP.flags=0x18 instead of SA), nb is the number of the layer
      (ex: for IP/IP packets, IP:2.src is the src of the upper IP layer).
      Special case : "%.time%" is the creation time.
      Ex::

        p.sprintf(
          "%.time% %-15s,IP.src% -> %-15s,IP.dst% %IP.chksum% "
          "%03xr,IP.proto% %r,TCP.flags%"
        )

    :param relax: forwarded unchanged to the payload's sprintf() each time
      a directive cannot be resolved on this layer.
    :return: the formatted string
    :raises Scapy_Exception: on a condition without ":" or on a malformed
      directive.

    Moreover, the format string can include conditional statements. A
    conditional statement looks like : {layer:string} where layer is a
    layer name, and string is the string to insert in place of the
    condition if it is true, i.e. if layer is present. If layer is
    preceded by a "!", the result is inverted. Conditions can be
    nested. An empty layer name is looked up like any other name, so it
    is simply reported as absent instead of making the call fail.
    A valid statement can be::

      p.sprintf("This is a{TCP: TCP}{UDP: UDP}{ICMP:n ICMP} packet")
      p.sprintf("{IP:%IP.dst% {ICMP:%ICMP.type%}{TCP:%TCP.dport%}}")

    A side effect is that, to obtain "{" and "}" characters, you must use
    "%(" and "%)".
    """

    escape = {"%": "%",
              "(": "{",
              ")": "}"}

    # Evaluate conditions, innermost first: the right-most "{" cannot
    # contain another condition.
    while "{" in fmt:
        i = fmt.rindex("{")
        j = fmt[i + 1:].index("}")
        cond = fmt[i + 1:i + j + 1]
        k = cond.find(":")
        if k < 0:
            raise Scapy_Exception("Bad condition in format string: [%s] (read sprintf doc!)" % cond)  # noqa: E501
        cond, format_ = cond[:k], cond[k + 1:]
        res = False
        # startswith() rather than cond[0]: the layer name may be empty
        if cond.startswith("!"):
            res = True
            cond = cond[1:]
        if self.haslayer(cond):
            res = not res
        if not res:
            format_ = ""
        fmt = fmt[:i] + format_ + fmt[i + j + 2:]

    # Evaluate directives
    s = ""
    while "%" in fmt:
        i = fmt.index("%")
        s += fmt[:i]
        fmt = fmt[i + 1:]
        if fmt and fmt[0] in escape:
            # "%%", "%(" and "%)" are escapes, not directives
            s += escape[fmt[0]]
            fmt = fmt[1:]
            continue
        try:
            i = fmt.index("%")
            sfclsfld = fmt[:i]
            fclsfld = sfclsfld.split(",")
            if len(fclsfld) == 1:
                f = "s"
                clsfld = fclsfld[0]
            elif len(fclsfld) == 2:
                f, clsfld = fclsfld
            else:
                raise Scapy_Exception
            if "." in clsfld:
                cls, fld = clsfld.split(".")
            else:
                cls = self.__class__.__name__
                fld = clsfld
            num = 1
            if ":" in cls:
                cls, snum = cls.split(":")
                num = int(snum)
            fmt = fmt[i + 1:]
        except Exception:
            raise Scapy_Exception("Bad format string [%%%s%s]" % (fmt[:25], fmt[25:] and "..."))  # noqa: E501
        else:
            if fld == "time":
                # Local time of self.time, with the microseconds appended
                val = time.strftime(
                    "%H:%M:%S.%%06i",
                    time.localtime(float(self.time))
                ) % int((self.time - int(self.time)) * 1000000)
            elif cls == self.__class__.__name__ and hasattr(self, fld):
                if num > 1:
                    # Not this occurrence of the layer: ask the payload
                    # for the (num - 1)th one.
                    val = self.payload.sprintf("%%%s,%s:%s.%s%%" % (f, cls, num - 1, fld), relax)  # noqa: E501
                    f = "s"
                elif f[-1] == "r":  # Raw field value
                    val = getattr(self, fld)
                    f = f[:-1]
                    if not f:
                        f = "s"
                else:
                    val = getattr(self, fld)
                    if fld in self.fieldtype:
                        val = self.fieldtype[fld].i2repr(self, val)
            else:
                # Directive targets another layer: delegate it untouched
                val = self.payload.sprintf("%%%s%%" % sfclsfld, relax)
                f = "s"
            s += ("%" + f) % val

    s += fmt
    return s
def _do_summary(self):
    # type: () -> Tuple[int, str, List[Any]]
    """Build the summary of this layer and of everything above it.

    The payload is summarized first. mysummary() of this layer is only
    called when nothing above produced a summary, or when an upper layer
    listed this class among the layers it needs.

    :return: a (found, text, needed) tuple: whether a summary was
        produced, the summary text, and the layer classes whose
        mysummary() was requested.
    """
    found, upper_text, needed = self.payload._do_summary()
    layer_cls = self.__class__
    text = ""
    if not found or layer_cls in needed:
        text = self.mysummary()
        if isinstance(text, tuple):
            # mysummary() may also return the layers it depends on
            text, extra = text
            needed += extra
    if text or needed:
        found = 1
    if not text:
        text = layer_cls.__name__ if self.show_summary else ""
    if layer_cls in conf.emph:
        emphasized = " ".join(
            "%s=%s" % (fd.name, fd.i2repr(self, self.getfieldval(fd.name)))
            for fd in self.fields_desc
            if fd in conf.emph
        )
        text = "%s [%s]" % (text, emphasized)
    # Only separate the two parts when both are non-empty
    separator = " / " if text and upper_text else ""
    return found, "%s%s%s" % (text, separator, upper_text), needed
def convert_to(self, other_cls, **kwargs):
    # type: (Type[Packet], **Any) -> Packet
    """Converts this Packet to another type.

    This is not guaranteed to be a lossless process.

    By default, this only implements conversion to ``Raw``. For any other
    class the request is handed to ``other_cls.convert_packet`` once,
    flagged with ``_internal=True``; if it comes back still flagged, the
    conversion is declared impossible.

    :param other_cls: Reference to a Packet class to convert to.
    :type other_cls: Type[scapy.packet.Packet]
    :return: Converted form of the packet.
    :rtype: other_cls
    :raises TypeError: When conversion is not possible
    """
    if issubtype(other_cls, Packet):
        if other_cls is Raw:
            return Raw(raw(self))
        if "_internal" in kwargs:
            # Already bounced through convert_packet: give up
            raise TypeError("Cannot convert {} to {}".format(
                type(self).__name__, other_cls.__name__))
        return other_cls.convert_packet(self, _internal=True, **kwargs)
    raise TypeError("{} must implement Packet".format(other_cls))
@classmethod
def convert_packets(cls,
                    pkts,  # type: List[Packet]
                    **kwargs  # type: Any
                    ):
    # type: (...) -> Iterator[Packet]
    """Converts many packets to this type.

    This is implemented as a generator: each packet is converted with
    ``cls.convert_packet(pkt, **kwargs)`` only when the result is
    requested, and one converted packet is yielded per input packet.

    See ``Packet.convert_packet``.
    """
    for pkt in pkts:
        yield cls.convert_packet(pkt, **kwargs)
1798 # type: () -> bool 1799 return False 1800 __bool__ = __nonzero__ 1801 1802 def do_build(self): 1803 # type: () -> bytes 1804 return b"" 1805 1806 def build(self): 1807 # type: () -> bytes 1808 return b"" 1809 1810 def build_padding(self): 1811 # type: () -> bytes 1812 return b"" 1813 1814 def build_done(self, p): 1815 # type: (bytes) -> bytes 1816 return p 1817 1818 def build_ps(self, internal=0): 1819 # type: (int) -> Tuple[bytes, List[Any]] 1820 return b"", [] 1821 1822 def getfieldval(self, attr): 1823 # type: (str) -> NoReturn 1824 raise AttributeError(attr) 1825 1826 def getfield_and_val(self, attr): 1827 # type: (str) -> NoReturn 1828 raise AttributeError(attr) 1829 1830 def setfieldval(self, attr, val): 1831 # type: (str, Any) -> NoReturn 1832 raise AttributeError(attr) 1833 1834 def delfieldval(self, attr): 1835 # type: (str) -> NoReturn 1836 raise AttributeError(attr) 1837 1838 def hide_defaults(self): 1839 # type: () -> None 1840 pass 1841 1842 def __iter__(self): 1843 # type: () -> Iterator[Packet] 1844 return iter([]) 1845 1846 def __eq__(self, other): 1847 # type: (Any) -> bool 1848 if isinstance(other, NoPayload): 1849 return True 1850 return False 1851 1852 def hashret(self): 1853 # type: () -> bytes 1854 return b"" 1855 1856 def answers(self, other): 1857 # type: (NoPayload) -> bool 1858 return isinstance(other, (NoPayload, conf.padding_layer)) # noqa: E501 1859 1860 def haslayer(self, cls, _subclass=None): 1861 # type: (Union[Type[Packet], str], Optional[bool]) -> int 1862 return 0 1863 1864 def getlayer(self, 1865 cls, # type: Union[int, Type[Packet], str] 1866 nb=1, # type: int 1867 _track=None, # type: Optional[List[int]] 1868 _subclass=None, # type: Optional[bool] 1869 **flt # type: Any 1870 ): 1871 # type: (...) 
class Raw(Packet):
    """Layer made of a single byte-string field, ``load``."""
    name = "Raw"
    fields_desc = [StrField("load", b"")]

    def __init__(self, _pkt=b"", *args, **kwargs):
        # type: (bytes, *Any, **Any) -> None
        # Non-empty input that is not bytes yet is encoded first
        if not isinstance(_pkt, bytes) and _pkt:
            _pkt = bytes_encode(_pkt)
        super(Raw, self).__init__(_pkt, *args, **kwargs)

    def answers(self, other):
        # type: (Packet) -> int
        """Raw data is accepted as an answer to anything."""
        return 1

    def mysummary(self):
        # type: () -> str
        """Summarize the load according to conf.raw_summary.

        A callable is applied to the load, any other true value selects
        the repr() of the load, and a false value falls back to
        Packet.mysummary().
        """
        summarizer = conf.raw_summary
        if not summarizer:
            return Packet.mysummary(self)
        if callable(summarizer):
            return "Raw %s" % summarizer(self.load)
        return "Raw %r" % self.load

    @classmethod
    def convert_packet(cls, pkt, **kwargs):
        # type: (Packet, **Any) -> Raw
        """Wrap the raw bytes of pkt into a Raw layer."""
        payload_bytes = raw(pkt)
        return Raw(payload_bytes)
def bind_bottom_up(lower,  # type: Type[Packet]
                   upper,  # type: Type[Packet]
                   __fval=None,  # type: Optional[Any]
                   **fval  # type: Any
                   ):
    # type: (...) -> None
    r"""Bind 2 layers for dissection.
    The upper layer will be chosen for dissection on top of the lower layer, if
    ALL the passed arguments are validated. If multiple calls are made with
    the same layers, the last one will be used as default.

    The field values may be given as keywords and/or as a dictionary in
    third position; on conflict the dictionary wins.

    ex:
        >>> bind_bottom_up(Ether, SNAP, type=0x1234)
        >>> Ether(b'\xff\xff\xff\xff\xff\xff\xd0P\x99V\xdd\xf9\x124\x00\x00\x00\x00\x00')  # noqa: E501
        <Ether  dst=ff:ff:ff:ff:ff:ff src=d0:50:99:56:dd:f9 type=0x1234 |<SNAP  OUI=0x0 code=0x0 |>>  # noqa: E501
    """
    conditions = fval
    if __fval is not None:
        conditions.update(__fval)
    # Assign a new list instead of appending in place, so that a list
    # object shared with another class is left untouched.
    lower.payload_guess = lower.payload_guess + [(conditions, upper)]
def split_bottom_up(lower,  # type: Type[Packet]
                    upper,  # type: Type[Packet]
                    __fval=None,  # type: Optional[Any]
                    **fval  # type: Any
                    ):
    # type: (...) -> None
    """This call un-links an association that was made using bind_bottom_up.
    Have a look at help(bind_bottom_up)

    Every entry of ``lower.payload_guess`` that points to ``upper`` and
    whose parameters contain all the given field values is removed; the
    other entries are kept, in order.
    """
    if __fval is not None:
        fval.update(__fval)

    kept = []
    for entry in lower.payload_guess:
        params, cls = entry
        mismatch = False
        for key, value in fval.items():
            if key not in params or params[key] != value:
                mismatch = True
                break
        # Keep bindings to other classes, and bindings to ``upper`` made
        # with different field values.
        if cls != upper or mismatch:
            kept.append(entry)
    lower.payload_guess = kept
@conf.commands.register
def split_layers(lower,  # type: Type[Packet]
                 upper,  # type: Type[Packet]
                 __fval=None,  # type: Optional[Any]
                 **fval  # type: Any
                 ):
    # type: (...) -> None
    """Split 2 layers previously bound.
    This call undoes both bind_bottom_up and bind_top_down. It is the
    opposite of bind_layers.

    The field values may be given as keywords and/or as a dictionary in
    third position; on conflict the dictionary wins.

    Please have a look at their docs:
     - help(split_bottom_up)
     - help(split_top_down)
    """
    criteria = dict(fval)
    if __fval is not None:
        criteria.update(__fval)
    # Dissection binding first, then building binding
    for undo in (split_bottom_up, split_top_down):
        undo(lower, upper, **criteria)
Please provide a " 2123 "'layer' parameter !") 2124 # 0 - Imports 2125 try: 2126 import prompt_toolkit 2127 except ImportError: 2128 raise ImportError("prompt_toolkit is not installed ! " 2129 "You may install IPython, which contains it, via" 2130 " `pip install ipython`") 2131 if not _version_checker(prompt_toolkit, (2, 0)): 2132 raise ImportError("prompt_toolkit >= 2.0.0 is required !") 2133 # Only available with prompt_toolkit > 2.0, not released on PyPi yet 2134 from prompt_toolkit.shortcuts.dialogs import radiolist_dialog, \ 2135 button_dialog 2136 from prompt_toolkit.formatted_text import HTML 2137 # Check for prompt_toolkit >= 3.0.0 2138 call_ptk = lambda x: cast(str, x) # type: Callable[[Any], str] 2139 if _version_checker(prompt_toolkit, (3, 0)): 2140 call_ptk = lambda x: x.run() # type: ignore 2141 # 1 - Ask for layer or contrib 2142 btn_diag = button_dialog( 2143 title=six.text_type("Scapy v%s" % conf.version), 2144 text=HTML( 2145 six.text_type( 2146 '<style bg="white" fg="red">Chose the type of packets' 2147 ' you want to explore:</style>' 2148 ) 2149 ), 2150 buttons=[ 2151 (six.text_type("Layers"), "layers"), 2152 (six.text_type("Contribs"), "contribs"), 2153 (six.text_type("Cancel"), "cancel") 2154 ]) 2155 action = call_ptk(btn_diag) 2156 # 2 - Retrieve list of Packets 2157 if action == "layers": 2158 # Get all loaded layers 2159 lvalues = conf.layers.layers() 2160 # Restrict to layers-only (not contribs) + packet.py and asn1*.py 2161 values = [x for x in lvalues if ("layers" in x[0] or 2162 "packet" in x[0] or 2163 "asn1" in x[0])] 2164 elif action == "contribs": 2165 # Get all existing contribs 2166 from scapy.main import list_contrib 2167 cvalues = cast(List[Dict[str, str]], list_contrib(ret=True)) 2168 values = [(x['name'], x['description']) 2169 for x in cvalues] 2170 # Remove very specific modules 2171 values = [x for x in values if "can" not in x[0]] 2172 else: 2173 # Escape/Cancel was pressed 2174 return 2175 # Python 2 compat 2176 if 
six.PY2: 2177 values = [(six.text_type(x), six.text_type(y)) 2178 for x, y in values] 2179 # Build tree 2180 if action == "contribs": 2181 # A tree is a dictionary. Each layer contains a keyword 2182 # _l which contains the files in the layer, and a _name 2183 # argument which is its name. The other keys are the subfolders, 2184 # which are similar dictionaries 2185 tree = defaultdict(list) # type: Dict[str, Union[List[Any], Dict[str, Any]]] # noqa: E501 2186 for name, desc in values: 2187 if "." in name: # Folder detected 2188 parts = name.split(".") 2189 subtree = tree 2190 for pa in parts[:-1]: 2191 if pa not in subtree: 2192 subtree[pa] = {} 2193 # one layer deeper 2194 subtree = subtree[pa] # type: ignore 2195 subtree["_name"] = pa # type: ignore 2196 if "_l" not in subtree: 2197 subtree["_l"] = [] 2198 subtree["_l"].append((parts[-1], desc)) # type: ignore 2199 else: 2200 tree["_l"].append((name, desc)) # type: ignore 2201 elif action == "layers": 2202 tree = {"_l": values} 2203 # 3 - Ask for the layer/contrib module to explore 2204 current = tree # type: Any 2205 previous = [] # type: List[Dict[str, Union[List[Any], Dict[str, Any]]]] # noqa: E501 2206 while True: 2207 # Generate tests & form 2208 folders = list(current.keys()) 2209 _radio_values = [ 2210 ("$" + name, six.text_type('[+] ' + name.capitalize())) 2211 for name in folders if not name.startswith("_") 2212 ] + current.get("_l", []) # type: List[str] 2213 cur_path = "" 2214 if previous: 2215 cur_path = ".".join( 2216 itertools.chain( 2217 (x["_name"] for x in previous[1:]), # type: ignore 2218 (current["_name"],) 2219 ) 2220 ) 2221 extra_text = ( 2222 '\n<style bg="white" fg="green">> scapy.%s</style>' 2223 ) % (action + ("." 
+ cur_path if cur_path else "")) 2224 # Show popup 2225 rd_diag = radiolist_dialog( 2226 values=_radio_values, 2227 title=six.text_type( 2228 "Scapy v%s" % conf.version 2229 ), 2230 text=HTML( 2231 six.text_type(( 2232 '<style bg="white" fg="red">Please select a file' 2233 'among the following, to see all layers contained in' 2234 ' it:</style>' 2235 ) + extra_text) 2236 ), 2237 cancel_text="Back" if previous else "Cancel" 2238 ) 2239 result = call_ptk(rd_diag) 2240 if result is None: 2241 # User pressed "Cancel/Back" 2242 if previous: # Back 2243 current = previous.pop() 2244 continue 2245 else: # Cancel 2246 return 2247 if result.startswith("$"): 2248 previous.append(current) 2249 current = current[result[1:]] 2250 else: 2251 # Enter on layer 2252 if previous: # In subfolder 2253 result = cur_path + "." + result 2254 break 2255 # 4 - (Contrib only): load contrib 2256 if action == "contribs": 2257 from scapy.main import load_contrib 2258 load_contrib(result) 2259 result = "scapy.contrib." + result 2260 else: # NON-GUI MODE 2261 # We handle layer as a short layer name, full layer name 2262 # or the module itself 2263 if isinstance(layer, types.ModuleType): 2264 layer = layer.__name__ 2265 if isinstance(layer, str): 2266 if layer.startswith("scapy.layers."): 2267 result = layer 2268 else: 2269 if layer.startswith("scapy.contrib."): 2270 layer = layer.replace("scapy.contrib.", "") 2271 from scapy.main import load_contrib 2272 load_contrib(layer) 2273 result_layer, result_contrib = (("scapy.layers.%s" % layer), 2274 ("scapy.contrib.%s" % layer)) 2275 if result_layer in conf.layers.ldict: 2276 result = result_layer 2277 elif result_contrib in conf.layers.ldict: 2278 result = result_contrib 2279 else: 2280 raise Scapy_Exception("Unknown scapy module '%s'" % layer) 2281 else: 2282 warning("Wrong usage ! 
Check out help(explore)") 2283 return 2284 2285 # COMMON PART 2286 # Get the list of all Packets contained in that module 2287 try: 2288 all_layers = conf.layers.ldict[result] 2289 except KeyError: 2290 raise Scapy_Exception("Unknown scapy module '%s'" % layer) 2291 # Print 2292 print(conf.color_theme.layer_name("Packets contained in %s:" % result)) 2293 rtlst = [] # type: List[Tuple[Union[str, List[str]], ...]] 2294 rtlst = [(lay.__name__ or "", lay._name or "") for lay in all_layers] 2295 print(pretty_list(rtlst, [("Class", "Name")], borders=True)) 2296 2297 2298def _pkt_ls(obj, # type: Union[Packet, Type[Packet]] 2299 verbose=False, # type: bool 2300 ): 2301 # type: (...) -> List[Tuple[str, Type[AnyField], str, str, List[str]]] # noqa: E501 2302 """Internal function used to resolve `fields_desc` to display it. 2303 2304 :param obj: a packet object or class 2305 :returns: a list containing tuples [(name, clsname, clsname_extras, 2306 default, long_attrs)] 2307 """ 2308 is_pkt = isinstance(obj, Packet) 2309 if not issubtype(obj, Packet) and not is_pkt: 2310 raise ValueError 2311 fields = [] 2312 for f in obj.fields_desc: 2313 cur_fld = f 2314 attrs = [] # type: List[str] 2315 long_attrs = [] # type: List[str] 2316 while isinstance(cur_fld, (Emph, ConditionalField)): 2317 if isinstance(cur_fld, ConditionalField): 2318 attrs.append(cur_fld.__class__.__name__[:4]) 2319 cur_fld = cur_fld.fld 2320 name = cur_fld.name 2321 default = cur_fld.default 2322 if verbose and isinstance(cur_fld, EnumField) \ 2323 and hasattr(cur_fld, "i2s"): 2324 if len(cur_fld.i2s or []) < 50: 2325 long_attrs.extend( 2326 "%s: %d" % (strval, numval) 2327 for numval, strval in 2328 sorted(six.iteritems(cur_fld.i2s)) 2329 ) 2330 elif isinstance(cur_fld, MultiEnumField): 2331 fld_depend = cur_fld.depends_on( 2332 cast(Packet, obj if is_pkt else obj()) 2333 ) 2334 attrs.append("Depends on %s" % fld_depend) 2335 if verbose: 2336 cur_i2s = cur_fld.i2s_multi.get( 2337 cur_fld.depends_on( 2338 
cast(Packet, obj if is_pkt else obj()) 2339 ), {} 2340 ) 2341 if len(cur_i2s) < 50: 2342 long_attrs.extend( 2343 "%s: %d" % (strval, numval) 2344 for numval, strval in 2345 sorted(six.iteritems(cur_i2s)) 2346 ) 2347 elif verbose and isinstance(cur_fld, FlagsField): 2348 names = cur_fld.names 2349 long_attrs.append(", ".join(names)) 2350 elif isinstance(cur_fld, MultipleTypeField): 2351 default = cur_fld.dflt.default 2352 attrs.append(", ".join( 2353 x[0].__class__.__name__ for x in 2354 itertools.chain(cur_fld.flds, [(cur_fld.dflt,)]) 2355 )) 2356 2357 cls = cur_fld.__class__ 2358 class_name_extras = "(%s)" % ( 2359 ", ".join(attrs) 2360 ) if attrs else "" 2361 if isinstance(cur_fld, BitField): 2362 class_name_extras += " (%d bit%s)" % ( 2363 cur_fld.size, 2364 "s" if cur_fld.size > 1 else "" 2365 ) 2366 fields.append( 2367 (name, 2368 cls, 2369 class_name_extras, 2370 repr(default), 2371 long_attrs) 2372 ) 2373 return fields 2374 2375 2376@conf.commands.register 2377def ls(obj=None, # type: Optional[Union[str, Packet, Type[Packet]]] 2378 case_sensitive=False, # type: bool 2379 verbose=False # type: bool 2380 ): 2381 # type: (...) -> None 2382 """List available layers, or infos on a given layer class or name. 2383 2384 :param obj: Packet / packet name to use 2385 :param case_sensitive: if obj is a string, is it case sensitive? 
2386 :param verbose: 2387 """ 2388 is_string = isinstance(obj, str) 2389 2390 if obj is None or is_string: 2391 tip = False 2392 if obj is None: 2393 tip = True 2394 all_layers = sorted(conf.layers, key=lambda x: x.__name__) 2395 else: 2396 pattern = re.compile( 2397 cast(str, obj), 2398 0 if case_sensitive else re.I 2399 ) 2400 # We first order by accuracy, then length 2401 if case_sensitive: 2402 sorter = lambda x: (x.__name__.index(obj), len(x.__name__)) 2403 else: 2404 obj = obj.lower() 2405 sorter = lambda x: (x.__name__.lower().index(obj), 2406 len(x.__name__)) 2407 all_layers = sorted((layer for layer in conf.layers 2408 if (isinstance(layer.__name__, str) and 2409 pattern.search(layer.__name__)) or 2410 (isinstance(layer.name, str) and 2411 pattern.search(layer.name))), 2412 key=sorter) 2413 for layer in all_layers: 2414 print("%-10s : %s" % (layer.__name__, layer._name)) 2415 if tip and conf.interactive: 2416 print("\nTIP: You may use explore() to navigate through all " 2417 "layers using a clear GUI") 2418 else: 2419 try: 2420 fields = _pkt_ls( 2421 obj, # type: ignore 2422 verbose=verbose 2423 ) 2424 is_pkt = isinstance(obj, Packet) 2425 # Print 2426 for fname, cls, clsne, dflt, long_attrs in fields: 2427 clsinfo = cls.__name__ + " " + clsne 2428 print("%-10s : %-35s =" % (fname, clsinfo), end=' ') 2429 if is_pkt: 2430 print("%-15r" % (getattr(obj, fname),), end=' ') 2431 print("(%r)" % (dflt,)) 2432 for attr in long_attrs: 2433 print("%-15s%s" % ("", attr)) 2434 # Restart for payload if any 2435 if is_pkt: 2436 obj = cast(Packet, obj) 2437 if isinstance(obj.payload, NoPayload): 2438 return 2439 print("--") 2440 ls(obj.payload) 2441 except ValueError: 2442 print("Not a packet class or name. Type 'ls()' to list packet classes.") # noqa: E501 2443 2444 2445@conf.commands.register 2446def rfc(cls, ret=False, legend=True): 2447 # type: (Type[Packet], bool, bool) -> Optional[str] 2448 """ 2449 Generate an RFC-like representation of a packet def. 
2450 2451 :param cls: the Packet class 2452 :param ret: return the result instead of printing (def. False) 2453 :param legend: show text under the diagram (default True) 2454 2455 Ex:: 2456 2457 >>> rfc(Ether) 2458 2459 """ 2460 if not issubclass(cls, Packet): 2461 raise TypeError("Packet class expected") 2462 cur_len = 0 2463 cur_line = [] 2464 lines = [] 2465 # Get the size (width) that a field will take 2466 # when formatted, from its length in bits 2467 clsize = lambda x: 2 * x - 1 # type: Callable[[int], int] 2468 ident = 0 # Fields UUID 2469 # Generate packet groups 2470 for f in cls.fields_desc: 2471 flen = int(f.sz * 8) 2472 cur_len += flen 2473 ident += 1 2474 # Fancy field name 2475 fname = f.name.upper().replace("_", " ") 2476 # The field might exceed the current line or 2477 # take more than one line. Copy it as required 2478 while True: 2479 over = max(0, cur_len - 32) # Exceed 2480 len1 = clsize(flen - over) # What fits 2481 cur_line.append((fname[:len1], len1, ident)) 2482 if cur_len >= 32: 2483 # Current line is full. start a new line 2484 lines.append(cur_line) 2485 cur_len = flen = over 2486 fname = "" # do not repeat the field 2487 cur_line = [] 2488 if not over: 2489 # there is no data left 2490 break 2491 else: 2492 # End of the field 2493 break 2494 # Add the last line if un-finished 2495 if cur_line: 2496 lines.append(cur_line) 2497 # Calculate separations between lines 2498 seps = [] 2499 seps.append("+-" * 32 + "+\n") 2500 for i in range(len(lines) - 1): 2501 # Start with a full line 2502 sep = "+-" * 32 + "+\n" 2503 # Get the line above and below the current 2504 # separation 2505 above, below = lines[i], lines[i + 1] 2506 # The last field of above is shared with below 2507 if above[-1][2] == below[0][2]: 2508 # where the field in "above" starts 2509 pos_above = sum(x[1] for x in above[:-1]) 2510 # where the field in "below" ends 2511 pos_below = below[0][1] 2512 if pos_above < pos_below: 2513 # they are overlapping. 
2514 # Now crop the space between those pos 2515 # and fill it with " " 2516 pos_above = pos_above + pos_above % 2 2517 sep = ( 2518 sep[:1 + pos_above] + 2519 " " * (pos_below - pos_above) + 2520 sep[1 + pos_below:] 2521 ) 2522 # line is complete 2523 seps.append(sep) 2524 # Graph 2525 result = "" 2526 # Bytes markers 2527 result += " " + (" " * 19).join( 2528 str(x) for x in range(4) 2529 ) + "\n" 2530 # Bits markers 2531 result += " " + " ".join( 2532 str(x % 10) for x in range(32) 2533 ) + "\n" 2534 # Add fields and their separations 2535 for line, sep in zip(lines, seps): 2536 result += sep 2537 for elt, flen, _ in line: 2538 result += "|" + elt.center(flen, " ") 2539 result += "|\n" 2540 result += "+-" * (cur_len or 32) + "+\n" 2541 # Annotate with the figure name 2542 if legend: 2543 result += "\n" + ("Fig. " + cls.__name__).center(66, " ") 2544 # return if asked for, else print 2545 if ret: 2546 return result 2547 print(result) 2548 return None 2549 2550 2551############# 2552# Fuzzing # 2553############# 2554 2555@conf.commands.register 2556def fuzz(p, # type: Packet 2557 _inplace=0, # type: int 2558 ): 2559 # type: (...) -> Packet 2560 """ 2561 Transform a layer into a fuzzy layer by replacing some default values 2562 by random objects. 2563 2564 :param p: the Packet instance to fuzz 2565 :return: the fuzzed packet. 
2566 """ 2567 if not _inplace: 2568 p = p.copy() 2569 q = p 2570 while not isinstance(q, NoPayload): 2571 new_default_fields = {} 2572 multiple_type_fields = [] # type: List[str] 2573 for f in q.fields_desc: 2574 if isinstance(f, PacketListField): 2575 for r in getattr(q, f.name): 2576 fuzz(r, _inplace=1) 2577 elif isinstance(f, MultipleTypeField): 2578 # the type of the field will depend on others 2579 multiple_type_fields.append(f.name) 2580 elif f.default is not None: 2581 if not isinstance(f, ConditionalField) or f._evalcond(q): 2582 rnd = f.randval() 2583 if rnd is not None: 2584 new_default_fields[f.name] = rnd 2585 # Process packets with MultipleTypeFields 2586 if multiple_type_fields: 2587 # freeze the other random values 2588 new_default_fields = { 2589 key: (val._fix() if isinstance(val, VolatileValue) else val) 2590 for key, val in six.iteritems(new_default_fields) 2591 } 2592 q.default_fields.update(new_default_fields) 2593 # add the random values of the MultipleTypeFields 2594 for name in multiple_type_fields: 2595 fld = cast(MultipleTypeField, q.get_field(name)) 2596 rnd = fld._find_fld_pkt(q).randval() 2597 if rnd is not None: 2598 new_default_fields[name] = rnd 2599 q.default_fields.update(new_default_fields) 2600 q = q.payload 2601 return p 2602