1#!/usr/bin/env python
2# pylint: disable=invalid-name,print-statement,superfluous-parens
3"""
4Run several tcpkali processes in source-destination mode and figure out
5if they're behaving according to expectations
6"""
7
8from __future__ import absolute_import
9import os
10import re
11import sys
12import time
13import tempfile
14import datetime
15import subprocess
16
17
def log(*args):
    """Write one timestamped line to stderr.

    Every argument is converted with str() and the pieces are joined by
    single spaces; the line has the form "[<timestamp>]: <message>".
    stderr is flushed after the write.
    """
    now = datetime.datetime.now()
    message = ' '.join(map(str, args))
    sys.stderr.write("[%s]: %s\n" % (now, message))
    sys.stderr.flush()
24
25
class Tcpkali(object):
    """Wrapper around a `tcpkali` child process and its captured output.

    The executable path is read from the TCPKALI environment variable and
    defaults to "../src/tcpkali". The child's stdout and stderr are
    redirected into anonymous temporary files which results() reads back
    once the process has exited.
    """
    def __init__(self, args, **kvargs):
        """Spawn tcpkali with `args`, a list of command-line strings.

        Keyword arguments:
            capture_io -- when true, "--dump-one" is appended to the
                          command line (default: False).

        Raises Exception if the process has already exited 0.1 seconds
        after being spawned.
        """
        self.proc = None

        exe = os.getenv("TCPKALI", "../src/tcpkali")

        self.fout = tempfile.TemporaryFile()
        self.ferr = tempfile.TemporaryFile()

        full_args = [exe] + args
        if kvargs.get('capture_io', False):
            full_args.append("--dump-one")
        # Log the command line up front so that it is visible even when
        # spawning the executable itself fails.
        log(' '.join(full_args))
        self.proc = subprocess.Popen(full_args,
                                     stdout=self.fout, stderr=self.ferr)
        # Give the child a moment to fail fast (bad arguments and so on).
        time.sleep(0.1)
        self.proc.poll()
        if self.proc.returncode is not None:
            log("Could not start the tcpkali process: %r\n"
                % self.proc.returncode)
            raise Exception("Cannot start the tcpkali process")
        log("Started tcpkali pid %r" % self.proc.pid)

    @staticmethod
    def _read_text_lines(stream):
        """Rewind `stream` and return its lines as native strings.

        The temporary files are opened in binary mode, so on Python 3 they
        yield bytes; those are decoded as UTF-8 with undecodable bytes
        replaced. On Python 2 the lines already are `str` and are returned
        untouched.
        """
        stream.seek(0)
        lines = []
        for raw in stream.readlines():
            if not isinstance(raw, str):
                raw = raw.decode("utf-8", "replace")
            lines.append(raw)
        return lines

    def results(self):
        """Wait for tcpkali to exit and return its captured output.

        Returns a tuple (stdout_lines, stderr_lines); each element is a
        list of native-string lines with their line endings preserved, so
        callers can apply str regular expressions to them directly.
        """
        self.wait()
        out = self._read_text_lines(self.fout)
        err = self._read_text_lines(self.ferr)
        return (out, err)

    def wait(self):
        """Block until tcpkali exits, then drop the process handle.

        Does nothing when the process has already been waited for.
        """
        if self.proc:
            # Popen.wait() only returns once the exit status is known, so
            # no further "did it really stop" check is needed afterwards.
            self.proc.wait()
            pid = self.proc.pid
            self.proc = None
            log("Stopped tcpkali pid %r" % pid)

    def __del__(self):
        # getattr() keeps this safe even if __init__ never got to run.
        proc = getattr(self, "proc", None)
        if proc is None:
            return
        # Only signal a process that is still alive: after __init__ has
        # raised, the child is already gone and its pid must not be
        # signalled again.
        if proc.poll() is None:
            try:
                proc.terminate()
            except OSError:
                # The child exited between poll() and terminate().
                pass
        self.wait()
