1# This file is part of Scapy 2# See http://www.secdev.org/projects/scapy for more information 3# Copyright (C) Philippe Biondi <phil@secdev.org> 4# This program is published under a GPLv2 license 5 6""" 7Implementation of the configuration object. 8""" 9 10from __future__ import absolute_import 11from __future__ import print_function 12 13import atexit 14import copy 15import functools 16import os 17import re 18import socket 19import sys 20import time 21import warnings 22 23import scapy 24from scapy import VERSION 25from scapy.base_classes import BasePacket 26from scapy.consts import DARWIN, WINDOWS, LINUX, BSD, SOLARIS 27from scapy.error import log_scapy, warning, ScapyInvalidPlatformException 28from scapy.modules import six 29from scapy.themes import NoTheme, apply_ipython_style 30 31from scapy.compat import ( 32 Any, 33 Callable, 34 DecoratorCallable, 35 Dict, 36 Iterator, 37 List, 38 NoReturn, 39 Optional, 40 Set, 41 Type, 42 Tuple, 43 Union, 44 TYPE_CHECKING, 45) 46from types import ModuleType 47 48if TYPE_CHECKING: 49 # Do not import at runtime 50 from scapy.packet import Packet 51 52############ 53# Config # 54############ 55 56 57class ConfClass(object): 58 def configure(self, cnf): 59 # type: (ConfClass) -> None 60 self.__dict__ = cnf.__dict__.copy() 61 62 def __repr__(self): 63 # type: () -> str 64 return str(self) 65 66 def __str__(self): 67 # type: () -> str 68 s = "" 69 dkeys = self.__class__.__dict__.copy() 70 dkeys.update(self.__dict__) 71 keys = sorted(dkeys) 72 for i in keys: 73 if i[0] != "_": 74 r = repr(getattr(self, i)) 75 r = " ".join(r.split()) 76 wlen = 76 - max(len(i), 10) 77 if len(r) > wlen: 78 r = r[:wlen - 3] + "..." 79 s += "%-10s = %s\n" % (i, r) 80 return s[:-1] 81 82 83class Interceptor(object): 84 def __init__(self, 85 name, # type: str 86 default, # type: Any 87 hook, # type: Callable[..., Any] 88 args=None, # type: Optional[List[Any]] 89 kargs=None # type: Optional[Dict[str, Any]] 90 ): 91 # type: (...) -> None 92 self.name = name 93 self.intname = "_intercepted_%s" % name 94 self.default = default 95 self.hook = hook 96 self.args = args if args is not None else [] 97 self.kargs = kargs if kargs is not None else {} 98 99 def __get__(self, obj, typ=None): 100 # type: (Conf, Optional[type]) -> Any 101 if not hasattr(obj, self.intname): 102 setattr(obj, self.intname, self.default) 103 return getattr(obj, self.intname) 104 105 @staticmethod 106 def set_from_hook(obj, name, val): 107 # type: (Conf, str, bool) -> None 108 int_name = "_intercepted_%s" % name 109 setattr(obj, int_name, val) 110 111 def __set__(self, obj, val): 112 # type: (Conf, Any) -> None 113 old = getattr(obj, self.intname, self.default) 114 val = self.hook(self.name, val, old, *self.args, **self.kargs) 115 setattr(obj, self.intname, val) 116 117 118def _readonly(name): 119 # type: (str) -> NoReturn 120 default = Conf.__dict__[name].default 121 Interceptor.set_from_hook(conf, name, default) 122 raise ValueError("Read-only value !") 123 124 125ReadOnlyAttribute = functools.partial( 126 Interceptor, 127 hook=(lambda name, *args, **kwargs: _readonly(name)) 128) 129ReadOnlyAttribute.__doc__ = "Read-only class attribute" 130 131 132class ProgPath(ConfClass): 133 _default = "<System default>" 134 universal_open = "open" if DARWIN else "xdg-open" 135 pdfreader = universal_open 136 psreader = universal_open 137 svgreader = universal_open 138 dot = "dot" 139 display = "display" 140 tcpdump = "tcpdump" 141 tcpreplay = "tcpreplay" 142 hexedit = "hexer" 143 tshark = "tshark" 144 wireshark = "wireshark" 145 ifconfig = "ifconfig" 146 147 148class ConfigFieldList: 149 def __init__(self): 150 # type: () -> None 151 self.fields = set() # type: Set[Any] 152 self.layers = set() # type: Set[Any] 153 154 @staticmethod 155 def _is_field(f): 156 # type: (Any) -> bool 157 return hasattr(f, "owners") 158 159 def _recalc_layer_list(self): 160 # type: () -> None 161 self.layers = {owner for f in self.fields for owner in f.owners} 162 163 def add(self, *flds): 164 # type: (*Any) -> None 165 self.fields |= {f for f in flds if self._is_field(f)} 166 self._recalc_layer_list() 167 168 def remove(self, *flds): 169 # type: (*Any) -> None 170 self.fields -= set(flds) 171 self._recalc_layer_list() 172 173 def __contains__(self, elt): 174 # type: (Any) -> bool 175 if isinstance(elt, BasePacket): 176 return elt in self.layers 177 return elt in self.fields 178 179 def __repr__(self): 180 # type: () -> str 181 return "<%s [%s]>" % (self.__class__.__name__, " ".join(str(x) for x in self.fields)) # noqa: E501 182 183 184class Emphasize(ConfigFieldList): 185 pass 186 187 188class Resolve(ConfigFieldList): 189 pass 190 191 192class Num2Layer: 193 def __init__(self): 194 # type: () -> None 195 self.num2layer = {} # type: Dict[int, Type[Packet]] 196 self.layer2num = {} # type: Dict[Type[Packet], int] 197 198 def register(self, num, layer): 199 # type: (int, Type[Packet]) -> None 200 self.register_num2layer(num, layer) 201 self.register_layer2num(num, layer) 202 203 def register_num2layer(self, num, layer): 204 # type: (int, Type[Packet]) -> None 205 self.num2layer[num] = layer 206 207 def register_layer2num(self, num, layer): 208 # type: (int, Type[Packet]) -> None 209 self.layer2num[layer] = num 210 211 def __getitem__(self, item): 212 # type: (Union[int, Type[Packet]]) -> Union[int, Type[Packet]] 213 if isinstance(item, int): 214 return self.num2layer[item] 215 else: 216 return self.layer2num[item] 217 218 def __contains__(self, item): 219 # type: (Union[int, Type[Packet]]) -> bool 220 if isinstance(item, int): 221 return item in self.num2layer 222 else: 223 return item in self.layer2num 224 225 def get(self, 226 item, # type: Union[int, Type[Packet]] 227 default=None, # type: Optional[Type[Packet]] 228 ): 229 # type: (...) -> Optional[Union[int, Type[Packet]]] 230 return self[item] if item in self else default 231 232 def __repr__(self): 233 # type: () -> str 234 lst = [] 235 for num, layer in six.iteritems(self.num2layer): 236 if layer in self.layer2num and self.layer2num[layer] == num: 237 dir = "<->" 238 else: 239 dir = " ->" 240 lst.append((num, "%#6x %s %-20s (%s)" % (num, dir, layer.__name__, 241 layer._name))) 242 for layer, num in six.iteritems(self.layer2num): 243 if num not in self.num2layer or self.num2layer[num] != layer: 244 lst.append((num, "%#6x <- %-20s (%s)" % (num, layer.__name__, 245 layer._name))) 246 lst.sort() 247 return "\n".join(y for x, y in lst) 248 249 250class LayersList(List[Type['scapy.packet.Packet']]): 251 def __init__(self): 252 # type: () -> None 253 list.__init__(self) 254 self.ldict = {} # type: Dict[str, List[Type[Packet]]] 255 self.filtered = False 256 self._backup_dict = {} # type: Dict[Type[Packet], List[Tuple[Dict[str, Any], Type[Packet]]]] # noqa: E501 257 258 def __repr__(self): 259 # type: () -> str 260 return "\n".join("%-20s: %s" % (layer.__name__, layer.name) 261 for layer in self) 262 263 def register(self, layer): 264 # type: (Type[Packet]) -> None 265 self.append(layer) 266 if layer.__module__ not in self.ldict: 267 self.ldict[layer.__module__] = [] 268 self.ldict[layer.__module__].append(layer) 269 270 def layers(self): 271 # type: () -> List[Tuple[str, str]] 272 result = [] 273 # This import may feel useless, but it is required for the eval below 274 import scapy # noqa: F401 275 for lay in self.ldict: 276 doc = eval(lay).__doc__ 277 result.append((lay, doc.strip().split("\n")[0] if doc else lay)) 278 return result 279 280 def filter(self, items): 281 # type: (List[Type[Packet]]) -> None 282 """Disable dissection of unused layers to speed up dissection""" 283 if self.filtered: 284 raise ValueError("Already filtered. Please disable it first") 285 for lay in six.itervalues(self.ldict): 286 for cls in lay: 287 if cls not in self._backup_dict: 288 self._backup_dict[cls] = cls.payload_guess[:] 289 cls.payload_guess = [ 290 y for y in cls.payload_guess if y[1] in items 291 ] 292 self.filtered = True 293 294 def unfilter(self): 295 # type: () -> None 296 """Re-enable dissection for all layers""" 297 if not self.filtered: 298 raise ValueError("Not filtered. Please filter first") 299 for lay in six.itervalues(self.ldict): 300 for cls in lay: 301 cls.payload_guess = self._backup_dict[cls] 302 self._backup_dict.clear() 303 self.filtered = False 304 305 306class CommandsList(List[Callable[..., Any]]): 307 def __repr__(self): 308 # type: () -> str 309 s = [] 310 for li in sorted(self, key=lambda x: x.__name__): 311 doc = li.__doc__.split("\n")[0] if li.__doc__ else "--" 312 s.append("%-20s: %s" % (li.__name__, doc)) 313 return "\n".join(s) 314 315 def register(self, cmd): 316 # type: (DecoratorCallable) -> DecoratorCallable 317 self.append(cmd) 318 return cmd # return cmd so that method can be used as a decorator 319 320 321def lsc(): 322 # type: () -> None 323 """Displays Scapy's default commands""" 324 print(repr(conf.commands)) 325 326 327class CacheInstance(Dict[str, Any], object): 328 __slots__ = ["timeout", "name", "_timetable", "__dict__"] 329 330 def __init__(self, name="noname", timeout=None): 331 # type: (str, Optional[int]) -> None 332 self.timeout = timeout 333 self.name = name 334 self._timetable = {} # type: Dict[str, float] 335 336 def flush(self): 337 # type: () -> None 338 CacheInstance.__init__( 339 self, 340 name=self.name, 341 timeout=self.timeout 342 ) 343 344 def __getitem__(self, item): 345 # type: (str) -> Any 346 if item in self.__slots__: 347 return object.__getattribute__(self, item) 348 val = super(CacheInstance, self).__getitem__(item) 349 if self.timeout is not None: 350 t = self._timetable[item] 351 if time.time() - t > self.timeout: 352 raise KeyError(item) 353 return val 354 355 def get(self, item, default=None): 356 # type: (str, Optional[Any]) -> Any 357 # overloading this method is needed to force the dict to go through 358 # the timetable check 359 try: 360 return self[item] 361 except KeyError: 362 return default 363 364 def __setitem__(self, item, v): 365 # type: (str, str) -> None 366 if item in self.__slots__: 367 return object.__setattr__(self, item, v) 368 self._timetable[item] = time.time() 369 super(CacheInstance, self).__setitem__(item, v) 370 371 def update(self, # type: ignore 372 other, # type: Any 373 **kwargs # type: Any 374 ): 375 # type: (...) -> None 376 for key, value in six.iteritems(other): 377 # We only update an element from `other` either if it does 378 # not exist in `self` or if the entry in `self` is older. 379 if key not in self or self._timetable[key] < other._timetable[key]: 380 dict.__setitem__(self, key, value) 381 self._timetable[key] = other._timetable[key] 382 383 def iteritems(self): 384 # type: () -> Iterator[Tuple[str, Any]] 385 if self.timeout is None: 386 return six.iteritems(self.__dict__) # type: ignore 387 t0 = time.time() 388 return ((k, v) for (k, v) in six.iteritems(self.__dict__) if t0 - self._timetable[k] < self.timeout) # noqa: E501 389 390 def iterkeys(self): 391 # type: () -> Iterator[str] 392 if self.timeout is None: 393 return six.iterkeys(self.__dict__) # type: ignore 394 t0 = time.time() 395 return (k for k in six.iterkeys(self.__dict__) if t0 - self._timetable[k] < self.timeout) # noqa: E501 396 397 def __iter__(self): 398 # type: () -> Iterator[str] 399 return self.iterkeys() 400 401 def itervalues(self): 402 # type: () -> Iterator[Tuple[str, Any]] 403 if self.timeout is None: 404 return six.itervalues(self.__dict__) # type: ignore 405 t0 = time.time() 406 return (v for (k, v) in six.iteritems(self.__dict__) if t0 - self._timetable[k] < self.timeout) # noqa: E501 407 408 def items(self): 409 # type: () -> Any 410 if self.timeout is None: 411 return super(CacheInstance, self).items() 412 t0 = time.time() 413 return [(k, v) for (k, v) in six.iteritems(self.__dict__) if t0 - self._timetable[k] < self.timeout] # noqa: E501 414 415 def keys(self): 416 # type: () -> Any 417 if self.timeout is None: 418 return super(CacheInstance, self).keys() 419 t0 = time.time() 420 return [k for k in six.iterkeys(self.__dict__) if t0 - self._timetable[k] < self.timeout] # noqa: E501 421 422 def values(self): 423 # type: () -> Any 424 if self.timeout is None: 425 return list(six.itervalues(self)) 426 t0 = time.time() 427 return [v for (k, v) in six.iteritems(self.__dict__) if t0 - self._timetable[k] < self.timeout] # noqa: E501 428 429 def __len__(self): 430 # type: () -> int 431 if self.timeout is None: 432 return super(CacheInstance, self).__len__() 433 return len(self.keys()) 434 435 def summary(self): 436 # type: () -> str 437 return "%s: %i valid items. Timeout=%rs" % (self.name, len(self), self.timeout) # noqa: E501 438 439 def __repr__(self): 440 # type: () -> str 441 s = [] 442 if self: 443 mk = max(len(k) for k in six.iterkeys(self.__dict__)) 444 fmt = "%%-%is %%s" % (mk + 1) 445 for item in six.iteritems(self.__dict__): 446 s.append(fmt % item) 447 return "\n".join(s) 448 449 def copy(self): 450 # type: () -> CacheInstance 451 return copy.copy(self) 452 453 454class NetCache: 455 def __init__(self): 456 # type: () -> None 457 self._caches_list = [] # type: List[CacheInstance] 458 459 def add_cache(self, cache): 460 # type: (CacheInstance) -> None 461 self._caches_list.append(cache) 462 setattr(self, cache.name, cache) 463 464 def new_cache(self, name, timeout=None): 465 # type: (str, Optional[int]) -> CacheInstance 466 c = CacheInstance(name=name, timeout=timeout) 467 self.add_cache(c) 468 return c 469 470 def __delattr__(self, attr): 471 # type: (str) -> NoReturn 472 raise AttributeError("Cannot delete attributes") 473 474 def update(self, other): 475 # type: (NetCache) -> None 476 for co in other._caches_list: 477 if hasattr(self, co.name): 478 getattr(self, co.name).update(co) 479 else: 480 self.add_cache(co.copy()) 481 482 def flush(self): 483 # type: () -> None 484 for c in self._caches_list: 485 c.flush() 486 487 def __repr__(self): 488 # type: () -> str 489 return "\n".join(c.summary() for c in self._caches_list) 490 491 492def _version_checker(module, minver): 493 # type: (ModuleType, Tuple[int, ...]) -> bool 494 """Checks that module has a higher version that minver. 495 496 params: 497 - module: a module to test 498 - minver: a tuple of versions 499 """ 500 # We could use LooseVersion, but distutils imports imp which is deprecated 501 version_regexp = r'[a-z]?((?:\d|\.)+\d+)(?:\.dev[0-9]+)?' 502 version_tags_r = re.match( 503 version_regexp, 504 getattr(module, "__version__", "") 505 ) 506 if not version_tags_r: 507 return False 508 version_tags_i = version_tags_r.group(1).split(".") 509 version_tags = tuple(int(x) for x in version_tags_i) 510 return bool(version_tags >= minver) 511 512 513def isCryptographyValid(): 514 # type: () -> bool 515 """ 516 Check if the cryptography module >= 2.0.0 is present. This is the minimum 517 version for most usages in Scapy. 518 """ 519 try: 520 import cryptography 521 except ImportError: 522 return False 523 return _version_checker(cryptography, (2, 0, 0)) 524 525 526def isCryptographyAdvanced(): 527 # type: () -> bool 528 """ 529 Check if the cryptography module is present, and if it supports X25519, 530 ChaCha20Poly1305 and such. 531 532 Notes: 533 - cryptography >= 2.0 is required 534 - OpenSSL >= 1.1.0 is required 535 """ 536 try: 537 from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey # noqa: E501 538 X25519PrivateKey.generate() 539 except Exception: 540 return False 541 else: 542 return True 543 544 545def isPyPy(): 546 # type: () -> bool 547 """Returns either scapy is running under PyPy or not""" 548 try: 549 import __pypy__ # noqa: F401 550 return True 551 except ImportError: 552 return False 553 554 555def _prompt_changer(attr, val, old): 556 # type: (str, Any, Any) -> Any 557 """Change the current prompt theme""" 558 Interceptor.set_from_hook(conf, attr, val) 559 try: 560 sys.ps1 = conf.color_theme.prompt(conf.prompt) 561 except Exception: 562 pass 563 try: 564 apply_ipython_style( 565 get_ipython() # type: ignore 566 ) 567 except NameError: 568 pass 569 return getattr(conf, attr, old) 570 571 572def _set_conf_sockets(): 573 # type: () -> None 574 """Populate the conf.L2Socket and conf.L3Socket 575 according to the various use_* parameters 576 """ 577 if conf.use_bpf and not BSD: 578 Interceptor.set_from_hook(conf, "use_bpf", False) 579 raise ScapyInvalidPlatformException("BSD-like (OSX, *BSD...) only !") 580 if not conf.use_pcap and SOLARIS: 581 Interceptor.set_from_hook(conf, "use_pcap", True) 582 raise ScapyInvalidPlatformException( 583 "Scapy only supports libpcap on Solaris !" 584 ) 585 # we are already in an Interceptor hook, use Interceptor.set_from_hook 586 if conf.use_pcap: 587 try: 588 from scapy.arch.libpcap import L2pcapListenSocket, L2pcapSocket, \ 589 L3pcapSocket 590 except (OSError, ImportError): 591 warning("No libpcap provider available ! pcap won't be used") 592 Interceptor.set_from_hook(conf, "use_pcap", False) 593 else: 594 conf.L3socket = L3pcapSocket 595 conf.L3socket6 = functools.partial( # type: ignore 596 L3pcapSocket, filter="ip6") 597 conf.L2socket = L2pcapSocket 598 conf.L2listen = L2pcapListenSocket 599 conf.ifaces.reload() 600 return 601 if conf.use_bpf: 602 from scapy.arch.bpf.supersocket import L2bpfListenSocket, \ 603 L2bpfSocket, L3bpfSocket 604 conf.L3socket = L3bpfSocket 605 conf.L3socket6 = functools.partial( # type: ignore 606 L3bpfSocket, filter="ip6") 607 conf.L2socket = L2bpfSocket 608 conf.L2listen = L2bpfListenSocket 609 conf.ifaces.reload() 610 return 611 if LINUX: 612 from scapy.arch.linux import L3PacketSocket, L2Socket, L2ListenSocket 613 conf.L3socket = L3PacketSocket 614 conf.L3socket6 = functools.partial( # type: ignore 615 L3PacketSocket, filter="ip6") 616 conf.L2socket = L2Socket 617 conf.L2listen = L2ListenSocket 618 conf.ifaces.reload() 619 return 620 if WINDOWS: 621 from scapy.arch.windows import _NotAvailableSocket 622 from scapy.arch.windows.native import L3WinSocket, L3WinSocket6 623 conf.L3socket = L3WinSocket 624 conf.L3socket6 = L3WinSocket6 625 conf.L2socket = _NotAvailableSocket 626 conf.L2listen = _NotAvailableSocket 627 conf.ifaces.reload() 628 # No need to update globals on Windows 629 return 630 from scapy.supersocket import L3RawSocket 631 from scapy.layers.inet6 import L3RawSocket6 632 conf.L3socket = L3RawSocket 633 conf.L3socket6 = L3RawSocket6 634 635 636def _socket_changer(attr, val, old): 637 # type: (str, bool, bool) -> Any 638 if not isinstance(val, bool): 639 raise TypeError("This argument should be a boolean") 640 Interceptor.set_from_hook(conf, attr, val) 641 dependencies = { # Things that will be turned off 642 "use_pcap": ["use_bpf"], 643 "use_bpf": ["use_pcap"], 644 } 645 restore = {k: getattr(conf, k) for k in dependencies} 646 del restore[attr] # This is handled directly by _set_conf_sockets 647 if val: # Only if True 648 for param in dependencies[attr]: 649 Interceptor.set_from_hook(conf, param, False) 650 try: 651 _set_conf_sockets() 652 except (ScapyInvalidPlatformException, ImportError) as e: 653 for key, value in restore.items(): 654 Interceptor.set_from_hook(conf, key, value) 655 if isinstance(e, ScapyInvalidPlatformException): 656 raise 657 return getattr(conf, attr) 658 659 660def _loglevel_changer(attr, val, old): 661 # type: (str, int, int) -> int 662 """Handle a change of conf.logLevel""" 663 log_scapy.setLevel(val) 664 return val 665 666 667def _iface_changer(attr, val, old): 668 # type: (str, Any, Any) -> 'scapy.interfaces.NetworkInterface' 669 """Resolves the interface in conf.iface""" 670 if isinstance(val, str): 671 from scapy.interfaces import resolve_iface 672 iface = resolve_iface(val) 673 if old and iface.dummy: 674 warning( 675 "This interface is not specified in any provider ! " 676 "See conf.ifaces output" 677 ) 678 return iface 679 return val # type: ignore 680 681 682class Conf(ConfClass): 683 """ 684 This object contains the configuration of Scapy. 685 """ 686 version = ReadOnlyAttribute("version", VERSION) 687 session = "" #: filename where the session will be saved 688 interactive = False 689 #: can be "ipython", "python" or "auto". Default: Auto 690 interactive_shell = "" 691 #: if 1, prevents any unwanted packet to go out (ARP, DNS, ...) 692 stealth = "not implemented" 693 #: selects the default output interface for srp() and sendp(). 694 iface = Interceptor("iface", None, _iface_changer) # type: 'scapy.interfaces.NetworkInterface' # type: ignore # noqa: E501 695 layers = LayersList() 696 commands = CommandsList() # type: CommandsList 697 ASN1_default_codec = None #: Codec used by default for ASN1 objects 698 AS_resolver = None #: choose the AS resolver class to use 699 dot15d4_protocol = None # Used in dot15d4.py 700 logLevel = Interceptor("logLevel", log_scapy.level, _loglevel_changer) 701 #: if 0, doesn't check that IPID matches between IP sent and 702 #: ICMP IP citation received 703 #: if 1, checks that they either are equal or byte swapped 704 #: equals (bug in some IP stacks) 705 #: if 2, strictly checks that they are equals 706 checkIPID = False 707 #: if 1, checks IP src in IP and ICMP IP citation match 708 #: (bug in some NAT stacks) 709 checkIPsrc = True 710 checkIPaddr = True 711 #: if True, checks that IP-in-IP layers match. If False, do 712 #: not check IP layers that encapsulates another IP layer 713 checkIPinIP = True 714 #: if 1, also check that TCP seq and ack match the 715 #: ones in ICMP citation 716 check_TCPerror_seqack = False 717 verb = 2 #: level of verbosity, from 0 (almost mute) to 3 (verbose) 718 prompt = Interceptor("prompt", ">>> ", _prompt_changer) 719 #: default mode for listening socket (to get answers if you 720 #: spoof on a lan) 721 promisc = True 722 sniff_promisc = 1 #: default mode for sniff() 723 raw_layer = None # type: Type[Packet] 724 raw_summary = False 725 padding_layer = None # type: Type[Packet] 726 default_l2 = None # type: Type[Packet] 727 l2types = Num2Layer() 728 l3types = Num2Layer() 729 L3socket = None # type: Type[scapy.supersocket.SuperSocket] 730 L3socket6 = None # type: Type[scapy.supersocket.SuperSocket] 731 L2socket = None # type: Type[scapy.supersocket.SuperSocket] 732 L2listen = None # type: Type[scapy.supersocket.SuperSocket] 733 BTsocket = None # type: Type[scapy.supersocket.SuperSocket] 734 USBsocket = None # type: Type[scapy.supersocket.SuperSocket] 735 min_pkt_size = 60 736 #: holds MIB direct access dictionary 737 mib = None # type: 'scapy.asn1.mib.MIBDict' 738 bufsize = 2**16 739 #: history file 740 histfile = os.getenv('SCAPY_HISTFILE', 741 os.path.join(os.path.expanduser("~"), 742 ".scapy_history")) 743 #: includes padding in disassembled packets 744 padding = 1 745 #: BPF filter for packets to ignore 746 except_filter = "" 747 #: bpf filter added to every sniffing socket to exclude traffic 748 #: from analysis 749 filter = "" 750 #: when 1, store received packet that are not matched into `debug.recv` 751 debug_match = False 752 #: When 1, print some TLS session secrets when they are computed. 753 debug_tls = False 754 wepkey = "" 755 #: holds the Scapy interface list and manager 756 ifaces = None # type: 'scapy.interfaces.NetworkInterfaceDict' 757 #: holds the cache of interfaces loaded from Libpcap 758 cache_pcapiflist = {} # type: Dict[str, Tuple[str, List[str], int]] 759 neighbor = None # type: 'scapy.layers.l2.Neighbor' 760 # `neighbor` will be filed by scapy.layers.l2 761 #: holds the Scapy IPv4 routing table and provides methods to 762 #: manipulate it 763 route = None # type: 'scapy.route.Route' 764 # `route` will be filed by route.py 765 #: holds the Scapy IPv6 routing table and provides methods to 766 #: manipulate it 767 route6 = None # type: 'scapy.route6.Route6' 768 manufdb = None # type: 'scapy.data.ManufDA' 769 # 'route6' will be filed by route6.py 770 teredoPrefix = "" # type: str 771 teredoServerPort = None # type: int 772 auto_fragment = True 773 #: raise exception when a packet dissector raises an exception 774 debug_dissector = False 775 color_theme = Interceptor("color_theme", NoTheme(), _prompt_changer) 776 #: how much time between warnings from the same place 777 warning_threshold = 5 778 prog = ProgPath() 779 #: holds list of fields for which resolution should be done 780 resolve = Resolve() 781 #: holds list of enum fields for which conversion to string 782 #: should NOT be done 783 noenum = Resolve() 784 emph = Emphasize() 785 #: read only attribute to show if PyPy is in use 786 use_pypy = ReadOnlyAttribute("use_pypy", isPyPy()) 787 #: use libpcap integration or not. Changing this value will update 788 #: the conf.L[2/3] sockets 789 use_pcap = Interceptor( 790 "use_pcap", 791 os.getenv("SCAPY_USE_LIBPCAP", "").lower().startswith("y"), 792 _socket_changer 793 ) 794 use_bpf = Interceptor("use_bpf", False, _socket_changer) 795 use_npcap = False 796 ipv6_enabled = socket.has_ipv6 797 #: path or list of paths where extensions are to be looked for 798 extensions_paths = "." 799 stats_classic_protocols = [] # type: List[Type[Packet]] 800 stats_dot11_protocols = [] # type: List[Type[Packet]] 801 temp_files = [] # type: List[str] 802 netcache = NetCache() 803 geoip_city = None 804 # can, tls, http and a few others are not loaded by default 805 load_layers = [ 806 'bluetooth', 807 'bluetooth4LE', 808 'dhcp', 809 'dhcp6', 810 'dns', 811 'dot11', 812 'dot15d4', 813 'eap', 814 'gprs', 815 'hsrp', 816 'inet', 817 'inet6', 818 'ipsec', 819 'ir', 820 'isakmp', 821 'l2', 822 'l2tp', 823 'llmnr', 824 'lltd', 825 'mgcp', 826 'mobileip', 827 'netbios', 828 'netflow', 829 'ntp', 830 'ppi', 831 'ppp', 832 'pptp', 833 'radius', 834 'rip', 835 'rtp', 836 'sctp', 837 'sixlowpan', 838 'skinny', 839 'smb', 840 'smb2', 841 'snmp', 842 'tftp', 843 'vrrp', 844 'vxlan', 845 'x509', 846 'zigbee' 847 ] 848 #: a dict which can be used by contrib layers to store local 849 #: configuration 850 contribs = dict() # type: Dict[str, Any] 851 crypto_valid = isCryptographyValid() 852 crypto_valid_advanced = isCryptographyAdvanced() 853 fancy_prompt = True 854 auto_crop_tables = True 855 #: how often to check for new packets. 856 #: Defaults to 0.05s. 857 recv_poll_rate = 0.05 858 #: When True, raise exception if no dst MAC found otherwise broadcast. 859 #: Default is False. 860 raise_no_dst_mac = False 861 loopback_name = "lo" if LINUX else "lo0" 862 863 def __getattribute__(self, attr): 864 # type: (str) -> Any 865 # Those are loaded on runtime to avoid import loops 866 if attr == "manufdb": 867 from scapy.data import MANUFDB 868 return MANUFDB 869 if attr == "ethertypes": 870 from scapy.data import ETHER_TYPES 871 return ETHER_TYPES 872 if attr == "protocols": 873 from scapy.data import IP_PROTOS 874 return IP_PROTOS 875 if attr == "services_udp": 876 from scapy.data import UDP_SERVICES 877 return UDP_SERVICES 878 if attr == "services_tcp": 879 from scapy.data import TCP_SERVICES 880 return TCP_SERVICES 881 if attr == "iface6": 882 warnings.warn( 883 "conf.iface6 is deprecated in favor of conf.iface", 884 DeprecationWarning 885 ) 886 attr = "iface" 887 return object.__getattribute__(self, attr) 888 889 890if not Conf.ipv6_enabled: 891 log_scapy.warning("IPv6 support disabled in Python. Cannot load Scapy IPv6 layers.") # noqa: E501 892 for m in ["inet6", "dhcp6"]: 893 if m in Conf.load_layers: 894 Conf.load_layers.remove(m) 895 896conf = Conf() # type: Conf 897 898 899def crypto_validator(func): 900 # type: (DecoratorCallable) -> DecoratorCallable 901 """ 902 This a decorator to be used for any method relying on the cryptography library. # noqa: E501 903 Its behaviour depends on the 'crypto_valid' attribute of the global 'conf'. 904 """ 905 def func_in(*args, **kwargs): 906 # type: (*Any, **Any) -> Any 907 if not conf.crypto_valid: 908 raise ImportError("Cannot execute crypto-related method! " 909 "Please install python-cryptography v1.7 or later.") # noqa: E501 910 return func(*args, **kwargs) 911 return func_in # type: ignore 912 913 914def scapy_delete_temp_files(): 915 # type: () -> None 916 for f in conf.temp_files: 917 try: 918 os.unlink(f) 919 except Exception: 920 pass 921 del conf.temp_files[:] 922 923 924atexit.register(scapy_delete_temp_files) 925