1# 2# Copyright (c) 2019 by VMware, Inc. ("VMware") 3# Used Copyright (c) 2018 by Network Device Education Foundation, Inc. 4# ("NetDEF") in this file. 5# 6# Permission to use, copy, modify, and/or distribute this software 7# for any purpose with or without fee is hereby granted, provided 8# that the above copyright notice and this permission notice appear 9# in all copies. 10# 11# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES 12# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 13# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR 14# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY 15# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, 16# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS 17# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE 18# OF THIS SOFTWARE. 19# 20 21from collections import OrderedDict 22from datetime import datetime 23from time import sleep 24from copy import deepcopy 25from subprocess import call 26from subprocess import STDOUT as SUB_STDOUT 27from subprocess import PIPE as SUB_PIPE 28from subprocess import Popen 29from functools import wraps 30from re import search as re_search 31from tempfile import mkdtemp 32 33import os 34import sys 35import ConfigParser 36import traceback 37import socket 38import ipaddress 39import platform 40 41 42if sys.version_info[0] > 2: 43 import io 44 import configparser 45else: 46 import StringIO 47 import ConfigParser as configparser 48 49from lib.topolog import logger, logger_config 50from lib.topogen import TopoRouter, get_topogen 51from lib.topotest import interface_set_status, version_cmp 52 53FRRCFG_FILE = "frr_json.conf" 54FRRCFG_BKUP_FILE = "frr_json_initial.conf" 55 56ERROR_LIST = ["Malformed", "Failure", "Unknown", "Incomplete"] 57ROUTER_LIST = [] 58 59#### 60CD = os.path.dirname(os.path.realpath(__file__)) 61PYTESTINI_PATH = os.path.join(CD, "../pytest.ini") 62 63# Creating tmp dir with 
testsuite name to avoid conflict condition when 64# multiple testsuites run together. All temporary files would be created 65# in this dir and this dir would be removed once testsuite run is 66# completed 67LOGDIR = "/tmp/topotests/" 68TMPDIR = None 69 70# NOTE: to save execution logs to log file frrtest_log_dir must be configured 71# in `pytest.ini`. 72config = configparser.ConfigParser() 73config.read(PYTESTINI_PATH) 74 75config_section = "topogen" 76 77if config.has_option("topogen", "verbosity"): 78 loglevel = config.get("topogen", "verbosity") 79 loglevel = loglevel.upper() 80else: 81 loglevel = "INFO" 82 83if config.has_option("topogen", "frrtest_log_dir"): 84 frrtest_log_dir = config.get("topogen", "frrtest_log_dir") 85 time_stamp = datetime.time(datetime.now()) 86 logfile_name = "frr_test_bgp_" 87 frrtest_log_file = frrtest_log_dir + logfile_name + str(time_stamp) 88 print("frrtest_log_file..", frrtest_log_file) 89 90 logger = logger_config.get_logger( 91 name="test_execution_logs", log_level=loglevel, target=frrtest_log_file 92 ) 93 print("Logs will be sent to logfile: {}".format(frrtest_log_file)) 94 95if config.has_option("topogen", "show_router_config"): 96 show_router_config = config.get("topogen", "show_router_config") 97else: 98 show_router_config = False 99 100# env variable for setting what address type to test 101ADDRESS_TYPES = os.environ.get("ADDRESS_TYPES") 102 103 104# Saves sequence id numbers 105SEQ_ID = {"prefix_lists": {}, "route_maps": {}} 106 107 108def get_seq_id(obj_type, router, obj_name): 109 """ 110 Generates and saves sequence number in interval of 10 111 Parameters 112 ---------- 113 * `obj_type`: prefix_lists or route_maps 114 * `router`: router name 115 *` obj_name`: name of the prefix-list or route-map 116 Returns 117 -------- 118 Sequence number generated 119 """ 120 121 router_data = SEQ_ID[obj_type].setdefault(router, {}) 122 obj_data = router_data.setdefault(obj_name, {}) 123 seq_id = obj_data.setdefault("seq_id", 0) 124 
125 seq_id = int(seq_id) + 10 126 obj_data["seq_id"] = seq_id 127 128 return seq_id 129 130 131def set_seq_id(obj_type, router, id, obj_name): 132 """ 133 Saves sequence number if not auto-generated and given by user 134 Parameters 135 ---------- 136 * `obj_type`: prefix_lists or route_maps 137 * `router`: router name 138 *` obj_name`: name of the prefix-list or route-map 139 """ 140 router_data = SEQ_ID[obj_type].setdefault(router, {}) 141 obj_data = router_data.setdefault(obj_name, {}) 142 seq_id = obj_data.setdefault("seq_id", 0) 143 144 seq_id = int(seq_id) + int(id) 145 obj_data["seq_id"] = seq_id 146 147 148class InvalidCLIError(Exception): 149 """Raise when the CLI command is wrong""" 150 151 pass 152 153 154def run_frr_cmd(rnode, cmd, isjson=False): 155 """ 156 Execute frr show commands in priviledged mode 157 * `rnode`: router node on which commands needs to executed 158 * `cmd`: Command to be executed on frr 159 * `isjson`: If command is to get json data or not 160 :return str: 161 """ 162 163 if cmd: 164 ret_data = rnode.vtysh_cmd(cmd, isjson=isjson) 165 166 if True: 167 if isjson: 168 logger.debug(ret_data) 169 print_data = rnode.vtysh_cmd(cmd.rstrip("json"), isjson=False) 170 else: 171 print_data = ret_data 172 173 logger.info( 174 "Output for command [ %s] on router %s:\n%s", 175 cmd.rstrip("json"), 176 rnode.name, 177 print_data, 178 ) 179 return ret_data 180 181 else: 182 raise InvalidCLIError("No actual cmd passed") 183 184 185def apply_raw_config(tgen, input_dict): 186 187 """ 188 API to configure raw configuration on device. This can be used for any cli 189 which does has not been implemented in JSON. 
190 191 Parameters 192 ---------- 193 * `tgen`: tgen onject 194 * `input_dict`: configuration that needs to be applied 195 196 Usage 197 ----- 198 input_dict = { 199 "r2": { 200 "raw_config": [ 201 "router bgp", 202 "no bgp update-group-split-horizon" 203 ] 204 } 205 } 206 Returns 207 ------- 208 True or errormsg 209 """ 210 211 result = True 212 for router_name in input_dict.keys(): 213 config_cmd = input_dict[router_name]["raw_config"] 214 215 if not isinstance(config_cmd, list): 216 config_cmd = [config_cmd] 217 218 frr_cfg_file = "{}/{}/{}".format(TMPDIR, router_name, FRRCFG_FILE) 219 with open(frr_cfg_file, "w") as cfg: 220 for cmd in config_cmd: 221 cfg.write("{}\n".format(cmd)) 222 223 result = load_config_to_router(tgen, router_name) 224 225 return result 226 227 228def create_common_configuration( 229 tgen, router, data, config_type=None, build=False, load_config=True 230): 231 """ 232 API to create object of class FRRConfig and also create frr_json.conf 233 file. It will create interface and common configurations and save it to 234 frr_json.conf and load to router 235 Parameters 236 ---------- 237 * `tgen`: tgen onject 238 * `data`: Congiguration data saved in a list. 239 * `router` : router id to be configured. 240 * `config_type` : Syntactic information while writing configuration. Should 241 be one of the value as mentioned in the config_map below. 242 * `build` : Only for initial setup phase this is set as True 243 Returns 244 ------- 245 True or False 246 """ 247 TMPDIR = os.path.join(LOGDIR, tgen.modname) 248 249 fname = "{}/{}/{}".format(TMPDIR, router, FRRCFG_FILE) 250 251 config_map = OrderedDict( 252 { 253 "general_config": "! FRR General Config\n", 254 "interface_config": "! Interfaces Config\n", 255 "static_route": "! Static Route Config\n", 256 "prefix_list": "! Prefix List Config\n", 257 "bgp_community_list": "! Community List Config\n", 258 "route_maps": "! Route Maps Config\n", 259 "bgp": "! BGP Config\n", 260 "vrf": "! 
VRF Config\n", 261 } 262 ) 263 264 if build: 265 mode = "a" 266 elif not load_config: 267 mode = "a" 268 else: 269 mode = "w" 270 271 try: 272 frr_cfg_fd = open(fname, mode) 273 if config_type: 274 frr_cfg_fd.write(config_map[config_type]) 275 for line in data: 276 frr_cfg_fd.write("{} \n".format(str(line))) 277 frr_cfg_fd.write("\n") 278 279 except IOError as err: 280 logger.error( 281 "Unable to open FRR Config File. error(%s): %s" % (err.errno, err.strerror) 282 ) 283 return False 284 finally: 285 frr_cfg_fd.close() 286 287 # If configuration applied from build, it will done at last 288 if not build and load_config: 289 load_config_to_router(tgen, router) 290 291 return True 292 293 294def kill_router_daemons(tgen, router, daemons): 295 """ 296 Router's current config would be saved to /etc/frr/ for each deamon 297 and deamon would be killed forcefully using SIGKILL. 298 * `tgen` : topogen object 299 * `router`: Device under test 300 * `daemons`: list of daemons to be killed 301 """ 302 303 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) 304 305 try: 306 router_list = tgen.routers() 307 308 # Saving router config to /etc/frr, which will be loaded to router 309 # when it starts 310 router_list[router].vtysh_cmd("write memory") 311 312 # Kill Daemons 313 result = router_list[router].killDaemons(daemons) 314 if len(result) > 0: 315 assert "Errors found post shutdown - details follow:" == 0, result 316 return result 317 318 except Exception as e: 319 errormsg = traceback.format_exc() 320 logger.error(errormsg) 321 return errormsg 322 323 324def start_router_daemons(tgen, router, daemons): 325 """ 326 Daemons defined by user would be started 327 * `tgen` : topogen object 328 * `router`: Device under test 329 * `daemons`: list of daemons to be killed 330 """ 331 332 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) 333 334 try: 335 router_list = tgen.routers() 336 337 # Start daemons 338 result = 
router_list[router].startDaemons(daemons) 339 return result 340 341 except Exception as e: 342 errormsg = traceback.format_exc() 343 logger.error(errormsg) 344 return errormsg 345 346 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) 347 return True 348 349 350def kill_mininet_routers_process(tgen): 351 """ 352 Kill all mininet stale router' processes 353 * `tgen` : topogen object 354 """ 355 356 router_list = tgen.routers() 357 for rname, router in router_list.iteritems(): 358 daemon_list = [ 359 "zebra", 360 "ospfd", 361 "ospf6d", 362 "bgpd", 363 "ripd", 364 "ripngd", 365 "isisd", 366 "pimd", 367 "ldpd", 368 "staticd", 369 ] 370 for daemon in daemon_list: 371 router.run("killall -9 {}".format(daemon)) 372 373 374def check_router_status(tgen): 375 """ 376 Check if all daemons are running for all routers in topology 377 * `tgen` : topogen object 378 """ 379 380 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) 381 382 try: 383 router_list = tgen.routers() 384 for router, rnode in router_list.iteritems(): 385 386 result = rnode.check_router_running() 387 if result != "": 388 daemons = [] 389 if "bgpd" in result: 390 daemons.append("bgpd") 391 if "zebra" in result: 392 daemons.append("zebra") 393 394 rnode.startDaemons(daemons) 395 396 except Exception as e: 397 errormsg = traceback.format_exc() 398 logger.error(errormsg) 399 return errormsg 400 401 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) 402 return True 403 404def getStrIO(): 405 """ 406 Return a StringIO object appropriate for the current python version. 407 """ 408 if sys.version_info[0] > 2: 409 return io.StringIO() 410 else: 411 return StringIO.StringIO() 412 413def reset_config_on_routers(tgen, routerName=None): 414 """ 415 Resets configuration on routers to the snapshot created using input JSON 416 file. 
It replaces existing router configuration with FRRCFG_BKUP_FILE 417 418 Parameters 419 ---------- 420 * `tgen` : Topogen object 421 * `routerName` : router config is to be reset 422 """ 423 424 logger.debug("Entering API: reset_config_on_routers") 425 426 router_list = tgen.routers() 427 for rname in ROUTER_LIST: 428 if routerName and routerName != rname: 429 continue 430 431 router = router_list[rname] 432 logger.info("Configuring router %s to initial test configuration", rname) 433 434 cfg = router.run("vtysh -c 'show running'") 435 fname = "{}/{}/frr.sav".format(TMPDIR, rname) 436 dname = "{}/{}/delta.conf".format(TMPDIR, rname) 437 f = open(fname, "w") 438 for line in cfg.split("\n"): 439 line = line.strip() 440 441 if ( 442 line == "Building configuration..." 443 or line == "Current configuration:" 444 or not line 445 ): 446 continue 447 f.write(line) 448 f.write("\n") 449 450 f.close() 451 run_cfg_file = "{}/{}/frr.sav".format(TMPDIR, rname) 452 init_cfg_file = "{}/{}/frr_json_initial.conf".format(TMPDIR, rname) 453 command = "/usr/lib/frr/frr-reload.py --input {} --test {} > {}".format( 454 run_cfg_file, init_cfg_file, dname 455 ) 456 result = call(command, shell=True, stderr=SUB_STDOUT, stdout=SUB_PIPE) 457 458 # Assert if command fail 459 if result > 0: 460 logger.error("Delta file creation failed. 
Command executed %s", command) 461 with open(run_cfg_file, "r") as fd: 462 logger.info( 463 "Running configuration saved in %s is:\n%s", run_cfg_file, fd.read() 464 ) 465 with open(init_cfg_file, "r") as fd: 466 logger.info( 467 "Test configuration saved in %s is:\n%s", init_cfg_file, fd.read() 468 ) 469 470 err_cmd = ["/usr/bin/vtysh", "-m", "-f", run_cfg_file] 471 result = Popen(err_cmd, stdout=SUB_PIPE, stderr=SUB_PIPE) 472 output = result.communicate() 473 for out_data in output: 474 temp_data = out_data.decode("utf-8").lower() 475 for out_err in ERROR_LIST: 476 if out_err.lower() in temp_data: 477 logger.error( 478 "Found errors while validating data in" " %s", run_cfg_file 479 ) 480 raise InvalidCLIError(out_data) 481 raise InvalidCLIError("Unknown error in %s", output) 482 483 f = open(dname, "r") 484 delta = getStrIO() 485 delta.write("configure terminal\n") 486 t_delta = f.read() 487 488 # Don't disable debugs 489 check_debug = True 490 491 for line in t_delta.split("\n"): 492 line = line.strip() 493 if line == "Lines To Delete" or line == "===============" or not line: 494 continue 495 496 if line == "Lines To Add": 497 check_debug = False 498 continue 499 500 if line == "============" or not line: 501 continue 502 503 # Leave debugs and log output alone 504 if check_debug: 505 if "debug" in line or "log file" in line: 506 continue 507 508 delta.write(line) 509 delta.write("\n") 510 511 f.close() 512 513 delta.write("end\n") 514 515 output = router.vtysh_multicmd(delta.getvalue(), pretty_output=False) 516 517 delta.close() 518 delta = getStrIO() 519 cfg = router.run("vtysh -c 'show running'") 520 for line in cfg.split("\n"): 521 line = line.strip() 522 delta.write(line) 523 delta.write("\n") 524 525 # Router current configuration to log file or console if 526 # "show_router_config" is defined in "pytest.ini" 527 if show_router_config: 528 logger.info("Configuration on router {} after reset:".format(rname)) 529 logger.info(delta.getvalue()) 530 
delta.close() 531 532 logger.debug("Exiting API: reset_config_on_routers") 533 return True 534 535 536def load_config_to_router(tgen, routerName, save_bkup=False): 537 """ 538 Loads configuration on router from the file FRRCFG_FILE. 539 540 Parameters 541 ---------- 542 * `tgen` : Topogen object 543 * `routerName` : router for which configuration to be loaded 544 * `save_bkup` : If True, Saves snapshot of FRRCFG_FILE to FRRCFG_BKUP_FILE 545 """ 546 547 logger.debug("Entering API: load_config_to_router") 548 549 router_list = tgen.routers() 550 for rname in ROUTER_LIST: 551 if routerName and rname != routerName: 552 continue 553 554 router = router_list[rname] 555 try: 556 frr_cfg_file = "{}/{}/{}".format(TMPDIR, rname, FRRCFG_FILE) 557 frr_cfg_bkup = "{}/{}/{}".format(TMPDIR, rname, FRRCFG_BKUP_FILE) 558 with open(frr_cfg_file, "r+") as cfg: 559 data = cfg.read() 560 logger.info( 561 "Applying following configuration on router" 562 " {}:\n{}".format(rname, data) 563 ) 564 if save_bkup: 565 with open(frr_cfg_bkup, "w") as bkup: 566 bkup.write(data) 567 568 output = router.vtysh_multicmd(data, pretty_output=False) 569 for out_err in ERROR_LIST: 570 if out_err.lower() in output.lower(): 571 raise InvalidCLIError("%s" % output) 572 573 cfg.truncate(0) 574 575 except IOError as err: 576 errormsg = ( 577 "Unable to open config File. 
error(%s):" " %s", 578 (err.errno, err.strerror), 579 ) 580 return errormsg 581 582 # Router current configuration to log file or console if 583 # "show_router_config" is defined in "pytest.ini" 584 if show_router_config: 585 logger.info("New configuration for router {}:".format(rname)) 586 new_config = router.run("vtysh -c 'show running'") 587 logger.info(new_config) 588 589 logger.debug("Exiting API: load_config_to_router") 590 return True 591 592 593def get_frr_ipv6_linklocal(tgen, router, intf=None, vrf=None): 594 """ 595 API to get the link local ipv6 address of a perticular interface using 596 FRR command 'show interface' 597 598 * `tgen`: tgen onject 599 * `router` : router for which hightest interface should be 600 calculated 601 * `intf` : interface for which linklocal address needs to be taken 602 * `vrf` : VRF name 603 604 Usage 605 ----- 606 linklocal = get_frr_ipv6_linklocal(tgen, router, "intf1", RED_A) 607 608 Returns 609 ------- 610 1) array of interface names to link local ips. 
611 """ 612 613 router_list = tgen.routers() 614 for rname, rnode in router_list.iteritems(): 615 if rname != router: 616 continue 617 618 linklocal = [] 619 620 if vrf: 621 cmd = "show interface vrf {}".format(vrf) 622 else: 623 cmd = "show interface" 624 625 ifaces = router_list[router].run('vtysh -c "{}"'.format(cmd)) 626 627 # Fix newlines (make them all the same) 628 ifaces = ("\n".join(ifaces.splitlines()) + "\n").splitlines() 629 630 interface = None 631 ll_per_if_count = 0 632 for line in ifaces: 633 # Interface name 634 m = re_search("Interface ([a-zA-Z0-9-]+) is", line) 635 if m: 636 interface = m.group(1).split(" ")[0] 637 ll_per_if_count = 0 638 639 # Interface ip 640 m1 = re_search("inet6 (fe80[:a-fA-F0-9]+[\/0-9]+)", line) 641 if m1: 642 local = m1.group(1) 643 ll_per_if_count += 1 644 if ll_per_if_count > 1: 645 linklocal += [["%s-%s" % (interface, ll_per_if_count), local]] 646 else: 647 linklocal += [[interface, local]] 648 649 if linklocal: 650 if intf: 651 return [_linklocal[1] for _linklocal in linklocal if _linklocal[0] == intf][ 652 0 653 ].split("/")[0] 654 return linklocal 655 else: 656 errormsg = "Link local ip missing on router {}" 657 return errormsg 658 659 660def generate_support_bundle(): 661 """ 662 API to generate support bundle on any verification ste failure. 
663 it runs a python utility, /usr/lib/frr/generate_support_bundle.py, 664 which basically runs defined CLIs and dumps the data to specified location 665 """ 666 667 tgen = get_topogen() 668 router_list = tgen.routers() 669 test_name = sys._getframe(2).f_code.co_name 670 TMPDIR = os.path.join(LOGDIR, tgen.modname) 671 672 for rname, rnode in router_list.iteritems(): 673 logger.info("Generating support bundle for {}".format(rname)) 674 rnode.run("mkdir -p /var/log/frr") 675 bundle_log = rnode.run("python2 /usr/lib/frr/generate_support_bundle.py") 676 logger.info(bundle_log) 677 678 dst_bundle = "{}/{}/support_bundles/{}".format(TMPDIR, rname, test_name) 679 src_bundle = "/var/log/frr" 680 rnode.run("rm -rf {}".format(dst_bundle)) 681 rnode.run("mkdir -p {}".format(dst_bundle)) 682 rnode.run("mv -f {}/* {}".format(src_bundle, dst_bundle)) 683 684 return True 685 686 687def start_topology(tgen): 688 """ 689 Starting topology, create tmp files which are loaded to routers 690 to start deamons and then start routers 691 * `tgen` : topogen object 692 """ 693 694 global TMPDIR, ROUTER_LIST 695 # Starting topology 696 tgen.start_topology() 697 698 # Starting deamons 699 700 router_list = tgen.routers() 701 ROUTER_LIST = sorted( 702 router_list.keys(), key=lambda x: int(re_search("\d+", x).group(0)) 703 ) 704 TMPDIR = os.path.join(LOGDIR, tgen.modname) 705 706 router_list = tgen.routers() 707 for rname in ROUTER_LIST: 708 router = router_list[rname] 709 710 # It will help in debugging the failures, will give more details on which 711 # specific kernel version tests are failing 712 linux_ver = router.run("uname -a") 713 logger.info("Logging platform related details: \n %s \n", linux_ver) 714 715 try: 716 os.chdir(TMPDIR) 717 718 # Creating router named dir and empty zebra.conf bgpd.conf files 719 # inside the current directory 720 if os.path.isdir("{}".format(rname)): 721 os.system("rm -rf {}".format(rname)) 722 os.mkdir("{}".format(rname)) 723 os.system("chmod -R go+rw 
{}".format(rname)) 724 os.chdir("{}/{}".format(TMPDIR, rname)) 725 os.system("touch zebra.conf bgpd.conf") 726 else: 727 os.mkdir("{}".format(rname)) 728 os.system("chmod -R go+rw {}".format(rname)) 729 os.chdir("{}/{}".format(TMPDIR, rname)) 730 os.system("touch zebra.conf bgpd.conf") 731 732 except IOError as err: 733 logger.error("I/O error({0}): {1}".format(err.errno, err.strerror)) 734 735 # Loading empty zebra.conf file to router, to start the zebra deamon 736 router.load_config( 737 TopoRouter.RD_ZEBRA, "{}/{}/zebra.conf".format(TMPDIR, rname) 738 ) 739 # Loading empty bgpd.conf file to router, to start the bgp deamon 740 router.load_config(TopoRouter.RD_BGP, "{}/{}/bgpd.conf".format(TMPDIR, rname)) 741 742 # Starting routers 743 logger.info("Starting all routers once topology is created") 744 tgen.start_router() 745 746 747def stop_router(tgen, router): 748 """ 749 Router"s current config would be saved to /etc/frr/ for each deamon 750 and router and its deamons would be stopped. 
751 752 * `tgen` : topogen object 753 * `router`: Device under test 754 """ 755 756 router_list = tgen.routers() 757 758 # Saving router config to /etc/frr, which will be loaded to router 759 # when it starts 760 router_list[router].vtysh_cmd("write memory") 761 762 # Stop router 763 router_list[router].stop() 764 765 766def start_router(tgen, router): 767 """ 768 Router will started and config would be loaded from /etc/frr/ for each 769 deamon 770 771 * `tgen` : topogen object 772 * `router`: Device under test 773 """ 774 775 logger.debug("Entering lib API: start_router") 776 777 try: 778 router_list = tgen.routers() 779 780 # Router and its deamons would be started and config would 781 # be loaded to router for each deamon from /etc/frr 782 router_list[router].start() 783 784 # Waiting for router to come up 785 sleep(5) 786 787 except Exception as e: 788 errormsg = traceback.format_exc() 789 logger.error(errormsg) 790 return errormsg 791 792 logger.debug("Exiting lib API: start_router()") 793 return True 794 795 796def number_to_row(routerName): 797 """ 798 Returns the number for the router. 799 Calculation based on name a0 = row 0, a1 = row 1, b2 = row 2, z23 = row 23 800 etc 801 """ 802 return int(routerName[1:]) 803 804 805def number_to_column(routerName): 806 """ 807 Returns the number for the router. 808 Calculation based on name a0 = columnn 0, a1 = column 0, b2= column 1, 809 z23 = column 26 etc 810 """ 811 return ord(routerName[0]) - 97 812 813 814############################################# 815# Common APIs, will be used by all protocols 816############################################# 817 818 819def create_vrf_cfg(tgen, topo, input_dict=None, build=False): 820 """ 821 Create vrf configuration for created topology. VRF 822 configuration is provided in input json file. 
823 824 VRF config is done in Linux Kernel: 825 * Create VRF 826 * Attach interface to VRF 827 * Bring up VRF 828 829 Parameters 830 ---------- 831 * `tgen` : Topogen object 832 * `topo` : json file data 833 * `input_dict` : Input dict data, required when configuring 834 from testcase 835 * `build` : Only for initial setup phase this is set as True. 836 837 Usage 838 ----- 839 input_dict={ 840 "r3": { 841 "links": { 842 "r2-link1": {"ipv4": "auto", "ipv6": "auto", "vrf": "RED_A"}, 843 "r2-link2": {"ipv4": "auto", "ipv6": "auto", "vrf": "RED_B"}, 844 "r2-link3": {"ipv4": "auto", "ipv6": "auto", "vrf": "BLUE_A"}, 845 "r2-link4": {"ipv4": "auto", "ipv6": "auto", "vrf": "BLUE_B"}, 846 }, 847 "vrfs":[ 848 { 849 "name": "RED_A", 850 "id": "1" 851 }, 852 { 853 "name": "RED_B", 854 "id": "2" 855 }, 856 { 857 "name": "BLUE_A", 858 "id": "3", 859 "delete": True 860 }, 861 { 862 "name": "BLUE_B", 863 "id": "4" 864 } 865 ] 866 } 867 } 868 result = create_vrf_cfg(tgen, topo, input_dict) 869 870 Returns 871 ------- 872 True or False 873 """ 874 result = True 875 if not input_dict: 876 input_dict = deepcopy(topo) 877 else: 878 input_dict = deepcopy(input_dict) 879 880 try: 881 for c_router, c_data in input_dict.iteritems(): 882 rnode = tgen.routers()[c_router] 883 if "vrfs" in c_data: 884 for vrf in c_data["vrfs"]: 885 config_data = [] 886 del_action = vrf.setdefault("delete", False) 887 name = vrf.setdefault("name", None) 888 table_id = vrf.setdefault("id", None) 889 vni = vrf.setdefault("vni", None) 890 del_vni = vrf.setdefault("no_vni", None) 891 892 if del_action: 893 # Kernel cmd- Add VRF and table 894 cmd = "ip link del {} type vrf table {}".format( 895 vrf["name"], vrf["id"] 896 ) 897 898 logger.info("[DUT: %s]: Running kernel cmd [%s]", c_router, cmd) 899 rnode.run(cmd) 900 901 # Kernel cmd - Bring down VRF 902 cmd = "ip link set dev {} down".format(name) 903 logger.info("[DUT: %s]: Running kernel cmd [%s]", c_router, cmd) 904 rnode.run(cmd) 905 906 else: 907 if name and 
table_id: 908 # Kernel cmd- Add VRF and table 909 cmd = "ip link add {} type vrf table {}".format( 910 name, table_id 911 ) 912 logger.info( 913 "[DUT: %s]: Running kernel cmd " "[%s]", c_router, cmd 914 ) 915 rnode.run(cmd) 916 917 # Kernel cmd - Bring up VRF 918 cmd = "ip link set dev {} up".format(name) 919 logger.info( 920 "[DUT: %s]: Running kernel " "cmd [%s]", c_router, cmd 921 ) 922 rnode.run(cmd) 923 924 if "links" in c_data: 925 for destRouterLink, data in sorted( 926 c_data["links"].iteritems() 927 ): 928 # Loopback interfaces 929 if "type" in data and data["type"] == "loopback": 930 interface_name = destRouterLink 931 else: 932 interface_name = data["interface"] 933 934 if "vrf" in data: 935 vrf_list = data["vrf"] 936 937 if type(vrf_list) is not list: 938 vrf_list = [vrf_list] 939 940 for _vrf in vrf_list: 941 cmd = "ip link set {} master {}".format( 942 interface_name, _vrf 943 ) 944 945 logger.info( 946 "[DUT: %s]: Running" " kernel cmd [%s]", 947 c_router, 948 cmd, 949 ) 950 rnode.run(cmd) 951 952 if vni: 953 config_data.append("vrf {}".format(vrf["name"])) 954 cmd = "vni {}".format(vni) 955 config_data.append(cmd) 956 957 if del_vni: 958 config_data.append("vrf {}".format(vrf["name"])) 959 cmd = "no vni {}".format(del_vni) 960 config_data.append(cmd) 961 962 result = create_common_configuration( 963 tgen, c_router, config_data, "vrf", build=build 964 ) 965 966 except InvalidCLIError: 967 # Traceback 968 errormsg = traceback.format_exc() 969 logger.error(errormsg) 970 return errormsg 971 972 return result 973 974 975def create_interface_in_kernel( 976 tgen, dut, name, ip_addr, vrf=None, netmask=None, create=True 977): 978 """ 979 Cretae interfaces in kernel for ipv4/ipv6 980 Config is done in Linux Kernel: 981 982 Parameters 983 ---------- 984 * `tgen` : Topogen object 985 * `dut` : Device for which interfaces to be added 986 * `name` : interface name 987 * `ip_addr` : ip address for interface 988 * `vrf` : VRF name, to which interface will be 
associated 989 * `netmask` : netmask value, default is None 990 * `create`: Create interface in kernel, if created then no need 991 to create 992 """ 993 994 rnode = tgen.routers()[dut] 995 996 if create: 997 cmd = "sudo ip link add name {} type dummy".format(name) 998 rnode.run(cmd) 999 1000 addr_type = validate_ip_address(ip_addr) 1001 if addr_type == "ipv4": 1002 cmd = "ifconfig {} {} netmask {}".format(name, ip_addr, netmask) 1003 else: 1004 cmd = "ifconfig {} inet6 add {}/{}".format(name, ip_addr, netmask) 1005 1006 rnode.run(cmd) 1007 1008 if vrf: 1009 cmd = "ip link set {} master {}".format(name, vrf) 1010 rnode.run(cmd) 1011 1012 1013def shutdown_bringup_interface_in_kernel(tgen, dut, intf_name, ifaceaction=False): 1014 """ 1015 Cretae interfaces in kernel for ipv4/ipv6 1016 Config is done in Linux Kernel: 1017 1018 Parameters 1019 ---------- 1020 * `tgen` : Topogen object 1021 * `dut` : Device for which interfaces to be added 1022 * `intf_name` : interface name 1023 * `ifaceaction` : False to shutdown and True to bringup the 1024 ineterface 1025 """ 1026 1027 rnode = tgen.routers()[dut] 1028 1029 cmd = "ip link set dev" 1030 if ifaceaction: 1031 action = "up" 1032 cmd = "{} {} {}".format(cmd, intf_name, action) 1033 else: 1034 action = "down" 1035 cmd = "{} {} {}".format(cmd, intf_name, action) 1036 1037 logger.info("[DUT: %s]: Running command: %s", dut, cmd) 1038 rnode.run(cmd) 1039 1040 1041def validate_ip_address(ip_address): 1042 """ 1043 Validates the type of ip address 1044 Parameters 1045 ---------- 1046 * `ip_address`: IPv4/IPv6 address 1047 Returns 1048 ------- 1049 Type of address as string 1050 """ 1051 1052 if "/" in ip_address: 1053 ip_address = ip_address.split("/")[0] 1054 1055 v4 = True 1056 v6 = True 1057 try: 1058 socket.inet_aton(ip_address) 1059 except socket.error as error: 1060 logger.debug("Not a valid IPv4 address") 1061 v4 = False 1062 else: 1063 return "ipv4" 1064 1065 try: 1066 socket.inet_pton(socket.AF_INET6, ip_address) 1067 
except socket.error as error: 1068 logger.debug("Not a valid IPv6 address") 1069 v6 = False 1070 else: 1071 return "ipv6" 1072 1073 if not v4 and not v6: 1074 raise Exception( 1075 "InvalidIpAddr", "%s is neither valid IPv4 or IPv6" " address" % ip_address 1076 ) 1077 1078 1079def check_address_types(addr_type=None): 1080 """ 1081 Checks environment variable set and compares with the current address type 1082 """ 1083 1084 addr_types_env = os.environ.get("ADDRESS_TYPES") 1085 if not addr_types_env: 1086 addr_types_env = "dual" 1087 1088 if addr_types_env == "dual": 1089 addr_types = ["ipv4", "ipv6"] 1090 elif addr_types_env == "ipv4": 1091 addr_types = ["ipv4"] 1092 elif addr_types_env == "ipv6": 1093 addr_types = ["ipv6"] 1094 1095 if addr_type is None: 1096 return addr_types 1097 1098 if addr_type not in addr_types: 1099 logger.debug( 1100 "{} not in supported/configured address types {}".format( 1101 addr_type, addr_types 1102 ) 1103 ) 1104 return False 1105 1106 return True 1107 1108 1109def generate_ips(network, no_of_ips): 1110 """ 1111 Returns list of IPs. 
1112 based on start_ip and no_of_ips 1113 * `network` : from here the ip will start generating, 1114 start_ip will be 1115 * `no_of_ips` : these many IPs will be generated 1116 """ 1117 1118 ipaddress_list = [] 1119 if type(network) is not list: 1120 network = [network] 1121 1122 for start_ipaddr in network: 1123 if "/" in start_ipaddr: 1124 start_ip = start_ipaddr.split("/")[0] 1125 mask = int(start_ipaddr.split("/")[1]) 1126 1127 addr_type = validate_ip_address(start_ip) 1128 if addr_type == "ipv4": 1129 start_ip = ipaddress.IPv4Address(unicode(start_ip)) 1130 step = 2 ** (32 - mask) 1131 if addr_type == "ipv6": 1132 start_ip = ipaddress.IPv6Address(unicode(start_ip)) 1133 step = 2 ** (128 - mask) 1134 1135 next_ip = start_ip 1136 count = 0 1137 while count < no_of_ips: 1138 ipaddress_list.append("{}/{}".format(next_ip, mask)) 1139 if addr_type == "ipv6": 1140 next_ip = ipaddress.IPv6Address(int(next_ip) + step) 1141 else: 1142 next_ip += step 1143 count += 1 1144 1145 return ipaddress_list 1146 1147 1148def find_interface_with_greater_ip(topo, router, loopback=True, interface=True): 1149 """ 1150 Returns highest interface ip for ipv4/ipv6. If loopback is there then 1151 it will return highest IP from loopback IPs otherwise from physical 1152 interface IPs. 
1153 * `topo` : json file data 1154 * `router` : router for which hightest interface should be calculated 1155 """ 1156 1157 link_data = topo["routers"][router]["links"] 1158 lo_list = [] 1159 interfaces_list = [] 1160 lo_exists = False 1161 for destRouterLink, data in sorted(link_data.iteritems()): 1162 if loopback: 1163 if "type" in data and data["type"] == "loopback": 1164 lo_exists = True 1165 ip_address = topo["routers"][router]["links"][destRouterLink][ 1166 "ipv4" 1167 ].split("/")[0] 1168 lo_list.append(ip_address) 1169 if interface: 1170 ip_address = topo["routers"][router]["links"][destRouterLink]["ipv4"].split( 1171 "/" 1172 )[0] 1173 interfaces_list.append(ip_address) 1174 1175 if lo_exists: 1176 return sorted(lo_list)[-1] 1177 1178 return sorted(interfaces_list)[-1] 1179 1180 1181def write_test_header(tc_name): 1182 """ Display message at beginning of test case""" 1183 count = 20 1184 logger.info("*" * (len(tc_name) + count)) 1185 step("START -> Testcase : %s" % tc_name, reset=True) 1186 logger.info("*" * (len(tc_name) + count)) 1187 1188 1189def write_test_footer(tc_name): 1190 """ Display message at end of test case""" 1191 count = 21 1192 logger.info("=" * (len(tc_name) + count)) 1193 logger.info("Testcase : %s -> PASSED", tc_name) 1194 logger.info("=" * (len(tc_name) + count)) 1195 1196 1197def interface_status(tgen, topo, input_dict): 1198 """ 1199 Delete ip route maps from device 1200 * `tgen` : Topogen object 1201 * `topo` : json file data 1202 * `input_dict` : for which router, route map has to be deleted 1203 Usage 1204 ----- 1205 input_dict = { 1206 "r3": { 1207 "interface_list": ['eth1-r1-r2', 'eth2-r1-r3'], 1208 "status": "down" 1209 } 1210 } 1211 Returns 1212 ------- 1213 errormsg(str) or True 1214 """ 1215 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) 1216 1217 try: 1218 global frr_cfg 1219 for router in input_dict.keys(): 1220 1221 interface_list = input_dict[router]["interface_list"] 1222 status = 
def retry(attempts=3, wait=2, return_is_str=True, initial_wait=0,
          return_is_dict=False):
    """
    Retries function execution, if return is an errormsg or exception

    * `attempts`: Number of attempts to make
    * `wait`: Number of seconds to wait between each attempt
    * `return_is_str`: Return val is an errormsg in case of failure
    * `initial_wait`: Sleeps for this much seconds before executing function
    * `return_is_dict`: A dict returned by the function is a final result

    The decorated function additionally accepts the keyword arguments
    `wait`, `attempts`, `return_is_str`, `return_is_dict` and `expected`;
    they override the decorator values for that single call and are not
    forwarded to the wrapped function. The undecorated function stays
    reachable as `<decorated>._original`.

    Raises ValueError when `attempts` is negative. An exception raised by
    the wrapped function on the last attempt is re-raised (unless the
    caller passed expected=False).
    """

    # `unicode` only exists on Python 2; referencing it blindly on Python 3
    # raises NameError, which the retry loop would mistake for a failure.
    try:
        string_types = (str, unicode)
    except NameError:
        string_types = (str,)

    def _retry(func):

        @wraps(func)
        def func_retry(*args, **kwargs):
            _wait = kwargs.pop('wait', wait)
            _attempts = int(kwargs.pop('attempts', attempts))
            expected = True
            if _attempts < 0:
                raise ValueError("attempts must be 0 or greater")

            if initial_wait > 0:
                logger.info("Waiting for [%s]s as initial delay", initial_wait)
                sleep(initial_wait)

            _return_is_str = kwargs.pop('return_is_str', return_is_str)
            # Previously popped 'return_is_str' a second time, so a per-call
            # return_is_dict override was ignored and leaked into func().
            _return_is_dict = kwargs.pop('return_is_dict', return_is_dict)

            for i in range(1, _attempts + 1):
                try:
                    # NOTE(review): 'expected' is removed from kwargs on the
                    # first attempt, so from the second attempt on _expected
                    # is True again while `expected` stays False. Kept as-is;
                    # confirm whether that is intended.
                    _expected = kwargs.setdefault('expected', True)
                    if _expected is False:
                        expected = _expected
                    kwargs.pop('expected')

                    ret = func(*args, **kwargs)
                    logger.debug("Function returned %s", ret)

                    # Boolean result from an API that reports errors as str.
                    if _return_is_str and isinstance(ret, bool) and _expected:
                        return ret
                    # Caller expects a failure and got an error message.
                    if isinstance(ret, string_types) and _expected is False:
                        return ret
                    if _return_is_dict and isinstance(ret, dict):
                        return ret

                    # Out of attempts: collect diagnostics, hand back result.
                    if _attempts == i and expected:
                        generate_support_bundle()
                        return ret
                except Exception as err:
                    if _attempts == i and expected:
                        generate_support_bundle()
                        logger.info("Max number of attempts (%r) reached",
                                    _attempts)
                        raise
                    else:
                        logger.info("Function returned %s", err)

                if i < _attempts:
                    logger.info("Retry [#%r] after sleeping for %ss", i, _wait)
                    sleep(_wait)

        func_retry._original = func
        return func_retry

    return _retry
1346 1347 Returns 1348 ------- 1349 True or False 1350 """ 1351 result = False 1352 topo = deepcopy(topo) 1353 1354 try: 1355 for c_router, c_data in topo.iteritems(): 1356 interface_data = [] 1357 for destRouterLink, data in sorted(c_data["links"].iteritems()): 1358 # Loopback interfaces 1359 if "type" in data and data["type"] == "loopback": 1360 interface_name = destRouterLink 1361 else: 1362 interface_name = data["interface"] 1363 1364 # Include vrf if present 1365 if "vrf" in data: 1366 interface_data.append( 1367 "interface {} vrf {}".format( 1368 str(interface_name), str(data["vrf"]) 1369 ) 1370 ) 1371 else: 1372 interface_data.append("interface {}".format(str(interface_name))) 1373 1374 if "ipv4" in data: 1375 intf_addr = c_data["links"][destRouterLink]["ipv4"] 1376 1377 if "delete" in data and data["delete"]: 1378 interface_data.append("no ip address {}".format(intf_addr)) 1379 else: 1380 interface_data.append("ip address {}".format(intf_addr)) 1381 if "ipv6" in data: 1382 intf_addr = c_data["links"][destRouterLink]["ipv6"] 1383 1384 if "delete" in data and data["delete"]: 1385 interface_data.append("no ipv6 address {}".format(intf_addr)) 1386 else: 1387 interface_data.append("ipv6 address {}".format(intf_addr)) 1388 1389 if "ipv6-link-local" in data: 1390 intf_addr = c_data["links"][destRouterLink]["ipv6-link-local"] 1391 1392 if "delete" in data and data["delete"]: 1393 interface_data.append("no ipv6 address {}".format(intf_addr)) 1394 else: 1395 interface_data.append("ipv6 address {}\n".format(intf_addr)) 1396 1397 result = create_common_configuration( 1398 tgen, c_router, interface_data, "interface_config", build=build 1399 ) 1400 except InvalidCLIError: 1401 # Traceback 1402 errormsg = traceback.format_exc() 1403 logger.error(errormsg) 1404 return errormsg 1405 1406 return result 1407 1408 1409def create_static_routes(tgen, input_dict, build=False): 1410 """ 1411 Create static routes for given router as defined in input_dict 1412 1413 Parameters 1414 
---------- 1415 * `tgen` : Topogen object 1416 * `input_dict` : Input dict data, required when configuring from testcase 1417 * `build` : Only for initial setup phase this is set as True. 1418 1419 Usage 1420 ----- 1421 input_dict should be in the format below: 1422 # static_routes: list of all routes 1423 # network: network address 1424 # no_of_ip: number of next-hop address that will be configured 1425 # admin_distance: admin distance for route/routes. 1426 # next_hop: starting next-hop address 1427 # tag: tag id for static routes 1428 # vrf: VRF name in which static routes needs to be created 1429 # delete: True if config to be removed. Default False. 1430 1431 Example: 1432 "routers": { 1433 "r1": { 1434 "static_routes": [ 1435 { 1436 "network": "100.0.20.1/32", 1437 "no_of_ip": 9, 1438 "admin_distance": 100, 1439 "next_hop": "10.0.0.1", 1440 "tag": 4001, 1441 "vrf": "RED_A" 1442 "delete": true 1443 } 1444 ] 1445 } 1446 } 1447 1448 Returns 1449 ------- 1450 errormsg(str) or True 1451 """ 1452 result = False 1453 logger.debug("Entering lib API: create_static_routes()") 1454 input_dict = deepcopy(input_dict) 1455 1456 try: 1457 for router in input_dict.keys(): 1458 if "static_routes" not in input_dict[router]: 1459 errormsg = "static_routes not present in input_dict" 1460 logger.info(errormsg) 1461 continue 1462 1463 static_routes_list = [] 1464 1465 static_routes = input_dict[router]["static_routes"] 1466 for static_route in static_routes: 1467 del_action = static_route.setdefault("delete", False) 1468 no_of_ip = static_route.setdefault("no_of_ip", 1) 1469 network = static_route.setdefault("network", []) 1470 if type(network) is not list: 1471 network = [network] 1472 1473 admin_distance = static_route.setdefault("admin_distance", None) 1474 tag = static_route.setdefault("tag", None) 1475 vrf = static_route.setdefault("vrf", None) 1476 interface = static_route.setdefault("interface", None) 1477 next_hop = static_route.setdefault("next_hop", None) 1478 
def create_prefix_lists(tgen, input_dict, build=False):
    """
    Create ip prefix lists as per the config provided in input
    JSON or input_dict
    Parameters
    ----------
    * `tgen` : Topogen object
    * `input_dict` : Input dict data, required when configuring from testcase
    * `build` : Only for initial setup phase this is set as True.
    Usage
    -----
    # pf_lists_1: name of prefix-list, user defined
    # seqid: prefix-list seqid, auto-generated if not given by user
    # network: criteria for applying prefix-list
    # action: permit/deny
    # le: less than or equal number of bits
    # ge: greater than or equal number of bits
    # delete: True if the entry has to be removed. Default False.
    Example
    -------
    input_dict = {
        "r1": {
            "prefix_lists":{
                "ipv4": {
                    "pf_list_1": [
                        {
                            "seqid": 10,
                            "network": "any",
                            "action": "permit",
                            "le": "32",
                            "ge": "30",
                            "delete": True
                        }
                    ]
                }
            }
        }
    }
    Returns
    -------
    errormsg(str) on failure, otherwise the result of
    create_common_configuration() for the last configured router
    (False when no router had "prefix_lists")

    `input_dict` is only read; it is not modified.
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    result = False
    try:
        for router, router_data in input_dict.items():
            if "prefix_lists" not in router_data:
                errormsg = "prefix_lists not present in input_dict"
                logger.debug(errormsg)
                continue

            config_data = []
            prefix_lists = router_data["prefix_lists"]
            for addr_type, prefix_data in prefix_lists.items():
                if not check_address_types(addr_type):
                    continue

                protocol = "ip" if addr_type == "ipv4" else "ipv6"

                for prefix_name, prefix_list in prefix_data.items():
                    for prefix_dict in prefix_list:
                        if "action" not in prefix_dict or "network" not in prefix_dict:
                            errormsg = "'action' or network' missing in" " input_dict"
                            logger.error(errormsg)
                            return errormsg

                        network_addr = prefix_dict["network"]
                        action = prefix_dict["action"]
                        # .get() instead of setdefault(): defaults must not
                        # be written back into the caller's dictionary.
                        le = prefix_dict.get("le")
                        ge = prefix_dict.get("ge")
                        seqid = prefix_dict.get("seqid")
                        del_action = prefix_dict.get("delete", False)

                        if seqid is None:
                            seqid = get_seq_id("prefix_lists", router, prefix_name)
                        else:
                            set_seq_id("prefix_lists", router, seqid, prefix_name)

                        cmd = "{} prefix-list {} seq {} {} {}".format(
                            protocol, prefix_name, seqid, action, network_addr
                        )
                        if le:
                            cmd = "{} le {}".format(cmd, le)
                        if ge:
                            cmd = "{} ge {}".format(cmd, ge)

                        if del_action:
                            cmd = "no {}".format(cmd)

                        config_data.append(cmd)

            result = create_common_configuration(
                tgen, router, config_data, "prefix_list", build=build
            )

    except InvalidCLIError:
        # Traceback
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return result
1646 # prefix_list: name of prefix list 1647 # large-community-list: name of large community list 1648 # community-ist: name of community list 1649 # tag: tag id for static routes 1650 # set: key, value pair for modifying route attributes 1651 # localpref: preference value for the network 1652 # med: metric value advertised for AS 1653 # aspath: set AS path value 1654 # weight: weight for the route 1655 # community: standard community value to be attached 1656 # large_community: large community value to be attached 1657 # community_additive: if set to "additive", adds community/large-community 1658 value to the existing values of the network prefix 1659 Example: 1660 -------- 1661 input_dict = { 1662 "r1": { 1663 "route_maps": { 1664 "rmap_match_prefix_list_1": [ 1665 { 1666 "action": "PERMIT", 1667 "match": { 1668 "ipv4": { 1669 "prefix_list": "pf_list_1" 1670 } 1671 "ipv6": { 1672 "prefix_list": "pf_list_1" 1673 } 1674 "large-community-list": { 1675 "id": "community_1", 1676 "exact_match": True 1677 } 1678 "community_list": { 1679 "id": "community_2", 1680 "exact_match": True 1681 } 1682 "tag": "tag_id" 1683 }, 1684 "set": { 1685 "locPrf": 150, 1686 "metric": 30, 1687 "path": { 1688 "num": 20000, 1689 "action": "prepend", 1690 }, 1691 "weight": 500, 1692 "community": { 1693 "num": "1:2 2:3", 1694 "action": additive 1695 } 1696 "large_community": { 1697 "num": "1:2:3 4:5;6", 1698 "action": additive 1699 }, 1700 } 1701 } 1702 ] 1703 } 1704 } 1705 } 1706 Returns 1707 ------- 1708 errormsg(str) or True 1709 """ 1710 1711 result = False 1712 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) 1713 input_dict = deepcopy(input_dict) 1714 try: 1715 for router in input_dict.keys(): 1716 if "route_maps" not in input_dict[router]: 1717 logger.debug("route_maps not present in input_dict") 1718 continue 1719 rmap_data = [] 1720 for rmap_name, rmap_value in input_dict[router]["route_maps"].iteritems(): 1721 1722 for rmap_dict in rmap_value: 1723 
del_action = rmap_dict.setdefault("delete", False) 1724 1725 if del_action: 1726 rmap_data.append("no route-map {}".format(rmap_name)) 1727 continue 1728 1729 if "action" not in rmap_dict: 1730 errormsg = "action not present in input_dict" 1731 logger.error(errormsg) 1732 return False 1733 1734 rmap_action = rmap_dict.setdefault("action", "deny") 1735 1736 seq_id = rmap_dict.setdefault("seq_id", None) 1737 if seq_id is None: 1738 seq_id = get_seq_id("route_maps", router, rmap_name) 1739 else: 1740 set_seq_id("route_maps", router, seq_id, rmap_name) 1741 1742 rmap_data.append( 1743 "route-map {} {} {}".format(rmap_name, rmap_action, seq_id) 1744 ) 1745 1746 if "continue" in rmap_dict: 1747 continue_to = rmap_dict["continue"] 1748 if continue_to: 1749 rmap_data.append("on-match goto {}".format(continue_to)) 1750 else: 1751 logger.error( 1752 "In continue, 'route-map entry " 1753 "sequence number' is not provided" 1754 ) 1755 return False 1756 1757 if "goto" in rmap_dict: 1758 go_to = rmap_dict["goto"] 1759 if go_to: 1760 rmap_data.append("on-match goto {}".format(go_to)) 1761 else: 1762 logger.error( 1763 "In goto, 'Goto Clause number' is not" " provided" 1764 ) 1765 return False 1766 1767 if "call" in rmap_dict: 1768 call_rmap = rmap_dict["call"] 1769 if call_rmap: 1770 rmap_data.append("call {}".format(call_rmap)) 1771 else: 1772 logger.error( 1773 "In call, 'destination Route-Map' is" " not provided" 1774 ) 1775 return False 1776 1777 # Verifying if SET criteria is defined 1778 if "set" in rmap_dict: 1779 set_data = rmap_dict["set"] 1780 ipv4_data = set_data.setdefault("ipv4", {}) 1781 ipv6_data = set_data.setdefault("ipv6", {}) 1782 local_preference = set_data.setdefault("locPrf", None) 1783 metric = set_data.setdefault("metric", None) 1784 as_path = set_data.setdefault("path", {}) 1785 weight = set_data.setdefault("weight", None) 1786 community = set_data.setdefault("community", {}) 1787 large_community = set_data.setdefault("large_community", {}) 1788 
large_comm_list = set_data.setdefault("large_comm_list", {}) 1789 set_action = set_data.setdefault("set_action", None) 1790 nexthop = set_data.setdefault("nexthop", None) 1791 origin = set_data.setdefault("origin", None) 1792 ext_comm_list = set_data.setdefault("extcommunity", {}) 1793 1794 # Local Preference 1795 if local_preference: 1796 rmap_data.append( 1797 "set local-preference {}".format(local_preference) 1798 ) 1799 1800 # Metric 1801 if metric: 1802 rmap_data.append("set metric {} \n".format(metric)) 1803 1804 # Origin 1805 if origin: 1806 rmap_data.append("set origin {} \n".format(origin)) 1807 1808 # AS Path Prepend 1809 if as_path: 1810 as_num = as_path.setdefault("as_num", None) 1811 as_action = as_path.setdefault("as_action", None) 1812 if as_action and as_num: 1813 rmap_data.append( 1814 "set as-path {} {}".format(as_action, as_num) 1815 ) 1816 1817 # Community 1818 if community: 1819 num = community.setdefault("num", None) 1820 comm_action = community.setdefault("action", None) 1821 if num: 1822 cmd = "set community {}".format(num) 1823 if comm_action: 1824 cmd = "{} {}".format(cmd, comm_action) 1825 rmap_data.append(cmd) 1826 else: 1827 logger.error("In community, AS Num not" " provided") 1828 return False 1829 1830 if large_community: 1831 num = large_community.setdefault("num", None) 1832 comm_action = large_community.setdefault("action", None) 1833 if num: 1834 cmd = "set large-community {}".format(num) 1835 if comm_action: 1836 cmd = "{} {}".format(cmd, comm_action) 1837 1838 rmap_data.append(cmd) 1839 else: 1840 logger.error( 1841 "In large_community, AS Num not" " provided" 1842 ) 1843 return False 1844 if large_comm_list: 1845 id = large_comm_list.setdefault("id", None) 1846 del_comm = large_comm_list.setdefault("delete", None) 1847 if id: 1848 cmd = "set large-comm-list {}".format(id) 1849 if del_comm: 1850 cmd = "{} delete".format(cmd) 1851 1852 rmap_data.append(cmd) 1853 else: 1854 logger.error("In large_comm_list 'id' not" " provided") 
1855 return False 1856 1857 if ext_comm_list: 1858 rt = ext_comm_list.setdefault("rt", None) 1859 del_comm = ext_comm_list.setdefault("delete", None) 1860 if rt: 1861 cmd = "set extcommunity rt {}".format(rt) 1862 if del_comm: 1863 cmd = "{} delete".format(cmd) 1864 1865 rmap_data.append(cmd) 1866 else: 1867 logger.debug("In ext_comm_list 'rt' not" " provided") 1868 return False 1869 1870 # Weight 1871 if weight: 1872 rmap_data.append("set weight {}".format(weight)) 1873 if ipv6_data: 1874 nexthop = ipv6_data.setdefault("nexthop", None) 1875 if nexthop: 1876 rmap_data.append("set ipv6 next-hop {}".format(nexthop)) 1877 1878 # Adding MATCH and SET sequence to RMAP if defined 1879 if "match" in rmap_dict: 1880 match_data = rmap_dict["match"] 1881 ipv4_data = match_data.setdefault("ipv4", {}) 1882 ipv6_data = match_data.setdefault("ipv6", {}) 1883 community = match_data.setdefault("community_list", {}) 1884 large_community = match_data.setdefault("large_community", {}) 1885 large_community_list = match_data.setdefault( 1886 "large_community_list", {} 1887 ) 1888 1889 metric = match_data.setdefault("metric", None) 1890 source_vrf = match_data.setdefault("source-vrf", None) 1891 1892 if ipv4_data: 1893 # fetch prefix list data from rmap 1894 prefix_name = ipv4_data.setdefault("prefix_lists", None) 1895 if prefix_name: 1896 rmap_data.append( 1897 "match ip address" 1898 " prefix-list {}".format(prefix_name) 1899 ) 1900 1901 # fetch tag data from rmap 1902 tag = ipv4_data.setdefault("tag", None) 1903 if tag: 1904 rmap_data.append("match tag {}".format(tag)) 1905 1906 # fetch large community data from rmap 1907 large_community_list = ipv4_data.setdefault( 1908 "large_community_list", {} 1909 ) 1910 large_community = match_data.setdefault( 1911 "large_community", {} 1912 ) 1913 1914 if ipv6_data: 1915 prefix_name = ipv6_data.setdefault("prefix_lists", None) 1916 if prefix_name: 1917 rmap_data.append( 1918 "match ipv6 address" 1919 " prefix-list {}".format(prefix_name) 1920 
) 1921 1922 # fetch tag data from rmap 1923 tag = ipv6_data.setdefault("tag", None) 1924 if tag: 1925 rmap_data.append("match tag {}".format(tag)) 1926 1927 # fetch large community data from rmap 1928 large_community_list = ipv6_data.setdefault( 1929 "large_community_list", {} 1930 ) 1931 large_community = match_data.setdefault( 1932 "large_community", {} 1933 ) 1934 1935 if community: 1936 if "id" not in community: 1937 logger.error( 1938 "'id' is mandatory for " 1939 "community-list in match" 1940 " criteria" 1941 ) 1942 return False 1943 cmd = "match community {}".format(community["id"]) 1944 exact_match = community.setdefault("exact_match", False) 1945 if exact_match: 1946 cmd = "{} exact-match".format(cmd) 1947 1948 rmap_data.append(cmd) 1949 if large_community: 1950 if "id" not in large_community: 1951 logger.error( 1952 "'id' is mandatory for " 1953 "large-community-list in match " 1954 "criteria" 1955 ) 1956 return False 1957 cmd = "match large-community {}".format( 1958 large_community["id"] 1959 ) 1960 exact_match = large_community.setdefault( 1961 "exact_match", False 1962 ) 1963 if exact_match: 1964 cmd = "{} exact-match".format(cmd) 1965 rmap_data.append(cmd) 1966 if large_community_list: 1967 if "id" not in large_community_list: 1968 logger.error( 1969 "'id' is mandatory for " 1970 "large-community-list in match " 1971 "criteria" 1972 ) 1973 return False 1974 cmd = "match large-community {}".format( 1975 large_community_list["id"] 1976 ) 1977 exact_match = large_community_list.setdefault( 1978 "exact_match", False 1979 ) 1980 if exact_match: 1981 cmd = "{} exact-match".format(cmd) 1982 rmap_data.append(cmd) 1983 1984 if source_vrf: 1985 cmd = "match source-vrf {}".format(source_vrf) 1986 rmap_data.append(cmd) 1987 1988 if metric: 1989 cmd = "match metric {}".format(metric) 1990 rmap_data.append(cmd) 1991 1992 result = create_common_configuration( 1993 tgen, router, rmap_data, "route_maps", build=build 1994 ) 1995 1996 except InvalidCLIError: 1997 # 
def delete_route_maps(tgen, input_dict):
    """
    Delete ip route maps from device
    * `tgen` : Topogen object
    * `input_dict` : for which router,
                     route map has to be deleted
    Usage
    -----
    # Delete route-map rmap_1 and rmap_2 from router r1
    input_dict = {
        "r1": {
            "route_maps": ["rmap_1", "rmap__2"]
        }
    }
    result = delete_route_maps(tgen, input_dict)
    Returns
    -------
    Whatever create_route_maps() returns for the generated delete request

    `input_dict` is left untouched, so the same dictionary can be passed
    again. Routers without a "route_maps" key are passed through unchanged.
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    # Build a separate request instead of rewriting the caller's data: the
    # old in-place rewrite turned the name list into a dict, which broke a
    # second call with the same input_dict.
    delete_request = {}
    for router, router_data in input_dict.items():
        request = dict(router_data)
        if "route_maps" in router_data:
            request["route_maps"] = {
                route_map_name: [{"delete": True}]
                for route_map_name in router_data["route_maps"]
            }
        delete_request[router] = request

    return create_route_maps(tgen, delete_request)
2046 Usage 2047 ----- 2048 input_dict_1 = { 2049 "r3": { 2050 "bgp_community_lists": [ 2051 { 2052 "community_type": "standard", 2053 "action": "permit", 2054 "name": "rmap_lcomm_{}".format(addr_type), 2055 "value": "1:1:1 1:2:3 2:1:1 2:2:2", 2056 "large": True 2057 } 2058 ] 2059 } 2060 } 2061 } 2062 result = create_bgp_community_lists(tgen, input_dict_1) 2063 """ 2064 2065 result = False 2066 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) 2067 input_dict = deepcopy(input_dict) 2068 try: 2069 for router in input_dict.keys(): 2070 if "bgp_community_lists" not in input_dict[router]: 2071 errormsg = "bgp_community_lists not present in input_dict" 2072 logger.debug(errormsg) 2073 continue 2074 2075 config_data = [] 2076 2077 community_list = input_dict[router]["bgp_community_lists"] 2078 for community_dict in community_list: 2079 del_action = community_dict.setdefault("delete", False) 2080 community_type = community_dict.setdefault("community_type", None) 2081 action = community_dict.setdefault("action", None) 2082 value = community_dict.setdefault("value", "") 2083 large = community_dict.setdefault("large", None) 2084 name = community_dict.setdefault("name", None) 2085 if large: 2086 cmd = "bgp large-community-list" 2087 else: 2088 cmd = "bgp community-list" 2089 2090 if not large and not (community_type and action and value): 2091 errormsg = ( 2092 "community_type, action and value are " 2093 "required in bgp_community_list" 2094 ) 2095 logger.error(errormsg) 2096 return False 2097 2098 try: 2099 community_type = int(community_type) 2100 cmd = "{} {} {} {}".format(cmd, community_type, action, value) 2101 except ValueError: 2102 2103 cmd = "{} {} {} {} {}".format( 2104 cmd, community_type, name, action, value 2105 ) 2106 2107 if del_action: 2108 cmd = "no {}".format(cmd) 2109 2110 config_data.append(cmd) 2111 2112 result = create_common_configuration( 2113 tgen, router, config_data, "bgp_community_list", build=build 2114 ) 2115 2116 except 
def addKernelRoute(
    tgen, router, intf, group_addr_range, next_hop=None, src=None, del_action=None
):
    """
    Add (or delete) routes in the kernel routing table of a router and
    verify the outcome against the "ip route" / "ip -6 route" output.

    Parameters:
    -----------
    * `tgen`  : Topogen object
    * `router`: router for which kernel routes need to be added
    * `intf`: interface name, for which kernel routes need to be added
    * `group_addr_range`: address/prefix, or list of them, to be routed
    * `next_hop`: gateway address; when None the route points to `intf`
    * `src`: source address, used for IPv6 routes together with `intf`
    * `del_action`: when set, the routes are deleted instead of added

    returns:
    --------
    errormsg or True
    """

    logger.debug("Entering lib API: addKernelRoute()")

    rnode = tgen.routers()[router]

    if type(group_addr_range) is not list:
        group_addr_range = [group_addr_range]

    for grp_addr in group_addr_range:

        addr_type = validate_ip_address(grp_addr)
        if addr_type == "ipv4":
            ip_cmd = "ip"
        elif addr_type == "ipv6":
            ip_cmd = "ip -6"
        else:
            # Without this guard `cmd` would be undefined (or stale from the
            # previous address) further down.
            errormsg = "[DUT: {}]: Invalid group address {}".format(
                router, grp_addr
            )
            logger.error(errormsg)
            return errormsg
        verify_cmd = "{} route".format(ip_cmd)

        if del_action:
            cmd = "{} route del {}".format(ip_cmd, grp_addr)
        elif addr_type == "ipv4":
            if next_hop is not None:
                cmd = "ip route add {} via {}".format(grp_addr, next_hop)
            else:
                cmd = "ip route add {} dev {}".format(grp_addr, intf)
        elif intf and src:
            cmd = "ip -6 route add {} dev {} src {}".format(grp_addr, intf, src)
        elif next_hop is not None:
            cmd = "ip -6 route add {} via {}".format(grp_addr, next_hop)
        else:
            # Used to produce "via None"; fall back to the interface instead.
            cmd = "ip -6 route add {} dev {}".format(grp_addr, intf)

        logger.info("[DUT: {}]: Running command: [{}]".format(router, cmd))
        output = rnode.run(cmd)

        # Verifying kernel routing table after the change
        result = rnode.run(verify_cmd)
        logger.debug("{}\n{}".format(verify_cmd, result))

        ip, _, mask = grp_addr.partition("/")
        if not mask:
            # No prefix length given: treat it as a host route. `mask` used
            # to be undefined here.
            mask = "32" if addr_type == "ipv4" else "128"
        if mask == "0":
            # Prefix length 0 was never verified; keep skipping it.
            continue
        # Host routes are looked up by bare address, without prefix length.
        lookup_addr = ip if mask in ("32", "128") else grp_addr

        # Plain substring test: the address is data, not a regex pattern.
        # NOTE(review): it can still match the address inside another
        # route's entry; confirm whether stricter matching is wanted.
        route_present = lookup_addr in result

        if del_action:
            if route_present:
                errormsg = (
                    "[DUT: {}]: Kernel route is not deleted for group"
                    " address {} Config output: {}".format(
                        router, lookup_addr, output
                    )
                )
                return errormsg
        elif not route_present:
            errormsg = (
                "[DUT: {}]: Kernal route is not added for group"
                " address {} Config output: {}".format(router, lookup_addr, output)
            )

            return errormsg

    logger.debug("Exiting lib API: addKernelRoute()")
    return True
