1#!/usr/bin/env python 2# pylint: disable=invalid-name,print-statement,superfluous-parens 3""" 4Run several tcpkali processes in source-destination mode and figure out 5if they're behaving according to expectations 6""" 7 8from __future__ import absolute_import 9import os 10import re 11import sys 12import time 13import tempfile 14import datetime 15import subprocess 16 17 18def log(*args): 19 """Print out the arguments in a log-like fashion.""" 20 timestamp = datetime.datetime.now() 21 sys.stderr.write("[%s]: %s\n" % 22 (timestamp, ' '.join([str(x) for x in list(args)]))) 23 sys.stderr.flush() 24 25 26class Tcpkali(object): 27 """Wrapper around `tcpkali` process and its output.""" 28 def __init__(self, args, **kvargs): 29 self.proc = None 30 31 exe = os.getenv("TCPKALI", "../src/tcpkali") 32 33 self.fout = tempfile.TemporaryFile() 34 self.ferr = tempfile.TemporaryFile() 35 36 full_args = [exe] + args 37 if kvargs.get('capture_io', False): 38 full_args.append("--dump-one") 39 self.proc = subprocess.Popen(full_args, 40 stdout=self.fout, stderr=self.ferr) 41 time.sleep(0.1) 42 self.proc.poll() 43 log(' '.join(full_args)) 44 if self.proc.returncode is not None: 45 log("Could not start the tcpkali process: %r\n" 46 % self.proc.returncode) 47 raise Exception("Cannot start the tcpkali process") 48 log("Started tcpkali pid %r" % self.proc.pid) 49 50 def results(self): 51 """Return results of tcpkali operation.""" 52 self.wait() 53 self.fout.seek(0) 54 self.ferr.seek(0) 55 out = self.fout.readlines() 56 err = self.ferr.readlines() 57 return (out, err) 58 59 def wait(self): 60 """Wait until tcpkali finished.""" 61 if self.proc: 62 self.proc.wait() 63 if self.proc.returncode is None: 64 log("Could not stop the tcpkali process\n") 65 raise Exception("Can't stop tcpkali process") 66 pid = self.proc.pid 67 self.proc = None 68 log("Stopped tcpkali pid %r" % pid) 69 70 def __del__(self): 71 if self.proc: 72 self.proc.terminate() 73 self.wait() 74 75 76class Analyze(object): 77 """Tcpkali output analyzer.""" 78 # pylint: disable=too-many-instance-attributes,too-many-locals 79 def __init__(self, args): 80 self.out_lengths = {} 81 self.out_num = 0 82 self.in_lengths = {} 83 self.in_num = 0 84 85 # Output and Error lines 86 outLines = args[0] 87 errLines = args[1] 88 89 def _record_occurrence(d, length): 90 if length > 50 and length <= 100: 91 length = 10 * (length // 10) 92 elif length > 100 and length <= 2000: 93 length = 100 * (length // 100) 94 elif length > 2000: 95 length = 2000 96 d[length] = d.get(length, 0) + 1 97 98 outRe = re.compile(r"^Snd\(\d+, (\d+)\): \[(.*)\]$") 99 for _, line in enumerate(errLines): 100 result = outRe.match(line) 101 if result: 102 self.out_num = self.out_num + 1 103 outLen = int(result.group(1)) 104 _record_occurrence(self.out_lengths, outLen) 105 106 inRe = re.compile(r"^Rcv\(\d+, (\d+)\): \[(.*)\]$") 107 for _, line in enumerate(errLines): 108 result = inRe.match(line) 109 if result: 110 self.in_num = self.in_num + 1 111 inLen = int(result.group(1)) 112 _record_occurrence(self.in_lengths, inLen) 113 114 bwRe = re.compile(r"^Aggregate bandwidth: " 115 r"([\d.]+)[^\d]+, ([\d.]+)[^\d]+ Mbps") 116 bws = [bwRe.match(line) for line in outLines if bwRe.match(line)][0] 117 self.bw_down_mbps = float(bws.group(1)) 118 self.bw_up_mbps = float(bws.group(2)) 119 120 totalRe = re.compile(r"^Total data (sent|received):.*" 121 r"\(([\d.]+) bytes\)") 122 sent = [totalRe.match(line) for line in outLines 123 if totalRe.match(line) and 'sent' in line][0] 124 self.total_sent_bytes = int(sent.group(2)) 125 rcvd = [totalRe.match(line) for line in outLines 126 if totalRe.match(line) and 'received' in line][0] 127 self.total_received_bytes = int(rcvd.group(2)) 128 129 sockoptRe = re.compile(r"^WARNING: --(snd|rcv)buf option " 130 r"makes no effect.") 131 self.sockopt_works = True 132 for _, line in enumerate(errLines): 133 result = sockoptRe.match(line) 134 if result: 135 self.sockopt_works = False 136 137 self.debug() 138 139 # Int -> Int(0..100) 140 def input_length_percentile_lte(self, n): 141 """ 142 Determine percentile value of all occurrences of input lengths 143 less or equal to n. 144 """ 145 return self._length_percentile_lte(self.in_lengths, n) 146 147 # Int -> Int(0..100) 148 def output_length_percentile_lte(self, n): 149 """ 150 Determine percentile value of all occurrences of output lengths 151 less or equal to n. 152 """ 153 return self._length_percentile_lte(self.out_lengths, n) 154 155 def _length_percentile_lte(self, d, n): 156 # pylint: disable=no-self-use 157 total = sum(d.values()) 158 if total > 0: 159 occurs = sum([v for (k, v) in d.items() if k <= n]) 160 return 100 * occurs // total 161 else: 162 return 0 163 164 def debug(self): 165 """Print the variables representing analyzed tcpkali output.""" 166 for kv in sorted(vars(self).items()): 167 log(" '%s': %s" % kv) 168 169 170def check_segmentation(prefix, lines, contains): 171 """Check that the output consists of the neat repetition of (contents)""" 172 reg = re.compile(r"^" + prefix + r"\(\d+, \d+\): \[(.*)\]$") 173 allOutput = "" 174 for _, line in enumerate(lines): 175 result = reg.match(line) 176 if result: 177 allOutput += result.group(1) 178 assert(allOutput != "") 179 reg = re.compile(r"^((" + contains + ")+)(.*)$") 180 result = reg.match(allOutput) 181 if not result: 182 print("Expected repetition of \"%s\" is not found in \"%s\"..." % 183 (contains, allOutput[0:len(contains)+1])) 184 return False 185 elif len(result.group(3)) == 0: 186 return True 187 else: 188 print("Output is not consistent after byte %d (...\"%s\");" 189 " continuing with \"%s\"..." % 190 (len(result.group(1)), result.group(2), 191 result.group(3)[0:len(contains)+1])) 192 return False 193 194 195def main(): 196 """Run multiple tests with tcpkali and see if results are correct""" 197 # pylint: disable=too-many-statements 198 port = 1350 199 200 if os.environ.get('CONTINUOUS_INTEGRATION', 'false') == 'false': 201 print("Correctness of data packetization") 202 port = port + 1 203 t = Tcpkali(["-l" + str(port), "127.1:" + str(port), "-T1", 204 "-mFOOBARBAZ"], capture_io=True) 205 (_, errLines) = t.results() 206 assert check_segmentation("Snd", errLines, "FOOBARBAZ") 207 208 print("Slow rate limiting cuts packets at message boundaries") 209 port = port + 1 210 t = Tcpkali(["-l" + str(port), "127.1:" + str(port), "-T1", 211 "-r20", "-mABC"], capture_io=True) 212 a = Analyze(t.results()) 213 assert a.output_length_percentile_lte(len("ABC")) == 100 214 215 print("Rate limiting at 2k does not create single-message writes") 216 port = port + 1 217 t = Tcpkali(["-l" + str(port), "127.1:" + str(port), "-T1", 218 "-r2k", "-mABC"], capture_io=True) 219 a = Analyze(t.results()) 220 assert a.output_length_percentile_lte(len("ABC")) < 2 221 assert sum([a.out_lengths.get(i, 0) for i in range(1, 10) if i % 3]) == 0 222 223 print("Rate limiting cuts packets at message boundaries") 224 port = port + 1 225 t = Tcpkali(["-l" + str(port), "127.1:" + str(port), "-T1", 226 "-r3k", "-mABC"], capture_io=True) 227 a = Analyze(t.results()) 228 assert a.output_length_percentile_lte(4 * len("ABC")) > 90 229 assert sum([a.out_lengths.get(i, 0) for i in range(1, 10) if i % 3]) == 0 230 231 print("Write combining OFF still cuts packets at message boundaries") 232 port = port + 1 233 t = Tcpkali(["-l" + str(port), "127.1:" + str(port), "-T1", 234 "-r3k", "-mABC", "--write-combine=off"], capture_io=True) 235 a = Analyze(t.results()) 236 assert a.output_length_percentile_lte(4 * len("ABC")) > 90 237 assert sum([a.out_lengths.get(i, 0) for i in range(1, 10) if i % 3]) == 0 238 239 print("Rate limiting smoothess with 2kRPS") 240 port = port + 1 241 t = Tcpkali(["-l" + str(port), "127.1:" + str(port), "-T1", 242 "-r2k", "-mABC"], capture_io=True) 243 a = Analyze(t.results()) 244 # Check for not too many long packets outliers (<5%). 245 assert a.output_length_percentile_lte(4 * len("ABC")) > 95 246 247 print("Rate limiting smoothess with 15kRPS") 248 port = port + 1 249 t = Tcpkali(["-l" + str(port), "127.1:" + str(port), "-T1", 250 "-r15k", "-mABC"], capture_io=True) 251 a = Analyze(t.results()) 252 # Check for not too many short packets outliers (<10%). 253 assert a.output_length_percentile_lte(2 * len("ABC")) < 10 254 255 print("Observe write combining at 20kRPS by default") 256 port = port + 1 257 t = Tcpkali(["-l" + str(port), "127.1:" + str(port), "-T1", 258 "-w1", # Multi-core affects (removes) TCP level coalescing 259 # So we disable it here to obtain some for 260 # proper operation of input_length_percentile_lte(). 261 "-r20k", "-mABC", "--dump-all"], capture_io=True) 262 a = Analyze(t.results()) 263 # Check for not too many short packets outliers (<10%). 264 assert a.output_length_percentile_lte(4 * len("ABC")) < 10 265 assert a.input_length_percentile_lte(1 * len("ABC")) < 10 266 267 print("No write combining at 20kRPS with --write-combine=off") 268 port = port + 1 269 t = Tcpkali(["-l" + str(port), "127.1:" + str(port), "-T1", 270 "-w1", # Multi-core affects (removes) TCP level coalescing 271 # So we disable it here to obtain some for 272 # proper operation of input_length_percentile_lte(). 273 "-r20k", "-mABC", "--dump-all", "--write-combine=off"], 274 capture_io=True) 275 a = Analyze(t.results()) 276 # Check for all writes being short (non-coalesced) ones. 277 assert a.output_length_percentile_lte(len("ABC")) == 100 278 # Check that not all reads are de-coalesced. Statistically speaking, 279 # on one core there must be at least some read()-coalescing. 280 assert a.input_length_percentile_lte(1 * len("ABC")) < 50 281 282 # Perform generic bandwidth limiting in different directions, 283 # while varying options 284 for variant in [[], ["--websocket"], ["--write-combine=off"], 285 ["--websocket", "--write-combine=off"]]: 286 287 print("Can do more than 100 Mbps if short-cirquited" 288 ", opts=" + str(variant)) 289 port = port + 1 290 t = Tcpkali(variant + ["-l" + str(port), "127.1:" + str(port), 291 "-m1", "-T1", "--listen-mode=active"]) 292 a = Analyze(t.results()) 293 assert a.bw_down_mbps > 100 and a.bw_up_mbps > 100 294 295 print("Can effectively limit upstream bandwidth from sender" 296 ", opts=" + str(variant)) 297 port = port + 1 298 receiver = Tcpkali(variant + ["-l" + str(port), "-T3"]) 299 sender = Tcpkali(variant + ["127.1:" + str(port), "-m1", "-T3", 300 "--channel-bandwidth-upstream=100kbps"]) 301 arcv = Analyze(receiver.results()) 302 asnd = Analyze(sender.results()) 303 assert(arcv.bw_up_mbps < 0.01 and 304 arcv.bw_down_mbps > 0.090 and arcv.bw_down_mbps < 0.110) 305 assert(asnd.bw_down_mbps < 0.01 and 306 asnd.bw_up_mbps > 0.090 and asnd.bw_up_mbps < 0.110) 307 308 # This test is special because downstream rate limit is not immediately 309 # visible on the sender. The feedback loop takes time to stabilize. 310 port = port + 1 311 print("Can effectively limit downstream bandwidth from receiver" 312 ", opts=" + str(variant)) 313 receiver = Tcpkali(variant + 314 ["-l" + str(port), "-T11", "--rcvbuf=5k", 315 "--channel-bandwidth-downstream=100kbps"]) 316 sender = Tcpkali(variant + ["127.1:" + str(port), "-m1", "-T11", 317 "--sndbuf=5k"]) 318 arcv = Analyze(receiver.results()) 319 asnd = Analyze(sender.results()) 320 transfer = ((100 * 1024 // 8) * 11) 321 trans_min = 0.85 * transfer 322 trans_max = 1.10 * transfer 323 assert((arcv.total_sent_bytes < 1000 and 324 arcv.total_received_bytes > trans_min and 325 arcv.total_received_bytes < trans_max) or 326 not arcv.sockopt_works) 327 assert((asnd.total_received_bytes < 1000 and 328 asnd.total_sent_bytes > trans_min and 329 asnd.total_sent_bytes < 3 * trans_max) or 330 not asnd.sockopt_works) 331 332 print("FINISHED") 333 334if __name__ == "__main__": 335 main() 336