1# This file is part of Scapy
2# See http://www.secdev.org/projects/scapy for more information
3# Copyright (C) Philippe Biondi <phil@secdev.org>
4# This program is published under a GPLv2 license
5
6"""
7Packet class
8
9Provides:
10 - the default Packet classes
11 - binding mechanisms
12 - fuzz() method
13 - exploration methods: explore() / ls()
14"""
15
16from __future__ import absolute_import
17from __future__ import print_function
18from collections import defaultdict
19import re
20import time
21import itertools
22import copy
23import types
24import warnings
25
26from scapy.fields import (
27    AnyField,
28    BitField,
29    ConditionalField,
30    Emph,
31    EnumField,
32    Field,
33    FlagsField,
34    MultiEnumField,
35    MultipleTypeField,
36    PacketListField,
37    RawVal,
38    StrField,
39)
40from scapy.config import conf, _version_checker
41from scapy.compat import raw, orb, bytes_encode
42from scapy.base_classes import BasePacket, Gen, SetGen, Packet_metaclass, \
43    _CanvasDumpExtended
44from scapy.interfaces import _GlobInterfaceType
45from scapy.volatile import RandField, VolatileValue
46from scapy.utils import import_hexcap, tex_escape, colgen, issubtype, \
47    pretty_list, EDecimal
48from scapy.error import Scapy_Exception, log_runtime, warning
49from scapy.extlib import PYX
50import scapy.modules.six as six
51
52# Typing imports
53from scapy.compat import (
54    Any,
55    Callable,
56    Dict,
57    Iterator,
58    List,
59    NoReturn,
60    Optional,
61    Set,
62    Tuple,
63    Type,
64    TypeVar,
65    Union,
66    Sequence,
67    cast,
68)
69try:
70    import pyx
71except ImportError:
72    pass
73
74
# Type variable used in the type comment of Packet.copy_fields_dict: the
# return type mirrors the argument type (a fields dict, or an optional one).
_T = TypeVar("_T", Dict[str, Any], Optional[Dict[str, Any]])
76
77
78# six.with_metaclass typing is glitchy
class Packet(six.with_metaclass(Packet_metaclass,  # type: ignore
             BasePacket, _CanvasDumpExtended)):
    # Per-instance attributes. Each name is listed exactly once: a repeated
    # entry would only reserve an extra, unused slot in every instance.
    __slots__ = [
        # timestamps and display name
        "time", "sent_time", "name",
        # field storage: defaults, explicitly set values, field descriptors
        "default_fields", "fields", "fieldtype",
        "overload_fields", "overloaded_fields",
        "packetfields",
        # build / dissection state
        "original", "explicit", "raw_packet_cache",
        "raw_packet_cache_fields", "_pkt", "post_transforms",
        # then payload and underlayer
        "payload", "underlayer",
        # used for sr()
        "_answered",
        # used when sniffing
        "direction", "sniffed_on",
        # handle snaplen Vs real length
        "wirelen",
    ]
    # NOTE(review): ``name`` and ``overload_fields`` are also slot names and
    # __init__ reads them back as ``_name`` / ``_overload_fields``;
    # presumably Packet_metaclass renames them - confirm in base_classes.
    name = None
    # Ordered field descriptors of the layer
    fields_desc = []  # type: Sequence[AnyField]
    # old field name -> (new field name, version), see _resolve_alias()
    deprecated_fields = {}  # type: Dict[str, Tuple[str, str]]
    # lower layer class -> field values, see lower_bonds()
    overload_fields = {}  # type: Dict[Type[Packet], Dict[str, Any]]
    # (field values, upper layer class) pairs, see upper_bonds()
    payload_guess = []  # type: List[Tuple[Dict[str, Any], Type[Packet]]]
    show_indent = 1
    show_summary = True
    match_subclass = False
    # Per-class caches filled by prepare_cached_fields(), keyed by class
    class_dont_cache = {}  # type: Dict[Type[Packet], bool]
    class_packetfields = {}  # type: Dict[Type[Packet], Any]
    class_default_fields = {}  # type: Dict[Type[Packet], Dict[str, Any]]
    class_default_fields_ref = {}  # type: Dict[Type[Packet], List[str]]
    class_fieldtype = {}  # type: Dict[Type[Packet], Dict[str, AnyField]]  # noqa: E501
111
112    @classmethod
113    def from_hexcap(cls):
114        # type: (Type[Packet]) -> Packet
115        return cls(import_hexcap())
116
117    @classmethod
118    def upper_bonds(self):
119        # type: () -> None
120        for fval, upper in self.payload_guess:
121            print("%-20s  %s" % (upper.__name__, ", ".join("%-12s" % ("%s=%r" % i) for i in six.iteritems(fval))))  # noqa: E501
122
123    @classmethod
124    def lower_bonds(self):
125        # type: () -> None
126        for lower, fval in six.iteritems(self._overload_fields):
127            print("%-20s  %s" % (lower.__name__, ", ".join("%-12s" % ("%s=%r" % i) for i in six.iteritems(fval))))  # noqa: E501
128
129    def __init__(self,
130                 _pkt=b"",  # type: bytes
131                 post_transform=None,  # type: Any
132                 _internal=0,  # type: int
133                 _underlayer=None,  # type: Optional[Packet]
134                 **fields  # type: Any
135                 ):
136        # type: (...) -> None
137        self.time = time.time()  # type: Union[EDecimal, float]
138        self.sent_time = None  # type: Union[EDecimal, float, None]
139        self.name = (self.__class__.__name__
140                     if self._name is None else
141                     self._name)
142        self.default_fields = {}  # type: Dict[str, Any]
143        self.overload_fields = self._overload_fields
144        self.overloaded_fields = {}  # type: Dict[str, Any]
145        self.fields = {}  # type: Dict[str, Any]
146        self.fieldtype = {}  # type: Dict[str, AnyField]
147        self.packetfields = []  # type: List[AnyField]
148        self.payload = NoPayload()
149        self.init_fields()
150        self.underlayer = _underlayer
151        self.original = _pkt
152        self.explicit = 0
153        self.raw_packet_cache = None  # type: Optional[bytes]
154        self.raw_packet_cache_fields = None  # type: Optional[Dict[str, Any]]  # noqa: E501
155        self.wirelen = None  # type: Optional[int]
156        self.direction = None  # type: Optional[int]
157        self.sniffed_on = None  # type: Optional[_GlobInterfaceType]
158        if _pkt:
159            self.dissect(_pkt)
160            if not _internal:
161                self.dissection_done(self)
162        # We use this strange initialization so that the fields
163        # are initialized in their declaration order.
164        # It is required to always support MultipleTypeField
165        for field in self.fields_desc:
166            fname = field.name
167            try:
168                value = fields.pop(fname)
169            except KeyError:
170                continue
171            self.fields[fname] = self.get_field(fname).any2i(self, value)
172        # The remaining fields are unknown
173        for fname in fields:
174            if fname in self.deprecated_fields:
175                # Resolve deprecated fields
176                value = fields[fname]
177                fname = self._resolve_alias(fname)
178                self.fields[fname] = self.get_field(fname).any2i(self, value)
179                continue
180            raise AttributeError(fname)
181        if isinstance(post_transform, list):
182            self.post_transforms = post_transform
183        elif post_transform is None:
184            self.post_transforms = []
185        else:
186            self.post_transforms = [post_transform]
187
    # Type of the state tuple returned by __reduce__ and read back by
    # __setstate__, in order: time, sent_time, direction, sniffed_on, wirelen.
    _PickleType = Tuple[
        Union[EDecimal, float],
        Optional[Union[EDecimal, float, None]],
        Optional[int],
        Optional[_GlobInterfaceType],
        Optional[int]
    ]
