# -*- coding: utf-8 -*-
# Copyright (C) 2015-2019 Savoir-Faire Linux Inc.
# Author(s): Adrien Béraud <adrien.beraud@savoirfairelinux.com>
#            Simon Désaulniers <sim.desaulniers@gmail.com>
"""
Feature, persistence and performance tests run against a DHT network driven
through a WorkBench object (bootstrap node + cluster sub-processes).
"""

import sys
import os
import threading
import random
import string
import time
import subprocess
import re
import traceback
import collections
import functools
import math

from matplotlib.ticker import FuncFormatter

import numpy as np
import matplotlib.pyplot as plt
import networkx as nx
from networkx.drawing.nx_agraph import graphviz_layout


from opendht import *
from dht.network import DhtNetwork, DhtNetworkSubProcess

############
#  Common  #
############

# matplotlib display format for bits (b, Kb, Mb)
bit_format = None
Kbit_format = FuncFormatter(lambda x, pos: '%1.1f' % (x*1024**-1) + 'Kb')
Mbit_format = FuncFormatter(lambda x, pos: '%1.1f' % (x*1024**-2) + 'Mb')


def _abbrev(value, limit=100):
    """Returns str(value) cut to `limit` characters, with a trailing "..."
    when something was actually cut off.

    @param value: Any object to render for a log line.
    @param limit: Maximum number of characters kept from str(value).
    @type  limit: int

    @rtype : str
    """
    text = str(value)
    return text[:limit] + ("..." if len(text) > limit else "")


def random_str_val(size=1024):
    """Creates a random string value of specified size.

    @param size:  Size, in characters, of the value.
    @type  size:  int

    @return:  Random string made of characters from string.hexdigits.
    @rtype :  str
    """
    return ''.join(random.choice(string.hexdigits) for _ in range(size))


def random_hash():
    """Creates a random InfoHash from 40 random hexadecimal characters.
    """
    return InfoHash(random_str_val(size=40).encode())


def timer(f, *args, **kwargs):
    """
    Measures the time taken to execute function f.

    @param f : Function to time
    @type  f : function

    @param args : Positional arguments of the function f
    @type  args : list

    @param kwargs : Keyword arguments of the function f
    @type  kwargs : dict

    @rtype : float
    @return : Time, in seconds, taken by the function f. The return value
              of f itself is discarded.
    """
    # perf_counter is monotonic, so the result cannot go negative when the
    # wall clock is adjusted during the call.
    start = time.perf_counter()
    f(*args, **kwargs)
    return time.perf_counter() - start


def reset_before_test(featureTestMethod):
    """
    This is a decorator for all test methods needing reset().

    @param featureTestMethod: The method to be decorated. All decorated methods
                              must have 'self' object as first arg.
    @type  featureTestMethod: function
    """
    @functools.wraps(featureTestMethod)
    def call(*args, **kwargs):
        self = args[0]
        if isinstance(self, FeatureTest):
            self._reset()
        return featureTestMethod(*args, **kwargs)
    return call


def display_plot(yvals, xvals=None, yformatter=None, display_time=3, **kwargs):
    """
    Displays a plot of data in interactive mode. This method is made to be
    called successively for plot refreshing.

    @param yvals:  Ordinate values (float).
    @type  yvals:  list
    @param xvals:  Abscissa values (float).
    @type  xvals:  list
    @param yformatter:  The matplotlib FuncFormatter to use for y values.
    @type  yformatter:  matplotlib.ticker.FuncFormatter
    @param display_time:  The time matplotlib can take to refresh the plot.
    @type  display_time:  int
    @param kwargs:  Extra keyword arguments forwarded to plt.plot().
    """
    plt.ion()
    plt.clf()
    plt.show()
    if yformatter:
        # Use the formatter supplied by the caller, not a hard-coded one.
        plt.axes().yaxis.set_major_formatter(yformatter)
    if xvals:
        plt.plot(xvals, yvals, **kwargs)
    else:
        plt.plot(yvals, **kwargs)
    plt.pause(display_time)


def display_traffic_plot(ifname):
    """Displays the traffic plot for a given interface name.

    Loops for as long as iftop_traffic_data() keeps yielding rates.

    @param ifname:  Interface name.
    @type  ifname:  string
    """
    ydata = []
    xdata = []
    interval = 2
    for rate in iftop_traffic_data(ifname, interval=interval):
        ydata.append(rate)
        xdata.append((xdata[-1] if xdata else 0) + interval)
        display_plot(ydata, xvals=xdata, yformatter=Kbit_format, color='blue')