def configure_brctl(tgen, topo, input_dict):
    """
    Create Linux bridges on routers and attach VXLAN devices / VRFs to them.

    For every bridge the function
    * creates it ("ip link add name <br> type bridge stp_state <stp>"),
    * enslaves the VXLAN device to it, when one is given,
    * brings the bridge (and the VXLAN device) up,
    * enslaves the bridge to the VRF, when one is given, and brings up
      every link of the router whose topo entry carries the same "vrf".

    The caller's input_dict is not modified.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: topology data; topo["routers"][dut]["links"] is read
    * `input_dict` : data for brctl config, keyed by router name

    Usage
    -----
    input_dict= {
        dut:{
            "brctl": [{
                "brctl_name": ["br100"],
                "addvxlan": ["vxlan75100"],
                "vrf": ["RED"],
                "stp": ["off"]
            }]
        }
    }

    configure_brctl(tgen, topo, input_dict)

    The four keys are walked in lockstep (zip), so all four must be
    supplied with the same number of entries; use None for a bridge
    without VXLAN device or VRF. Each key may also hold a single value,
    which is treated as a one-element list.

    Returns
    -------
    True or errormsg
    """

    def _as_list(value):
        # None -> empty; bare value -> one-element list. Without this a
        # plain string would be zipped character by character.
        if value is None:
            return []
        if isinstance(value, (list, tuple)):
            return value
        return [value]

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    ip_cmd = "ip link set"

    router_list = tgen.routers()
    for dut in input_dict.keys():
        rnode = router_list[dut]

        for brctl_dict in input_dict[dut].get("brctl", []):
            brctl_names = _as_list(brctl_dict.get("brctl_name"))
            addvxlans = _as_list(brctl_dict.get("addvxlan"))
            stp_values = _as_list(brctl_dict.get("stp"))
            vrfs = _as_list(brctl_dict.get("vrf"))

            for brctl_name, vxlan, vrf, stp in zip(
                brctl_names, addvxlans, vrfs, stp_values
            ):
                # The bridge itself is created right away ...
                cmd = "ip link add name {} type bridge stp_state {}".format(
                    brctl_name, stp
                )
                logger.info("[DUT: %s]: Running command: %s", dut, cmd)
                rnode.run(cmd)

                # ... the follow-up commands are collected and run below.
                ip_cmd_list = ["{} up dev {}".format(ip_cmd, brctl_name)]

                if vxlan:
                    cmd = "{} dev {} master {}".format(ip_cmd, vxlan, brctl_name)
                    logger.info("[DUT: %s]: Running command: %s", dut, cmd)
                    rnode.run(cmd)

                    ip_cmd_list.append("{} up dev {}".format(ip_cmd, vxlan))

                if vrf:
                    ip_cmd_list.append(
                        "{} dev {} master {}".format(ip_cmd, brctl_name, vrf)
                    )

                    # Bring up every router link that belongs to this VRF.
                    for data in topo["routers"][dut]["links"].values():
                        if "vrf" not in data:
                            continue

                        if data["vrf"] == vrf:
                            ip_cmd_list.append(
                                "{} up dev {}".format(ip_cmd, data["interface"])
                            )

                try:
                    for _ip_cmd in ip_cmd_list:
                        logger.info("[DUT: %s]: Running command: %s", dut, _ip_cmd)
                        rnode.run(_ip_cmd)

                except InvalidCLIError:
                    # Traceback
                    errormsg = traceback.format_exc()
                    logger.error(errormsg)
                    return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