195
196    def __reduce__(self):
197        # type: () -> Tuple[Type[Packet], Tuple[bytes], Packet._PickleType]
198        """Used by pickling methods"""
199        return (self.__class__, (self.build(),), (
200            self.time,
201            self.sent_time,
202            self.direction,
203            self.sniffed_on,
204            self.wirelen,
205        ))
206
207    def __setstate__(self, state):
208        # type: (Packet._PickleType) -> Packet
209        """Rebuild state using pickable methods"""
210        self.time = state[0]
211        self.sent_time = state[1]
212        self.direction = state[2]
213        self.sniffed_on = state[3]
214        self.wirelen = state[4]
215        return self
216
217    def __deepcopy__(self,
218                     memo,  # type: Any
219                     ):
220        # type: (...) -> Packet
221        """Used by copy.deepcopy"""
222        return self.copy()
223
224    def init_fields(self):
225        # type: () -> None
226        """
227        Initialize each fields of the fields_desc dict
228        """
229
230        if self.class_dont_cache.get(self.__class__, False):
231            self.do_init_fields(self.fields_desc)
232        else:
233            self.do_init_cached_fields()
234
235    def do_init_fields(self,
236                       flist,  # type: Sequence[AnyField]
237                       ):
238        # type: (...) -> None
239        """
240        Initialize each fields of the fields_desc dict
241        """
242        default_fields = {}
243        for f in flist:
244            default_fields[f.name] = copy.deepcopy(f.default)
245            self.fieldtype[f.name] = f
246            if f.holds_packets:
247                self.packetfields.append(f)
248        # We set default_fields last to avoid race issues
249        self.default_fields = default_fields
250
251    def do_init_cached_fields(self):
252        # type: () -> None
253        """
254        Initialize each fields of the fields_desc dict, or use the cached
255        fields information
256        """
257
258        cls_name = self.__class__
259
260        # Build the fields information
261        if Packet.class_default_fields.get(cls_name, None) is None:
262            self.prepare_cached_fields(self.fields_desc)
263
264        # Use fields information from cache
265        default_fields = Packet.class_default_fields.get(cls_name, None)
266        if default_fields:
267            self.default_fields = default_fields
268            self.fieldtype = Packet.class_fieldtype[cls_name]
269            self.packetfields = Packet.class_packetfields[cls_name]
270
271            # Deepcopy default references
272            for fname in Packet.class_default_fields_ref[cls_name]:
273                value = self.default_fields[fname]
274                try:
275                    self.fields[fname] = value.copy()
276                except AttributeError:
277                    # Python 2.7 - list only
278                    self.fields[fname] = value[:]
279
280    def prepare_cached_fields(self, flist):
281        # type: (Sequence[AnyField]) -> None
282        """
283        Prepare the cached fields of the fields_desc dict
284        """
285
286        cls_name = self.__class__
287
288        # Fields cache initialization
289        if not flist:
290            return
291
292        class_default_fields = dict()
293        class_default_fields_ref = list()
294        class_fieldtype = dict()
295        class_packetfields = list()
296
297        # Fields initialization
298        for f in flist:
299            if isinstance(f, MultipleTypeField):
300                # Abort
301                self.class_dont_cache[cls_name] = True
302                self.do_init_fields(self.fields_desc)
303                return
304
305            tmp_copy = copy.deepcopy(f.default)
306            class_default_fields[f.name] = tmp_copy
307            class_fieldtype[f.name] = f
308            if f.holds_packets:
309                class_packetfields.append(f)
310
311            # Remember references
312            if isinstance(f.default, (list, dict, set, RandField, Packet)):
313                class_default_fields_ref.append(f.name)
314
315        # Apply
316        Packet.class_default_fields_ref[cls_name] = class_default_fields_ref
317        Packet.class_fieldtype[cls_name] = class_fieldtype
318        Packet.class_packetfields[cls_name] = class_packetfields
319        # Last to avoid racing issues
320        Packet.class_default_fields[cls_name] = class_default_fields
321
322    def dissection_done(self, pkt):
323        # type: (Packet) -> None
324        """DEV: will be called after a dissection is completed"""
325        self.post_dissection(pkt)
326        self.payload.dissection_done(pkt)
327
328    def post_dissection(self, pkt):
329        # type: (Packet) -> None
330        """DEV: is called after the dissection of the whole packet"""
331        pass
332
333    def get_field(self, fld):
334        # type: (str) -> AnyField
335        """DEV: returns the field instance from the name of the field"""
336        return self.fieldtype[fld]
337
338    def add_payload(self, payload):
339        # type: (Union[Packet, bytes]) -> None
340        if payload is None:
341            return
342        elif not isinstance(self.payload, NoPayload):
343            self.payload.add_payload(payload)
344        else:
345            if isinstance(payload, Packet):
346                self.payload = payload
347                payload.add_underlayer(self)
348                for t in self.aliastypes:
349                    if t in payload.overload_fields:
350                        self.overloaded_fields = payload.overload_fields[t]
351                        break
352            elif isinstance(payload, (bytes, str, bytearray, memoryview)):
353                self.payload = conf.raw_layer(load=bytes_encode(payload))
354            else:
355                raise TypeError("payload must be 'Packet', 'bytes', 'str', 'bytearray', or 'memoryview', not [%s]" % repr(payload))  # noqa: E501
356
357    def remove_payload(self):
358        # type: () -> None
359        self.payload.remove_underlayer(self)
360        self.payload = NoPayload()
361        self.overloaded_fields = {}
362
363    def add_underlayer(self, underlayer):
364        # type: (Packet) -> None
365        self.underlayer = underlayer
366
367    def remove_underlayer(self, other):
368        # type: (Packet) -> None
369        self.underlayer = None
370
371    def copy(self):
372        # type: () -> Packet
373        """Returns a deep copy of the instance."""
374        clone = self.__class__()
375        clone.fields = self.copy_fields_dict(self.fields)
376        clone.default_fields = self.copy_fields_dict(self.default_fields)
377        clone.overloaded_fields = self.overloaded_fields.copy()
378        clone.underlayer = self.underlayer
379        clone.explicit = self.explicit
380        clone.raw_packet_cache = self.raw_packet_cache
381        clone.raw_packet_cache_fields = self.copy_fields_dict(
382            self.raw_packet_cache_fields
383        )
384        clone.wirelen = self.wirelen
385        clone.post_transforms = self.post_transforms[:]
386        clone.payload = self.payload.copy()
387        clone.payload.add_underlayer(clone)
388        clone.time = self.time
389        return clone
390
391    def _resolve_alias(self, attr):
392        # type: (str) -> str
393        new_attr, version = self.deprecated_fields[attr]
394        warnings.warn(
395            "%s has been deprecated in favor of %s since %s !" % (
396                attr, new_attr, version
397            ), DeprecationWarning
398        )
399        return new_attr
400
401    def getfieldval(self, attr):
402        # type: (str) -> Any
403        if self.deprecated_fields and attr in self.deprecated_fields:
404            attr = self._resolve_alias(attr)
405        if attr in self.fields:
406            return self.fields[attr]
407        if attr in self.overloaded_fields:
408            return self.overloaded_fields[attr]
409        if attr in self.default_fields:
410            return self.default_fields[attr]
411        return self.payload.getfieldval(attr)
412
413    def getfield_and_val(self, attr):
414        # type: (str) -> Tuple[AnyField, Any]
415        if self.deprecated_fields and attr in self.deprecated_fields:
416            attr = self._resolve_alias(attr)
417        if attr in self.fields:
418            return self.get_field(attr), self.fields[attr]
419        if attr in self.overloaded_fields:
420            return self.get_field(attr), self.overloaded_fields[attr]
421        if attr in self.default_fields:
422            return self.get_field(attr), self.default_fields[attr]
423        raise ValueError
424
425    def __getattr__(self, attr):
426        # type: (str) -> Any
427        try:
428            fld, v = self.getfield_and_val(attr)
429        except ValueError:
430            return self.payload.__getattr__(attr)
431        if fld is not None:
432            return fld.i2h(self, v)
433        return v
434
435    def setfieldval(self, attr, val):
436        # type: (str, Any) -> None
437        if self.deprecated_fields and attr in self.deprecated_fields:
438            attr = self._resolve_alias(attr)
439        if attr in self.default_fields:
440            fld = self.get_field(attr)
441            if fld is None:
442                any2i = lambda x, y: y  # type: Callable[..., Any]
443            else:
444                any2i = fld.any2i
445            self.fields[attr] = any2i(self, val)
446            self.explicit = 0
447            self.raw_packet_cache = None
448            self.raw_packet_cache_fields = None
449            self.wirelen = None
450        elif attr == "payload":
451            self.remove_payload()
452            self.add_payload(val)
453        else:
454            self.payload.setfieldval(attr, val)
455
456    def __setattr__(self, attr, val):
457        # type: (str, Any) -> None
458        if attr in self.__all_slots__:
459            if attr == "sent_time":
460                self.update_sent_time(val)
461            return object.__setattr__(self, attr, val)
462        try:
463            return self.setfieldval(attr, val)
464        except AttributeError:
465            pass
466        return object.__setattr__(self, attr, val)
467
468    def delfieldval(self, attr):
469        # type: (str) -> None
470        if attr in self.fields:
471            del(self.fields[attr])
472            self.explicit = 0  # in case a default value must be explicit
473            self.raw_packet_cache = None
474            self.raw_packet_cache_fields = None
475            self.wirelen = None
476        elif attr in self.default_fields:
477            pass
478        elif attr == "payload":
479            self.remove_payload()
480        else:
481            self.payload.delfieldval(attr)
482
483    def __delattr__(self, attr):
484        # type: (str) -> None
485        if attr == "payload":
486            return self.remove_payload()
487        if attr in self.__all_slots__:
488            return object.__delattr__(self, attr)
489        try:
490            return self.delfieldval(attr)
491        except AttributeError:
492            pass
493        return object.__delattr__(self, attr)
494
495    def _superdir(self):
496        # type: () -> Set[str]
497        """
498        Return a list of slots and methods, including those from subclasses.
499        """
500        attrs = set()
501        cls = self.__class__
502        if hasattr(cls, '__all_slots__'):
503            attrs.update(cls.__all_slots__)
504        for bcls in cls.__mro__:
505            if hasattr(bcls, '__dict__'):
506                attrs.update(bcls.__dict__)
507        return attrs
508
509    def __dir__(self):
510        # type: () -> List[str]
511        """
512        Add fields to tab completion list.
513        """
514        return sorted(itertools.chain(self._superdir(), self.default_fields))
515
516    def __repr__(self):
517        # type: () -> str
518        s = ""
519        ct = conf.color_theme
520        for f in self.fields_desc:
521            if isinstance(f, ConditionalField) and not f._evalcond(self):
522                continue
523            if f.name in self.fields:
524                fval = self.fields[f.name]
525                if isinstance(fval, (list, dict, set)) and len(fval) == 0:
526                    continue
527                val = f.i2repr(self, fval)
528            elif f.name in self.overloaded_fields:
529                fover = self.overloaded_fields[f.name]
530                if isinstance(fover, (list, dict, set)) and len(fover) == 0:
531                    continue
532                val = f.i2repr(self, fover)
533            else:
534                continue
535            if isinstance(f, Emph) or f in conf.emph:
536                ncol = ct.emph_field_name
537                vcol = ct.emph_field_value
538            else:
539                ncol = ct.field_name
540                vcol = ct.field_value
541
542            s += " %s%s%s" % (ncol(f.name),
543                              ct.punct("="),
544                              vcol(val))
545        return "%s%s %s %s%s%s" % (ct.punct("<"),
546                                   ct.layer_name(self.__class__.__name__),
547                                   s,
548                                   ct.punct("|"),
549                                   repr(self.payload),
550                                   ct.punct(">"))
551
552    if six.PY2:
553        def __str__(self):
554            # type: () -> str
555            return self.build()
556    else:
557        def __str__(self):
558            # type: () -> str
559            warning("Calling str(pkt) on Python 3 makes no sense!")
560            return str(self.build())
561
562    def __bytes__(self):
563        # type: () -> bytes
564        return self.build()
565
566    def __div__(self, other):
567        # type: (Any) -> Packet
568        if isinstance(other, Packet):
569            cloneA = self.copy()
570            cloneB = other.copy()
571            cloneA.add_payload(cloneB)
572            return cloneA
573        elif isinstance(other, (bytes, str, bytearray, memoryview)):
574            return self / conf.raw_layer(load=bytes_encode(other))
575        else:
576            return other.__rdiv__(self)  # type: ignore
577    __truediv__ = __div__
578
579    def __rdiv__(self, other):
580        # type: (Any) -> Packet
581        if isinstance(other, (bytes, str, bytearray, memoryview)):
582            return conf.raw_layer(load=bytes_encode(other)) / self
583        else:
584            raise TypeError
585    __rtruediv__ = __rdiv__
586
587    def __mul__(self, other):
588        # type: (Any) -> List[Packet]
589        if isinstance(other, int):
590            return [self] * other
591        else:
592            raise TypeError
593
594    def __rmul__(self, other):
595        # type: (Any) -> List[Packet]
596        return self.__mul__(other)
597
598    def __nonzero__(self):
599        # type: () -> bool
600        return True
601    __bool__ = __nonzero__
602
603    def __len__(self):
604        # type: () -> int
605        return len(self.__bytes__())
606
607    def copy_field_value(self, fieldname, value):
608        # type: (str, Any) -> Any
609        return self.get_field(fieldname).do_copy(value)
610
611    def copy_fields_dict(self, fields):
612        # type: (_T) -> _T
613        if fields is None:
614            return None
615        return {fname: self.copy_field_value(fname, fval)
616                for fname, fval in six.iteritems(fields)}
617
618    def clear_cache(self):
619        # type: () -> None
620        """Clear the raw packet cache for the field and all its subfields"""
621        self.raw_packet_cache = None
622        for fld, fval in six.iteritems(self.fields):
623            fld = self.get_field(fld)
624            if fld.holds_packets:
625                if isinstance(fval, Packet):
626                    fval.clear_cache()
627                elif isinstance(fval, list):
628                    for fsubval in fval:
629                        fsubval.clear_cache()
630        self.payload.clear_cache()
631
632    def self_build(self):
633        # type: () -> bytes
634        """
635        Create the default layer regarding fields_desc dict
636
637        :param field_pos_list:
638        """
639        if self.raw_packet_cache is not None:
640            for fname, fval in six.iteritems(self.raw_packet_cache_fields):
641                if self.getfieldval(fname) != fval:
642                    self.raw_packet_cache = None
643                    self.raw_packet_cache_fields = None
644                    self.wirelen = None
645                    break
646            if self.raw_packet_cache is not None:
647                return self.raw_packet_cache
648        p = b""
649        for f in self.fields_desc:
650            val = self.getfieldval(f.name)
651            if isinstance(val, RawVal):
652                p += bytes(val)
653            else:
654                p = f.addfield(self, p, val)
655        return p
656
657    def do_build_payload(self):
658        # type: () -> bytes
659        """
660        Create the default version of the payload layer
661
662        :return: a string of payload layer
663        """
664        return self.payload.do_build()
665
666    def do_build(self):
667        # type: () -> bytes
668        """
669        Create the default version of the layer
670
671        :return: a string of the packet with the payload
672        """
673        if not self.explicit:
674            self = next(iter(self))
675        pkt = self.self_build()
676        for t in self.post_transforms:
677            pkt = t(pkt)
678        pay = self.do_build_payload()
679        if self.raw_packet_cache is None:
680            return self.post_build(pkt, pay)
681        else:
682            return pkt + pay
683
684    def build_padding(self):
685        # type: () -> bytes
686        return self.payload.build_padding()
687
688    def build(self):
689        # type: () -> bytes
690        """
691        Create the current layer
692
693        :return: string of the packet with the payload
694        """
695        p = self.do_build()
696        p += self.build_padding()
697        p = self.build_done(p)
698        return p
699
700    def post_build(self, pkt, pay):
701        # type: (bytes, bytes) -> bytes
702        """
703        DEV: called right after the current layer is build.
704
705        :param str pkt: the current packet (build by self_buil function)
706        :param str pay: the packet payload (build by do_build_payload function)
707        :return: a string of the packet with the payload
708        """
709        return pkt + pay
710
711    def build_done(self, p):
712        # type: (bytes) -> bytes
713        return self.payload.build_done(p)
714
715    def do_build_ps(self):
716        # type: () -> Tuple[bytes, List[Tuple[Packet, List[Tuple[Field[Any, Any], str, bytes]]]]]  # noqa: E501
717        p = b""
718        pl = []
719        q = b""
720        for f in self.fields_desc:
721            if isinstance(f, ConditionalField) and not f._evalcond(self):
722                continue
723            p = f.addfield(self, p, self.getfieldval(f.name))
724            if isinstance(p, bytes):
725                r = p[len(q):]
726                q = p
727            else:
728                r = b""
729            pl.append((f, f.i2repr(self, self.getfieldval(f.name)), r))
730
731        pkt, lst = self.payload.build_ps(internal=1)
732        p += pkt
733        lst.append((self, pl))
734
735        return p, lst
736
737    def build_ps(self, internal=0):
738        # type: (int) -> Tuple[bytes, List[Tuple[Packet, List[Tuple[Any, Any, bytes]]]]]  # noqa: E501
739        p, lst = self.do_build_ps()
740#        if not internal:
741#            pkt = self
742#            while pkt.haslayer(conf.padding_layer):
743#                pkt = pkt.getlayer(conf.padding_layer)
744#                lst.append( (pkt, [ ("loakjkjd", pkt.load, pkt.load) ] ) )
745#                p += pkt.load
746#                pkt = pkt.payload
747        return p, lst
748
749    def canvas_dump(self, layer_shift=0, rebuild=1):
750        # type: (int, int) -> pyx.canvas.canvas
751        if PYX == 0:
752            raise ImportError("PyX and its dependencies must be installed")
753        canvas = pyx.canvas.canvas()
754        if rebuild:
755            _, t = self.__class__(raw(self)).build_ps()
756        else:
757            _, t = self.build_ps()
758        YTXTI = len(t)
759        for _, l in t:
760            YTXTI += len(l)
761        YTXT = float(YTXTI)
762        YDUMP = YTXT
763
764        XSTART = 1
765        XDSTART = 10
766        y = 0.0
767        yd = 0.0
768        XMUL = 0.55
769        YMUL = 0.4
770
771        backcolor = colgen(0.6, 0.8, 1.0, trans=pyx.color.rgb)
772        forecolor = colgen(0.2, 0.5, 0.8, trans=pyx.color.rgb)
773#        backcolor=makecol(0.376, 0.729, 0.525, 1.0)
774
775        def hexstr(x):
776            # type: (bytes) -> str
777            return " ".join("%02x" % orb(c) for c in x)
778
779        def make_dump_txt(x, y, txt):
780            # type: (int, float, bytes) -> pyx.text.text
781            return pyx.text.text(
782                XDSTART + x * XMUL,
783                (YDUMP - y) * YMUL,
784                r"\tt{%s}" % hexstr(txt),
785                [pyx.text.size.Large]
786            )
787
788        def make_box(o):
789            # type: (pyx.bbox.bbox) -> pyx.bbox.bbox
790            return pyx.box.rect(
791                o.left(), o.bottom(), o.width(), o.height(),
792                relcenter=(0.5, 0.5)
793            )
794
795        def make_frame(lst):
796            # type: (List[Any]) -> pyx.path.path
797            if len(lst) == 1:
798                b = lst[0].bbox()
799                b.enlarge(pyx.unit.u_pt)
800                return b.path()
801            else:
802                fb = lst[0].bbox()
803                fb.enlarge(pyx.unit.u_pt)
804                lb = lst[-1].bbox()
805                lb.enlarge(pyx.unit.u_pt)
806                if len(lst) == 2 and fb.left() > lb.right():
807                    return pyx.path.path(pyx.path.moveto(fb.right(), fb.top()),
808                                         pyx.path.lineto(fb.left(), fb.top()),
809                                         pyx.path.lineto(fb.left(), fb.bottom()),  # noqa: E501
810                                         pyx.path.lineto(fb.right(), fb.bottom()),  # noqa: E501
811                                         pyx.path.moveto(lb.left(), lb.top()),
812                                         pyx.path.lineto(lb.right(), lb.top()),
813                                         pyx.path.lineto(lb.right(), lb.bottom()),  # noqa: E501
814                                         pyx.path.lineto(lb.left(), lb.bottom()))  # noqa: E501
815                else:
816                    # XXX
817                    gb = lst[1].bbox()
818                    if gb != lb:
819                        gb.enlarge(pyx.unit.u_pt)
820                    kb = lst[-2].bbox()
821                    if kb != gb and kb != lb:
822                        kb.enlarge(pyx.unit.u_pt)
823                    return pyx.path.path(pyx.path.moveto(fb.left(), fb.top()),
824                                         pyx.path.lineto(fb.right(), fb.top()),
825                                         pyx.path.lineto(fb.right(), kb.bottom()),  # noqa: E501
826                                         pyx.path.lineto(lb.right(), kb.bottom()),  # noqa: E501
827                                         pyx.path.lineto(lb.right(), lb.bottom()),  # noqa: E501
828                                         pyx.path.lineto(lb.left(), lb.bottom()),  # noqa: E501
829                                         pyx.path.lineto(lb.left(), gb.top()),
830                                         pyx.path.lineto(fb.left(), gb.top()),
831                                         pyx.path.closepath(),)
832
833        def make_dump(s,   # type: bytes
834                      shift=0,  # type: int
835                      y=0.,  # type: float
836                      col=None,  # type: pyx.color.color
837                      bkcol=None,  # type: pyx.color.color
838                      large=16  # type: int
839                      ):
840            # type: (...) -> Tuple[pyx.canvas.canvas, pyx.bbox.bbox, int, float]  # noqa: E501
841            c = pyx.canvas.canvas()
842            tlist = []
843            while s:
844                dmp, s = s[:large - shift], s[large - shift:]
845                txt = make_dump_txt(shift, y, dmp)
846                tlist.append(txt)
847                shift += len(dmp)
848                if shift >= 16:
849                    shift = 0
850                    y += 1
851            if col is None:
852                col = pyx.color.rgb.red
853            if bkcol is None:
854                bkcol = pyx.color.rgb.white
855            c.stroke(make_frame(tlist), [col, pyx.deco.filled([bkcol]), pyx.style.linewidth.Thick])  # noqa: E501
856            for txt in tlist:
857                c.insert(txt)
858            return c, tlist[-1].bbox(), shift, y
859
860        last_shift, last_y = 0, 0.0
861        while t:
862            bkcol = next(backcolor)
863            proto, fields = t.pop()
864            y += 0.5
865            pt = pyx.text.text(
866                XSTART,
867                (YTXT - y) * YMUL,
868                r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(
869                    str(proto.name)
870                ),
871                [pyx.text.size.Large]
872            )
873            y += 1
874            ptbb = pt.bbox()
875            ptbb.enlarge(pyx.unit.u_pt * 2)
876            canvas.stroke(ptbb.path(), [pyx.color.rgb.black, pyx.deco.filled([bkcol])])  # noqa: E501
877            canvas.insert(pt)
878            for field, fval, fdump in fields:
879                col = next(forecolor)
880                ft = pyx.text.text(XSTART, (YTXT - y) * YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(field.name))  # noqa: E501
881                if isinstance(field, BitField):
882                    fsize = '%sb' % field.size
883                else:
884                    fsize = '%sB' % len(fdump)
885                if (hasattr(field, 'field') and
886                        'LE' in field.field.__class__.__name__[:3] or
887                        'LE' in field.__class__.__name__[:3]):
888                    fsize = r'$\scriptstyle\langle$' + fsize
889                st = pyx.text.text(XSTART + 3.4, (YTXT - y) * YMUL, r"\font\cmbxfont=cmssbx10 scaled 600\cmbxfont{%s}" % fsize, [pyx.text.halign.boxright])  # noqa: E501
890                if isinstance(fval, str):
891                    if len(fval) > 18:
892                        fval = fval[:18] + "[...]"
893                else:
894                    fval = ""
895                vt = pyx.text.text(XSTART + 3.5, (YTXT - y) * YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(fval))  # noqa: E501
896                y += 1.0
897                if fdump:
898                    dt, target, last_shift, last_y = make_dump(fdump, last_shift, last_y, col, bkcol)  # noqa: E501
899
900                    dtb = target
901                    vtb = vt.bbox()
902                    bxvt = make_box(vtb)
903                    bxdt = make_box(dtb)
904                    dtb.enlarge(pyx.unit.u_pt)
905                    try:
906                        if yd < 0:
907                            cnx = pyx.connector.curve(bxvt, bxdt, absangle1=0, absangle2=-90)  # noqa: E501
908                        else:
909                            cnx = pyx.connector.curve(bxvt, bxdt, absangle1=0, absangle2=90)  # noqa: E501
910                    except Exception:
911                        pass
912                    else:
913                        canvas.stroke(cnx, [pyx.style.linewidth.thin, pyx.deco.earrow.small, col])  # noqa: E501
914
915                    canvas.insert(dt)
916
917                canvas.insert(ft)
918                canvas.insert(st)
919                canvas.insert(vt)
920            last_y += layer_shift
921
922        return canvas
923
924    def extract_padding(self, s):
925        # type: (bytes) -> Tuple[bytes, Optional[bytes]]
926        """
927        DEV: to be overloaded to extract current layer's padding.
928
929        :param str s: the current layer
930        :return: a couple of strings (actual layer, padding)
931        """
932        return s, None
933
934    def post_dissect(self, s):
935        # type: (bytes) -> bytes
936        """DEV: is called right after the current layer has been dissected"""
937        return s
938
939    def pre_dissect(self, s):
940        # type: (bytes) -> bytes
941        """DEV: is called right before the current layer is dissected"""
942        return s
943
944    def do_dissect(self, s):
945        # type: (bytes) -> bytes
946        _raw = s
947        self.raw_packet_cache_fields = {}
948        for f in self.fields_desc:
949            if not s:
950                break
951            s, fval = f.getfield(self, s)
952            # Skip unused ConditionalField
953            if isinstance(f, ConditionalField) and fval is None:
954                continue
955            # We need to track fields with mutable values to discard
956            # .raw_packet_cache when needed.
957            if f.islist or f.holds_packets or f.ismutable:
958                self.raw_packet_cache_fields[f.name] = f.do_copy(fval)
959            self.fields[f.name] = fval
960        self.raw_packet_cache = _raw[:-len(s)] if s else _raw
961        self.explicit = 1
962        return s
963
964    def do_dissect_payload(self, s):
965        # type: (bytes) -> None
966        """
967        Perform the dissection of the layer's payload
968
969        :param str s: the raw layer
970        """
971        if s:
972            cls = self.guess_payload_class(s)
973            try:
974                p = cls(s, _internal=1, _underlayer=self)
975            except KeyboardInterrupt:
976                raise
977            except Exception:
978                if conf.debug_dissector:
979                    if issubtype(cls, Packet):
980                        log_runtime.error("%s dissector failed", cls.__name__)
981                    else:
982                        log_runtime.error("%s.guess_payload_class() returned "
983                                          "[%s]",
984                                          self.__class__.__name__, repr(cls))
985                    if cls is not None:
986                        raise
987                p = conf.raw_layer(s, _internal=1, _underlayer=self)
988            self.add_payload(p)
989
990    def dissect(self, s):
991        # type: (bytes) -> None
992        s = self.pre_dissect(s)
993
994        s = self.do_dissect(s)
995
996        s = self.post_dissect(s)
997
998        payl, pad = self.extract_padding(s)
999        self.do_dissect_payload(payl)
1000        if pad and conf.padding:
1001            self.add_payload(conf.padding_layer(pad))
1002
1003    def guess_payload_class(self, payload):
1004        # type: (bytes) -> Type[Packet]
1005        """
1006        DEV: Guesses the next payload class from layer bonds.
1007        Can be overloaded to use a different mechanism.
1008
1009        :param str payload: the layer's payload
1010        :return: the payload class
1011        """
1012        for t in self.aliastypes:
1013            for fval, cls in t.payload_guess:
1014                try:
1015                    if all(v == self.getfieldval(k)
1016                           for k, v in six.iteritems(fval)):
1017                        return cls  # type: ignore
1018                except AttributeError:
1019                    pass
1020        return self.default_payload_class(payload)
1021
1022    def default_payload_class(self, payload):
1023        # type: (bytes) -> Type[Packet]
1024        """
1025        DEV: Returns the default payload class if nothing has been found by the
1026        guess_payload_class() method.
1027
1028        :param str payload: the layer's payload
1029        :return: the default payload class define inside the configuration file
1030        """
1031        return conf.raw_layer
1032
1033    def hide_defaults(self):
1034        # type: () -> None
1035        """Removes fields' values that are the same as default values."""
1036        # use list(): self.fields is modified in the loop
1037        for k, v in list(six.iteritems(self.fields)):
1038            v = self.fields[k]
1039            if k in self.default_fields:
1040                if self.default_fields[k] == v:
1041                    del self.fields[k]
1042        self.payload.hide_defaults()
1043
1044    def update_sent_time(self, time):
1045        # type: (Optional[float]) -> None
1046        """Use by clone_with to share the sent_time value"""
1047        pass
1048
1049    def clone_with(self, payload=None, share_time=False, **kargs):
1050        # type: (Optional[Any], bool, **Any) -> Any
1051        pkt = self.__class__()
1052        pkt.explicit = 1
1053        pkt.fields = kargs
1054        pkt.default_fields = self.copy_fields_dict(self.default_fields)
1055        pkt.overloaded_fields = self.overloaded_fields.copy()
1056        pkt.time = self.time
1057        pkt.underlayer = self.underlayer
1058        pkt.post_transforms = self.post_transforms
1059        pkt.raw_packet_cache = self.raw_packet_cache
1060        pkt.raw_packet_cache_fields = self.copy_fields_dict(
1061            self.raw_packet_cache_fields
1062        )
1063        pkt.wirelen = self.wirelen
1064        if payload is not None:
1065            pkt.add_payload(payload)
1066        if share_time:
1067            # This binds the subpacket .sent_time to this layer
1068            def _up_time(x, parent=self):
1069                # type: (float, Packet) -> None
1070                parent.sent_time = x
1071            pkt.update_sent_time = _up_time  # type: ignore
1072        return pkt
1073
    def __iter__(self):
        # type: () -> Iterator[Packet]
        """
        Iterates through all sub-packets generated by this Packet.

        Every combination of the values produced by the fields listed in
        ``todo`` (each expanded through a Gen) and of the packets produced
        by the payload is yielded as a packet built with clone_with().
        """
        # We use __iterlen__ as low as possible, to lower processing time
        def loop(todo, done, self=self):
            # type: (List[str], Dict[str, Any], Any) -> Iterator[Packet]
            # todo: names of the fields still to expand
            # done: field name -> value chosen so far; it is shared (not
            # copied) between recursion levels and overwritten in place.
            if todo:
                eltname = todo.pop()
                elt = self.getfieldval(eltname)
                if not isinstance(elt, Gen):
                    # Wrap plain values in a SetGen. The value of a list
                    # field is wrapped in a list first, so that it is
                    # produced as a single element.
                    if self.get_field(eltname).islist:
                        elt = SetGen([elt])
                    else:
                        elt = SetGen(elt)
                for e in elt:
                    done[eltname] = e
                    # Each branch gets its own copy of the remaining names
                    for x in loop(todo[:], done):
                        yield x
            else:
                # Every field has a value: now iterate over the payload
                if isinstance(self.payload, NoPayload):
                    payloads = SetGen([None])  # type: SetGen[Packet]
                else:
                    payloads = self.payload
                share_time = False
                if self.fields == done and payloads.__iterlen__() == 1:
                    # In this case, the packets are identical. Let's bind
                    # their sent_time attribute for sending purpose
                    share_time = True
                for payl in payloads:
                    # Let's make sure subpackets are consistent
                    # (volatile values are fixed on a copy, once per
                    # generated packet)
                    done2 = done.copy()
                    for k in done2:
                        if isinstance(done2[k], VolatileValue):
                            done2[k] = done2[k]._fix()
                    pkt = self.clone_with(payload=payl, share_time=share_time,
                                          **done2)
                    yield pkt

        if self.explicit or self.raw_packet_cache is not None:
            # Nothing to expand: the current fields are used as they are
            todo = []
            done = self.fields
        else:
            # Expand the default/overloaded fields holding a volatile value,
            # and every field that was explicitly set
            todo = [k for (k, v) in itertools.chain(six.iteritems(self.default_fields),  # noqa: E501
                                                    six.iteritems(self.overloaded_fields))  # noqa: E501
                    if isinstance(v, VolatileValue)] + list(self.fields)
            done = {}
        return loop(todo, done)
