1# This file is part of Scapy 2# See http://www.secdev.org/projects/scapy for more information 3# Copyright (C) Philippe Biondi <phil@secdev.org> 4# This program is published under a GPLv2 license 5 6""" 7General utility functions. 8""" 9 10from __future__ import absolute_import 11from __future__ import print_function 12 13from decimal import Decimal 14 15import array 16import collections 17import decimal 18import difflib 19import gzip 20import os 21import random 22import re 23import socket 24import struct 25import subprocess 26import sys 27import tempfile 28import threading 29import time 30import warnings 31 32import scapy.modules.six as six 33from scapy.modules.six.moves import range, input, zip_longest 34 35from scapy.config import conf 36from scapy.consts import DARWIN, OPENBSD, WINDOWS 37from scapy.data import MTU, DLT_EN10MB 38from scapy.compat import orb, plain_str, chb, bytes_base64,\ 39 base64_bytes, hex_bytes, lambda_tuple_converter, bytes_encode 40from scapy.error import log_runtime, Scapy_Exception, warning 41from scapy.pton_ntop import inet_pton 42 43# Typing imports 44from scapy.compat import ( 45 cast, 46 Any, 47 AnyStr, 48 Callable, 49 Dict, 50 IO, 51 Iterator, 52 List, 53 Literal, 54 Optional, 55 TYPE_CHECKING, 56 Tuple, 57 Type, 58 Union, 59 overload, 60) 61 62if TYPE_CHECKING: 63 from scapy.packet import Packet 64 from scapy.plist import _PacketIterable, PacketList 65 from scapy.supersocket import SuperSocket 66 _SuperSocket = SuperSocket 67else: 68 _SuperSocket = object 69 70_ByteStream = Union[IO[bytes], gzip.GzipFile] 71 72########### 73# Tools # 74########### 75 76 77def issubtype(x, # type: Any 78 t, # type: Union[type, str] 79 ): 80 # type: (...) -> bool 81 """issubtype(C, B) -> bool 82 83 Return whether C is a class and if it is a subclass of class B. 84 When using a tuple as the second argument issubtype(X, (A, B, ...)), 85 is a shortcut for issubtype(X, A) or issubtype(X, B) or ... (etc.). 86 """ 87 if isinstance(t, str): 88 return t in (z.__name__ for z in x.__bases__) 89 if isinstance(x, type) and issubclass(x, t): 90 return True 91 return False 92 93 94_Decimal = Union[Decimal, int] 95 96 97class EDecimal(Decimal): 98 """Extended Decimal 99 100 This implements arithmetic and comparison with float for 101 backward compatibility 102 """ 103 104 def __add__(self, other, context=None): 105 # type: (_Decimal, Any) -> EDecimal 106 return EDecimal(Decimal.__add__(self, Decimal(other))) 107 108 def __radd__(self, other): 109 # type: (_Decimal) -> EDecimal 110 return EDecimal(Decimal.__add__(self, Decimal(other))) 111 112 def __sub__(self, other): 113 # type: (_Decimal) -> EDecimal 114 return EDecimal(Decimal.__sub__(self, Decimal(other))) 115 116 def __rsub__(self, other): 117 # type: (_Decimal) -> EDecimal 118 return EDecimal(Decimal.__rsub__(self, Decimal(other))) 119 120 def __mul__(self, other): 121 # type: (_Decimal) -> EDecimal 122 return EDecimal(Decimal.__mul__(self, Decimal(other))) 123 124 def __rmul__(self, other): 125 # type: (_Decimal) -> EDecimal 126 return EDecimal(Decimal.__mul__(self, Decimal(other))) 127 128 def __truediv__(self, other): 129 # type: (_Decimal) -> EDecimal 130 return EDecimal(Decimal.__truediv__(self, Decimal(other))) 131 132 def __floordiv__(self, other): 133 # type: (_Decimal) -> EDecimal 134 return EDecimal(Decimal.__floordiv__(self, Decimal(other))) 135 136 if sys.version_info >= (3,): 137 def __divmod__(self, other): 138 # type: (_Decimal) -> Tuple[EDecimal, EDecimal] 139 r = Decimal.__divmod__(self, Decimal(other)) 140 return EDecimal(r[0]), EDecimal(r[1]) 141 else: 142 def __div__(self, other): 143 # type: (_Decimal) -> EDecimal 144 return EDecimal(Decimal.__div__(self, Decimal(other))) 145 146 def __rdiv__(self, other): 147 # type: (_Decimal) -> EDecimal 148 return EDecimal(Decimal.__rdiv__(self, Decimal(other))) 149 150 def __mod__(self, other): 151 # type: (_Decimal) -> EDecimal 152 return EDecimal(Decimal.__mod__(self, Decimal(other))) 153 154 def __rmod__(self, other): 155 # type: (_Decimal) -> EDecimal 156 return EDecimal(Decimal.__rmod__(self, Decimal(other))) 157 158 def __pow__(self, other, modulo=None): 159 # type: (_Decimal, Optional[_Decimal]) -> EDecimal 160 return EDecimal(Decimal.__pow__(self, Decimal(other), modulo)) 161 162 def __eq__(self, other): 163 # type: (Any) -> bool 164 return super(EDecimal, self).__eq__(other) or float(self) == other 165 166 def normalize(self, precision): # type: ignore 167 # type: (int) -> EDecimal 168 with decimal.localcontext() as ctx: 169 ctx.prec = precision 170 return EDecimal(super(EDecimal, self).normalize(ctx)) 171 172 173@overload 174def get_temp_file(keep, autoext, fd): 175 # type: (bool, str, Literal[True]) -> IO[bytes] 176 pass 177 178 179@overload 180def get_temp_file(keep=False, autoext="", fd=False): # noqa: F811 181 # type: (bool, str, Literal[False]) -> str 182 pass 183 184 185def get_temp_file(keep=False, autoext="", fd=False): # noqa: F811 186 # type: (bool, str, bool) -> Union[IO[bytes], str] 187 """Creates a temporary file. 188 189 :param keep: If False, automatically delete the file when Scapy exits. 190 :param autoext: Suffix to add to the generated file name. 191 :param fd: If True, this returns a file-like object with the temporary 192 file opened. If False (default), this returns a file path. 193 """ 194 f = tempfile.NamedTemporaryFile(prefix="scapy", suffix=autoext, 195 delete=False) 196 if not keep: 197 conf.temp_files.append(f.name) 198 199 if fd: 200 return f 201 else: 202 # Close the file so something else can take it. 203 f.close() 204 return f.name 205 206 207def get_temp_dir(keep=False): 208 # type: (bool) -> str 209 """Creates a temporary file, and returns its name. 210 211 :param keep: If False (default), the directory will be recursively 212 deleted when Scapy exits. 213 :return: A full path to a temporary directory. 214 """ 215 216 dname = tempfile.mkdtemp(prefix="scapy") 217 218 if not keep: 219 conf.temp_files.append(dname) 220 221 return dname 222 223 224def sane(x, color=False): 225 # type: (AnyStr, bool) -> str 226 r = "" 227 for i in x: 228 j = orb(i) 229 if (j < 32) or (j >= 127): 230 if color: 231 r += conf.color_theme.not_printable(".") 232 else: 233 r += "." 234 else: 235 r += chr(j) 236 return r 237 238 239@conf.commands.register 240def restart(): 241 # type: () -> None 242 """Restarts scapy""" 243 if not conf.interactive or not os.path.isfile(sys.argv[0]): 244 raise OSError("Scapy was not started from console") 245 if WINDOWS: 246 try: 247 res_code = subprocess.call([sys.executable] + sys.argv) 248 except KeyboardInterrupt: 249 res_code = 1 250 finally: 251 os._exit(res_code) 252 os.execv(sys.executable, [sys.executable] + sys.argv) 253 254 255def lhex(x): 256 # type: (Any) -> str 257 from scapy.volatile import VolatileValue 258 if isinstance(x, VolatileValue): 259 return repr(x) 260 if type(x) in six.integer_types: 261 return hex(x) 262 elif isinstance(x, tuple): 263 return "(%s)" % ", ".join(map(lhex, x)) 264 elif isinstance(x, list): 265 return "[%s]" % ", ".join(map(lhex, x)) 266 else: 267 return str(x) 268 269 270@conf.commands.register 271def hexdump(p, dump=False): 272 # type: (Union[Packet, AnyStr], bool) -> Optional[str] 273 """Build a tcpdump like hexadecimal view 274 275 :param p: a Packet 276 :param dump: define if the result must be printed or returned in a variable 277 :return: a String only when dump=True 278 """ 279 s = "" 280 x = bytes_encode(p) 281 x_len = len(x) 282 i = 0 283 while i < x_len: 284 s += "%04x " % i 285 for j in range(16): 286 if i + j < x_len: 287 s += "%02X " % orb(x[i + j]) 288 else: 289 s += " " 290 s += " %s\n" % sane(x[i:i + 16], color=True) 291 i += 16 292 # remove trailing \n 293 s = s[:-1] if s.endswith("\n") else s 294 if dump: 295 return s 296 else: 297 print(s) 298 return None 299 300 301@conf.commands.register 302def linehexdump(p, onlyasc=0, onlyhex=0, dump=False): 303 # type: (Union[Packet, AnyStr], int, int, bool) -> Optional[str] 304 """Build an equivalent view of hexdump() on a single line 305 306 Note that setting both onlyasc and onlyhex to 1 results in a empty output 307 308 :param p: a Packet 309 :param onlyasc: 1 to display only the ascii view 310 :param onlyhex: 1 to display only the hexadecimal view 311 :param dump: print the view if False 312 :return: a String only when dump=True 313 """ 314 s = "" 315 s = hexstr(p, onlyasc=onlyasc, onlyhex=onlyhex, color=not dump) 316 if dump: 317 return s 318 else: 319 print(s) 320 return None 321 322 323@conf.commands.register 324def chexdump(p, dump=False): 325 # type: (Union[Packet, AnyStr], bool) -> Optional[str] 326 """Build a per byte hexadecimal representation 327 328 Example: 329 >>> chexdump(IP()) 330 0x45, 0x00, 0x00, 0x14, 0x00, 0x01, 0x00, 0x00, 0x40, 0x00, 0x7c, 0xe7, 0x7f, 0x00, 0x00, 0x01, 0x7f, 0x00, 0x00, 0x01 # noqa: E501 331 332 :param p: a Packet 333 :param dump: print the view if False 334 :return: a String only if dump=True 335 """ 336 x = bytes_encode(p) 337 s = ", ".join("%#04x" % orb(x) for x in x) 338 if dump: 339 return s 340 else: 341 print(s) 342 return None 343 344 345@conf.commands.register 346def hexstr(p, onlyasc=0, onlyhex=0, color=False): 347 # type: (Union[Packet, AnyStr], int, int, bool) -> str 348 """Build a fancy tcpdump like hex from bytes.""" 349 x = bytes_encode(p) 350 s = [] 351 if not onlyasc: 352 s.append(" ".join("%02X" % orb(b) for b in x)) 353 if not onlyhex: 354 s.append(sane(x, color=color)) 355 return " ".join(s) 356 357 358def repr_hex(s): 359 # type: (bytes) -> str 360 """ Convert provided bitstring to a simple string of hex digits """ 361 return "".join("%02x" % orb(x) for x in s) 362 363 364@conf.commands.register 365def hexdiff(a, b, autojunk=False): 366 # type: (Union[Packet, AnyStr], Union[Packet, AnyStr], bool) -> None 367 """ 368 Show differences between 2 binary strings, Packets... 369 370 For the autojunk parameter, see 371 https://docs.python.org/3.8/library/difflib.html#difflib.SequenceMatcher 372 373 :param a: 374 :param b: The binary strings, packets... to compare 375 :param autojunk: Setting it to True will likely increase the comparison 376 speed a lot on big byte strings, but will reduce accuracy (will tend 377 to miss insertion and see replacements instead for instance). 378 """ 379 380 # Compare the strings using difflib 381 382 xb = bytes_encode(a) 383 yb = bytes_encode(b) 384 385 sm = difflib.SequenceMatcher(a=xb, b=yb, autojunk=autojunk) 386 xarr = [xb[i:i + 1] for i in range(len(xb))] 387 yarr = [yb[i:i + 1] for i in range(len(yb))] 388 389 backtrackx = [] 390 backtracky = [] 391 for opcode in sm.get_opcodes(): 392 typ, x0, x1, y0, y1 = opcode 393 if typ == 'delete': 394 backtrackx += xarr[x0:x1] 395 backtracky += [b''] * (x1 - x0) 396 elif typ == 'insert': 397 backtrackx += [b''] * (y1 - y0) 398 backtracky += yarr[y0:y1] 399 elif typ in ['equal', 'replace']: 400 backtrackx += xarr[x0:x1] 401 backtracky += yarr[y0:y1] 402 403 if autojunk: 404 # Some lines may have been considered as junk. Check the sizes 405 lbx = len(backtrackx) 406 lby = len(backtracky) 407 backtrackx += [b''] * (max(lbx, lby) - lbx) 408 backtracky += [b''] * (max(lbx, lby) - lby) 409 410 # Print the diff 411 412 x = y = i = 0 413 colorize = {0: lambda x: x, 414 -1: conf.color_theme.left, 415 1: conf.color_theme.right} 416 417 dox = 1 418 doy = 0 419 btx_len = len(backtrackx) 420 while i < btx_len: 421 linex = backtrackx[i:i + 16] 422 liney = backtracky[i:i + 16] 423 xx = sum(len(k) for k in linex) 424 yy = sum(len(k) for k in liney) 425 if dox and not xx: 426 dox = 0 427 doy = 1 428 if dox and linex == liney: 429 doy = 1 430 431 if dox: 432 xd = y 433 j = 0 434 while not linex[j]: 435 j += 1 436 xd -= 1 437 print(colorize[doy - dox]("%04x" % xd), end=' ') 438 x += xx 439 line = linex 440 else: 441 print(" ", end=' ') 442 if doy: 443 yd = y 444 j = 0 445 while not liney[j]: 446 j += 1 447 yd -= 1 448 print(colorize[doy - dox]("%04x" % yd), end=' ') 449 y += yy 450 line = liney 451 else: 452 print(" ", end=' ') 453 454 print(" ", end=' ') 455 456 cl = "" 457 for j in range(16): 458 if i + j < btx_len: 459 if line[j]: 460 col = colorize[(linex[j] != liney[j]) * (doy - dox)] 461 print(col("%02X" % orb(line[j])), end=' ') 462 if linex[j] == liney[j]: 463 cl += sane(line[j], color=True) 464 else: 465 cl += col(sane(line[j])) 466 else: 467 print(" ", end=' ') 468 cl += " " 469 else: 470 print(" ", end=' ') 471 if j == 7: 472 print("", end=' ') 473 474 print(" ", cl) 475 476 if doy or not yy: 477 doy = 0 478 dox = 1 479 i += 16 480 else: 481 if yy: 482 dox = 0 483 doy = 1 484 else: 485 i += 16 486 487 488if struct.pack("H", 1) == b"\x00\x01": # big endian 489 checksum_endian_transform = lambda chk: chk # type: Callable[[int], int] 490else: 491 checksum_endian_transform = lambda chk: ((chk >> 8) & 0xff) | chk << 8 492 493 494def checksum(pkt): 495 # type: (bytes) -> int 496 if len(pkt) % 2 == 1: 497 pkt += b"\0" 498 s = sum(array.array("H", pkt)) 499 s = (s >> 16) + (s & 0xffff) 500 s += s >> 16 501 s = ~s 502 return checksum_endian_transform(s) & 0xffff 503 504 505def _fletcher16(charbuf): 506 # type: (bytes) -> Tuple[int, int] 507 # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/> # noqa: E501 508 c0 = c1 = 0 509 for char in charbuf: 510 c0 += orb(char) 511 c1 += c0 512 513 c0 %= 255 514 c1 %= 255 515 return (c0, c1) 516 517 518@conf.commands.register 519def fletcher16_checksum(binbuf): 520 # type: (bytes) -> int 521 """Calculates Fletcher-16 checksum of the given buffer. 522 523 Note: 524 If the buffer contains the two checkbytes derived from the Fletcher-16 checksum # noqa: E501 525 the result of this function has to be 0. Otherwise the buffer has been corrupted. # noqa: E501 526 """ 527 (c0, c1) = _fletcher16(binbuf) 528 return (c1 << 8) | c0 529 530 531@conf.commands.register 532def fletcher16_checkbytes(binbuf, offset): 533 # type: (bytes, int) -> bytes 534 """Calculates the Fletcher-16 checkbytes returned as 2 byte binary-string. 535 536 Including the bytes into the buffer (at the position marked by offset) the # noqa: E501 537 global Fletcher-16 checksum of the buffer will be 0. Thus it is easy to verify # noqa: E501 538 the integrity of the buffer on the receiver side. 539 540 For details on the algorithm, see RFC 2328 chapter 12.1.7 and RFC 905 Annex B. # noqa: E501 541 """ 542 543 # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/> # noqa: E501 544 if len(binbuf) < offset: 545 raise Exception("Packet too short for checkbytes %d" % len(binbuf)) 546 547 binbuf = binbuf[:offset] + b"\x00\x00" + binbuf[offset + 2:] 548 (c0, c1) = _fletcher16(binbuf) 549 550 x = ((len(binbuf) - offset - 1) * c0 - c1) % 255 551 552 if (x <= 0): 553 x += 255 554 555 y = 510 - c0 - x 556 557 if (y > 255): 558 y -= 255 559 return chb(x) + chb(y) 560 561 562def mac2str(mac): 563 # type: (str) -> bytes 564 return b"".join(chb(int(x, 16)) for x in plain_str(mac).split(':')) 565 566 567def valid_mac(mac): 568 # type: (str) -> bool 569 try: 570 return len(mac2str(mac)) == 6 571 except ValueError: 572 pass 573 return False 574 575 576def str2mac(s): 577 # type: (bytes) -> str 578 if isinstance(s, str): 579 return ("%02x:" * 6)[:-1] % tuple(map(ord, s)) 580 return ("%02x:" * 6)[:-1] % tuple(s) 581 582 583def randstring(length): 584 # type: (int) -> bytes 585 """ 586 Returns a random string of length (length >= 0) 587 """ 588 return b"".join(struct.pack('B', random.randint(0, 255)) 589 for _ in range(length)) 590 591 592def zerofree_randstring(length): 593 # type: (int) -> bytes 594 """ 595 Returns a random string of length (length >= 0) without zero in it. 596 """ 597 return b"".join(struct.pack('B', random.randint(1, 255)) 598 for _ in range(length)) 599 600 601def strxor(s1, s2): 602 # type: (bytes, bytes) -> bytes 603 """ 604 Returns the binary XOR of the 2 provided strings s1 and s2. s1 and s2 605 must be of same length. 606 """ 607 return b"".join(map(lambda x, y: chb(orb(x) ^ orb(y)), s1, s2)) 608 609 610def strand(s1, s2): 611 # type: (bytes, bytes) -> bytes 612 """ 613 Returns the binary AND of the 2 provided strings s1 and s2. s1 and s2 614 must be of same length. 615 """ 616 return b"".join(map(lambda x, y: chb(orb(x) & orb(y)), s1, s2)) 617 618 619# Workaround bug 643005 : https://sourceforge.net/tracker/?func=detail&atid=105470&aid=643005&group_id=5470 # noqa: E501 620try: 621 socket.inet_aton("255.255.255.255") 622except socket.error: 623 def inet_aton(ip_string): 624 # type: (str) -> bytes 625 if ip_string == "255.255.255.255": 626 return b"\xff" * 4 627 else: 628 return socket.inet_aton(ip_string) 629else: 630 inet_aton = socket.inet_aton 631 632inet_ntoa = socket.inet_ntoa 633 634 635def atol(x): 636 # type: (str) -> int 637 try: 638 ip = inet_aton(x) 639 except socket.error: 640 ip = inet_aton(socket.gethostbyname(x)) 641 return cast(int, struct.unpack("!I", ip)[0]) 642 643 644def valid_ip(addr): 645 # type: (str) -> bool 646 try: 647 addr = plain_str(addr) 648 except UnicodeDecodeError: 649 return False 650 try: 651 atol(addr) 652 except (OSError, ValueError, socket.error): 653 return False 654 return True 655 656 657def valid_net(addr): 658 # type: (str) -> bool 659 try: 660 addr = plain_str(addr) 661 except UnicodeDecodeError: 662 return False 663 if '/' in addr: 664 ip, mask = addr.split('/', 1) 665 return valid_ip(ip) and mask.isdigit() and 0 <= int(mask) <= 32 666 return valid_ip(addr) 667 668 669def valid_ip6(addr): 670 # type: (str) -> bool 671 try: 672 addr = plain_str(addr) 673 except UnicodeDecodeError: 674 return False 675 try: 676 inet_pton(socket.AF_INET6, addr) 677 except socket.error: 678 try: 679 socket.getaddrinfo(addr, None, socket.AF_INET6)[0][4][0] 680 except socket.error: 681 return False 682 return True 683 684 685def valid_net6(addr): 686 # type: (str) -> bool 687 try: 688 addr = plain_str(addr) 689 except UnicodeDecodeError: 690 return False 691 if '/' in addr: 692 ip, mask = addr.split('/', 1) 693 return valid_ip6(ip) and mask.isdigit() and 0 <= int(mask) <= 128 694 return valid_ip6(addr) 695 696 697def ltoa(x): 698 # type: (int) -> str 699 return inet_ntoa(struct.pack("!I", x & 0xffffffff)) 700 701 702def itom(x): 703 # type: (int) -> int 704 return (0xffffffff00000000 >> x) & 0xffffffff 705 706 707class ContextManagerSubprocess(object): 708 """ 709 Context manager that eases checking for unknown command, without 710 crashing. 711 712 Example: 713 >>> with ContextManagerSubprocess("tcpdump"): 714 >>> subprocess.Popen(["tcpdump", "--version"]) 715 ERROR: Could not execute tcpdump, is it installed? 716 717 """ 718 719 def __init__(self, prog, suppress=True): 720 # type: (str, bool) -> None 721 self.prog = prog 722 self.suppress = suppress 723 724 def __enter__(self): 725 # type: () -> None 726 pass 727 728 def __exit__(self, 729 exc_type, # type: Optional[type] 730 exc_value, # type: Optional[Exception] 731 traceback, # type: Optional[Any] 732 ): 733 # type: (...) -> Optional[bool] 734 if exc_value is None or exc_type is None: 735 return None 736 # Errored 737 if isinstance(exc_value, EnvironmentError): 738 msg = "Could not execute %s, is it installed?" % self.prog 739 else: 740 msg = "%s: execution failed (%s)" % ( 741 self.prog, 742 exc_type.__class__.__name__ 743 ) 744 if not self.suppress: 745 raise exc_type(msg) 746 log_runtime.error(msg, exc_info=True) 747 return True # Suppress the exception 748 749 750class ContextManagerCaptureOutput(object): 751 """ 752 Context manager that intercept the console's output. 753 754 Example: 755 >>> with ContextManagerCaptureOutput() as cmco: 756 ... print("hey") 757 ... assert cmco.get_output() == "hey" 758 """ 759 760 def __init__(self): 761 # type: () -> None 762 self.result_export_object = "" 763 try: 764 import mock # noqa: F401 765 except Exception: 766 raise ImportError("The mock module needs to be installed !") 767 768 def __enter__(self): 769 # type: () -> ContextManagerCaptureOutput 770 import mock 771 772 def write(s, decorator=self): 773 # type: (str, ContextManagerCaptureOutput) -> None 774 decorator.result_export_object += s 775 mock_stdout = mock.Mock() 776 mock_stdout.write = write 777 self.bck_stdout = sys.stdout 778 sys.stdout = mock_stdout 779 return self 780 781 def __exit__(self, *exc): 782 # type: (*Any) -> Literal[False] 783 sys.stdout = self.bck_stdout 784 return False 785 786 def get_output(self, eval_bytes=False): 787 # type: (bool) -> str 788 if self.result_export_object.startswith("b'") and eval_bytes: 789 return plain_str(eval(self.result_export_object)) 790 return self.result_export_object 791 792 793def do_graph( 794 graph, # type: str 795 prog=None, # type: Optional[str] 796 format=None, # type: Optional[str] 797 target=None, # type: Optional[Union[IO[bytes], str]] 798 type=None, # type: Optional[str] 799 string=None, # type: Optional[bool] 800 options=None # type: Optional[List[str]] 801): 802 # type: (...) -> Optional[str] 803 """Processes graph description using an external software. 804 This method is used to convert a graphviz format to an image. 805 806 :param graph: GraphViz graph description 807 :param prog: which graphviz program to use 808 :param format: output type (svg, ps, gif, jpg, etc.), passed to dot's "-T" 809 option 810 :param string: if not None, simply return the graph string 811 :param target: filename or redirect. Defaults pipe to Imagemagick's 812 display program 813 :param options: options to be passed to prog 814 """ 815 816 if format is None: 817 format = "svg" 818 if string: 819 return graph 820 if type is not None: 821 warnings.warn( 822 "type is deprecated, and was renamed format", 823 DeprecationWarning 824 ) 825 format = type 826 if prog is None: 827 prog = conf.prog.dot 828 start_viewer = False 829 if target is None: 830 if WINDOWS: 831 target = get_temp_file(autoext="." + format) 832 start_viewer = True 833 else: 834 with ContextManagerSubprocess(conf.prog.display): 835 target = subprocess.Popen([conf.prog.display], 836 stdin=subprocess.PIPE).stdin 837 if format is not None: 838 format = "-T%s" % format 839 if isinstance(target, str): 840 if target.startswith('|'): 841 target = subprocess.Popen(target[1:].lstrip(), shell=True, 842 stdin=subprocess.PIPE).stdin 843 elif target.startswith('>'): 844 target = open(target[1:].lstrip(), "wb") 845 else: 846 target = open(os.path.abspath(target), "wb") 847 target = cast(IO[bytes], target) 848 proc = subprocess.Popen( 849 "\"%s\" %s %s" % (prog, options or "", format or ""), 850 shell=True, stdin=subprocess.PIPE, stdout=target, 851 stderr=subprocess.PIPE 852 ) 853 _, stderr = proc.communicate(bytes_encode(graph)) 854 if proc.returncode != 0: 855 raise OSError( 856 "GraphViz call failed (is it installed?):\n" + 857 plain_str(stderr) 858 ) 859 try: 860 target.close() 861 except Exception: 862 pass 863 if start_viewer: 864 # Workaround for file not found error: We wait until tempfile is written. # noqa: E501 865 waiting_start = time.time() 866 while not os.path.exists(target.name): 867 time.sleep(0.1) 868 if time.time() - waiting_start > 3: 869 warning("Temporary file '%s' could not be written. Graphic will not be displayed.", tempfile) # noqa: E501 870 break 871 else: 872 if conf.prog.display == conf.prog._default: 873 os.startfile(target.name) # type: ignore 874 else: 875 with ContextManagerSubprocess(conf.prog.display): 876 subprocess.Popen([conf.prog.display, target.name]) 877 return None 878 879 880_TEX_TR = { 881 "{": "{\\tt\\char123}", 882 "}": "{\\tt\\char125}", 883 "\\": "{\\tt\\char92}", 884 "^": "\\^{}", 885 "$": "\\$", 886 "#": "\\#", 887 "_": "\\_", 888 "&": "\\&", 889 "%": "\\%", 890 "|": "{\\tt\\char124}", 891 "~": "{\\tt\\char126}", 892 "<": "{\\tt\\char60}", 893 ">": "{\\tt\\char62}", 894} 895 896 897def tex_escape(x): 898 # type: (str) -> str 899 s = "" 900 for c in x: 901 s += _TEX_TR.get(c, c) 902 return s 903 904 905def colgen(*lstcol, # type: Any 906 **kargs # type: Any 907 ): 908 # type: (...) -> Iterator[Any] 909 """Returns a generator that mixes provided quantities forever 910 trans: a function to convert the three arguments into a color. lambda x,y,z:(x,y,z) by default""" # noqa: E501 911 if len(lstcol) < 2: 912 lstcol *= 2 913 trans = kargs.get("trans", lambda x, y, z: (x, y, z)) 914 while True: 915 for i in range(len(lstcol)): 916 for j in range(len(lstcol)): 917 for k in range(len(lstcol)): 918 if i != j or j != k or k != i: 919 yield trans(lstcol[(i + j) % len(lstcol)], lstcol[(j + k) % len(lstcol)], lstcol[(k + i) % len(lstcol)]) # noqa: E501 920 921 922def incremental_label(label="tag%05i", start=0): 923 # type: (str, int) -> Iterator[str] 924 while True: 925 yield label % start 926 start += 1 927 928 929def binrepr(val): 930 # type: (int) -> str 931 return bin(val)[2:] 932 933 934def long_converter(s): 935 # type: (str) -> int 936 return int(s.replace('\n', '').replace(' ', ''), 16) 937 938######################### 939# Enum management # 940######################### 941 942 943class EnumElement: 944 def __init__(self, key, value): 945 # type: (str, int) -> None 946 self._key = key 947 self._value = value 948 949 def __repr__(self): 950 # type: () -> str 951 return "<%s %s[%r]>" % (self.__dict__.get("_name", self.__class__.__name__), self._key, self._value) # noqa: E501 952 953 def __getattr__(self, attr): 954 # type: (str) -> Any 955 return getattr(self._value, attr) 956 957 def __str__(self): 958 # type: () -> str 959 return self._key 960 961 def __bytes__(self): 962 # type: () -> bytes 963 return bytes_encode(self.__str__()) 964 965 def __hash__(self): 966 # type: () -> int 967 return self._value 968 969 def __int__(self): 970 # type: () -> int 971 return int(self._value) 972 973 def __eq__(self, other): 974 # type: (Any) -> bool 975 return self._value == int(other) 976 977 def __neq__(self, other): 978 # type: (Any) -> bool 979 return not self.__eq__(other) 980 981 982class Enum_metaclass(type): 983 element_class = EnumElement 984 985 def __new__(cls, name, bases, dct): 986 # type: (Any, str, Any, Dict[str, Any]) -> Any 987 rdict = {} 988 for k, v in six.iteritems(dct): 989 if isinstance(v, int): 990 v = cls.element_class(k, v) 991 dct[k] = v 992 rdict[v] = k 993 dct["__rdict__"] = rdict 994 return super(Enum_metaclass, cls).__new__(cls, name, bases, dct) 995 996 def __getitem__(self, attr): 997 # type: (int) -> Any 998 return self.__rdict__[attr] # type: ignore 999 1000 def __contains__(self, val): 1001 # type: (int) -> bool 1002 return val in self.__rdict__ # type: ignore 1003 1004 def get(self, attr, val=None): 1005 # type: (str, Optional[Any]) -> Any 1006 return self.__rdict__.get(attr, val) # type: ignore 1007 1008 def __repr__(self): 1009 # type: () -> str 1010 return "<%s>" % self.__dict__.get("name", self.__name__) 1011 1012 1013################### 1014# Object saving # 1015################### 1016 1017 1018def export_object(obj): 1019 # type: (Any) -> None 1020 import zlib 1021 print(bytes_base64(zlib.compress(six.moves.cPickle.dumps(obj, 2), 9))) 1022 1023 1024def import_object(obj=None): 1025 # type: (Optional[str]) -> Any 1026 import zlib 1027 if obj is None: 1028 obj = sys.stdin.read() 1029 return six.moves.cPickle.loads(zlib.decompress(base64_bytes(obj.strip()))) # noqa: E501 1030 1031 1032def save_object(fname, obj): 1033 # type: (str, Any) -> None 1034 """Pickle a Python object""" 1035 1036 fd = gzip.open(fname, "wb") 1037 six.moves.cPickle.dump(obj, fd) 1038 fd.close() 1039 1040 1041def load_object(fname): 1042 # type: (str) -> Any 1043 """unpickle a Python object""" 1044 return six.moves.cPickle.load(gzip.open(fname, "rb")) 1045 1046 1047@conf.commands.register 1048def corrupt_bytes(data, p=0.01, n=None): 1049 # type: (str, float, Optional[int]) -> bytes 1050 """ 1051 Corrupt a given percentage (at least one byte) or number of bytes 1052 from a string 1053 """ 1054 s = array.array("B", bytes_encode(data)) 1055 s_len = len(s) 1056 if n is None: 1057 n = max(1, int(s_len * p)) 1058 for i in random.sample(range(s_len), n): 1059 s[i] = (s[i] + random.randint(1, 255)) % 256 1060 return s.tostring() if six.PY2 else s.tobytes() 1061 1062 1063@conf.commands.register 1064def corrupt_bits(data, p=0.01, n=None): 1065 # type: (str, float, Optional[int]) -> bytes 1066 """ 1067 Flip a given percentage (at least one bit) or number of bits 1068 from a string 1069 """ 1070 s = array.array("B", bytes_encode(data)) 1071 s_len = len(s) * 8 1072 if n is None: 1073 n = max(1, int(s_len * p)) 1074 for i in random.sample(range(s_len), n): 1075 s[i // 8] ^= 1 << (i % 8) 1076 return s.tostring() if six.PY2 else s.tobytes() 1077 1078 1079############################# 1080# pcap capture file stuff # 1081############################# 1082 1083@conf.commands.register 1084def wrpcap(filename, # type: Union[IO[bytes], str] 1085 pkt, # type: _PacketIterable 1086 *args, # type: Any 1087 **kargs # type: Any 1088 ): 1089 # type: (...) -> None 1090 """Write a list of packets to a pcap file 1091 1092 :param filename: the name of the file to write packets to, or an open, 1093 writable file-like object. The file descriptor will be 1094 closed at the end of the call, so do not use an object you 1095 do not want to close (e.g., running wrpcap(sys.stdout, []) 1096 in interactive mode will crash Scapy). 1097 :param gz: set to 1 to save a gzipped capture 1098 :param linktype: force linktype value 1099 :param endianness: "<" or ">", force endianness 1100 :param sync: do not bufferize writes to the capture file 1101 """ 1102 with PcapWriter(filename, *args, **kargs) as fdesc: 1103 fdesc.write(pkt) 1104 1105 1106@conf.commands.register 1107def rdpcap(filename, count=-1): 1108 # type: (Union[IO[bytes], str], int) -> PacketList 1109 """Read a pcap or pcapng file and return a packet list 1110 1111 :param count: read only <count> packets 1112 """ 1113 # Rant: Our complicated use of metaclasses and especially the 1114 # __call__ function is, of course, not supported by MyPy. 1115 # One day we should simplify this mess and use a much simpler 1116 # layout that will actually be supported and properly dissected. 1117 with PcapReader(filename) as fdesc: # type: ignore 1118 return fdesc.read_all(count=count) 1119 1120 1121# NOTE: Type hinting 1122# Mypy doesn't understand the following metaclass, and thinks each 1123# constructor (PcapReader...) needs 3 arguments each. To avoid this, 1124# we add a fake (=None) to the last 2 arguments then force the value 1125# to not be None in the signature and pack the whole thing in an ignore. 1126# This allows to not have # type: ignore every time we call those 1127# constructors. 1128 1129class PcapReader_metaclass(type): 1130 """Metaclass for (Raw)Pcap(Ng)Readers""" 1131 1132 def __new__(cls, name, bases, dct): 1133 # type: (Any, str, Any, Dict[str, Any]) -> Any 1134 """The `alternative` class attribute is declared in the PcapNg 1135 variant, and set here to the Pcap variant. 1136 1137 """ 1138 newcls = super(PcapReader_metaclass, cls).__new__( 1139 cls, name, bases, dct 1140 ) 1141 if 'alternative' in dct: 1142 dct['alternative'].alternative = newcls 1143 return newcls 1144 1145 def __call__(cls, filename): # type: ignore 1146 # type: (Union[IO[bytes], str]) -> Any 1147 """Creates a cls instance, use the `alternative` if that 1148 fails. 1149 1150 """ 1151 i = cls.__new__(cls, cls.__name__, cls.__bases__, cls.__dict__) 1152 filename, fdesc, magic = cls.open(filename) 1153 if not magic: 1154 raise Scapy_Exception( 1155 "No data could be read!" 1156 ) 1157 try: 1158 i.__init__(filename, fdesc, magic) 1159 return i 1160 except (Scapy_Exception, EOFError): 1161 pass 1162 1163 if "alternative" in cls.__dict__: 1164 cls = cls.__dict__["alternative"] 1165 i = cls.__new__(cls, cls.__name__, cls.__bases__, cls.__dict__) 1166 try: 1167 i.__init__(filename, fdesc, magic) 1168 return i 1169 except (Scapy_Exception, EOFError): 1170 pass 1171 1172 raise Scapy_Exception("Not a supported capture file") 1173 1174 @staticmethod 1175 def open(fname # type: Union[IO[bytes], str] 1176 ): 1177 # type: (...) -> Tuple[str, _ByteStream, bytes] 1178 """Open (if necessary) filename, and read the magic.""" 1179 if isinstance(fname, str): 1180 filename = fname 1181 try: 1182 fdesc = gzip.open(filename, "rb") # type: _ByteStream 1183 magic = fdesc.read(4) 1184 except IOError: 1185 fdesc = open(filename, "rb") 1186 magic = fdesc.read(4) 1187 else: 1188 fdesc = fname 1189 filename = getattr(fdesc, "name", "No name") 1190 magic = fdesc.read(4) 1191 return filename, fdesc, magic 1192 1193 1194@six.add_metaclass(PcapReader_metaclass) 1195class RawPcapReader: 1196 """A stateful pcap reader. Each packet is returned as a string""" 1197 1198 nonblocking_socket = True 1199 PacketMetadata = collections.namedtuple("PacketMetadata", 1200 ["sec", "usec", "wirelen", "caplen"]) # noqa: E501 1201 1202 def __init__(self, filename, fdesc=None, magic=None): # type: ignore 1203 # type: (str, _ByteStream, bytes) -> None 1204 self.filename = filename 1205 self.f = fdesc 1206 if magic == b"\xa1\xb2\xc3\xd4": # big endian 1207 self.endian = ">" 1208 self.nano = False 1209 elif magic == b"\xd4\xc3\xb2\xa1": # little endian 1210 self.endian = "<" 1211 self.nano = False 1212 elif magic == b"\xa1\xb2\x3c\x4d": # big endian, nanosecond-precision 1213 self.endian = ">" 1214 self.nano = True 1215 elif magic == b"\x4d\x3c\xb2\xa1": # little endian, nanosecond-precision # noqa: E501 1216 self.endian = "<" 1217 self.nano = True 1218 else: 1219 raise Scapy_Exception( 1220 "Not a pcap capture file (bad magic: %r)" % magic 1221 ) 1222 hdr = self.f.read(20) 1223 if len(hdr) < 20: 1224 raise Scapy_Exception("Invalid pcap file (too short)") 1225 vermaj, vermin, tz, sig, snaplen, linktype = struct.unpack( 1226 self.endian + "HHIIII", hdr 1227 ) 1228 self.linktype = linktype 1229 self.snaplen = snaplen 1230 1231 def __iter__(self): 1232 # type: () -> RawPcapReader 1233 return self 1234 1235 def next(self): 1236 # type: () -> Packet 1237 """ 1238 implement the iterator protocol on a set of packets in a pcap file 1239 """ 1240 try: 1241 return self.read_packet() 1242 except EOFError: 1243 raise StopIteration 1244 __next__ = next 1245 1246 def _read_packet(self, size=MTU): 1247 # type: (int) -> Tuple[bytes, RawPcapReader.PacketMetadata] 1248 """return a single packet read from the file as a tuple containing 1249 (pkt_data, pkt_metadata) 1250 1251 raise EOFError when no more packets are available 1252 """ 1253 hdr = self.f.read(16) 1254 if len(hdr) < 16: 1255 raise EOFError 1256 sec, usec, caplen, wirelen = struct.unpack(self.endian + "IIII", hdr) 1257 return (self.f.read(caplen)[:size], 1258 RawPcapReader.PacketMetadata(sec=sec, usec=usec, 1259 wirelen=wirelen, caplen=caplen)) 1260 1261 def read_packet(self, size=MTU): 1262 # type: (int) -> Packet 1263 return cast( 1264 Packet, 1265 self._read_packet()[0] 1266 ) 1267 1268 def dispatch(self, 1269 callback # type: Callable[[Tuple[bytes, RawPcapReader.PacketMetadata]], Any] # noqa: E501 1270 ): 1271 # type: (...) -> None 1272 """call the specified callback routine for each packet read 1273 1274 This is just a convenience function for the main loop 1275 that allows for easy launching of packet processing in a 1276 thread. 1277 """ 1278 for p in self: 1279 callback(p) 1280 1281 def read_all(self, count=-1): 1282 # type: (int) -> PacketList 1283 res = self._read_all(count) 1284 from scapy import plist 1285 return plist.PacketList(res, name=os.path.basename(self.filename)) 1286 1287 def _read_all(self, count=-1): 1288 # type: (int) -> List[Packet] 1289 """return a list of all packets in the pcap file 1290 """ 1291 res = [] # type: List[Packet] 1292 while count != 0: 1293 count -= 1 1294 try: 1295 p = self.read_packet() # type: Packet 1296 except EOFError: 1297 break 1298 res.append(p) 1299 return res 1300 1301 def recv(self, size=MTU): 1302 # type: (int) -> bytes 1303 """ Emulate a socket 1304 """ 1305 return self._read_packet(size=size)[0] 1306 1307 def fileno(self): 1308 # type: () -> int 1309 return self.f.fileno() 1310 1311 def close(self): 1312 # type: () -> Optional[Any] 1313 return self.f.close() 1314 1315 def __exit__(self, exc_type, exc_value, tracback): 1316 # type: (Optional[Any], Optional[Any], Optional[Any]) -> None 1317 self.close() 1318 1319 # emulate SuperSocket 1320 @staticmethod 1321 def select(sockets, # type: List[SuperSocket] 1322 remain=None, # type: Optional[float] 1323 ): 1324 # type: (...) -> List[SuperSocket] 1325 return sockets 1326 1327 1328class PcapReader(RawPcapReader, _SuperSocket): 1329 def __init__(self, filename, fdesc=None, magic=None): # type: ignore 1330 # type: (str, IO[bytes], bytes) -> None 1331 RawPcapReader.__init__(self, filename, fdesc, magic) 1332 try: 1333 self.LLcls = conf.l2types.num2layer[ 1334 self.linktype 1335 ] # type: Type[Packet] 1336 except KeyError: 1337 warning("PcapReader: unknown LL type [%i]/[%#x]. Using Raw packets" % (self.linktype, self.linktype)) # noqa: E501 1338 if conf.raw_layer is None: 1339 # conf.raw_layer is set on import 1340 import scapy.packet # noqa: F401 1341 self.LLcls = conf.raw_layer 1342 1343 def __enter__(self): 1344 # type: () -> PcapReader 1345 return self 1346 1347 def read_packet(self, size=MTU): 1348 # type: (int) -> Packet 1349 rp = super(PcapReader, self)._read_packet(size=size) 1350 if rp is None: 1351 raise EOFError 1352 s, pkt_info = rp 1353 1354 try: 1355 p = self.LLcls(s) # type: Packet 1356 except KeyboardInterrupt: 1357 raise 1358 except Exception: 1359 if conf.debug_dissector: 1360 from scapy.sendrecv import debug 1361 debug.crashed_on = (self.LLcls, s) 1362 raise 1363 if conf.raw_layer is None: 1364 # conf.raw_layer is set on import 1365 import scapy.packet # noqa: F401 1366 p = conf.raw_layer(s) 1367 power = Decimal(10) ** Decimal(-9 if self.nano else -6) 1368 p.time = EDecimal(pkt_info.sec + power * pkt_info.usec) 1369 p.wirelen = pkt_info.wirelen 1370 return p 1371 1372 def recv(self, size=MTU): 1373 # type: (int) -> Packet 1374 return self.read_packet(size=size) 1375 1376 1377class RawPcapNgReader(RawPcapReader): 1378 """A stateful pcapng reader. Each packet is returned as 1379 bytes. 1380 1381 """ 1382 1383 alternative = RawPcapReader # type: Type[Any] 1384 1385 PacketMetadata = collections.namedtuple("PacketMetadata", 1386 ["linktype", "tsresol", 1387 "tshigh", "tslow", "wirelen"]) 1388 1389 def __init__(self, filename, fdesc=None, magic=None): # type: ignore 1390 # type: (str, IO[bytes], bytes) -> None 1391 self.filename = filename 1392 self.f = fdesc 1393 # A list of (linktype, snaplen, tsresol); will be populated by IDBs. 1394 self.interfaces = [] # type: List[Tuple[int, int, int]] 1395 self.default_options = { 1396 "tsresol": 1000000 1397 } 1398 self.blocktypes = { 1399 1: self._read_block_idb, 1400 2: self._read_block_pkt, 1401 3: self._read_block_spb, 1402 6: self._read_block_epb, 1403 } 1404 self.endian = "!" # Will be overwritten by first SHB 1405 1406 if magic != b"\x0a\x0d\x0d\x0a": # PcapNg: 1407 raise Scapy_Exception( 1408 "Not a pcapng capture file (bad magic: %r)" % magic 1409 ) 1410 1411 try: 1412 self._read_block_shb() 1413 except EOFError: 1414 raise Scapy_Exception( 1415 "The first SHB of the pcapng file is malformed !" 1416 ) 1417 1418 def _read_block(self, size=MTU): 1419 # type: (int) -> Optional[Tuple[bytes, RawPcapNgReader.PacketMetadata]] # noqa: E501 1420 try: 1421 blocktype = struct.unpack(self.endian + "I", self.f.read(4))[0] 1422 except struct.error: 1423 raise EOFError 1424 if blocktype == 0x0A0D0D0A: 1425 # This function updates the endianness based on the block content. 1426 self._read_block_shb() 1427 return None 1428 try: 1429 blocklen = struct.unpack(self.endian + "I", self.f.read(4))[0] 1430 except struct.error: 1431 raise EOFError 1432 if blocklen < 12: 1433 warning("Invalid block length !") 1434 raise EOFError 1435 block = self.f.read(blocklen - 12) 1436 self._read_block_tail(blocklen) 1437 return self.blocktypes.get( 1438 blocktype, 1439 lambda block, size: None 1440 )(block, size) 1441 1442 def _read_block_tail(self, blocklen): 1443 # type: (int) -> None 1444 if blocklen % 4: 1445 pad = self.f.read(-blocklen % 4) 1446 warning("PcapNg: bad blocklen %d (MUST be a multiple of 4. " 1447 "Ignored padding %r" % (blocklen, pad)) 1448 try: 1449 if blocklen != struct.unpack(self.endian + 'I', 1450 self.f.read(4))[0]: 1451 raise EOFError("PcapNg: Invalid pcapng block (bad blocklen)") 1452 except struct.error: 1453 raise EOFError 1454 1455 def _read_block_shb(self): 1456 # type: () -> None 1457 _blocklen = self.f.read(4) 1458 endian = self.f.read(4) 1459 if endian == b"\x1a\x2b\x3c\x4d": 1460 self.endian = ">" 1461 elif endian == b"\x4d\x3c\x2b\x1a": 1462 self.endian = "<" 1463 else: 1464 warning("Bad magic in Section Header block (not a pcapng file?)") 1465 raise EOFError 1466 1467 blocklen = struct.unpack(self.endian + "I", _blocklen)[0] 1468 if blocklen < 16: 1469 warning("Invalid SHB block length!") 1470 raise EOFError 1471 options = self.f.read(blocklen - 16) 1472 self._read_block_tail(blocklen) 1473 self._read_options(options) 1474 1475 def _read_packet(self, size=MTU): # type: ignore 1476 # type: (int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata] 1477 """Read blocks until it reaches either EOF or a packet, and 1478 returns None or (packet, (linktype, sec, usec, wirelen)), 1479 where packet is a string. 1480 1481 """ 1482 while True: 1483 res = self._read_block() 1484 if res is not None: 1485 return res 1486 1487 def _read_options(self, options): 1488 # type: (bytes) -> Dict[str, int] 1489 """Section Header Block""" 1490 opts = self.default_options.copy() 1491 while len(options) >= 4: 1492 code, length = struct.unpack(self.endian + "HH", options[:4]) 1493 # PCAP Next Generation (pcapng) Capture File Format 1494 # 4.2. - Interface Description Block 1495 # http://xml2rfc.tools.ietf.org/cgi-bin/xml2rfc.cgi?url=https://raw.githubusercontent.com/pcapng/pcapng/master/draft-tuexen-opsawg-pcapng.xml&modeAsFormat=html/ascii&type=ascii#rfc.section.4.2 1496 if code == 9 and length == 1 and len(options) >= 5: 1497 tsresol = orb(options[4]) 1498 opts["tsresol"] = (2 if tsresol & 128 else 10) ** ( 1499 tsresol & 127 1500 ) 1501 if code == 0: 1502 if length != 0: 1503 warning("PcapNg: invalid option length %d for end-of-option" % length) # noqa: E501 1504 break 1505 if length % 4: 1506 length += (4 - (length % 4)) 1507 options = options[4 + length:] 1508 return opts 1509 1510 def _read_block_idb(self, block, _): 1511 # type: (bytes, int) -> None 1512 """Interface Description Block""" 1513 # 2 bytes LinkType + 2 bytes Reserved 1514 # 4 bytes Snaplen 1515 options = self._read_options(block[8:-4]) 1516 try: 1517 interface = struct.unpack( # type: ignore 1518 self.endian + "HxxI", 1519 block[:8] 1520 ) + (options["tsresol"],) # type: Tuple[int, int, int] 1521 except struct.error: 1522 warning("PcapNg: IDB is too small %d/8 !" % len(block)) 1523 raise EOFError 1524 self.interfaces.append(interface) 1525 1526 def _check_interface_id(self, intid): 1527 # type: (int) -> None 1528 """Check the interface id value and raise EOFError if invalid.""" 1529 tmp_len = len(self.interfaces) 1530 if intid >= tmp_len: 1531 warning("PcapNg: invalid interface id %d/%d" % (intid, tmp_len)) 1532 raise EOFError 1533 1534 def _read_block_epb(self, block, size): 1535 # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata] 1536 """Enhanced Packet Block""" 1537 try: 1538 intid, tshigh, tslow, caplen, wirelen = struct.unpack( 1539 self.endian + "5I", 1540 block[:20], 1541 ) 1542 except struct.error: 1543 warning("PcapNg: EPB is too small %d/20 !" % len(block)) 1544 raise EOFError 1545 1546 self._check_interface_id(intid) 1547 return (block[20:20 + caplen][:size], 1548 RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0], # noqa: E501 1549 tsresol=self.interfaces[intid][2], # noqa: E501 1550 tshigh=tshigh, 1551 tslow=tslow, 1552 wirelen=wirelen)) 1553 1554 def _read_block_spb(self, block, size): 1555 # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata] 1556 """Simple Packet Block""" 1557 # "it MUST be assumed that all the Simple Packet Blocks have 1558 # been captured on the interface previously specified in the 1559 # first Interface Description Block." 1560 intid = 0 1561 self._check_interface_id(intid) 1562 1563 try: 1564 wirelen, = struct.unpack(self.endian + "I", block[:4]) 1565 except struct.error: 1566 warning("PcapNg: SPB is too small %d/4 !" % len(block)) 1567 raise EOFError 1568 1569 caplen = min(wirelen, self.interfaces[intid][1]) 1570 return (block[4:4 + caplen][:size], 1571 RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0], # noqa: E501 1572 tsresol=self.interfaces[intid][2], # noqa: E501 1573 tshigh=None, 1574 tslow=None, 1575 wirelen=wirelen)) 1576 1577 def _read_block_pkt(self, block, size): 1578 # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata] 1579 """(Obsolete) Packet Block""" 1580 try: 1581 intid, drops, tshigh, tslow, caplen, wirelen = struct.unpack( 1582 self.endian + "HH4I", 1583 block[:20], 1584 ) 1585 except struct.error: 1586 warning("PcapNg: PKT is too small %d/20 !" % len(block)) 1587 raise EOFError 1588 1589 self._check_interface_id(intid) 1590 return (block[20:20 + caplen][:size], 1591 RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0], # noqa: E501 1592 tsresol=self.interfaces[intid][2], # noqa: E501 1593 tshigh=tshigh, 1594 tslow=tslow, 1595 wirelen=wirelen)) 1596 1597 1598class PcapNgReader(RawPcapNgReader, _SuperSocket): 1599 1600 alternative = PcapReader 1601 1602 def __init__(self, filename, fdesc=None, magic=None): # type: ignore 1603 # type: (str, IO[bytes], bytes) -> None 1604 RawPcapNgReader.__init__(self, filename, fdesc, magic) 1605 1606 def __enter__(self): 1607 # type: () -> PcapNgReader 1608 return self 1609 1610 def read_packet(self, size=MTU): 1611 # type: (int) -> Packet 1612 rp = super(PcapNgReader, self)._read_packet(size=size) 1613 if rp is None: 1614 raise EOFError 1615 s, (linktype, tsresol, tshigh, tslow, wirelen) = rp 1616 try: 1617 cls = conf.l2types.num2layer[linktype] # type: Type[Packet] 1618 p = cls(s) # type: Packet 1619 except KeyboardInterrupt: 1620 raise 1621 except Exception: 1622 if conf.debug_dissector: 1623 raise 1624 if conf.raw_layer is None: 1625 # conf.raw_layer is set on import 1626 import scapy.packet # noqa: F401 1627 p = conf.raw_layer(s) 1628 if tshigh is not None: 1629 p.time = EDecimal((tshigh << 32) + tslow) / tsresol 1630 p.wirelen = wirelen 1631 return p 1632 1633 def recv(self, size=MTU): 1634 # type: (int) -> Packet 1635 return self.read_packet() 1636 1637 1638class RawPcapWriter: 1639 """A stream PCAP writer with more control than wrpcap()""" 1640 1641 def __init__(self, 1642 filename, # type: Union[IO[bytes], str] 1643 linktype=None, # type: Optional[int] 1644 gz=False, # type: bool 1645 endianness="", # type: str 1646 append=False, # type: bool 1647 sync=False, # type: bool 1648 nano=False, # type: bool 1649 snaplen=MTU, # type: int 1650 ): 1651 # type: (...) -> None 1652 """ 1653 :param filename: the name of the file to write packets to, or an open, 1654 writable file-like object. 1655 :param linktype: force linktype to a given value. If None, linktype is 1656 taken from the first writer packet 1657 :param gz: compress the capture on the fly 1658 :param endianness: force an endianness (little:"<", big:">"). 1659 Default is native 1660 :param append: append packets to the capture file instead of 1661 truncating it 1662 :param sync: do not bufferize writes to the capture file 1663 :param nano: use nanosecond-precision (requires libpcap >= 1.5.0) 1664 1665 """ 1666 1667 self.linktype = linktype 1668 self.snaplen = snaplen 1669 self.header_present = 0 1670 self.append = append 1671 self.gz = gz 1672 self.endian = endianness 1673 self.sync = sync 1674 self.nano = nano 1675 bufsz = 4096 1676 if sync: 1677 bufsz = 0 1678 1679 if isinstance(filename, str): 1680 self.filename = filename 1681 if gz: 1682 self.f = cast(_ByteStream, gzip.open( 1683 filename, append and "ab" or "wb", 9 1684 )) 1685 else: 1686 self.f = open(filename, append and "ab" or "wb", bufsz) 1687 else: 1688 self.f = filename 1689 self.filename = getattr(filename, "name", "No name") 1690 1691 def fileno(self): 1692 # type: () -> int 1693 return self.f.fileno() 1694 1695 def write_header(self, pkt): 1696 # type: (Optional[Union[Packet, bytes]]) -> None 1697 return self._write_header(bytes_encode(pkt)) 1698 1699 def _write_header(self, pkt): 1700 # type: (Optional[Union[Packet, bytes]]) -> None 1701 self.header_present = 1 1702 1703 if self.append: 1704 # Even if prone to race conditions, this seems to be 1705 # safest way to tell whether the header is already present 1706 # because we have to handle compressed streams that 1707 # are not as flexible as basic files 1708 if self.gz: 1709 g = gzip.open(self.filename, "rb") # type: _ByteStream 1710 else: 1711 g = open(self.filename, "rb") 1712 try: 1713 if g.read(16): 1714 return 1715 finally: 1716 g.close() 1717 1718 self.f.write(struct.pack(self.endian + "IHHIIII", 0xa1b23c4d if self.nano else 0xa1b2c3d4, # noqa: E501 1719 2, 4, 0, 0, self.snaplen, self.linktype)) 1720 self.f.flush() 1721 1722 def write(self, pkt): 1723 # type: (Union[_PacketIterable, bytes]) -> None 1724 """ 1725 Writes a Packet, a SndRcvList object, or bytes to a pcap file. 1726 1727 :param pkt: Packet(s) to write (one record for each Packet), or raw 1728 bytes to write (as one record). 1729 :type pkt: iterable[scapy.packet.Packet], scapy.packet.Packet or bytes 1730 """ 1731 if isinstance(pkt, bytes): 1732 if not self.header_present: 1733 self.write_header(pkt) 1734 self.write_packet(pkt) 1735 else: 1736 # Import here to avoid circular dependency 1737 from scapy.supersocket import IterSocket 1738 for p in IterSocket(pkt).iter: 1739 if not self.header_present: 1740 self.write_header(p) 1741 1742 if not isinstance(p, bytes) and \ 1743 self.linktype != conf.l2types.get(type(p), None): 1744 warning("Inconsistent linktypes detected!" 1745 " The resulting PCAP file might contain" 1746 " invalid packets." 1747 ) 1748 1749 self.write_packet(p) 1750 1751 def write_packet(self, 1752 packet, # type: Union[bytes, Packet] 1753 sec=None, # type: Optional[int] 1754 usec=None, # type: Optional[int] 1755 caplen=None, # type: Optional[int] 1756 wirelen=None, # type: Optional[int] 1757 ): 1758 # type: (...) -> None 1759 self._write_packet( 1760 bytes(packet), 1761 sec=sec, usec=usec, 1762 caplen=caplen, wirelen=wirelen 1763 ) 1764 1765 def _write_packet(self, 1766 packet, # type: bytes 1767 sec=None, # type: Optional[int] 1768 usec=None, # type: Optional[int] 1769 caplen=None, # type: Optional[int] 1770 wirelen=None # type: Optional[int] 1771 ): 1772 # type: (...) -> None 1773 """ 1774 Writes a single packet to the pcap file. 1775 1776 :param packet: bytes for a single packet 1777 :type packet: bytes 1778 :param sec: time the packet was captured, in seconds since epoch. If 1779 not supplied, defaults to now. 1780 :type sec: int or long 1781 :param usec: If ``nano=True``, then number of nanoseconds after the 1782 second that the packet was captured. If ``nano=False``, 1783 then the number of microseconds after the second the 1784 packet was captured 1785 :type usec: int or long 1786 :param caplen: The length of the packet in the capture file. If not 1787 specified, uses ``len(packet)``. 1788 :type caplen: int 1789 :param wirelen: The length of the packet on the wire. If not 1790 specified, uses ``caplen``. 1791 :type wirelen: int 1792 :return: None 1793 :rtype: None 1794 """ 1795 if caplen is None: 1796 caplen = len(packet) 1797 if wirelen is None: 1798 wirelen = caplen 1799 if sec is None or usec is None: 1800 t = time.time() 1801 it = int(t) 1802 if sec is None: 1803 sec = it 1804 usec = int(round((t - it) * 1805 (1000000000 if self.nano else 1000000))) 1806 elif usec is None: 1807 usec = 0 1808 1809 self.f.write(struct.pack(self.endian + "IIII", 1810 sec, usec, caplen, wirelen)) 1811 self.f.write(packet) 1812 if self.sync: 1813 self.f.flush() 1814 1815 def flush(self): 1816 # type: () -> Optional[Any] 1817 return self.f.flush() 1818 1819 def close(self): 1820 # type: () -> Optional[Any] 1821 if not self.header_present: 1822 self.write_header(None) 1823 return self.f.close() 1824 1825 def __enter__(self): 1826 # type: () -> RawPcapWriter 1827 return self 1828 1829 def __exit__(self, exc_type, exc_value, tracback): 1830 # type: (Optional[Any], Optional[Any], Optional[Any]) -> None 1831 self.flush() 1832 self.close() 1833 1834 1835class PcapWriter(RawPcapWriter): 1836 """A stream PCAP writer with more control than wrpcap()""" 1837 1838 def write_header(self, pkt): 1839 # type: (Optional[Union[Packet, bytes]]) -> None 1840 if self.linktype is None: 1841 try: 1842 if pkt is None or isinstance(pkt, bytes): 1843 # Can't guess LL 1844 raise KeyError 1845 self.linktype = conf.l2types.layer2num[ 1846 pkt.__class__ 1847 ] 1848 except KeyError: 1849 warning("PcapWriter: unknown LL type for %s. Using type 1 (Ethernet)", pkt.__class__.__name__) # noqa: E501 1850 self.linktype = DLT_EN10MB 1851 self._write_header(pkt) 1852 1853 def write_packet(self, 1854 packet, # type: Union[bytes, Packet] 1855 sec=None, # type: Optional[int] 1856 usec=None, # type: Optional[int] 1857 caplen=None, # type: Optional[int] 1858 wirelen=None, # type: Optional[int] 1859 ): 1860 # type: (...) -> None 1861 """ 1862 Writes a single packet to the pcap file. 1863 1864 :param packet: Packet, or bytes for a single packet 1865 :type packet: scapy.packet.Packet or bytes 1866 :param sec: time the packet was captured, in seconds since epoch. If 1867 not supplied, defaults to now. 1868 :type sec: int or long 1869 :param usec: If ``nano=True``, then number of nanoseconds after the 1870 second that the packet was captured. If ``nano=False``, 1871 then the number of microseconds after the second the 1872 packet was captured. If ``sec`` is not specified, 1873 this value is ignored. 1874 :type usec: int or long 1875 :param caplen: The length of the packet in the capture file. If not 1876 specified, uses ``len(raw(packet))``. 1877 :type caplen: int 1878 :param wirelen: The length of the packet on the wire. If not 1879 specified, tries ``packet.wirelen``, otherwise uses 1880 ``caplen``. 1881 :type wirelen: int 1882 :return: None 1883 :rtype: None 1884 """ 1885 if hasattr(packet, "time"): 1886 if sec is None: 1887 sec = int(packet.time) # type: ignore 1888 usec = int(round((packet.time - sec) * # type: ignore 1889 (1000000000 if self.nano else 1000000))) 1890 if usec is None: 1891 usec = 0 1892 1893 rawpkt = bytes_encode(packet) 1894 caplen = len(rawpkt) if caplen is None else caplen 1895 1896 if wirelen is None: 1897 if hasattr(packet, "wirelen"): 1898 wirelen = packet.wirelen # type: ignore 1899 if wirelen is None: 1900 wirelen = caplen 1901 1902 self._write_packet( 1903 rawpkt, 1904 sec=sec, usec=usec, 1905 caplen=caplen, wirelen=wirelen 1906 ) 1907 1908 1909@conf.commands.register 1910def import_hexcap(input_string=None): 1911 # type: (Optional[str]) -> bytes 1912 """Imports a tcpdump like hexadecimal view 1913 1914 e.g: exported via hexdump() or tcpdump or wireshark's "export as hex" 1915 1916 :param input_string: String containing the hexdump input to parse. If None, 1917 read from standard input. 1918 """ 1919 re_extract_hexcap = re.compile(r"^((0x)?[0-9a-fA-F]{2,}[ :\t]{,3}|) *(([0-9a-fA-F]{2} {,2}){,16})") # noqa: E501 1920 p = "" 1921 try: 1922 if input_string: 1923 input_function = six.StringIO(input_string).readline 1924 else: 1925 input_function = input 1926 while True: 1927 line = input_function().strip() 1928 if not line: 1929 break 1930 try: 1931 p += re_extract_hexcap.match(line).groups()[2] # type: ignore 1932 except Exception: 1933 warning("Parsing error during hexcap") 1934 continue 1935 except EOFError: 1936 pass 1937 1938 p = p.replace(" ", "") 1939 return hex_bytes(p) 1940 1941 1942@conf.commands.register 1943def wireshark(pktlist, wait=False, **kwargs): 1944 # type: (List[Packet], bool, **Any) -> Optional[Any] 1945 """ 1946 Runs Wireshark on a list of packets. 1947 1948 See :func:`tcpdump` for more parameter description. 1949 1950 Note: this defaults to wait=False, to run Wireshark in the background. 1951 """ 1952 return tcpdump(pktlist, prog=conf.prog.wireshark, wait=wait, **kwargs) 1953 1954 1955@conf.commands.register 1956def tdecode( 1957 pktlist, # type: Union[IO[bytes], None, str, _PacketIterable] 1958 args=None, # type: Optional[List[str]] 1959 **kwargs # type: Any 1960): 1961 # type: (...) -> Any 1962 """ 1963 Run tshark on a list of packets. 1964 1965 :param args: If not specified, defaults to ``tshark -V``. 1966 1967 See :func:`tcpdump` for more parameters. 1968 """ 1969 if args is None: 1970 args = ["-V"] 1971 return tcpdump(pktlist, prog=conf.prog.tshark, args=args, **kwargs) 1972 1973 1974def _guess_linktype_name(value): 1975 # type: (int) -> str 1976 """Guess the DLT name from its value.""" 1977 from scapy.libs.winpcapy import pcap_datalink_val_to_name 1978 return cast(bytes, pcap_datalink_val_to_name(value)).decode() 1979 1980 1981def _guess_linktype_value(name): 1982 # type: (str) -> int 1983 """Guess the value of a DLT name.""" 1984 from scapy.libs.winpcapy import pcap_datalink_name_to_val 1985 val = cast(int, pcap_datalink_name_to_val(name.encode())) 1986 if val == -1: 1987 warning("Unknown linktype: %s. Using EN10MB", name) 1988 return DLT_EN10MB 1989 return val 1990 1991 1992@conf.commands.register 1993def tcpdump( 1994 pktlist=None, # type: Union[IO[bytes], None, str, _PacketIterable] 1995 dump=False, # type: bool 1996 getfd=False, # type: bool 1997 args=None, # type: Optional[List[str]] 1998 flt=None, # type: Optional[str] 1999 prog=None, # type: Optional[Any] 2000 getproc=False, # type: bool 2001 quiet=False, # type: bool 2002 use_tempfile=None, # type: Optional[Any] 2003 read_stdin_opts=None, # type: Optional[Any] 2004 linktype=None, # type: Optional[Any] 2005 wait=True, # type: bool 2006 _suppress=False # type: bool 2007): 2008 # type: (...) -> Any 2009 """Run tcpdump or tshark on a list of packets. 2010 2011 When using ``tcpdump`` on OSX (``prog == conf.prog.tcpdump``), this uses a 2012 temporary file to store the packets. This works around a bug in Apple's 2013 version of ``tcpdump``: http://apple.stackexchange.com/questions/152682/ 2014 2015 Otherwise, the packets are passed in stdin. 2016 2017 This function can be explicitly enabled or disabled with the 2018 ``use_tempfile`` parameter. 2019 2020 When using ``wireshark``, it will be called with ``-ki -`` to start 2021 immediately capturing packets from stdin. 2022 2023 Otherwise, the command will be run with ``-r -`` (which is correct for 2024 ``tcpdump`` and ``tshark``). 2025 2026 This can be overridden with ``read_stdin_opts``. This has no effect when 2027 ``use_tempfile=True``, or otherwise reading packets from a regular file. 2028 2029 :param pktlist: a Packet instance, a PacketList instance or a list of 2030 Packet instances. Can also be a filename (as a string), an open 2031 file-like object that must be a file format readable by 2032 tshark (Pcap, PcapNg, etc.) or None (to sniff) 2033 :param flt: a filter to use with tcpdump 2034 :param dump: when set to True, returns a string instead of displaying it. 2035 :param getfd: when set to True, returns a file-like object to read data 2036 from tcpdump or tshark from. 2037 :param getproc: when set to True, the subprocess.Popen object is returned 2038 :param args: arguments (as a list) to pass to tshark (example for tshark: 2039 args=["-T", "json"]). 2040 :param prog: program to use (defaults to tcpdump, will work with tshark) 2041 :param quiet: when set to True, the process stderr is discarded 2042 :param use_tempfile: When set to True, always use a temporary file to store 2043 packets. 2044 When set to False, pipe packets through stdin. 2045 When set to None (default), only use a temporary file with 2046 ``tcpdump`` on OSX. 2047 :param read_stdin_opts: When set, a list of arguments needed to capture 2048 from stdin. Otherwise, attempts to guess. 2049 :param linktype: A custom DLT value or name, to overwrite the default 2050 values. 2051 :param wait: If True (default), waits for the process to terminate before 2052 returning to Scapy. If False, the process will be detached to the 2053 background. If dump, getproc or getfd is True, these have the same 2054 effect as ``wait=False``. 2055 2056 Examples:: 2057 2058 >>> tcpdump([IP()/TCP(), IP()/UDP()]) 2059 reading from file -, link-type RAW (Raw IP) 2060 16:46:00.474515 IP 127.0.0.1.20 > 127.0.0.1.80: Flags [S], seq 0, win 8192, length 0 # noqa: E501 2061 16:46:00.475019 IP 127.0.0.1.53 > 127.0.0.1.53: [|domain] 2062 2063 >>> tcpdump([IP()/TCP(), IP()/UDP()], prog=conf.prog.tshark) 2064 1 0.000000 127.0.0.1 -> 127.0.0.1 TCP 40 20->80 [SYN] Seq=0 Win=8192 Len=0 # noqa: E501 2065 2 0.000459 127.0.0.1 -> 127.0.0.1 UDP 28 53->53 Len=0 2066 2067 To get a JSON representation of a tshark-parsed PacketList(), one can:: 2068 2069 >>> import json, pprint 2070 >>> json_data = json.load(tcpdump(IP(src="217.25.178.5", 2071 ... dst="45.33.32.156"), 2072 ... prog=conf.prog.tshark, 2073 ... args=["-T", "json"], 2074 ... getfd=True)) 2075 >>> pprint.pprint(json_data) 2076 [{u'_index': u'packets-2016-12-23', 2077 u'_score': None, 2078 u'_source': {u'layers': {u'frame': {u'frame.cap_len': u'20', 2079 u'frame.encap_type': u'7', 2080 [...] 2081 }, 2082 u'ip': {u'ip.addr': u'45.33.32.156', 2083 u'ip.checksum': u'0x0000a20d', 2084 [...] 2085 u'ip.ttl': u'64', 2086 u'ip.version': u'4'}, 2087 u'raw': u'Raw packet data'}}, 2088 u'_type': u'pcap_file'}] 2089 >>> json_data[0]['_source']['layers']['ip']['ip.ttl'] 2090 u'64' 2091 """ 2092 getfd = getfd or getproc 2093 if prog is None: 2094 if not conf.prog.tcpdump: 2095 raise Scapy_Exception( 2096 "tcpdump is not available" 2097 ) 2098 prog = [conf.prog.tcpdump] 2099 elif isinstance(prog, six.string_types): 2100 prog = [prog] 2101 else: 2102 raise ValueError("prog must be a string") 2103 2104 if linktype is not None: 2105 if isinstance(linktype, int): 2106 # Guess name from value 2107 try: 2108 linktype_name = _guess_linktype_name(linktype) 2109 except StopIteration: 2110 linktype = -1 2111 else: 2112 # Guess value from name 2113 if linktype.startswith("DLT_"): 2114 linktype = linktype[4:] 2115 linktype_name = linktype 2116 try: 2117 linktype = _guess_linktype_value(linktype) 2118 except KeyError: 2119 linktype = -1 2120 if linktype == -1: 2121 raise ValueError( 2122 "Unknown linktype. Try passing its datalink name instead" 2123 ) 2124 prog += ["-y", linktype_name] 2125 2126 # Build Popen arguments 2127 if args is None: 2128 args = [] 2129 else: 2130 # Make a copy of args 2131 args = list(args) 2132 2133 if flt is not None: 2134 # Check the validity of the filter 2135 if linktype is None and isinstance(pktlist, str): 2136 # linktype is unknown but required. Read it from file 2137 with PcapReader(pktlist) as rd: 2138 linktype = rd.linktype 2139 from scapy.arch.common import compile_filter 2140 compile_filter(flt, linktype=linktype) 2141 args.append(flt) 2142 2143 stdout = subprocess.PIPE if dump or getfd else None 2144 stderr = open(os.devnull) if quiet else None 2145 proc = None 2146 2147 if use_tempfile is None: 2148 # Apple's tcpdump cannot read from stdin, see: 2149 # http://apple.stackexchange.com/questions/152682/ 2150 use_tempfile = DARWIN and prog[0] == conf.prog.tcpdump 2151 2152 if read_stdin_opts is None: 2153 if prog[0] == conf.prog.wireshark: 2154 # Start capturing immediately (-k) from stdin (-i -) 2155 read_stdin_opts = ["-ki", "-"] 2156 elif prog[0] == conf.prog.tcpdump and not OPENBSD: 2157 # Capture in packet-buffered mode (-U) from stdin (-r -) 2158 read_stdin_opts = ["-U", "-r", "-"] 2159 else: 2160 read_stdin_opts = ["-r", "-"] 2161 else: 2162 # Make a copy of read_stdin_opts 2163 read_stdin_opts = list(read_stdin_opts) 2164 2165 if pktlist is None: 2166 # sniff 2167 with ContextManagerSubprocess(prog[0], suppress=_suppress): 2168 proc = subprocess.Popen( 2169 prog + args, 2170 stdout=stdout, 2171 stderr=stderr, 2172 ) 2173 elif isinstance(pktlist, six.string_types): 2174 # file 2175 with ContextManagerSubprocess(prog[0], suppress=_suppress): 2176 proc = subprocess.Popen( 2177 prog + ["-r", pktlist] + args, 2178 stdout=stdout, 2179 stderr=stderr, 2180 ) 2181 elif use_tempfile: 2182 pktlist = cast(Union[IO[bytes], "_PacketIterable"], pktlist) 2183 tmpfile = get_temp_file( # type: ignore 2184 autoext=".pcap", 2185 fd=True 2186 ) # type: IO[bytes] 2187 try: 2188 tmpfile.writelines( 2189 iter(lambda: pktlist.read(1048576), b"") # type: ignore 2190 ) 2191 except AttributeError: 2192 pktlist = cast("_PacketIterable", pktlist) 2193 wrpcap(tmpfile, pktlist, linktype=linktype) 2194 else: 2195 tmpfile.close() 2196 with ContextManagerSubprocess(prog[0], suppress=_suppress): 2197 proc = subprocess.Popen( 2198 prog + ["-r", tmpfile.name] + args, 2199 stdout=stdout, 2200 stderr=stderr, 2201 ) 2202 else: 2203 try: 2204 pktlist.fileno() # type: ignore 2205 # pass the packet stream 2206 with ContextManagerSubprocess(prog[0], suppress=_suppress): 2207 proc = subprocess.Popen( 2208 prog + read_stdin_opts + args, 2209 stdin=pktlist, # type: ignore 2210 stdout=stdout, 2211 stderr=stderr, 2212 ) 2213 except (AttributeError, ValueError): 2214 # write the packet stream to stdin 2215 with ContextManagerSubprocess(prog[0], suppress=_suppress): 2216 proc = subprocess.Popen( 2217 prog + read_stdin_opts + args, 2218 stdin=subprocess.PIPE, 2219 stdout=stdout, 2220 stderr=stderr, 2221 ) 2222 if proc is None: 2223 # An error has occurred 2224 return 2225 try: 2226 proc.stdin.writelines( # type: ignore 2227 iter(lambda: pktlist.read(1048576), b"") # type: ignore 2228 ) 2229 except AttributeError: 2230 wrpcap(proc.stdin, pktlist, linktype=linktype) # type: ignore 2231 except UnboundLocalError: 2232 # The error was handled by ContextManagerSubprocess 2233 pass 2234 else: 2235 proc.stdin.close() # type: ignore 2236 if proc is None: 2237 # An error has occurred 2238 return 2239 if dump: 2240 data = b"".join( 2241 iter(lambda: proc.stdout.read(1048576), b"") # type: ignore 2242 ) 2243 proc.terminate() 2244 return data 2245 if getproc: 2246 return proc 2247 if getfd: 2248 return proc.stdout 2249 if wait: 2250 proc.wait() 2251 2252 2253@conf.commands.register 2254def hexedit(pktlist): 2255 # type: (_PacketIterable) -> PacketList 2256 """Run hexedit on a list of packets, then return the edited packets.""" 2257 f = get_temp_file() 2258 wrpcap(f, pktlist) 2259 with ContextManagerSubprocess(conf.prog.hexedit): 2260 subprocess.call([conf.prog.hexedit, f]) 2261 rpktlist = rdpcap(f) 2262 os.unlink(f) 2263 return rpktlist 2264 2265 2266def get_terminal_width(): 2267 # type: () -> Optional[int] 2268 """Get terminal width (number of characters) if in a window. 2269 2270 Notice: this will try several methods in order to 2271 support as many terminals and OS as possible. 2272 """ 2273 # Let's first try using the official API 2274 # (Python 3.3+) 2275 sizex = None # type: Optional[int] 2276 if not six.PY2: 2277 import shutil 2278 sizex = shutil.get_terminal_size(fallback=(0, 0))[0] 2279 if sizex != 0: 2280 return sizex 2281 # Backups / Python 2.7 2282 if WINDOWS: 2283 from ctypes import windll, create_string_buffer # type: ignore 2284 # http://code.activestate.com/recipes/440694-determine-size-of-console-window-on-windows/ 2285 h = windll.kernel32.GetStdHandle(-12) 2286 csbi = create_string_buffer(22) 2287 res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi) 2288 if res: 2289 (bufx, bufy, curx, cury, wattr, 2290 left, top, right, bottom, maxx, maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw) # noqa: E501 2291 sizex = right - left + 1 2292 # sizey = bottom - top + 1 2293 return sizex 2294 return sizex 2295 else: 2296 # We have various methods 2297 # COLUMNS is set on some terminals 2298 try: 2299 sizex = int(os.environ['COLUMNS']) 2300 except Exception: 2301 pass 2302 if sizex: 2303 return sizex 2304 # We can query TIOCGWINSZ 2305 try: 2306 import fcntl 2307 import termios 2308 s = struct.pack('HHHH', 0, 0, 0, 0) 2309 x = fcntl.ioctl(1, termios.TIOCGWINSZ, s) 2310 sizex = struct.unpack('HHHH', x)[1] 2311 except IOError: 2312 pass 2313 return sizex 2314 2315 2316def pretty_list(rtlst, # type: List[Tuple[Union[str, List[str]], ...]] 2317 header, # type: List[Tuple[str, ...]] 2318 sortBy=0, # type: int 2319 borders=False, # type: bool 2320 ): 2321 # type: (...) -> str 2322 """ 2323 Pretty list to fit the terminal, and add header. 2324 2325 :param rtlst: a list of tuples. each tuple contains a value which can 2326 be either a string or a list of string. 2327 :param sortBy: the column id (starting with 0) which whill be used for 2328 ordering 2329 :param borders: whether to put borders on the table or not 2330 """ 2331 if borders: 2332 _space = "|" 2333 else: 2334 _space = " " 2335 cols = len(header[0]) 2336 # Windows has a fat terminal border 2337 _spacelen = len(_space) * (cols - 1) + int(WINDOWS) 2338 _croped = False 2339 # Sort correctly 2340 rtlst.sort(key=lambda x: x[sortBy]) 2341 # Resolve multi-values 2342 for i, line in enumerate(rtlst): 2343 ids = [] # type: List[int] 2344 values = [] # type: List[Union[str, List[str]]] 2345 for j, val in enumerate(line): 2346 if isinstance(val, list): 2347 ids.append(j) 2348 values.append(val or " ") 2349 if values: 2350 del rtlst[i] 2351 k = 0 2352 for ex_vals in zip_longest(*values, fillvalue=" "): 2353 if k: 2354 extra_line = [" "] * cols 2355 else: 2356 extra_line = list(line) # type: ignore 2357 for j, h in enumerate(ids): 2358 extra_line[h] = ex_vals[j] 2359 rtlst.insert(i + k, tuple(extra_line)) 2360 k += 1 2361 rtslst = cast(List[Tuple[str, ...]], rtlst) 2362 # Append tag 2363 rtslst = header + rtslst 2364 # Detect column's width 2365 colwidth = [max(len(y) for y in x) for x in zip(*rtslst)] 2366 # Make text fit in box (if required) 2367 width = get_terminal_width() 2368 if conf.auto_crop_tables and width: 2369 width = width - _spacelen 2370 while sum(colwidth) > width: 2371 _croped = True 2372 # Needs to be cropped 2373 # Get the longest row 2374 i = colwidth.index(max(colwidth)) 2375 # Get all elements of this row 2376 row = [len(x[i]) for x in rtslst] 2377 # Get biggest element of this row: biggest of the array 2378 j = row.index(max(row)) 2379 # Re-build column tuple with the edited element 2380 t = list(rtslst[j]) 2381 t[i] = t[i][:-2] + "_" 2382 rtslst[j] = tuple(t) 2383 # Update max size 2384 row[j] = len(t[i]) 2385 colwidth[i] = max(row) 2386 if _croped: 2387 log_runtime.info("Table cropped to fit the terminal (conf.auto_crop_tables==True)") # noqa: E501 2388 # Generate padding scheme 2389 fmt = _space.join(["%%-%ds" % x for x in colwidth]) 2390 # Append separation line if needed 2391 if borders: 2392 rtslst.insert(1, tuple("-" * x for x in colwidth)) 2393 # Compile 2394 return "\n".join(fmt % x for x in rtslst) 2395 2396 2397def __make_table( 2398 yfmtfunc, # type: Callable[[int], str] 2399 fmtfunc, # type: Callable[[int], str] 2400 endline, # type: str 2401 data, # type: List[Tuple[Packet, Packet]] 2402 fxyz, # type: Callable[[Packet, Packet], Tuple[Any, Any, Any]] 2403 sortx=None, # type: Optional[Callable[[str], Tuple[Any, ...]]] 2404 sorty=None, # type: Optional[Callable[[str], Tuple[Any, ...]]] 2405 seplinefunc=None, # type: Optional[Callable[[int, List[int]], str]] 2406 dump=False # type: bool 2407): 2408 # type: (...) -> Optional[str] 2409 """Core function of the make_table suite, which generates the table""" 2410 vx = {} # type: Dict[str, int] 2411 vy = {} # type: Dict[str, Optional[int]] 2412 vz = {} # type: Dict[Tuple[str, str], str] 2413 vxf = {} # type: Dict[str, str] 2414 2415 # Python 2 backward compatibility 2416 fxyz = lambda_tuple_converter(fxyz) 2417 2418 tmp_len = 0 2419 for e in data: 2420 xx, yy, zz = [str(s) for s in fxyz(*e)] 2421 tmp_len = max(len(yy), tmp_len) 2422 vx[xx] = max(vx.get(xx, 0), len(xx), len(zz)) 2423 vy[yy] = None 2424 vz[(xx, yy)] = zz 2425 2426 vxk = list(vx) 2427 vyk = list(vy) 2428 if sortx: 2429 vxk.sort(key=sortx) 2430 else: 2431 try: 2432 vxk.sort(key=int) 2433 except Exception: 2434 try: 2435 vxk.sort(key=atol) 2436 except Exception: 2437 vxk.sort() 2438 if sorty: 2439 vyk.sort(key=sorty) 2440 else: 2441 try: 2442 vyk.sort(key=int) 2443 except Exception: 2444 try: 2445 vyk.sort(key=atol) 2446 except Exception: 2447 vyk.sort() 2448 2449 s = "" 2450 if seplinefunc: 2451 sepline = seplinefunc(tmp_len, [vx[x] for x in vxk]) 2452 s += sepline + "\n" 2453 2454 fmt = yfmtfunc(tmp_len) 2455 s += fmt % "" 2456 s += ' ' 2457 for x in vxk: 2458 vxf[x] = fmtfunc(vx[x]) 2459 s += vxf[x] % x 2460 s += ' ' 2461 s += endline + "\n" 2462 if seplinefunc: 2463 s += sepline + "\n" 2464 for y in vyk: 2465 s += fmt % y 2466 s += ' ' 2467 for x in vxk: 2468 s += vxf[x] % vz.get((x, y), "-") 2469 s += ' ' 2470 s += endline + "\n" 2471 if seplinefunc: 2472 s += sepline + "\n" 2473 2474 if dump: 2475 return s 2476 else: 2477 print(s, end="") 2478 return None 2479 2480 2481def make_table(*args, **kargs): 2482 # type: (*Any, **Any) -> Optional[Any] 2483 return __make_table( 2484 lambda l: "%%-%is" % l, 2485 lambda l: "%%-%is" % l, 2486 "", 2487 *args, 2488 **kargs 2489 ) 2490 2491 2492def make_lined_table(*args, **kargs): 2493 # type: (*Any, **Any) -> Optional[str] 2494 return __make_table( # type: ignore 2495 lambda l: "%%-%is |" % l, 2496 lambda l: "%%-%is |" % l, 2497 "", 2498 *args, 2499 seplinefunc=lambda a, x: "+".join( 2500 '-' * (y + 2) for y in [a - 1] + x + [-2] 2501 ), 2502 **kargs 2503 ) 2504 2505 2506def make_tex_table(*args, **kargs): 2507 # type: (*Any, **Any) -> Optional[str] 2508 return __make_table( # type: ignore 2509 lambda l: "%s", 2510 lambda l: "& %s", 2511 "\\\\", 2512 *args, 2513 seplinefunc=lambda a, x: "\\hline", 2514 **kargs 2515 ) 2516 2517#################### 2518# WHOIS CLIENT # 2519#################### 2520 2521 2522def whois(ip_address): 2523 # type: (str) -> bytes 2524 """Whois client for Python""" 2525 whois_ip = str(ip_address) 2526 try: 2527 query = socket.gethostbyname(whois_ip) 2528 except Exception: 2529 query = whois_ip 2530 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 2531 s.connect(("whois.ripe.net", 43)) 2532 s.send(query.encode("utf8") + b"\r\n") 2533 answer = b"" 2534 while True: 2535 d = s.recv(4096) 2536 answer += d 2537 if not d: 2538 break 2539 s.close() 2540 ignore_tag = b"remarks:" 2541 # ignore all lines starting with the ignore_tag 2542 lines = [line for line in answer.split(b"\n") if not line or (line and not line.startswith(ignore_tag))] # noqa: E501 2543 # remove empty lines at the bottom 2544 for i in range(1, len(lines)): 2545 if not lines[-i].strip(): 2546 del lines[-i] 2547 else: 2548 break 2549 return b"\n".join(lines[3:]) 2550 2551####################### 2552# PERIODIC SENDER # 2553####################### 2554 2555 2556class PeriodicSenderThread(threading.Thread): 2557 def __init__(self, sock, pkt, interval=0.5): 2558 # type: (Any, _PacketIterable, float) -> None 2559 """ Thread to send packets periodically 2560 2561 Args: 2562 sock: socket where packet is sent periodically 2563 pkt: packet or list of packets to send 2564 interval: interval between two packets 2565 """ 2566 if not isinstance(pkt, list): 2567 self._pkts = [cast("Packet", pkt)] # type: _PacketIterable 2568 else: 2569 self._pkts = pkt 2570 self._socket = sock 2571 self._stopped = threading.Event() 2572 self._interval = interval 2573 threading.Thread.__init__(self) 2574 2575 def run(self): 2576 # type: () -> None 2577 while not self._stopped.is_set(): 2578 for p in self._pkts: 2579 self._socket.send(p) 2580 time.sleep(self._interval) 2581 if self._stopped.is_set(): 2582 break 2583 2584 def stop(self): 2585 # type: () -> None 2586 self._stopped.set() 2587 2588 2589class SingleConversationSocket(object): 2590 def __init__(self, o): 2591 # type: (Any) -> None 2592 self._inner = o 2593 self._tx_mutex = threading.RLock() 2594 2595 @property 2596 def __dict__(self): # type: ignore 2597 return self._inner.__dict__ 2598 2599 def __getattr__(self, name): 2600 # type: (str) -> Any 2601 return getattr(self._inner, name) 2602 2603 def sr1(self, *args, **kargs): 2604 # type: (*Any, **Any) -> Any 2605 with self._tx_mutex: 2606 return self._inner.sr1(*args, **kargs) 2607 2608 def sr(self, *args, **kargs): 2609 # type: (*Any, **Any) -> Any 2610 with self._tx_mutex: 2611 return self._inner.sr(*args, **kargs) 2612 2613 def send(self, x): 2614 # type: (Packet) -> Any 2615 with self._tx_mutex: 2616 return self._inner.send(x) 2617