1# This file should be kept compatible with both Python 2.6 and Python >= 3.0.
2
3from __future__ import division
4from __future__ import print_function
5
6"""
7ccbench, a Python concurrency benchmark.
8"""
9
10import time
11import os
12import sys
13import itertools
14import threading
15import subprocess
16import socket
17from optparse import OptionParser, SUPPRESS_HELP
18import platform
19
# Compatibility shims: let the same source run on Python 2.6+ and 3.x.
try:
    xrange
except NameError:
    # Python 3 has no xrange(); its range() is already lazy.
    xrange = range

# Python 2's itertools.imap is the lazy counterpart of Python 3's map();
# when itertools has no imap (Python 3) keep the builtin.
map = getattr(itertools, 'imap', map)


# Benchmark knobs. Durations and intervals are in seconds; the packet size
# is the length of each ASCII datagram exchanged in the bandwidth test.
THROUGHPUT_DURATION = 2.0

LATENCY_PING_INTERVAL = 0.1
LATENCY_DURATION = 2.0

BANDWIDTH_PACKET_SIZE = 1024
BANDWIDTH_DURATION = 2.0
39
40
def task_pidigits():
    """Pi calculation (Python)"""
    # Bind the helpers to local names once, for use by the closures below.
    imap_ = map
    count_ = itertools.count
    islice_ = itertools.islice

    def calc_ndigits(n):
        # Spigot algorithm, from http://shootout.alioth.debian.org/
        def gen_x():
            # Lazy stream of (k, 4k+2, 0, 2k+1) terms for k = 1, 2, 3, ...
            return imap_(lambda k: (k, 4 * k + 2, 0, 2 * k + 1), count_(1))

        def compose(a, b):
            # Combine two (q, r, s, t) terms into one.
            aq, ar, as_, at = a
            bq, br, bs, bt = b
            q = aq * bq
            r = aq * br + ar * bt
            s = as_ * bq + at * bs
            t = as_ * br + at * bt
            return q, r, s, t

        def extract(z, j):
            # Floor of (q*j + r) / (s*j + t).
            q, r, s, t = z
            numerator = q * j + r
            denominator = s * j + t
            return numerator // denominator

        def pi_digits():
            state = (1, 0, 0, 1)
            terms = gen_x()
            while True:
                digit = extract(state, 3)
                # Consume terms until the estimates at 3 and 4 agree, which
                # pins down the next digit.
                while digit != extract(state, 4):
                    state = compose(state, next(terms))
                    digit = extract(state, 3)
                state = compose((10, -10 * digit, 0, 1), state)
                yield digit

        return list(islice_(pi_digits(), n))

    return calc_ndigits, (50, )
78
def task_regex():
    """regular expression (C)"""
    # XXX this task gives horrendous latency results.
    # Returns the compiled pattern's findall() and, as its argument, the
    # first 2000 characters of this file. (A leftover debug wrapper that
    # printed per-call timings, but was never used, has been removed.)
    import re
    # Taken from the `inspect` module
    pat = re.compile(r'^(\s*def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)', re.MULTILINE)
    with open(__file__, "r") as f:
        arg = f.read(2000)
    return pat.findall, (arg, )
95
def task_sort():
    """list sorting (C)"""
    def list_sort(l):
        # Sort a reversed copy: the caller's list is left untouched, so every
        # call does the same work on descending input.
        descending = l[::-1]
        descending.sort()

    ascending = list(range(1000))
    return list_sort, (ascending, )
103
def task_compress_zlib():
    """zlib compression (C)"""
    import zlib
    # Benchmark input: the first 5000 bytes of this file, repeated 3 times.
    with open(__file__, "rb") as source:
        payload = source.read(5000) * 3

    def compress(s):
        # Round-trip through zlib at level 5; the result is discarded.
        packed = zlib.compress(s, 5)
        zlib.decompress(packed)
    return compress, (payload, )
113
def task_compress_bz2():
    """bz2 compression (C)"""
    import bz2
    # Benchmark input: the first 3000 bytes of this file, doubled.
    with open(__file__, "rb") as source:
        payload = source.read(3000) * 2

    def compress(s):
        # Only compression is measured; the output is discarded.
        bz2.compress(s)
    return compress, (payload, )
123
def task_hashing():
    """SHA1 hashing (C)"""
    import hashlib
    # Benchmark input: the first 5000 bytes of this file, repeated 30 times.
    with open(__file__, "rb") as source:
        payload = source.read(5000) * 30

    def compute(s):
        # Hash the whole buffer; the digest is discarded.
        hasher = hashlib.sha1(s)
        hasher.digest()
    return compute, (payload, )