1121
1122    def __iterlen__(self):
1123        # type: () -> int
1124        """Predict the total length of the iterator"""
1125        fields = [key for (key, val) in itertools.chain(six.iteritems(self.default_fields),  # noqa: E501
1126                  six.iteritems(self.overloaded_fields))
1127                  if isinstance(val, VolatileValue)] + list(self.fields)
1128        length = 1
1129
1130        def is_valid_gen_tuple(x):
1131            # type: (Any) -> bool
1132            if not isinstance(x, tuple):
1133                return False
1134            return len(x) == 2 and all(isinstance(z, int) for z in x)
1135
1136        for field in fields:
1137            fld, val = self.getfield_and_val(field)
1138            if hasattr(val, "__iterlen__"):
1139                length *= val.__iterlen__()
1140            elif is_valid_gen_tuple(val):
1141                length *= (val[1] - val[0] + 1)
1142            elif isinstance(val, list) and not fld.islist:
1143                len2 = 0
1144                for x in val:
1145                    if hasattr(x, "__iterlen__"):
1146                        len2 += x.__iterlen__()
1147                    elif is_valid_gen_tuple(x):
1148                        len2 += (x[1] - x[0] + 1)
1149                    elif isinstance(x, list):
1150                        len2 += len(x)
1151                    else:
1152                        len2 += 1
1153                length *= len2 or 1
1154        if not isinstance(self.payload, NoPayload):
1155            return length * self.payload.__iterlen__()
1156        return length
1157
1158    def iterpayloads(self):
1159        # type: () -> Iterator[Packet]
1160        """Used to iter through the payloads of a Packet.
1161        Useful for DNS or 802.11 for instance.
1162        """
1163        yield self
1164        current = self
1165        while current.payload:
1166            current = current.payload
1167            yield current
1168
1169    def __gt__(self, other):
1170        # type: (Packet) -> int
1171        """True if other is an answer from self (self ==> other)."""
1172        if isinstance(other, Packet):
1173            return other < self
1174        elif isinstance(other, bytes):
1175            return 1
1176        else:
1177            raise TypeError((self, other))
1178
1179    def __lt__(self, other):
1180        # type: (Packet) -> int
1181        """True if self is an answer from other (other ==> self)."""
1182        if isinstance(other, Packet):
1183            return self.answers(other)
1184        elif isinstance(other, bytes):
1185            return 1
1186        else:
1187            raise TypeError((self, other))
1188
1189    def __eq__(self, other):
1190        # type: (Any) -> bool
1191        if not isinstance(other, self.__class__):
1192            return False
1193        for f in self.fields_desc:
1194            if f not in other.fields_desc:
1195                return False
1196            if self.getfieldval(f.name) != other.getfieldval(f.name):
1197                return False
1198        return self.payload == other.payload
1199
1200    def __ne__(self, other):
1201        # type: (Any) -> bool
1202        return not self.__eq__(other)
1203
    # Packets are deliberately unhashable.
    # Note: setting __hash__ to None is the standard way
    # of making an object un-hashable. mypy doesn't know that,
    # hence the "type: ignore".
    __hash__ = None  # type: ignore