def iftop_traffic_data(ifname, interval=2, rate_type='send_receive'):
    """
    Generator (yields data) function collecting traffic data from iftop
    subprocess. The generator ends when iftop closes its standard output; the
    subprocess is terminated when the generator is closed or exhausted.

    @param ifname: Interface to listen to.
    @type  ifname: string
    @param interval: Interval of time between two data collections. Possible
                     values are 2, 10 or 40.
    @type  interval: int
    @param rate_type: (default: send_receive) Whether to pick "send",
                      "receive" or "send and receive" rates. Possible
                      values : "send", "receive" and "send_receive".
    @type  rate_type: string

    @return: Yields rates as floats; "Kb" and "Mb" readings are scaled by
             1024 and 1024**2 respectively.
    """
    # iftop stdout string format
    SEND_RATE_STR = "Total send rate"
    RECEIVE_RATE_STR = "Total receive rate"
    SEND_RECEIVE_RATE_STR = "Total send and receive rate"
    RATE_STR = {
        "send": SEND_RATE_STR,
        "receive": RECEIVE_RATE_STR,
        "send_receive": SEND_RECEIVE_RATE_STR
    }
    TWO_SECONDS_RATE_COL = 0
    TEN_SECONDS_RATE_COL = 1
    FOURTY_SECONDS_RATE_COL = 2
    COLS = {
        2: TWO_SECONDS_RATE_COL,
        10: TEN_SECONDS_RATE_COL,
        40: FOURTY_SECONDS_RATE_COL
    }
    FLOAT_REGEX = "[0-9]+[.]*[0-9]*"
    BIT_REGEX = "[KM]*b"
    UNIT_FACTOR = {"Kb": 1024, "Mb": 1024**2}
    rate_pattern = re.compile("("+FLOAT_REGEX+")("+BIT_REGEX+")")

    iftop = subprocess.Popen(["iftop", "-i", ifname, "-t"],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.DEVNULL)
    try:
        # readline() returns b'' at EOF; stop there instead of spinning.
        for raw in iter(iftop.stdout.readline, b''):
            line = raw.decode(errors='replace')
            if RATE_STR[rate_type] in line:
                rate, unit = rate_pattern.findall(line)[COLS[interval]]
                yield float(rate) * UNIT_FACTOR.get(unit, 1)
    finally:
        # Do not leave the iftop child behind once the consumer is done.
        iftop.terminate()
        iftop.wait()

###########
#  Tests  #
###########

class FeatureTest(object):
    """
    This is a base test.
    """

    # Number of pending asynchronous operations (shared by all tests).
    done = 0
    # threading.Condition guarding `done`; created by _reset().
    lock = None

    def __init__(self, test, workbench):
        """
        @param test: The test string indicating the test to run. This string is
                     determined in the child classes.
        @type  test: string

        @param workbench: A WorkBench object to use inside this test.
        @type  workbench: WorkBench
        """
        self._test = test
        self._workbench = workbench
        self._bootstrap = self._workbench.get_bootstrap()

    def _reset(self):
        """
        Resets some static variables.

        This method is most likely going to be called before each test.
        """
        FeatureTest.done = 0
        FeatureTest.lock = threading.Condition()

    def run(self):
        raise NotImplementedError('This method must be implemented.')

##################################
#               PHT              #
##################################

