1# This file is part of Scapy
2# See http://www.secdev.org/projects/scapy for more information
3# Copyright (C) Philippe Biondi <phil@secdev.org>
4# This program is published under a GPLv2 license
5
6"""
7General utility functions.
8"""
9
10from __future__ import absolute_import
11from __future__ import print_function
12
13from decimal import Decimal
14
15import array
16import collections
17import decimal
18import difflib
19import gzip
20import os
21import random
22import re
23import socket
24import struct
25import subprocess
26import sys
27import tempfile
28import threading
29import time
30import warnings
31
32import scapy.modules.six as six
33from scapy.modules.six.moves import range, input, zip_longest
34
35from scapy.config import conf
36from scapy.consts import DARWIN, OPENBSD, WINDOWS
37from scapy.data import MTU, DLT_EN10MB
38from scapy.compat import orb, plain_str, chb, bytes_base64,\
39    base64_bytes, hex_bytes, lambda_tuple_converter, bytes_encode
40from scapy.error import log_runtime, Scapy_Exception, warning
41from scapy.pton_ntop import inet_pton
42
43# Typing imports
44from scapy.compat import (
45    cast,
46    Any,
47    AnyStr,
48    Callable,
49    Dict,
50    IO,
51    Iterator,
52    List,
53    Literal,
54    Optional,
55    TYPE_CHECKING,
56    Tuple,
57    Type,
58    Union,
59    overload,
60)
61
62if TYPE_CHECKING:
63    from scapy.packet import Packet
64    from scapy.plist import _PacketIterable, PacketList
65    from scapy.supersocket import SuperSocket
66    _SuperSocket = SuperSocket
67else:
68    _SuperSocket = object
69
70_ByteStream = Union[IO[bytes], gzip.GzipFile]
71
72###########
73#  Tools  #
74###########
75
76
77def issubtype(x,  # type: Any
78              t,  # type: Union[type, str]
79              ):
80    # type: (...) -> bool
81    """issubtype(C, B) -> bool
82
83    Return whether C is a class and if it is a subclass of class B.
84    When using a tuple as the second argument issubtype(X, (A, B, ...)),
85    is a shortcut for issubtype(X, A) or issubtype(X, B) or ... (etc.).
86    """
87    if isinstance(t, str):
88        return t in (z.__name__ for z in x.__bases__)
89    if isinstance(x, type) and issubclass(x, t):
90        return True
91    return False
92
93
94_Decimal = Union[Decimal, int]
95
96
97class EDecimal(Decimal):
98    """Extended Decimal
99
100    This implements arithmetic and comparison with float for
101    backward compatibility
102    """
103
104    def __add__(self, other, context=None):
105        # type: (_Decimal, Any) -> EDecimal
106        return EDecimal(Decimal.__add__(self, Decimal(other)))
107
108    def __radd__(self, other):
109        # type: (_Decimal) -> EDecimal
110        return EDecimal(Decimal.__add__(self, Decimal(other)))
111
112    def __sub__(self, other):
113        # type: (_Decimal) -> EDecimal
114        return EDecimal(Decimal.__sub__(self, Decimal(other)))
115
116    def __rsub__(self, other):
117        # type: (_Decimal) -> EDecimal
118        return EDecimal(Decimal.__rsub__(self, Decimal(other)))
119
120    def __mul__(self, other):
121        # type: (_Decimal) -> EDecimal
122        return EDecimal(Decimal.__mul__(self, Decimal(other)))
123
124    def __rmul__(self, other):
125        # type: (_Decimal) -> EDecimal
126        return EDecimal(Decimal.__mul__(self, Decimal(other)))
127
128    def __truediv__(self, other):
129        # type: (_Decimal) -> EDecimal
130        return EDecimal(Decimal.__truediv__(self, Decimal(other)))
131
132    def __floordiv__(self, other):
133        # type: (_Decimal) -> EDecimal
134        return EDecimal(Decimal.__floordiv__(self, Decimal(other)))
135
136    if sys.version_info >= (3,):
137        def __divmod__(self, other):
138            # type: (_Decimal) -> Tuple[EDecimal, EDecimal]
139            r = Decimal.__divmod__(self, Decimal(other))
140            return EDecimal(r[0]), EDecimal(r[1])
141    else:
142        def __div__(self, other):
143            # type: (_Decimal) -> EDecimal
144            return EDecimal(Decimal.__div__(self, Decimal(other)))
145
146        def __rdiv__(self, other):
147            # type: (_Decimal) -> EDecimal
148            return EDecimal(Decimal.__rdiv__(self, Decimal(other)))
149
150    def __mod__(self, other):
151        # type: (_Decimal) -> EDecimal
152        return EDecimal(Decimal.__mod__(self, Decimal(other)))
153
154    def __rmod__(self, other):
155        # type: (_Decimal) -> EDecimal
156        return EDecimal(Decimal.__rmod__(self, Decimal(other)))
157
158    def __pow__(self, other, modulo=None):
159        # type: (_Decimal, Optional[_Decimal]) -> EDecimal
160        return EDecimal(Decimal.__pow__(self, Decimal(other), modulo))
161
162    def __eq__(self, other):
163        # type: (Any) -> bool
164        return super(EDecimal, self).__eq__(other) or float(self) == other
165
166    def normalize(self, precision):  # type: ignore
167        # type: (int) -> EDecimal
168        with decimal.localcontext() as ctx:
169            ctx.prec = precision
170            return EDecimal(super(EDecimal, self).normalize(ctx))
171
172
173@overload
174def get_temp_file(keep, autoext, fd):
175    # type: (bool, str, Literal[True]) -> IO[bytes]
176    pass
177
178
179@overload
180def get_temp_file(keep=False, autoext="", fd=False):  # noqa: F811
181    # type: (bool, str, Literal[False]) -> str
182    pass
183
184
185def get_temp_file(keep=False, autoext="", fd=False):  # noqa: F811
186    # type: (bool, str, bool) -> Union[IO[bytes], str]
187    """Creates a temporary file.
188
189    :param keep: If False, automatically delete the file when Scapy exits.
190    :param autoext: Suffix to add to the generated file name.
191    :param fd: If True, this returns a file-like object with the temporary
192               file opened. If False (default), this returns a file path.
193    """
194    f = tempfile.NamedTemporaryFile(prefix="scapy", suffix=autoext,
195                                    delete=False)
196    if not keep:
197        conf.temp_files.append(f.name)
198
199    if fd:
200        return f
201    else:
202        # Close the file so something else can take it.
203        f.close()
204        return f.name
205
206
207def get_temp_dir(keep=False):
208    # type: (bool) -> str
209    """Creates a temporary file, and returns its name.
210
211    :param keep: If False (default), the directory will be recursively
212                 deleted when Scapy exits.
213    :return: A full path to a temporary directory.
214    """
215
216    dname = tempfile.mkdtemp(prefix="scapy")
217
218    if not keep:
219        conf.temp_files.append(dname)
220
221    return dname
222
223
224def sane(x, color=False):
225    # type: (AnyStr, bool) -> str
226    r = ""
227    for i in x:
228        j = orb(i)
229        if (j < 32) or (j >= 127):
230            if color:
231                r += conf.color_theme.not_printable(".")
232            else:
233                r += "."
234        else:
235            r += chr(j)
236    return r
237
238
239@conf.commands.register
240def restart():
241    # type: () -> None
242    """Restarts scapy"""
243    if not conf.interactive or not os.path.isfile(sys.argv[0]):
244        raise OSError("Scapy was not started from console")
245    if WINDOWS:
246        try:
247            res_code = subprocess.call([sys.executable] + sys.argv)
248        except KeyboardInterrupt:
249            res_code = 1
250        finally:
251            os._exit(res_code)
252    os.execv(sys.executable, [sys.executable] + sys.argv)
253
254
255def lhex(x):
256    # type: (Any) -> str
257    from scapy.volatile import VolatileValue
258    if isinstance(x, VolatileValue):
259        return repr(x)
260    if type(x) in six.integer_types:
261        return hex(x)
262    elif isinstance(x, tuple):
263        return "(%s)" % ", ".join(map(lhex, x))
264    elif isinstance(x, list):
265        return "[%s]" % ", ".join(map(lhex, x))
266    else:
267        return str(x)
268
269
270@conf.commands.register
271def hexdump(p, dump=False):
272    # type: (Union[Packet, AnyStr], bool) -> Optional[str]
273    """Build a tcpdump like hexadecimal view
274
275    :param p: a Packet
276    :param dump: define if the result must be printed or returned in a variable
277    :return: a String only when dump=True
278    """
279    s = ""
280    x = bytes_encode(p)
281    x_len = len(x)
282    i = 0
283    while i < x_len:
284        s += "%04x  " % i
285        for j in range(16):
286            if i + j < x_len:
287                s += "%02X " % orb(x[i + j])
288            else:
289                s += "   "
290        s += " %s\n" % sane(x[i:i + 16], color=True)
291        i += 16
292    # remove trailing \n
293    s = s[:-1] if s.endswith("\n") else s
294    if dump:
295        return s
296    else:
297        print(s)
298        return None
299
300
301@conf.commands.register
302def linehexdump(p, onlyasc=0, onlyhex=0, dump=False):
303    # type: (Union[Packet, AnyStr], int, int, bool) -> Optional[str]
304    """Build an equivalent view of hexdump() on a single line
305
306    Note that setting both onlyasc and onlyhex to 1 results in a empty output
307
308    :param p: a Packet
309    :param onlyasc: 1 to display only the ascii view
310    :param onlyhex: 1 to display only the hexadecimal view
311    :param dump: print the view if False
312    :return: a String only when dump=True
313    """
314    s = ""
315    s = hexstr(p, onlyasc=onlyasc, onlyhex=onlyhex, color=not dump)
316    if dump:
317        return s
318    else:
319        print(s)
320        return None
321
322
323@conf.commands.register
324def chexdump(p, dump=False):
325    # type: (Union[Packet, AnyStr], bool) -> Optional[str]
326    """Build a per byte hexadecimal representation
327
328    Example:
329        >>> chexdump(IP())
330        0x45, 0x00, 0x00, 0x14, 0x00, 0x01, 0x00, 0x00, 0x40, 0x00, 0x7c, 0xe7, 0x7f, 0x00, 0x00, 0x01, 0x7f, 0x00, 0x00, 0x01  # noqa: E501
331
332    :param p: a Packet
333    :param dump: print the view if False
334    :return: a String only if dump=True
335    """
336    x = bytes_encode(p)
337    s = ", ".join("%#04x" % orb(x) for x in x)
338    if dump:
339        return s
340    else:
341        print(s)
342        return None
343
344
345@conf.commands.register
346def hexstr(p, onlyasc=0, onlyhex=0, color=False):
347    # type: (Union[Packet, AnyStr], int, int, bool) -> str
348    """Build a fancy tcpdump like hex from bytes."""
349    x = bytes_encode(p)
350    s = []
351    if not onlyasc:
352        s.append(" ".join("%02X" % orb(b) for b in x))
353    if not onlyhex:
354        s.append(sane(x, color=color))
355    return "  ".join(s)
356
357
358def repr_hex(s):
359    # type: (bytes) -> str
360    """ Convert provided bitstring to a simple string of hex digits """
361    return "".join("%02x" % orb(x) for x in s)
362
363
364@conf.commands.register
365def hexdiff(a, b, autojunk=False):
366    # type: (Union[Packet, AnyStr], Union[Packet, AnyStr], bool) -> None
367    """
368    Show differences between 2 binary strings, Packets...
369
370    For the autojunk parameter, see
371    https://docs.python.org/3.8/library/difflib.html#difflib.SequenceMatcher
372
373    :param a:
374    :param b: The binary strings, packets... to compare
375    :param autojunk: Setting it to True will likely increase the comparison
376        speed a lot on big byte strings, but will reduce accuracy (will tend
377        to miss insertion and see replacements instead for instance).
378    """
379
380    # Compare the strings using difflib
381
382    xb = bytes_encode(a)
383    yb = bytes_encode(b)
384
385    sm = difflib.SequenceMatcher(a=xb, b=yb, autojunk=autojunk)
386    xarr = [xb[i:i + 1] for i in range(len(xb))]
387    yarr = [yb[i:i + 1] for i in range(len(yb))]
388
389    backtrackx = []
390    backtracky = []
391    for opcode in sm.get_opcodes():
392        typ, x0, x1, y0, y1 = opcode
393        if typ == 'delete':
394            backtrackx += xarr[x0:x1]
395            backtracky += [b''] * (x1 - x0)
396        elif typ == 'insert':
397            backtrackx += [b''] * (y1 - y0)
398            backtracky += yarr[y0:y1]
399        elif typ in ['equal', 'replace']:
400            backtrackx += xarr[x0:x1]
401            backtracky += yarr[y0:y1]
402
403    if autojunk:
404        # Some lines may have been considered as junk. Check the sizes
405        lbx = len(backtrackx)
406        lby = len(backtracky)
407        backtrackx += [b''] * (max(lbx, lby) - lbx)
408        backtracky += [b''] * (max(lbx, lby) - lby)
409
410    # Print the diff
411
412    x = y = i = 0
413    colorize = {0: lambda x: x,
414                -1: conf.color_theme.left,
415                1: conf.color_theme.right}
416
417    dox = 1
418    doy = 0
419    btx_len = len(backtrackx)
420    while i < btx_len:
421        linex = backtrackx[i:i + 16]
422        liney = backtracky[i:i + 16]
423        xx = sum(len(k) for k in linex)
424        yy = sum(len(k) for k in liney)
425        if dox and not xx:
426            dox = 0
427            doy = 1
428        if dox and linex == liney:
429            doy = 1
430
431        if dox:
432            xd = y
433            j = 0
434            while not linex[j]:
435                j += 1
436                xd -= 1
437            print(colorize[doy - dox]("%04x" % xd), end=' ')
438            x += xx
439            line = linex
440        else:
441            print("    ", end=' ')
442        if doy:
443            yd = y
444            j = 0
445            while not liney[j]:
446                j += 1
447                yd -= 1
448            print(colorize[doy - dox]("%04x" % yd), end=' ')
449            y += yy
450            line = liney
451        else:
452            print("    ", end=' ')
453
454        print(" ", end=' ')
455
456        cl = ""
457        for j in range(16):
458            if i + j < btx_len:
459                if line[j]:
460                    col = colorize[(linex[j] != liney[j]) * (doy - dox)]
461                    print(col("%02X" % orb(line[j])), end=' ')
462                    if linex[j] == liney[j]:
463                        cl += sane(line[j], color=True)
464                    else:
465                        cl += col(sane(line[j]))
466                else:
467                    print("  ", end=' ')
468                    cl += " "
469            else:
470                print("  ", end=' ')
471            if j == 7:
472                print("", end=' ')
473
474        print(" ", cl)
475
476        if doy or not yy:
477            doy = 0
478            dox = 1
479            i += 16
480        else:
481            if yy:
482                dox = 0
483                doy = 1
484            else:
485                i += 16
486
487
488if struct.pack("H", 1) == b"\x00\x01":  # big endian
489    checksum_endian_transform = lambda chk: chk  # type: Callable[[int], int]
490else:
491    checksum_endian_transform = lambda chk: ((chk >> 8) & 0xff) | chk << 8
492
493
494def checksum(pkt):
495    # type: (bytes) -> int
496    if len(pkt) % 2 == 1:
497        pkt += b"\0"
498    s = sum(array.array("H", pkt))
499    s = (s >> 16) + (s & 0xffff)
500    s += s >> 16
501    s = ~s
502    return checksum_endian_transform(s) & 0xffff
503
504
505def _fletcher16(charbuf):
506    # type: (bytes) -> Tuple[int, int]
507    # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/>  # noqa: E501
508    c0 = c1 = 0
509    for char in charbuf:
510        c0 += orb(char)
511        c1 += c0
512
513    c0 %= 255
514    c1 %= 255
515    return (c0, c1)
516
517
518@conf.commands.register
519def fletcher16_checksum(binbuf):
520    # type: (bytes) -> int
521    """Calculates Fletcher-16 checksum of the given buffer.
522
523       Note:
524       If the buffer contains the two checkbytes derived from the Fletcher-16 checksum  # noqa: E501
525       the result of this function has to be 0. Otherwise the buffer has been corrupted.  # noqa: E501
526    """
527    (c0, c1) = _fletcher16(binbuf)
528    return (c1 << 8) | c0
529
530
531@conf.commands.register
532def fletcher16_checkbytes(binbuf, offset):
533    # type: (bytes, int) -> bytes
534    """Calculates the Fletcher-16 checkbytes returned as 2 byte binary-string.
535
536       Including the bytes into the buffer (at the position marked by offset) the  # noqa: E501
537       global Fletcher-16 checksum of the buffer will be 0. Thus it is easy to verify  # noqa: E501
538       the integrity of the buffer on the receiver side.
539
540       For details on the algorithm, see RFC 2328 chapter 12.1.7 and RFC 905 Annex B.  # noqa: E501
541    """
542
543    # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/>  # noqa: E501
544    if len(binbuf) < offset:
545        raise Exception("Packet too short for checkbytes %d" % len(binbuf))
546
547    binbuf = binbuf[:offset] + b"\x00\x00" + binbuf[offset + 2:]
548    (c0, c1) = _fletcher16(binbuf)
549
550    x = ((len(binbuf) - offset - 1) * c0 - c1) % 255
551
552    if (x <= 0):
553        x += 255
554
555    y = 510 - c0 - x
556
557    if (y > 255):
558        y -= 255
559    return chb(x) + chb(y)
560
561
562def mac2str(mac):
563    # type: (str) -> bytes
564    return b"".join(chb(int(x, 16)) for x in plain_str(mac).split(':'))
565
566
567def valid_mac(mac):
568    # type: (str) -> bool
569    try:
570        return len(mac2str(mac)) == 6
571    except ValueError:
572        pass
573    return False
574
575
576def str2mac(s):
577    # type: (bytes) -> str
578    if isinstance(s, str):
579        return ("%02x:" * 6)[:-1] % tuple(map(ord, s))
580    return ("%02x:" * 6)[:-1] % tuple(s)
581
582
583def randstring(length):
584    # type: (int) -> bytes
585    """
586    Returns a random string of length (length >= 0)
587    """
588    return b"".join(struct.pack('B', random.randint(0, 255))
589                    for _ in range(length))
590
591
592def zerofree_randstring(length):
593    # type: (int) -> bytes
594    """
595    Returns a random string of length (length >= 0) without zero in it.
596    """
597    return b"".join(struct.pack('B', random.randint(1, 255))
598                    for _ in range(length))
599
600
601def strxor(s1, s2):
602    # type: (bytes, bytes) -> bytes
603    """
604    Returns the binary XOR of the 2 provided strings s1 and s2. s1 and s2
605    must be of same length.
606    """
607    return b"".join(map(lambda x, y: chb(orb(x) ^ orb(y)), s1, s2))
608
609
610def strand(s1, s2):
611    # type: (bytes, bytes) -> bytes
612    """
613    Returns the binary AND of the 2 provided strings s1 and s2. s1 and s2
614    must be of same length.
615    """
616    return b"".join(map(lambda x, y: chb(orb(x) & orb(y)), s1, s2))
617
618
619# Workaround bug 643005 : https://sourceforge.net/tracker/?func=detail&atid=105470&aid=643005&group_id=5470  # noqa: E501
620try:
621    socket.inet_aton("255.255.255.255")
622except socket.error:
623    def inet_aton(ip_string):
624        # type: (str) -> bytes
625        if ip_string == "255.255.255.255":
626            return b"\xff" * 4
627        else:
628            return socket.inet_aton(ip_string)
629else:
630    inet_aton = socket.inet_aton
631
632inet_ntoa = socket.inet_ntoa
633
634
635def atol(x):
636    # type: (str) -> int
637    try:
638        ip = inet_aton(x)
639    except socket.error:
640        ip = inet_aton(socket.gethostbyname(x))
641    return cast(int, struct.unpack("!I", ip)[0])
642
643
644def valid_ip(addr):
645    # type: (str) -> bool
646    try:
647        addr = plain_str(addr)
648    except UnicodeDecodeError:
649        return False
650    try:
651        atol(addr)
652    except (OSError, ValueError, socket.error):
653        return False
654    return True
655
656
657def valid_net(addr):
658    # type: (str) -> bool
659    try:
660        addr = plain_str(addr)
661    except UnicodeDecodeError:
662        return False
663    if '/' in addr:
664        ip, mask = addr.split('/', 1)
665        return valid_ip(ip) and mask.isdigit() and 0 <= int(mask) <= 32
666    return valid_ip(addr)
667
668
669def valid_ip6(addr):
670    # type: (str) -> bool
671    try:
672        addr = plain_str(addr)
673    except UnicodeDecodeError:
674        return False
675    try:
676        inet_pton(socket.AF_INET6, addr)
677    except socket.error:
678        try:
679            socket.getaddrinfo(addr, None, socket.AF_INET6)[0][4][0]
680        except socket.error:
681            return False
682    return True
683
684
685def valid_net6(addr):
686    # type: (str) -> bool
687    try:
688        addr = plain_str(addr)
689    except UnicodeDecodeError:
690        return False
691    if '/' in addr:
692        ip, mask = addr.split('/', 1)
693        return valid_ip6(ip) and mask.isdigit() and 0 <= int(mask) <= 128
694    return valid_ip6(addr)
695
696
697def ltoa(x):
698    # type: (int) -> str
699    return inet_ntoa(struct.pack("!I", x & 0xffffffff))
700
701
702def itom(x):
703    # type: (int) -> int
704    return (0xffffffff00000000 >> x) & 0xffffffff
705
706
707class ContextManagerSubprocess(object):
708    """
709    Context manager that eases checking for unknown command, without
710    crashing.
711
712    Example:
713    >>> with ContextManagerSubprocess("tcpdump"):
714    >>>     subprocess.Popen(["tcpdump", "--version"])
715    ERROR: Could not execute tcpdump, is it installed?
716
717    """
718
719    def __init__(self, prog, suppress=True):
720        # type: (str, bool) -> None
721        self.prog = prog
722        self.suppress = suppress
723
724    def __enter__(self):
725        # type: () -> None
726        pass
727
728    def __exit__(self,
729                 exc_type,  # type: Optional[type]
730                 exc_value,  # type: Optional[Exception]
731                 traceback,  # type: Optional[Any]
732                 ):
733        # type: (...) -> Optional[bool]
734        if exc_value is None or exc_type is None:
735            return None
736        # Errored
737        if isinstance(exc_value, EnvironmentError):
738            msg = "Could not execute %s, is it installed?" % self.prog
739        else:
740            msg = "%s: execution failed (%s)" % (
741                self.prog,
742                exc_type.__class__.__name__
743            )
744        if not self.suppress:
745            raise exc_type(msg)
746        log_runtime.error(msg, exc_info=True)
747        return True  # Suppress the exception
748
749
750class ContextManagerCaptureOutput(object):
751    """
752    Context manager that intercept the console's output.
753
754    Example:
755    >>> with ContextManagerCaptureOutput() as cmco:
756    ...     print("hey")
757    ...     assert cmco.get_output() == "hey"
758    """
759
760    def __init__(self):
761        # type: () -> None
762        self.result_export_object = ""
763        try:
764            import mock  # noqa: F401
765        except Exception:
766            raise ImportError("The mock module needs to be installed !")
767
768    def __enter__(self):
769        # type: () -> ContextManagerCaptureOutput
770        import mock
771
772        def write(s, decorator=self):
773            # type: (str, ContextManagerCaptureOutput) -> None
774            decorator.result_export_object += s
775        mock_stdout = mock.Mock()
776        mock_stdout.write = write
777        self.bck_stdout = sys.stdout
778        sys.stdout = mock_stdout
779        return self
780
781    def __exit__(self, *exc):
782        # type: (*Any) -> Literal[False]
783        sys.stdout = self.bck_stdout
784        return False
785
786    def get_output(self, eval_bytes=False):
787        # type: (bool) -> str
788        if self.result_export_object.startswith("b'") and eval_bytes:
789            return plain_str(eval(self.result_export_object))
790        return self.result_export_object
791
792
793def do_graph(
794    graph,  # type: str
795    prog=None,  # type: Optional[str]
796    format=None,  # type: Optional[str]
797    target=None,  # type: Optional[Union[IO[bytes], str]]
798    type=None,  # type: Optional[str]
799    string=None,  # type: Optional[bool]
800    options=None  # type: Optional[List[str]]
801):
802    # type: (...) -> Optional[str]
803    """Processes graph description using an external software.
804    This method is used to convert a graphviz format to an image.
805
806    :param graph: GraphViz graph description
807    :param prog: which graphviz program to use
808    :param format: output type (svg, ps, gif, jpg, etc.), passed to dot's "-T"
809        option
810    :param string: if not None, simply return the graph string
811    :param target: filename or redirect. Defaults pipe to Imagemagick's
812        display program
813    :param options: options to be passed to prog
814    """
815
816    if format is None:
817        format = "svg"
818    if string:
819        return graph
820    if type is not None:
821        warnings.warn(
822            "type is deprecated, and was renamed format",
823            DeprecationWarning
824        )
825        format = type
826    if prog is None:
827        prog = conf.prog.dot
828    start_viewer = False
829    if target is None:
830        if WINDOWS:
831            target = get_temp_file(autoext="." + format)
832            start_viewer = True
833        else:
834            with ContextManagerSubprocess(conf.prog.display):
835                target = subprocess.Popen([conf.prog.display],
836                                          stdin=subprocess.PIPE).stdin
837    if format is not None:
838        format = "-T%s" % format
839    if isinstance(target, str):
840        if target.startswith('|'):
841            target = subprocess.Popen(target[1:].lstrip(), shell=True,
842                                      stdin=subprocess.PIPE).stdin
843        elif target.startswith('>'):
844            target = open(target[1:].lstrip(), "wb")
845        else:
846            target = open(os.path.abspath(target), "wb")
847    target = cast(IO[bytes], target)
848    proc = subprocess.Popen(
849        "\"%s\" %s %s" % (prog, options or "", format or ""),
850        shell=True, stdin=subprocess.PIPE, stdout=target,
851        stderr=subprocess.PIPE
852    )
853    _, stderr = proc.communicate(bytes_encode(graph))
854    if proc.returncode != 0:
855        raise OSError(
856            "GraphViz call failed (is it installed?):\n" +
857            plain_str(stderr)
858        )
859    try:
860        target.close()
861    except Exception:
862        pass
863    if start_viewer:
864        # Workaround for file not found error: We wait until tempfile is written.  # noqa: E501
865        waiting_start = time.time()
866        while not os.path.exists(target.name):
867            time.sleep(0.1)
868            if time.time() - waiting_start > 3:
869                warning("Temporary file '%s' could not be written. Graphic will not be displayed.", tempfile)  # noqa: E501
870                break
871        else:
872            if conf.prog.display == conf.prog._default:
873                os.startfile(target.name)  # type: ignore
874            else:
875                with ContextManagerSubprocess(conf.prog.display):
876                    subprocess.Popen([conf.prog.display, target.name])
877    return None
878
879
880_TEX_TR = {
881    "{": "{\\tt\\char123}",
882    "}": "{\\tt\\char125}",
883    "\\": "{\\tt\\char92}",
884    "^": "\\^{}",
885    "$": "\\$",
886    "#": "\\#",
887    "_": "\\_",
888    "&": "\\&",
889    "%": "\\%",
890    "|": "{\\tt\\char124}",
891    "~": "{\\tt\\char126}",
892    "<": "{\\tt\\char60}",
893    ">": "{\\tt\\char62}",
894}
895
896
897def tex_escape(x):
898    # type: (str) -> str
899    s = ""
900    for c in x:
901        s += _TEX_TR.get(c, c)
902    return s
903
904
905def colgen(*lstcol,  # type: Any
906           **kargs  # type: Any
907           ):
908    # type: (...) -> Iterator[Any]
909    """Returns a generator that mixes provided quantities forever
910    trans: a function to convert the three arguments into a color. lambda x,y,z:(x,y,z) by default"""  # noqa: E501
911    if len(lstcol) < 2:
912        lstcol *= 2
913    trans = kargs.get("trans", lambda x, y, z: (x, y, z))
914    while True:
915        for i in range(len(lstcol)):
916            for j in range(len(lstcol)):
917                for k in range(len(lstcol)):
918                    if i != j or j != k or k != i:
919                        yield trans(lstcol[(i + j) % len(lstcol)], lstcol[(j + k) % len(lstcol)], lstcol[(k + i) % len(lstcol)])  # noqa: E501
920
921
922def incremental_label(label="tag%05i", start=0):
923    # type: (str, int) -> Iterator[str]
924    while True:
925        yield label % start
926        start += 1
927
928
929def binrepr(val):
930    # type: (int) -> str
931    return bin(val)[2:]
932
933
934def long_converter(s):
935    # type: (str) -> int
936    return int(s.replace('\n', '').replace(' ', ''), 16)
937
938#########################
939#    Enum management    #
940#########################
941
942
943class EnumElement:
944    def __init__(self, key, value):
945        # type: (str, int) -> None
946        self._key = key
947        self._value = value
948
949    def __repr__(self):
950        # type: () -> str
951        return "<%s %s[%r]>" % (self.__dict__.get("_name", self.__class__.__name__), self._key, self._value)  # noqa: E501
952
953    def __getattr__(self, attr):
954        # type: (str) -> Any
955        return getattr(self._value, attr)
956
957    def __str__(self):
958        # type: () -> str
959        return self._key
960
961    def __bytes__(self):
962        # type: () -> bytes
963        return bytes_encode(self.__str__())
964
965    def __hash__(self):
966        # type: () -> int
967        return self._value
968
969    def __int__(self):
970        # type: () -> int
971        return int(self._value)
972
973    def __eq__(self, other):
974        # type: (Any) -> bool
975        return self._value == int(other)
976
977    def __neq__(self, other):
978        # type: (Any) -> bool
979        return not self.__eq__(other)
980
981
982class Enum_metaclass(type):
983    element_class = EnumElement
984
985    def __new__(cls, name, bases, dct):
986        # type: (Any, str, Any, Dict[str, Any]) -> Any
987        rdict = {}
988        for k, v in six.iteritems(dct):
989            if isinstance(v, int):
990                v = cls.element_class(k, v)
991                dct[k] = v
992                rdict[v] = k
993        dct["__rdict__"] = rdict
994        return super(Enum_metaclass, cls).__new__(cls, name, bases, dct)
995
996    def __getitem__(self, attr):
997        # type: (int) -> Any
998        return self.__rdict__[attr]  # type: ignore
999
1000    def __contains__(self, val):
1001        # type: (int) -> bool
1002        return val in self.__rdict__  # type: ignore
1003
1004    def get(self, attr, val=None):
1005        # type: (str, Optional[Any]) -> Any
1006        return self.__rdict__.get(attr, val)  # type: ignore
1007
1008    def __repr__(self):
1009        # type: () -> str
1010        return "<%s>" % self.__dict__.get("name", self.__name__)
1011
1012
1013###################
1014#  Object saving  #
1015###################
1016
1017
1018def export_object(obj):
1019    # type: (Any) -> None
1020    import zlib
1021    print(bytes_base64(zlib.compress(six.moves.cPickle.dumps(obj, 2), 9)))
1022
1023
1024def import_object(obj=None):
1025    # type: (Optional[str]) -> Any
1026    import zlib
1027    if obj is None:
1028        obj = sys.stdin.read()
1029    return six.moves.cPickle.loads(zlib.decompress(base64_bytes(obj.strip())))  # noqa: E501
1030
1031
1032def save_object(fname, obj):
1033    # type: (str, Any) -> None
1034    """Pickle a Python object"""
1035
1036    fd = gzip.open(fname, "wb")
1037    six.moves.cPickle.dump(obj, fd)
1038    fd.close()
1039
1040
1041def load_object(fname):
1042    # type: (str) -> Any
1043    """unpickle a Python object"""
1044    return six.moves.cPickle.load(gzip.open(fname, "rb"))
1045
1046
1047@conf.commands.register
1048def corrupt_bytes(data, p=0.01, n=None):
1049    # type: (str, float, Optional[int]) -> bytes
1050    """
1051    Corrupt a given percentage (at least one byte) or number of bytes
1052    from a string
1053    """
1054    s = array.array("B", bytes_encode(data))
1055    s_len = len(s)
1056    if n is None:
1057        n = max(1, int(s_len * p))
1058    for i in random.sample(range(s_len), n):
1059        s[i] = (s[i] + random.randint(1, 255)) % 256
1060    return s.tostring() if six.PY2 else s.tobytes()
1061
1062
1063@conf.commands.register
1064def corrupt_bits(data, p=0.01, n=None):
1065    # type: (str, float, Optional[int]) -> bytes
1066    """
1067    Flip a given percentage (at least one bit) or number of bits
1068    from a string
1069    """
1070    s = array.array("B", bytes_encode(data))
1071    s_len = len(s) * 8
1072    if n is None:
1073        n = max(1, int(s_len * p))
1074    for i in random.sample(range(s_len), n):
1075        s[i // 8] ^= 1 << (i % 8)
1076    return s.tostring() if six.PY2 else s.tobytes()
1077
1078
1079#############################
1080#  pcap capture file stuff  #
1081#############################
1082
1083@conf.commands.register
1084def wrpcap(filename,  # type: Union[IO[bytes], str]
1085           pkt,  # type: _PacketIterable
1086           *args,  # type: Any
1087           **kargs  # type: Any
1088           ):
1089    # type: (...) -> None
1090    """Write a list of packets to a pcap file
1091
1092    :param filename: the name of the file to write packets to, or an open,
1093        writable file-like object. The file descriptor will be
1094        closed at the end of the call, so do not use an object you
1095        do not want to close (e.g., running wrpcap(sys.stdout, [])
1096        in interactive mode will crash Scapy).
1097    :param gz: set to 1 to save a gzipped capture
1098    :param linktype: force linktype value
1099    :param endianness: "<" or ">", force endianness
1100    :param sync: do not bufferize writes to the capture file
1101    """
1102    with PcapWriter(filename, *args, **kargs) as fdesc:
1103        fdesc.write(pkt)
1104
1105
1106@conf.commands.register
1107def rdpcap(filename, count=-1):
1108    # type: (Union[IO[bytes], str], int) -> PacketList
1109    """Read a pcap or pcapng file and return a packet list
1110
1111    :param count: read only <count> packets
1112    """
1113    # Rant: Our complicated use of metaclasses and especially the
1114    # __call__ function is, of course, not supported by MyPy.
1115    # One day we should simplify this mess and use a much simpler
1116    # layout that will actually be supported and properly dissected.
1117    with PcapReader(filename) as fdesc:  # type: ignore
1118        return fdesc.read_all(count=count)
1119
1120
1121# NOTE: Type hinting
1122# Mypy doesn't understand the following metaclass, and thinks each
1123# constructor (PcapReader...) needs 3 arguments each. To avoid this,
1124# we add a fake (=None) to the last 2 arguments then force the value
1125# to not be None in the signature and pack the whole thing in an ignore.
1126# This allows to not have # type: ignore every time we call those
1127# constructors.
1128
1129class PcapReader_metaclass(type):
1130    """Metaclass for (Raw)Pcap(Ng)Readers"""
1131
1132    def __new__(cls, name, bases, dct):
1133        # type: (Any, str, Any, Dict[str, Any]) -> Any
1134        """The `alternative` class attribute is declared in the PcapNg
1135        variant, and set here to the Pcap variant.
1136
1137        """
1138        newcls = super(PcapReader_metaclass, cls).__new__(
1139            cls, name, bases, dct
1140        )
1141        if 'alternative' in dct:
1142            dct['alternative'].alternative = newcls
1143        return newcls
1144
1145    def __call__(cls, filename):  # type: ignore
1146        # type: (Union[IO[bytes], str]) -> Any
1147        """Creates a cls instance, use the `alternative` if that
1148        fails.
1149
1150        """
1151        i = cls.__new__(cls, cls.__name__, cls.__bases__, cls.__dict__)
1152        filename, fdesc, magic = cls.open(filename)
1153        if not magic:
1154            raise Scapy_Exception(
1155                "No data could be read!"
1156            )
1157        try:
1158            i.__init__(filename, fdesc, magic)
1159            return i
1160        except (Scapy_Exception, EOFError):
1161            pass
1162
1163        if "alternative" in cls.__dict__:
1164            cls = cls.__dict__["alternative"]
1165            i = cls.__new__(cls, cls.__name__, cls.__bases__, cls.__dict__)
1166            try:
1167                i.__init__(filename, fdesc, magic)
1168                return i
1169            except (Scapy_Exception, EOFError):
1170                pass
1171
1172        raise Scapy_Exception("Not a supported capture file")
1173
1174    @staticmethod
1175    def open(fname  # type: Union[IO[bytes], str]
1176             ):
1177        # type: (...) -> Tuple[str, _ByteStream, bytes]
1178        """Open (if necessary) filename, and read the magic."""
1179        if isinstance(fname, str):
1180            filename = fname
1181            try:
1182                fdesc = gzip.open(filename, "rb")  # type: _ByteStream
1183                magic = fdesc.read(4)
1184            except IOError:
1185                fdesc = open(filename, "rb")
1186                magic = fdesc.read(4)
1187        else:
1188            fdesc = fname
1189            filename = getattr(fdesc, "name", "No name")
1190            magic = fdesc.read(4)
1191        return filename, fdesc, magic
1192
1193
1194@six.add_metaclass(PcapReader_metaclass)
1195class RawPcapReader:
1196    """A stateful pcap reader. Each packet is returned as a string"""
1197
1198    nonblocking_socket = True
1199    PacketMetadata = collections.namedtuple("PacketMetadata",
1200                                            ["sec", "usec", "wirelen", "caplen"])  # noqa: E501
1201
1202    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
1203        # type: (str, _ByteStream, bytes) -> None
1204        self.filename = filename
1205        self.f = fdesc
1206        if magic == b"\xa1\xb2\xc3\xd4":  # big endian
1207            self.endian = ">"
1208            self.nano = False
1209        elif magic == b"\xd4\xc3\xb2\xa1":  # little endian
1210            self.endian = "<"
1211            self.nano = False
1212        elif magic == b"\xa1\xb2\x3c\x4d":  # big endian, nanosecond-precision
1213            self.endian = ">"
1214            self.nano = True
1215        elif magic == b"\x4d\x3c\xb2\xa1":  # little endian, nanosecond-precision  # noqa: E501
1216            self.endian = "<"
1217            self.nano = True
1218        else:
1219            raise Scapy_Exception(
1220                "Not a pcap capture file (bad magic: %r)" % magic
1221            )
1222        hdr = self.f.read(20)
1223        if len(hdr) < 20:
1224            raise Scapy_Exception("Invalid pcap file (too short)")
1225        vermaj, vermin, tz, sig, snaplen, linktype = struct.unpack(
1226            self.endian + "HHIIII", hdr
1227        )
1228        self.linktype = linktype
1229        self.snaplen = snaplen
1230
1231    def __iter__(self):
1232        # type: () -> RawPcapReader
1233        return self
1234
1235    def next(self):
1236        # type: () -> Packet
1237        """
1238        implement the iterator protocol on a set of packets in a pcap file
1239        """
1240        try:
1241            return self.read_packet()
1242        except EOFError:
1243            raise StopIteration
1244    __next__ = next
1245
1246    def _read_packet(self, size=MTU):
1247        # type: (int) -> Tuple[bytes, RawPcapReader.PacketMetadata]
1248        """return a single packet read from the file as a tuple containing
1249        (pkt_data, pkt_metadata)
1250
1251        raise EOFError when no more packets are available
1252        """
1253        hdr = self.f.read(16)
1254        if len(hdr) < 16:
1255            raise EOFError
1256        sec, usec, caplen, wirelen = struct.unpack(self.endian + "IIII", hdr)
1257        return (self.f.read(caplen)[:size],
1258                RawPcapReader.PacketMetadata(sec=sec, usec=usec,
1259                                             wirelen=wirelen, caplen=caplen))
1260
1261    def read_packet(self, size=MTU):
1262        # type: (int) -> Packet
1263        return cast(
1264            Packet,
1265            self._read_packet()[0]
1266        )
1267
1268    def dispatch(self,
1269                 callback  # type: Callable[[Tuple[bytes, RawPcapReader.PacketMetadata]], Any]  # noqa: E501
1270                 ):
1271        # type: (...) -> None
1272        """call the specified callback routine for each packet read
1273
1274        This is just a convenience function for the main loop
1275        that allows for easy launching of packet processing in a
1276        thread.
1277        """
1278        for p in self:
1279            callback(p)
1280
1281    def read_all(self, count=-1):
1282        # type: (int) -> PacketList
1283        res = self._read_all(count)
1284        from scapy import plist
1285        return plist.PacketList(res, name=os.path.basename(self.filename))
1286
1287    def _read_all(self, count=-1):
1288        # type: (int) -> List[Packet]
1289        """return a list of all packets in the pcap file
1290        """
1291        res = []  # type: List[Packet]
1292        while count != 0:
1293            count -= 1
1294            try:
1295                p = self.read_packet()  # type: Packet
1296            except EOFError:
1297                break
1298            res.append(p)
1299        return res
1300
1301    def recv(self, size=MTU):
1302        # type: (int) -> bytes
1303        """ Emulate a socket
1304        """
1305        return self._read_packet(size=size)[0]
1306
1307    def fileno(self):
1308        # type: () -> int
1309        return self.f.fileno()
1310
1311    def close(self):
1312        # type: () -> Optional[Any]
1313        return self.f.close()
1314
1315    def __exit__(self, exc_type, exc_value, tracback):
1316        # type: (Optional[Any], Optional[Any], Optional[Any]) -> None
1317        self.close()
1318
1319    # emulate SuperSocket
1320    @staticmethod
1321    def select(sockets,  # type: List[SuperSocket]
1322               remain=None,  # type: Optional[float]
1323               ):
1324        # type: (...) -> List[SuperSocket]
1325        return sockets
1326
1327
1328class PcapReader(RawPcapReader, _SuperSocket):
1329    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
1330        # type: (str, IO[bytes], bytes) -> None
1331        RawPcapReader.__init__(self, filename, fdesc, magic)
1332        try:
1333            self.LLcls = conf.l2types.num2layer[
1334                self.linktype
1335            ]  # type: Type[Packet]
1336        except KeyError:
1337            warning("PcapReader: unknown LL type [%i]/[%#x]. Using Raw packets" % (self.linktype, self.linktype))  # noqa: E501
1338            if conf.raw_layer is None:
1339                # conf.raw_layer is set on import
1340                import scapy.packet  # noqa: F401
1341            self.LLcls = conf.raw_layer
1342
1343    def __enter__(self):
1344        # type: () -> PcapReader
1345        return self
1346
1347    def read_packet(self, size=MTU):
1348        # type: (int) -> Packet
1349        rp = super(PcapReader, self)._read_packet(size=size)
1350        if rp is None:
1351            raise EOFError
1352        s, pkt_info = rp
1353
1354        try:
1355            p = self.LLcls(s)  # type: Packet
1356        except KeyboardInterrupt:
1357            raise
1358        except Exception:
1359            if conf.debug_dissector:
1360                from scapy.sendrecv import debug
1361                debug.crashed_on = (self.LLcls, s)
1362                raise
1363            if conf.raw_layer is None:
1364                # conf.raw_layer is set on import
1365                import scapy.packet  # noqa: F401
1366            p = conf.raw_layer(s)
1367        power = Decimal(10) ** Decimal(-9 if self.nano else -6)
1368        p.time = EDecimal(pkt_info.sec + power * pkt_info.usec)
1369        p.wirelen = pkt_info.wirelen
1370        return p
1371
1372    def recv(self, size=MTU):
1373        # type: (int) -> Packet
1374        return self.read_packet(size=size)
1375
1376
1377class RawPcapNgReader(RawPcapReader):
1378    """A stateful pcapng reader. Each packet is returned as
1379    bytes.
1380
1381    """
1382
1383    alternative = RawPcapReader  # type: Type[Any]
1384
1385    PacketMetadata = collections.namedtuple("PacketMetadata",
1386                                            ["linktype", "tsresol",
1387                                             "tshigh", "tslow", "wirelen"])
1388
1389    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
1390        # type: (str, IO[bytes], bytes) -> None
1391        self.filename = filename
1392        self.f = fdesc
1393        # A list of (linktype, snaplen, tsresol); will be populated by IDBs.
1394        self.interfaces = []  # type: List[Tuple[int, int, int]]
1395        self.default_options = {
1396            "tsresol": 1000000
1397        }
1398        self.blocktypes = {
1399            1: self._read_block_idb,
1400            2: self._read_block_pkt,
1401            3: self._read_block_spb,
1402            6: self._read_block_epb,
1403        }
1404        self.endian = "!"  # Will be overwritten by first SHB
1405
1406        if magic != b"\x0a\x0d\x0d\x0a":  # PcapNg:
1407            raise Scapy_Exception(
1408                "Not a pcapng capture file (bad magic: %r)" % magic
1409            )
1410
1411        try:
1412            self._read_block_shb()
1413        except EOFError:
1414            raise Scapy_Exception(
1415                "The first SHB of the pcapng file is malformed !"
1416            )
1417
1418    def _read_block(self, size=MTU):
1419        # type: (int) -> Optional[Tuple[bytes, RawPcapNgReader.PacketMetadata]]  # noqa: E501
1420        try:
1421            blocktype = struct.unpack(self.endian + "I", self.f.read(4))[0]
1422        except struct.error:
1423            raise EOFError
1424        if blocktype == 0x0A0D0D0A:
1425            # This function updates the endianness based on the block content.
1426            self._read_block_shb()
1427            return None
1428        try:
1429            blocklen = struct.unpack(self.endian + "I", self.f.read(4))[0]
1430        except struct.error:
1431            raise EOFError
1432        if blocklen < 12:
1433            warning("Invalid block length !")
1434            raise EOFError
1435        block = self.f.read(blocklen - 12)
1436        self._read_block_tail(blocklen)
1437        return self.blocktypes.get(
1438            blocktype,
1439            lambda block, size: None
1440        )(block, size)
1441
1442    def _read_block_tail(self, blocklen):
1443        # type: (int) -> None
1444        if blocklen % 4:
1445            pad = self.f.read(-blocklen % 4)
1446            warning("PcapNg: bad blocklen %d (MUST be a multiple of 4. "
1447                    "Ignored padding %r" % (blocklen, pad))
1448        try:
1449            if blocklen != struct.unpack(self.endian + 'I',
1450                                         self.f.read(4))[0]:
1451                raise EOFError("PcapNg: Invalid pcapng block (bad blocklen)")
1452        except struct.error:
1453            raise EOFError
1454
1455    def _read_block_shb(self):
1456        # type: () -> None
1457        _blocklen = self.f.read(4)
1458        endian = self.f.read(4)
1459        if endian == b"\x1a\x2b\x3c\x4d":
1460            self.endian = ">"
1461        elif endian == b"\x4d\x3c\x2b\x1a":
1462            self.endian = "<"
1463        else:
1464            warning("Bad magic in Section Header block (not a pcapng file?)")
1465            raise EOFError
1466
1467        blocklen = struct.unpack(self.endian + "I", _blocklen)[0]
1468        if blocklen < 16:
1469            warning("Invalid SHB block length!")
1470            raise EOFError
1471        options = self.f.read(blocklen - 16)
1472        self._read_block_tail(blocklen)
1473        self._read_options(options)
1474
1475    def _read_packet(self, size=MTU):  # type: ignore
1476        # type: (int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
1477        """Read blocks until it reaches either EOF or a packet, and
1478        returns None or (packet, (linktype, sec, usec, wirelen)),
1479        where packet is a string.
1480
1481        """
1482        while True:
1483            res = self._read_block()
1484            if res is not None:
1485                return res
1486
1487    def _read_options(self, options):
1488        # type: (bytes) -> Dict[str, int]
1489        """Section Header Block"""
1490        opts = self.default_options.copy()
1491        while len(options) >= 4:
1492            code, length = struct.unpack(self.endian + "HH", options[:4])
1493            # PCAP Next Generation (pcapng) Capture File Format
1494            # 4.2. - Interface Description Block
1495            # http://xml2rfc.tools.ietf.org/cgi-bin/xml2rfc.cgi?url=https://raw.githubusercontent.com/pcapng/pcapng/master/draft-tuexen-opsawg-pcapng.xml&modeAsFormat=html/ascii&type=ascii#rfc.section.4.2
1496            if code == 9 and length == 1 and len(options) >= 5:
1497                tsresol = orb(options[4])
1498                opts["tsresol"] = (2 if tsresol & 128 else 10) ** (
1499                    tsresol & 127
1500                )
1501            if code == 0:
1502                if length != 0:
1503                    warning("PcapNg: invalid option length %d for end-of-option" % length)  # noqa: E501
1504                break
1505            if length % 4:
1506                length += (4 - (length % 4))
1507            options = options[4 + length:]
1508        return opts
1509
1510    def _read_block_idb(self, block, _):
1511        # type: (bytes, int) -> None
1512        """Interface Description Block"""
1513        # 2 bytes LinkType + 2 bytes Reserved
1514        # 4 bytes Snaplen
1515        options = self._read_options(block[8:-4])
1516        try:
1517            interface = struct.unpack(  # type: ignore
1518                self.endian + "HxxI",
1519                block[:8]
1520            ) + (options["tsresol"],)  # type: Tuple[int, int, int]
1521        except struct.error:
1522            warning("PcapNg: IDB is too small %d/8 !" % len(block))
1523            raise EOFError
1524        self.interfaces.append(interface)
1525
1526    def _check_interface_id(self, intid):
1527        # type: (int) -> None
1528        """Check the interface id value and raise EOFError if invalid."""
1529        tmp_len = len(self.interfaces)
1530        if intid >= tmp_len:
1531            warning("PcapNg: invalid interface id %d/%d" % (intid, tmp_len))
1532            raise EOFError
1533
1534    def _read_block_epb(self, block, size):
1535        # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
1536        """Enhanced Packet Block"""
1537        try:
1538            intid, tshigh, tslow, caplen, wirelen = struct.unpack(
1539                self.endian + "5I",
1540                block[:20],
1541            )
1542        except struct.error:
1543            warning("PcapNg: EPB is too small %d/20 !" % len(block))
1544            raise EOFError
1545
1546        self._check_interface_id(intid)
1547        return (block[20:20 + caplen][:size],
1548                RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0],  # noqa: E501
1549                                               tsresol=self.interfaces[intid][2],  # noqa: E501
1550                                               tshigh=tshigh,
1551                                               tslow=tslow,
1552                                               wirelen=wirelen))
1553
1554    def _read_block_spb(self, block, size):
1555        # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
1556        """Simple Packet Block"""
1557        # "it MUST be assumed that all the Simple Packet Blocks have
1558        # been captured on the interface previously specified in the
1559        # first Interface Description Block."
1560        intid = 0
1561        self._check_interface_id(intid)
1562
1563        try:
1564            wirelen, = struct.unpack(self.endian + "I", block[:4])
1565        except struct.error:
1566            warning("PcapNg: SPB is too small %d/4 !" % len(block))
1567            raise EOFError
1568
1569        caplen = min(wirelen, self.interfaces[intid][1])
1570        return (block[4:4 + caplen][:size],
1571                RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0],  # noqa: E501
1572                                               tsresol=self.interfaces[intid][2],  # noqa: E501
1573                                               tshigh=None,
1574                                               tslow=None,
1575                                               wirelen=wirelen))
1576
1577    def _read_block_pkt(self, block, size):
1578        # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
1579        """(Obsolete) Packet Block"""
1580        try:
1581            intid, drops, tshigh, tslow, caplen, wirelen = struct.unpack(
1582                self.endian + "HH4I",
1583                block[:20],
1584            )
1585        except struct.error:
1586            warning("PcapNg: PKT is too small %d/20 !" % len(block))
1587            raise EOFError
1588
1589        self._check_interface_id(intid)
1590        return (block[20:20 + caplen][:size],
1591                RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0],  # noqa: E501
1592                                               tsresol=self.interfaces[intid][2],  # noqa: E501
1593                                               tshigh=tshigh,
1594                                               tslow=tslow,
1595                                               wirelen=wirelen))
1596
1597
1598class PcapNgReader(RawPcapNgReader, _SuperSocket):
1599
1600    alternative = PcapReader
1601
1602    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
1603        # type: (str, IO[bytes], bytes) -> None
1604        RawPcapNgReader.__init__(self, filename, fdesc, magic)
1605
1606    def __enter__(self):
1607        # type: () -> PcapNgReader
1608        return self
1609
1610    def read_packet(self, size=MTU):
1611        # type: (int) -> Packet
1612        rp = super(PcapNgReader, self)._read_packet(size=size)
1613        if rp is None:
1614            raise EOFError
1615        s, (linktype, tsresol, tshigh, tslow, wirelen) = rp
1616        try:
1617            cls = conf.l2types.num2layer[linktype]  # type: Type[Packet]
1618            p = cls(s)  # type: Packet
1619        except KeyboardInterrupt:
1620            raise
1621        except Exception:
1622            if conf.debug_dissector:
1623                raise
1624            if conf.raw_layer is None:
1625                # conf.raw_layer is set on import
1626                import scapy.packet  # noqa: F401
1627            p = conf.raw_layer(s)
1628        if tshigh is not None:
1629            p.time = EDecimal((tshigh << 32) + tslow) / tsresol
1630        p.wirelen = wirelen
1631        return p
1632
1633    def recv(self, size=MTU):
1634        # type: (int) -> Packet
1635        return self.read_packet()
1636
1637
1638class RawPcapWriter:
1639    """A stream PCAP writer with more control than wrpcap()"""
1640
1641    def __init__(self,
1642                 filename,  # type: Union[IO[bytes], str]
1643                 linktype=None,  # type: Optional[int]
1644                 gz=False,  # type: bool
1645                 endianness="",  # type: str
1646                 append=False,  # type: bool
1647                 sync=False,  # type: bool
1648                 nano=False,  # type: bool
1649                 snaplen=MTU,  # type: int
1650                 ):
1651        # type: (...) -> None
1652        """
1653        :param filename: the name of the file to write packets to, or an open,
1654            writable file-like object.
1655        :param linktype: force linktype to a given value. If None, linktype is
1656            taken from the first writer packet
1657        :param gz: compress the capture on the fly
1658        :param endianness: force an endianness (little:"<", big:">").
1659            Default is native
1660        :param append: append packets to the capture file instead of
1661            truncating it
1662        :param sync: do not bufferize writes to the capture file
1663        :param nano: use nanosecond-precision (requires libpcap >= 1.5.0)
1664
1665        """
1666
1667        self.linktype = linktype
1668        self.snaplen = snaplen
1669        self.header_present = 0
1670        self.append = append
1671        self.gz = gz
1672        self.endian = endianness
1673        self.sync = sync
1674        self.nano = nano
1675        bufsz = 4096
1676        if sync:
1677            bufsz = 0
1678
1679        if isinstance(filename, str):
1680            self.filename = filename
1681            if gz:
1682                self.f = cast(_ByteStream, gzip.open(
1683                    filename, append and "ab" or "wb", 9
1684                ))
1685            else:
1686                self.f = open(filename, append and "ab" or "wb", bufsz)
1687        else:
1688            self.f = filename
1689            self.filename = getattr(filename, "name", "No name")
1690
1691    def fileno(self):
1692        # type: () -> int
1693        return self.f.fileno()
1694
1695    def write_header(self, pkt):
1696        # type: (Optional[Union[Packet, bytes]]) -> None
1697        return self._write_header(bytes_encode(pkt))
1698
1699    def _write_header(self, pkt):
1700        # type: (Optional[Union[Packet, bytes]]) -> None
1701        self.header_present = 1
1702
1703        if self.append:
1704            # Even if prone to race conditions, this seems to be
1705            # safest way to tell whether the header is already present
1706            # because we have to handle compressed streams that
1707            # are not as flexible as basic files
1708            if self.gz:
1709                g = gzip.open(self.filename, "rb")  # type: _ByteStream
1710            else:
1711                g = open(self.filename, "rb")
1712            try:
1713                if g.read(16):
1714                    return
1715            finally:
1716                g.close()
1717
1718        self.f.write(struct.pack(self.endian + "IHHIIII", 0xa1b23c4d if self.nano else 0xa1b2c3d4,  # noqa: E501
1719                                 2, 4, 0, 0, self.snaplen, self.linktype))
1720        self.f.flush()
1721
1722    def write(self, pkt):
1723        # type: (Union[_PacketIterable, bytes]) -> None
1724        """
1725        Writes a Packet, a SndRcvList object, or bytes to a pcap file.
1726
1727        :param pkt: Packet(s) to write (one record for each Packet), or raw
1728                    bytes to write (as one record).
1729        :type pkt: iterable[scapy.packet.Packet], scapy.packet.Packet or bytes
1730        """
1731        if isinstance(pkt, bytes):
1732            if not self.header_present:
1733                self.write_header(pkt)
1734            self.write_packet(pkt)
1735        else:
1736            # Import here to avoid circular dependency
1737            from scapy.supersocket import IterSocket
1738            for p in IterSocket(pkt).iter:
1739                if not self.header_present:
1740                    self.write_header(p)
1741
1742                if not isinstance(p, bytes) and \
1743                        self.linktype != conf.l2types.get(type(p), None):
1744                    warning("Inconsistent linktypes detected!"
1745                            " The resulting PCAP file might contain"
1746                            " invalid packets."
1747                            )
1748
1749                self.write_packet(p)
1750
1751    def write_packet(self,
1752                     packet,  # type: Union[bytes, Packet]
1753                     sec=None,  # type: Optional[int]
1754                     usec=None,  # type: Optional[int]
1755                     caplen=None,  # type: Optional[int]
1756                     wirelen=None,  # type: Optional[int]
1757                     ):
1758        # type: (...) -> None
1759        self._write_packet(
1760            bytes(packet),
1761            sec=sec, usec=usec,
1762            caplen=caplen, wirelen=wirelen
1763        )
1764
1765    def _write_packet(self,
1766                      packet,  # type: bytes
1767                      sec=None,  # type: Optional[int]
1768                      usec=None,  # type: Optional[int]
1769                      caplen=None,  # type: Optional[int]
1770                      wirelen=None  # type: Optional[int]
1771                      ):
1772        # type: (...) -> None
1773        """
1774        Writes a single packet to the pcap file.
1775
1776        :param packet: bytes for a single packet
1777        :type packet: bytes
1778        :param sec: time the packet was captured, in seconds since epoch. If
1779                    not supplied, defaults to now.
1780        :type sec: int or long
1781        :param usec: If ``nano=True``, then number of nanoseconds after the
1782                     second that the packet was captured. If ``nano=False``,
1783                     then the number of microseconds after the second the
1784                     packet was captured
1785        :type usec: int or long
1786        :param caplen: The length of the packet in the capture file. If not
1787                       specified, uses ``len(packet)``.
1788        :type caplen: int
1789        :param wirelen: The length of the packet on the wire. If not
1790                        specified, uses ``caplen``.
1791        :type wirelen: int
1792        :return: None
1793        :rtype: None
1794        """
1795        if caplen is None:
1796            caplen = len(packet)
1797        if wirelen is None:
1798            wirelen = caplen
1799        if sec is None or usec is None:
1800            t = time.time()
1801            it = int(t)
1802            if sec is None:
1803                sec = it
1804                usec = int(round((t - it) *
1805                                 (1000000000 if self.nano else 1000000)))
1806            elif usec is None:
1807                usec = 0
1808
1809        self.f.write(struct.pack(self.endian + "IIII",
1810                                 sec, usec, caplen, wirelen))
1811        self.f.write(packet)
1812        if self.sync:
1813            self.f.flush()
1814
1815    def flush(self):
1816        # type: () -> Optional[Any]
1817        return self.f.flush()
1818
1819    def close(self):
1820        # type: () -> Optional[Any]
1821        if not self.header_present:
1822            self.write_header(None)
1823        return self.f.close()
1824
1825    def __enter__(self):
1826        # type: () -> RawPcapWriter
1827        return self
1828
1829    def __exit__(self, exc_type, exc_value, tracback):
1830        # type: (Optional[Any], Optional[Any], Optional[Any]) -> None
1831        self.flush()
1832        self.close()
1833
1834
1835class PcapWriter(RawPcapWriter):
1836    """A stream PCAP writer with more control than wrpcap()"""
1837
1838    def write_header(self, pkt):
1839        # type: (Optional[Union[Packet, bytes]]) -> None
1840        if self.linktype is None:
1841            try:
1842                if pkt is None or isinstance(pkt, bytes):
1843                    # Can't guess LL
1844                    raise KeyError
1845                self.linktype = conf.l2types.layer2num[
1846                    pkt.__class__
1847                ]
1848            except KeyError:
1849                warning("PcapWriter: unknown LL type for %s. Using type 1 (Ethernet)", pkt.__class__.__name__)  # noqa: E501
1850                self.linktype = DLT_EN10MB
1851        self._write_header(pkt)
1852
1853    def write_packet(self,
1854                     packet,  # type: Union[bytes, Packet]
1855                     sec=None,  # type: Optional[int]
1856                     usec=None,  # type: Optional[int]
1857                     caplen=None,  # type: Optional[int]
1858                     wirelen=None,  # type: Optional[int]
1859                     ):
1860        # type: (...) -> None
1861        """
1862        Writes a single packet to the pcap file.
1863
1864        :param packet: Packet, or bytes for a single packet
1865        :type packet: scapy.packet.Packet or bytes
1866        :param sec: time the packet was captured, in seconds since epoch. If
1867                    not supplied, defaults to now.
1868        :type sec: int or long
1869        :param usec: If ``nano=True``, then number of nanoseconds after the
1870                     second that the packet was captured. If ``nano=False``,
1871                     then the number of microseconds after the second the
1872                     packet was captured. If ``sec`` is not specified,
1873                     this value is ignored.
1874        :type usec: int or long
1875        :param caplen: The length of the packet in the capture file. If not
1876                       specified, uses ``len(raw(packet))``.
1877        :type caplen: int
1878        :param wirelen: The length of the packet on the wire. If not
1879                        specified, tries ``packet.wirelen``, otherwise uses
1880                        ``caplen``.
1881        :type wirelen: int
1882        :return: None
1883        :rtype: None
1884        """
1885        if hasattr(packet, "time"):
1886            if sec is None:
1887                sec = int(packet.time)  # type: ignore
1888                usec = int(round((packet.time - sec) *  # type: ignore
1889                                 (1000000000 if self.nano else 1000000)))
1890        if usec is None:
1891            usec = 0
1892
1893        rawpkt = bytes_encode(packet)
1894        caplen = len(rawpkt) if caplen is None else caplen
1895
1896        if wirelen is None:
1897            if hasattr(packet, "wirelen"):
1898                wirelen = packet.wirelen  # type: ignore
1899        if wirelen is None:
1900            wirelen = caplen
1901
1902        self._write_packet(
1903            rawpkt,
1904            sec=sec, usec=usec,
1905            caplen=caplen, wirelen=wirelen
1906        )
1907
1908
1909@conf.commands.register
1910def import_hexcap(input_string=None):
1911    # type: (Optional[str]) -> bytes
1912    """Imports a tcpdump like hexadecimal view
1913
1914    e.g: exported via hexdump() or tcpdump or wireshark's "export as hex"
1915
1916    :param input_string: String containing the hexdump input to parse. If None,
1917        read from standard input.
1918    """
1919    re_extract_hexcap = re.compile(r"^((0x)?[0-9a-fA-F]{2,}[ :\t]{,3}|) *(([0-9a-fA-F]{2} {,2}){,16})")  # noqa: E501
1920    p = ""
1921    try:
1922        if input_string:
1923            input_function = six.StringIO(input_string).readline
1924        else:
1925            input_function = input
1926        while True:
1927            line = input_function().strip()
1928            if not line:
1929                break
1930            try:
1931                p += re_extract_hexcap.match(line).groups()[2]  # type: ignore
1932            except Exception:
1933                warning("Parsing error during hexcap")
1934                continue
1935    except EOFError:
1936        pass
1937
1938    p = p.replace(" ", "")
1939    return hex_bytes(p)
1940
1941
1942@conf.commands.register
1943def wireshark(pktlist, wait=False, **kwargs):
1944    # type: (List[Packet], bool, **Any) -> Optional[Any]
1945    """
1946    Runs Wireshark on a list of packets.
1947
1948    See :func:`tcpdump` for more parameter description.
1949
1950    Note: this defaults to wait=False, to run Wireshark in the background.
1951    """
1952    return tcpdump(pktlist, prog=conf.prog.wireshark, wait=wait, **kwargs)
1953
1954
1955@conf.commands.register
1956def tdecode(
1957    pktlist,  # type: Union[IO[bytes], None, str, _PacketIterable]
1958    args=None,  # type: Optional[List[str]]
1959    **kwargs  # type: Any
1960):
1961    # type: (...) -> Any
1962    """
1963    Run tshark on a list of packets.
1964
1965    :param args: If not specified, defaults to ``tshark -V``.
1966
1967    See :func:`tcpdump` for more parameters.
1968    """
1969    if args is None:
1970        args = ["-V"]
1971    return tcpdump(pktlist, prog=conf.prog.tshark, args=args, **kwargs)
1972
1973
1974def _guess_linktype_name(value):
1975    # type: (int) -> str
1976    """Guess the DLT name from its value."""
1977    from scapy.libs.winpcapy import pcap_datalink_val_to_name
1978    return cast(bytes, pcap_datalink_val_to_name(value)).decode()
1979
1980
1981def _guess_linktype_value(name):
1982    # type: (str) -> int
1983    """Guess the value of a DLT name."""
1984    from scapy.libs.winpcapy import pcap_datalink_name_to_val
1985    val = cast(int, pcap_datalink_name_to_val(name.encode()))
1986    if val == -1:
1987        warning("Unknown linktype: %s. Using EN10MB", name)
1988        return DLT_EN10MB
1989    return val
1990
1991
1992@conf.commands.register
1993def tcpdump(
1994    pktlist=None,  # type: Union[IO[bytes], None, str, _PacketIterable]
1995    dump=False,  # type: bool
1996    getfd=False,  # type: bool
1997    args=None,  # type: Optional[List[str]]
1998    flt=None,  # type: Optional[str]
1999    prog=None,  # type: Optional[Any]
2000    getproc=False,  # type: bool
2001    quiet=False,  # type: bool
2002    use_tempfile=None,  # type: Optional[Any]
2003    read_stdin_opts=None,  # type: Optional[Any]
2004    linktype=None,  # type: Optional[Any]
2005    wait=True,  # type: bool
2006    _suppress=False  # type: bool
2007):
2008    # type: (...) -> Any
2009    """Run tcpdump or tshark on a list of packets.
2010
2011    When using ``tcpdump`` on OSX (``prog == conf.prog.tcpdump``), this uses a
2012    temporary file to store the packets. This works around a bug in Apple's
2013    version of ``tcpdump``: http://apple.stackexchange.com/questions/152682/
2014
2015    Otherwise, the packets are passed in stdin.
2016
2017    This function can be explicitly enabled or disabled with the
2018    ``use_tempfile`` parameter.
2019
2020    When using ``wireshark``, it will be called with ``-ki -`` to start
2021    immediately capturing packets from stdin.
2022
2023    Otherwise, the command will be run with ``-r -`` (which is correct for
2024    ``tcpdump`` and ``tshark``).
2025
2026    This can be overridden with ``read_stdin_opts``. This has no effect when
2027    ``use_tempfile=True``, or otherwise reading packets from a regular file.
2028
2029    :param pktlist: a Packet instance, a PacketList instance or a list of
2030        Packet instances. Can also be a filename (as a string), an open
2031        file-like object that must be a file format readable by
2032        tshark (Pcap, PcapNg, etc.) or None (to sniff)
2033    :param flt: a filter to use with tcpdump
2034    :param dump:    when set to True, returns a string instead of displaying it.
2035    :param getfd:   when set to True, returns a file-like object to read data
2036        from tcpdump or tshark from.
2037    :param getproc: when set to True, the subprocess.Popen object is returned
2038    :param args:    arguments (as a list) to pass to tshark (example for tshark:
2039        args=["-T", "json"]).
2040    :param prog:    program to use (defaults to tcpdump, will work with tshark)
2041    :param quiet:   when set to True, the process stderr is discarded
2042    :param use_tempfile: When set to True, always use a temporary file to store
2043        packets.
2044        When set to False, pipe packets through stdin.
2045        When set to None (default), only use a temporary file with
2046        ``tcpdump`` on OSX.
2047    :param read_stdin_opts: When set, a list of arguments needed to capture
2048        from stdin. Otherwise, attempts to guess.
2049    :param linktype: A custom DLT value or name, to overwrite the default
2050        values.
2051    :param wait: If True (default), waits for the process to terminate before
2052        returning to Scapy. If False, the process will be detached to the
2053        background. If dump, getproc or getfd is True, these have the same
2054        effect as ``wait=False``.
2055
2056    Examples::
2057
2058        >>> tcpdump([IP()/TCP(), IP()/UDP()])
2059        reading from file -, link-type RAW (Raw IP)
2060        16:46:00.474515 IP 127.0.0.1.20 > 127.0.0.1.80: Flags [S], seq 0, win 8192, length 0  # noqa: E501
2061        16:46:00.475019 IP 127.0.0.1.53 > 127.0.0.1.53: [|domain]
2062
2063        >>> tcpdump([IP()/TCP(), IP()/UDP()], prog=conf.prog.tshark)
2064          1   0.000000    127.0.0.1 -> 127.0.0.1    TCP 40 20->80 [SYN] Seq=0 Win=8192 Len=0  # noqa: E501
2065          2   0.000459    127.0.0.1 -> 127.0.0.1    UDP 28 53->53 Len=0
2066
2067    To get a JSON representation of a tshark-parsed PacketList(), one can::
2068
2069        >>> import json, pprint
2070        >>> json_data = json.load(tcpdump(IP(src="217.25.178.5",
2071        ...                                  dst="45.33.32.156"),
2072        ...                               prog=conf.prog.tshark,
2073        ...                               args=["-T", "json"],
2074        ...                               getfd=True))
2075        >>> pprint.pprint(json_data)
2076        [{u'_index': u'packets-2016-12-23',
2077          u'_score': None,
2078          u'_source': {u'layers': {u'frame': {u'frame.cap_len': u'20',
2079                                              u'frame.encap_type': u'7',
2080        [...]
2081                                              },
2082                                   u'ip': {u'ip.addr': u'45.33.32.156',
2083                                           u'ip.checksum': u'0x0000a20d',
2084        [...]
2085                                           u'ip.ttl': u'64',
2086                                           u'ip.version': u'4'},
2087                                   u'raw': u'Raw packet data'}},
2088          u'_type': u'pcap_file'}]
2089        >>> json_data[0]['_source']['layers']['ip']['ip.ttl']
2090        u'64'
2091    """
2092    getfd = getfd or getproc
2093    if prog is None:
2094        if not conf.prog.tcpdump:
2095            raise Scapy_Exception(
2096                "tcpdump is not available"
2097            )
2098        prog = [conf.prog.tcpdump]
2099    elif isinstance(prog, six.string_types):
2100        prog = [prog]
2101    else:
2102        raise ValueError("prog must be a string")
2103
2104    if linktype is not None:
2105        if isinstance(linktype, int):
2106            # Guess name from value
2107            try:
2108                linktype_name = _guess_linktype_name(linktype)
2109            except StopIteration:
2110                linktype = -1
2111        else:
2112            # Guess value from name
2113            if linktype.startswith("DLT_"):
2114                linktype = linktype[4:]
2115            linktype_name = linktype
2116            try:
2117                linktype = _guess_linktype_value(linktype)
2118            except KeyError:
2119                linktype = -1
2120        if linktype == -1:
2121            raise ValueError(
2122                "Unknown linktype. Try passing its datalink name instead"
2123            )
2124        prog += ["-y", linktype_name]
2125
2126    # Build Popen arguments
2127    if args is None:
2128        args = []
2129    else:
2130        # Make a copy of args
2131        args = list(args)
2132
2133    if flt is not None:
2134        # Check the validity of the filter
2135        if linktype is None and isinstance(pktlist, str):
2136            # linktype is unknown but required. Read it from file
2137            with PcapReader(pktlist) as rd:
2138                linktype = rd.linktype
2139        from scapy.arch.common import compile_filter
2140        compile_filter(flt, linktype=linktype)
2141        args.append(flt)
2142
2143    stdout = subprocess.PIPE if dump or getfd else None
2144    stderr = open(os.devnull) if quiet else None
2145    proc = None
2146
2147    if use_tempfile is None:
2148        # Apple's tcpdump cannot read from stdin, see:
2149        # http://apple.stackexchange.com/questions/152682/
2150        use_tempfile = DARWIN and prog[0] == conf.prog.tcpdump
2151
2152    if read_stdin_opts is None:
2153        if prog[0] == conf.prog.wireshark:
2154            # Start capturing immediately (-k) from stdin (-i -)
2155            read_stdin_opts = ["-ki", "-"]
2156        elif prog[0] == conf.prog.tcpdump and not OPENBSD:
2157            # Capture in packet-buffered mode (-U) from stdin (-r -)
2158            read_stdin_opts = ["-U", "-r", "-"]
2159        else:
2160            read_stdin_opts = ["-r", "-"]
2161    else:
2162        # Make a copy of read_stdin_opts
2163        read_stdin_opts = list(read_stdin_opts)
2164
2165    if pktlist is None:
2166        # sniff
2167        with ContextManagerSubprocess(prog[0], suppress=_suppress):
2168            proc = subprocess.Popen(
2169                prog + args,
2170                stdout=stdout,
2171                stderr=stderr,
2172            )
2173    elif isinstance(pktlist, six.string_types):
2174        # file
2175        with ContextManagerSubprocess(prog[0], suppress=_suppress):
2176            proc = subprocess.Popen(
2177                prog + ["-r", pktlist] + args,
2178                stdout=stdout,
2179                stderr=stderr,
2180            )
2181    elif use_tempfile:
2182        pktlist = cast(Union[IO[bytes], "_PacketIterable"], pktlist)
2183        tmpfile = get_temp_file(  # type: ignore
2184            autoext=".pcap",
2185            fd=True
2186        )  # type: IO[bytes]
2187        try:
2188            tmpfile.writelines(
2189                iter(lambda: pktlist.read(1048576), b"")  # type: ignore
2190            )
2191        except AttributeError:
2192            pktlist = cast("_PacketIterable", pktlist)
2193            wrpcap(tmpfile, pktlist, linktype=linktype)
2194        else:
2195            tmpfile.close()
2196        with ContextManagerSubprocess(prog[0], suppress=_suppress):
2197            proc = subprocess.Popen(
2198                prog + ["-r", tmpfile.name] + args,
2199                stdout=stdout,
2200                stderr=stderr,
2201            )
2202    else:
2203        try:
2204            pktlist.fileno()  # type: ignore
2205            # pass the packet stream
2206            with ContextManagerSubprocess(prog[0], suppress=_suppress):
2207                proc = subprocess.Popen(
2208                    prog + read_stdin_opts + args,
2209                    stdin=pktlist,  # type: ignore
2210                    stdout=stdout,
2211                    stderr=stderr,
2212                )
2213        except (AttributeError, ValueError):
2214            # write the packet stream to stdin
2215            with ContextManagerSubprocess(prog[0], suppress=_suppress):
2216                proc = subprocess.Popen(
2217                    prog + read_stdin_opts + args,
2218                    stdin=subprocess.PIPE,
2219                    stdout=stdout,
2220                    stderr=stderr,
2221                )
2222            if proc is None:
2223                # An error has occurred
2224                return
2225            try:
2226                proc.stdin.writelines(  # type: ignore
2227                    iter(lambda: pktlist.read(1048576), b"")  # type: ignore
2228                )
2229            except AttributeError:
2230                wrpcap(proc.stdin, pktlist, linktype=linktype)  # type: ignore
2231            except UnboundLocalError:
2232                # The error was handled by ContextManagerSubprocess
2233                pass
2234            else:
2235                proc.stdin.close()  # type: ignore
2236    if proc is None:
2237        # An error has occurred
2238        return
2239    if dump:
2240        data = b"".join(
2241            iter(lambda: proc.stdout.read(1048576), b"")  # type: ignore
2242        )
2243        proc.terminate()
2244        return data
2245    if getproc:
2246        return proc
2247    if getfd:
2248        return proc.stdout
2249    if wait:
2250        proc.wait()
2251
2252
2253@conf.commands.register
2254def hexedit(pktlist):
2255    # type: (_PacketIterable) -> PacketList
2256    """Run hexedit on a list of packets, then return the edited packets."""
2257    f = get_temp_file()
2258    wrpcap(f, pktlist)
2259    with ContextManagerSubprocess(conf.prog.hexedit):
2260        subprocess.call([conf.prog.hexedit, f])
2261    rpktlist = rdpcap(f)
2262    os.unlink(f)
2263    return rpktlist
2264
2265
2266def get_terminal_width():
2267    # type: () -> Optional[int]
2268    """Get terminal width (number of characters) if in a window.
2269
2270    Notice: this will try several methods in order to
2271    support as many terminals and OS as possible.
2272    """
2273    # Let's first try using the official API
2274    # (Python 3.3+)
2275    sizex = None  # type: Optional[int]
2276    if not six.PY2:
2277        import shutil
2278        sizex = shutil.get_terminal_size(fallback=(0, 0))[0]
2279        if sizex != 0:
2280            return sizex
2281    # Backups / Python 2.7
2282    if WINDOWS:
2283        from ctypes import windll, create_string_buffer  # type: ignore
2284        # http://code.activestate.com/recipes/440694-determine-size-of-console-window-on-windows/
2285        h = windll.kernel32.GetStdHandle(-12)
2286        csbi = create_string_buffer(22)
2287        res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi)
2288        if res:
2289            (bufx, bufy, curx, cury, wattr,
2290             left, top, right, bottom, maxx, maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw)  # noqa: E501
2291            sizex = right - left + 1
2292            # sizey = bottom - top + 1
2293            return sizex
2294        return sizex
2295    else:
2296        # We have various methods
2297        # COLUMNS is set on some terminals
2298        try:
2299            sizex = int(os.environ['COLUMNS'])
2300        except Exception:
2301            pass
2302        if sizex:
2303            return sizex
2304        # We can query TIOCGWINSZ
2305        try:
2306            import fcntl
2307            import termios
2308            s = struct.pack('HHHH', 0, 0, 0, 0)
2309            x = fcntl.ioctl(1, termios.TIOCGWINSZ, s)
2310            sizex = struct.unpack('HHHH', x)[1]
2311        except IOError:
2312            pass
2313        return sizex
2314
2315
2316def pretty_list(rtlst,  # type: List[Tuple[Union[str, List[str]], ...]]
2317                header,  # type: List[Tuple[str, ...]]
2318                sortBy=0,  # type: int
2319                borders=False,  # type: bool
2320                ):
2321    # type: (...) -> str
2322    """
2323    Pretty list to fit the terminal, and add header.
2324
2325    :param rtlst: a list of tuples. each tuple contains a value which can
2326        be either a string or a list of string.
2327    :param sortBy: the column id (starting with 0) which whill be used for
2328        ordering
2329    :param borders: whether to put borders on the table or not
2330    """
2331    if borders:
2332        _space = "|"
2333    else:
2334        _space = "  "
2335    cols = len(header[0])
2336    # Windows has a fat terminal border
2337    _spacelen = len(_space) * (cols - 1) + int(WINDOWS)
2338    _croped = False
2339    # Sort correctly
2340    rtlst.sort(key=lambda x: x[sortBy])
2341    # Resolve multi-values
2342    for i, line in enumerate(rtlst):
2343        ids = []  # type: List[int]
2344        values = []  # type: List[Union[str, List[str]]]
2345        for j, val in enumerate(line):
2346            if isinstance(val, list):
2347                ids.append(j)
2348                values.append(val or " ")
2349        if values:
2350            del rtlst[i]
2351            k = 0
2352            for ex_vals in zip_longest(*values, fillvalue=" "):
2353                if k:
2354                    extra_line = [" "] * cols
2355                else:
2356                    extra_line = list(line)  # type: ignore
2357                for j, h in enumerate(ids):
2358                    extra_line[h] = ex_vals[j]
2359                rtlst.insert(i + k, tuple(extra_line))
2360                k += 1
2361    rtslst = cast(List[Tuple[str, ...]], rtlst)
2362    # Append tag
2363    rtslst = header + rtslst
2364    # Detect column's width
2365    colwidth = [max(len(y) for y in x) for x in zip(*rtslst)]
2366    # Make text fit in box (if required)
2367    width = get_terminal_width()
2368    if conf.auto_crop_tables and width:
2369        width = width - _spacelen
2370        while sum(colwidth) > width:
2371            _croped = True
2372            # Needs to be cropped
2373            # Get the longest row
2374            i = colwidth.index(max(colwidth))
2375            # Get all elements of this row
2376            row = [len(x[i]) for x in rtslst]
2377            # Get biggest element of this row: biggest of the array
2378            j = row.index(max(row))
2379            # Re-build column tuple with the edited element
2380            t = list(rtslst[j])
2381            t[i] = t[i][:-2] + "_"
2382            rtslst[j] = tuple(t)
2383            # Update max size
2384            row[j] = len(t[i])
2385            colwidth[i] = max(row)
2386    if _croped:
2387        log_runtime.info("Table cropped to fit the terminal (conf.auto_crop_tables==True)")  # noqa: E501
2388    # Generate padding scheme
2389    fmt = _space.join(["%%-%ds" % x for x in colwidth])
2390    # Append separation line if needed
2391    if borders:
2392        rtslst.insert(1, tuple("-" * x for x in colwidth))
2393    # Compile
2394    return "\n".join(fmt % x for x in rtslst)
2395
2396
2397def __make_table(
2398    yfmtfunc,  # type: Callable[[int], str]
2399    fmtfunc,  # type: Callable[[int], str]
2400    endline,  # type: str
2401    data,  # type: List[Tuple[Packet, Packet]]
2402    fxyz,  # type: Callable[[Packet, Packet], Tuple[Any, Any, Any]]
2403    sortx=None,  # type: Optional[Callable[[str], Tuple[Any, ...]]]
2404    sorty=None,  # type: Optional[Callable[[str], Tuple[Any, ...]]]
2405    seplinefunc=None,  # type: Optional[Callable[[int, List[int]], str]]
2406    dump=False  # type: bool
2407):
2408    # type: (...) -> Optional[str]
2409    """Core function of the make_table suite, which generates the table"""
2410    vx = {}  # type: Dict[str, int]
2411    vy = {}  # type: Dict[str, Optional[int]]
2412    vz = {}  # type: Dict[Tuple[str, str], str]
2413    vxf = {}  # type: Dict[str, str]
2414
2415    # Python 2 backward compatibility
2416    fxyz = lambda_tuple_converter(fxyz)
2417
2418    tmp_len = 0
2419    for e in data:
2420        xx, yy, zz = [str(s) for s in fxyz(*e)]
2421        tmp_len = max(len(yy), tmp_len)
2422        vx[xx] = max(vx.get(xx, 0), len(xx), len(zz))
2423        vy[yy] = None
2424        vz[(xx, yy)] = zz
2425
2426    vxk = list(vx)
2427    vyk = list(vy)
2428    if sortx:
2429        vxk.sort(key=sortx)
2430    else:
2431        try:
2432            vxk.sort(key=int)
2433        except Exception:
2434            try:
2435                vxk.sort(key=atol)
2436            except Exception:
2437                vxk.sort()
2438    if sorty:
2439        vyk.sort(key=sorty)
2440    else:
2441        try:
2442            vyk.sort(key=int)
2443        except Exception:
2444            try:
2445                vyk.sort(key=atol)
2446            except Exception:
2447                vyk.sort()
2448
2449    s = ""
2450    if seplinefunc:
2451        sepline = seplinefunc(tmp_len, [vx[x] for x in vxk])
2452        s += sepline + "\n"
2453
2454    fmt = yfmtfunc(tmp_len)
2455    s += fmt % ""
2456    s += ' '
2457    for x in vxk:
2458        vxf[x] = fmtfunc(vx[x])
2459        s += vxf[x] % x
2460        s += ' '
2461    s += endline + "\n"
2462    if seplinefunc:
2463        s += sepline + "\n"
2464    for y in vyk:
2465        s += fmt % y
2466        s += ' '
2467        for x in vxk:
2468            s += vxf[x] % vz.get((x, y), "-")
2469            s += ' '
2470        s += endline + "\n"
2471    if seplinefunc:
2472        s += sepline + "\n"
2473
2474    if dump:
2475        return s
2476    else:
2477        print(s, end="")
2478        return None
2479
2480
2481def make_table(*args, **kargs):
2482    # type: (*Any, **Any) -> Optional[Any]
2483    return __make_table(
2484        lambda l: "%%-%is" % l,
2485        lambda l: "%%-%is" % l,
2486        "",
2487        *args,
2488        **kargs
2489    )
2490
2491
2492def make_lined_table(*args, **kargs):
2493    # type: (*Any, **Any) -> Optional[str]
2494    return __make_table(  # type: ignore
2495        lambda l: "%%-%is |" % l,
2496        lambda l: "%%-%is |" % l,
2497        "",
2498        *args,
2499        seplinefunc=lambda a, x: "+".join(
2500            '-' * (y + 2) for y in [a - 1] + x + [-2]
2501        ),
2502        **kargs
2503    )
2504
2505
2506def make_tex_table(*args, **kargs):
2507    # type: (*Any, **Any) -> Optional[str]
2508    return __make_table(  # type: ignore
2509        lambda l: "%s",
2510        lambda l: "& %s",
2511        "\\\\",
2512        *args,
2513        seplinefunc=lambda a, x: "\\hline",
2514        **kargs
2515    )
2516
2517####################
2518#   WHOIS CLIENT   #
2519####################
2520
2521
2522def whois(ip_address):
2523    # type: (str) -> bytes
2524    """Whois client for Python"""
2525    whois_ip = str(ip_address)
2526    try:
2527        query = socket.gethostbyname(whois_ip)
2528    except Exception:
2529        query = whois_ip
2530    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2531    s.connect(("whois.ripe.net", 43))
2532    s.send(query.encode("utf8") + b"\r\n")
2533    answer = b""
2534    while True:
2535        d = s.recv(4096)
2536        answer += d
2537        if not d:
2538            break
2539    s.close()
2540    ignore_tag = b"remarks:"
2541    # ignore all lines starting with the ignore_tag
2542    lines = [line for line in answer.split(b"\n") if not line or (line and not line.startswith(ignore_tag))]  # noqa: E501
2543    # remove empty lines at the bottom
2544    for i in range(1, len(lines)):
2545        if not lines[-i].strip():
2546            del lines[-i]
2547        else:
2548            break
2549    return b"\n".join(lines[3:])
2550
2551#######################
2552#   PERIODIC SENDER   #
2553#######################
2554
2555
2556class PeriodicSenderThread(threading.Thread):
2557    def __init__(self, sock, pkt, interval=0.5):
2558        # type: (Any, _PacketIterable, float) -> None
2559        """ Thread to send packets periodically
2560
2561        Args:
2562            sock: socket where packet is sent periodically
2563            pkt: packet or list of packets to send
2564            interval: interval between two packets
2565        """
2566        if not isinstance(pkt, list):
2567            self._pkts = [cast("Packet", pkt)]  # type: _PacketIterable
2568        else:
2569            self._pkts = pkt
2570        self._socket = sock
2571        self._stopped = threading.Event()
2572        self._interval = interval
2573        threading.Thread.__init__(self)
2574
2575    def run(self):
2576        # type: () -> None
2577        while not self._stopped.is_set():
2578            for p in self._pkts:
2579                self._socket.send(p)
2580                time.sleep(self._interval)
2581                if self._stopped.is_set():
2582                    break
2583
2584    def stop(self):
2585        # type: () -> None
2586        self._stopped.set()
2587
2588
2589class SingleConversationSocket(object):
2590    def __init__(self, o):
2591        # type: (Any) -> None
2592        self._inner = o
2593        self._tx_mutex = threading.RLock()
2594
2595    @property
2596    def __dict__(self):  # type: ignore
2597        return self._inner.__dict__
2598
2599    def __getattr__(self, name):
2600        # type: (str) -> Any
2601        return getattr(self._inner, name)
2602
2603    def sr1(self, *args, **kargs):
2604        # type: (*Any, **Any) -> Any
2605        with self._tx_mutex:
2606            return self._inner.sr1(*args, **kargs)
2607
2608    def sr(self, *args, **kargs):
2609        # type: (*Any, **Any) -> Any
2610        with self._tx_mutex:
2611            return self._inner.sr(*args, **kargs)
2612
2613    def send(self, x):
2614        # type: (Packet) -> Any
2615        with self._tx_mutex:
2616            return self._inner.send(x)
2617