1207
1208    def hashret(self):
1209        # type: () -> bytes
1210        """DEV: returns a string that has the same value for a request
1211        and its answer."""
1212        return self.payload.hashret()
1213
1214    def answers(self, other):
1215        # type: (Packet) -> int
1216        """DEV: true if self is an answer from other"""
1217        if other.__class__ == self.__class__:
1218            return self.payload.answers(other.payload)
1219        return 0
1220
1221    def layers(self):
1222        # type: () -> List[Type[Packet]]
1223        """returns a list of layer classes (including subclasses) in this packet"""  # noqa: E501
1224        layers = []
1225        lyr = self  # type: Optional[Packet]
1226        while lyr:
1227            layers.append(lyr.__class__)
1228            lyr = lyr.payload.getlayer(0, _subclass=True)
1229        return layers
1230
1231    def haslayer(self, cls, _subclass=None):
1232        # type: (Union[Type[Packet], str], Optional[bool]) -> int
1233        """
1234        true if self has a layer that is an instance of cls.
1235        Superseded by "cls in self" syntax.
1236        """
1237        if _subclass is None:
1238            _subclass = self.match_subclass or None
1239        if _subclass:
1240            match = issubtype
1241        else:
1242            match = lambda cls1, cls2: bool(cls1 == cls2)
1243        if cls is None or match(self.__class__, cls) \
1244           or cls in [self.__class__.__name__, self._name]:
1245            return True
1246        for f in self.packetfields:
1247            fvalue_gen = self.getfieldval(f.name)
1248            if fvalue_gen is None:
1249                continue
1250            if not f.islist:
1251                fvalue_gen = SetGen(fvalue_gen, _iterpacket=0)
1252            for fvalue in fvalue_gen:
1253                if isinstance(fvalue, Packet):
1254                    ret = fvalue.haslayer(cls, _subclass=_subclass)
1255                    if ret:
1256                        return ret
1257        return self.payload.haslayer(cls, _subclass=_subclass)
1258
    def getlayer(self,
                 cls,  # type: Union[int, Type[Packet], str]
                 nb=1,  # type: int
                 _track=None,  # type: Optional[List[int]]
                 _subclass=None,  # type: Optional[bool]
                 **flt  # type: Any
                 ):
        # type: (...) -> Optional[Packet]
        """
        Return the nb^th layer that is an instance of cls, matching flt
        values.

        :param cls: a class, a class name, a "name.field" string (the value
            of that field of the matching layer is then returned), or an
            int, in which case any layer matches and ``nb`` is set to
            ``cls + 1``
        :param nb: rank of the wanted layer among the matching ones
        :param _track: internal list, passed on to the payload
        :param _subclass: whether subclasses match; when None,
            ``self.match_subclass`` decides
        :param flt: field values that a layer must have to match
        :return: the layer (or field value) found
        """
        if _subclass is None:
            _subclass = self.match_subclass or None
        if _subclass:
            match = issubtype
        else:
            match = lambda cls1, cls2: bool(cls1 == cls2)
        # Note:
        # cls can be int, packet, str
        # string_class_name can be packet, str (packet or packet+field)
        # class_name can be packet, str (packet only)
        if isinstance(cls, int):
            nb = cls + 1
            string_class_name = ""  # type: Union[Type[Packet], str]
        else:
            string_class_name = cls
        class_name = ""  # type: Union[Type[Packet], str]
        fld = None  # type: Optional[str]
        if isinstance(string_class_name, str) and "." in string_class_name:
            class_name, fld = string_class_name.split(".", 1)
        else:
            class_name, fld = string_class_name, None
        # An empty class_name (int lookup) matches any layer
        if not class_name or match(self.__class__, class_name) \
           or class_name in [self.__class__.__name__, self._name]:
            if all(self.getfieldval(fldname) == fldvalue
                   for fldname, fldvalue in six.iteritems(flt)):
                if nb == 1:
                    if fld is None:
                        return self
                    else:
                        return self.getfieldval(fld)  # type: ignore
                else:
                    # This layer matches but is not the wanted one yet
                    nb -= 1
        # Search the packets held by this layer's fields
        for f in self.packetfields:
            fvalue_gen = self.getfieldval(f.name)
            if fvalue_gen is None:
                continue
            if not f.islist:
                fvalue_gen = SetGen(fvalue_gen, _iterpacket=0)
            for fvalue in fvalue_gen:
                if isinstance(fvalue, Packet):
                    track = []  # type: List[int]
                    ret = fvalue.getlayer(class_name, nb=nb, _track=track,
                                          _subclass=_subclass, **flt)
                    if ret is not None:
                        return ret
                    # Resume counting from where the sub-search stopped.
                    # NOTE(review): track is presumably filled by the end
                    # of the payload chain (not visible here) - confirm
                    nb = track[0]
        return self.payload.getlayer(class_name, nb=nb, _track=_track,
                                     _subclass=_subclass, **flt)