def configure_interface_mac(tgen, input_dict):
    """
    Set the MAC (hardware) address of interfaces on routers.

    Runs "ifconfig <intf> hw ether <mac>" for every interface listed
    under a router and stops at the first command that prints anything.

    Parameters
    ----------
    * `tgen`: topogen object
    * `input_dict` : data for mac config, {router: {interface: mac}}

    Usage
    -----
    input_mac= {
        "edge1":{
            "br75100": "00:80:48:BA:d1:00",
            "br75200": "00:80:48:BA:d1:00"
        }
    }

    configure_interface_mac(tgen, input_mac)

    Returns
    -------
    True when every command produced no output, otherwise the output of
    the first command that printed something, or the traceback text when
    InvalidCLIError was raised.
    """

    router_list = tgen.routers()
    for dut in input_dict.keys():
        rnode = router_list[dut]

        for intf, mac in input_dict[dut].items():
            cmd = "ifconfig {} hw ether {}".format(intf, mac)
            logger.info("[DUT: %s]: Running command: %s", dut, cmd)

            try:
                result = rnode.run(cmd)
                # Any output is handed back to the caller instead of True.
                if len(result) != 0:
                    return result

            except InvalidCLIError:
                # Traceback
                errormsg = traceback.format_exc()
                logger.error(errormsg)
                return errormsg

    return True