133
134
# Probe the optional C modules: each name is bound at module level to the
# imported module, or to None when it is unavailable.
for mod in 'bz2', 'hashlib':
    try:
        globals()[mod] = __import__(mod)
    except ImportError:
        globals()[mod] = None

# The third throughput task exercises C code. For whatever reasons, zlib
# gives irregular results, so bz2 is preferred, then hashlib (which releases
# the GIL from 2.7 and 3.1 onwards), and zlib is the last resort.
throughput_tasks = [
    task_pidigits,
    task_regex,
    (task_compress_bz2 if bz2 is not None
     else task_hashing if hashlib is not None
     else task_compress_zlib),
]

# The latency tests share the very same list object as the throughput tests.
latency_tasks = throughput_tasks
bandwidth_tasks = [task_pidigits]
154
155
class TimedLoop:
    """Repeatedly call ``func(*args)`` in batches until a time budget is spent.

    Several calls may share one ``end_event`` list: the first one to exceed
    its minimum duration appends to the list, and the others stop at their
    next batch boundary.
    """

    def __init__(self, func, args):
        self.func = func
        self.args = args

    def __call__(self, start_time, min_duration, end_event, do_yield=False):
        """Run the loop and return ``(iterations, elapsed_seconds)``.

        ``start_time`` is a ``time.time()`` timestamp; elapsed time is measured
        from it. If ``end_event`` is already non-empty when a batch completes,
        that batch is discarded and the figures from the previous batch are
        returned.
        """
        clock = time.time
        nap = time.sleep
        call = self.func
        call_args = self.args
        batch = 20
        done = 0
        elapsed = 0.0
        prev = start_time
        while True:
            for _ in range(batch):
                call(*call_args)
            now = clock()
            # Another worker already finished, so this batch straddles the
            # end of the measurement window: report the previous figures.
            if end_event:
                return done, elapsed
            done += batch
            elapsed = now - start_time
            if elapsed >= min_duration:
                end_event.append(None)
                return done, elapsed
            if now - prev < 0.01:
                # Batches this short make the clock calls a noticeable part
                # of the runtime, so grow the batch by half.
                batch = batch * 3 // 2
            elif do_yield:
                # OS scheduling of Python threads is sometimes so bad that we
                # have to force thread switching ourselves, otherwise we get
                # completely useless results.
                nap(0.0001)
            prev = now
192
193
def run_throughput_test(func, args, nthreads):
    """Time repeated calls of func(*args) on `nthreads` concurrent threads.

    Returns a list of ``(iterations, duration)`` tuples, one per thread, as
    produced by TimedLoop. With a single thread the loop runs directly in
    the calling thread.
    """
    assert nthreads >= 1

    # Warm up
    func(*args)

    results = []
    loop = TimedLoop(func, args)
    end_event = []

    if nthreads == 1:
        # Pure single-threaded performance, without any switching or
        # synchronization overhead.
        start_time = time.time()
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=False))
        return results

    started = False
    ready_cond = threading.Condition()
    start_cond = threading.Condition()
    ready = []

    def run():
        # Announce readiness, then block until the main thread has recorded
        # the common start time.
        with ready_cond:
            ready.append(None)
            ready_cond.notify()
        with start_cond:
            while not started:
                start_cond.wait()
        results.append(loop(start_time, THROUGHPUT_DURATION,
                            end_event, do_yield=True))

    threads = []
    for i in range(nthreads):
        t = threading.Thread(target=run)
        # Thread.setDaemon() is deprecated since Python 3.10; the `daemon`
        # attribute does the same and also exists on Python 2.6+.
        t.daemon = True
        threads.append(t)
    for t in threads:
        t.start()
    # We don't want measurements to include thread startup overhead,
    # so we arrange for timing to start after all threads are ready.
    with ready_cond:
        while len(ready) < nthreads:
            ready_cond.wait()
    with start_cond:
        start_time = time.time()
        started = True
        start_cond.notify(nthreads)
    for t in threads:
        t.join()

    return results
246
def run_throughput_tests(max_threads):
    """Print the throughput of every throughput task for 1..max_threads threads.

    The first line per task is absolute (iterations/s); later lines are a
    percentage of that single-threaded baseline.
    """
    for task in throughput_tasks:
        print(task.__doc__)
        print()
        func, args = task()
        baseline_speed = None
        for nthreads in range(1, max_threads + 1):
            results = run_throughput_test(func, args, nthreads)
            total_iters = sum(niters for niters, _ in results)
            # Dividing by the slowest thread's duration rather than the
            # average gives pessimistic rather than optimistic results.
            longest = max(elapsed for _, elapsed in results)
            speed = total_iters / longest
            print("threads=%d: %d" % (nthreads, speed), end="")
            if baseline_speed is not None:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
            else:
                print(" iterations/s.")
                baseline_speed = speed
        print()
