1# 2# Copyright (c) 2019 by VMware, Inc. ("VMware") 3# Used Copyright (c) 2018 by Network Device Education Foundation, Inc. 4# ("NetDEF") in this file. 5# 6# Permission to use, copy, modify, and/or distribute this software 7# for any purpose with or without fee is hereby granted, provided 8# that the above copyright notice and this permission notice appear 9# in all copies. 10# 11# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES 12# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 13# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR 14# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY 15# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, 16# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS 17# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE 18# OF THIS SOFTWARE. 19# 20 21from copy import deepcopy 22from time import sleep 23import traceback 24import ipaddress 25import os 26import sys 27from lib import topotest 28from lib.topolog import logger 29 30from lib.topogen import TopoRouter, get_topogen 31 32# Import common_config to use commomnly used APIs 33from lib.common_config import ( 34 create_common_configuration, 35 InvalidCLIError, 36 load_config_to_router, 37 check_address_types, 38 generate_ips, 39 validate_ip_address, 40 find_interface_with_greater_ip, 41 run_frr_cmd, 42 FRRCFG_FILE, 43 retry, 44) 45 46LOGDIR = "/tmp/topotests/" 47TMPDIR = None 48 49 50def create_router_bgp(tgen, topo, input_dict=None, build=False, load_config=True): 51 """ 52 API to configure bgp on router 53 54 Parameters 55 ---------- 56 * `tgen` : Topogen object 57 * `topo` : json file data 58 * `input_dict` : Input dict data, required when configuring from testcase 59 * `build` : Only for initial setup phase this is set as True. 60 61 Usage 62 ----- 63 input_dict = { 64 "r1": { 65 "bgp": { 66 "local_as": "200", 67 "router_id": "22.22.22.22", 68 "graceful-restart": { 69 "graceful-restart": True, 70 "preserve-fw-state": True, 71 "timer": { 72 "restart-time": 30, 73 "rib-stale-time": 30, 74 "select-defer-time": 30, 75 } 76 }, 77 "address_family": { 78 "ipv4": { 79 "unicast": { 80 "redistribute": [{ 81 "redist_type": "static", 82 "attribute": { 83 "metric" : 123 84 } 85 }, 86 {"redist_type": "connected"} 87 ], 88 "advertise_networks": [ 89 { 90 "network": "20.0.0.0/32", 91 "no_of_network": 10 92 }, 93 { 94 "network": "30.0.0.0/32", 95 "no_of_network": 10 96 } 97 ], 98 "neighbor": { 99 "r3": { 100 "keepalivetimer": 60, 101 "holddowntimer": 180, 102 "dest_link": { 103 "r4": { 104 "allowas-in": { 105 "number_occurences": 2 106 }, 107 "prefix_lists": [ 108 { 109 "name": "pf_list_1", 110 "direction": "in" 111 } 112 ], 113 "route_maps": [{ 114 "name": "RMAP_MED_R3", 115 "direction": "in" 116 }], 117 "next_hop_self": True 118 }, 119 "r1": {"graceful-restart-helper": True} 120 } 121 } 122 } 123 } 124 } 125 } 126 } 127 } 128 } 129 130 131 Returns 132 ------- 133 True or False 134 """ 135 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) 136 result = False 137 138 # Flag is used when testing ipv6 over ipv4 or vice-versa 139 afi_test = False 140 141 if not input_dict: 142 input_dict = deepcopy(topo) 143 else: 144 topo = topo["routers"] 145 input_dict = deepcopy(input_dict) 146 147 for router in input_dict.keys(): 148 if "bgp" not in input_dict[router]: 149 logger.debug("Router %s: 'bgp' not present in input_dict", router) 150 continue 151 152 bgp_data_list = input_dict[router]["bgp"] 153 154 if type(bgp_data_list) is not list: 155 bgp_data_list = [bgp_data_list] 156 157 for bgp_data in bgp_data_list: 158 data_all_bgp = __create_bgp_global(tgen, bgp_data, router, build) 159 if data_all_bgp: 160 bgp_addr_data = bgp_data.setdefault("address_family", {}) 161 162 if not bgp_addr_data: 163 logger.debug( 164 "Router %s: 'address_family' not present in " 165 "input_dict for BGP", 166 router, 167 ) 168 else: 169 170 ipv4_data = bgp_addr_data.setdefault("ipv4", {}) 171 ipv6_data = bgp_addr_data.setdefault("ipv6", {}) 172 l2vpn_data = bgp_addr_data.setdefault("l2vpn", {}) 173 174 neigh_unicast = ( 175 True 176 if ipv4_data.setdefault("unicast", {}) 177 or ipv6_data.setdefault("unicast", {}) 178 else False 179 ) 180 181 l2vpn_evpn = True if l2vpn_data.setdefault("evpn", {}) else False 182 183 if neigh_unicast: 184 data_all_bgp = __create_bgp_unicast_neighbor( 185 tgen, 186 topo, 187 bgp_data, 188 router, 189 afi_test, 190 config_data=data_all_bgp, 191 ) 192 193 if l2vpn_evpn: 194 data_all_bgp = __create_l2vpn_evpn_address_family( 195 tgen, topo, bgp_data, router, config_data=data_all_bgp 196 ) 197 198 try: 199 result = create_common_configuration( 200 tgen, router, data_all_bgp, "bgp", build, load_config 201 ) 202 except InvalidCLIError: 203 # Traceback 204 errormsg = traceback.format_exc() 205 logger.error(errormsg) 206 return errormsg 207 208 logger.debug("Exiting lib API: create_router_bgp()") 209 return result 210 211 212def __create_bgp_global(tgen, input_dict, router, build=False): 213 """ 214 Helper API to create bgp global configuration. 215 216 Parameters 217 ---------- 218 * `tgen` : Topogen object 219 * `input_dict` : Input dict data, required when configuring from testcase 220 * `router` : router id to be configured. 221 * `build` : Only for initial setup phase this is set as True. 222 223 Returns 224 ------- 225 True or False 226 """ 227 228 result = False 229 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) 230 231 bgp_data = input_dict 232 del_bgp_action = bgp_data.setdefault("delete", False) 233 234 config_data = [] 235 236 if "local_as" not in bgp_data and build: 237 logger.debug( 238 "Router %s: 'local_as' not present in input_dict" "for BGP", router 239 ) 240 return False 241 242 local_as = bgp_data.setdefault("local_as", "") 243 cmd = "router bgp {}".format(local_as) 244 vrf_id = bgp_data.setdefault("vrf", None) 245 if vrf_id: 246 cmd = "{} vrf {}".format(cmd, vrf_id) 247 248 if del_bgp_action: 249 cmd = "no {}".format(cmd) 250 config_data.append(cmd) 251 252 return config_data 253 254 config_data.append(cmd) 255 config_data.append("no bgp ebgp-requires-policy") 256 257 router_id = bgp_data.setdefault("router_id", None) 258 del_router_id = bgp_data.setdefault("del_router_id", False) 259 if del_router_id: 260 config_data.append("no bgp router-id") 261 if router_id: 262 config_data.append("bgp router-id {}".format(router_id)) 263 264 config_data.append("no bgp network import-check") 265 266 bst_path = bgp_data.setdefault("bestpath", None) 267 if bst_path: 268 if "aspath" in bst_path: 269 if "delete" in bst_path: 270 config_data.append( 271 "no bgp bestpath as-path {}".format(bst_path["aspath"]) 272 ) 273 else: 274 config_data.append("bgp bestpath as-path {}".format(bst_path["aspath"])) 275 276 if "graceful-restart" in bgp_data: 277 graceful_config = bgp_data["graceful-restart"] 278 279 graceful_restart = graceful_config.setdefault("graceful-restart", None) 280 281 graceful_restart_disable = graceful_config.setdefault( 282 "graceful-restart-disable", None 283 ) 284 285 preserve_fw_state = graceful_config.setdefault("preserve-fw-state", None) 286 287 disable_eor = graceful_config.setdefault("disable-eor", None) 288 289 if graceful_restart == False: 290 cmd = "no bgp graceful-restart" 291 if graceful_restart: 292 cmd = "bgp graceful-restart" 293 294 if graceful_restart is not None: 295 config_data.append(cmd) 296 297 if graceful_restart_disable == False: 298 cmd = "no bgp graceful-restart-disable" 299 if graceful_restart_disable: 300 cmd = "bgp graceful-restart-disable" 301 302 if graceful_restart_disable is not None: 303 config_data.append(cmd) 304 305 if preserve_fw_state == False: 306 cmd = "no bgp graceful-restart preserve-fw-state" 307 if preserve_fw_state: 308 cmd = "bgp graceful-restart preserve-fw-state" 309 310 if preserve_fw_state is not None: 311 config_data.append(cmd) 312 313 if disable_eor == False: 314 cmd = "no bgp graceful-restart disable-eor" 315 if disable_eor: 316 cmd = "bgp graceful-restart disable-eor" 317 318 if disable_eor is not None: 319 config_data.append(cmd) 320 321 if "timer" in bgp_data["graceful-restart"]: 322 timer = bgp_data["graceful-restart"]["timer"] 323 324 if "delete" in timer: 325 del_action = timer["delete"] 326 else: 327 del_action = False 328 329 for rs_timer, value in timer.items(): 330 rs_timer_value = timer.setdefault(rs_timer, None) 331 332 if rs_timer_value and rs_timer != "delete": 333 cmd = "bgp graceful-restart {} {}".format(rs_timer, rs_timer_value) 334 335 if del_action: 336 cmd = "no {}".format(cmd) 337 338 config_data.append(cmd) 339 340 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) 341 return config_data 342 343 344def __create_bgp_unicast_neighbor( 345 tgen, topo, input_dict, router, afi_test, config_data=None 346): 347 """ 348 Helper API to create configuration for address-family unicast 349 350 Parameters 351 ---------- 352 * `tgen` : Topogen object 353 * `topo` : json file data 354 * `input_dict` : Input dict data, required when configuring from testcase 355 * `router` : router id to be configured. 356 * `afi_test` : use when ipv6 needs to be tested over ipv4 or vice-versa 357 * `build` : Only for initial setup phase this is set as True. 358 """ 359 360 result = False 361 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) 362 363 add_neigh = True 364 bgp_data = input_dict 365 if "router bgp" in config_data: 366 add_neigh = False 367 368 bgp_data = input_dict["address_family"] 369 370 for addr_type, addr_dict in bgp_data.iteritems(): 371 if not addr_dict: 372 continue 373 374 if not check_address_types(addr_type) and not afi_test: 375 continue 376 377 addr_data = addr_dict["unicast"] 378 if addr_data: 379 config_data.append("address-family {} unicast".format(addr_type)) 380 advertise_network = addr_data.setdefault("advertise_networks", []) 381 for advertise_network_dict in advertise_network: 382 network = advertise_network_dict["network"] 383 if type(network) is not list: 384 network = [network] 385 386 if "no_of_network" in advertise_network_dict: 387 no_of_network = advertise_network_dict["no_of_network"] 388 else: 389 no_of_network = 1 390 391 del_action = advertise_network_dict.setdefault("delete", False) 392 393 # Generating IPs for verification 394 prefix = str(ipaddress.ip_network(unicode(network[0])).prefixlen) 395 network_list = generate_ips(network, no_of_network) 396 for ip in network_list: 397 ip = str(ipaddress.ip_network(unicode(ip)).network_address) 398 399 cmd = "network {}/{}".format(ip, prefix) 400 if del_action: 401 cmd = "no {}".format(cmd) 402 403 config_data.append(cmd) 404 405 max_paths = addr_data.setdefault("maximum_paths", {}) 406 if max_paths: 407 ibgp = max_paths.setdefault("ibgp", None) 408 ebgp = max_paths.setdefault("ebgp", None) 409 if ibgp: 410 config_data.append("maximum-paths ibgp {}".format(ibgp)) 411 if ebgp: 412 config_data.append("maximum-paths {}".format(ebgp)) 413 414 aggregate_addresses = addr_data.setdefault("aggregate_address", []) 415 for aggregate_address in aggregate_addresses: 416 network = aggregate_address.setdefault("network", None) 417 if not network: 418 logger.debug( 419 "Router %s: 'network' not present in " "input_dict for BGP", router 420 ) 421 else: 422 cmd = "aggregate-address {}".format(network) 423 424 as_set = aggregate_address.setdefault("as_set", False) 425 summary = aggregate_address.setdefault("summary", False) 426 del_action = aggregate_address.setdefault("delete", False) 427 if as_set: 428 cmd = "{} as-set".format(cmd) 429 if summary: 430 cmd = "{} summary".format(cmd) 431 432 if del_action: 433 cmd = "no {}".format(cmd) 434 435 config_data.append(cmd) 436 437 redistribute_data = addr_data.setdefault("redistribute", {}) 438 if redistribute_data: 439 for redistribute in redistribute_data: 440 if "redist_type" not in redistribute: 441 logger.debug( 442 "Router %s: 'redist_type' not present in " "input_dict", router 443 ) 444 else: 445 cmd = "redistribute {}".format(redistribute["redist_type"]) 446 redist_attr = redistribute.setdefault("attribute", None) 447 if redist_attr: 448 if isinstance(redist_attr, dict): 449 for key, value in redist_attr.items(): 450 cmd = "{} {} {}".format(cmd, key, value) 451 else: 452 cmd = "{} {}".format(cmd, redist_attr) 453 454 del_action = redistribute.setdefault("delete", False) 455 if del_action: 456 cmd = "no {}".format(cmd) 457 config_data.append(cmd) 458 459 import_vrf_data = addr_data.setdefault("import", {}) 460 if import_vrf_data: 461 cmd = "import vrf {}".format(import_vrf_data["vrf"]) 462 463 del_action = import_vrf_data.setdefault("delete", False) 464 if del_action: 465 cmd = "no {}".format(cmd) 466 config_data.append(cmd) 467 468 if "neighbor" in addr_data: 469 neigh_data = __create_bgp_neighbor( 470 topo, input_dict, router, addr_type, add_neigh 471 ) 472 config_data.extend(neigh_data) 473 474 for addr_type, addr_dict in bgp_data.iteritems(): 475 if not addr_dict or not check_address_types(addr_type): 476 continue 477 478 addr_data = addr_dict["unicast"] 479 if "neighbor" in addr_data: 480 neigh_addr_data = __create_bgp_unicast_address_family( 481 topo, input_dict, router, addr_type, add_neigh 482 ) 483 484 config_data.extend(neigh_addr_data) 485 486 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) 487 return config_data 488 489 490def __create_l2vpn_evpn_address_family( 491 tgen, topo, input_dict, router, config_data=None 492): 493 """ 494 Helper API to create configuration for l2vpn evpn address-family 495 496 Parameters 497 ---------- 498 * `tgen` : Topogen object 499 * `topo` : json file data 500 * `input_dict` : Input dict data, required when configuring 501 from testcase 502 * `router` : router id to be configured. 503 * `build` : Only for initial setup phase this is set as True. 504 """ 505 506 result = False 507 508 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) 509 510 bgp_data = input_dict["address_family"] 511 512 for family_type, family_dict in bgp_data.iteritems(): 513 if family_type != "l2vpn": 514 continue 515 516 family_data = family_dict["evpn"] 517 if family_data: 518 config_data.append("address-family l2vpn evpn") 519 520 advertise_data = family_data.setdefault("advertise", {}) 521 neighbor_data = family_data.setdefault("neighbor", {}) 522 advertise_all_vni_data = family_data.setdefault("advertise-all-vni", None) 523 rd_data = family_data.setdefault("rd", None) 524 no_rd_data = family_data.setdefault("no rd", False) 525 route_target_data = family_data.setdefault("route-target", {}) 526 527 if advertise_data: 528 for address_type, unicast_type in advertise_data.items(): 529 530 if isinstance(unicast_type, dict): 531 for key, value in unicast_type.items(): 532 cmd = "advertise {} {}".format(address_type, key) 533 534 if value: 535 route_map = value.setdefault("route-map", {}) 536 advertise_del_action = value.setdefault("delete", None) 537 538 if route_map: 539 cmd = "{} route-map {}".format(cmd, route_map) 540 541 if advertise_del_action: 542 cmd = "no {}".format(cmd) 543 544 config_data.append(cmd) 545 546 if neighbor_data: 547 for neighbor, neighbor_data in neighbor_data.items(): 548 ipv4_neighbor = neighbor_data.setdefault("ipv4", {}) 549 ipv6_neighbor = neighbor_data.setdefault("ipv6", {}) 550 551 if ipv4_neighbor: 552 for neighbor_name, action in ipv4_neighbor.items(): 553 neighbor_ip = topo[neighbor]["links"][neighbor_name][ 554 "ipv4" 555 ].split("/")[0] 556 557 if isinstance(action, dict): 558 next_hop_self = action.setdefault("next_hop_self", None) 559 route_maps = action.setdefault("route_maps", {}) 560 561 if next_hop_self is not None: 562 if next_hop_self is True: 563 config_data.append( 564 "neighbor {} " 565 "next-hop-self".format(neighbor_ip) 566 ) 567 elif next_hop_self is False: 568 config_data.append( 569 "no neighbor {} " 570 "next-hop-self".format(neighbor_ip) 571 ) 572 573 if route_maps: 574 for route_map in route_maps: 575 name = route_map.setdefault("name", {}) 576 direction = route_map.setdefault("direction", "in") 577 del_action = route_map.setdefault("delete", False) 578 579 if not name: 580 logger.info( 581 "Router %s: 'name' " 582 "not present in " 583 "input_dict for BGP " 584 "neighbor route name", 585 router, 586 ) 587 else: 588 cmd = "neighbor {} route-map {} " "{}".format( 589 neighbor_ip, name, direction 590 ) 591 592 if del_action: 593 cmd = "no {}".format(cmd) 594 595 config_data.append(cmd) 596 597 else: 598 if action == "activate": 599 cmd = "neighbor {} activate".format(neighbor_ip) 600 elif action == "deactivate": 601 cmd = "no neighbor {} activate".format(neighbor_ip) 602 603 config_data.append(cmd) 604 605 if ipv6_neighbor: 606 for neighbor_name, action in ipv4_neighbor.items(): 607 neighbor_ip = topo[neighbor]["links"][neighbor_name][ 608 "ipv6" 609 ].split("/")[0] 610 if action == "activate": 611 cmd = "neighbor {} activate".format(neighbor_ip) 612 elif action == "deactivate": 613 cmd = "no neighbor {} activate".format(neighbor_ip) 614 615 config_data.append(cmd) 616 617 if advertise_all_vni_data == True: 618 cmd = "advertise-all-vni" 619 config_data.append(cmd) 620 elif advertise_all_vni_data == False: 621 cmd = "no advertise-all-vni" 622 config_data.append(cmd) 623 624 if rd_data: 625 cmd = "rd {}".format(rd_data) 626 config_data.append(cmd) 627 628 if no_rd_data: 629 cmd = "no rd {}".format(no_rd_data) 630 config_data.append(cmd) 631 632 if route_target_data: 633 for rt_type, rt_dict in route_target_data.items(): 634 for _rt_dict in rt_dict: 635 rt_value = _rt_dict.setdefault("value", None) 636 del_rt = _rt_dict.setdefault("delete", None) 637 638 if rt_value: 639 cmd = "route-target {} {}".format(rt_type, rt_value) 640 if del_rt: 641 cmd = "no {}".format(cmd) 642 643 config_data.append(cmd) 644 645 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) 646 647 return config_data 648 649 650def __create_bgp_neighbor(topo, input_dict, router, addr_type, add_neigh=True): 651 """ 652 Helper API to create neighbor specific configuration 653 654 Parameters 655 ---------- 656 * `tgen` : Topogen object 657 * `topo` : json file data 658 * `input_dict` : Input dict data, required when configuring from testcase 659 * `router` : router id to be configured 660 """ 661 662 config_data = [] 663 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) 664 665 bgp_data = input_dict["address_family"] 666 neigh_data = bgp_data[addr_type]["unicast"]["neighbor"] 667 668 for name, peer_dict in neigh_data.iteritems(): 669 for dest_link, peer in peer_dict["dest_link"].iteritems(): 670 nh_details = topo[name] 671 672 if "vrfs" in topo[router] or type(nh_details["bgp"]) is list: 673 remote_as = nh_details["bgp"][0]["local_as"] 674 else: 675 remote_as = nh_details["bgp"]["local_as"] 676 677 update_source = None 678 679 if dest_link in nh_details["links"].keys(): 680 ip_addr = nh_details["links"][dest_link][addr_type].split("/")[0] 681 # Loopback interface 682 if "source_link" in peer and peer["source_link"] == "lo": 683 update_source = topo[router]["links"]["lo"][addr_type].split("/")[0] 684 685 neigh_cxt = "neighbor {}".format(ip_addr) 686 687 if add_neigh: 688 config_data.append("{} remote-as {}".format(neigh_cxt, remote_as)) 689 if addr_type == "ipv6": 690 config_data.append("address-family ipv6 unicast") 691 config_data.append("{} activate".format(neigh_cxt)) 692 693 disable_connected = peer.setdefault("disable_connected_check", False) 694 keep_alive = peer.setdefault("keepalivetimer", 60) 695 hold_down = peer.setdefault("holddowntimer", 180) 696 password = peer.setdefault("password", None) 697 no_password = peer.setdefault("no_password", None) 698 max_hop_limit = peer.setdefault("ebgp_multihop", 1) 699 graceful_restart = peer.setdefault("graceful-restart", None) 700 graceful_restart_helper = peer.setdefault("graceful-restart-helper", None) 701 graceful_restart_disable = peer.setdefault("graceful-restart-disable", None) 702 703 if update_source: 704 config_data.append( 705 "{} update-source {}".format(neigh_cxt, update_source) 706 ) 707 if disable_connected: 708 config_data.append( 709 "{} disable-connected-check".format(disable_connected) 710 ) 711 if update_source: 712 config_data.append( 713 "{} update-source {}".format(neigh_cxt, update_source) 714 ) 715 if int(keep_alive) != 60 and int(hold_down) != 180: 716 config_data.append( 717 "{} timers {} {}".format(neigh_cxt, keep_alive, hold_down) 718 ) 719 720 if graceful_restart: 721 config_data.append("{} graceful-restart".format(neigh_cxt)) 722 elif graceful_restart == False: 723 config_data.append("no {} graceful-restart".format(neigh_cxt)) 724 725 if graceful_restart_helper: 726 config_data.append("{} graceful-restart-helper".format(neigh_cxt)) 727 elif graceful_restart_helper == False: 728 config_data.append("no {} graceful-restart-helper".format(neigh_cxt)) 729 730 if graceful_restart_disable: 731 config_data.append("{} graceful-restart-disable".format(neigh_cxt)) 732 elif graceful_restart_disable == False: 733 config_data.append("no {} graceful-restart-disable".format(neigh_cxt)) 734 735 if password: 736 config_data.append("{} password {}".format(neigh_cxt, password)) 737 738 if no_password: 739 config_data.append("no {} password {}".format(neigh_cxt, no_password)) 740 741 if max_hop_limit > 1: 742 config_data.append( 743 "{} ebgp-multihop {}".format(neigh_cxt, max_hop_limit) 744 ) 745 config_data.append("{} enforce-multihop".format(neigh_cxt)) 746 747 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) 748 return config_data 749 750 751def __create_bgp_unicast_address_family( 752 topo, input_dict, router, addr_type, add_neigh=True 753): 754 """ 755 API prints bgp global config to bgp_json file. 756 757 Parameters 758 ---------- 759 * `bgp_cfg` : BGP class variables have BGP config saved in it for 760 particular router, 761 * `local_as_no` : Local as number 762 * `router_id` : Router-id 763 * `ecmp_path` : ECMP max path 764 * `gr_enable` : BGP global gracefull restart config 765 """ 766 767 config_data = [] 768 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) 769 770 bgp_data = input_dict["address_family"] 771 neigh_data = bgp_data[addr_type]["unicast"]["neighbor"] 772 773 for peer_name, peer_dict in deepcopy(neigh_data).iteritems(): 774 for dest_link, peer in peer_dict["dest_link"].iteritems(): 775 deactivate = None 776 activate = None 777 nh_details = topo[peer_name] 778 activate_addr_family = peer.setdefault("activate", None) 779 deactivate_addr_family = peer.setdefault("deactivate", None) 780 # Loopback interface 781 if "source_link" in peer and peer["source_link"] == "lo": 782 for destRouterLink, data in sorted(nh_details["links"].iteritems()): 783 if "type" in data and data["type"] == "loopback": 784 if dest_link == destRouterLink: 785 ip_addr = nh_details["links"][destRouterLink][ 786 addr_type 787 ].split("/")[0] 788 789 # Physical interface 790 else: 791 if dest_link in nh_details["links"].keys(): 792 793 ip_addr = nh_details["links"][dest_link][addr_type].split("/")[0] 794 if addr_type == "ipv4" and bgp_data["ipv6"]: 795 deactivate = nh_details["links"][dest_link]["ipv6"].split("/")[ 796 0 797 ] 798 799 neigh_cxt = "neighbor {}".format(ip_addr) 800 config_data.append("address-family {} unicast".format(addr_type)) 801 802 if activate_addr_family is not None: 803 config_data.append( 804 "address-family {} unicast".format(activate_addr_family) 805 ) 806 807 config_data.append("{} activate".format(neigh_cxt)) 808 809 if deactivate and activate_addr_family is None: 810 config_data.append("no neighbor {} activate".format(deactivate)) 811 812 if deactivate_addr_family is not None: 813 config_data.append( 814 "address-family {} unicast".format(deactivate_addr_family) 815 ) 816 config_data.append("no {} activate".format(neigh_cxt)) 817 818 next_hop_self = peer.setdefault("next_hop_self", None) 819 send_community = peer.setdefault("send_community", None) 820 prefix_lists = peer.setdefault("prefix_lists", {}) 821 route_maps = peer.setdefault("route_maps", {}) 822 no_send_community = peer.setdefault("no_send_community", None) 823 allowas_in = peer.setdefault("allowas-in", None) 824 825 # next-hop-self 826 if next_hop_self is not None: 827 if next_hop_self is True: 828 config_data.append("{} next-hop-self".format(neigh_cxt)) 829 else: 830 config_data.append("no {} next-hop-self".format(neigh_cxt)) 831 832 # send_community 833 if send_community: 834 config_data.append("{} send-community".format(neigh_cxt)) 835 836 # no_send_community 837 if no_send_community: 838 config_data.append( 839 "no {} send-community {}".format(neigh_cxt, no_send_community) 840 ) 841 842 if "allowas_in" in peer: 843 allow_as_in = peer["allowas_in"] 844 config_data.append("{} allowas-in {}".format(neigh_cxt, allow_as_in)) 845 846 if "no_allowas_in" in peer: 847 allow_as_in = peer["no_allowas_in"] 848 config_data.append("no {} allowas-in {}".format(neigh_cxt, allow_as_in)) 849 if prefix_lists: 850 for prefix_list in prefix_lists: 851 name = prefix_list.setdefault("name", {}) 852 direction = prefix_list.setdefault("direction", "in") 853 del_action = prefix_list.setdefault("delete", False) 854 if not name: 855 logger.info( 856 "Router %s: 'name' not present in " 857 "input_dict for BGP neighbor prefix lists", 858 router, 859 ) 860 else: 861 cmd = "{} prefix-list {} {}".format(neigh_cxt, name, direction) 862 if del_action: 863 cmd = "no {}".format(cmd) 864 config_data.append(cmd) 865 866 if route_maps: 867 for route_map in route_maps: 868 name = route_map.setdefault("name", {}) 869 direction = route_map.setdefault("direction", "in") 870 del_action = route_map.setdefault("delete", False) 871 if not name: 872 logger.info( 873 "Router %s: 'name' not present in " 874 "input_dict for BGP neighbor route name", 875 router, 876 ) 877 else: 878 cmd = "{} route-map {} {}".format(neigh_cxt, name, direction) 879 if del_action: 880 cmd = "no {}".format(cmd) 881 config_data.append(cmd) 882 883 if allowas_in: 884 number_occurences = allowas_in.setdefault("number_occurences", {}) 885 del_action = allowas_in.setdefault("delete", False) 886 887 cmd = "{} allowas-in {}".format(neigh_cxt, number_occurences) 888 889 if del_action: 890 cmd = "no {}".format(cmd) 891 892 config_data.append(cmd) 893 894 return config_data 895 896 897def modify_bgp_config_when_bgpd_down(tgen, topo, input_dict): 898 """ 899 API will save the current config to router's /etc/frr/ for BGPd 900 daemon(bgpd.conf file) 901 902 Paramters 903 --------- 904 * `tgen` : Topogen object 905 * `topo` : json file data 906 * `input_dict` : defines for which router, and which config 907 needs to be modified 908 909 Usage: 910 ------ 911 # Modify graceful-restart config not to set f-bit 912 # and write to /etc/frr 913 914 # Api call to delete advertised networks 915 input_dict_2 = { 916 "r5": { 917 "bgp": { 918 "address_family": { 919 "ipv4": { 920 "unicast": { 921 "advertise_networks": [ 922 { 923 "network": "101.0.20.1/32", 924 "no_of_network": 5, 925 "delete": True 926 } 927 ], 928 } 929 }, 930 "ipv6": { 931 "unicast": { 932 "advertise_networks": [ 933 { 934 "network": "5::1/128", 935 "no_of_network": 5, 936 "delete": True 937 } 938 ], 939 } 940 } 941 } 942 } 943 } 944 } 945 946 result = modify_bgp_config_when_bgpd_down(tgen, topo, input_dict) 947 948 """ 949 950 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) 951 try: 952 953 global LOGDIR 954 955 result = create_router_bgp( 956 tgen, topo, input_dict, build=False, load_config=False 957 ) 958 if result is not True: 959 return result 960 961 # Copy bgp config file to /etc/frr 962 for dut in input_dict.keys(): 963 router_list = tgen.routers() 964 for router, rnode in router_list.iteritems(): 965 if router != dut: 966 continue 967 968 TMPDIR = os.path.join(LOGDIR, tgen.modname) 969 970 logger.info("Delete BGP config when BGPd is down in {}".format(router)) 971 # Reading the config from /tmp/topotests and 972 # copy to /etc/frr/bgpd.conf 973 cmd = "cat {}/{}/{} >> /etc/frr/bgpd.conf".format( 974 TMPDIR, router, FRRCFG_FILE 975 ) 976 router_list[router].run(cmd) 977 978 except Exception as e: 979 # handle any exception 980 logger.error("Error %s occured. Arguments %s.", e.message, e.args) 981 982 # Traceback 983 errormsg = traceback.format_exc() 984 logger.error(errormsg) 985 return errormsg 986 987 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) 988 return True 989 990 991############################################# 992# Verification APIs 993############################################# 994@retry(attempts=3, wait=2, return_is_str=True) 995def verify_router_id(tgen, topo, input_dict): 996 """ 997 Running command "show ip bgp json" for DUT and reading router-id 998 from input_dict and verifying with command output. 999 1. Statically modfified router-id should take place 1000 2. When static router-id is deleted highest loopback should 1001 become router-id 1002 3. When loopback intf is down then highest physcial intf 1003 should become router-id 1004 1005 Parameters 1006 ---------- 1007 * `tgen`: topogen object 1008 * `topo`: input json file data 1009 * `input_dict`: input dictionary, have details of Device Under Test, for 1010 which user wants to test the data 1011 Usage 1012 ----- 1013 # Verify if router-id for r1 is 12.12.12.12 1014 input_dict = { 1015 "r1":{ 1016 "router_id": "12.12.12.12" 1017 } 1018 # Verify that router-id for r1 is highest interface ip 1019 input_dict = { 1020 "routers": ["r1"] 1021 } 1022 result = verify_router_id(tgen, topo, input_dict) 1023 1024 Returns 1025 ------- 1026 errormsg(str) or True 1027 """ 1028 1029 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) 1030 for router in input_dict.keys(): 1031 if router not in tgen.routers(): 1032 continue 1033 1034 rnode = tgen.routers()[router] 1035 1036 del_router_id = input_dict[router]["bgp"].setdefault("del_router_id", False) 1037 1038 logger.info("Checking router %s router-id", router) 1039 show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True) 1040 router_id_out = show_bgp_json["ipv4Unicast"]["routerId"] 1041 router_id_out = ipaddress.IPv4Address(unicode(router_id_out)) 1042 1043 # Once router-id is deleted, highest interface ip should become 1044 # router-id 1045 if del_router_id: 1046 router_id = find_interface_with_greater_ip(topo, router) 1047 else: 1048 router_id = input_dict[router]["bgp"]["router_id"] 1049 router_id = ipaddress.IPv4Address(unicode(router_id)) 1050 1051 if router_id == router_id_out: 1052 logger.info("Found expected router-id %s for router %s", router_id, router) 1053 else: 1054 errormsg = ( 1055 "Router-id for router:{} mismatch, expected:" 1056 " {} but found:{}".format(router, router_id, router_id_out) 1057 ) 1058 return errormsg 1059 1060 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) 1061 return True 1062 1063 1064@retry(attempts=50, wait=3, return_is_str=True) 1065def verify_bgp_convergence(tgen, topo, dut=None): 1066 """ 1067 API will verify if BGP is converged with in the given time frame. 1068 Running "show bgp summary json" command and verify bgp neighbor 1069 state is established, 1070 Parameters 1071 ---------- 1072 * `tgen`: topogen object 1073 * `topo`: input json file data 1074 * `dut`: device under test 1075 Usage 1076 ----- 1077 # To veriry is BGP is converged for all the routers used in 1078 topology 1079 results = verify_bgp_convergence(tgen, topo, dut="r1") 1080 Returns 1081 ------- 1082 errormsg(str) or True 1083 """ 1084 1085 logger.debug("Entering lib API: verify_bgp_convergence()") 1086 for router, rnode in tgen.routers().iteritems(): 1087 if "bgp" not in topo["routers"][router]: 1088 continue 1089 1090 if dut is not None and dut != router: 1091 continue 1092 1093 logger.info("Verifying BGP Convergence on router %s:", router) 1094 show_bgp_json = run_frr_cmd(rnode, "show bgp vrf all summary json", isjson=True) 1095 # Verifying output dictionary show_bgp_json is empty or not 1096 if not bool(show_bgp_json): 1097 errormsg = "BGP is not running" 1098 return errormsg 1099 1100 # To find neighbor ip type 1101 bgp_data_list = topo["routers"][router]["bgp"] 1102 1103 if type(bgp_data_list) is not list: 1104 bgp_data_list = [bgp_data_list] 1105 1106 for bgp_data in bgp_data_list: 1107 if "vrf" in bgp_data: 1108 vrf = bgp_data["vrf"] 1109 if vrf is None: 1110 vrf = "default" 1111 else: 1112 vrf = "default" 1113 1114 # To find neighbor ip type 1115 bgp_addr_type = bgp_data["address_family"] 1116 if "l2vpn" in bgp_addr_type: 1117 total_evpn_peer = 0 1118 1119 if "neighbor" not in bgp_addr_type["l2vpn"]["evpn"]: 1120 continue 1121 1122 bgp_neighbors = bgp_addr_type["l2vpn"]["evpn"]["neighbor"] 1123 total_evpn_peer += len(bgp_neighbors) 1124 1125 no_of_evpn_peer = 0 1126 for bgp_neighbor, peer_data in bgp_neighbors.items(): 1127 for _addr_type, dest_link_dict in peer_data.items(): 1128 data = topo["routers"][bgp_neighbor]["links"] 1129 for dest_link in dest_link_dict.keys(): 1130 if dest_link in data: 1131 peer_details = peer_data[_addr_type][dest_link] 1132 1133 neighbor_ip = data[dest_link][_addr_type].split("/")[0] 1134 nh_state = None 1135 1136 if ( 1137 "ipv4Unicast" in show_bgp_json[vrf] 1138 or "ipv6Unicast" in show_bgp_json[vrf] 1139 ): 1140 errormsg = ( 1141 "[DUT: %s] VRF: %s, " 1142 "ipv4Unicast/ipv6Unicast" 1143 " address-family present" 1144 " under l2vpn" % (router, vrf) 1145 ) 1146 return errormsg 1147 1148 l2VpnEvpn_data = show_bgp_json[vrf]["l2VpnEvpn"][ 1149 "peers" 1150 ] 1151 nh_state = l2VpnEvpn_data[neighbor_ip]["state"] 1152 1153 if nh_state == "Established": 1154 no_of_evpn_peer += 1 1155 1156 if no_of_evpn_peer == total_evpn_peer: 1157 logger.info( 1158 "[DUT: %s] VRF: %s, BGP is Converged for " "epvn peers", 1159 router, 1160 vrf, 1161 ) 1162 else: 1163 errormsg = ( 1164 "[DUT: %s] VRF: %s, BGP is not converged " 1165 "for evpn peers" % (router, vrf) 1166 ) 1167 return errormsg 1168 else: 1169 for addr_type in bgp_addr_type.keys(): 1170 if not check_address_types(addr_type): 1171 continue 1172 total_peer = 0 1173 1174 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"] 1175 1176 for bgp_neighbor in bgp_neighbors: 1177 total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"]) 1178 1179 for addr_type in bgp_addr_type.keys(): 1180 if not check_address_types(addr_type): 1181 continue 1182 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"] 1183 1184 no_of_peer = 0 1185 for bgp_neighbor, peer_data in bgp_neighbors.items(): 1186 for dest_link in peer_data["dest_link"].keys(): 1187 data = topo["routers"][bgp_neighbor]["links"] 1188 if dest_link in data: 1189 peer_details = peer_data["dest_link"][dest_link] 1190 # for link local neighbors 1191 if ( 1192 "neighbor_type" in peer_details 1193 and peer_details["neighbor_type"] == "link-local" 1194 ): 1195 neighbor_ip = get_ipv6_linklocal_address( 1196 topo["routers"], bgp_neighbor, dest_link 1197 ) 1198 elif "source_link" in peer_details: 1199 neighbor_ip = topo["routers"][bgp_neighbor][ 1200 "links" 1201 ][peer_details["source_link"]][addr_type].split( 1202 "/" 1203 )[ 1204 0 1205 ] 1206 elif ( 1207 "neighbor_type" in peer_details 1208 and peer_details["neighbor_type"] == "unnumbered" 1209 ): 1210 neighbor_ip = data[dest_link]["peer-interface"] 1211 else: 1212 neighbor_ip = data[dest_link][addr_type].split("/")[ 1213 0 1214 ] 1215 nh_state = None 1216 1217 if addr_type == "ipv4": 1218 ipv4_data = show_bgp_json[vrf]["ipv4Unicast"][ 1219 "peers" 1220 ] 1221 nh_state = ipv4_data[neighbor_ip]["state"] 1222 else: 1223 ipv6_data = show_bgp_json[vrf]["ipv6Unicast"][ 1224 "peers" 1225 ] 1226 nh_state = ipv6_data[neighbor_ip]["state"] 1227 1228 if nh_state == "Established": 1229 no_of_peer += 1 1230 1231 if no_of_peer == total_peer: 1232 logger.info( 1233 "[DUT: %s] VRF: %s, BGP is Converged for %s address-family", 1234 router, 1235 vrf, 1236 addr_type, 1237 ) 1238 else: 1239 errormsg = ( 1240 "[DUT: %s] VRF: %s, BGP is not converged for %s address-family" 1241 % (router, vrf, addr_type) 1242 ) 1243 return errormsg 1244 1245 logger.debug("Exiting API: verify_bgp_convergence()") 1246 return True 1247 1248 1249@retry(attempts=3, wait=4, return_is_str=True) 1250def verify_bgp_community( 1251 tgen, addr_type, router, network, input_dict=None, vrf=None, bestpath=False 1252): 1253 """ 1254 API to veiryf BGP large community is attached in route for any given 1255 DUT by running "show bgp ipv4/6 {route address} json" command. 1256 1257 Parameters 1258 ---------- 1259 * `tgen`: topogen object 1260 * `addr_type` : ip type, ipv4/ipv6 1261 * `dut`: Device Under Test 1262 * `network`: network for which set criteria needs to be verified 1263 * `input_dict`: having details like - for which router, community and 1264 values needs to be verified 1265 * `vrf`: VRF name 1266 * `bestpath`: To check best path cli 1267 1268 Usage 1269 ----- 1270 networks = ["200.50.2.0/32"] 1271 input_dict = { 1272 "largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5" 1273 } 1274 result = verify_bgp_community(tgen, "ipv4", dut, network, input_dict=None) 1275 1276 Returns 1277 ------- 1278 errormsg(str) or True 1279 """ 1280 1281 logger.debug("Entering lib API: verify_bgp_community()") 1282 if router not in tgen.routers(): 1283 return False 1284 1285 rnode = tgen.routers()[router] 1286 1287 logger.info( 1288 "Verifying BGP community attributes on dut %s: for %s " "network %s", 1289 router, 1290 addr_type, 1291 network, 1292 ) 1293 1294 command = "show bgp" 1295 1296 sleep(5) 1297 for net in network: 1298 if vrf: 1299 cmd = "{} vrf {} {} {} json".format(command, vrf, addr_type, net) 1300 elif bestpath: 1301 cmd = "{} {} {} bestpath json".format(command, addr_type, net) 1302 else: 1303 cmd = "{} {} {} json".format(command, addr_type, net) 1304 1305 show_bgp_json = run_frr_cmd(rnode, cmd, isjson=True) 1306 if "paths" not in show_bgp_json: 1307 return "Prefix {} not found in BGP table of router: {}".format(net, router) 1308 1309 as_paths = show_bgp_json["paths"] 1310 found = False 1311 for i in range(len(as_paths)): 1312 if ( 1313 "largeCommunity" in show_bgp_json["paths"][i] 1314 or "community" in show_bgp_json["paths"][i] 1315 ): 1316 found = True 1317 logger.info( 1318 "Large Community attribute is found for route:" " %s in router: %s", 1319 net, 1320 router, 1321 ) 1322 if input_dict is not None: 1323 for criteria, comm_val in input_dict.items(): 1324 show_val = show_bgp_json["paths"][i][criteria]["string"] 1325 if comm_val == show_val: 1326 logger.info( 1327 "Verifying BGP %s for prefix: %s" 1328 " in router: %s, found expected" 1329 " value: %s", 1330 criteria, 1331 net, 1332 router, 1333 comm_val, 1334 ) 1335 else: 1336 errormsg = ( 1337 "Failed: Verifying BGP attribute" 1338 " {} for route: {} in router: {}" 1339 ", expected value: {} but found" 1340 ": {}".format(criteria, net, router, comm_val, show_val) 1341 ) 1342 return errormsg 1343 1344 if not found: 1345 errormsg = ( 1346 "Large Community attribute is not found for route: " 1347 "{} in router: {} ".format(net, router) 1348 ) 1349 return errormsg 1350 1351 logger.debug("Exiting lib API: verify_bgp_community()") 1352 return True 1353 1354 1355def modify_as_number(tgen, topo, input_dict): 1356 """ 1357 API reads local_as and remote_as from user defined input_dict and 1358 modify router"s ASNs accordingly. Router"s config is modified and 1359 recent/changed config is loadeded to router. 1360 1361 Parameters 1362 ---------- 1363 * `tgen` : Topogen object 1364 * `topo` : json file data 1365 * `input_dict` : defines for which router ASNs needs to be modified 1366 1367 Usage 1368 ----- 1369 To modify ASNs for router r1 1370 input_dict = { 1371 "r1": { 1372 "bgp": { 1373 "local_as": 131079 1374 } 1375 } 1376 result = modify_as_number(tgen, topo, input_dict) 1377 1378 Returns 1379 ------- 1380 errormsg(str) or True 1381 """ 1382 1383 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) 1384 try: 1385 1386 new_topo = deepcopy(topo["routers"]) 1387 router_dict = {} 1388 for router in input_dict.keys(): 1389 # Remove bgp configuration 1390 1391 router_dict.update({router: {"bgp": {"delete": True}}}) 1392 1393 new_topo[router]["bgp"]["local_as"] = input_dict[router]["bgp"]["local_as"] 1394 1395 logger.info("Removing bgp configuration") 1396 create_router_bgp(tgen, topo, router_dict) 1397 1398 logger.info("Applying modified bgp configuration") 1399 create_router_bgp(tgen, new_topo) 1400 1401 except Exception as e: 1402 # handle any exception 1403 logger.error("Error %s occured. Arguments %s.", e.message, e.args) 1404 1405 # Traceback 1406 errormsg = traceback.format_exc() 1407 logger.error(errormsg) 1408 return errormsg 1409 1410 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) 1411 return True 1412 1413 1414@retry(attempts=3, wait=2, return_is_str=True) 1415def verify_as_numbers(tgen, topo, input_dict): 1416 """ 1417 This API is to verify AS numbers for given DUT by running 1418 "show ip bgp neighbor json" command. Local AS and Remote AS 1419 will ve verified with input_dict data and command output. 1420 1421 Parameters 1422 ---------- 1423 * `tgen`: topogen object 1424 * `topo`: input json file data 1425 * `addr_type` : ip type, ipv4/ipv6 1426 * `input_dict`: defines - for which router, AS numbers needs to be verified 1427 1428 Usage 1429 ----- 1430 input_dict = { 1431 "r1": { 1432 "bgp": { 1433 "local_as": 131079 1434 } 1435 } 1436 } 1437 result = verify_as_numbers(tgen, topo, addr_type, input_dict) 1438 1439 Returns 1440 ------- 1441 errormsg(str) or True 1442 """ 1443 1444 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) 1445 for router in input_dict.keys(): 1446 if router not in tgen.routers(): 1447 continue 1448 1449 rnode = tgen.routers()[router] 1450 1451 logger.info("Verifying AS numbers for dut %s:", router) 1452 1453 show_ip_bgp_neighbor_json = run_frr_cmd( 1454 rnode, "show ip bgp neighbor json", isjson=True 1455 ) 1456 local_as = input_dict[router]["bgp"]["local_as"] 1457 bgp_addr_type = topo["routers"][router]["bgp"]["address_family"] 1458 1459 for addr_type in bgp_addr_type: 1460 if not check_address_types(addr_type): 1461 continue 1462 1463 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"] 1464 1465 for bgp_neighbor, peer_data in bgp_neighbors.iteritems(): 1466 remote_as = input_dict[bgp_neighbor]["bgp"]["local_as"] 1467 for dest_link, peer_dict in peer_data["dest_link"].iteritems(): 1468 neighbor_ip = None 1469 data = topo["routers"][bgp_neighbor]["links"] 1470 1471 if dest_link in data: 1472 neighbor_ip = data[dest_link][addr_type].split("/")[0] 1473 neigh_data = show_ip_bgp_neighbor_json[neighbor_ip] 1474 # Verify Local AS for router 1475 if neigh_data["localAs"] != local_as: 1476 errormsg = ( 1477 "Failed: Verify local_as for dut {}," 1478 " found: {} but expected: {}".format( 1479 router, neigh_data["localAs"], local_as 1480 ) 1481 ) 1482 return errormsg 1483 else: 1484 logger.info( 1485 "Verified local_as for dut %s, found" " expected: %s", 1486 router, 1487 local_as, 1488 ) 1489 1490 # Verify Remote AS for neighbor 1491 if neigh_data["remoteAs"] != remote_as: 1492 errormsg = ( 1493 "Failed: Verify remote_as for dut " 1494 "{}'s neighbor {}, found: {} but " 1495 "expected: {}".format( 1496 router, bgp_neighbor, neigh_data["remoteAs"], remote_as 1497 ) 1498 ) 1499 return errormsg 1500 else: 1501 logger.info( 1502 "Verified remote_as for dut %s's " 1503 "neighbor %s, found expected: %s", 1504 router, 1505 bgp_neighbor, 1506 remote_as, 1507 ) 1508 1509 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) 1510 return True 1511 1512 1513@retry(attempts=50, wait=3, return_is_str=True) 1514def verify_bgp_convergence_from_running_config(tgen, dut=None): 1515 """ 1516 API to verify BGP convergence b/w loopback and physical interface. 1517 This API would be used when routers have BGP neighborship is loopback 1518 to physical or vice-versa 1519 1520 Parameters 1521 ---------- 1522 * `tgen`: topogen object 1523 * `dut`: device under test 1524 1525 Usage 1526 ----- 1527 results = verify_bgp_convergence_bw_lo_and_phy_intf(tgen, topo, 1528 dut="r1") 1529 1530 Returns 1531 ------- 1532 errormsg(str) or True 1533 """ 1534 1535 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) 1536 1537 for router, rnode in tgen.routers().iteritems(): 1538 if dut is not None and dut != router: 1539 continue 1540 1541 logger.info("Verifying BGP Convergence on router %s:", router) 1542 show_bgp_json = run_frr_cmd(rnode, "show bgp vrf all summary json", isjson=True) 1543 # Verifying output dictionary show_bgp_json is empty or not 1544 if not bool(show_bgp_json): 1545 errormsg = "BGP is not running" 1546 return errormsg 1547 1548 for vrf, addr_family_data in show_bgp_json.items(): 1549 for address_family, neighborship_data in addr_family_data.items(): 1550 total_peer = 0 1551 no_of_peer = 0 1552 1553 total_peer = len(neighborship_data["peers"].keys()) 1554 1555 for peer, peer_data in neighborship_data["peers"].items(): 1556 if peer_data["state"] == "Established": 1557 no_of_peer += 1 1558 1559 if total_peer != no_of_peer: 1560 errormsg = ( 1561 "[DUT: %s] VRF: %s, BGP is not converged" 1562 " for peer: %s" % (router, vrf, peer) 1563 ) 1564 return errormsg 1565 1566 logger.info("[DUT: %s]: vrf: %s, BGP is Converged", router, vrf) 1567 1568 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) 1569 1570 return True 1571 1572 1573def clear_bgp(tgen, addr_type, router, vrf=None): 1574 """ 1575 This API is to clear bgp neighborship by running 1576 clear ip bgp */clear bgp ipv6 * command, 1577 1578 Parameters 1579 ---------- 1580 * `tgen`: topogen object 1581 * `addr_type`: ip type ipv4/ipv6 1582 * `router`: device under test 1583 * `vrf`: vrf name 1584 1585 Usage 1586 ----- 1587 clear_bgp(tgen, addr_type, "r1") 1588 """ 1589 1590 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) 1591 1592 if router not in tgen.routers(): 1593 return False 1594 1595 rnode = tgen.routers()[router] 1596 1597 if vrf: 1598 if type(vrf) is not list: 1599 vrf = [vrf] 1600 1601 # Clearing BGP 1602 logger.info("Clearing BGP neighborship for router %s..", router) 1603 if addr_type == "ipv4": 1604 if vrf: 1605 for _vrf in vrf: 1606 run_frr_cmd(rnode, "clear ip bgp vrf {} *".format(_vrf)) 1607 else: 1608 run_frr_cmd(rnode, "clear ip bgp *") 1609 elif addr_type == "ipv6": 1610 if vrf: 1611 for _vrf in vrf: 1612 run_frr_cmd(rnode, "clear bgp vrf {} ipv6 *".format(_vrf)) 1613 else: 1614 run_frr_cmd(rnode, "clear bgp ipv6 *") 1615 else: 1616 run_frr_cmd(rnode, "clear bgp *") 1617 1618 sleep(5) 1619 1620 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) 1621 1622 1623def clear_bgp_and_verify(tgen, topo, router): 1624 """ 1625 This API is to clear bgp neighborship and verify bgp neighborship 1626 is coming up(BGP is converged) usinf "show bgp summary json" command 1627 and also verifying for all bgp neighbors uptime before and after 1628 clear bgp sessions is different as the uptime must be changed once 1629 bgp sessions are cleared using "clear ip bgp */clear bgp ipv6 *" cmd. 1630 1631 Parameters 1632 ---------- 1633 * `tgen`: topogen object 1634 * `topo`: input json file data 1635 * `router`: device under test 1636 1637 Usage 1638 ----- 1639 result = clear_bgp_and_verify(tgen, topo, addr_type, dut) 1640 1641 Returns 1642 ------- 1643 errormsg(str) or True 1644 """ 1645 1646 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) 1647 1648 if router not in tgen.routers(): 1649 return False 1650 1651 rnode = tgen.routers()[router] 1652 1653 peer_uptime_before_clear_bgp = {} 1654 sleeptime = 3 1655 1656 # Verifying BGP convergence before bgp clear command 1657 for retry in range(50): 1658 # Waiting for BGP to converge 1659 logger.info( 1660 "Waiting for %s sec for BGP to converge on router" " %s...", 1661 sleeptime, 1662 router, 1663 ) 1664 sleep(sleeptime) 1665 1666 show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True) 1667 # Verifying output dictionary show_bgp_json is empty or not 1668 if not bool(show_bgp_json): 1669 errormsg = "BGP is not running" 1670 return errormsg 1671 1672 # To find neighbor ip type 1673 bgp_addr_type = topo["routers"][router]["bgp"]["address_family"] 1674 total_peer = 0 1675 for addr_type in bgp_addr_type.keys(): 1676 1677 if not check_address_types(addr_type): 1678 continue 1679 1680 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"] 1681 1682 for bgp_neighbor in bgp_neighbors: 1683 total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"]) 1684 1685 no_of_peer = 0 1686 for addr_type in bgp_addr_type: 1687 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"] 1688 1689 for bgp_neighbor, peer_data in bgp_neighbors.iteritems(): 1690 for dest_link, peer_dict in peer_data["dest_link"].iteritems(): 1691 data = topo["routers"][bgp_neighbor]["links"] 1692 1693 if dest_link in data: 1694 neighbor_ip = data[dest_link][addr_type].split("/")[0] 1695 if addr_type == "ipv4": 1696 ipv4_data = show_bgp_json["ipv4Unicast"]["peers"] 1697 nh_state = ipv4_data[neighbor_ip]["state"] 1698 1699 # Peer up time dictionary 1700 peer_uptime_before_clear_bgp[bgp_neighbor] = ipv4_data[ 1701 neighbor_ip 1702 ]["peerUptimeEstablishedEpoch"] 1703 else: 1704 ipv6_data = show_bgp_json["ipv6Unicast"]["peers"] 1705 nh_state = ipv6_data[neighbor_ip]["state"] 1706 1707 # Peer up time dictionary 1708 peer_uptime_before_clear_bgp[bgp_neighbor] = ipv6_data[ 1709 neighbor_ip 1710 ]["peerUptimeEstablishedEpoch"] 1711 1712 if nh_state == "Established": 1713 no_of_peer += 1 1714 1715 if no_of_peer == total_peer: 1716 logger.info("BGP is Converged for router %s before bgp" " clear", router) 1717 break 1718 else: 1719 logger.info( 1720 "BGP is not yet Converged for router %s " "before bgp clear", router 1721 ) 1722 else: 1723 errormsg = ( 1724 "TIMEOUT!! BGP is not converged in {} seconds for" 1725 " router {}".format(retry * sleeptime, router) 1726 ) 1727 return errormsg 1728 1729 # Clearing BGP 1730 logger.info("Clearing BGP neighborship for router %s..", router) 1731 for addr_type in bgp_addr_type.keys(): 1732 if addr_type == "ipv4": 1733 run_frr_cmd(rnode, "clear ip bgp *") 1734 elif addr_type == "ipv6": 1735 run_frr_cmd(rnode, "clear bgp ipv6 *") 1736 1737 peer_uptime_after_clear_bgp = {} 1738 # Verifying BGP convergence after bgp clear command 1739 for retry in range(50): 1740 1741 # Waiting for BGP to converge 1742 logger.info( 1743 "Waiting for %s sec for BGP to converge on router" " %s...", 1744 sleeptime, 1745 router, 1746 ) 1747 sleep(sleeptime) 1748 1749 show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True) 1750 # Verifying output dictionary show_bgp_json is empty or not 1751 if not bool(show_bgp_json): 1752 errormsg = "BGP is not running" 1753 return errormsg 1754 1755 # To find neighbor ip type 1756 bgp_addr_type = topo["routers"][router]["bgp"]["address_family"] 1757 total_peer = 0 1758 for addr_type in bgp_addr_type.keys(): 1759 if not check_address_types(addr_type): 1760 continue 1761 1762 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"] 1763 1764 for bgp_neighbor in bgp_neighbors: 1765 total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"]) 1766 1767 no_of_peer = 0 1768 for addr_type in bgp_addr_type: 1769 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"] 1770 1771 for bgp_neighbor, peer_data in bgp_neighbors.iteritems(): 1772 for dest_link, peer_dict in peer_data["dest_link"].iteritems(): 1773 data = topo["routers"][bgp_neighbor]["links"] 1774 1775 if dest_link in data: 1776 neighbor_ip = data[dest_link][addr_type].split("/")[0] 1777 if addr_type == "ipv4": 1778 ipv4_data = show_bgp_json["ipv4Unicast"]["peers"] 1779 nh_state = ipv4_data[neighbor_ip]["state"] 1780 peer_uptime_after_clear_bgp[bgp_neighbor] = ipv4_data[ 1781 neighbor_ip 1782 ]["peerUptimeEstablishedEpoch"] 1783 else: 1784 ipv6_data = show_bgp_json["ipv6Unicast"]["peers"] 1785 nh_state = ipv6_data[neighbor_ip]["state"] 1786 # Peer up time dictionary 1787 peer_uptime_after_clear_bgp[bgp_neighbor] = ipv6_data[ 1788 neighbor_ip 1789 ]["peerUptimeEstablishedEpoch"] 1790 1791 if nh_state == "Established": 1792 no_of_peer += 1 1793 1794 if no_of_peer == total_peer: 1795 logger.info("BGP is Converged for router %s after bgp clear", router) 1796 break 1797 else: 1798 logger.info( 1799 "BGP is not yet Converged for router %s after" " bgp clear", router 1800 ) 1801 else: 1802 errormsg = ( 1803 "TIMEOUT!! BGP is not converged in {} seconds for" 1804 " router {}".format(retry * sleeptime, router) 1805 ) 1806 return errormsg 1807 1808 # Comparing peerUptimeEstablishedEpoch dictionaries 1809 if peer_uptime_before_clear_bgp != peer_uptime_after_clear_bgp: 1810 logger.info("BGP neighborship is reset after clear BGP on router %s", router) 1811 else: 1812 errormsg = ( 1813 "BGP neighborship is not reset after clear bgp on router" 1814 " {}".format(router) 1815 ) 1816 return errormsg 1817 1818 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) 1819 return True 1820 1821 1822def verify_bgp_timers_and_functionality(tgen, topo, input_dict): 1823 """ 1824 To verify BGP timer config, execute "show ip bgp neighbor json" command 1825 and verify bgp timers with input_dict data. 1826 To veirfy bgp timers functonality, shutting down peer interface 1827 and verify BGP neighborship status. 1828 1829 Parameters 1830 ---------- 1831 * `tgen`: topogen object 1832 * `topo`: input json file data 1833 * `addr_type`: ip type, ipv4/ipv6 1834 * `input_dict`: defines for which router, bgp timers needs to be verified 1835 1836 Usage: 1837 # To verify BGP timers for neighbor r2 of router r1 1838 input_dict = { 1839 "r1": { 1840 "bgp": { 1841 "bgp_neighbors":{ 1842 "r2":{ 1843 "keepalivetimer": 5, 1844 "holddowntimer": 15, 1845 }}}}} 1846 result = verify_bgp_timers_and_functionality(tgen, topo, "ipv4", 1847 input_dict) 1848 1849 Returns 1850 ------- 1851 errormsg(str) or True 1852 """ 1853 1854 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) 1855 sleep(5) 1856 router_list = tgen.routers() 1857 for router in input_dict.keys(): 1858 if router not in router_list: 1859 continue 1860 1861 rnode = router_list[router] 1862 1863 logger.info("Verifying bgp timers functionality, DUT is %s:", router) 1864 1865 show_ip_bgp_neighbor_json = run_frr_cmd( 1866 rnode, "show ip bgp neighbor json", isjson=True 1867 ) 1868 1869 bgp_addr_type = input_dict[router]["bgp"]["address_family"] 1870 1871 for addr_type in bgp_addr_type: 1872 if not check_address_types(addr_type): 1873 continue 1874 1875 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"] 1876 for bgp_neighbor, peer_data in bgp_neighbors.iteritems(): 1877 for dest_link, peer_dict in peer_data["dest_link"].iteritems(): 1878 data = topo["routers"][bgp_neighbor]["links"] 1879 1880 keepalivetimer = peer_dict["keepalivetimer"] 1881 holddowntimer = peer_dict["holddowntimer"] 1882 1883 if dest_link in data: 1884 neighbor_ip = data[dest_link][addr_type].split("/")[0] 1885 neighbor_intf = data[dest_link]["interface"] 1886 1887 # Verify HoldDownTimer for neighbor 1888 bgpHoldTimeMsecs = show_ip_bgp_neighbor_json[neighbor_ip][ 1889 "bgpTimerHoldTimeMsecs" 1890 ] 1891 if bgpHoldTimeMsecs != holddowntimer * 1000: 1892 errormsg = ( 1893 "Verifying holddowntimer for bgp " 1894 "neighbor {} under dut {}, found: {} " 1895 "but expected: {}".format( 1896 neighbor_ip, 1897 router, 1898 bgpHoldTimeMsecs, 1899 holddowntimer * 1000, 1900 ) 1901 ) 1902 return errormsg 1903 1904 # Verify KeepAliveTimer for neighbor 1905 bgpKeepAliveTimeMsecs = show_ip_bgp_neighbor_json[neighbor_ip][ 1906 "bgpTimerKeepAliveIntervalMsecs" 1907 ] 1908 if bgpKeepAliveTimeMsecs != keepalivetimer * 1000: 1909 errormsg = ( 1910 "Verifying keepalivetimer for bgp " 1911 "neighbor {} under dut {}, found: {} " 1912 "but expected: {}".format( 1913 neighbor_ip, 1914 router, 1915 bgpKeepAliveTimeMsecs, 1916 keepalivetimer * 1000, 1917 ) 1918 ) 1919 return errormsg 1920 1921 #################### 1922 # Shutting down peer interface after keepalive time and 1923 # after some time bringing up peer interface. 1924 # verifying BGP neighborship in (hold down-keep alive) 1925 # time, it should not go down 1926 #################### 1927 1928 # Wait till keep alive time 1929 logger.info("=" * 20) 1930 logger.info("Scenario 1:") 1931 logger.info( 1932 "Shutdown and bring up peer interface: %s " 1933 "in keep alive time : %s sec and verify " 1934 " BGP neighborship is intact in %s sec ", 1935 neighbor_intf, 1936 keepalivetimer, 1937 (holddowntimer - keepalivetimer), 1938 ) 1939 logger.info("=" * 20) 1940 logger.info("Waiting for %s sec..", keepalivetimer) 1941 sleep(keepalivetimer) 1942 1943 # Shutting down peer ineterface 1944 logger.info( 1945 "Shutting down interface %s on router %s", 1946 neighbor_intf, 1947 bgp_neighbor, 1948 ) 1949 topotest.interface_set_status( 1950 router_list[bgp_neighbor], neighbor_intf, ifaceaction=False 1951 ) 1952 1953 # Bringing up peer interface 1954 sleep(5) 1955 logger.info( 1956 "Bringing up interface %s on router %s..", 1957 neighbor_intf, 1958 bgp_neighbor, 1959 ) 1960 topotest.interface_set_status( 1961 router_list[bgp_neighbor], neighbor_intf, ifaceaction=True 1962 ) 1963 1964 # Verifying BGP neighborship is intact in 1965 # (holddown - keepalive) time 1966 for timer in range( 1967 keepalivetimer, holddowntimer, int(holddowntimer / 3) 1968 ): 1969 logger.info("Waiting for %s sec..", keepalivetimer) 1970 sleep(keepalivetimer) 1971 sleep(2) 1972 show_bgp_json = run_frr_cmd( 1973 rnode, "show bgp summary json", isjson=True 1974 ) 1975 1976 if addr_type == "ipv4": 1977 ipv4_data = show_bgp_json["ipv4Unicast"]["peers"] 1978 nh_state = ipv4_data[neighbor_ip]["state"] 1979 else: 1980 ipv6_data = show_bgp_json["ipv6Unicast"]["peers"] 1981 nh_state = ipv6_data[neighbor_ip]["state"] 1982 1983 if timer == (holddowntimer - keepalivetimer): 1984 if nh_state != "Established": 1985 errormsg = ( 1986 "BGP neighborship has not gone " 1987 "down in {} sec for neighbor {}".format( 1988 timer, bgp_neighbor 1989 ) 1990 ) 1991 return errormsg 1992 else: 1993 logger.info( 1994 "BGP neighborship is intact in %s" 1995 " sec for neighbor %s", 1996 timer, 1997 bgp_neighbor, 1998 ) 1999 2000 #################### 2001 # Shutting down peer interface and verifying that BGP 2002 # neighborship is going down in holddown time 2003 #################### 2004 logger.info("=" * 20) 2005 logger.info("Scenario 2:") 2006 logger.info( 2007 "Shutdown peer interface: %s and verify BGP" 2008 " neighborship has gone down in hold down " 2009 "time %s sec", 2010 neighbor_intf, 2011 holddowntimer, 2012 ) 2013 logger.info("=" * 20) 2014 2015 logger.info( 2016 "Shutting down interface %s on router %s..", 2017 neighbor_intf, 2018 bgp_neighbor, 2019 ) 2020 topotest.interface_set_status( 2021 router_list[bgp_neighbor], neighbor_intf, ifaceaction=False 2022 ) 2023 2024 # Verifying BGP neighborship is going down in holddown time 2025 for timer in range( 2026 keepalivetimer, 2027 (holddowntimer + keepalivetimer), 2028 int(holddowntimer / 3), 2029 ): 2030 logger.info("Waiting for %s sec..", keepalivetimer) 2031 sleep(keepalivetimer) 2032 sleep(2) 2033 show_bgp_json = run_frr_cmd( 2034 rnode, "show bgp summary json", isjson=True 2035 ) 2036 2037 if addr_type == "ipv4": 2038 ipv4_data = show_bgp_json["ipv4Unicast"]["peers"] 2039 nh_state = ipv4_data[neighbor_ip]["state"] 2040 else: 2041 ipv6_data = show_bgp_json["ipv6Unicast"]["peers"] 2042 nh_state = ipv6_data[neighbor_ip]["state"] 2043 2044 if timer == holddowntimer: 2045 if nh_state == "Established": 2046 errormsg = ( 2047 "BGP neighborship has not gone " 2048 "down in {} sec for neighbor {}".format( 2049 timer, bgp_neighbor 2050 ) 2051 ) 2052 return errormsg 2053 else: 2054 logger.info( 2055 "BGP neighborship has gone down in" 2056 " %s sec for neighbor %s", 2057 timer, 2058 bgp_neighbor, 2059 ) 2060 2061 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) 2062 return True 2063 2064 2065@retry(attempts=3, wait=4, return_is_str=True) 2066def verify_bgp_attributes( 2067 tgen, 2068 addr_type, 2069 dut, 2070 static_routes, 2071 rmap_name=None, 2072 input_dict=None, 2073 seq_id=None, 2074 nexthop=None, 2075): 2076 """ 2077 API will verify BGP attributes set by Route-map for given prefix and 2078 DUT. it will run "show bgp ipv4/ipv6 {prefix_address} json" command 2079 in DUT to verify BGP attributes set by route-map, Set attributes 2080 values will be read from input_dict and verified with command output. 2081 2082 * `tgen`: topogen object 2083 * `addr_type` : ip type, ipv4/ipv6 2084 * `dut`: Device Under Test 2085 * `static_routes`: Static Routes for which BGP set attributes needs to be 2086 verified 2087 * `rmap_name`: route map name for which set criteria needs to be verified 2088 * `input_dict`: defines for which router, AS numbers needs 2089 * `seq_id`: sequence number of rmap, default is None 2090 2091 Usage 2092 ----- 2093 # To verify BGP attribute "localpref" set to 150 and "med" set to 30 2094 for prefix 10.0.20.1/32 in router r3. 2095 input_dict = { 2096 "r3": { 2097 "route_maps": { 2098 "rmap_match_pf_list1": [ 2099 { 2100 "action": "PERMIT", 2101 "match": {"prefix_list": "pf_list_1"}, 2102 "set": {"localpref": 150, "med": 30} 2103 } 2104 ], 2105 }, 2106 "as_path": "500 400" 2107 } 2108 } 2109 static_routes (list) = ["10.0.20.1/32"] 2110 2111 2112 2113 Returns 2114 ------- 2115 errormsg(str) or True 2116 """ 2117 2118 logger.debug("Entering lib API: verify_bgp_attributes()") 2119 for router, rnode in tgen.routers().iteritems(): 2120 if router != dut: 2121 continue 2122 2123 logger.info("Verifying BGP set attributes for dut {}:".format(router)) 2124 2125 for static_route in static_routes: 2126 cmd = "show bgp {} {} json".format(addr_type, static_route) 2127 show_bgp_json = run_frr_cmd(rnode, cmd, isjson=True) 2128 2129 dict_to_test = [] 2130 tmp_list = [] 2131 2132 if "route_maps" in input_dict.values()[0]: 2133 for rmap_router in input_dict.keys(): 2134 for rmap, values in input_dict[rmap_router]["route_maps"].items(): 2135 if rmap == rmap_name: 2136 dict_to_test = values 2137 for rmap_dict in values: 2138 if seq_id is not None: 2139 if type(seq_id) is not list: 2140 seq_id = [seq_id] 2141 2142 if "seq_id" in rmap_dict: 2143 rmap_seq_id = rmap_dict["seq_id"] 2144 for _seq_id in seq_id: 2145 if _seq_id == rmap_seq_id: 2146 tmp_list.append(rmap_dict) 2147 if tmp_list: 2148 dict_to_test = tmp_list 2149 2150 value = None 2151 for rmap_dict in dict_to_test: 2152 if "set" in rmap_dict: 2153 for criteria in rmap_dict["set"].keys(): 2154 found = False 2155 for path in show_bgp_json["paths"]: 2156 if criteria not in path: 2157 continue 2158 2159 if criteria == "aspath": 2160 value = path[criteria]["string"] 2161 else: 2162 value = path[criteria] 2163 2164 if rmap_dict["set"][criteria] == value: 2165 found = True 2166 logger.info( 2167 "Verifying BGP " 2168 "attribute {} for" 2169 " route: {} in " 2170 "router: {}, found" 2171 " expected value:" 2172 " {}".format( 2173 criteria, 2174 static_route, 2175 dut, 2176 value, 2177 ) 2178 ) 2179 break 2180 2181 if not found: 2182 errormsg = ( 2183 "Failed: Verifying BGP " 2184 "attribute {} for route:" 2185 " {} in router: {}, " 2186 " expected value: {} but" 2187 " found: {}".format( 2188 criteria, 2189 static_route, 2190 dut, 2191 rmap_dict["set"][criteria], 2192 value, 2193 ) 2194 ) 2195 return errormsg 2196 2197 logger.debug("Exiting lib API: verify_bgp_attributes()") 2198 return True 2199 2200 2201@retry(attempts=4, wait=2, return_is_str=True, initial_wait=2) 2202def verify_best_path_as_per_bgp_attribute( 2203 tgen, addr_type, router, input_dict, attribute 2204): 2205 """ 2206 API is to verify best path according to BGP attributes for given routes. 2207 "show bgp ipv4/6 json" command will be run and verify best path according 2208 to shortest as-path, highest local-preference and med, lowest weight and 2209 route origin IGP>EGP>INCOMPLETE. 2210 Parameters 2211 ---------- 2212 * `tgen` : topogen object 2213 * `addr_type` : ip type, ipv4/ipv6 2214 * `tgen` : topogen object 2215 * `attribute` : calculate best path using this attribute 2216 * `input_dict`: defines different routes to calculate for which route 2217 best path is selected 2218 Usage 2219 ----- 2220 # To verify best path for routes 200.50.2.0/32 and 200.60.2.0/32 from 2221 router r7 to router r1(DUT) as per shortest as-path attribute 2222 input_dict = { 2223 "r7": { 2224 "bgp": { 2225 "address_family": { 2226 "ipv4": { 2227 "unicast": { 2228 "advertise_networks": [ 2229 { 2230 "network": "200.50.2.0/32" 2231 }, 2232 { 2233 "network": "200.60.2.0/32" 2234 } 2235 ] 2236 } 2237 } 2238 } 2239 } 2240 } 2241 } 2242 attribute = "locPrf" 2243 result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut, \ 2244 input_dict, attribute) 2245 Returns 2246 ------- 2247 errormsg(str) or True 2248 """ 2249 2250 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) 2251 2252 if router not in tgen.routers(): 2253 return False 2254 2255 rnode = tgen.routers()[router] 2256 2257 # Verifying show bgp json 2258 command = "show bgp" 2259 2260 sleep(2) 2261 logger.info("Verifying router %s RIB for best path:", router) 2262 2263 static_route = False 2264 advertise_network = False 2265 for route_val in input_dict.values(): 2266 if "static_routes" in route_val: 2267 static_route = True 2268 networks = route_val["static_routes"] 2269 else: 2270 advertise_network = True 2271 net_data = route_val["bgp"]["address_family"][addr_type]["unicast"] 2272 networks = net_data["advertise_networks"] 2273 2274 for network in networks: 2275 _network = network["network"] 2276 no_of_ip = network.setdefault("no_of_ip", 1) 2277 vrf = network.setdefault("vrf", None) 2278 2279 if vrf: 2280 cmd = "{} vrf {}".format(command, vrf) 2281 else: 2282 cmd = command 2283 2284 cmd = "{} {}".format(cmd, addr_type) 2285 cmd = "{} json".format(cmd) 2286 sh_ip_bgp_json = run_frr_cmd(rnode, cmd, isjson=True) 2287 2288 routes = generate_ips(_network, no_of_ip) 2289 for route in routes: 2290 route = str(ipaddress.ip_network(unicode(route))) 2291 2292 if route in sh_ip_bgp_json["routes"]: 2293 route_attributes = sh_ip_bgp_json["routes"][route] 2294 _next_hop = None 2295 compare = None 2296 attribute_dict = {} 2297 for route_attribute in route_attributes: 2298 next_hops = route_attribute["nexthops"] 2299 for next_hop in next_hops: 2300 next_hop_ip = next_hop["ip"] 2301 attribute_dict[next_hop_ip] = route_attribute[attribute] 2302 2303 # AS_PATH attribute 2304 if attribute == "path": 2305 # Find next_hop for the route have minimum as_path 2306 _next_hop = min( 2307 attribute_dict, key=lambda x: len(set(attribute_dict[x])) 2308 ) 2309 compare = "SHORTEST" 2310 2311 # LOCAL_PREF attribute 2312 elif attribute == "locPrf": 2313 # Find next_hop for the route have highest local preference 2314 _next_hop = max( 2315 attribute_dict, key=(lambda k: attribute_dict[k]) 2316 ) 2317 compare = "HIGHEST" 2318 2319 # WEIGHT attribute 2320 elif attribute == "weight": 2321 # Find next_hop for the route have highest weight 2322 _next_hop = max( 2323 attribute_dict, key=(lambda k: attribute_dict[k]) 2324 ) 2325 compare = "HIGHEST" 2326 2327 # ORIGIN attribute 2328 elif attribute == "origin": 2329 # Find next_hop for the route have IGP as origin, - 2330 # - rule is IGP>EGP>INCOMPLETE 2331 _next_hop = [ 2332 key 2333 for (key, value) in attribute_dict.iteritems() 2334 if value == "IGP" 2335 ][0] 2336 compare = "" 2337 2338 # MED attribute 2339 elif attribute == "metric": 2340 # Find next_hop for the route have LOWEST MED 2341 _next_hop = min( 2342 attribute_dict, key=(lambda k: attribute_dict[k]) 2343 ) 2344 compare = "LOWEST" 2345 2346 # Show ip route 2347 if addr_type == "ipv4": 2348 command_1 = "show ip route" 2349 else: 2350 command_1 = "show ipv6 route" 2351 2352 if vrf: 2353 cmd = "{} vrf {} json".format(command_1, vrf) 2354 else: 2355 cmd = "{} json".format(command_1) 2356 2357 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True) 2358 2359 # Verifying output dictionary rib_routes_json is not empty 2360 if not bool(rib_routes_json): 2361 errormsg = "No route found in RIB of router {}..".format(router) 2362 return errormsg 2363 2364 st_found = False 2365 nh_found = False 2366 # Find best is installed in RIB 2367 if route in rib_routes_json: 2368 st_found = True 2369 # Verify next_hop in rib_routes_json 2370 if ( 2371 rib_routes_json[route][0]["nexthops"][0]["ip"] 2372 in attribute_dict 2373 ): 2374 nh_found = True 2375 else: 2376 errormsg = ( 2377 "Incorrect Nexthop for BGP route {} in " 2378 "RIB of router {}, Expected: {}, Found:" 2379 " {}\n".format( 2380 route, 2381 router, 2382 rib_routes_json[route][0]["nexthops"][0]["ip"], 2383 _next_hop, 2384 ) 2385 ) 2386 return errormsg 2387 2388 if st_found and nh_found: 2389 logger.info( 2390 "Best path for prefix: %s with next_hop: %s is " 2391 "installed according to %s %s: (%s) in RIB of " 2392 "router %s", 2393 route, 2394 _next_hop, 2395 compare, 2396 attribute, 2397 attribute_dict[_next_hop], 2398 router, 2399 ) 2400 2401 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) 2402 return True 2403 2404 2405def verify_best_path_as_per_admin_distance( 2406 tgen, addr_type, router, input_dict, attribute 2407): 2408 """ 2409 API is to verify best path according to admin distance for given 2410 route. "show ip/ipv6 route json" command will be run and verify 2411 best path accoring to shortest admin distanc. 2412 2413 Parameters 2414 ---------- 2415 * `addr_type` : ip type, ipv4/ipv6 2416 * `dut`: Device Under Test 2417 * `tgen` : topogen object 2418 * `attribute` : calculate best path using admin distance 2419 * `input_dict`: defines different routes with different admin distance 2420 to calculate for which route best path is selected 2421 Usage 2422 ----- 2423 # To verify best path for route 200.50.2.0/32 from router r2 to 2424 router r1(DUT) as per shortest admin distance which is 60. 2425 input_dict = { 2426 "r2": { 2427 "static_routes": [{"network": "200.50.2.0/32", \ 2428 "admin_distance": 80, "next_hop": "10.0.0.14"}, 2429 {"network": "200.50.2.0/32", \ 2430 "admin_distance": 60, "next_hop": "10.0.0.18"}] 2431 }} 2432 attribute = "locPrf" 2433 result = verify_best_path_as_per_admin_distance(tgen, "ipv4", dut, \ 2434 input_dict, attribute): 2435 Returns 2436 ------- 2437 errormsg(str) or True 2438 """ 2439 2440 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) 2441 router_list = tgen.routers() 2442 if router not in router_list: 2443 return False 2444 2445 rnode = tgen.routers()[router] 2446 2447 sleep(5) 2448 logger.info("Verifying router %s RIB for best path:", router) 2449 2450 # Show ip route cmd 2451 if addr_type == "ipv4": 2452 command = "show ip route json" 2453 else: 2454 command = "show ipv6 route json" 2455 2456 for routes_from_router in input_dict.keys(): 2457 sh_ip_route_json = router_list[routes_from_router].vtysh_cmd( 2458 command, isjson=True 2459 ) 2460 networks = input_dict[routes_from_router]["static_routes"] 2461 for network in networks: 2462 route = network["network"] 2463 2464 route_attributes = sh_ip_route_json[route] 2465 _next_hop = None 2466 compare = None 2467 attribute_dict = {} 2468 for route_attribute in route_attributes: 2469 next_hops = route_attribute["nexthops"] 2470 for next_hop in next_hops: 2471 next_hop_ip = next_hop["ip"] 2472 attribute_dict[next_hop_ip] = route_attribute["distance"] 2473 2474 # Find next_hop for the route have LOWEST Admin Distance 2475 _next_hop = min(attribute_dict, key=(lambda k: attribute_dict[k])) 2476 compare = "LOWEST" 2477 2478 # Show ip route 2479 rib_routes_json = run_frr_cmd(rnode, command, isjson=True) 2480 2481 # Verifying output dictionary rib_routes_json is not empty 2482 if not bool(rib_routes_json): 2483 errormsg = "No route found in RIB of router {}..".format(router) 2484 return errormsg 2485 2486 st_found = False 2487 nh_found = False 2488 # Find best is installed in RIB 2489 if route in rib_routes_json: 2490 st_found = True 2491 # Verify next_hop in rib_routes_json 2492 if rib_routes_json[route][0]["nexthops"][0]["ip"] == _next_hop: 2493 nh_found = True 2494 else: 2495 errormsg = ( 2496 "Nexthop {} is Missing for BGP route {}" 2497 " in RIB of router {}\n".format(_next_hop, route, router) 2498 ) 2499 return errormsg 2500 2501 if st_found and nh_found: 2502 logger.info( 2503 "Best path for prefix: %s is installed according" 2504 " to %s %s: (%s) in RIB of router %s", 2505 route, 2506 compare, 2507 attribute, 2508 attribute_dict[_next_hop], 2509 router, 2510 ) 2511 2512 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) 2513 return True 2514 2515 2516@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2) 2517def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None): 2518 """ 2519 This API is to verify whether bgp rib has any 2520 matching route for a nexthop. 2521 2522 Parameters 2523 ---------- 2524 * `tgen`: topogen object 2525 * `dut`: input dut router name 2526 * `addr_type` : ip type ipv4/ipv6 2527 * `input_dict` : input dict, has details of static routes 2528 * `next_hop`[optional]: next_hop which needs to be verified, 2529 default = static 2530 * 'aspath'[optional]: aspath which needs to be verified 2531 2532 Usage 2533 ----- 2534 dut = 'r1' 2535 next_hop = "192.168.1.10" 2536 input_dict = topo['routers'] 2537 aspath = "100 200 300" 2538 result = verify_bgp_rib(tgen, addr_type, dut, tgen, input_dict, 2539 next_hop, aspath) 2540 2541 Returns 2542 ------- 2543 errormsg(str) or True 2544 """ 2545 2546 logger.debug("Entering lib API: verify_bgp_rib()") 2547 2548 router_list = tgen.routers() 2549 additional_nexthops_in_required_nhs = [] 2550 list1 = [] 2551 list2 = [] 2552 for routerInput in input_dict.keys(): 2553 for router, rnode in router_list.iteritems(): 2554 if router != dut: 2555 continue 2556 2557 # Verifying RIB routes 2558 command = "show bgp" 2559 2560 # Static routes 2561 sleep(2) 2562 logger.info("Checking router {} BGP RIB:".format(dut)) 2563 2564 if "static_routes" in input_dict[routerInput]: 2565 static_routes = input_dict[routerInput]["static_routes"] 2566 2567 for static_route in static_routes: 2568 found_routes = [] 2569 missing_routes = [] 2570 st_found = False 2571 nh_found = False 2572 2573 vrf = static_route.setdefault("vrf", None) 2574 community = static_route.setdefault("community", None) 2575 largeCommunity = static_route.setdefault("largeCommunity", None) 2576 2577 if vrf: 2578 cmd = "{} vrf {} {}".format(command, vrf, addr_type) 2579 2580 if community: 2581 cmd = "{} community {}".format(cmd, community) 2582 2583 if largeCommunity: 2584 cmd = "{} large-community {}".format(cmd, largeCommunity) 2585 else: 2586 cmd = "{} {}".format(command, addr_type) 2587 2588 cmd = "{} json".format(cmd) 2589 2590 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True) 2591 2592 # Verifying output dictionary rib_routes_json is not empty 2593 if bool(rib_routes_json) == False: 2594 errormsg = "No route found in rib of router {}..".format(router) 2595 return errormsg 2596 2597 network = static_route["network"] 2598 2599 if "no_of_ip" in static_route: 2600 no_of_ip = static_route["no_of_ip"] 2601 else: 2602 no_of_ip = 1 2603 2604 # Generating IPs for verification 2605 ip_list = generate_ips(network, no_of_ip) 2606 2607 for st_rt in ip_list: 2608 st_rt = str(ipaddress.ip_network(unicode(st_rt))) 2609 2610 _addr_type = validate_ip_address(st_rt) 2611 if _addr_type != addr_type: 2612 continue 2613 2614 if st_rt in rib_routes_json["routes"]: 2615 st_found = True 2616 found_routes.append(st_rt) 2617 2618 if next_hop: 2619 if not isinstance(next_hop, list): 2620 next_hop = [next_hop] 2621 list1 = next_hop 2622 2623 found_hops = [ 2624 rib_r["ip"] 2625 for rib_r in rib_routes_json["routes"][st_rt][0][ 2626 "nexthops" 2627 ] 2628 ] 2629 list2 = found_hops 2630 2631 missing_list_of_nexthops = set(list2).difference(list1) 2632 additional_nexthops_in_required_nhs = set( 2633 list1 2634 ).difference(list2) 2635 2636 if list2: 2637 if additional_nexthops_in_required_nhs: 2638 logger.info( 2639 "Missing nexthop %s for route" 2640 " %s in RIB of router %s\n", 2641 additional_nexthops_in_required_nhs, 2642 st_rt, 2643 dut, 2644 ) 2645 errormsg = ( 2646 "Nexthop {} is Missing for " 2647 "route {} in RIB of router {}\n".format( 2648 additional_nexthops_in_required_nhs, 2649 st_rt, 2650 dut, 2651 ) 2652 ) 2653 return errormsg 2654 else: 2655 nh_found = True 2656 if aspath: 2657 found_paths = rib_routes_json["routes"][st_rt][0][ 2658 "path" 2659 ] 2660 if aspath == found_paths: 2661 aspath_found = True 2662 logger.info( 2663 "Found AS path {} for route" 2664 " {} in RIB of router " 2665 "{}\n".format(aspath, st_rt, dut) 2666 ) 2667 else: 2668 errormsg = ( 2669 "AS Path {} is missing for route" 2670 "for route {} in RIB of router {}\n".format( 2671 aspath, st_rt, dut 2672 ) 2673 ) 2674 return errormsg 2675 2676 else: 2677 missing_routes.append(st_rt) 2678 2679 if nh_found: 2680 logger.info( 2681 "Found next_hop {} for all bgp" 2682 " routes in RIB of" 2683 " router {}\n".format(next_hop, router) 2684 ) 2685 2686 if len(missing_routes) > 0: 2687 errormsg = ( 2688 "Missing route in RIB of router {}, " 2689 "routes: {}\n".format(dut, missing_routes) 2690 ) 2691 return errormsg 2692 2693 if found_routes: 2694 logger.info( 2695 "Verified routes in router {} BGP RIB, " 2696 "found routes are: {} \n".format(dut, found_routes) 2697 ) 2698 continue 2699 2700 if "bgp" not in input_dict[routerInput]: 2701 continue 2702 2703 # Advertise networks 2704 bgp_data_list = input_dict[routerInput]["bgp"] 2705 2706 if type(bgp_data_list) is not list: 2707 bgp_data_list = [bgp_data_list] 2708 2709 for bgp_data in bgp_data_list: 2710 vrf_id = bgp_data.setdefault("vrf", None) 2711 if vrf_id: 2712 cmd = "{} vrf {} {}".format(command, vrf_id, addr_type) 2713 else: 2714 cmd = "{} {}".format(command, addr_type) 2715 2716 cmd = "{} json".format(cmd) 2717 2718 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True) 2719 2720 # Verifying output dictionary rib_routes_json is not empty 2721 if bool(rib_routes_json) == False: 2722 errormsg = "No route found in rib of router {}..".format(router) 2723 return errormsg 2724 2725 bgp_net_advertise = bgp_data["address_family"][addr_type]["unicast"] 2726 advertise_network = bgp_net_advertise.setdefault( 2727 "advertise_networks", [] 2728 ) 2729 2730 for advertise_network_dict in advertise_network: 2731 found_routes = [] 2732 missing_routes = [] 2733 found = False 2734 2735 network = advertise_network_dict["network"] 2736 2737 if "no_of_network" in advertise_network_dict: 2738 no_of_network = advertise_network_dict["no_of_network"] 2739 else: 2740 no_of_network = 1 2741 2742 # Generating IPs for verification 2743 ip_list = generate_ips(network, no_of_network) 2744 2745 for st_rt in ip_list: 2746 st_rt = str(ipaddress.ip_network(unicode(st_rt))) 2747 2748 _addr_type = validate_ip_address(st_rt) 2749 if _addr_type != addr_type: 2750 continue 2751 2752 if st_rt in rib_routes_json["routes"]: 2753 found = True 2754 found_routes.append(st_rt) 2755 else: 2756 found = False 2757 missing_routes.append(st_rt) 2758 2759 if len(missing_routes) > 0: 2760 errormsg = ( 2761 "Missing route in BGP RIB of router {}," 2762 " are: {}\n".format(dut, missing_routes) 2763 ) 2764 return errormsg 2765 2766 if found_routes: 2767 logger.info( 2768 "Verified routes in router {} BGP RIB, found " 2769 "routes are: {}\n".format(dut, found_routes) 2770 ) 2771 2772 logger.debug("Exiting lib API: verify_bgp_rib()") 2773 return True 2774 2775 2776@retry(attempts=4, wait=2, return_is_str=True, initial_wait=2) 2777def verify_graceful_restart(tgen, topo, addr_type, input_dict, dut, peer): 2778 """ 2779 This API is to verify verify_graceful_restart configuration of DUT and 2780 cross verify the same from the peer bgp routerrouter. 2781 2782 Parameters 2783 ---------- 2784 * `tgen`: topogen object 2785 * `topo`: input json file data 2786 * `addr_type` : ip type ipv4/ipv6 2787 * `input_dict`: input dictionary, have details of Device Under Test, for 2788 which user wants to test the data 2789 * `dut`: input dut router name 2790 * `peer`: input peer router name 2791 2792 Usage 2793 ----- 2794 "r1": { 2795 "bgp": { 2796 "address_family": { 2797 "ipv4": { 2798 "unicast": { 2799 "neighbor": { 2800 "r3": { 2801 "dest_link":{ 2802 "r1": { 2803 "graceful-restart": True 2804 } 2805 } 2806 } 2807 } 2808 } 2809 }, 2810 "ipv6": { 2811 "unicast": { 2812 "neighbor": { 2813 "r3": { 2814 "dest_link":{ 2815 "r1": { 2816 "graceful-restart": True 2817 } 2818 } 2819 } 2820 } 2821 } 2822 } 2823 } 2824 } 2825 } 2826 } 2827 2828 result = verify_graceful_restart(tgen, topo, addr_type, input_dict, 2829 dut = "r1", peer = 'r2') 2830 Returns 2831 ------- 2832 errormsg(str) or True 2833 """ 2834 2835 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) 2836 2837 for router, rnode in tgen.routers().iteritems(): 2838 if router != dut: 2839 continue 2840 2841 bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"] 2842 2843 if addr_type in bgp_addr_type: 2844 if not check_address_types(addr_type): 2845 continue 2846 2847 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"] 2848 2849 for bgp_neighbor, peer_data in bgp_neighbors.items(): 2850 if bgp_neighbor != peer: 2851 continue 2852 2853 for dest_link, peer_dict in peer_data["dest_link"].items(): 2854 data = topo["routers"][bgp_neighbor]["links"] 2855 2856 if dest_link in data: 2857 neighbor_ip = data[dest_link][addr_type].split("/")[0] 2858 2859 logger.info( 2860 "[DUT: {}]: Checking bgp graceful-restart show" 2861 " o/p {}".format(dut, neighbor_ip) 2862 ) 2863 2864 show_bgp_graceful_json = None 2865 2866 show_bgp_graceful_json = run_frr_cmd( 2867 rnode, 2868 "show bgp {} neighbor {} graceful-restart json".format( 2869 addr_type, neighbor_ip 2870 ), 2871 isjson=True, 2872 ) 2873 2874 show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip] 2875 2876 if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip: 2877 logger.info( 2878 "[DUT: {}]: Neighbor ip matched {}".format(dut, neighbor_ip) 2879 ) 2880 else: 2881 errormsg = "[DUT: {}]: Neighbor ip NOT a matched {}".format( 2882 dut, neighbor_ip 2883 ) 2884 return errormsg 2885 2886 lmode = None 2887 rmode = None 2888 # Local GR mode 2889 if "address_family" in input_dict[dut]["bgp"]: 2890 bgp_neighbors = input_dict[dut]["bgp"]["address_family"][addr_type][ 2891 "unicast" 2892 ]["neighbor"][peer]["dest_link"] 2893 2894 for dest_link, data in bgp_neighbors.items(): 2895 if ( 2896 "graceful-restart-helper" in data 2897 and data["graceful-restart-helper"] 2898 ): 2899 lmode = "Helper" 2900 elif "graceful-restart" in data and data["graceful-restart"]: 2901 lmode = "Restart" 2902 elif ( 2903 "graceful-restart-disable" in data 2904 and data["graceful-restart-disable"] 2905 ): 2906 lmode = "Disable" 2907 else: 2908 lmode = None 2909 2910 if lmode is None: 2911 if "graceful-restart" in input_dict[dut]["bgp"]: 2912 2913 if ( 2914 "graceful-restart" in input_dict[dut]["bgp"]["graceful-restart"] 2915 and input_dict[dut]["bgp"]["graceful-restart"][ 2916 "graceful-restart" 2917 ] 2918 ): 2919 lmode = "Restart*" 2920 elif ( 2921 "graceful-restart-disable" 2922 in input_dict[dut]["bgp"]["graceful-restart"] 2923 and input_dict[dut]["bgp"]["graceful-restart"][ 2924 "graceful-restart-disable" 2925 ] 2926 ): 2927 lmode = "Disable*" 2928 else: 2929 lmode = "Helper*" 2930 else: 2931 lmode = "Helper*" 2932 2933 if lmode == "Disable" or lmode == "Disable*": 2934 return True 2935 2936 # Remote GR mode 2937 if "address_family" in input_dict[peer]["bgp"]: 2938 bgp_neighbors = input_dict[peer]["bgp"]["address_family"][addr_type][ 2939 "unicast" 2940 ]["neighbor"][dut]["dest_link"] 2941 2942 for dest_link, data in bgp_neighbors.items(): 2943 if ( 2944 "graceful-restart-helper" in data 2945 and data["graceful-restart-helper"] 2946 ): 2947 rmode = "Helper" 2948 elif "graceful-restart" in data and data["graceful-restart"]: 2949 rmode = "Restart" 2950 elif ( 2951 "graceful-restart-disable" in data 2952 and data["graceful-restart-disable"] 2953 ): 2954 rmode = "Disable" 2955 else: 2956 rmode = None 2957 2958 if rmode is None: 2959 if "graceful-restart" in input_dict[peer]["bgp"]: 2960 2961 if ( 2962 "graceful-restart" 2963 in input_dict[peer]["bgp"]["graceful-restart"] 2964 and input_dict[peer]["bgp"]["graceful-restart"][ 2965 "graceful-restart" 2966 ] 2967 ): 2968 rmode = "Restart" 2969 elif ( 2970 "graceful-restart-disable" 2971 in input_dict[peer]["bgp"]["graceful-restart"] 2972 and input_dict[peer]["bgp"]["graceful-restart"][ 2973 "graceful-restart-disable" 2974 ] 2975 ): 2976 rmode = "Disable" 2977 else: 2978 rmode = "Helper" 2979 else: 2980 rmode = "Helper" 2981 2982 if show_bgp_graceful_json_out["localGrMode"] == lmode: 2983 logger.info( 2984 "[DUT: {}]: localGrMode : {} ".format( 2985 dut, show_bgp_graceful_json_out["localGrMode"] 2986 ) 2987 ) 2988 else: 2989 errormsg = ( 2990 "[DUT: {}]: localGrMode is not correct" 2991 " Expected: {}, Found: {}".format( 2992 dut, lmode, show_bgp_graceful_json_out["localGrMode"] 2993 ) 2994 ) 2995 return errormsg 2996 2997 if show_bgp_graceful_json_out["remoteGrMode"] == rmode: 2998 logger.info( 2999 "[DUT: {}]: remoteGrMode : {} ".format( 3000 dut, show_bgp_graceful_json_out["remoteGrMode"] 3001 ) 3002 ) 3003 elif ( 3004 show_bgp_graceful_json_out["remoteGrMode"] == "NotApplicable" 3005 and rmode == "Disable" 3006 ): 3007 logger.info( 3008 "[DUT: {}]: remoteGrMode : {} ".format( 3009 dut, show_bgp_graceful_json_out["remoteGrMode"] 3010 ) 3011 ) 3012 else: 3013 errormsg = ( 3014 "[DUT: {}]: remoteGrMode is not correct" 3015 " Expected: {}, Found: {}".format( 3016 dut, rmode, show_bgp_graceful_json_out["remoteGrMode"] 3017 ) 3018 ) 3019 return errormsg 3020 3021 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) 3022 return True 3023 3024 3025@retry(attempts=4, wait=2, return_is_str=True, initial_wait=2) 3026def verify_r_bit(tgen, topo, addr_type, input_dict, dut, peer): 3027 """ 3028 This API is to verify r_bit in the BGP gr capability advertised 3029 by the neighbor router 3030 3031 Parameters 3032 ---------- 3033 * `tgen`: topogen object 3034 * `topo`: input json file data 3035 * `addr_type` : ip type ipv4/ipv6 3036 * `input_dict`: input dictionary, have details of Device Under Test, for 3037 which user wants to test the data 3038 * `dut`: input dut router name 3039 * `peer`: peer name 3040 Usage 3041 ----- 3042 input_dict = { 3043 "r1": { 3044 "bgp": { 3045 "address_family": { 3046 "ipv4": { 3047 "unicast": { 3048 "neighbor": { 3049 "r3": { 3050 "dest_link":{ 3051 "r1": { 3052 "graceful-restart": True 3053 } 3054 } 3055 } 3056 } 3057 } 3058 }, 3059 "ipv6": { 3060 "unicast": { 3061 "neighbor": { 3062 "r3": { 3063 "dest_link":{ 3064 "r1": { 3065 "graceful-restart": True 3066 } 3067 } 3068 } 3069 } 3070 } 3071 } 3072 } 3073 } 3074 } 3075 } 3076 result = verify_r_bit(tgen, topo, addr_type, input_dict, dut, peer) 3077 3078 Returns 3079 ------- 3080 errormsg(str) or True 3081 """ 3082 3083 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) 3084 3085 for router, rnode in tgen.routers().iteritems(): 3086 if router != dut: 3087 continue 3088 3089 bgp_addr_type = topo["routers"][router]["bgp"]["address_family"] 3090 3091 if addr_type in bgp_addr_type: 3092 if not check_address_types(addr_type): 3093 continue 3094 3095 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"] 3096 3097 for bgp_neighbor, peer_data in bgp_neighbors.items(): 3098 if bgp_neighbor != peer: 3099 continue 3100 3101 for dest_link, peer_dict in peer_data["dest_link"].items(): 3102 data = topo["routers"][bgp_neighbor]["links"] 3103 3104 if dest_link in data: 3105 neighbor_ip = data[dest_link][addr_type].split("/")[0] 3106 3107 logger.info( 3108 "[DUT: {}]: Checking bgp graceful-restart show" 3109 " o/p {}".format(dut, neighbor_ip) 3110 ) 3111 3112 show_bgp_graceful_json = run_frr_cmd( 3113 rnode, 3114 "show bgp {} neighbor {} graceful-restart json".format( 3115 addr_type, neighbor_ip 3116 ), 3117 isjson=True, 3118 ) 3119 3120 show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip] 3121 3122 if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip: 3123 logger.info( 3124 "[DUT: {}]: Neighbor ip matched {}".format(dut, neighbor_ip) 3125 ) 3126 else: 3127 errormsg = "[DUT: {}]: Neighbor ip NOT a matched {}".format( 3128 dut, neighbor_ip 3129 ) 3130 return errormsg 3131 3132 if "rBit" in show_bgp_graceful_json_out: 3133 if show_bgp_graceful_json_out["rBit"]: 3134 logger.info("[DUT: {}]: Rbit true {}".format(dut, neighbor_ip)) 3135 else: 3136 errormsg = "[DUT: {}]: Rbit false {}".format(dut, neighbor_ip) 3137 return errormsg 3138 3139 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) 3140 return True 3141 3142 3143@retry(attempts=4, wait=2, return_is_str=True, initial_wait=2) 3144def verify_eor(tgen, topo, addr_type, input_dict, dut, peer): 3145 """ 3146 This API is to verify EOR 3147 3148 Parameters 3149 ---------- 3150 * `tgen`: topogen object 3151 * `topo`: input json file data 3152 * `addr_type` : ip type ipv4/ipv6 3153 * `input_dict`: input dictionary, have details of DUT, for 3154 which user wants to test the data 3155 * `dut`: input dut router name 3156 * `peer`: peer name 3157 Usage 3158 ----- 3159 input_dict = { 3160 input_dict = { 3161 "r1": { 3162 "bgp": { 3163 "address_family": { 3164 "ipv4": { 3165 "unicast": { 3166 "neighbor": { 3167 "r3": { 3168 "dest_link":{ 3169 "r1": { 3170 "graceful-restart": True 3171 } 3172 } 3173 } 3174 } 3175 } 3176 }, 3177 "ipv6": { 3178 "unicast": { 3179 "neighbor": { 3180 "r3": { 3181 "dest_link":{ 3182 "r1": { 3183 "graceful-restart": True 3184 } 3185 } 3186 } 3187 } 3188 } 3189 } 3190 } 3191 } 3192 } 3193 } 3194 3195 result = verify_eor(tgen, topo, addr_type, input_dict, dut, peer) 3196 3197 Returns 3198 ------- 3199 errormsg(str) or True 3200 """ 3201 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) 3202 3203 for router, rnode in tgen.routers().iteritems(): 3204 if router != dut: 3205 continue 3206 3207 bgp_addr_type = topo["routers"][router]["bgp"]["address_family"] 3208 3209 if addr_type in bgp_addr_type: 3210 if not check_address_types(addr_type): 3211 continue 3212 3213 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"] 3214 3215 for bgp_neighbor, peer_data in bgp_neighbors.items(): 3216 if bgp_neighbor != peer: 3217 continue 3218 3219 for dest_link, peer_dict in peer_data["dest_link"].items(): 3220 data = topo["routers"][bgp_neighbor]["links"] 3221 3222 if dest_link in data: 3223 neighbor_ip = data[dest_link][addr_type].split("/")[0] 3224 3225 logger.info( 3226 "[DUT: %s]: Checking bgp graceful-restart" " show o/p %s", 3227 dut, 3228 neighbor_ip, 3229 ) 3230 3231 show_bgp_graceful_json = run_frr_cmd( 3232 rnode, 3233 "show bgp {} neighbor {} graceful-restart json".format( 3234 addr_type, neighbor_ip 3235 ), 3236 isjson=True, 3237 ) 3238 3239 show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip] 3240 3241 if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip: 3242 logger.info("[DUT: %s]: Neighbor ip matched %s", dut, neighbor_ip) 3243 else: 3244 errormsg = "[DUT: %s]: Neighbor ip is NOT matched %s" % ( 3245 dut, 3246 neighbor_ip, 3247 ) 3248 return errormsg 3249 3250 if addr_type == "ipv4": 3251 afi = "ipv4Unicast" 3252 elif addr_type == "ipv6": 3253 afi = "ipv6Unicast" 3254 else: 3255 errormsg = "Address type %s is not supported" % (addr_type) 3256 return errormsg 3257 3258 eor_json = show_bgp_graceful_json_out[afi]["endOfRibStatus"] 3259 if "endOfRibSend" in eor_json: 3260 3261 if eor_json["endOfRibSend"]: 3262 logger.info( 3263 "[DUT: %s]: EOR Send true for %s " "%s", dut, neighbor_ip, afi 3264 ) 3265 else: 3266 errormsg = "[DUT: %s]: EOR Send false for %s" " %s" % ( 3267 dut, 3268 neighbor_ip, 3269 afi, 3270 ) 3271 return errormsg 3272 3273 if "endOfRibRecv" in eor_json: 3274 if eor_json["endOfRibRecv"]: 3275 logger.info( 3276 "[DUT: %s]: EOR Recv true %s " "%s", dut, neighbor_ip, afi 3277 ) 3278 else: 3279 errormsg = "[DUT: %s]: EOR Recv false %s " "%s" % ( 3280 dut, 3281 neighbor_ip, 3282 afi, 3283 ) 3284 return errormsg 3285 3286 if "endOfRibSentAfterUpdate" in eor_json: 3287 if eor_json["endOfRibSentAfterUpdate"]: 3288 logger.info( 3289 "[DUT: %s]: EOR SendTime true for %s" " %s", 3290 dut, 3291 neighbor_ip, 3292 afi, 3293 ) 3294 else: 3295 errormsg = "[DUT: %s]: EOR SendTime false for " "%s %s" % ( 3296 dut, 3297 neighbor_ip, 3298 afi, 3299 ) 3300 return errormsg 3301 3302 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) 3303 return True 3304 3305 3306@retry(attempts=4, wait=2, return_is_str=True, initial_wait=2) 3307def verify_f_bit(tgen, topo, addr_type, input_dict, dut, peer): 3308 """ 3309 This API is to verify f_bit in the BGP gr capability advertised 3310 by the neighbor router 3311 3312 Parameters 3313 ---------- 3314 * `tgen`: topogen object 3315 * `topo`: input json file data 3316 * `addr_type` : ip type ipv4/ipv6 3317 * `input_dict`: input dictionary, have details of Device Under Test, for 3318 which user wants to test the data 3319 * `dut`: input dut router name 3320 * `peer`: peer name 3321 3322 Usage 3323 ----- 3324 input_dict = { 3325 "r1": { 3326 "bgp": { 3327 "address_family": { 3328 "ipv4": { 3329 "unicast": { 3330 "neighbor": { 3331 "r3": { 3332 "dest_link":{ 3333 "r1": { 3334 "graceful-restart": True 3335 } 3336 } 3337 } 3338 } 3339 } 3340 }, 3341 "ipv6": { 3342 "unicast": { 3343 "neighbor": { 3344 "r3": { 3345 "dest_link":{ 3346 "r1": { 3347 "graceful-restart": True 3348 } 3349 } 3350 } 3351 } 3352 } 3353 } 3354 } 3355 } 3356 } 3357 } 3358 3359 result = verify_f_bit(tgen, topo, 'ipv4', input_dict, dut, peer) 3360 3361 Returns 3362 ------- 3363 errormsg(str) or True 3364 """ 3365 3366 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) 3367 3368 for router, rnode in tgen.routers().iteritems(): 3369 if router != dut: 3370 continue 3371 3372 bgp_addr_type = topo["routers"][router]["bgp"]["address_family"] 3373 3374 if addr_type in bgp_addr_type: 3375 if not check_address_types(addr_type): 3376 continue 3377 3378 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"] 3379 3380 for bgp_neighbor, peer_data in bgp_neighbors.items(): 3381 if bgp_neighbor != peer: 3382 continue 3383 3384 for dest_link, peer_dict in peer_data["dest_link"].items(): 3385 data = topo["routers"][bgp_neighbor]["links"] 3386 3387 if dest_link in data: 3388 neighbor_ip = data[dest_link][addr_type].split("/")[0] 3389 3390 logger.info( 3391 "[DUT: {}]: Checking bgp graceful-restart show" 3392 " o/p {}".format(dut, neighbor_ip) 3393 ) 3394 3395 show_bgp_graceful_json = run_frr_cmd( 3396 rnode, 3397 "show bgp {} neighbor {} graceful-restart json".format( 3398 addr_type, neighbor_ip 3399 ), 3400 isjson=True, 3401 ) 3402 3403 show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip] 3404 3405 if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip: 3406 logger.info( 3407 "[DUT: {}]: Neighbor ip matched {}".format(dut, neighbor_ip) 3408 ) 3409 else: 3410 errormsg = "[DUT: {}]: Neighbor ip NOT a match {}".format( 3411 dut, neighbor_ip 3412 ) 3413 return errormsg 3414 3415 if "ipv4Unicast" in show_bgp_graceful_json_out: 3416 if show_bgp_graceful_json_out["ipv4Unicast"]["fBit"]: 3417 logger.info( 3418 "[DUT: {}]: Fbit True for {} IPv4" 3419 " Unicast".format(dut, neighbor_ip) 3420 ) 3421 else: 3422 errormsg = "[DUT: {}]: Fbit False for {} IPv4" " Unicast".format( 3423 dut, neighbor_ip 3424 ) 3425 return errormsg 3426 3427 elif "ipv6Unicast" in show_bgp_graceful_json_out: 3428 if show_bgp_graceful_json_out["ipv6Unicast"]["fBit"]: 3429 logger.info( 3430 "[DUT: {}]: Fbit True for {} IPv6" 3431 " Unicast".format(dut, neighbor_ip) 3432 ) 3433 else: 3434 errormsg = "[DUT: {}]: Fbit False for {} IPv6" " Unicast".format( 3435 dut, neighbor_ip 3436 ) 3437 return errormsg 3438 else: 3439 show_bgp_graceful_json_out["ipv4Unicast"] 3440 show_bgp_graceful_json_out["ipv6Unicast"] 3441 3442 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) 3443 return True 3444 3445 3446@retry(attempts=4, wait=2, return_is_str=True, initial_wait=2) 3447def verify_graceful_restart_timers(tgen, topo, addr_type, input_dict, dut, peer): 3448 """ 3449 This API is to verify graceful restart timers, configured and recieved 3450 3451 Parameters 3452 ---------- 3453 * `tgen`: topogen object 3454 * `topo`: input json file data 3455 * `addr_type` : ip type ipv4/ipv6 3456 * `input_dict`: input dictionary, have details of Device Under Test, 3457 for which user wants to test the data 3458 * `dut`: input dut router name 3459 * `peer`: peer name 3460 Usage 3461 ----- 3462 # Configure graceful-restart 3463 input_dict_1 = { 3464 "r1": { 3465 "bgp": { 3466 "bgp_neighbors": { 3467 "r3": { 3468 "graceful-restart": "graceful-restart-helper" 3469 } 3470 }, 3471 "gracefulrestart": ["restart-time 150"] 3472 } 3473 }, 3474 "r3": { 3475 "bgp": { 3476 "bgp_neighbors": { 3477 "r1": { 3478 "graceful-restart": "graceful-restart" 3479 } 3480 } 3481 } 3482 } 3483 } 3484 3485 result = verify_graceful_restart_timers(tgen, topo, 'ipv4', input_dict) 3486 3487 Returns 3488 ------- 3489 errormsg(str) or True 3490 """ 3491 3492 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) 3493 3494 for router, rnode in tgen.routers().iteritems(): 3495 if router != dut: 3496 continue 3497 3498 bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"] 3499 3500 if addr_type in bgp_addr_type: 3501 if not check_address_types(addr_type): 3502 continue 3503 3504 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"] 3505 3506 for bgp_neighbor, peer_data in bgp_neighbors.items(): 3507 if bgp_neighbor != peer: 3508 continue 3509 3510 for dest_link, peer_dict in peer_data["dest_link"].items(): 3511 data = topo["routers"][bgp_neighbor]["links"] 3512 3513 if dest_link in data: 3514 neighbor_ip = data[dest_link][addr_type].split("/")[0] 3515 3516 logger.info( 3517 "[DUT: {}]: Checking bgp graceful-restart show" 3518 " o/p {}".format(dut, neighbor_ip) 3519 ) 3520 3521 show_bgp_graceful_json = run_frr_cmd( 3522 rnode, 3523 "show bgp {} neighbor {} graceful-restart json".format( 3524 addr_type, neighbor_ip 3525 ), 3526 isjson=True, 3527 ) 3528 3529 show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip] 3530 if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip: 3531 logger.info( 3532 "[DUT: {}]: Neighbor ip matched {}".format(dut, neighbor_ip) 3533 ) 3534 else: 3535 errormsg = "[DUT: {}]: Neighbor ip is NOT matched {}".format( 3536 dut, neighbor_ip 3537 ) 3538 return errormsg 3539 3540 # Graceful-restart timer 3541 if "graceful-restart" in input_dict[peer]["bgp"]: 3542 if "timer" in input_dict[peer]["bgp"]["graceful-restart"]: 3543 for rs_timer, value in input_dict[peer]["bgp"]["graceful-restart"][ 3544 "timer" 3545 ].items(): 3546 if rs_timer == "restart-time": 3547 3548 receivedTimer = value 3549 if ( 3550 show_bgp_graceful_json_out["timers"][ 3551 "receivedRestartTimer" 3552 ] 3553 == receivedTimer 3554 ): 3555 logger.info( 3556 "receivedRestartTimer is {}" 3557 " on {} from peer {}".format( 3558 receivedTimer, router, peer 3559 ) 3560 ) 3561 else: 3562 errormsg = ( 3563 "receivedRestartTimer is not" 3564 " as expected {}".format(receivedTimer) 3565 ) 3566 return errormsg 3567 3568 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) 3569 return True 3570 3571 3572@retry(attempts=4, wait=2, return_is_str=True, initial_wait=2) 3573def verify_gr_address_family(tgen, topo, addr_type, addr_family, dut): 3574 """ 3575 This API is to verify gr_address_family in the BGP gr capability advertised 3576 by the neighbor router 3577 3578 Parameters 3579 ---------- 3580 * `tgen`: topogen object 3581 * `topo`: input json file data 3582 * `addr_type` : ip type ipv4/ipv6 3583 * `addr_type` : ip type IPV4 Unicast/IPV6 Unicast 3584 * `dut`: input dut router name 3585 3586 Usage 3587 ----- 3588 3589 result = verify_gr_address_family(tgen, topo, "ipv4", "ipv4Unicast", "r1") 3590 3591 Returns 3592 ------- 3593 errormsg(str) or True 3594 """ 3595 3596 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) 3597 3598 for router, rnode in tgen.routers().iteritems(): 3599 if router != dut: 3600 continue 3601 3602 bgp_addr_type = topo["routers"][router]["bgp"]["address_family"] 3603 3604 if addr_type in bgp_addr_type: 3605 if not check_address_types(addr_type): 3606 continue 3607 3608 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"] 3609 3610 for bgp_neighbor, peer_data in bgp_neighbors.items(): 3611 for dest_link, peer_dict in peer_data["dest_link"].items(): 3612 data = topo["routers"][bgp_neighbor]["links"] 3613 3614 if dest_link in data: 3615 neighbor_ip = data[dest_link][addr_type].split("/")[0] 3616 3617 logger.info( 3618 "[DUT: {}]: Checking bgp graceful-restart" 3619 " show o/p {}".format(dut, neighbor_ip) 3620 ) 3621 3622 show_bgp_graceful_json = run_frr_cmd( 3623 rnode, 3624 "show bgp {} neighbor {} graceful-restart json".format( 3625 addr_type, neighbor_ip 3626 ), 3627 isjson=True, 3628 ) 3629 3630 show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip] 3631 3632 if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip: 3633 logger.info("Neighbor ip matched {}".format(neighbor_ip)) 3634 else: 3635 errormsg = "Neighbor ip NOT a match {}".format(neighbor_ip) 3636 return errormsg 3637 3638 if addr_family == "ipv4Unicast": 3639 if "ipv4Unicast" in show_bgp_graceful_json_out: 3640 logger.info("ipv4Unicast present for {} ".format(neighbor_ip)) 3641 return True 3642 else: 3643 errormsg = "ipv4Unicast NOT present for {} ".format(neighbor_ip) 3644 return errormsg 3645 3646 elif addr_family == "ipv6Unicast": 3647 if "ipv6Unicast" in show_bgp_graceful_json_out: 3648 logger.info("ipv6Unicast present for {} ".format(neighbor_ip)) 3649 return True 3650 else: 3651 errormsg = "ipv6Unicast NOT present for {} ".format(neighbor_ip) 3652 return errormsg 3653 else: 3654 errormsg = "Aaddress family: {} present for {} ".format( 3655 addr_family, neighbor_ip 3656 ) 3657 return errormsg 3658 3659 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) 3660 3661 3662@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2) 3663def verify_attributes_for_evpn_routes( 3664 tgen, 3665 topo, 3666 dut, 3667 input_dict, 3668 rd=None, 3669 rt=None, 3670 ethTag=None, 3671 ipLen=None, 3672 rd_peer=None, 3673 rt_peer=None, 3674): 3675 """ 3676 API to verify rd and rt value using "sh bgp l2vpn evpn 10.1.1.1" 3677 command. 3678 3679 Parameters 3680 ---------- 3681 * `tgen`: topogen object 3682 * `topo` : json file data 3683 * `dut` : device under test 3684 * `input_dict`: having details like - for which route, rd value 3685 needs to be verified 3686 * `rd` : route distinguisher 3687 * `rt` : route target 3688 * `ethTag` : Ethernet Tag 3689 * `ipLen` : IP prefix length 3690 * `rd_peer` : Peer name from which RD will be auto-generated 3691 * `rt_peer` : Peer name from which RT will be auto-generated 3692 3693 Usage 3694 ----- 3695 input_dict_1 = { 3696 "r1": { 3697 "static_routes": [{ 3698 "network": [NETWORK1_1[addr_type]], 3699 "next_hop": NEXT_HOP_IP[addr_type], 3700 "vrf": "RED" 3701 }] 3702 } 3703 } 3704 3705 result = verify_attributes_for_evpn_routes(tgen, topo, 3706 input_dict, rd = "10.0.0.33:1") 3707 3708 Returns 3709 ------- 3710 errormsg(str) or True 3711 """ 3712 3713 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) 3714 for router in input_dict.keys(): 3715 rnode = tgen.routers()[dut] 3716 3717 if "static_routes" in input_dict[router]: 3718 for static_route in input_dict[router]["static_routes"]: 3719 network = static_route["network"] 3720 3721 if "vrf" in static_route: 3722 vrf = static_route["vrf"] 3723 3724 if type(network) is not list: 3725 network = [network] 3726 3727 for route in network: 3728 route = route.split("/")[0] 3729 _addr_type = validate_ip_address(route) 3730 if "v4" in _addr_type: 3731 input_afi = "v4" 3732 elif "v6" in _addr_type: 3733 input_afi = "v6" 3734 3735 cmd = "show bgp l2vpn evpn {} json".format(route) 3736 evpn_rd_value_json = run_frr_cmd(rnode, cmd, isjson=True) 3737 if not bool(evpn_rd_value_json): 3738 errormsg = "No output for '{}' cli".format(cmd) 3739 return errormsg 3740 3741 if rd is not None and rd != "auto": 3742 logger.info( 3743 "[DUT: %s]: Verifying rd value for " "evpn route %s:", 3744 dut, 3745 route, 3746 ) 3747 3748 if rd in evpn_rd_value_json: 3749 rd_value_json = evpn_rd_value_json[rd] 3750 if rd_value_json["rd"] != rd: 3751 errormsg = ( 3752 "[DUT: %s] Failed: Verifying" 3753 " RD value for EVPN route: %s" 3754 "[FAILED]!!, EXPECTED : %s " 3755 " FOUND : %s" 3756 % (dut, route, rd, rd_value_json["rd"]) 3757 ) 3758 return errormsg 3759 3760 else: 3761 logger.info( 3762 "[DUT %s]: Verifying RD value for" 3763 " EVPN route: %s [PASSED]|| " 3764 "Found Exprected: %s", 3765 dut, 3766 route, 3767 rd, 3768 ) 3769 return True 3770 3771 else: 3772 errormsg = ( 3773 "[DUT: %s] RD : %s is not present" 3774 " in cli json output" % (dut, rd) 3775 ) 3776 return errormsg 3777 3778 if rd == "auto": 3779 logger.info( 3780 "[DUT: %s]: Verifying auto-rd value for " "evpn route %s:", 3781 dut, 3782 route, 3783 ) 3784 3785 if rd_peer: 3786 index = 1 3787 vni_dict = {} 3788 3789 rnode = tgen.routers()[rd_peer] 3790 vrfs = topo["routers"][rd_peer]["vrfs"] 3791 for vrf_dict in vrfs: 3792 vni_dict[vrf_dict["name"]] = index 3793 index += 1 3794 3795 show_bgp_json = run_frr_cmd( 3796 rnode, "show bgp vrf all summary json", isjson=True 3797 ) 3798 3799 # Verifying output dictionary show_bgp_json is empty 3800 if not bool(show_bgp_json): 3801 errormsg = "BGP is not running" 3802 return errormsg 3803 3804 show_bgp_json_vrf = show_bgp_json[vrf] 3805 for afi, afi_data in show_bgp_json_vrf.items(): 3806 if input_afi not in afi: 3807 continue 3808 router_id = afi_data["routerId"] 3809 3810 rd = "{}:{}".format(router_id, vni_dict[vrf]) 3811 if rd in evpn_rd_value_json: 3812 rd_value_json = evpn_rd_value_json[rd] 3813 if rd_value_json["rd"] != rd: 3814 errormsg = ( 3815 "[DUT: %s] Failed: Verifying" 3816 " RD value for EVPN route: %s" 3817 "[FAILED]!!, EXPECTED : %s " 3818 " FOUND : %s" 3819 % (dut, route, rd, rd_value_json["rd"]) 3820 ) 3821 return errormsg 3822 3823 else: 3824 logger.info( 3825 "[DUT %s]: Verifying RD value for" 3826 " EVPN route: %s [PASSED]|| " 3827 "Found Exprected: %s", 3828 dut, 3829 route, 3830 rd, 3831 ) 3832 return True 3833 3834 else: 3835 errormsg = ( 3836 "[DUT: %s] RD : %s is not present" 3837 " in cli json output" % (dut, rd) 3838 ) 3839 return errormsg 3840 3841 if rt == "auto": 3842 logger.info( 3843 "[DUT: %s]: Verifying auto-rt value for " "evpn route %s:", 3844 dut, 3845 route, 3846 ) 3847 3848 if rt_peer: 3849 vni_dict = {} 3850 3851 rnode = tgen.routers()[rt_peer] 3852 show_bgp_json = run_frr_cmd( 3853 rnode, "show bgp vrf all summary json", isjson=True 3854 ) 3855 3856 # Verifying output dictionary show_bgp_json is empty 3857 if not bool(show_bgp_json): 3858 errormsg = "BGP is not running" 3859 return errormsg 3860 3861 show_bgp_json_vrf = show_bgp_json[vrf] 3862 for afi, afi_data in show_bgp_json_vrf.items(): 3863 if input_afi not in afi: 3864 continue 3865 as_num = afi_data["as"] 3866 3867 show_vrf_vni_json = run_frr_cmd( 3868 rnode, "show vrf vni json", isjson=True 3869 ) 3870 3871 vrfs = show_vrf_vni_json["vrfs"] 3872 for vrf_dict in vrfs: 3873 if vrf_dict["vrf"] == vrf: 3874 vni_dict[vrf_dict["vrf"]] = str(vrf_dict["vni"]) 3875 3876 # If AS is 4 byte, FRR uses only the lower 2 bytes of ASN+VNI 3877 # for auto derived RT value. 3878 if as_num > 65535: 3879 as_bin = bin(as_num) 3880 as_bin = as_bin[-16:] 3881 as_num = int(as_bin, 2) 3882 3883 rt = "{}:{}".format(str(as_num), vni_dict[vrf]) 3884 for _rd, route_data in evpn_rd_value_json.items(): 3885 if route_data["ip"] == route: 3886 for rt_data in route_data["paths"]: 3887 if vni_dict[vrf] == rt_data["VNI"]: 3888 rt_string = rt_data["extendedCommunity"][ 3889 "string" 3890 ] 3891 rt_input = "RT:{}".format(rt) 3892 if rt_input not in rt_string: 3893 errormsg = ( 3894 "[DUT: %s] Failed:" 3895 " Verifying RT " 3896 "value for EVPN " 3897 " route: %s" 3898 "[FAILED]!!," 3899 " EXPECTED : %s " 3900 " FOUND : %s" 3901 % (dut, route, rt_input, rt_string) 3902 ) 3903 return errormsg 3904 3905 else: 3906 logger.info( 3907 "[DUT %s]: Verifying " 3908 "RT value for EVPN " 3909 "route: %s [PASSED]||" 3910 "Found Exprected: %s", 3911 dut, 3912 route, 3913 rt_input, 3914 ) 3915 return True 3916 3917 else: 3918 errormsg = ( 3919 "[DUT: %s] Route : %s is not" 3920 " present in cli json output" % (dut, route) 3921 ) 3922 return errormsg 3923 3924 if rt is not None and rt != "auto": 3925 logger.info( 3926 "[DUT: %s]: Verifying rt value for " "evpn route %s:", 3927 dut, 3928 route, 3929 ) 3930 3931 if type(rt) is not list: 3932 rt = [rt] 3933 3934 for _rt in rt: 3935 for _rd, route_data in evpn_rd_value_json.items(): 3936 if route_data["ip"] == route: 3937 for rt_data in route_data["paths"]: 3938 rt_string = rt_data["extendedCommunity"][ 3939 "string" 3940 ] 3941 rt_input = "RT:{}".format(_rt) 3942 if rt_input not in rt_string: 3943 errormsg = ( 3944 "[DUT: %s] Failed: " 3945 "Verifying RT value " 3946 "for EVPN route: %s" 3947 "[FAILED]!!," 3948 " EXPECTED : %s " 3949 " FOUND : %s" 3950 % (dut, route, rt_input, rt_string) 3951 ) 3952 return errormsg 3953 3954 else: 3955 logger.info( 3956 "[DUT %s]: Verifying RT" 3957 " value for EVPN route:" 3958 " %s [PASSED]|| " 3959 "Found Exprected: %s", 3960 dut, 3961 route, 3962 rt_input, 3963 ) 3964 return True 3965 3966 else: 3967 errormsg = ( 3968 "[DUT: %s] Route : %s is not" 3969 " present in cli json output" % (dut, route) 3970 ) 3971 return errormsg 3972 3973 if ethTag is not None: 3974 logger.info( 3975 "[DUT: %s]: Verifying ethTag value for " "evpn route :", dut 3976 ) 3977 3978 for _rd, route_data in evpn_rd_value_json.items(): 3979 if route_data["ip"] == route: 3980 if route_data["ethTag"] != ethTag: 3981 errormsg = ( 3982 "[DUT: %s] RD: %s, Failed: " 3983 "Verifying ethTag value " 3984 "for EVPN route: %s" 3985 "[FAILED]!!," 3986 " EXPECTED : %s " 3987 " FOUND : %s" 3988 % ( 3989 dut, 3990 _rd, 3991 route, 3992 ethTag, 3993 route_data["ethTag"], 3994 ) 3995 ) 3996 return errormsg 3997 3998 else: 3999 logger.info( 4000 "[DUT %s]: RD: %s, Verifying " 4001 "ethTag value for EVPN route:" 4002 " %s [PASSED]|| " 4003 "Found Exprected: %s", 4004 dut, 4005 _rd, 4006 route, 4007 ethTag, 4008 ) 4009 return True 4010 4011 else: 4012 errormsg = ( 4013 "[DUT: %s] RD: %s, Route : %s " 4014 "is not present in cli json " 4015 "output" % (dut, _rd, route) 4016 ) 4017 return errormsg 4018 4019 if ipLen is not None: 4020 logger.info( 4021 "[DUT: %s]: Verifying ipLen value for " "evpn route :", dut 4022 ) 4023 4024 for _rd, route_data in evpn_rd_value_json.items(): 4025 if route_data["ip"] == route: 4026 if route_data["ipLen"] != int(ipLen): 4027 errormsg = ( 4028 "[DUT: %s] RD: %s, Failed: " 4029 "Verifying ipLen value " 4030 "for EVPN route: %s" 4031 "[FAILED]!!," 4032 " EXPECTED : %s " 4033 " FOUND : %s" 4034 % (dut, _rd, route, ipLen, route_data["ipLen"]) 4035 ) 4036 return errormsg 4037 4038 else: 4039 logger.info( 4040 "[DUT %s]: RD: %s, Verifying " 4041 "ipLen value for EVPN route:" 4042 " %s [PASSED]|| " 4043 "Found Exprected: %s", 4044 dut, 4045 _rd, 4046 route, 4047 ipLen, 4048 ) 4049 return True 4050 4051 else: 4052 errormsg = ( 4053 "[DUT: %s] RD: %s, Route : %s " 4054 "is not present in cli json " 4055 "output " % (dut, route) 4056 ) 4057 return errormsg 4058 4059 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) 4060 return False 4061 4062 4063@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2) 4064def verify_evpn_routes( 4065 tgen, topo, dut, input_dict, routeType=5, EthTag=0, next_hop=None 4066): 4067 """ 4068 API to verify evpn routes using "sh bgp l2vpn evpn" 4069 command. 4070 4071 Parameters 4072 ---------- 4073 * `tgen`: topogen object 4074 * `topo` : json file data 4075 * `dut` : device under test 4076 * `input_dict`: having details like - for which route, rd value 4077 needs to be verified 4078 * `route_type` : Route type 5 is supported as of now 4079 * `EthTag` : Ethernet tag, by-default is 0 4080 * `next_hop` : Prefered nexthop for the evpn routes 4081 4082 Usage 4083 ----- 4084 input_dict_1 = { 4085 "r1": { 4086 "static_routes": [{ 4087 "network": [NETWORK1_1[addr_type]], 4088 "next_hop": NEXT_HOP_IP[addr_type], 4089 "vrf": "RED" 4090 }] 4091 } 4092 } 4093 result = verify_evpn_routes(tgen, topo, input_dict) 4094 4095 Returns 4096 ------- 4097 errormsg(str) or True 4098 """ 4099 4100 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name)) 4101 4102 for router in input_dict.keys(): 4103 rnode = tgen.routers()[dut] 4104 4105 logger.info("[DUT: %s]: Verifying evpn routes: ", dut) 4106 4107 if "static_routes" in input_dict[router]: 4108 for static_route in input_dict[router]["static_routes"]: 4109 network = static_route["network"] 4110 4111 if type(network) is not list: 4112 network = [network] 4113 4114 missing_routes = {} 4115 for route in network: 4116 rd_keys = 0 4117 ip_len = route.split("/")[1] 4118 route = route.split("/")[0] 4119 4120 prefix = "[{}]:[{}]:[{}]:[{}]".format( 4121 routeType, EthTag, ip_len, route 4122 ) 4123 4124 cmd = "show bgp l2vpn evpn route json" 4125 evpn_value_json = run_frr_cmd(rnode, cmd, isjson=True) 4126 4127 if not bool(evpn_value_json): 4128 errormsg = "No output for '{}' cli".format(cmd) 4129 return errormsg 4130 4131 if evpn_value_json["numPrefix"] == 0: 4132 errormsg = "[DUT: %s]: No EVPN prefixes exist" % (dut) 4133 return errormsg 4134 4135 for key, route_data_json in evpn_value_json.items(): 4136 if isinstance(route_data_json, dict): 4137 rd_keys += 1 4138 if prefix not in route_data_json: 4139 missing_routes[key] = prefix 4140 4141 if rd_keys == len(missing_routes.keys()): 4142 errormsg = ( 4143 "[DUT: %s]: " 4144 "Missing EVPN routes: " 4145 "%s [FAILED]!!" % (dut, list(set(missing_routes.values()))) 4146 ) 4147 return errormsg 4148 4149 for key, route_data_json in evpn_value_json.items(): 4150 if isinstance(route_data_json, dict): 4151 if prefix not in route_data_json: 4152 continue 4153 4154 for paths in route_data_json[prefix]["paths"]: 4155 for path in paths: 4156 if path["routeType"] != routeType: 4157 errormsg = ( 4158 "[DUT: %s]: " 4159 "Verifying routeType " 4160 "for EVPN route: %s " 4161 "[FAILED]!! " 4162 "Expected: %s, " 4163 "Found: %s" 4164 % ( 4165 dut, 4166 prefix, 4167 routeType, 4168 path["routeType"], 4169 ) 4170 ) 4171 return errormsg 4172 4173 elif next_hop: 4174 for nh_dict in path["nexthops"]: 4175 if nh_dict["ip"] != next_hop: 4176 errormsg = ( 4177 "[DUT: %s]: " 4178 "Verifying " 4179 "nexthop for " 4180 "EVPN route: %s" 4181 "[FAILED]!! " 4182 "Expected: %s," 4183 " Found: %s" 4184 % ( 4185 dut, 4186 prefix, 4187 next_hop, 4188 nh_dict["ip"], 4189 ) 4190 ) 4191 return errormsg 4192 4193 else: 4194 logger.info( 4195 "[DUT %s]: Verifying " 4196 "EVPN route : %s, " 4197 "routeType: %s is " 4198 "installed " 4199 "[PASSED]|| ", 4200 dut, 4201 prefix, 4202 routeType, 4203 ) 4204 return True 4205 4206 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name)) 4207 4208 return False 4209