@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
def verify_rib(
    tgen,
    addr_type,
    dut,
    input_dict,
    next_hop=None,
    protocol=None,
    tag=None,
    metric=None,
    fib=None,
    count_only=False
):
    """
    Verify that the prefixes described in input_dict are present in the
    "show ip/ipv6 route [vrf <vrf>] [<protocol>] json" output of `dut`,
    optionally checking nexthops, tag and metric as well.

    The prefixes are regenerated from input_dict with generate_ips(), the
    same way they were created by create_static_routes() or advertised
    with the BGP network command.

    Parameters
    ----------
    * `tgen` : topogen object
    * `addr_type` : ip type, ipv4/ipv6; generated prefixes of the other
                    family are skipped
    * `dut`: Device Under Test, for which user wants to test the data
    * `input_dict` : input dict; for every router key either its
                     "static_routes" list or, when that key is absent,
                     its "bgp" "advertise_networks" list is used
    * `next_hop`[optional]: nexthop or list of nexthops which must be
                     present for every found route, default = None
    * `protocol`[optional]: protocol appended to the show command
                     (static routes only), default = None
    * `tag`[optional]: when set, the tag in the RIB must equal the "tag"
                     value of the static route entry, default = None
    * `metric`[optional]: metric the route must have, default = None
    * `fib`[optional]: when set together with next_hop, the nexthops are
                     taken from the first RIB entry whose first nexthop
                     carries "fib", default = None
    * `count_only`[optional]: count of nexthops only, not specific
                     addresses, default = False

    Usage
    -----
    # Static routes created on r1
    input_dict = {
        "r1": {
            "static_routes": [{"network": "10.0.20.1/32", "no_of_ip": 9,
                "admin_distance": 100, "next_hop": "10.0.0.2", "tag": 4001}]
        }}
    # Networks advertised by r1 with the BGP network command
    input_dict = {
        "r1": {
            "bgp": {"address_family": {"ipv4": {"unicast": {
                "advertise_networks": [
                    {"network": "20.0.0.0/32", "no_of_network": 10},
                    {"network": "30.0.0.0/32"}]
            }}}}
        }}
    # Verifying ipv4 routes in router r2 learned via BGP
    dut = "r2"
    protocol = "bgp"
    result = verify_rib(tgen, "ipv4", dut, input_dict, protocol = protocol)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    # ipaddress.ip_network() is fed text; "unicode" only exists on Python 2.
    try:
        to_text = unicode
    except NameError:
        to_text = str

    # A single nexthop is handled as a one-element list.
    if next_hop and not isinstance(next_hop, list):
        next_hop = [next_hop]

    router_list = tgen.routers()
    for routerInput in input_dict.keys():
        for router, rnode in router_list.items():
            if router != dut:
                continue

            logger.info("Checking router %s RIB:", router)

            # Verifying RIB routes
            if addr_type == "ipv4":
                command = "show ip route"
            else:
                command = "show ipv6 route"

            if "static_routes" in input_dict[routerInput]:
                found_routes = []
                missing_routes = []
                nh_found = False

                for static_route in input_dict[routerInput]["static_routes"]:
                    if static_route.get("vrf") is not None:
                        logger.info(
                            "[DUT: {}]: Verifying routes for VRF:"
                            " {}".format(router, static_route["vrf"])
                        )
                        cmd = "{} vrf {}".format(command, static_route["vrf"])
                    else:
                        cmd = command

                    if protocol:
                        cmd = "{} {}".format(cmd, protocol)

                    cmd = "{} json".format(cmd)

                    rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)

                    # Verifying output dictionary rib_routes_json is not empty
                    if not rib_routes_json:
                        errormsg = "No route found in rib of router {}..".format(
                            router
                        )
                        return errormsg

                    network = static_route["network"]
                    no_of_ip = static_route.get("no_of_ip", 1)
                    _tag = static_route.get("tag")

                    # Generating IPs for verification
                    ip_list = generate_ips(network, no_of_ip)
                    nh_found = False

                    for st_rt in ip_list:
                        st_rt = str(ipaddress.ip_network(to_text(st_rt)))

                        if validate_ip_address(st_rt) != addr_type:
                            continue

                        if st_rt not in rib_routes_json:
                            missing_routes.append(st_rt)
                            continue

                        found_routes.append(st_rt)
                        rib_entries = rib_routes_json[st_rt]

                        if fib and next_hop:
                            # Nexthop lists of the entries of THIS route
                            # whose first nexthop is flagged "fib".
                            fib_hops = [
                                [rib_r["ip"] for rib_r in rib_entry["nexthops"]]
                                for rib_entry in rib_entries
                                if "fib" in rib_entry["nexthops"][0]
                            ]

                            if not fib_hops:
                                errormsg = (
                                    "No FIB nexthop found for route {} in "
                                    "RIB of router {}\n".format(st_rt, dut)
                                )
                                return errormsg

                            inactive_nhs = set(next_hop).difference(fib_hops[0])
                            if inactive_nhs:
                                logger.info(
                                    "Nexthop "
                                    "%s is not active for route %s in "
                                    "RIB of router %s\n",
                                    inactive_nhs,
                                    st_rt,
                                    dut,
                                )
                                errormsg = (
                                    "Nexthop {} is not active"
                                    " for route {} in RIB of router"
                                    " {}\n".format(inactive_nhs, st_rt, dut)
                                )
                                return errormsg

                            nh_found = True

                        elif next_hop and fib is None:
                            found_hops = [
                                rib_r["ip"] for rib_r in rib_entries[0]["nexthops"]
                            ]

                            # Check only the count of nexthops
                            if count_only:
                                if len(next_hop) != len(found_hops):
                                    errormsg = (
                                        "Nexthops are missing for "
                                        "route {} in RIB of router {}: "
                                        "expected {}, found {}\n".format(
                                            st_rt,
                                            dut,
                                            len(next_hop),
                                            len(found_hops),
                                        )
                                    )
                                    return errormsg

                                nh_found = True

                            # Check the actual nexthops
                            elif found_hops:
                                absent_nhs = set(next_hop).difference(found_hops)
                                if absent_nhs:
                                    logger.info(
                                        "Missing nexthop %s for route"
                                        " %s in RIB of router %s\n",
                                        absent_nhs,
                                        st_rt,
                                        dut,
                                    )
                                    errormsg = (
                                        "Nexthop {} is Missing for "
                                        "route {} in RIB of router {}\n".format(
                                            absent_nhs, st_rt, dut
                                        )
                                    )
                                    return errormsg

                                nh_found = True

                        if tag:
                            # `tag` only switches the check on; the value
                            # compared is the static route's own "tag".
                            if "tag" not in rib_entries[0]:
                                errormsg = (
                                    "[DUT: {}]: tag is not"
                                    " present for"
                                    " route {} in RIB \n".format(dut, st_rt)
                                )
                                return errormsg

                            if _tag != rib_entries[0]["tag"]:
                                errormsg = (
                                    "[DUT: {}]: tag value {}"
                                    " is not matched for"
                                    " route {} in RIB \n".format(dut, _tag, st_rt)
                                )
                                return errormsg

                        if metric is not None:
                            if "metric" not in rib_entries[0]:
                                errormsg = (
                                    "[DUT: {}]: metric is"
                                    " not present for"
                                    " route {} in RIB \n".format(dut, st_rt)
                                )
                                return errormsg

                            if metric != rib_entries[0]["metric"]:
                                errormsg = (
                                    "[DUT: {}]: metric value "
                                    "{} is not matched for "
                                    "route {} in RIB \n".format(dut, metric, st_rt)
                                )
                                return errormsg

                if nh_found:
                    logger.info(
                        "[DUT: {}]: Found next_hop {} for all bgp"
                        " routes in RIB".format(router, next_hop)
                    )

                if len(missing_routes) > 0:
                    errormsg = "[DUT: {}]: Missing route in RIB, " "routes: {}".format(
                        dut, missing_routes
                    )
                    return errormsg

                if found_routes:
                    logger.info(
                        "[DUT: %s]: Verified routes in RIB, found" " routes are: %s\n",
                        dut,
                        found_routes,
                    )

                # Static routes were handled; the "bgp" section is only
                # looked at when there is no "static_routes" key.
                continue

            if "bgp" in input_dict[routerInput]:
                unicast = input_dict[routerInput]["bgp"]["address_family"][
                    addr_type
                ]["unicast"]
                if "advertise_networks" not in unicast:
                    continue

                found_routes = []
                missing_routes = []
                nh_found = False
                advertise_network = unicast["advertise_networks"]

                # Continue if there are no network advertise
                if len(advertise_network) == 0:
                    continue

                for advertise_network_dict in advertise_network:
                    if "vrf" in advertise_network_dict:
                        # The VRF comes from the advertised network entry;
                        # no static route exists in this branch.
                        cmd = "{} vrf {} json".format(
                            command, advertise_network_dict["vrf"]
                        )
                    else:
                        cmd = "{} json".format(command)

                    rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)

                    # Verifying output dictionary rib_routes_json is not empty
                    if not rib_routes_json:
                        errormsg = "No route found in rib of router {}..".format(
                            router
                        )
                        return errormsg

                    start_ip = advertise_network_dict["network"]
                    no_of_network = advertise_network_dict.get("no_of_network", 1)

                    # Generating IPs for verification
                    ip_list = generate_ips(start_ip, no_of_network)
                    nh_found = False

                    for st_rt in ip_list:
                        st_rt = str(ipaddress.ip_network(to_text(st_rt)))

                        if validate_ip_address(st_rt) != addr_type:
                            continue

                        if st_rt not in rib_routes_json:
                            missing_routes.append(st_rt)
                            continue

                        found_routes.append(st_rt)

                        if next_hop:
                            # Number of (expected, installed) nexthop pairs
                            # with the same address.
                            count = sum(
                                1
                                for nh in next_hop
                                for nh_dict in rib_routes_json[st_rt][0]["nexthops"]
                                if nh_dict["ip"] == nh
                            )

                            if count != len(next_hop):
                                errormsg = (
                                    "Nexthop {} is Missing"
                                    " for route {} in "
                                    "RIB of router {}\n".format(next_hop, st_rt, dut)
                                )
                                return errormsg

                            nh_found = True

                if nh_found:
                    logger.info(
                        "Found next_hop {} for all routes in RIB"
                        " of router {}\n".format(next_hop, dut)
                    )

                if len(missing_routes) > 0:
                    errormsg = (
                        "Missing {} route in RIB of router {}, "
                        "routes: {} \n".format(addr_type, dut, missing_routes)
                    )
                    return errormsg

                if found_routes:
                    logger.info(
                        "Verified {} routes in router {} RIB, found"
                        " routes are: {}\n".format(addr_type, dut, found_routes)
                    )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None):
    """
    Verify that the prefixes described in input_dict are present in the
    "show ip/ipv6 fib [vrf <vrf>] json" output of `dut`, optionally
    checking their nexthops.

    The prefixes are regenerated from input_dict with generate_ips(), the
    same way they were created by create_static_routes() or advertised
    with the BGP network command.

    This is the single definition of the API; the file used to define it
    twice with identical code, the second definition shadowing the first.

    Parameters
    ----------
    * `tgen` : topogen object
    * `addr_type` : ip type, ipv4/ipv6; generated prefixes of the other
                    family are skipped
    * `dut`: Device Under Test, for which user wants to test the data
    * `input_dict` : input dict; for every router key either its
                     "static_routes" list or, when that key is absent,
                     its "bgp" "advertise_networks" list is used
    * `next_hop`[optional]: nexthop or list of nexthops which must be
                     present for every found route, default = None

    Usage
    -----
    input_routes_r1 = {
        "r1": {
            "static_routes": [{
                "network": ["1.1.1.1/32"],
                "next_hop": "Null0",
                "vrf": "RED"
            }]
        }
    }
    result = verify_fib_routes(tgen, "ipv4", "r1", input_routes_r1)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    # ipaddress.ip_network() is fed text; "unicode" only exists on Python 2.
    try:
        to_text = unicode
    except NameError:
        to_text = str

    # A single nexthop is handled as a one-element list.
    if next_hop and not isinstance(next_hop, list):
        next_hop = [next_hop]

    def _nexthops_present(route_entries):
        # True when the number of (expected, installed) nexthop pairs with
        # the same address equals the number of expected nexthops. Only
        # the first entry of the route is inspected.
        count = sum(
            1
            for nh in next_hop
            for nh_dict in route_entries[0]["nexthops"]
            if nh_dict["ip"] == nh
        )
        return count == len(next_hop)

    router_list = tgen.routers()
    for routerInput in input_dict.keys():
        for router, rnode in router_list.items():
            if router != dut:
                continue

            logger.info("Checking router %s FIB routes:", router)

            # Verifying FIB routes
            if addr_type == "ipv4":
                command = "show ip fib"
            else:
                command = "show ipv6 fib"

            if "static_routes" in input_dict[routerInput]:
                found_routes = []
                missing_routes = []
                nh_found = False

                for static_route in input_dict[routerInput]["static_routes"]:
                    if static_route.get("vrf") is not None:
                        logger.info(
                            "[DUT: {}]: Verifying routes for VRF:"
                            " {}".format(router, static_route["vrf"])
                        )
                        cmd = "{} vrf {}".format(command, static_route["vrf"])
                    else:
                        cmd = command

                    cmd = "{} json".format(cmd)

                    rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)

                    # Verifying output dictionary rib_routes_json is not empty
                    if not rib_routes_json:
                        errormsg = "[DUT: {}]: No route found in fib".format(router)
                        return errormsg

                    network = static_route["network"]
                    no_of_ip = static_route.get("no_of_ip", 1)

                    # Generating IPs for verification
                    ip_list = generate_ips(network, no_of_ip)
                    nh_found = False

                    for st_rt in ip_list:
                        st_rt = str(ipaddress.ip_network(to_text(st_rt)))

                        if validate_ip_address(st_rt) != addr_type:
                            continue

                        if st_rt not in rib_routes_json:
                            missing_routes.append(st_rt)
                            continue

                        found_routes.append(st_rt)

                        if next_hop:
                            if not _nexthops_present(rib_routes_json[st_rt]):
                                errormsg = (
                                    "Nexthop {} is Missing"
                                    " for route {} in "
                                    "RIB of router {}\n".format(next_hop, st_rt, dut)
                                )
                                return errormsg

                            nh_found = True

                if len(missing_routes) > 0:
                    errormsg = "[DUT: {}]: Missing route in FIB:" " {}".format(
                        dut, missing_routes
                    )
                    return errormsg

                if nh_found:
                    logger.info(
                        "Found next_hop {} for all routes in RIB"
                        " of router {}\n".format(next_hop, dut)
                    )

                if found_routes:
                    logger.info(
                        "[DUT: %s]: Verified routes in FIB, found" " routes are: %s\n",
                        dut,
                        found_routes,
                    )

                # Static routes were handled; the "bgp" section is only
                # looked at when there is no "static_routes" key.
                continue

            if "bgp" in input_dict[routerInput]:
                unicast = input_dict[routerInput]["bgp"]["address_family"][
                    addr_type
                ]["unicast"]
                if "advertise_networks" not in unicast:
                    continue

                found_routes = []
                missing_routes = []
                nh_found = False
                advertise_network = unicast["advertise_networks"]

                # Continue if there are no network advertise
                if len(advertise_network) == 0:
                    continue

                for advertise_network_dict in advertise_network:
                    if "vrf" in advertise_network_dict:
                        # The VRF comes from the advertised network entry;
                        # no static route exists in this branch.
                        cmd = "{} vrf {} json".format(
                            command, advertise_network_dict["vrf"]
                        )
                    else:
                        cmd = "{} json".format(command)

                    rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)

                    # Verifying output dictionary rib_routes_json is not empty
                    if not rib_routes_json:
                        errormsg = "No route found in rib of router {}..".format(
                            router
                        )
                        return errormsg

                    start_ip = advertise_network_dict["network"]
                    no_of_network = advertise_network_dict.get("no_of_network", 1)

                    # Generating IPs for verification
                    ip_list = generate_ips(start_ip, no_of_network)
                    nh_found = False

                    for st_rt in ip_list:
                        st_rt = str(ipaddress.ip_network(to_text(st_rt)))

                        if validate_ip_address(st_rt) != addr_type:
                            continue

                        if st_rt not in rib_routes_json:
                            missing_routes.append(st_rt)
                            continue

                        found_routes.append(st_rt)

                        if next_hop:
                            if not _nexthops_present(rib_routes_json[st_rt]):
                                errormsg = (
                                    "Nexthop {} is Missing"
                                    " for route {} in "
                                    "RIB of router {}\n".format(next_hop, st_rt, dut)
                                )
                                return errormsg

                            nh_found = True

                if len(missing_routes) > 0:
                    errormsg = "[DUT: {}]: Missing route in FIB: " "{} \n".format(
                        dut, missing_routes
                    )
                    return errormsg

                if nh_found:
                    logger.info(
                        "Found next_hop {} for all routes in RIB"
                        " of router {}\n".format(next_hop, dut)
                    )

                if found_routes:
                    logger.info(
                        "[DUT: {}]: Verified routes FIB"
                        ", found routes are: {}\n".format(dut, found_routes)
                    )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