class PhtTest(FeatureTest):
    """
    Tests on the PHT index structure.
    """

    # Static variables written by the lookup callbacks.
    indexEntries = None
    prefix = None
    key = None

    def __init__(self, test, workbench, opts):
        """
        @param test: is one of the following:
                     - 'insert': indexes a considerable amount of data in
                       the PHT structure.
        @type  test: string

        @param opts: Dictionary containing options for the test. Allowed
                     options are:
                     - 'num_keys': this specifies the number of keys to insert
                                   in the PHT during the test (default: 32).
                     - 'timer': when present, the duration of each insert and
                                lookup call is logged.
        @type  opts: dict
        """
        super(PhtTest, self).__init__(test, workbench)
        self._num_keys = opts.get('num_keys', 32)
        self._timer = 'timer' in opts

    def _reset(self):
        super(PhtTest, self)._reset()
        PhtTest.indexEntries = []

    @staticmethod
    def lookupCb(vals, prefix):
        """Lookup callback: stores the entries and the leaf prefix found."""
        PhtTest.indexEntries = list(vals)
        PhtTest.prefix = prefix.decode()
        DhtNetwork.log('Index name: <todo>')
        DhtNetwork.log('Leaf prefix:', prefix)
        for v in vals:
            DhtNetwork.log('[ENTRY]:', v)

    @staticmethod
    def lookupDoneCb(ok):
        """Lookup completion callback: logs the outcome and wakes the test."""
        DhtNetwork.log('[LOOKUP]:', PhtTest.key, "--", "success!" if ok else "Fail...")
        with FeatureTest.lock:
            FeatureTest.lock.notify()

    @staticmethod
    def insertDoneCb(ok):
        """Insert completion callback: logs the outcome and wakes the test."""
        DhtNetwork.log('[INSERT]:', PhtTest.key, "--", "success!" if ok else "Fail...")
        with FeatureTest.lock:
            FeatureTest.lock.notify()

    @staticmethod
    def drawTrie(trie_dict):
        """
        Draws the trie structure of the PHT from a dictionary.

        @param trie_dict: Dictionary of index entries (prefix -> entry).
        @type  trie_dict: dict
        """
        prefixes = list(trie_dict.keys())
        if not prefixes:
            return

        # `edges` keeps first-seen order (the sort below is stable); `seen`
        # only makes the duplicate check constant-time.
        edges = []
        seen = set()
        for prefix in prefixes:
            for i in range(-1, len(prefix)-1):
                u = prefix[:i+1]
                parent = "." if i == -1 else u  # "." stands for the root
                for edge in ((parent, u+"0"), (parent, u+"1")):
                    if edge not in seen:
                        seen.add(edge)
                        edges.append(edge)

        # TODO: use a binary tree position layout...
        #   UPDATE : In a better way [change lib]
        G = nx.Graph(sorted(edges, key=lambda x: len(x[0])))
        plt.title("PHT: Tree")
        pos = graphviz_layout(G, prog='dot')
        nx.draw(G, pos, with_labels=True, node_color='white')
        plt.show()

    def run(self):
        """Runs the selected test, then shrinks the bootstrap back to 1 node."""
        try:
            if self._test == 'insert':
                self._insertTest()
            else:
                raise NameError("This test is not defined '" + self._test + "'")
        except Exception as e:
            traceback.print_tb(e.__traceback__)
            print(type(e).__name__+':', e, file=sys.stderr)
        finally:
            self._bootstrap.resize(1)

    ###########
    #  Tests  #
    ###########

    @reset_before_test
    def _insertTest(self):
        """
        Inserts `num_keys` random keys in a PHT, looks each of them up again
        and draws the resulting trie.
        """
        bootstrap = self._bootstrap
        bootstrap.resize(2)
        dht = bootstrap.get(1)

        # At least 5 digit keys. Must be an int: it is used with range().
        NUM_DIG = max(math.ceil(math.log(max(self._num_keys, 1), 2)/4), 5)
        keyspec = collections.OrderedDict([('foo', NUM_DIG)])
        pht = Pht(b'foo_index', keyspec, dht)

        DhtNetwork.log('PHT has',
                       pht.MAX_NODE_ENTRY_COUNT,
                       'node'+ ('s' if pht.MAX_NODE_ENTRY_COUNT > 1 else ''),
                       'per leaf bucket.')
        field = next(iter(keyspec))
        sysrand = random.SystemRandom()
        keys = [{
            field:
            ''.join(sysrand.choice(string.hexdigits)
                    for _ in range(NUM_DIG)).encode()
            } for _ in range(self._num_keys)]
        all_entries = {}

        # Index all entries.
        for key in keys:
            PhtTest.key = key
            with FeatureTest.lock:
                time_taken = timer(pht.insert, key, IndexValue(random_hash()), PhtTest.insertDoneCb)
                if self._timer:
                    DhtNetwork.log('This insert step took : ', time_taken, 'second')
                FeatureTest.lock.wait()

        time.sleep(1)

        # Recover entries now that the trie is complete.
        for key in keys:
            PhtTest.key = key
            with FeatureTest.lock:
                time_taken = timer(pht.lookup, key, PhtTest.lookupCb, PhtTest.lookupDoneCb)
                if self._timer:
                    DhtNetwork.log('This lookup step took : ', time_taken, 'second')
                FeatureTest.lock.wait()

            all_entries[PhtTest.prefix] = [e.__str__()
                                           for e in PhtTest.indexEntries]

        for p in all_entries.keys():
            DhtNetwork.log('All entries under prefix', p, ':')
            DhtNetwork.log(all_entries[p])
        PhtTest.drawTrie(all_entries)

##################################
#               DHT              #
##################################

