import logging
import os
import random
import shutil
import subprocess
import sys
import tempfile
import time
from ipaddress import ip_address

import dpkt
import pytest

import deckard
from contrib.namespaces import LinuxNamespace
from networking import InterfaceManager


def set_coverage_env(path, qmin):
    """Set up environment variables so the code coverage utility can work.

    Does nothing unless the ``COVERAGE`` environment variable is set to a
    non-empty value.  Otherwise runs the script named by
    ``COVERAGE_ENV_SCRIPT`` and copies every ``KEY=VALUE`` token it prints
    into ``os.environ``.

    Args:
        path: Path of the scenario file; used to build the statistics name.
        qmin: Query-minimization setting; appended to the statistics name.

    Raises:
        KeyError: If coverage is enabled but one of the required
            environment variables is missing.
        subprocess.CalledProcessError: If the script exits with non-zero
            status.
    """
    if not os.environ.get("COVERAGE"):
        return

    exports = subprocess.check_output([os.environ["COVERAGE_ENV_SCRIPT"],
                                       os.environ["DAEMONSRCDIR"],
                                       os.environ["COVERAGE_STATSDIR"],
                                       path + "-qmin-" + str(qmin)]).decode()
    for export in exports.split():
        key, value = export.split("=", 1)
        # Values may be emitted shell-style, wrapped in double quotes.
        os.environ[key] = value.strip('"')


def check_platform():
    """Abort the whole pytest session when running on Windows."""
    # sys.platform is never "windows"; on Windows it is "win32".
    if sys.platform.startswith('win'):
        pytest.exit('Not supported at all on Windows')


# Suppress extensive Augeas logging
logging.getLogger("augeas").setLevel(logging.ERROR)


check_platform()


class DeckardUnderLoadError(Exception):
    """Raised when the captured traffic suggests Deckard was overloaded."""


class TCPDump:
    """Context manager that captures a PCAP file and then checks it for obvious errors.

    On construction it stores the working directory in ``config["tmpdir"]``
    and the capture file path in ``config["pcap"]``.
    """

    DUMPCAP_CMD = ["dumpcap", "-i", "any", "-q", "-P", "-w"]

    def __init__(self, config):
        """Record the working directory and the capture path in *config*.

        Args:
            config: Mutable mapping with the test configuration; it is
                updated in place with the ``"tmpdir"`` and ``"pcap"`` keys.
        """
        self.config = config
        self.config["tmpdir"] = self.get_tmpdir()
        self.tcpdump = None
        self.config["pcap"] = os.path.join(self.config["tmpdir"], "deckard.pcap")

    def __enter__(self):
        """Start ``dumpcap`` writing to ``config["pcap"]``."""
        cmd = self.DUMPCAP_CMD.copy()
        cmd.append(self.config["pcap"])
        self.tcpdump = subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
        return self

    def __exit__(self, _, exc_value, __):
        """Stop the capture, inspect it and clean up the working directory.

        Returns:
            ``False`` so that an exception raised inside the ``with`` body
            always propagates to the caller.

        Raises:
            RuntimeError: If ``dumpcap`` exited without creating the capture
                file, or if the capture contains traffic to an address that
                the interface manager did not add.
            DeckardUnderLoadError: If the body failed with ``ValueError``
                and the capture contains ICMP after UDP traffic.
        """
        pcap_path = self.config["pcap"]

        # Wait for the PCAP to be finalized
        while not os.path.exists(pcap_path):
            # If dumpcap already died the file will never appear; fail
            # loudly instead of sleeping forever.
            if self.tcpdump.poll() is not None:
                _, stderr = self.tcpdump.communicate()
                raise RuntimeError(
                    "dumpcap exited with code %s before creating %s: %s"
                    % (self.tcpdump.returncode, pcap_path,
                       stderr.decode(errors="replace").strip()))
            time.sleep(1)

        self.tcpdump.terminate()
        # communicate() also drains and closes the stdout/stderr pipes,
        # which a plain wait() would leave unread.
        self.tcpdump.communicate()

        self.check_for_unknown_server()

        if exc_value is None:
            if self.config.get('noclean') or "DECKARD_NOCLEAN" in os.environ:
                # The user asked to keep the files; tell them where they are.
                logging.getLogger('deckard.hint').info(
                    'test working directory %s', self.config["tmpdir"])
            else:
                shutil.rmtree(self.config["tmpdir"])
        elif isinstance(exc_value, ValueError):
            # The working directory is deliberately kept on failure so it
            # can be analysed.  The check below may replace the ValueError
            # with DeckardUnderLoadError.
            self.check_for_icmp()

        # A false return value re-raises the original exception, if any.
        return False

    @staticmethod
    def get_tmpdir():
        """Return the working directory path for this test run.

        When ``DECKARD_DIR`` is set its value is returned as-is (the path is
        not created here); otherwise a fresh temporary directory is created.

        Raises:
            ValueError: If ``DECKARD_DIR`` is set and the path already exists.
        """
        if "DECKARD_DIR" in os.environ:
            tmpdir = os.environ["DECKARD_DIR"]
            if os.path.lexists(tmpdir):
                raise ValueError('DECKARD_DIR "%s" must not exist' % tmpdir)
        else:
            tmpdir = tempfile.mkdtemp(suffix='', prefix='tmpdeckard')

        return tmpdir

    def check_for_icmp(self):
        """Check Deckard's PCAP for ICMP packets.

        Raises:
            DeckardUnderLoadError: If an ICMP or ICMPv6 packet appears in
                the capture after at least one UDP packet.
        """
        # Deckard's responses to resolvers might be delayed due to load which
        # leads the resolver to close the port and to the test failing in the
        # end. We partially detect these by checking the PCAP for ICMP packets.
        udp_seen = False
        with open(self.config["pcap"], "rb") as f:
            for _, packet in dpkt.pcap.Reader(f):
                # Frames dpkt cannot decode are left as raw bytes without a
                # ``data`` attribute; getattr() lets us skip them.
                payload = getattr(dpkt.sll.SLL(packet).data, "data", None)

                if isinstance(payload, dpkt.udp.UDP):
                    udp_seen = True
                elif udp_seen and isinstance(payload, (dpkt.icmp.ICMP, dpkt.icmp6.ICMP6)):
                    raise DeckardUnderLoadError("Deckard is under load. "
                                                "Other errors might be false negatives. "
                                                "Consider retrying the job later.")

    def check_for_unknown_server(self):
        """Check the PCAP for TCP/UDP traffic sent to unexpected addresses.

        Raises:
            RuntimeError: If any TCP or UDP packet is addressed to an IP that
                is not in ``config["if_manager"].added_addresses``.
        """
        transport_protocols = (dpkt.ip.IP_PROTO_TCP, dpkt.ip.IP_PROTO_UDP)
        known_addresses = self.config["if_manager"].added_addresses
        unknown_addresses = set()

        with open(self.config["pcap"], "rb") as f:
            for _, packet in dpkt.pcap.Reader(f):
                ip = dpkt.sll.SLL(packet).data
                # Only TCP and UDP packets are of interest.  Frames without
                # a protocol field (non-IP, undecoded) yield None here and
                # are skipped as well.
                if getattr(ip, "p", None) not in transport_protocols:
                    continue
                dest = str(ip_address(ip.dst))
                if dest not in known_addresses:
                    unknown_addresses.add(dest)

        if unknown_addresses:
            raise RuntimeError("Binary under test queried an IP address not present"
                               " in scenario %s" % unknown_addresses)