def verify_admin_distance_for_static_routes(tgen, input_dict):
    """
    API to verify admin distance for static routes as defined in input_dict/
    input JSON by running show ip/ipv6 route json command.

    For every static route the "network" must be a key of the command
    output. The distance of its first entry is compared with
    "admin_distance" only when the first nexthop of that entry equals
    "next_hop"; otherwise the route is skipped without an error.

    Parameter
    ---------
    * `tgen` : topogen object
    * `input_dict`: having details like - for which router and static routes
                    admin distance needs to be verified; routers that are
                    not part of tgen.routers() are ignored

    Usage
    -----
    # To verify admin distance is 10 for prefix 10.0.20.1/32 having next_hop
    10.0.0.2 in router r1
    input_dict = {
        "r1": {
            "static_routes": [{
                "network": "10.0.20.1/32",
                "admin_distance": 10,
                "next_hop": "10.0.0.2"
            }]
        }
    }
    result = verify_admin_distance_for_static_routes(tgen, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    for router in input_dict.keys():
        if router not in tgen.routers():
            continue

        rnode = tgen.routers()[router]

        for static_route in input_dict[router]["static_routes"]:
            addr_type = validate_ip_address(static_route["network"])
            # Command to execute
            if addr_type == "ipv4":
                command = "show ip route json"
            else:
                command = "show ipv6 route json"
            show_ip_route_json = run_frr_cmd(rnode, command, isjson=True)

            logger.info(
                "Verifying admin distance for static route %s" " under dut %s:",
                static_route,
                router,
            )
            network = static_route["network"]
            next_hop = static_route["next_hop"]
            admin_distance = static_route["admin_distance"]

            # Test membership BEFORE indexing, so a missing route yields the
            # error message below instead of a KeyError.
            if network not in show_ip_route_json:
                errormsg = (
                    "Static route {} not found in "
                    "show_ip_route_json for dut {}".format(network, router)
                )
                return errormsg

            route_data = show_ip_route_json[network][0]
            if route_data["nexthops"][0]["ip"] != next_hop:
                # Installed via a different nexthop: nothing to compare.
                continue

            if route_data["distance"] != admin_distance:
                errormsg = (
                    "Verification failed: admin distance"
                    " for static route {} under dut {},"
                    " found:{} but expected:{}".format(
                        static_route,
                        router,
                        route_data["distance"],
                        admin_distance,
                    )
                )
                return errormsg

            logger.info(
                "Verification successful: admin"
                " distance for static route %s under"
                " dut %s, found:%s",
                static_route,
                router,
                route_data["distance"],
            )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
def verify_prefix_lists(tgen, input_dict):
    """
    Running "show ip prefix-list" command and verifying the given
    prefix-lists are NOT present in the router (i.e. they were deleted).

    A prefix-list counts as present when its name occurs anywhere in the
    command output.

    Parameters
    ----------
    * `tgen` : topogen object
    * `input_dict`: data to verify prefix lists; per router a
                    "prefix_lists" dict keyed by address type whose values
                    are dicts keyed by prefix-list name. Address types
                    rejected by check_address_types() and routers that are
                    not part of tgen.routers() are skipped.

    Usage
    -----
    # To verify pf_list_1 is no longer present in router r1
    input_dict = {
        "r1": {
            "prefix_lists": {
                "ipv4": {
                    "pf_list_1": [...]
                }
            }
        }}
    result = verify_prefix_lists(tgen, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    for router in input_dict.keys():
        if router not in tgen.routers():
            continue

        rnode = tgen.routers()[router]

        # Show ip prefix list
        # NOTE(review): the same "show ip prefix-list" output is searched
        # for every address type, ipv6 included - confirm that is intended.
        show_prefix_list = run_frr_cmd(rnode, "show ip prefix-list")

        # Verify Prefix list is deleted
        prefix_lists_addr = input_dict[router]["prefix_lists"]
        for addr_type in prefix_lists_addr:
            if not check_address_types(addr_type):
                continue

            for prefix_list in prefix_lists_addr[addr_type]:
                if prefix_list in show_prefix_list:
                    errormsg = (
                        "Prefix list {} is/are present in the router"
                        " {}".format(prefix_list, router)
                    )
                    return errormsg

                logger.info(
                    "Prefix list %s is/are not present in the router" " from router %s",
                    prefix_list,
                    router,
                )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True


@retry(attempts=2, wait=4, return_is_str=True, initial_wait=2)
def verify_route_maps(tgen, input_dict):
    """
    Running "show route-map" command and verifying the given route-maps
    are NOT present in the router (i.e. they were deleted).

    A route-map counts as present when its name occurs anywhere in the
    command output.

    Parameters
    ----------
    * `tgen` : topogen object
    * `input_dict`: data to verify route maps; per router a "route_maps"
                    list of route-map names. Routers that are not part of
                    tgen.routers() are skipped.

    Usage
    -----
    # To verify rmap_1 and rmap_2 are deleted from router r1
    input_dict = {
        "r1": {
            "route_maps": ["rmap_1", "rmap_2"]
        }
    }
    result = verify_route_maps(tgen, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    for router in input_dict.keys():
        if router not in tgen.routers():
            continue

        rnode = tgen.routers()[router]
        # Show ip route-map
        show_route_maps = rnode.vtysh_cmd("show route-map")

        # Verify route-map is deleted
        route_maps = input_dict[router]["route_maps"]
        for route_map in route_maps:
            if route_map in show_route_maps:
                errormsg = "Route map {} is not deleted from router" " {}".format(
                    route_map, router
                )
                return errormsg

        logger.info(
            "Route map %s is/are deleted successfully from" " router %s",
            route_maps,
            router,
        )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
@retry(attempts=3, wait=4, return_is_str=True)
def verify_bgp_community(tgen, addr_type, router, network, input_dict=None):
    """
    API to verify that a BGP community / large community is attached to
    the given routes on a router, by running
    "show bgp ipv4/6 {route address} json" command.

    Parameters
    ----------
    * `tgen`: topogen object
    * `addr_type` : ip type, ipv4/ipv6
    * `router`: Device Under Test
    * `network`: list of networks for which set criteria needs to be
                 verified
    * `input_dict`: optional mapping of attribute name ("community" or
                    "largeCommunity") to the expected community string.
                    When None, only the presence of one of the two
                    attributes is checked.

    Usage
    -----
    networks = ["200.50.2.0/32"]
    input_dict = {
        "largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5"
    }
    result = verify_bgp_community(tgen, "ipv4", dut, networks, input_dict)

    Returns
    -------
    errormsg(str) or True; False when `router` is not part of the topology
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    if router not in tgen.routers():
        return False

    rnode = tgen.routers()[router]

    logger.debug(
        "Verifying BGP community attributes on dut %s: for %s " "network %s",
        router,
        addr_type,
        network,
    )

    for net in network:
        cmd = "show bgp {} {} json".format(addr_type, net)
        show_bgp_json = rnode.vtysh_cmd(cmd, isjson=True)
        logger.info(show_bgp_json)
        if "paths" not in show_bgp_json:
            return "Prefix {} not found in BGP table of router: {}".format(net, router)

        found = False
        for path in show_bgp_json["paths"]:
            if "largeCommunity" not in path and "community" not in path:
                continue

            found = True
            logger.info(
                "Large Community attribute is found for route:" " %s in router: %s",
                net,
                router,
            )
            if input_dict is None:
                continue

            for criteria, comm_val in input_dict.items():
                # A path may carry only one of the two attributes; report
                # a missing one as a mismatch (found: None) instead of
                # letting a KeyError escape from the lookup.
                attr = path.get(criteria)
                show_val = attr.get("string") if isinstance(attr, dict) else None
                if comm_val == show_val:
                    logger.info(
                        "Verifying BGP %s for prefix: %s"
                        " in router: %s, found expected"
                        " value: %s",
                        criteria,
                        net,
                        router,
                        comm_val,
                    )
                else:
                    errormsg = (
                        "Failed: Verifying BGP attribute"
                        " {} for route: {} in router: {}"
                        ", expected value: {} but found"
                        ": {}".format(criteria, net, router, comm_val, show_val)
                    )
                    return errormsg

        if not found:
            errormsg = (
                "Large Community attribute is not found for route: "
                "{} in router: {} ".format(net, router)
            )
            return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
def verify_create_community_list(tgen, input_dict):
    """
    Check that each large community list named in `input_dict` exists on
    its router, using the output of
    "show bgp large-community-list {comm_name} detail".

    Parameters
    ----------
    * `tgen`: topogen object
    * `input_dict`: per-router data; each router entry carries a
                    "bgp_community_lists" list whose items provide the
                    "name" and "community_type" to look for

    Usage
    -----
    input_dict = {
        "r1": {
            "bgp_community_lists": [
                {"name": "Test1", "community_type": "standard"}
            ]
        }
    }
    result = verify_create_community_list(tgen, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    api_name = sys._getframe().f_code.co_name
    logger.debug("Entering lib API: {}".format(api_name))

    for router in input_dict:
        topo_routers = tgen.routers()
        # Routers that are not part of the topology are silently skipped
        if router not in topo_routers:
            continue

        rnode = topo_routers[router]

        logger.info("Verifying large-community is created for dut %s:", router)

        for entry in input_dict[router]["bgp_community_lists"]:
            comm_name, comm_type = entry["name"], entry["community_type"]
            cli_output = run_frr_cmd(
                rnode, "show bgp large-community-list {} detail".format(comm_name)
            )

            # Both the list name and its type must appear in the output
            is_created = comm_name in cli_output and comm_type in cli_output
            if not is_created:
                return "BGP {} large-community-list {} is not created".format(
                    comm_type, comm_name
                )

            logger.info(
                "BGP %s large-community-list %s is created", comm_type, comm_name
            )

    logger.debug("Exiting lib API: {}".format(api_name))
    return True
def verify_cli_json(tgen, input_dict):
    """
    Check that every CLI listed in `input_dict` produces JSON output when
    " json" is appended to it.

    Parameters
    ----------
    * `tgen`: topogen object
    * `input_dict`: per-DUT data; the "cli" entry lists the commands for
                    which JSON support needs to be verified

    Usage
    -----
    input_dict = {
        "edge1":{
            "cli": ["show evpn vni detail", "show evpn rmac vni all"]
        }
    }

    result = verify_cli_json(tgen, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    api_name = sys._getframe().f_code.co_name
    logger.debug("Entering lib API: {}".format(api_name))

    for dut, dut_data in input_dict.items():
        rnode = tgen.routers()[dut]

        for cli in dut_data["cli"]:
            logger.info(
                "[DUT: %s]: Verifying JSON is available for CLI %s :", dut, cli
            )

            ret_json = rnode.vtysh_cmd("{} json".format(cli), isjson=True)

            # Empty output, or output flagged "unknown"/"Unknown", means
            # the command has no JSON variant
            json_missing = (
                not ret_json or "unknown" in ret_json or "Unknown" in ret_json
            )
            if json_missing:
                return "CLI: %s, JSON format is not available" % (cli)

            logger.info("CLI : %s JSON format is available: \n %s", cli, ret_json)

    logger.debug("Exiting lib API: {}".format(api_name))

    return True
@retry(attempts=2, wait=4, return_is_str=True, initial_wait=2)
def verify_evpn_vni(tgen, input_dict):
    """
    API to verify evpn vni details using "show evpn vni detail json"
    command.

    Parameters
    ----------
    * `tgen`: topogen object
    * `input_dict`: having details like - for which router, evpn details
                    needs to be verified. Each entry of the "vni" list is
                    a dict whose "name" key selects the VNI; every key of
                    that dict is compared with the same key of the
                    matching JSON entry.
    Usage
    -----
    input_dict = {
        "edge1":{
            "vni": [
                {
                    "name": "75100",
                    "vrf": "RED",
                    "vxlanIntf": "vxlan75100",
                    "localVtepIp": "120.1.1.1",
                    "sviIntf": "br100"
                }
            ]
        }
    }

    result = verify_evpn_vni(tgen, input_dict)

    Returns
    -------
    errormsg(str) or True; False when the loop over `input_dict` ends
    without any VNI having been verified
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    for dut in input_dict.keys():
        rnode = tgen.routers()[dut]

        logger.info("[DUT: %s]: Verifying evpn vni details :", dut)

        cmd = "show evpn vni detail json"
        evpn_all_vni_json = run_frr_cmd(rnode, cmd, isjson=True)
        if not bool(evpn_all_vni_json):
            errormsg = "No output for '{}' cli".format(cmd)
            return errormsg

        if "vni" in input_dict[dut]:
            for vni_dict in input_dict[dut]["vni"]:
                found = False
                vni = vni_dict["name"]
                for evpn_vni_json in evpn_all_vni_json:
                    if "vni" in evpn_vni_json:
                        # Skip JSON entries that describe a different VNI
                        if evpn_vni_json["vni"] != int(vni):
                            continue

                        # NOTE(review): this loop covers every key of
                        # vni_dict, including "name", so the JSON entry
                        # is indexed with "name" as well - confirm the
                        # CLI output carries that key.
                        for attribute in vni_dict.keys():
                            if vni_dict[attribute] != evpn_vni_json[attribute]:
                                errormsg = (
                                    "[DUT: %s] Verifying "
                                    "%s for VNI: %s [FAILED]||"
                                    ", EXPECTED : %s "
                                    " FOUND : %s"
                                    % (
                                        dut,
                                        attribute,
                                        vni,
                                        vni_dict[attribute],
                                        evpn_vni_json[attribute],
                                    )
                                )
                                return errormsg

                            else:
                                found = True
                                logger.info(
                                    "[DUT: %s] Verifying"
                                    " %s for VNI: %s , "
                                    "Found Expected : %s ",
                                    dut,
                                    attribute,
                                    vni,
                                    evpn_vni_json[attribute],
                                )

                        # The matching VNI must additionally be "Up"
                        if evpn_vni_json["state"] != "Up":
                            errormsg = (
                                "[DUT: %s] Failed: Verifying"
                                " State for VNI: %s is not Up" % (dut, vni)
                            )
                            return errormsg

                    else:
                        # A JSON entry without a "vni" key fails the check
                        errormsg = (
                            "[DUT: %s] Failed:"
                            " VNI: %s is not present in JSON" % (dut, vni)
                        )
                        return errormsg

                # NOTE(review): returns on the first verified VNI, so
                # later "vni" entries and later DUTs are not examined -
                # confirm this is intended.
                if found:
                    logger.info(
                        "[DUT %s]: Verifying VNI : %s "
                        "details and state is Up [PASSED]!!",
                        dut,
                        vni,
                    )
                    return True

        else:
            errormsg = (
                "[DUT: %s] Failed:" " vni details are not present in input data" % (dut)
            )
            return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return False
@retry(attempts=2, wait=4, return_is_str=True, initial_wait=2)
def verify_vrf_vni(tgen, input_dict):
    """
    API to verify vrf vni details using "show vrf vni json"
    command.

    Parameters
    ----------
    * `tgen`: topogen object
    * `input_dict`: having details like - for which router, vrf details
                    needs to be verified. Each entry of the "vrfs" list
                    maps a VRF name to the attributes expected in the
                    JSON entry of that VRF.
    Usage
    -----
    input_dict = {
        "edge1":{
            "vrfs": [
                {
                    "RED":{
                        "vni": 75000,
                        "vxlanIntf": "vxlan75100",
                        "sviIntf": "br100",
                        "routerMac": "00:80:48:ba:d1:00",
                        "state": "Up"
                    }
                }
            ]
        }
    }

    result = verify_vrf_vni(tgen, input_dict)

    Returns
    -------
    errormsg(str) or True; False when the loop over `input_dict` ends
    without any VRF having been verified
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    for dut in input_dict.keys():
        rnode = tgen.routers()[dut]

        logger.info("[DUT: %s]: Verifying vrf vni details :", dut)

        cmd = "show vrf vni json"
        vrf_all_vni_json = run_frr_cmd(rnode, cmd, isjson=True)
        if not bool(vrf_all_vni_json):
            errormsg = "No output for '{}' cli".format(cmd)
            return errormsg

        if "vrfs" in input_dict[dut]:
            for vrfs in input_dict[dut]["vrfs"]:
                for vrf, vrf_dict in vrfs.items():
                    found = False
                    for vrf_vni_json in vrf_all_vni_json["vrfs"]:
                        if "vrf" in vrf_vni_json:
                            # Skip JSON entries that describe another VRF
                            if vrf_vni_json["vrf"] != vrf:
                                continue

                            # Every expected attribute must equal the
                            # value reported for this VRF
                            for attribute in vrf_dict.keys():
                                if vrf_dict[attribute] == vrf_vni_json[attribute]:
                                    found = True
                                    logger.info(
                                        "[DUT %s]: VRF: %s, "
                                        "verifying %s "
                                        ", Found Expected: %s "
                                        "[PASSED]!!",
                                        dut,
                                        vrf,
                                        attribute,
                                        vrf_vni_json[attribute],
                                    )
                                else:
                                    errormsg = (
                                        "[DUT: %s] VRF: %s, "
                                        "verifying %s [FAILED!!] "
                                        ", EXPECTED : %s "
                                        ", FOUND : %s"
                                        % (
                                            dut,
                                            vrf,
                                            attribute,
                                            vrf_dict[attribute],
                                            vrf_vni_json[attribute],
                                        )
                                    )
                                    return errormsg

                        else:
                            # A JSON entry without a "vrf" key fails the
                            # check
                            errormsg = "[DUT: %s] VRF: %s " "is not present in JSON" % (
                                dut,
                                vrf,
                            )
                            return errormsg

                    # NOTE(review): returns on the first verified VRF, so
                    # later "vrfs" entries and later DUTs are not
                    # examined - confirm this is intended.
                    if found:
                        logger.info(
                            "[DUT %s] Verifying VRF: %s " " details [PASSED]!!",
                            dut,
                            vrf,
                        )
                        return True

        else:
            errormsg = (
                "[DUT: %s] Failed:" " vrf details are not present in input data" % (dut)
            )
            return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return False
def required_linux_kernel_version(required_version):
    """
    This API is used to check linux version compatibility of the test suite.
    The running kernel release is compared with `required_version`; when
    the comparison reports the running kernel as older, an error message is
    returned that the caller can use to skip the suite.

    Parameters
    ----------
    * `required_version` : Kernel version required for the suites to run.

    Usage
    -----
    result = required_linux_kernel_version('4.15')

    Returns
    -------
    errormsg(str) or True
    """
    system_kernel = platform.release()
    # version_cmp() yielding a negative value is treated as "running kernel
    # is older than required"
    if version_cmp(system_kernel, required_version) < 0:
        error_msg = (
            'These tests will not run on kernel "{}", '
            "they require kernel >= {}".format(system_kernel, required_version)
        )
        return error_msg
    return True