class DhtFeatureTest(FeatureTest):
    """
    This is a base dht test.
    """
    #static variables used by class callbacks
    foreignNodes = None
    foreignValues = None

    @staticmethod
    def successfullTransfer(lv, fv):
        """True when as many values were fetched (fv) as were put (lv)."""
        return len(lv) == len(fv)

    def __init__(self, test, workbench):
        super(DhtFeatureTest, self).__init__(test, workbench)

    def _reset(self):
        super(DhtFeatureTest, self)._reset()
        DhtFeatureTest.foreignNodes = []
        DhtFeatureTest.foreignValues = []

    @staticmethod
    def getcb(value):
        """Get callback: logs and records each value found."""
        DhtNetwork.Log.log('[GET]: %s' % _abbrev(value))
        DhtFeatureTest.foreignValues.append(value)
        return True

    @staticmethod
    def putDoneCb(ok, nodes):
        """Put completion callback: one less pending operation."""
        with FeatureTest.lock:
            if not ok:
                DhtNetwork.Log.log("[PUT]: failed!")
            FeatureTest.done -= 1
            FeatureTest.lock.notify()

    @staticmethod
    def getDoneCb(ok, nodes):
        """Get completion callback: records the ids of the non-expired nodes
        that answered, then marks the operation as finished."""
        with FeatureTest.lock:
            if not ok:
                DhtNetwork.Log.log("[GET]: failed!")
            else:
                for node in nodes:
                    if not node.getNode().isExpired():
                        DhtFeatureTest.foreignNodes.append(node.getId().toString())
            FeatureTest.done -= 1
            FeatureTest.lock.notify()

    def _dhtPut(self, producer, _hash, *values):
        """Puts every value under `_hash` from `producer` and blocks until all
        put operations have completed."""
        with FeatureTest.lock:
            for val in values:
                DhtNetwork.Log.log('[PUT]:', _hash.toString(), '->', _abbrev(val))
                FeatureTest.done += 1
                producer.put(_hash, val, DhtFeatureTest.putDoneCb)
            while FeatureTest.done > 0:
                FeatureTest.lock.wait()

    def _dhtGet(self, consumer, _hash):
        """Gets the values under `_hash` from `consumer` and blocks until the
        operation has completed. Results land in the static variables
        foreignValues and foreignNodes, which are rebound to fresh lists."""
        DhtFeatureTest.foreignValues = []
        DhtFeatureTest.foreignNodes = []
        with FeatureTest.lock:
            FeatureTest.done += 1
            DhtNetwork.Log.log('[GET]:', _hash.toString())
            consumer.get(_hash, DhtFeatureTest.getcb, DhtFeatureTest.getDoneCb)
            while FeatureTest.done > 0:
                FeatureTest.lock.wait()

    def _gottaGetThemAllPokeNodes(self, consumer, hashes, nodes=None):
        """Gets every hash in turn; when `nodes` (a set) is given, the ids of
        the nodes answering each get are added to it."""
        for h in hashes:
            self._dhtGet(consumer, h)
            if nodes is not None:
                nodes.update(DhtFeatureTest.foreignNodes)