1318
1319    def firstlayer(self):
1320        # type: () -> Packet
1321        q = self
1322        while q.underlayer is not None:
1323            q = q.underlayer
1324        return q
1325
1326    def __getitem__(self, cls):
1327        # type: (Union[Type[Packet], str]) -> Any
1328        if isinstance(cls, slice):
1329            lname = cls.start
1330            if cls.stop:
1331                ret = self.getlayer(cls.start, nb=cls.stop, **(cls.step or {}))
1332            else:
1333                ret = self.getlayer(cls.start, **(cls.step or {}))
1334        else:
1335            lname = cls
1336            ret = self.getlayer(cls)
1337        if ret is None:
1338            if isinstance(lname, type):
1339                name = lname.__name__
1340            elif not isinstance(lname, bytes):
1341                name = repr(lname)
1342            else:
1343                name = cast(str, lname)
1344            raise IndexError("Layer [%s] not found" % name)
1345        return ret
1346
1347    def __delitem__(self, cls):
1348        # type: (Type[Packet]) -> None
1349        del(self[cls].underlayer.payload)
1350
1351    def __setitem__(self, cls, val):
1352        # type: (Type[Packet], Packet) -> None
1353        self[cls].underlayer.payload = val
1354
1355    def __contains__(self, cls):
1356        # type: (Union[Type[Packet], str]) -> int
1357        """
1358        "cls in self" returns true if self has a layer which is an
1359        instance of cls.
1360        """
1361        return self.haslayer(cls)
1362
1363    def route(self):
1364        # type: () -> Tuple[Any, Optional[str], Optional[str]]
1365        return self.payload.route()
1366
1367    def fragment(self, *args, **kargs):
1368        # type: (*Any, **Any) -> List[Packet]
1369        return self.payload.fragment(*args, **kargs)
1370
1371    def display(self, *args, **kargs):  # Deprecated. Use show()
1372        # type: (*Any, **Any) -> None
1373        """Deprecated. Use show() method."""
1374        self.show(*args, **kargs)
1375
    def _show_or_dump(self,
                      dump=False,  # type: bool
                      indent=3,  # type: int
                      lvl="",  # type: str
                      label_lvl="",  # type: str
                      first_call=True  # type: bool
                      ):
        # type: (...) -> Optional[str]
        """
        Internal method that shows or dumps a hierarchical view of a packet.
        Called by show.

        The view of this layer is followed by the view of its payload.
        Only the first (non recursive) call prints; recursive calls always
        return their text.

        :param dump: determine if it prints or returns the string value
        :param int indent: the size of indentation for each layer
        :param str lvl: additional information about the layer lvl
        :param str label_lvl: additional information about the layer fields
        :param first_call: determine if the current function is the first
        :return: return a hierarchical view if dump, else print it
        """

        if dump:
            from scapy.themes import AnsiColorTheme
            ct = AnsiColorTheme()  # No color for dump output
        else:
            ct = conf.color_theme
        # Header line carrying the layer name
        s = "%s%s %s %s \n" % (label_lvl,
                               ct.punct("###["),
                               ct.layer_name(self.name),
                               ct.punct("]###"))
        for f in self.fields_desc:
            # Conditional fields whose condition is false are not shown
            if isinstance(f, ConditionalField) and not f._evalcond(self):
                continue
            if isinstance(f, Emph) or f in conf.emph:
                ncol = ct.emph_field_name
                vcol = ct.emph_field_value
            else:
                ncol = ct.field_name
                vcol = ct.field_value
            fvalue = self.getfieldval(f.name)
            if isinstance(fvalue, Packet) or (f.islist and f.holds_packets and isinstance(fvalue, list)):  # noqa: E501
                # The field holds packet(s): show each of them as a nested
                # view, prefixed with "   |"
                pad = max(0, 10 - len(f.name)) * " "
                s += "%s  \\%s%s\\\n" % (label_lvl + lvl, ncol(f.name), pad)
                fvalue_gen = SetGen(
                    fvalue,
                    _iterpacket=0
                )  # type: SetGen[Packet]
                for fvalue in fvalue_gen:
                    s += fvalue._show_or_dump(dump=dump, indent=indent, label_lvl=label_lvl + lvl + "   |", first_call=False)  # noqa: E501
            else:
                # Plain field: "name = value", the name being padded to
                # 10 characters
                pad = max(0, 10 - len(f.name)) * " "
                begn = "%s  %s%s%s " % (label_lvl + lvl,
                                        ncol(f.name),
                                        pad,
                                        ct.punct("="),)
                reprval = f.i2repr(self, fvalue)
                if isinstance(reprval, str):
                    # Indent the continuation lines of multi-line values
                    reprval = reprval.replace("\n", "\n" + " " * (len(label_lvl) +  # noqa: E501
                                                                  len(lvl) +
                                                                  len(f.name) +
                                                                  4))
                s += "%s%s\n" % (begn, vcol(reprval))
        if self.payload:
            # The payload is shifted by `indent` spaces per show_indent
            s += self.payload._show_or_dump(  # type: ignore
                dump=dump,
                indent=indent,
                lvl=lvl + (" " * indent * self.show_indent),
                label_lvl=label_lvl,
                first_call=False
            )

        if first_call and not dump:
            print(s)
            return None
        else:
            return s