74
75
class Analyze(object):
    """Digest the captured output of one tcpkali run into attributes."""
    # pylint: disable=too-many-instance-attributes,too-many-locals
    def __init__(self, args):
        """Parse `args`, a pair of line sequences.

        args[0] is searched for the "Aggregate bandwidth" and
        "Total data sent/received" summary lines; args[1] is searched for
        "Snd(...)"/"Rcv(...)" dump lines and for the socket buffer
        warning. The parsed values are logged via debug() at the end.

        Raises IndexError when one of the summary lines is absent.
        """
        summary_lines = args[0]
        dump_lines = args[1]

        def bucket(size):
            # Sizes up to 50 stay exact; 51..100 are rounded down to a
            # multiple of 10, 101..2000 to a multiple of 100, and
            # anything larger is clamped to 2000.
            if size > 2000:
                return 2000
            if size > 100:
                return 100 * (size // 100)
            if size > 50:
                return 10 * (size // 10)
            return size

        def tally(pattern):
            # Count the dump lines matching `pattern` and build a
            # histogram of their bucketed sizes (first regex group).
            matcher = re.compile(pattern)
            seen = 0
            histogram = {}
            for line in dump_lines:
                found = matcher.match(line)
                if found is None:
                    continue
                seen += 1
                key = bucket(int(found.group(1)))
                histogram[key] = histogram.get(key, 0) + 1
            return seen, histogram

        self.out_num, self.out_lengths = tally(
            r"^Snd\(\d+, (\d+)\): \[(.*)\]$")
        self.in_num, self.in_lengths = tally(
            r"^Rcv\(\d+, (\d+)\): \[(.*)\]$")

        # Only the first bandwidth line is used: group 1 is stored as
        # the downstream figure, group 2 as the upstream one.
        bandwidth = re.compile(r"^Aggregate bandwidth: "
                               r"([\d.]+)[^\d]+, ([\d.]+)[^\d]+ Mbps")
        bandwidth_hits = [hit for hit in map(bandwidth.match, summary_lines)
                          if hit]
        first_hit = bandwidth_hits[0]
        self.bw_down_mbps = float(first_hit.group(1))
        self.bw_up_mbps = float(first_hit.group(2))

        totals = re.compile(r"^Total data (sent|received):.*"
                            r"\(([\d.]+) bytes\)")

        def total_bytes(direction):
            # Byte count from the first totals line mentioning `direction`.
            hits = []
            for line in summary_lines:
                found = totals.match(line)
                if found and direction in line:
                    hits.append(found)
            return int(hits[0].group(2))

        self.total_sent_bytes = total_bytes('sent')
        self.total_received_bytes = total_bytes('received')

        # A single warning is enough to mark the buffer options as
        # ineffective for this run.
        warning = re.compile(r"^WARNING: --(snd|rcv)buf option "
                             r"makes no effect.")
        self.sockopt_works = not any(warning.match(line)
                                     for line in dump_lines)

        self.debug()

    # Int -> Int(0..100)
    def input_length_percentile_lte(self, n):
        """
        Return the whole-number percentage of recorded "Rcv" entries
        whose (bucketed) length does not exceed n.
        """
        return self._length_percentile_lte(self.in_lengths, n)

    # Int -> Int(0..100)
    def output_length_percentile_lte(self, n):
        """
        Return the whole-number percentage of recorded "Snd" entries
        whose (bucketed) length does not exceed n.
        """
        return self._length_percentile_lte(self.out_lengths, n)

    def _length_percentile_lte(self, d, n):
        # pylint: disable=no-self-use
        # `d` maps length -> occurrence count. The result is rounded
        # down; an empty histogram yields 0.
        total = sum(d.values())
        if total <= 0:
            return 0
        covered = sum(count for size, count in d.items() if size <= n)
        return 100 * covered // total

    def debug(self):
        """Log every instance attribute, one per line, sorted by name."""
        for name, value in sorted(vars(self).items()):
            log("  '%s': %s" % (name, value))
168
169
def check_segmentation(prefix, lines, contains):
    """Check that the dumped payload is a clean repetition of `contains`.

    Arguments:
        prefix   -- tag that starts the dump lines of interest (such as
                    "Snd"); it is inserted into the line regex as-is.
        lines    -- iterable of output lines; only lines shaped like
                    "<prefix>(<num>, <num>): [<payload>]" contribute.
        contains -- literal text the concatenated payloads must repeat.

    Returns True when the concatenation of all payloads consists of one
    or more whole copies of `contains` and nothing else. Otherwise a
    diagnostic is printed and False is returned.

    Raises AssertionError when no payload was found at all.
    """
    line_re = re.compile(r"^" + prefix + r"\(\d+, \d+\): \[(.*)\]$")
    matches = (line_re.match(line) for line in lines)
    allOutput = "".join(found.group(1) for found in matches if found)
    assert allOutput != ""
    # `contains` is literal payload text, so it has to be escaped before
    # being embedded in a pattern; otherwise characters such as "." or
    # "+" would be interpreted as regex syntax and skew the verdict.
    repeat_re = re.compile(r"^((" + re.escape(contains) + r")+)(.*)$")
    result = repeat_re.match(allOutput)
    if not result:
        print("Expected repetition of \"%s\" is not found in \"%s\"..." %
              (contains, allOutput[0:len(contains)+1]))
        return False
    leftover = result.group(3)
    if not leftover:
        return True
    print("Output is not consistent after byte %d (...\"%s\");"
          " continuing with \"%s\"..." %
          (len(result.group(1)), result.group(2),
           leftover[0:len(contains)+1]))
    return False
193
194
def main():
    """Drive tcpkali through a list of scenarios, asserting on each one.

    Every scenario uses a fresh port number (counting up from 1351),
    prints its title, runs one or two tcpkali processes and asserts on
    the analyzed output. "FINISHED" is printed when all of them passed.
    """
    # pylint: disable=too-many-statements
    msg_len = len("ABC")
    port = 1350

    def loopback(port_no, *extra):
        """Arguments for a -T1 run that connects to its own listener."""
        base = ["-l" + str(port_no), "127.1:" + str(port_no), "-T1"]
        return base + list(extra)

    def dump_and_analyze(port_no, *extra):
        """Run a self-connected tcpkali with I/O capture and analyze it."""
        proc = Tcpkali(loopback(port_no, *extra), capture_io=True)
        return Analyze(proc.results())

    def misaligned_writes(analysis):
        """Count writes of 1..9 bytes whose size is not a multiple of 3."""
        return sum(analysis.out_lengths.get(size, 0)
                   for size in range(1, 10) if size % 3 != 0)

    # This scenario only runs while CONTINUOUS_INTEGRATION is unset or
    # literally 'false'.
    if os.environ.get('CONTINUOUS_INTEGRATION', 'false') == 'false':
        print("Correctness of data packetization")
        port += 1
        proc = Tcpkali(loopback(port, "-mFOOBARBAZ"), capture_io=True)
        stderr_lines = proc.results()[1]
        assert check_segmentation("Snd", stderr_lines, "FOOBARBAZ")

    print("Slow rate limiting cuts packets at message boundaries")
    port += 1
    stats = dump_and_analyze(port, "-r20", "-mABC")
    assert stats.output_length_percentile_lte(msg_len) == 100

    print("Rate limiting at 2k does not create single-message writes")
    port += 1
    stats = dump_and_analyze(port, "-r2k", "-mABC")
    assert stats.output_length_percentile_lte(msg_len) < 2
    assert misaligned_writes(stats) == 0

    print("Rate limiting cuts packets at message boundaries")
    port += 1
    stats = dump_and_analyze(port, "-r3k", "-mABC")
    assert stats.output_length_percentile_lte(4 * msg_len) > 90
    assert misaligned_writes(stats) == 0

    print("Write combining OFF still cuts packets at message boundaries")
    port += 1
    stats = dump_and_analyze(port, "-r3k", "-mABC", "--write-combine=off")
    assert stats.output_length_percentile_lte(4 * msg_len) > 90
    assert misaligned_writes(stats) == 0

    print("Rate limiting smoothess with 2kRPS")
    port += 1
    stats = dump_and_analyze(port, "-r2k", "-mABC")
    # Long-write outliers have to stay below 5%.
    assert stats.output_length_percentile_lte(4 * msg_len) > 95

    print("Rate limiting smoothess with 15kRPS")
    port += 1
    stats = dump_and_analyze(port, "-r15k", "-mABC")
    # Short-write outliers have to stay below 10%.
    assert stats.output_length_percentile_lte(2 * msg_len) < 10

    print("Observe write combining at 20kRPS by default")
    port += 1
    # Per the original note on -w1: multi-core operation removes TCP
    # level coalescing, so it is disabled here to obtain some, which
    # input_length_percentile_lte() needs in order to be meaningful.
    stats = dump_and_analyze(port, "-w1", "-r20k", "-mABC", "--dump-all")
    # Short-write outliers have to stay below 10%.
    assert stats.output_length_percentile_lte(4 * msg_len) < 10
    assert stats.input_length_percentile_lte(1 * msg_len) < 10

    print("No write combining at 20kRPS with --write-combine=off")
    port += 1
    # -w1 is passed for the same reason as in the previous scenario.
    stats = dump_and_analyze(port, "-w1", "-r20k", "-mABC", "--dump-all",
                             "--write-combine=off")
    # Every write must be a short, non-coalesced one.
    assert stats.output_length_percentile_lte(msg_len) == 100
    # Not all reads may be de-coalesced: statistically, on one core
    # there must be at least some read()-coalescing.
    assert stats.input_length_percentile_lte(1 * msg_len) < 50

    # Generic bandwidth limiting in both directions, repeated for each
    # combination of options.
    option_sets = [[], ["--websocket"], ["--write-combine=off"],
                   ["--websocket", "--write-combine=off"]]
    for variant in option_sets:
        opts_suffix = ", opts=" + str(variant)

        print("Can do more than 100 Mbps if short-cirquited" + opts_suffix)
        port += 1
        proc = Tcpkali(variant + ["-l" + str(port), "127.1:" + str(port),
                                  "-m1", "-T1", "--listen-mode=active"])
        stats = Analyze(proc.results())
        assert stats.bw_down_mbps > 100 and stats.bw_up_mbps > 100

        print("Can effectively limit upstream bandwidth from sender"
              + opts_suffix)
        port += 1
        receiver = Tcpkali(variant + ["-l" + str(port), "-T3"])
        sender = Tcpkali(variant + ["127.1:" + str(port), "-m1", "-T3",
                                    "--channel-bandwidth-upstream=100kbps"])
        arcv = Analyze(receiver.results())
        asnd = Analyze(sender.results())
        assert arcv.bw_up_mbps < 0.01
        assert 0.090 < arcv.bw_down_mbps < 0.110
        assert asnd.bw_down_mbps < 0.01
        assert 0.090 < asnd.bw_up_mbps < 0.110

        # This scenario is special: a downstream rate limit is not
        # immediately visible on the sender, because the feedback loop
        # takes time to stabilize.
        port += 1
        print("Can effectively limit downstream bandwidth from receiver"
              + opts_suffix)
        receiver = Tcpkali(variant +
                           ["-l" + str(port), "-T11", "--rcvbuf=5k",
                            "--channel-bandwidth-downstream=100kbps"])
        sender = Tcpkali(variant + ["127.1:" + str(port), "-m1", "-T11",
                                    "--sndbuf=5k"])
        arcv = Analyze(receiver.results())
        asnd = Analyze(sender.results())
        transfer = ((100 * 1024 // 8) * 11)
        trans_min = 0.85 * transfer
        trans_max = 1.10 * transfer
        # The byte-count checks are waived when tcpkali warned that the
        # socket buffer options make no effect.
        assert(not arcv.sockopt_works or
               (arcv.total_sent_bytes < 1000 and
                trans_min < arcv.total_received_bytes < trans_max))
        assert(not asnd.sockopt_works or
               (asnd.total_received_bytes < 1000 and
                trans_min < asnd.total_sent_bytes < 3 * trans_max))

    print("FINISHED")
333
# Run the scenarios only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
336