267
268
269LAT_END = "END"
270
271def _sendto(sock, s, addr):
272    sock.sendto(s.encode('ascii'), addr)
273
274def _recv(sock, n):
275    return sock.recv(n).decode('ascii')
276
def latency_client(addr, nb_pings, interval):
    """Send timestamped UDP pings to `addr`, `interval` seconds apart.

    Each ping is ``repr(time.time())`` plus a newline. One extra ping is sent
    first to announce readiness; the run ends with a LAT_END line.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        now = time.time
        pause = time.sleep

        def ping():
            _sendto(sock, "%r\n" % now(), addr)

        # Readiness ping; then give the parent a second to notice it.
        ping()
        pause(1.0)
        for _ in range(nb_pings):
            pause(interval)
            ping()
        _sendto(sock, LAT_END + "\n", addr)
    finally:
        sock.close()
294
def run_latency_client(**kwargs):
    """Start this script as a latency-client subprocess and return its Popen.

    The keyword arguments travel on the command line as their repr(),
    through the hidden ``--latclient`` option.
    """
    script = os.path.abspath(__file__)
    argv = [sys.executable, '-E', script, '--latclient', repr(kwargs)]
    return subprocess.Popen(argv)
300
def run_latency_test(func, args, nthreads):
    """Measure UDP ping latency while `nthreads` threads run func(*args).

    A client subprocess sends timestamped pings to a local UDP socket.
    Returns a list of ``(send_time, recv_time)`` pairs, one per ping received
    after the background threads were released.
    """
    # Create a listening socket to receive the pings. We use UDP which should
    # be painlessly cross-platform. (try/finally rather than `with`: sockets
    # are only context managers from Python 3.2 on.)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind(("127.0.0.1", 0))
        addr = sock.getsockname()

        interval = LATENCY_PING_INTERVAL
        duration = LATENCY_DURATION
        nb_pings = int(duration / interval)

        threads = []
        end_event = []
        start_cond = threading.Condition()
        started = False
        if nthreads > 0:
            # Warm up
            func(*args)

            loop = TimedLoop(func, args)
            ready = []
            ready_cond = threading.Condition()

            def run():
                with ready_cond:
                    ready.append(None)
                    ready_cond.notify()
                with start_cond:
                    while not started:
                        start_cond.wait()
                loop(start_time, duration * 1.5, end_event, do_yield=False)

            for i in range(nthreads):
                t = threading.Thread(target=run)
                # Thread.setDaemon() is deprecated since Python 3.10.
                t.daemon = True
                threads.append(t)
            for t in threads:
                t.start()
            # Wait for threads to be ready
            with ready_cond:
                while len(ready) < nthreads:
                    ready_cond.wait()

        # Run the client and wait for the first ping(s) to arrive before
        # unblocking the background threads.
        chunks = []
        process = run_latency_client(addr=addr,
                                     nb_pings=nb_pings, interval=interval)
        s = _recv(sock, 4096)
        _time = time.time

        with start_cond:
            start_time = _time()
            started = True
            start_cond.notify(nthreads)

        while LAT_END not in s:
            s = _recv(sock, 4096)
            recv_time = _time()
            chunks.append((recv_time, s))

        # Tell the background threads to stop.
        end_event.append(None)
        for t in threads:
            t.join()
        process.wait()
    finally:
        sock.close()

    results = []
    for recv_time, chunk in chunks:
        # NOTE: it is assumed that a line sent by a client wasn't received
        # in two chunks because the lines are very small.
        for line in chunk.splitlines():
            line = line.strip()
            if line and line != LAT_END:
                # The client sends repr() of a float. Parse it with float()
                # rather than eval(): any local process can write datagrams
                # to this port.
                results.append((float(line), recv_time))

    return results
381
def run_latency_tests(max_threads):
    """Print ping latency statistics for each latency task.

    For every task, runs run_latency_test() with 0 up to `max_threads`
    background CPU threads and prints the mean latency and its (population)
    standard deviation, in milliseconds.
    """
    for task in latency_tasks:
        print("Background CPU task:", task.__doc__)
        print()
        func, args = task()
        for nthreads in range(max_threads + 1):
            results = run_latency_test(func, args, nthreads)
            n = len(results)
            if n == 0:
                # UDP gives no delivery guarantee; with no samples there is
                # nothing to average (dividing by n would crash).
                print("CPU threads=%d: no pings received" % nthreads)
                continue
            # We print out milliseconds
            lats = [1000 * (recv - sent) for (sent, recv) in results]
            avg = sum(lats) / n
            dev = (sum((x - avg) ** 2 for x in lats) / n) ** 0.5
            print("CPU threads=%d: %d ms. (std dev: %d ms.)"
                  % (nthreads, avg, dev))
        print()
401
402
# Message carried by the bandwidth client's final packet.
BW_END = "END"

def bandwidth_client(addr, packet_size, duration):
    """Ping-pong fixed-size UDP packets with the server at `addr`.

    Each packet is ``"<repr(local_addr)>#<msg>\\n"`` right-justified to
    `packet_size` characters, and a reply of exactly `packet_size`
    characters is awaited before the next one is sent. Runs for about
    ``2 * duration`` seconds (after a 1 s start-up pause), then sends a
    final BW_END packet.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    # Everything after socket creation sits inside try/finally so the socket
    # is closed even if bind() or a send/receive fails.
    try:
        sock.bind(("127.0.0.1", 0))
        local_addr = sock.getsockname()
        _time = time.time
        _sleep = time.sleep

        def _send_chunk(msg):
            _sendto(sock, ("%r#%s\n" % (local_addr, msg)).rjust(packet_size),
                    addr)

        # We give the parent some time to be ready.
        _sleep(1.0)
        end_time = _time() + duration * 2.0
        i = 0
        while _time() < end_time:
            _send_chunk(str(i))
            s = _recv(sock, packet_size)
            assert len(s) == packet_size
            i += 1
        _send_chunk(BW_END)
    finally:
        sock.close()
