1# -*- coding: utf-8 -*-
2# Copyright (C) 2015-2019 Savoir-Faire Linux Inc.
3# Author(s): Adrien Béraud <adrien.beraud@savoirfairelinux.com>
4#            Simon Désaulniers <sim.desaulniers@gmail.com>
5
6import sys
7import os
8import threading
9import random
10import string
11import time
12import subprocess
13import re
14import traceback
15import collections
16
17from matplotlib.ticker import FuncFormatter
18import math
19
20import numpy as np
21import matplotlib.pyplot as plt
22import networkx as nx
23from networkx.drawing.nx_agraph import graphviz_layout
24
25
26from opendht import *
27from dht.network import DhtNetwork, DhtNetworkSubProcess
28
29############
30#  Common  #
31############
32
33# matplotlib display format for bits (b, Kb, Mb)
34bit_format = None
35Kbit_format = FuncFormatter(lambda x, pos: '%1.1f' % (x*1024**-1) + 'Kb')
36Mbit_format = FuncFormatter(lambda x, pos: '%1.1f' % (x*1024**-2) + 'Mb')
37
38def random_str_val(size=1024):
39    """Creates a random string value of specified size.
40
41    @param size:  Size, in bytes, of the value.
42    @type  size:  int
43
44    @return:  Random string value
45    @rtype :  str
46    """
47    return ''.join(random.choice(string.hexdigits) for _ in range(size))
48
49
50def random_hash():
51    """Creates random InfoHash.
52    """
53    return InfoHash(random_str_val(size=40).encode())
54
55def timer(f, *args):
56    """
57    Start a timer which count time taken for execute function f
58
59    @param f : Function to time
60    @type  f : function
61
62    @param args : Arguments of the function f
63    @type  args : list
64
65    @rtype : timer
66    @return : Time taken by the function f
67    """
68    start = time.time()
69    f(*args)
70
71    return time.time() - start
72
73def reset_before_test(featureTestMethod):
74    """
75    This is a decorator for all test methods needing reset().
76
77    @param featureTestMethod: The method to be decorated. All decorated methods
78                              must have 'self' object as first arg.
79    @type  featureTestMethod: function
80    """
81    def call(*args, **kwargs):
82        self = args[0]
83        if isinstance(self, FeatureTest):
84            self._reset()
85        return featureTestMethod(*args, **kwargs)
86    return call
87
88def display_plot(yvals, xvals=None, yformatter=None, display_time=3, **kwargs):
89    """
90    Displays a plot of data in interactive mode. This method is made to be
91    called successively for plot refreshing.
92
93    @param yvals:  Ordinate values (float).
94    @type  yvals:  list
95    @param xvals:  Abscissa values (float).
96    @type  xvals:  list
97    @param yformatter:  The matplotlib FuncFormatter to use for y values.
98    @type  yformatter:  matplotlib.ticker.FuncFormatter
99    @param displaytime:  The time matplotlib can take to refresht the plot.
100    @type  displaytime:  int
101    """
102    plt.ion()
103    plt.clf()
104    plt.show()
105    if yformatter:
106        plt.axes().yaxis.set_major_formatter(Kbit_format)
107    if xvals:
108        plt.plot(xvals, yvals, **kwargs)
109    else:
110        plt.plot(yvals, **kwargs)
111    plt.pause(display_time)
112
113def display_traffic_plot(ifname):
114    """Displays the traffic plot for a given interface name.
115
116    @param ifname:  Interface name.
117    @type  ifname:  string
118    """
119    ydata = []
120    xdata = []
121    # warning: infinite loop
122    interval = 2
123    for rate in iftop_traffic_data(ifname, interval=interval):
124        ydata.append(rate)
125        xdata.append((xdata[-1] if len(xdata) > 0 else 0) + interval)
126        display_plot(ydata, xvals=xdata, yformatter=Kbit_format, color='blue')
127
128def iftop_traffic_data(ifname, interval=2, rate_type='send_receive'):
129    """
130    Generator (yields data) function collecting traffic data from iftop
131    subprocess.
132
133    @param ifname: Interface to listen to.
134    @type  ifname: string
135    @param interval: Interval of time between to data collections. Possible
136                     values are 2, 10 or 40.
137    @type  interval: int
138    @param rates: (default: send_receive) Wether to pick "send", "receive"
139                  or "send and receive" rates. Possible values : "send",
140                  "receive" and "send_receive".
141    @type  rates: string
142    @param _format: Format in which to display data on the y axis.
143                    Possible values: Mb, Kb or b.
144    @type  _format: string
145    """
146    # iftop stdout string format
147    SEND_RATE_STR               = "Total send rate"
148    RECEIVE_RATE_STR            = "Total receive rate"
149    SEND_RECEIVE_RATE_STR       = "Total send and receive rate"
150    RATE_STR = {
151            "send"         : SEND_RATE_STR,
152            "receive"      : RECEIVE_RATE_STR,
153            "send_receive" : SEND_RECEIVE_RATE_STR
154    }
155    TWO_SECONDS_RATE_COL    = 0
156    TEN_SECONDS_RATE_COL    = 1
157    FOURTY_SECONDS_RATE_COL = 2
158    COLS = {
159            2  : TWO_SECONDS_RATE_COL,
160            10 : TEN_SECONDS_RATE_COL,
161            40 : FOURTY_SECONDS_RATE_COL
162    }
163    FLOAT_REGEX = "[0-9]+[.]*[0-9]*"
164    BIT_REGEX = "[KM]*b"
165
166    iftop = subprocess.Popen(["iftop", "-i", ifname, "-t"], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
167    while True:
168        line = iftop.stdout.readline().decode()
169        if RATE_STR[rate_type] in line:
170            rate, unit = re.findall("("+FLOAT_REGEX+")("+BIT_REGEX+")", line)[COLS[interval]]
171            rate = float(rate)
172            if unit == "Kb":
173                rate *= 1024
174            elif unit == "Mb":
175                rate *= 1024**2
176            yield rate
177
178###########
179#  Tests  #
180###########
181
182class FeatureTest(object):
183    """
184    This is a base test.
185    """
186
187    done = 0
188    lock = None
189
190    def __init__(self, test, workbench):
191        """
192        @param test: The test string indicating the test to run. This string is
193                     determined in the child classes.
194        @type  test: string
195
196        @param workbench: A WorkBench object to use inside this test.
197        @type  workbench: WorkBench
198        """
199        self._test = test
200        self._workbench = workbench
201        self._bootstrap = self._workbench.get_bootstrap()
202
203    def _reset(self):
204        """
205        Resets some static variables.
206
207        This method is most likely going to be called before each tests.
208        """
209        FeatureTest.done = 0
210        FeatureTest.lock = threading.Condition()
211
212    def run(self):
213        raise NotImplementedError('This method must be implemented.')
214
215##################################
216#               PHT              #
217##################################
218
219class PhtTest(FeatureTest):
220    """TODO
221    """
222
223    indexEntries = None
224    prefix       = None
225    key          = None
226
227    def __init__(self, test, workbench, opts):
228        """
229        @param test: is one of the following:
230                     - 'insert': indexes a considerable amount of data in
231                       the PHT structure.
232                       TODO
233        @type  test: string
234
235        @param opts: Dictionnary containing options for the test. Allowed
236                     options are:
237                     - 'num_keys': this specifies the number of keys to insert
238                                   in the PHT during the test.
239        @type  opts: dict
240        """
241        super(PhtTest, self).__init__(test, workbench)
242        self._num_keys = opts['num_keys'] if 'num_keys' in opts else 32
243        self._timer = True if 'timer' in opts else False
244
245    def _reset(self):
246        super(PhtTest, self)._reset()
247        PhtTest.indexEntries = []
248
249    @staticmethod
250    def lookupCb(vals, prefix):
251        PhtTest.indexEntries = list(vals)
252        PhtTest.prefix = prefix.decode()
253        DhtNetwork.log('Index name: <todo>')
254        DhtNetwork.log('Leaf prefix:', prefix)
255        for v in vals:
256            DhtNetwork.log('[ENTRY]:', v)
257
258    @staticmethod
259    def lookupDoneCb(ok):
260        DhtNetwork.log('[LOOKUP]:', PhtTest.key, "--", "success!" if ok else "Fail...")
261        with FeatureTest.lock:
262            FeatureTest.lock.notify()
263
264    @staticmethod
265    def insertDoneCb(ok):
266        DhtNetwork.log('[INSERT]:', PhtTest.key, "--", "success!" if ok else "Fail...")
267        with FeatureTest.lock:
268            FeatureTest.lock.notify()
269
270    @staticmethod
271    def drawTrie(trie_dict):
272        """
273        Draws the trie structure of the PHT from dictionnary.
274
275        @param trie_dict: Dictionnary of index entries (prefix -> entry).
276        @type  trie_dict: dict
277        """
278        prefixes = list(trie_dict.keys())
279        if len(prefixes) == 0:
280            return
281
282        edges = list([])
283        for prefix in prefixes:
284            for i in range(-1, len(prefix)-1):
285                u = prefix[:i+1]
286                x = ("." if i == -1 else u, u+"0")
287                y = ("." if i == -1 else u, u+"1")
288                if x not in edges:
289                    edges.append(x)
290                if y not in edges:
291                    edges.append(y)
292
293        # TODO: use a binary tree position layout...
294        #   UPDATE : In a better way [change lib]
295        G = nx.Graph(sorted(edges, key=lambda x: len(x[0])))
296        plt.title("PHT: Tree")
297        pos=graphviz_layout(G,prog='dot')
298        nx.draw(G, pos, with_labels=True, node_color='white')
299        plt.show()
300
301    def run(self):
302        try:
303            if self._test == 'insert':
304                self._insertTest()
305        except Exception as e:
306            print(e)
307        finally:
308            self._bootstrap.resize(1)
309
310    ###########
311    #  Tests  #
312    ###########
313
314    @reset_before_test
315    def _insertTest(self):
316        """TODO: Docstring for _massIndexTest.
317        """
318        bootstrap = self._bootstrap
319        bootstrap.resize(2)
320        dht = bootstrap.get(1)
321
322        NUM_DIG  = max(math.log(self._num_keys, 2)/4, 5) # at least 5 digit keys.
323        keyspec = collections.OrderedDict([('foo', NUM_DIG)])
324        pht = Pht(b'foo_index', keyspec, dht)
325
326        DhtNetwork.log('PHT has',
327                       pht.MAX_NODE_ENTRY_COUNT,
328                       'node'+ ('s' if pht.MAX_NODE_ENTRY_COUNT > 1 else ''),
329                       'per leaf bucket.')
330        keys = [{
331            [_ for _ in keyspec.keys()][0] :
332            ''.join(random.SystemRandom().choice(string.hexdigits)
333                for _ in range(NUM_DIG)).encode()
334            } for n in range(self._num_keys)]
335        all_entries = {}
336
337        # Index all entries.
338        for key in keys:
339            PhtTest.key = key
340            with FeatureTest.lock:
341                time_taken = timer(pht.insert, key, IndexValue(random_hash()), PhtTest.insertDoneCb)
342                if self._timer:
343                    DhtNetwork.log('This insert step took : ', time_taken, 'second')
344                FeatureTest.lock.wait()
345
346        time.sleep(1)
347
348        # Recover entries now that the trie is complete.
349        for key in keys:
350            PhtTest.key = key
351            with FeatureTest.lock:
352                time_taken = timer(pht.lookup, key, PhtTest.lookupCb, PhtTest.lookupDoneCb)
353                if self._timer:
354                    DhtNetwork.log('This lookup step took : ', time_taken, 'second')
355                FeatureTest.lock.wait()
356
357            all_entries[PhtTest.prefix] = [e.__str__()
358                                           for e in PhtTest.indexEntries]
359
360        for p in all_entries.keys():
361            DhtNetwork.log('All entries under prefix', p, ':')
362            DhtNetwork.log(all_entries[p])
363        PhtTest.drawTrie(all_entries)
364
365##################################
366#               DHT              #
367##################################
368
369class DhtFeatureTest(FeatureTest):
370    """
371    This is a base dht test.
372    """
373    #static variables used by class callbacks
374    successfullTransfer = lambda lv,fv: len(lv) == len(fv)
375    foreignNodes = None
376    foreignValues = None
377
378    def __init__(self, test, workbench):
379        super(DhtFeatureTest, self).__init__(test, workbench)
380
381    def _reset(self):
382        super(DhtFeatureTest, self)._reset()
383        DhtFeatureTest.foreignNodes = []
384        DhtFeatureTest.foreignValues = []
385
386    @staticmethod
387    def getcb(value):
388        vstr = value.__str__()[:100]
389        DhtNetwork.Log.log('[GET]: %s' % vstr + ("..." if len(vstr) > 100 else ""))
390        DhtFeatureTest.foreignValues.append(value)
391        return True
392
393    @staticmethod
394    def putDoneCb(ok, nodes):
395        with FeatureTest.lock:
396            if not ok:
397                DhtNetwork.Log.log("[PUT]: failed!")
398            FeatureTest.done -= 1
399            FeatureTest.lock.notify()
400
401    @staticmethod
402    def getDoneCb(ok, nodes):
403        with FeatureTest.lock:
404            if not ok:
405                DhtNetwork.Log.log("[GET]: failed!")
406            else:
407                for node in nodes:
408                    if not node.getNode().isExpired():
409                        DhtFeatureTest.foreignNodes.append(node.getId().toString())
410            FeatureTest.done -= 1
411            FeatureTest.lock.notify()
412
413    def _dhtPut(self, producer, _hash, *values):
414        with FeatureTest.lock:
415            for val in values:
416                vstr = val.__str__()[:100]
417                DhtNetwork.Log.log('[PUT]:', _hash.toString(), '->', vstr + ("..." if len(vstr) > 100 else ""))
418                FeatureTest.done += 1
419                producer.put(_hash, val, DhtFeatureTest.putDoneCb)
420            while FeatureTest.done > 0:
421                FeatureTest.lock.wait()
422
423    def _dhtGet(self, consumer, _hash):
424        DhtFeatureTest.foreignValues = []
425        DhtFeatureTest.foreignNodes = []
426        with FeatureTest.lock:
427            FeatureTest.done += 1
428            DhtNetwork.Log.log('[GET]:', _hash.toString())
429            consumer.get(_hash, DhtFeatureTest.getcb, DhtFeatureTest.getDoneCb)
430            while FeatureTest.done > 0:
431                FeatureTest.lock.wait()
432
433    def _gottaGetThemAllPokeNodes(self, consumer, hashes, nodes=None):
434        for h in hashes:
435            self._dhtGet(consumer, h)
436            if nodes is not None:
437                for n in DhtFeatureTest.foreignNodes:
438                    nodes.add(n)
439
440class PersistenceTest(DhtFeatureTest):
441    """
442    This tests persistence of data on the network.
443    """
444
445    def __init__(self, test, workbench, opts):
446        """
447        @param test: is one of the following:
448                     - 'mult_time': test persistence of data based on internal
449                       OpenDHT storage maintenance timings.
450                     - 'delete': test persistence of data upon deletion of
451                       nodes.
452                     - 'replace': replacing cluster successively.
453        @type  test: string
454
455
456        OPTIONS
457
458        - dump_str_log:  Enables storage log at test ending.
459        - keep_alive:    Keeps the test running indefinately. This may be useful
460                         to manually analyse the network traffic during a longer
461                         period.
462        - num_producers: Number of producers of data during a DHT test.
463        - num_values:    Number of values to initialize the DHT with.
464        """
465
466        # opts
467        super(PersistenceTest, self).__init__(test, workbench)
468        self._traffic_plot  = True if 'traffic_plot' in opts else False
469        self._dump_storage  = True if 'dump_str_log' in opts else False
470        self._op_plot       = True if 'op_plot' in opts else False
471        self._keep_alive    = True if 'keep_alive' in opts else False
472        self._num_producers = opts['num_producers'] if 'num_producers' in opts else None
473        self._num_values    = opts['num_values'] if 'num_values' in opts else None
474
475    def _trigger_dp(self, trigger_nodes, _hash, count=1):
476        """
477        Triggers the data persistence over time. In order to this, `count` nodes
478        are created with an id around the hash of a value.
479
480        @param trigger_nodes: List of created nodes. The nodes created in this
481                              function are append to this list.
482        @type  trigger_nodes: list
483        @param _hash: Is the id of the value around which creating nodes.
484        @type  _hash: InfoHash
485        @param count: The number of nodes to create with id around the id of
486                      value.
487        @type  count: int
488        """
489        _hash_str = _hash.toString().decode()
490        _hash_int = int(_hash_str, 16)
491        for i in range(int(-count/2), int(count/2)+1):
492            _hash_str = '{:40x}'.format(_hash_int + i)
493            config = DhtConfig()
494            config.setNodeId(InfoHash(_hash_str.encode()))
495            n = DhtRunner()
496            n.run(config=config)
497            n.bootstrap(self._bootstrap.ip4,
498                        str(self._bootstrap.port))
499            DhtNetwork.log('Node','['+_hash_str+']',
500                           'started around', _hash.toString().decode()
501                           if n.isRunning() else
502                           'failed to start...'
503            )
504            trigger_nodes.append(n)
505
506    def _result(self, local_values, new_nodes):
507        bootstrap = self._bootstrap
508        if not DhtFeatureTest.successfullTransfer(local_values, DhtFeatureTest.foreignValues):
509            DhtNetwork.Log.log('[GET]: Only %s on %s values persisted.' %
510                    (len(DhtFeatureTest.foreignValues), len(local_values)))
511        else:
512            DhtNetwork.Log.log('[GET]: All values successfully persisted.')
513        if DhtFeatureTest.foreignValues:
514            if new_nodes:
515                DhtNetwork.Log.log('Values are newly found on:')
516                for node in new_nodes:
517                    DhtNetwork.Log.log(node)
518                if self._dump_storage:
519                    DhtNetwork.Log.log('Dumping all storage log from '\
520                                  'hosting nodes.')
521                    for proc in self._workbench.procs:
522                        proc.sendClusterRequest(DhtNetworkSubProcess.DUMP_STORAGE_REQ, DhtFeatureTest.foreignNodes)
523            else:
524                DhtNetwork.Log.log("Values didn't reach new hosting nodes after shutdown.")
525
526    def run(self):
527        try:
528            if self._test == 'normal':
529                self._totallyNormalTest()
530            elif self._test == 'delete':
531                self._deleteTest()
532            elif self._test == 'replace':
533                self._replaceClusterTest()
534            elif self._test == 'mult_time':
535                self._multTimeTest()
536            else:
537                raise NameError("This test is not defined '" + self._test + "'")
538        except Exception as e:
539            traceback.print_tb(e.__traceback__)
540            print(type(e).__name__+':', e, file=sys.stderr)
541        finally:
542            if self._traffic_plot or self._op_plot:
543                plot_fname = "traffic-plot"
544                print('plot saved to', plot_fname)
545                plt.savefig(plot_fname)
546            self._bootstrap.resize(1)
547
548    ###########
549    #  Tests  #
550    ###########
551
552    @reset_before_test
553    def _totallyNormalTest(self):
554        """
555        Reproduces a network in a realistic state.
556        """
557        trigger_nodes = []
558        wb = self._workbench
559        bootstrap = self._bootstrap
560        # Value representing an ICE packet. Each ICE packet is around 1KB.
561        VALUE_SIZE = 1024
562        num_values_per_hash = self._num_values/wb.node_num if self._num_values else 5
563
564        # nodes and values counters
565        total_nr_values = 0
566        nr_nodes = wb.node_num
567        op_cv = threading.Condition()
568
569        # values string in string format. Used for sending cluster request.
570        hashes = [random_hash() for _ in range(wb.node_num)]
571
572        def normalBehavior(do, t):
573            nonlocal total_nr_values, op_cv
574            while True:
575                with op_cv:
576                    do()
577                time.sleep(random.uniform(0.0, float(t)))
578
579        def putRequest():
580            nonlocal hashes, VALUE_SIZE, total_nr_values
581            lock = threading.Condition()
582            def dcb(success):
583                nonlocal total_nr_values, lock
584                if success:
585                    total_nr_values += 1
586                    DhtNetwork.Log.log("INFO: "+ str(total_nr_values)+" values put on the dht since begining")
587                with lock:
588                    lock.notify()
589            with lock:
590                DhtNetwork.Log.warn("Random value put on the DHT...")
591                random.choice(wb.procs).sendClusterPutRequest(random.choice(hashes).toString(),
592                                                              random_str_val(size=VALUE_SIZE).encode(),
593                                                              done_cb=dcb)
594                lock.wait()
595
596        puts = threading.Thread(target=normalBehavior, args=(putRequest, 30.0/wb.node_num))
597        puts.daemon = True
598        puts.start()
599
600        def newNodeRequest():
601            nonlocal nr_nodes
602            lock = threading.Condition()
603            def dcb(success):
604                nonlocal nr_nodes, lock
605                nr_nodes += 1
606                DhtNetwork.Log.log("INFO: now "+str(nr_nodes)+" nodes on the dht")
607                with lock:
608                    lock.notify()
609            with lock:
610                DhtNetwork.Log.warn("Node joining...")
611                random.choice(wb.procs).sendClusterRequest(DhtNetworkSubProcess.NEW_NODE_REQ, done_cb=dcb)
612                lock.wait()
613
614        connections = threading.Thread(target=normalBehavior, args=(newNodeRequest, 1*50.0/wb.node_num))
615        connections.daemon = True
616        connections.start()
617
618        def shutdownNodeRequest():
619            nonlocal nr_nodes
620            lock = threading.Condition()
621            def dcb(success):
622                nonlocal nr_nodes, lock
623                if success:
624                    nr_nodes -= 1
625                    DhtNetwork.Log.log("INFO: now "+str(nr_nodes)+" nodes on the dht")
626                else:
627                    DhtNetwork.Log.err("Oops.. No node to shutodwn.")
628
629                with lock:
630                    lock.notify()
631            with lock:
632                DhtNetwork.Log.warn("Node shutting down...")
633                random.choice(wb.procs).sendClusterRequest(DhtNetworkSubProcess.SHUTDOWN_NODE_REQ, done_cb=dcb)
634                lock.wait()
635
636        shutdowns = threading.Thread(target=normalBehavior, args=(shutdownNodeRequest, 1*60.0/wb.node_num))
637        shutdowns.daemon = True
638        shutdowns.start()
639
640        if self._traffic_plot:
641            display_traffic_plot('br'+wb.ifname)
642        else:
643            # blocks in matplotlib thread
644            while True:
645                plt.pause(3600)
646
647
648    @reset_before_test
649    def _deleteTest(self):
650        """
651        It uses Dht shutdown call from the API to gracefuly finish the nodes one
652        after the other.
653        """
654        bootstrap = self._bootstrap
655
656        ops_count = []
657
658        bootstrap.resize(3)
659        consumer = bootstrap.get(1)
660        producer = bootstrap.get(2)
661
662        myhash = random_hash()
663        local_values = [Value(b'foo'), Value(b'bar'), Value(b'foobar')]
664
665        self._dhtPut(producer, myhash, *local_values)
666
667        #checking if values were transfered
668        self._dhtGet(consumer, myhash)
669        if not DhtFeatureTest.successfullTransfer(local_values, DhtFeatureTest.foreignValues):
670            if DhtFeatureTest.foreignValues:
671                DhtNetwork.Log.log('[GET]: Only ', len(DhtFeatureTest.foreignValues) ,' on ',
672                        len(local_values), ' values successfully put.')
673            else:
674                DhtNetwork.Log.log('[GET]: 0 values successfully put')
675
676
677        if DhtFeatureTest.foreignValues and DhtFeatureTest.foreignNodes:
678            DhtNetwork.Log.log('Values are found on :')
679            for node in DhtFeatureTest.foreignNodes:
680                DhtNetwork.Log.log(node)
681
682            for _ in range(max(1, int(self._workbench.node_num/32))):
683                DhtNetwork.Log.log('Removing all nodes hosting target values...')
684                cluster_ops_count = 0
685                for proc in self._workbench.procs:
686                    DhtNetwork.Log.log('[REMOVE]: sending shutdown request to', proc)
687                    lock = threading.Condition()
688                    def dcb(success):
689                        nonlocal lock
690                        if not success:
691                            DhtNetwork.Log.err("Failed to shutdown.")
692                        with lock:
693                            lock.notify()
694
695                    with lock:
696                        proc.sendClusterRequest(
697                            DhtNetworkSubProcess.SHUTDOWN_NODE_REQ,
698                            DhtFeatureTest.foreignNodes,
699                            done_cb=dcb
700                        )
701                        lock.wait()
702                    DhtNetwork.Log.log('sending message stats request')
703                    def msg_dcb(stats):
704                        nonlocal cluster_ops_count, lock
705                        if stats:
706                            cluster_ops_count += sum(stats[1:])
707                        with lock:
708                            lock.notify()
709                    with lock:
710                        proc.sendGetMessageStats(done_cb=msg_dcb)
711                        lock.wait()
712                    DhtNetwork.Log.log("5 seconds wait...")
713                    time.sleep(5)
714                ops_count.append(cluster_ops_count/self._workbench.node_num)
715
716                # checking if values were transfered to new nodes
717                foreignNodes_before_delete = DhtFeatureTest.foreignNodes
718                DhtNetwork.Log.log('[GET]: trying to fetch persistent values')
719                self._dhtGet(consumer, myhash)
720                new_nodes = set(DhtFeatureTest.foreignNodes) - set(foreignNodes_before_delete)
721
722                self._result(local_values, new_nodes)
723
724            if self._op_plot:
725                display_plot(ops_count, color='blue')
726        else:
727            DhtNetwork.Log.log("[GET]: either couldn't fetch values or nodes hosting values...")
728
729        if traffic_plot_thread:
730            print("Traffic plot running for ever. Ctrl-c for stopping it.")
731            traffic_plot_thread.join()
732
733    @reset_before_test
734    def _replaceClusterTest(self):
735        """
736        It replaces all clusters one after the other.
737        """
738        clusters = 8
739
740        bootstrap = self._bootstrap
741
742        bootstrap.resize(3)
743        consumer = bootstrap.get(1)
744        producer = bootstrap.get(2)
745
746        myhash = random_hash()
747        local_values = [Value(b'foo'), Value(b'bar'), Value(b'foobar')]
748
749        self._dhtPut(producer, myhash, *local_values)
750        self._dhtGet(consumer, myhash)
751        initial_nodes = DhtFeatureTest.foreignNodes
752
753        DhtNetwork.Log.log('Replacing', clusters, 'random clusters successively...')
754        for n in range(clusters):
755            i = random.randint(0, len(self._workbench.procs)-1)
756            proc = self._workbench.procs[i]
757            DhtNetwork.Log.log('Replacing', proc)
758            proc.sendClusterRequest(DhtNetworkSubProcess.SHUTDOWN_CLUSTER_REQ)
759            self._workbench.stop_cluster(i)
760            self._workbench.start_cluster(i)
761
762        DhtNetwork.Log.log('[GET]: trying to fetch persistent values')
763        self._dhtGet(consumer, myhash)
764        new_nodes = set(DhtFeatureTest.foreignNodes) - set(initial_nodes)
765
766        self._result(local_values, new_nodes)
767
768    @reset_before_test
769    def _multTimeTest(self):
770        """
771        Multiple put() calls are made from multiple nodes to multiple hashes
772        after what a set of 8 nodes is created around each hashes in order to
773        enable storage maintenance each nodes. Therefor, this tests will wait 10
774        minutes for the nodes to trigger storage maintenance.
775        """
776        trigger_nodes = []
777        bootstrap = self._bootstrap
778
779        N_PRODUCERS = self._num_producers if self._num_values else 16
780        DP_TIMEOUT = 1
781
782        hashes = []
783
784    # Generating considerable amount of values of size 1KB.
785        VALUE_SIZE = 1024
786        NUM_VALUES = self._num_values if self._num_values else 50
787        values = [Value(random_str_val(size=VALUE_SIZE).encode()) for _ in range(NUM_VALUES)]
788
789        bootstrap.resize(N_PRODUCERS+2)
790        consumer = bootstrap.get(N_PRODUCERS+1)
791        producers = (bootstrap.get(n) for n in range(1,N_PRODUCERS+1))
792        for p in producers:
793            hashes.append(random_hash())
794            self._dhtPut(p, hashes[-1], *values)
795
796        once = True
797        while self._keep_alive or once:
798            nodes = set([])
799            self._gottaGetThemAllPokeNodes(consumer, hashes, nodes=nodes)
800
801            DhtNetwork.Log.log("Values are found on:")
802            for n in nodes:
803                DhtNetwork.Log.log(n)
804
805            DhtNetwork.Log.log("Creating 8 nodes around all of these hashes...")
806            for _hash in hashes:
807                self._trigger_dp(trigger_nodes, _hash, count=8)
808
809            DhtNetwork.Log.log('Waiting', DP_TIMEOUT+1, 'minutes for normal storage maintenance.')
810            time.sleep((DP_TIMEOUT+1)*60)
811
812            DhtNetwork.Log.log('Deleting old nodes from previous search.')
813            for proc in self._workbench.procs:
814                DhtNetwork.Log.log('[REMOVE]: sending delete request to', proc)
815                proc.sendClusterRequest(
816                    DhtNetworkSubProcess.REMOVE_NODE_REQ,
817                    nodes)
818
819            # new consumer (fresh cache)
820            bootstrap.resize(N_PRODUCERS+1)
821            bootstrap.resize(N_PRODUCERS+2)
822            consumer = bootstrap.get(N_PRODUCERS+1)
823
824            nodes_after_time = set([])
825            self._gottaGetThemAllPokeNodes(consumer, hashes, nodes=nodes_after_time)
826            self._result(values, nodes_after_time - nodes)
827
828            once = False
829
830
831class PerformanceTest(DhtFeatureTest):
832    """
833    Tests for general performance of dht operations.
834    """
835
836    def __init__(self, test, workbench, opts):
837        """
838        @param test: is one of the following:
839                     - 'gets': multiple get operations and statistical results.
840                     - 'delete': perform multiple put() operations followed
841                       by targeted deletion of nodes hosting the values. Doing
842                       so until half of the nodes on the network remain.
843        @type  test: string
844        """
845        super(PerformanceTest, self).__init__(test, workbench)
846
847    def run(self):
848        try:
849            if self._test == 'gets':
850                self._getsTimesTest()
851            elif self._test == 'delete':
852                self._delete()
853            else:
854                raise NameError("This test is not defined '" + self._test + "'")
855        except Exception as e:
856            traceback.print_tb(e.__traceback__)
857            print(type(e).__name__+':', e, file=sys.stderr)
858        finally:
859            self._bootstrap.resize(1)
860
861
862    ###########
863    #  Tests  #
864    ###########
865
866    @reset_before_test
867    def _getsTimesTest(self):
868        """
869        Tests for performance of the DHT doing multiple get() operation.
870        """
871        bootstrap = self._bootstrap
872
873        plt.ion()
874
875        fig, axes = plt.subplots(2, 1)
876        fig.tight_layout()
877
878        lax = axes[0]
879        hax = axes[1]
880
881        lines = None#ax.plot([])
882        #plt.ylabel('time (s)')
883        hax.set_ylim(0, 2)
884
885        # let the network stabilise
886        plt.pause(20)
887
888        #start = time.time()
889        times = []
890
891        lock = threading.Condition()
892        done = 0
893
894        def getcb(v):
895            nonlocal bootstrap
896            DhtNetwork.Log.log("found", v)
897            return True
898
899        def donecb(ok, nodes, start):
900            nonlocal bootstrap, lock, done, times
901            t = time.time()-start
902            with lock:
903                if not ok:
904                    DhtNetwork.Log.log("failed !")
905                times.append(t)
906                done -= 1
907                lock.notify()
908
909        def update_plot():
910            nonlocal lines
911            while lines:
912                l = lines.pop()
913                l.remove()
914                del l
915            if len(times) > 1:
916                n, bins, lines = hax.hist(times, 100, normed=1, histtype='stepfilled', color='g')
917                hax.set_ylim(min(n), max(n))
918                lines.extend(lax.plot(times, color='blue'))
919            plt.draw()
920
921        def run_get():
922            nonlocal done
923            done += 1
924            start = time.time()
925            bootstrap.front().get(InfoHash.getRandom(), getcb, lambda ok, nodes: donecb(ok, nodes, start))
926
927        plt.pause(5)
928
929        plt.show()
930        update_plot()
931
932        times = []
933        for n in range(10):
934            self._workbench.replace_cluster()
935            plt.pause(2)
936            DhtNetwork.Log.log("Getting 50 random hashes succesively.")
937            for i in range(50):
938                with lock:
939                    for _ in range(1):
940                        run_get()
941                    while done > 0:
942                        lock.wait()
943                        update_plot()
944                        plt.pause(.1)
945                update_plot()
946            print("Took", np.sum(times), "mean", np.mean(times), "std", np.std(times), "min", np.min(times), "max", np.max(times))
947
948        print('GET calls timings benchmark test : DONE. '  \
949                'Close Matplotlib window for terminating the program.')
950        plt.ioff()
951        plt.show()
952
953    @reset_before_test
954    def _delete(self):
955        """
956        Tests for performance of get() and put() operations on the network while
957        deleting around the target hash.
958        """
959
960        bootstrap = self._bootstrap
961
962        bootstrap.resize(3)
963        consumer = bootstrap.get(1)
964        producer = bootstrap.get(2)
965
966        myhash = random_hash()
967        local_values = [Value(b'foo'), Value(b'bar'), Value(b'foobar')]
968
969        for _ in range(max(1, int(self._workbench.node_num/32))):
970            self._dhtGet(consumer, myhash)
971            DhtNetwork.Log.log("Waiting 15 seconds...")
972            time.sleep(15)
973
974            self._dhtPut(producer, myhash, *local_values)
975
976            #checking if values were transfered
977            self._dhtGet(consumer, myhash)
978            DhtNetwork.Log.log('Values are found on :')
979            for node in DhtFeatureTest.foreignNodes:
980                DhtNetwork.Log.log(node)
981
982            if not DhtFeatureTest.successfullTransfer(local_values, DhtFeatureTest.foreignValues):
983                if DhtFeatureTest.foreignValues:
984                    DhtNetwork.Log.log('[GET]: Only ', len(DhtFeatureTest.foreignValues) ,' on ',
985                            len(local_values), ' values successfully put.')
986                else:
987                    DhtNetwork.Log.log('[GET]: 0 values successfully put')
988
989            DhtNetwork.Log.log('Removing all nodes hosting target values...')
990            for proc in self._workbench.procs:
991                DhtNetwork.Log.log('[REMOVE]: sending shutdown request to', proc)
992                proc.sendClusterRequest(
993                        DhtNetworkSubProcess.SHUTDOWN_NODE_REQ,
994                        DhtFeatureTest.foreignNodes
995                )
996