1451
1452    def show(self, dump=False, indent=3, lvl="", label_lvl=""):
1453        # type: (bool, int, str, str) -> Optional[Any]
1454        """
1455        Prints or returns (when "dump" is true) a hierarchical view of the
1456        packet.
1457
1458        :param dump: determine if it prints or returns the string value
1459        :param int indent: the size of indentation for each layer
1460        :param str lvl: additional information about the layer lvl
1461        :param str label_lvl: additional information about the layer fields
1462        :return: return a hierarchical view if dump, else print it
1463        """
1464        return self._show_or_dump(dump, indent, lvl, label_lvl)
1465
1466    def show2(self, dump=False, indent=3, lvl="", label_lvl=""):
1467        # type: (bool, int, str, str) -> Optional[Any]
1468        """
1469        Prints or returns (when "dump" is true) a hierarchical view of an
1470        assembled version of the packet, so that automatic fields are
1471        calculated (checksums, etc.)
1472
1473        :param dump: determine if it prints or returns the string value
1474        :param int indent: the size of indentation for each layer
1475        :param str lvl: additional information about the layer lvl
1476        :param str label_lvl: additional information about the layer fields
1477        :return: return a hierarchical view if dump, else print it
1478        """
1479        return self.__class__(raw(self)).show(dump, indent, lvl, label_lvl)
1480
1481    def sprintf(self, fmt, relax=1):
1482        # type: (str, int) -> str
1483        """
1484        sprintf(format, [relax=1]) -> str
1485
1486        Where format is a string that can include directives. A directive
1487        begins and ends by % and has the following format:
1488        ``%[fmt[r],][cls[:nb].]field%``
1489
1490        :param fmt: is a classic printf directive, "r" can be appended for raw
1491          substitution:
1492          (ex: IP.flags=0x18 instead of SA), nb is the number of the layer
1493          (ex: for IP/IP packets, IP:2.src is the src of the upper IP layer).
1494          Special case : "%.time%" is the creation time.
1495          Ex::
1496
1497            p.sprintf(
1498              "%.time% %-15s,IP.src% -> %-15s,IP.dst% %IP.chksum% "
1499              "%03xr,IP.proto% %r,TCP.flags%"
1500            )
1501
1502          Moreover, the format string can include conditional statements. A
1503          conditional statement looks like : {layer:string} where layer is a
1504          layer name, and string is the string to insert in place of the
1505          condition if it is true, i.e. if layer is present. If layer is
1506          preceded by a "!", the result is inverted. Conditions can be
1507          imbricated. A valid statement can be::
1508
1509            p.sprintf("This is a{TCP: TCP}{UDP: UDP}{ICMP:n ICMP} packet")
1510            p.sprintf("{IP:%IP.dst% {ICMP:%ICMP.type%}{TCP:%TCP.dport%}}")
1511
1512          A side effect is that, to obtain "{" and "}" characters, you must use
1513          "%(" and "%)".
1514        """
1515
1516        escape = {"%": "%",
1517                  "(": "{",
1518                  ")": "}"}
1519
1520        # Evaluate conditions
1521        while "{" in fmt:
1522            i = fmt.rindex("{")
1523            j = fmt[i + 1:].index("}")
1524            cond = fmt[i + 1:i + j + 1]
1525            k = cond.find(":")
1526            if k < 0:
1527                raise Scapy_Exception("Bad condition in format string: [%s] (read sprintf doc!)" % cond)  # noqa: E501
1528            cond, format_ = cond[:k], cond[k + 1:]
1529            res = False
1530            if cond[0] == "!":
1531                res = True
1532                cond = cond[1:]
1533            if self.haslayer(cond):
1534                res = not res
1535            if not res:
1536                format_ = ""
1537            fmt = fmt[:i] + format_ + fmt[i + j + 2:]
1538
1539        # Evaluate directives
1540        s = ""
1541        while "%" in fmt:
1542            i = fmt.index("%")
1543            s += fmt[:i]
1544            fmt = fmt[i + 1:]
1545            if fmt and fmt[0] in escape:
1546                s += escape[fmt[0]]
1547                fmt = fmt[1:]
1548                continue
1549            try:
1550                i = fmt.index("%")
1551                sfclsfld = fmt[:i]
1552                fclsfld = sfclsfld.split(",")
1553                if len(fclsfld) == 1:
1554                    f = "s"
1555                    clsfld = fclsfld[0]
1556                elif len(fclsfld) == 2:
1557                    f, clsfld = fclsfld
1558                else:
1559                    raise Scapy_Exception
1560                if "." in clsfld:
1561                    cls, fld = clsfld.split(".")
1562                else:
1563                    cls = self.__class__.__name__
1564                    fld = clsfld
1565                num = 1
1566                if ":" in cls:
1567                    cls, snum = cls.split(":")
1568                    num = int(snum)
1569                fmt = fmt[i + 1:]
1570            except Exception:
1571                raise Scapy_Exception("Bad format string [%%%s%s]" % (fmt[:25], fmt[25:] and "..."))  # noqa: E501
1572            else:
1573                if fld == "time":
1574                    val = time.strftime(
1575                        "%H:%M:%S.%%06i",
1576                        time.localtime(float(self.time))
1577                    ) % int((self.time - int(self.time)) * 1000000)
1578                elif cls == self.__class__.__name__ and hasattr(self, fld):
1579                    if num > 1:
1580                        val = self.payload.sprintf("%%%s,%s:%s.%s%%" % (f, cls, num - 1, fld), relax)  # noqa: E501
1581                        f = "s"
1582                    elif f[-1] == "r":  # Raw field value
1583                        val = getattr(self, fld)
1584                        f = f[:-1]
1585                        if not f:
1586                            f = "s"
1587                    else:
1588                        val = getattr(self, fld)
1589                        if fld in self.fieldtype:
1590                            val = self.fieldtype[fld].i2repr(self, val)
1591                else:
1592                    val = self.payload.sprintf("%%%s%%" % sfclsfld, relax)
1593                    f = "s"
1594                s += ("%" + f) % val
1595
1596        s += fmt
1597        return s
1598
1599    def mysummary(self):
1600        # type: () -> str
1601        """DEV: can be overloaded to return a string that summarizes the layer.
1602           Only one mysummary() is used in a whole packet summary: the one of the upper layer,  # noqa: E501
1603           except if a mysummary() also returns (as a couple) a list of layers whose  # noqa: E501
1604           mysummary() must be called if they are present."""
1605        return ""
1606
1607    def _do_summary(self):
1608        # type: () -> Tuple[int, str, List[Any]]
1609        found, s, needed = self.payload._do_summary()
1610        ret = ""
1611        if not found or self.__class__ in needed:
1612            ret = self.mysummary()
1613            if isinstance(ret, tuple):
1614                ret, n = ret
1615                needed += n
1616        if ret or needed:
1617            found = 1
1618        if not ret:
1619            ret = self.__class__.__name__ if self.show_summary else ""
1620        if self.__class__ in conf.emph:
1621            impf = []
1622            for f in self.fields_desc:
1623                if f in conf.emph:
1624                    impf.append("%s=%s" % (f.name, f.i2repr(self, self.getfieldval(f.name))))  # noqa: E501
1625            ret = "%s [%s]" % (ret, " ".join(impf))
1626        if ret and s:
1627            ret = "%s / %s" % (ret, s)
1628        else:
1629            ret = "%s%s" % (ret, s)
1630        return found, ret, needed
1631
1632    def summary(self, intern=0):
1633        # type: (int) -> str
1634        """Prints a one line summary of a packet."""
1635        return self._do_summary()[1]
1636
1637    def lastlayer(self, layer=None):
1638        # type: (Optional[Packet]) -> Packet
1639        """Returns the uppest layer of the packet"""
1640        return self.payload.lastlayer(self)
1641
1642    def decode_payload_as(self, cls):
1643        # type: (Type[Packet]) -> None
1644        """Reassembles the payload and decode it using another packet class"""
1645        s = raw(self.payload)
1646        self.payload = cls(s, _internal=1, _underlayer=self)
1647        pp = self
1648        while pp.underlayer is not None:
1649            pp = pp.underlayer
1650        self.payload.dissection_done(pp)
1651
1652    def command(self):
1653        # type: () -> str
1654        """
1655        Returns a string representing the command you have to type to
1656        obtain the same packet
1657        """
1658        f = []
1659        for fn, fv in six.iteritems(self.fields):
1660            fld = self.get_field(fn)
1661            if isinstance(fv, (list, dict, set)) and len(fv) == 0:
1662                continue
1663            if isinstance(fv, Packet):
1664                fv = fv.command()
1665            elif fld.islist and fld.holds_packets and isinstance(fv, list):
1666                fv = "[%s]" % ",".join(map(Packet.command, fv))
1667            elif isinstance(fld, FlagsField):
1668                fv = int(fv)
1669            elif callable(getattr(fv, 'command', None)):
1670                fv = fv.command()
1671            else:
1672                fv = repr(fv)
1673            f.append("%s=%s" % (fn, fv))
1674        c = "%s(%s)" % (self.__class__.__name__, ", ".join(f))
1675        pc = self.payload.command()
1676        if pc:
1677            c += "/" + pc
1678        return c
1679
1680    def convert_to(self, other_cls, **kwargs):
1681        # type: (Type[Packet], **Any) -> Packet
1682        """Converts this Packet to another type.
1683
1684        This is not guaranteed to be a lossless process.
1685
1686        By default, this only implements conversion to ``Raw``.
1687
1688        :param other_cls: Reference to a Packet class to convert to.
1689        :type other_cls: Type[scapy.packet.Packet]
1690        :return: Converted form of the packet.
1691        :rtype: other_cls
1692        :raises TypeError: When conversion is not possible
1693        """
1694        if not issubtype(other_cls, Packet):
1695            raise TypeError("{} must implement Packet".format(other_cls))
1696
1697        if other_cls is Raw:
1698            return Raw(raw(self))
1699
1700        if "_internal" not in kwargs:
1701            return other_cls.convert_packet(self, _internal=True, **kwargs)
1702
1703        raise TypeError("Cannot convert {} to {}".format(
1704            type(self).__name__, other_cls.__name__))
1705
1706    @classmethod
1707    def convert_packet(cls, pkt, **kwargs):
1708        # type: (Packet, **Any) -> Packet
1709        """Converts another packet to be this type.
1710
1711        This is not guaranteed to be a lossless process.
1712
1713        :param pkt: The packet to convert.
1714        :type pkt: scapy.packet.Packet
1715        :return: Converted form of the packet.
1716        :rtype: cls
1717        :raises TypeError: When conversion is not possible
1718        """
1719        if not isinstance(pkt, Packet):
1720            raise TypeError("Can only convert Packets")
1721
1722        if "_internal" not in kwargs:
1723            return pkt.convert_to(cls, _internal=True, **kwargs)
1724
1725        raise TypeError("Cannot convert {} to {}".format(
1726            type(pkt).__name__, cls.__name__))
1727
1728    @classmethod
1729    def convert_packets(cls,
1730                        pkts,  # type: List[Packet]
1731                        **kwargs  # type: Any
1732                        ):
1733        # type: (...) -> Iterator[Iterator[Packet]]
1734        """Converts many packets to this type.
1735
1736        This is implemented as a generator.
1737
1738        See ``Packet.convert_packet``.
1739        """
1740        for pkt in pkts:
1741            yield cls.convert_packet(pkt, **kwargs)
1742
1743
class NoPayload(Packet):
    """Placeholder layer standing for "no payload".

    The class hands out one shared instance, is falsy, builds to empty
    bytes and answers layer queries with empty / "not found" results.
    """

    def __new__(cls, *args, **kargs):
        # type: (Type[Packet], *Any, **Any) -> Packet
        # One instance per class, cached in the class' own namespace.
        instance = cls.__dict__.get("__singl__")
        if instance is not None:
            return instance  # type: ignore
        instance = Packet.__new__(cls)
        cls.__singl__ = instance
        Packet.__init__(instance)
        return instance  # type: ignore

    def __init__(self, *args, **kargs):
        # type: (*Any, **Any) -> None
        """Do nothing: the shared instance is set up in ``__new__``."""

    # -- payload / underlayer management: nothing to do --------------------

    def dissection_done(self, pkt):
        # type: (Packet) -> None
        """Do nothing."""

    def add_payload(self, payload):
        # type: (Union[Packet, bytes]) -> NoReturn
        """Always fail."""
        raise Scapy_Exception("Can't add payload to NoPayload instance")

    def remove_payload(self):
        # type: () -> None
        """Do nothing."""

    def add_underlayer(self, underlayer):
        # type: (Any) -> None
        """Do nothing."""

    def remove_underlayer(self, other):
        # type: (Packet) -> None
        """Do nothing."""

    def copy(self):
        # type: () -> NoPayload
        """Return the instance itself instead of a copy."""
        return self

    def clear_cache(self):
        # type: () -> None
        """Do nothing."""

    # -- representations ---------------------------------------------------

    def __repr__(self):
        # type: () -> str
        return ""

    def __str__(self):
        # type: () -> str
        return ""

    def __bytes__(self):
        # type: () -> bytes
        return b""

    def __nonzero__(self):
        # type: () -> bool
        return False
    __bool__ = __nonzero__

    # -- building: every step yields empty bytes ---------------------------

    def do_build(self):
        # type: () -> bytes
        return b""

    def build(self):
        # type: () -> bytes
        return b""

    def build_padding(self):
        # type: () -> bytes
        return b""

    def build_done(self, p):
        # type: (bytes) -> bytes
        """Return ``p`` unchanged."""
        return p

    def build_ps(self, internal=0):
        # type: (int) -> Tuple[bytes, List[Any]]
        return b"", []

    # -- field access: there is no field -----------------------------------

    def getfieldval(self, attr):
        # type: (str) -> NoReturn
        raise AttributeError(attr)

    def getfield_and_val(self, attr):
        # type: (str) -> NoReturn
        raise AttributeError(attr)

    def setfieldval(self, attr, val):
        # type: (str, Any) -> NoReturn
        raise AttributeError(attr)

    def delfieldval(self, attr):
        # type: (str) -> NoReturn
        raise AttributeError(attr)

    def hide_defaults(self):
        # type: () -> None
        """Do nothing."""

    # -- iteration, comparison and matching --------------------------------

    def __iter__(self):
        # type: () -> Iterator[Packet]
        return iter([])

    def __eq__(self, other):
        # type: (Any) -> bool
        return isinstance(other, NoPayload)

    def hashret(self):
        # type: () -> bytes
        return b""

    def answers(self, other):
        # type: (NoPayload) -> bool
        accepted = (NoPayload, conf.padding_layer)
        return isinstance(other, accepted)

    # -- layer lookup ------------------------------------------------------

    def haslayer(self, cls, _subclass=None):
        # type: (Union[Type[Packet], str], Optional[bool]) -> int
        return 0

    def getlayer(self,
                 cls,  # type: Union[int, Type[Packet], str]
                 nb=1,  # type: int
                 _track=None,  # type: Optional[List[int]]
                 _subclass=None,  # type: Optional[bool]
                 **flt  # type: Any
                 ):
        # type: (...) -> Optional[Packet]
        """Return None, recording ``nb`` in ``_track`` when it is given."""
        if _track is not None:
            _track.append(nb)
        return None

    def fragment(self, *args, **kargs):
        # type: (*Any, **Any) -> List[Packet]
        raise Scapy_Exception("cannot fragment this packet")

    # -- display -----------------------------------------------------------

    def show(self, dump=False, indent=3, lvl="", label_lvl=""):
        # type: (bool, int, str, str) -> None
        """Do nothing."""

    def sprintf(self, fmt, relax=1):
        # type: (str, int) -> str
        """Return "??" when ``relax`` is set, raise otherwise."""
        if not relax:
            raise Scapy_Exception("Format not found [%s]" % fmt)
        return "??"

    def _do_summary(self):
        # type: () -> Tuple[int, str, List[Any]]
        return 0, "", []

    def layers(self):
        # type: () -> List[Type[Packet]]
        return []

    def lastlayer(self, layer=None):
        # type: (Optional[Packet]) -> Packet
        """Return ``layer`` when it is truthy, this instance otherwise."""
        return layer if layer else self

    def command(self):
        # type: () -> str
        return ""

    def route(self):
        # type: () -> Tuple[None, None, None]
        return None, None, None
1910
1911
1912####################
1913#  packet classes  #
1914####################
1915
1916
class Raw(Packet):
    """Layer made of a single ``load`` field holding a string of bytes."""
    name = "Raw"
    fields_desc = [StrField("load", b"")]

    def __init__(self, _pkt=b"", *args, **kwargs):
        # type: (bytes, *Any, **Any) -> None
        # Any non-empty input that is not bytes is encoded before being
        # passed to the Packet constructor.
        if _pkt:
            if not isinstance(_pkt, bytes):
                _pkt = bytes_encode(_pkt)
        super(Raw, self).__init__(_pkt, *args, **kwargs)

    def answers(self, other):
        # type: (Packet) -> int
        """Always report a match."""
        return 1

    def mysummary(self):
        # type: () -> str
        """Summarize the load according to ``conf.raw_summary``.

        A callable is applied to the load, any other truthy value selects
        the ``repr()`` of the load, and a falsy value falls back to
        ``Packet.mysummary``.
        """
        summarizer = conf.raw_summary
        if not summarizer:
            return Packet.mysummary(self)
        if callable(summarizer):
            return "Raw %s" % summarizer(self.load)
        return "Raw %r" % self.load

    @classmethod
    def convert_packet(cls, pkt, **kwargs):
        # type: (Packet, **Any) -> Raw
        """Wrap the built bytes of ``pkt`` in a ``Raw`` layer."""
        built = raw(pkt)
        return Raw(built)
1945
1946
class Padding(Raw):
    """Raw variant whose bytes are emitted by ``build_padding()`` only."""
    name = "Padding"

    def self_build(self, field_pos_list=None):
        # type: (Optional[Any]) -> bytes
        """Contribute no bytes to the regular build."""
        return b""

    def build_padding(self):
        # type: () -> bytes
        """Return this layer's bytes followed by the payload's padding.

        The cached raw bytes are used when available, the encoded load
        otherwise.
        """
        if self.raw_packet_cache is None:
            own_bytes = bytes_encode(self.load)
        else:
            own_bytes = self.raw_packet_cache
        return own_bytes + self.payload.build_padding()
1960
1961
# Register the two generic layers defined above in the global configuration.
conf.raw_layer = Raw
conf.padding_layer = Padding
# Only provide a default layer-2 class when none has been configured yet.
if conf.default_l2 is None:
    conf.default_l2 = Raw