427
def run_bandwidth_client(**kwargs):
    """Start this script as a bandwidth-client subprocess and return its Popen.

    The keyword arguments travel on the command line as their repr(),
    through the hidden ``--bwclient`` option.
    """
    script = os.path.abspath(__file__)
    argv = [sys.executable, '-E', script, '--bwclient', repr(kwargs)]
    return subprocess.Popen(argv)
433
def run_bandwidth_test(func, args, nthreads):
    """Measure the UDP echo rate while `nthreads` threads run func(*args).

    A client subprocess sends fixed-size packets that are echoed back here.
    Returns the number of round trips per second between the first echoed
    reply and the moment the loop stopped (background work finished or the
    client sent BW_END).
    """
    import ast

    # Create a listening socket to receive the packets. We use UDP which should
    # be painlessly cross-platform. (try/finally rather than `with`: sockets
    # are only context managers from Python 3.2 on, and this file must keep
    # working on Python 2.)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind(("127.0.0.1", 0))
        addr = sock.getsockname()

        duration = BANDWIDTH_DURATION
        packet_size = BANDWIDTH_PACKET_SIZE

        threads = []
        end_event = []
        start_cond = threading.Condition()
        started = False
        if nthreads > 0:
            # Warm up
            func(*args)

            loop = TimedLoop(func, args)
            ready = []
            ready_cond = threading.Condition()

            def run():
                with ready_cond:
                    ready.append(None)
                    ready_cond.notify()
                with start_cond:
                    while not started:
                        start_cond.wait()
                loop(start_time, duration * 1.5, end_event, do_yield=False)

            for i in range(nthreads):
                t = threading.Thread(target=run)
                # Thread.setDaemon() is deprecated since Python 3.10.
                t.daemon = True
                threads.append(t)
            for t in threads:
                t.start()
            # Wait for threads to be ready
            with ready_cond:
                while len(ready) < nthreads:
                    ready_cond.wait()

        # Run the client and wait for the first packet to arrive before
        # unblocking the background threads.
        process = run_bandwidth_client(addr=addr,
                                       packet_size=packet_size,
                                       duration=duration)
        _time = time.time
        # This will also wait for the parent to be ready
        s = _recv(sock, packet_size)
        # The packet starts with repr() of the client's address, left-padded
        # with spaces. literal_eval() (not eval()) parses it without running
        # code sent over the network; strip() removes the padding, which
        # literal_eval rejects on older Pythons.
        remote_addr = ast.literal_eval(s.partition('#')[0].strip())

        with start_cond:
            start_time = _time()
            started = True
            start_cond.notify(nthreads)

        n = 0
        first_time = None
        while not end_event and BW_END not in s:
            _sendto(sock, s, remote_addr)
            s = _recv(sock, packet_size)
            if first_time is None:
                first_time = _time()
            n += 1
        end_time = _time()
    finally:
        sock.close()

    end_event.append(None)
    for t in threads:
        t.join()
    process.kill()
    # kill() only sends the signal; wait() reaps the child process.
    process.wait()

    return (n - 1) / (end_time - first_time)