class PersistenceTest(DhtFeatureTest):
    """
    This tests persistence of data on the network.
    """

    def __init__(self, test, workbench, opts):
        """
        @param test: is one of the following:
                     - 'normal': reproduces a network in a realistic state
                       (random puts, node joins and node shutdowns).
                     - 'mult_time': test persistence of data based on internal
                       OpenDHT storage maintenance timings.
                     - 'delete': test persistence of data upon deletion of
                       nodes.
                     - 'replace': replacing cluster successively.
        @type  test: string

        @param opts: Dictionary of options, see below.
        @type  opts: dict


        OPTIONS

        - traffic_plot:  Enables the traffic plot.
        - op_plot:       Enables the plot of operation counts.
        - dump_str_log:  Enables storage log at test ending.
        - keep_alive:    Keeps the test running indefinitely. This may be useful
                         to manually analyse the network traffic during a longer
                         period.
        - num_producers: Number of producers of data during a DHT test.
        - num_values:    Number of values to initialize the DHT with.
        """

        # opts
        super(PersistenceTest, self).__init__(test, workbench)
        self._traffic_plot = 'traffic_plot' in opts
        self._dump_storage = 'dump_str_log' in opts
        self._op_plot = 'op_plot' in opts
        self._keep_alive = 'keep_alive' in opts
        self._num_producers = opts.get('num_producers')
        self._num_values = opts.get('num_values')

    def _trigger_dp(self, trigger_nodes, _hash, count=1):
        """
        Triggers the data persistence over time. In order to do this, nodes
        are created with an id around the hash of a value.

        @param trigger_nodes: List of created nodes. The nodes created in this
                              function are appended to this list.
        @type  trigger_nodes: list
        @param _hash: Is the id of the value around which creating nodes.
        @type  _hash: InfoHash
        @param count: Ids range from hash - count/2 to hash + count/2
                      inclusive, i.e. count + 1 nodes when count is even.
        @type  count: int
        """
        target_str = _hash.toString().decode()
        _hash_int = int(target_str, 16)
        for i in range(int(-count/2), int(count/2)+1):
            # Ids are 40 hexadecimal digits: zero-pad (a space-padded string
            # is not valid hex) and wrap around at both ends of the id space.
            _hash_str = '{:040x}'.format((_hash_int + i) % (1 << 160))
            config = DhtConfig()
            config.setNodeId(InfoHash(_hash_str.encode()))
            n = DhtRunner()
            n.run(config=config)
            n.bootstrap(self._bootstrap.ip4,
                        str(self._bootstrap.port))
            if n.isRunning():
                DhtNetwork.log('Node', '['+_hash_str+']',
                               'started around', target_str)
            else:
                DhtNetwork.log('Node', '['+_hash_str+']',
                               'failed to start...')
            trigger_nodes.append(n)

    def _result(self, local_values, new_nodes):
        """
        Logs the outcome of a persistence test.

        @param local_values: Values initially put on the network.
        @type  local_values: list
        @param new_nodes: Ids of nodes newly found to host the values.
        @type  new_nodes: set
        """
        if not DhtFeatureTest.successfullTransfer(local_values, DhtFeatureTest.foreignValues):
            DhtNetwork.Log.log('[GET]: Only %s on %s values persisted.' %
                               (len(DhtFeatureTest.foreignValues), len(local_values)))
        else:
            DhtNetwork.Log.log('[GET]: All values successfully persisted.')
        if DhtFeatureTest.foreignValues:
            if new_nodes:
                DhtNetwork.Log.log('Values are newly found on:')
                for node in new_nodes:
                    DhtNetwork.Log.log(node)
                if self._dump_storage:
                    DhtNetwork.Log.log('Dumping all storage log from '\
                                       'hosting nodes.')
                    for proc in self._workbench.procs:
                        proc.sendClusterRequest(DhtNetworkSubProcess.DUMP_STORAGE_REQ, DhtFeatureTest.foreignNodes)
            else:
                DhtNetwork.Log.log("Values didn't reach new hosting nodes after shutdown.")

    def run(self):
        """Runs the selected test, saves the plot if one was requested, then
        shrinks the bootstrap back to 1 node."""
        try:
            if self._test == 'normal':
                self._totallyNormalTest()
            elif self._test == 'delete':
                self._deleteTest()
            elif self._test == 'replace':
                self._replaceClusterTest()
            elif self._test == 'mult_time':
                self._multTimeTest()
            else:
                raise NameError("This test is not defined '" + self._test + "'")
        except Exception as e:
            traceback.print_tb(e.__traceback__)
            print(type(e).__name__+':', e, file=sys.stderr)
        finally:
            if self._traffic_plot or self._op_plot:
                plot_fname = "traffic-plot"
                plt.savefig(plot_fname)
                print('plot saved to', plot_fname)
            self._bootstrap.resize(1)

    ###########
    #  Tests  #
    ###########

    @reset_before_test
    def _totallyNormalTest(self):
        """
        Reproduces a network in a realistic state: three daemon threads
        randomly put values, add nodes and shut nodes down.
        """
        wb = self._workbench
        # Value representing an ICE packet. Each ICE packet is around 1KB.
        VALUE_SIZE = 1024

        # nodes and values counters
        total_nr_values = 0
        nr_nodes = wb.node_num
        op_cv = threading.Condition()

        # hashes under which the random values are put.
        hashes = [random_hash() for _ in range(wb.node_num)]

        def normalBehavior(do, t):
            # Runs `do` forever, one operation at a time across all threads,
            # sleeping up to `t` seconds between two runs.
            while True:
                with op_cv:
                    do()
                time.sleep(random.uniform(0.0, float(t)))

        def putRequest():
            lock = threading.Condition()
            def dcb(success):
                nonlocal total_nr_values
                if success:
                    total_nr_values += 1
                    DhtNetwork.Log.log("INFO: "+ str(total_nr_values)+" values put on the dht since begining")
                with lock:
                    lock.notify()
            with lock:
                DhtNetwork.Log.warn("Random value put on the DHT...")
                random.choice(wb.procs).sendClusterPutRequest(random.choice(hashes).toString(),
                                                              random_str_val(size=VALUE_SIZE).encode(),
                                                              done_cb=dcb)
                lock.wait()

        puts = threading.Thread(target=normalBehavior, args=(putRequest, 30.0/wb.node_num))
        puts.daemon = True
        puts.start()

        def newNodeRequest():
            lock = threading.Condition()
            def dcb(success):
                nonlocal nr_nodes
                nr_nodes += 1
                DhtNetwork.Log.log("INFO: now "+str(nr_nodes)+" nodes on the dht")
                with lock:
                    lock.notify()
            with lock:
                DhtNetwork.Log.warn("Node joining...")
                random.choice(wb.procs).sendClusterRequest(DhtNetworkSubProcess.NEW_NODE_REQ, done_cb=dcb)
                lock.wait()

        connections = threading.Thread(target=normalBehavior, args=(newNodeRequest, 1*50.0/wb.node_num))
        connections.daemon = True
        connections.start()

        def shutdownNodeRequest():
            lock = threading.Condition()
            def dcb(success):
                nonlocal nr_nodes
                if success:
                    nr_nodes -= 1
                    DhtNetwork.Log.log("INFO: now "+str(nr_nodes)+" nodes on the dht")
                else:
                    DhtNetwork.Log.err("Oops.. No node to shutodwn.")

                with lock:
                    lock.notify()
            with lock:
                DhtNetwork.Log.warn("Node shutting down...")
                random.choice(wb.procs).sendClusterRequest(DhtNetworkSubProcess.SHUTDOWN_NODE_REQ, done_cb=dcb)
                lock.wait()

        shutdowns = threading.Thread(target=normalBehavior, args=(shutdownNodeRequest, 1*60.0/wb.node_num))
        shutdowns.daemon = True
        shutdowns.start()

        if self._traffic_plot:
            display_traffic_plot('br'+wb.ifname)
        else:
            # blocks in matplotlib thread
            while True:
                plt.pause(3600)


    @reset_before_test
    def _deleteTest(self):
        """
        It uses Dht shutdown call from the API to gracefully finish the nodes one
        after the other.
        """
        bootstrap = self._bootstrap

        ops_count = []

        bootstrap.resize(3)
        consumer = bootstrap.get(1)
        producer = bootstrap.get(2)

        myhash = random_hash()
        local_values = [Value(b'foo'), Value(b'bar'), Value(b'foobar')]

        self._dhtPut(producer, myhash, *local_values)

        #checking if values were transferred
        self._dhtGet(consumer, myhash)
        if not DhtFeatureTest.successfullTransfer(local_values, DhtFeatureTest.foreignValues):
            if DhtFeatureTest.foreignValues:
                DhtNetwork.Log.log('[GET]: Only ', len(DhtFeatureTest.foreignValues) ,' on ',
                                   len(local_values), ' values successfully put.')
            else:
                DhtNetwork.Log.log('[GET]: 0 values successfully put')


        if DhtFeatureTest.foreignValues and DhtFeatureTest.foreignNodes:
            DhtNetwork.Log.log('Values are found on :')
            for node in DhtFeatureTest.foreignNodes:
                DhtNetwork.Log.log(node)

            for _ in range(max(1, int(self._workbench.node_num/32))):
                DhtNetwork.Log.log('Removing all nodes hosting target values...')
                cluster_ops_count = 0
                for proc in self._workbench.procs:
                    DhtNetwork.Log.log('[REMOVE]: sending shutdown request to', proc)
                    lock = threading.Condition()
                    def dcb(success):
                        nonlocal lock
                        if not success:
                            DhtNetwork.Log.err("Failed to shutdown.")
                        with lock:
                            lock.notify()

                    with lock:
                        proc.sendClusterRequest(
                            DhtNetworkSubProcess.SHUTDOWN_NODE_REQ,
                            DhtFeatureTest.foreignNodes,
                            done_cb=dcb
                        )
                        lock.wait()
                    DhtNetwork.Log.log('sending message stats request')
                    def msg_dcb(stats):
                        nonlocal cluster_ops_count, lock
                        if stats:
                            cluster_ops_count += sum(stats[1:])
                        with lock:
                            lock.notify()
                    with lock:
                        proc.sendGetMessageStats(done_cb=msg_dcb)
                        lock.wait()
                DhtNetwork.Log.log("5 seconds wait...")
                time.sleep(5)
                ops_count.append(cluster_ops_count/self._workbench.node_num)

                # checking if values were transferred to new nodes
                # (_dhtGet rebinds foreignNodes, so this reference stays intact)
                foreignNodes_before_delete = DhtFeatureTest.foreignNodes
                DhtNetwork.Log.log('[GET]: trying to fetch persistent values')
                self._dhtGet(consumer, myhash)
                new_nodes = set(DhtFeatureTest.foreignNodes) - set(foreignNodes_before_delete)

                self._result(local_values, new_nodes)

            if self._op_plot:
                display_plot(ops_count, color='blue')
        else:
            DhtNetwork.Log.log("[GET]: either couldn't fetch values or nodes hosting values...")

    @reset_before_test
    def _replaceClusterTest(self):
        """
        It replaces randomly chosen clusters one after the other.
        """
        clusters = 8

        bootstrap = self._bootstrap

        bootstrap.resize(3)
        consumer = bootstrap.get(1)
        producer = bootstrap.get(2)

        myhash = random_hash()
        local_values = [Value(b'foo'), Value(b'bar'), Value(b'foobar')]

        self._dhtPut(producer, myhash, *local_values)
        self._dhtGet(consumer, myhash)
        initial_nodes = DhtFeatureTest.foreignNodes

        DhtNetwork.Log.log('Replacing', clusters, 'random clusters successively...')
        for _ in range(clusters):
            i = random.randint(0, len(self._workbench.procs)-1)
            proc = self._workbench.procs[i]
            DhtNetwork.Log.log('Replacing', proc)
            proc.sendClusterRequest(DhtNetworkSubProcess.SHUTDOWN_CLUSTER_REQ)
            self._workbench.stop_cluster(i)
            self._workbench.start_cluster(i)

        DhtNetwork.Log.log('[GET]: trying to fetch persistent values')
        self._dhtGet(consumer, myhash)
        new_nodes = set(DhtFeatureTest.foreignNodes) - set(initial_nodes)

        self._result(local_values, new_nodes)

    @reset_before_test
    def _multTimeTest(self):
        """
        Multiple put() calls are made from multiple nodes to multiple hashes
        after which a set of nodes is created around each hash in order to
        enable storage maintenance on each node. Therefore, this test waits
        DP_TIMEOUT+1 minutes for the nodes to trigger storage maintenance.
        """
        trigger_nodes = []
        bootstrap = self._bootstrap

        # Default applies when the 'num_producers' option was not given.
        N_PRODUCERS = self._num_producers if self._num_producers else 16
        DP_TIMEOUT = 1

        hashes = []

        # Generating considerable amount of values of size 1KB.
        VALUE_SIZE = 1024
        NUM_VALUES = self._num_values if self._num_values else 50
        values = [Value(random_str_val(size=VALUE_SIZE).encode()) for _ in range(NUM_VALUES)]

        bootstrap.resize(N_PRODUCERS+2)
        consumer = bootstrap.get(N_PRODUCERS+1)
        producers = (bootstrap.get(n) for n in range(1, N_PRODUCERS+1))
        for p in producers:
            hashes.append(random_hash())
            self._dhtPut(p, hashes[-1], *values)

        once = True
        while self._keep_alive or once:
            nodes = set()
            self._gottaGetThemAllPokeNodes(consumer, hashes, nodes=nodes)

            DhtNetwork.Log.log("Values are found on:")
            for n in nodes:
                DhtNetwork.Log.log(n)

            DhtNetwork.Log.log("Creating 8 nodes around all of these hashes...")
            for _hash in hashes:
                self._trigger_dp(trigger_nodes, _hash, count=8)

            DhtNetwork.Log.log('Waiting', DP_TIMEOUT+1, 'minutes for normal storage maintenance.')
            time.sleep((DP_TIMEOUT+1)*60)

            DhtNetwork.Log.log('Deleting old nodes from previous search.')
            for proc in self._workbench.procs:
                DhtNetwork.Log.log('[REMOVE]: sending delete request to', proc)
                proc.sendClusterRequest(
                    DhtNetworkSubProcess.REMOVE_NODE_REQ,
                    nodes)

            # new consumer (fresh cache)
            bootstrap.resize(N_PRODUCERS+1)
            bootstrap.resize(N_PRODUCERS+2)
            consumer = bootstrap.get(N_PRODUCERS+1)

            nodes_after_time = set()
            self._gottaGetThemAllPokeNodes(consumer, hashes, nodes=nodes_after_time)
            self._result(values, nodes_after_time - nodes)

            once = False