def run_test(path, qmin, config, max_retries, retries=0):
    """Run one scenario in a fresh network namespace while capturing traffic.

    When the run fails because Deckard appears to be under load, it is
    retried with exponential backoff up to *max_retries* times.

    Args:
        path: Path of the scenario file.
        qmin: Query-minimization setting passed to ``deckard.process_file``.
        config: Mutable test configuration; ``"if_manager"``, ``"tmpdir"``
            and ``"pcap"`` are (re)written on every attempt.
        max_retries: Maximum number of retries after an under-load failure.
        retries: Number of retries already performed (used by the recursion).
    """
    set_coverage_env(path, qmin)

    try:
        with LinuxNamespace("net"):
            config["if_manager"] = InterfaceManager()
            with TCPDump(config):
                deckard.process_file(path, qmin, config)
    # TCPDump raises the DeckardUnderLoadError defined in this module, so it
    # has to be caught alongside the one exported by the deckard module.
    except (DeckardUnderLoadError, deckard.DeckardUnderLoadError):
        if retries >= max_retries:
            raise
        logging.error("Deckard under load. Retrying…")
        # Exponential backoff
        time.sleep((2 ** retries) + random.random())
        run_test(path, qmin, config, max_retries, retries + 1)


def test_passes_qmin_on(scenario, max_retries):
    """Run the scenario with query minimization on, unless its config says off."""
    if scenario.qmin is True or scenario.qmin is None:
        run_test(scenario.path, True, scenario.config, max_retries)
    else:
        pytest.skip("Query minimization is off in test config")


def test_passes_qmin_off(scenario, max_retries):
    """Run the scenario with query minimization off, unless its config says on."""
    if scenario.qmin is False or scenario.qmin is None:
        run_test(scenario.path, False, scenario.config, max_retries)
    else:
        pytest.skip("Query minimization is on in test config")