1966
1967#################
1968#  Bind layers  #
1969#################
1970
1971
def bind_bottom_up(lower,  # type: Type[Packet]
                   upper,  # type: Type[Packet]
                   __fval=None,  # type: Optional[Any]
                   **fval  # type: Any
                   ):
    # type: (...) -> None
    r"""Bind 2 layers for dissection.

    The upper layer is chosen for dissection on top of the lower layer when
    ALL the passed field values are validated. If several calls are made
    with the same layers, the last one is used as default.

    :param lower: the lower layer class
    :param upper: the layer class to dissect on top of ``lower``
    :param __fval: optional mapping of field values, merged into ``fval``
    :param fval: field values of ``lower`` that select ``upper``

    ex:
        >>> bind_bottom_up(Ether, SNAP, type=0x1234)
        >>> Ether(b'\xff\xff\xff\xff\xff\xff\xd0P\x99V\xdd\xf9\x124\x00\x00\x00\x00\x00')  # noqa: E501
        <Ether  dst=ff:ff:ff:ff:ff:ff src=d0:50:99:56:dd:f9 type=0x1234 |<SNAP  OUI=0x0 code=0x0 |>>  # noqa: E501
    """
    if __fval is not None:
        fval.update(__fval)
    # Work on a copy so that the list object previously referenced by
    # ``lower`` is left untouched.
    guesses = list(lower.payload_guess)
    guesses.append((fval, upper))
    lower.payload_guess = guesses
1992
1993
def bind_top_down(lower,  # type: Type[Packet]
                  upper,  # type: Type[Packet]
                  __fval=None,  # type: Optional[Any]
                  **fval  # type: Any
                  ):
    # type: (...) -> None
    """Bind 2 layers for building.

    When the upper layer is added as a payload of the lower layer, all the
    passed field values are applied to the lower layer.

    :param lower: the lower layer class
    :param upper: the layer class stacked on top of ``lower``
    :param __fval: optional mapping of field values, merged into ``fval``
    :param fval: field values recorded for the ``lower``/``upper`` pair

    ex:
        >>> bind_top_down(Ether, SNAP, type=0x1234)
        >>> Ether()/SNAP()
        <Ether  type=0x1234 |<SNAP  |>>
    """
    if __fval is not None:
        fval.update(__fval)
    # Work on a copy so that the dict object previously referenced by
    # ``upper`` is left untouched.
    overloads = upper._overload_fields.copy()
    overloads[lower] = fval
    upper._overload_fields = overloads
2013
2014
@conf.commands.register
def bind_layers(lower,  # type: Type[Packet]
                upper,  # type: Type[Packet]
                __fval=None,  # type: Optional[Dict[str, int]]
                **fval  # type: Any
                ):
    # type: (...) -> None
    """Bind 2 layers on some specific fields' values.

    The binding is used both when building and when dissecting packets:
    this function calls bind_top_down and bind_bottom_up, in that order,
    with all the passed arguments.

    Please have a look at their docs:
     - help(bind_bottom_up)
     - help(bind_top_down)
    """
    if __fval is not None:
        fval.update(__fval)
    for binder in (bind_top_down, bind_bottom_up):
        binder(lower, upper, **fval)
2038
2039
def split_bottom_up(lower,  # type: Type[Packet]
                    upper,  # type: Type[Packet]
                    __fval=None,  # type: Optional[Any]
                    **fval  # type: Any
                    ):
    # type: (...) -> None
    """Un-link an association that was made using bind_bottom_up.

    Every binding of ``lower`` towards ``upper`` whose parameters contain
    all the given field values is removed.
    Have a look at help(bind_bottom_up)
    """
    if __fval is not None:
        fval.update(__fval)

    def keep(params, cls):
        # type: (Dict[str, int], Type[Packet]) -> bool
        # A binding survives when it targets another class, or when one of
        # the requested field values is missing or different.
        mismatch = False
        for key, value in six.iteritems(fval):
            if key not in params or params[key] != value:
                mismatch = True
                break
        return cls != upper or mismatch
    lower.payload_guess = [
        entry for entry in lower.payload_guess if keep(*entry)
    ]
2059
2060
def split_top_down(lower,  # type: Type[Packet]
                   upper,  # type: Type[Packet]
                   __fval=None,  # type: Optional[Any]
                   **fval  # type: Any
                   ):
    # type: (...) -> None
    """Un-link an association that was made using bind_top_down.

    The association is only removed when it contains all the given field
    values.
    Have a look at help(bind_top_down)
    """
    if __fval is not None:
        fval.update(__fval)
    if lower not in upper._overload_fields:
        return
    bound = upper._overload_fields[lower]
    # Leave the association alone as soon as one requested value is
    # missing or different.
    for key, value in six.iteritems(fval):
        if key not in bound or bound[key] != value:
            return
    # Work on a copy so that the dict object previously referenced by
    # ``upper`` is left untouched.
    remaining = upper._overload_fields.copy()
    del remaining[lower]
    upper._overload_fields = remaining
2078
2079
@conf.commands.register
def split_layers(lower,  # type: Type[Packet]
                 upper,  # type: Type[Packet]
                 __fval=None,  # type: Optional[Any]
                 **fval  # type: Any
                 ):
    # type: (...) -> None
    """Split 2 layers previously bound.

    This is the opposite of bind_layers: it calls split_bottom_up and
    split_top_down, in that order, with all the passed arguments.

    Please have a look at their docs:
     - help(split_bottom_up)
     - help(split_top_down)
    """
    if __fval is not None:
        fval.update(__fval)
    for splitter in (split_bottom_up, split_top_down):
        splitter(lower, upper, **fval)
2099
2100
@conf.commands.register
def explore(layer=None):
    # type: (Optional[str]) -> None
    """Function used to discover the Scapy layers and protocols.
    It helps to see which packets exist in contrib or layer files.

    params:
     - layer: If specified, the function will explore the layer. If not,
              the GUI mode will be activated, to browse the available layers

    examples:
      >>> explore()  # Launches the GUI
      >>> explore("dns")  # Explore scapy.layers.dns
      >>> explore("http2")  # Explore scapy.contrib.http2
      >>> explore(scapy.layers.bluetooth4LE)

    Note: to search a packet by name, use ls("name") rather than explore.

    :raises Scapy_Exception: in GUI mode when ``conf.interactive`` is not
        set, or when the selected module is unknown
    :raises ImportError: in GUI mode when prompt_toolkit >= 2.0 is missing
    """
    if layer is None:  # GUI MODE
        if not conf.interactive:
            raise Scapy_Exception("explore() GUI-mode cannot be run in "
                                  "non-interactive mode. Please provide a "
                                  "'layer' parameter !")
        # 0 - Imports
        try:
            import prompt_toolkit
        except ImportError:
            raise ImportError("prompt_toolkit is not installed ! "
                              "You may install IPython, which contains it, via"
                              " `pip install ipython`")
        if not _version_checker(prompt_toolkit, (2, 0)):
            raise ImportError("prompt_toolkit >= 2.0.0 is required !")
        # Only available with prompt_toolkit > 2.0, not released on PyPi yet
        from prompt_toolkit.shortcuts.dialogs import radiolist_dialog, \
            button_dialog
        from prompt_toolkit.formatted_text import HTML
        # Check for prompt_toolkit >= 3.0.0: the dialog helpers are then
        # turned into their result by calling .run() on them
        call_ptk = lambda x: cast(str, x)  # type: Callable[[Any], str]
        if _version_checker(prompt_toolkit, (3, 0)):
            call_ptk = lambda x: x.run()  # type: ignore
        # 1 - Ask for layer or contrib
        btn_diag = button_dialog(
            title=six.text_type("Scapy v%s" % conf.version),
            text=HTML(
                six.text_type(
                    '<style bg="white" fg="red">Choose the type of packets'
                    ' you want to explore:</style>'
                )
            ),
            buttons=[
                (six.text_type("Layers"), "layers"),
                (six.text_type("Contribs"), "contribs"),
                (six.text_type("Cancel"), "cancel")
            ])
        action = call_ptk(btn_diag)
        # 2 - Retrieve list of Packets
        if action == "layers":
            # Get all loaded layers
            lvalues = conf.layers.layers()
            # Restrict to layers-only (not contribs) + packet.py and asn1*.py
            values = [x for x in lvalues if ("layers" in x[0] or
                                             "packet" in x[0] or
                                             "asn1" in x[0])]
        elif action == "contribs":
            # Get all existing contribs
            from scapy.main import list_contrib
            cvalues = cast(List[Dict[str, str]], list_contrib(ret=True))
            values = [(x['name'], x['description'])
                      for x in cvalues]
            # Remove very specific modules
            values = [x for x in values if "can" not in x[0]]
        else:
            # Escape/Cancel was pressed
            return
        # Python 2 compat
        if six.PY2:
            values = [(six.text_type(x), six.text_type(y))
                      for x, y in values]
        # Build tree
        if action == "contribs":
            # A tree is a dictionary. Each layer contains a keyword
            # _l which contains the files in the layer, and a _name
            # argument which is its name. The other keys are the subfolders,
            # which are similar dictionaries
            tree = defaultdict(list)  # type: Dict[str, Union[List[Any], Dict[str, Any]]]  # noqa: E501
            for name, desc in values:
                if "." in name:  # Folder detected
                    parts = name.split(".")
                    subtree = tree
                    for pa in parts[:-1]:
                        if pa not in subtree:
                            subtree[pa] = {}
                        # one layer deeper
                        subtree = subtree[pa]  # type: ignore
                        subtree["_name"] = pa  # type: ignore
                    if "_l" not in subtree:
                        subtree["_l"] = []
                    subtree["_l"].append((parts[-1], desc))  # type: ignore
                else:
                    tree["_l"].append((name, desc))  # type: ignore
        elif action == "layers":
            tree = {"_l": values}
        # 3 - Ask for the layer/contrib module to explore
        current = tree  # type: Any
        previous = []  # type: List[Dict[str, Union[List[Any], Dict[str, Any]]]]  # noqa: E501
        while True:
            # Generate tests & form
            # Sub-folders are listed first, their value being prefixed
            # with "$" to tell them apart from files
            folders = list(current.keys())
            _radio_values = [
                ("$" + name, six.text_type('[+] ' + name.capitalize()))
                for name in folders if not name.startswith("_")
            ] + current.get("_l", [])  # type: List[str]
            cur_path = ""
            if previous:
                # previous[0] is the root of the tree, which has no name
                cur_path = ".".join(
                    itertools.chain(
                        (x["_name"] for x in previous[1:]),  # type: ignore
                        (current["_name"],)
                    )
                )
            extra_text = (
                '\n<style bg="white" fg="green">> scapy.%s</style>'
            ) % (action + ("." + cur_path if cur_path else ""))
            # Show popup
            rd_diag = radiolist_dialog(
                values=_radio_values,
                title=six.text_type(
                    "Scapy v%s" % conf.version
                ),
                text=HTML(
                    six.text_type((
                        '<style bg="white" fg="red">Please select a file '
                        'among the following, to see all layers contained in'
                        ' it:</style>'
                    ) + extra_text)
                ),
                cancel_text="Back" if previous else "Cancel"
            )
            result = call_ptk(rd_diag)
            if result is None:
                # User pressed "Cancel/Back"
                if previous:  # Back
                    current = previous.pop()
                    continue
                else:  # Cancel
                    return
            if result.startswith("$"):
                # Enter the selected sub-folder
                previous.append(current)
                current = current[result[1:]]
            else:
                # Enter on layer
                if previous:  # In subfolder
                    result = cur_path + "." + result
                break
        # 4 - (Contrib only): load contrib
        if action == "contribs":
            from scapy.main import load_contrib
            load_contrib(result)
            result = "scapy.contrib." + result
    else:  # NON-GUI MODE
        # We handle layer as a short layer name, full layer name
        # or the module itself
        if isinstance(layer, types.ModuleType):
            layer = layer.__name__
        if isinstance(layer, str):
            if layer.startswith("scapy.layers."):
                result = layer
            else:
                if layer.startswith("scapy.contrib."):
                    layer = layer.replace("scapy.contrib.", "")
                from scapy.main import load_contrib
                load_contrib(layer)
                result_layer, result_contrib = (("scapy.layers.%s" % layer),
                                                ("scapy.contrib.%s" % layer))
                if result_layer in conf.layers.ldict:
                    result = result_layer
                elif result_contrib in conf.layers.ldict:
                    result = result_contrib
                else:
                    raise Scapy_Exception("Unknown scapy module '%s'" % layer)
        else:
            warning("Wrong usage ! Check out help(explore)")
            return

    # COMMON PART
    # Get the list of all Packets contained in that module
    try:
        all_layers = conf.layers.ldict[result]
    except KeyError:
        # Report the module that was looked up: in GUI mode 'layer' is None
        raise Scapy_Exception("Unknown scapy module '%s'" % result)
    # Print
    print(conf.color_theme.layer_name("Packets contained in %s:" % result))
    rtlst = []  # type: List[Tuple[Union[str, List[str]], ...]]
    rtlst = [(lay.__name__ or "", lay._name or "") for lay in all_layers]
    print(pretty_list(rtlst, [("Class", "Name")], borders=True))