class PerformanceTest(DhtFeatureTest):
    """
    Tests for general performance of dht operations.
    """

    def __init__(self, test, workbench, opts):
        """
        @param test: is one of the following:
                     - 'gets': multiple get operations and statistical results.
                     - 'delete': perform multiple put() operations followed
                       by targeted deletion of nodes hosting the values. Doing
                       so until half of the nodes on the network remain.
        @type  test: string

        @param opts: Options dictionary (not read by this class).
        @type  opts: dict
        """
        super(PerformanceTest, self).__init__(test, workbench)

    def run(self):
        """Runs the selected test, then shrinks the bootstrap back to 1 node."""
        try:
            if self._test == 'gets':
                self._getsTimesTest()
            elif self._test == 'delete':
                self._delete()
            else:
                raise NameError("This test is not defined '" + self._test + "'")
        except Exception as e:
            traceback.print_tb(e.__traceback__)
            print(type(e).__name__+':', e, file=sys.stderr)
        finally:
            self._bootstrap.resize(1)


    ###########
    #  Tests  #
    ###########

    @reset_before_test
    def _getsTimesTest(self):
        """
        Tests for performance of the DHT doing multiple get() operation.
        """
        bootstrap = self._bootstrap

        plt.ion()

        fig, axes = plt.subplots(2, 1)
        fig.tight_layout()

        lax = axes[0]
        hax = axes[1]

        # artists currently drawn; removed and redrawn by update_plot()
        lines = []
        hax.set_ylim(0, 2)

        # let the network stabilise
        plt.pause(20)

        times = []

        lock = threading.Condition()
        done = 0

        def getcb(v):
            DhtNetwork.Log.log("found", v)
            return True

        def donecb(ok, nodes, start):
            nonlocal done
            t = time.time()-start
            with lock:
                if not ok:
                    DhtNetwork.Log.log("failed !")
                times.append(t)
                done -= 1
                lock.notify()

        def update_plot():
            nonlocal lines
            while lines:
                lines.pop().remove()
            if len(times) > 1:
                # `density` replaces the `normed` keyword, which matplotlib
                # no longer accepts.
                n, bins, patches = hax.hist(times, 100, density=True, histtype='stepfilled', color='g')
                hax.set_ylim(min(n), max(n))
                lines = list(patches)
                lines.extend(lax.plot(times, color='blue'))
            plt.draw()

        def run_get():
            nonlocal done
            done += 1
            start = time.time()
            bootstrap.front().get(InfoHash.getRandom(), getcb, lambda ok, nodes: donecb(ok, nodes, start))

        plt.pause(5)

        plt.show()
        update_plot()

        times = []
        for n in range(10):
            self._workbench.replace_cluster()
            plt.pause(2)
            DhtNetwork.Log.log("Getting 50 random hashes succesively.")
            for i in range(50):
                with lock:
                    for _ in range(1):
                        run_get()
                    while done > 0:
                        lock.wait()
                        update_plot()
                        plt.pause(.1)
                update_plot()
            print("Took", np.sum(times), "mean", np.mean(times), "std", np.std(times), "min", np.min(times), "max", np.max(times))

        print('GET calls timings benchmark test : DONE. ' \
              'Close Matplotlib window for terminating the program.')
        plt.ioff()
        plt.show()

    @reset_before_test
    def _delete(self):
        """
        Tests for performance of get() and put() operations on the network while
        deleting around the target hash.
        """

        bootstrap = self._bootstrap

        bootstrap.resize(3)
        consumer = bootstrap.get(1)
        producer = bootstrap.get(2)

        myhash = random_hash()
        local_values = [Value(b'foo'), Value(b'bar'), Value(b'foobar')]

        for _ in range(max(1, int(self._workbench.node_num/32))):
            self._dhtGet(consumer, myhash)
            DhtNetwork.Log.log("Waiting 15 seconds...")
            time.sleep(15)

            self._dhtPut(producer, myhash, *local_values)

            #checking if values were transferred
            self._dhtGet(consumer, myhash)
            DhtNetwork.Log.log('Values are found on :')
            for node in DhtFeatureTest.foreignNodes:
                DhtNetwork.Log.log(node)

            if not DhtFeatureTest.successfullTransfer(local_values, DhtFeatureTest.foreignValues):
                if DhtFeatureTest.foreignValues:
                    DhtNetwork.Log.log('[GET]: Only ', len(DhtFeatureTest.foreignValues) ,' on ',
                                       len(local_values), ' values successfully put.')
                else:
                    DhtNetwork.Log.log('[GET]: 0 values successfully put')

            DhtNetwork.Log.log('Removing all nodes hosting target values...')
            for proc in self._workbench.procs:
                DhtNetwork.Log.log('[REMOVE]: sending shutdown request to', proc)
                proc.sendClusterRequest(
                    DhtNetworkSubProcess.SHUTDOWN_NODE_REQ,
                    DhtFeatureTest.foreignNodes
                )