508
def run_bandwidth_tests(max_threads):
    """Print the I/O echo rate for each bandwidth task.

    For every task, measures with 0 up to `max_threads` background CPU
    threads. The first line is absolute (packets/s); later lines are a
    percentage of that baseline.
    """
    for task in bandwidth_tasks:
        print("Background CPU task:", task.__doc__)
        print()
        func, args = task()
        baseline_speed = None
        for nthreads in range(max_threads + 1):
            speed = run_bandwidth_test(func, args, nthreads)
            print("CPU threads=%d: %.1f" % (nthreads, speed), end="")
            if baseline_speed is not None:
                print(" ( %d %%)" % (speed / baseline_speed * 100))
            else:
                print(" packets/s.")
                baseline_speed = speed
        print()
528
529
def _build_option_parser():
    """Return the OptionParser describing ccbench's command line."""
    usage = "usage: %prog [-h|--help] [options]"
    parser = OptionParser(usage=usage)
    parser.add_option("-t", "--throughput",
                      action="store_true", dest="throughput", default=False,
                      help="run throughput tests")
    parser.add_option("-l", "--latency",
                      action="store_true", dest="latency", default=False,
                      help="run latency tests")
    parser.add_option("-b", "--bandwidth",
                      action="store_true", dest="bandwidth", default=False,
                      help="run I/O bandwidth tests")
    parser.add_option("-i", "--interval",
                      action="store", type="int", dest="check_interval", default=None,
                      help="sys.setcheckinterval() value")
    parser.add_option("-I", "--switch-interval",
                      action="store", type="float", dest="switch_interval", default=None,
                      help="sys.setswitchinterval() value")
    parser.add_option("-n", "--num-threads",
                      action="store", type="int", dest="nthreads", default=4,
                      help="max number of threads in tests")

    # Hidden option to run the pinging and bandwidth clients
    parser.add_option("", "--latclient",
                      action="store", dest="latclient", default=None,
                      help=SUPPRESS_HELP)
    parser.add_option("", "--bwclient",
                      action="store", dest="bwclient", default=None,
                      help=SUPPRESS_HELP)
    return parser


def _print_platform_banner():
    """Print the Python implementation/version and machine/OS/CPU header."""
    print("== %s %s (%s) ==" % (
        platform.python_implementation(),
        platform.python_version(),
        platform.python_build()[0],
    ))
    # Processor identification often has repeated spaces
    cpu = ' '.join(platform.processor().split())
    print("== %s %s on '%s' ==" % (
        platform.machine(),
        platform.system(),
        cpu,
    ))
    print()


def main():
    """Parse the command line and run the selected benchmarks.

    The hidden --latclient/--bwclient options instead make this process act
    as the latency or bandwidth client subprocess.
    """
    import ast

    parser = _build_option_parser()
    options, args = parser.parse_args()
    if args:
        parser.error("unexpected arguments")

    # The client options carry repr() of a dict of keyword arguments.
    # ast.literal_eval() accepts only Python literals, so unlike eval() a
    # malformed or hostile value cannot execute code.
    if options.latclient:
        kwargs = ast.literal_eval(options.latclient)
        latency_client(**kwargs)
        return

    if options.bwclient:
        kwargs = ast.literal_eval(options.bwclient)
        bandwidth_client(**kwargs)
        return

    # No test chosen explicitly: run all of them.
    if not (options.throughput or options.latency or options.bandwidth):
        options.throughput = options.latency = options.bandwidth = True
    if options.check_interval:
        # sys.setcheckinterval() was removed in Python 3.9.
        if not hasattr(sys, "setcheckinterval"):
            parser.error("-i/--interval is not supported by this Python version")
        sys.setcheckinterval(options.check_interval)
    if options.switch_interval:
        # sys.setswitchinterval() only exists on Python 3.2+.
        if not hasattr(sys, "setswitchinterval"):
            parser.error("-I/--switch-interval is not supported by this Python version")
        sys.setswitchinterval(options.switch_interval)

    _print_platform_banner()

    if options.throughput:
        print("--- Throughput ---")
        print()
        run_throughput_tests(options.nthreads)

    if options.latency:
        print("--- Latency ---")
        print()
        run_latency_tests(options.nthreads)

    if options.bandwidth:
        print("--- I/O bandwidth ---")
        print()
        run_bandwidth_tests(options.nthreads)
609
# Script entry point: run the benchmarks, or the hidden client mode that
# main() selects via --latclient/--bwclient.
if __name__ == "__main__":
    main()
612