2296
2297
def _pkt_ls(obj,  # type: Union[Packet, Type[Packet]]
            verbose=False,  # type: bool
            ):
    # type: (...) -> List[Tuple[str, Type[AnyField], str, str, List[str]]]  # noqa: E501
    """Internal function used to resolve `fields_desc` to display it.

    :param obj: a packet object or class
    :param verbose: also collect the enum values / flag names of the
        fields (enums with 50 entries or more are not listed)
    :returns: a list containing tuples [(name, cls, clsname_extras,
        default, long_attrs)], where ``cls`` is the field class and
        ``default`` the ``repr()`` of the default value
    :raises ValueError: if obj is neither a Packet instance nor a class
    """
    is_pkt = isinstance(obj, Packet)
    if not issubtype(obj, Packet) and not is_pkt:
        raise ValueError
    fields = []
    for f in obj.fields_desc:
        cur_fld = f
        attrs = []  # type: List[str]
        long_attrs = []  # type: List[str]
        # Unwrap Emph / ConditionalField to reach the real field
        while isinstance(cur_fld, (Emph, ConditionalField)):
            if isinstance(cur_fld, ConditionalField):
                # Keep a 4-letter tag of the wrapper class name
                attrs.append(cur_fld.__class__.__name__[:4])
            cur_fld = cur_fld.fld
        name = cur_fld.name
        default = cur_fld.default
        if verbose and isinstance(cur_fld, EnumField) \
           and hasattr(cur_fld, "i2s"):
            # i2s may be falsy (e.g. None): treat it as an empty mapping so
            # that the iteration below cannot fail
            i2s = cur_fld.i2s or {}
            if len(i2s) < 50:
                long_attrs.extend(
                    "%s: %d" % (strval, numval)
                    for numval, strval in
                    sorted(six.iteritems(i2s))
                )
        elif isinstance(cur_fld, MultiEnumField):
            # Resolve the dependency once; a class is instantiated with its
            # default values to do so
            fld_depend = cur_fld.depends_on(
                cast(Packet, obj if is_pkt else obj())
            )
            attrs.append("Depends on %s" % fld_depend)
            if verbose:
                cur_i2s = cur_fld.i2s_multi.get(fld_depend, {})
                if len(cur_i2s) < 50:
                    long_attrs.extend(
                        "%s: %d" % (strval, numval)
                        for numval, strval in
                        sorted(six.iteritems(cur_i2s))
                    )
        elif verbose and isinstance(cur_fld, FlagsField):
            names = cur_fld.names
            long_attrs.append(", ".join(names))
        elif isinstance(cur_fld, MultipleTypeField):
            default = cur_fld.dflt.default
            # List the class of every candidate field, the default one last
            attrs.append(", ".join(
                x[0].__class__.__name__ for x in
                itertools.chain(cur_fld.flds, [(cur_fld.dflt,)])
            ))

        cls = cur_fld.__class__
        class_name_extras = "(%s)" % (
            ", ".join(attrs)
        ) if attrs else ""
        if isinstance(cur_fld, BitField):
            class_name_extras += " (%d bit%s)" % (
                cur_fld.size,
                "s" if cur_fld.size > 1 else ""
            )
        fields.append(
            (name,
             cls,
             class_name_extras,
             repr(default),
             long_attrs)
        )
    return fields
2374
2375
@conf.commands.register
def ls(obj=None,  # type: Optional[Union[str, Packet, Type[Packet]]]
       case_sensitive=False,  # type: bool
       verbose=False  # type: bool
       ):
    # type: (...) -> None
    """List available layers, or infos on a given layer class or name.

    :param obj: Packet / packet name to use. A string is used as a regular
        expression searched in the class name and in the name of every
        layer. Without obj, all the layers are listed.
    :param case_sensitive: if obj is a string, is it case sensitive?
    :param verbose: also print the enum values / flag names of the fields
    """
    is_string = isinstance(obj, str)

    if obj is None or is_string:
        tip = False
        if obj is None:
            tip = True
            all_layers = sorted(conf.layers, key=lambda x: x.__name__)
        else:
            pattern = re.compile(
                cast(str, obj),
                0 if case_sensitive else re.I
            )
            needle = cast(str, obj)
            if not case_sensitive:
                needle = needle.lower()

            def sorter(x):
                # type: (Type[Packet]) -> Tuple[bool, int, int]
                # We first order by accuracy, then length. A layer can be
                # selected through its `name` or through a regex, in which
                # case obj is not a substring of the class name: find()
                # does not raise then, and such layers are listed last.
                clsname = x.__name__
                if not case_sensitive:
                    clsname = clsname.lower()
                pos = clsname.find(needle)
                return (pos < 0, pos, len(x.__name__))

            all_layers = sorted((layer for layer in conf.layers
                                 if (isinstance(layer.__name__, str) and
                                     pattern.search(layer.__name__)) or
                                 (isinstance(layer.name, str) and
                                     pattern.search(layer.name))),
                                key=sorter)
        for layer in all_layers:
            print("%-10s : %s" % (layer.__name__, layer._name))
        if tip and conf.interactive:
            print("\nTIP: You may use explore() to navigate through all "
                  "layers using a clear GUI")
    else:
        try:
            fields = _pkt_ls(
                obj,  # type: ignore
                verbose=verbose
            )
            is_pkt = isinstance(obj, Packet)
            # Print
            for fname, cls, clsne, dflt, long_attrs in fields:
                clsinfo = cls.__name__ + " " + clsne
                print("%-10s : %-35s =" % (fname, clsinfo), end=' ')
                if is_pkt:
                    # Current value of the field, for packet instances only
                    print("%-15r" % (getattr(obj, fname),), end=' ')
                print("(%r)" % (dflt,))
                for attr in long_attrs:
                    print("%-15s%s" % ("", attr))
            # Restart for payload if any
            if is_pkt:
                obj = cast(Packet, obj)
                if isinstance(obj.payload, NoPayload):
                    return
                print("--")
                # Keep the same options for the upper layers
                ls(obj.payload, case_sensitive=case_sensitive,
                   verbose=verbose)
        except ValueError:
            print("Not a packet class or name. Type 'ls()' to list packet classes.")  # noqa: E501
2443
2444
@conf.commands.register
def rfc(cls, ret=False, legend=True):
    # type: (Type[Packet], bool, bool) -> Optional[str]
    """
    Generate an RFC-like representation of a packet def.

    The fields of ``cls.fields_desc`` are laid out on 32-bit wide lines,
    each bit taking two characters. A field that does not fit on the
    current line is continued on the following one(s); its name is only
    written on its first segment.

    :param cls: the Packet class
    :param ret: return the result instead of printing (def. False)
    :param legend: show text under the diagram (default True)
    :return: the diagram if ``ret`` is set, otherwise None
    :raises TypeError: if ``cls`` is not a Packet class

    Ex::

        >>> rfc(Ether)

    """
    if not issubclass(cls, Packet):
        raise TypeError("Packet class expected")
    cur_len = 0
    cur_line = []
    lines = []
    # Get the size (width) that a field will take
    # when formatted, from its length in bits
    clsize = lambda x: 2 * x - 1  # type: Callable[[int], int]
    ident = 0  # Fields UUID
    # Generate packet groups
    for f in cls.fields_desc:
        flen = int(f.sz * 8)
        cur_len += flen
        ident += 1
        # Fancy field name
        fname = f.name.upper().replace("_", " ")
        # The field might exceed the current line or
        # take more than one line. Copy it as required
        while True:
            over = max(0, cur_len - 32)  # Exceed
            len1 = clsize(flen - over)  # What fits
            cur_line.append((fname[:len1], len1, ident))
            if cur_len >= 32:
                # Current line is full. start a new line
                lines.append(cur_line)
                cur_len = flen = over
                fname = ""  # do not repeat the field
                cur_line = []
                if not over:
                    # there is no data left
                    break
            else:
                # End of the field
                break
    # Add the last line if un-finished
    if cur_line:
        lines.append(cur_line)
    # Calculate separations between lines
    full_sep = "+-" * 32 + "+\n"
    seps = [full_sep]
    for above, below in zip(lines, lines[1:]):
        # Start with a full line
        sep = full_sep
        # The last field of above is shared with below
        if above[-1][2] == below[0][2]:
            # Column of the "|" that opens the shared field in "above".
            # Each preceding field takes its width plus one leading "|",
            # so both must be counted; summing the widths alone shifts
            # the gap to the left once two or more fields precede it.
            pos_above = sum(x[1] + 1 for x in above[:-1])
            # where the field in "below" ends
            pos_below = below[0][1]
            if pos_above < pos_below:
                # they are overlapping.
                # Now crop the space between those pos
                # and fill it with " "
                sep = (
                    sep[:1 + pos_above] +
                    " " * (pos_below - pos_above) +
                    sep[1 + pos_below:]
                )
        # line is complete
        seps.append(sep)
    # Graph
    parts = []
    # Bytes markers
    parts.append(" " + (" " * 19).join(str(x) for x in range(4)) + "\n")
    # Bits markers
    parts.append(" " + " ".join(str(x % 10) for x in range(32)) + "\n")
    # Add fields and their separations
    for line, sep in zip(lines, seps):
        parts.append(sep)
        for elt, flen, _ in line:
            parts.append("|" + elt.center(flen, " "))
        parts.append("|\n")
    # Closing border: as wide as the last line (32 bits if it was full)
    parts.append("+-" * (cur_len or 32) + "+\n")
    # Annotate with the figure name
    if legend:
        parts.append("\n" + ("Fig. " + cls.__name__).center(66, " "))
    result = "".join(parts)
    # return if asked for, else print
    if ret:
        return result
    print(result)
    return None
2549
2550
2551#############
2552#  Fuzzing  #
2553#############
2554
@conf.commands.register
def fuzz(p,  # type: Packet
         _inplace=0,  # type: int
         ):
    # type: (...) -> Packet
    """
    Turn a packet into a fuzzy one: on every layer, default values are
    replaced by the random objects returned by the fields' ``randval()``.

    Only the defaults are replaced: the layers' ``default_fields`` are
    updated, nothing else is written on the packet.

    :param p: the Packet instance to fuzz
    :param _inplace: when true, work on ``p`` itself instead of on a copy
        (used for the recursion into PacketListField entries)
    :return: the fuzzed packet.
    """
    fuzzed = p if _inplace else p.copy()
    layer = fuzzed
    # Walk the layers down to the end of the payload chain
    while not isinstance(layer, NoPayload):
        randomized = {}
        deferred = []  # type: List[str]
        for fld in layer.fields_desc:
            if isinstance(fld, PacketListField):
                # Sub-packets are fuzzed individually, in place
                for sub in getattr(layer, fld.name):
                    fuzz(sub, _inplace=1)
                continue
            if isinstance(fld, MultipleTypeField):
                # Its actual type depends on the other fields:
                # handle it once they are known
                deferred.append(fld.name)
                continue
            if fld.default is None:
                continue
            if isinstance(fld, ConditionalField) and not fld._evalcond(layer):
                continue
            value = fld.randval()
            if value is not None:
                randomized[fld.name] = value
        if deferred:
            # Pin the volatile values collected so far and install them,
            # so that the MultipleTypeFields are resolved against them
            pinned = {}
            for key, val in six.iteritems(randomized):
                if isinstance(val, VolatileValue):
                    pinned[key] = val._fix()
                else:
                    pinned[key] = val
            randomized = pinned
            layer.default_fields.update(randomized)
            # Then pick a random value for each MultipleTypeField
            for name in deferred:
                multi = cast(MultipleTypeField, layer.get_field(name))
                value = multi._find_fld_pkt(layer).randval()
                if value is not None:
                    randomized[name] = value
        layer.default_fields.update(randomized)
        layer = layer.payload
    return fuzzed
2602