1#
2# Copyright (c) 2019 by VMware, Inc. ("VMware")
3# Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
4# ("NetDEF") in this file.
5#
6# Permission to use, copy, modify, and/or distribute this software
7# for any purpose with or without fee is hereby granted, provided
8# that the above copyright notice and this permission notice appear
9# in all copies.
10#
11# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
12# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
14# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
15# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
16# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
17# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
18# OF THIS SOFTWARE.
19#
20
21from copy import deepcopy
22from time import sleep
23import traceback
24import ipaddress
25import os
26import sys
27from lib import topotest
28from lib.topolog import logger
29
30from lib.topogen import TopoRouter, get_topogen
31
# Import common_config to use commonly used APIs
33from lib.common_config import (
34    create_common_configuration,
35    InvalidCLIError,
36    load_config_to_router,
37    check_address_types,
38    generate_ips,
39    validate_ip_address,
40    find_interface_with_greater_ip,
41    run_frr_cmd,
42    FRRCFG_FILE,
43    retry,
44)
45
# Directory path constant; NOTE(review): presumably the base directory for
# topotest log output (going by the name) - no use is visible here, confirm.
LOGDIR = "/tmp/topotests/"
# Unset at import time; NOTE(review): presumably assigned elsewhere at
# runtime - confirm against the rest of the file.
TMPDIR = None
48
49
def create_router_bgp(tgen, topo, input_dict=None, build=False, load_config=True):
    """
    Build the BGP configuration of every router found in the input data and
    hand it to create_common_configuration().

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data; its "routers" entry is used when `input_dict`
               is given, otherwise a deep copy of `topo` itself serves as the
               input data
    * `input_dict` : Input dict data, required when configuring from testcase
    * `build` : Only for initial setup phase this is set as True.
    * `load_config` : Forwarded as-is to create_common_configuration()

    Usage
    -----
    input_dict = {
        "r1": {
            "bgp": {
                "local_as": "200",
                "router_id": "22.22.22.22",
                "graceful-restart": {
                    "graceful-restart": True,
                    "preserve-fw-state": True,
                    "timer": {
                        "restart-time": 30,
                        "rib-stale-time": 30,
                        "select-defer-time": 30,
                    }
                },
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "redistribute": [{
                                "redist_type": "static",
                                    "attribute": {
                                        "metric" : 123
                                    }
                                },
                                {"redist_type": "connected"}
                            ],
                            "advertise_networks": [
                                {
                                    "network": "20.0.0.0/32",
                                    "no_of_network": 10
                                },
                                {
                                    "network": "30.0.0.0/32",
                                    "no_of_network": 10
                                }
                            ],
                            "neighbor": {
                                "r3": {
                                    "keepalivetimer": 60,
                                    "holddowntimer": 180,
                                    "dest_link": {
                                        "r4": {
                                            "allowas-in": {
                                                    "number_occurences": 2
                                            },
                                            "prefix_lists": [
                                                {
                                                    "name": "pf_list_1",
                                                    "direction": "in"
                                                }
                                            ],
                                            "route_maps": [{
                                                "name": "RMAP_MED_R3",
                                                 "direction": "in"
                                            }],
                                            "next_hop_self": True
                                        },
                                        "r1": {"graceful-restart-helper": True}
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }


    Returns
    -------
    The value returned by create_common_configuration() for the last BGP
    instance handled (False when none was handled), or the formatted
    traceback text when InvalidCLIError is raised.
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    # Flag is used when testing ipv6 over ipv4 or vice-versa
    afi_test = False
    result = False

    if input_dict:
        topo = topo["routers"]
        input_dict = deepcopy(input_dict)
    else:
        input_dict = deepcopy(topo)

    for router, router_data in input_dict.items():
        if "bgp" not in router_data:
            logger.debug("Router %s: 'bgp' not present in input_dict", router)
            continue

        # A router may carry one BGP instance (dict) or several (list).
        bgp_instances = router_data["bgp"]
        if type(bgp_instances) is not list:
            bgp_instances = [bgp_instances]

        for bgp_data in bgp_instances:
            config_cmds = __create_bgp_global(tgen, bgp_data, router, build)

            if config_cmds:
                address_families = bgp_data.setdefault("address_family", {})

                if address_families:
                    v4_family = address_families.setdefault("ipv4", {})
                    v6_family = address_families.setdefault("ipv6", {})
                    l2vpn_family = address_families.setdefault("l2vpn", {})

                    # The `or` short-circuits on purpose: the ipv6 "unicast"
                    # default is only filled in when ipv4 unicast is empty.
                    has_unicast = bool(
                        v4_family.setdefault("unicast", {})
                        or v6_family.setdefault("unicast", {})
                    )
                    has_evpn = bool(l2vpn_family.setdefault("evpn", {}))

                    if has_unicast:
                        config_cmds = __create_bgp_unicast_neighbor(
                            tgen,
                            topo,
                            bgp_data,
                            router,
                            afi_test,
                            config_data=config_cmds,
                        )

                    if has_evpn:
                        config_cmds = __create_l2vpn_evpn_address_family(
                            tgen, topo, bgp_data, router, config_data=config_cmds
                        )
                else:
                    logger.debug(
                        "Router %s: 'address_family' not present in "
                        "input_dict for BGP",
                        router,
                    )

            # Runs for every instance, including when no commands were built.
            try:
                result = create_common_configuration(
                    tgen, router, config_cmds, "bgp", build, load_config
                )
            except InvalidCLIError:
                # Traceback
                trace = traceback.format_exc()
                logger.error(trace)
                return trace

    logger.debug("Exiting lib API: create_router_bgp()")
    return result
210
211
def __create_bgp_global(tgen, input_dict, router, build=False):
    """
    Helper API to create bgp global configuration.

    Parameters
    ----------
    * `tgen` : Topogen object (not used by this helper)
    * `input_dict` : BGP data of one router/instance; keys that are missing
                     are filled in with their defaults via setdefault()
    * `router` : router id to be configured (used in log messages only).
    * `build` : Only for initial setup phase this is set as True.

    Returns
    -------
    List of configuration commands, or False when `build` is set and
    'local_as' is not present in `input_dict`.
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    bgp_data = input_dict
    del_bgp_action = bgp_data.setdefault("delete", False)

    config_data = []

    if "local_as" not in bgp_data and build:
        logger.debug(
            "Router %s: 'local_as' not present in input_dict for BGP", router
        )
        return False

    local_as = bgp_data.setdefault("local_as", "")
    cmd = "router bgp {}".format(local_as)
    vrf_id = bgp_data.setdefault("vrf", None)
    if vrf_id:
        cmd = "{} vrf {}".format(cmd, vrf_id)

    if del_bgp_action:
        # Removing the whole instance: nothing else needs to be configured.
        config_data.append("no {}".format(cmd))
        return config_data

    config_data.append(cmd)
    config_data.append("no bgp ebgp-requires-policy")

    router_id = bgp_data.setdefault("router_id", None)
    del_router_id = bgp_data.setdefault("del_router_id", False)
    if del_router_id:
        config_data.append("no bgp router-id")
    if router_id:
        config_data.append("bgp router-id {}".format(router_id))

    config_data.append("no bgp network import-check")

    bst_path = bgp_data.setdefault("bestpath", None)
    if bst_path and "aspath" in bst_path:
        bestpath_cmd = "bgp bestpath as-path {}".format(bst_path["aspath"])
        # NOTE: the mere presence of the "delete" key triggers removal,
        # whatever its value is.
        if "delete" in bst_path:
            bestpath_cmd = "no {}".format(bestpath_cmd)
        config_data.append(bestpath_cmd)

    if "graceful-restart" in bgp_data:
        graceful_config = bgp_data["graceful-restart"]

        # Tri-state knobs: None (absent) leaves the setting untouched, a
        # truthy value enables it and any other value emits the "no" form.
        gr_knobs = (
            ("graceful-restart", "bgp graceful-restart"),
            ("graceful-restart-disable", "bgp graceful-restart-disable"),
            ("preserve-fw-state", "bgp graceful-restart preserve-fw-state"),
            ("disable-eor", "bgp graceful-restart disable-eor"),
        )
        for knob_key, knob_cmd in gr_knobs:
            knob_value = graceful_config.setdefault(knob_key, None)
            if knob_value is None:
                continue
            if knob_value:
                config_data.append(knob_cmd)
            else:
                config_data.append("no {}".format(knob_cmd))

        timer = graceful_config.get("timer")
        if timer:
            del_action = timer.get("delete", False)

            for rs_timer, rs_timer_value in timer.items():
                # "delete" is the action flag, not a timer; unset timers are
                # skipped so that no stale command gets appended.
                if rs_timer == "delete" or not rs_timer_value:
                    continue

                cmd = "bgp graceful-restart {} {}".format(rs_timer, rs_timer_value)
                if del_action:
                    cmd = "no {}".format(cmd)

                config_data.append(cmd)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return config_data
342
343
def __create_bgp_unicast_neighbor(
    tgen, topo, input_dict, router, afi_test, config_data=None
):
    """
    Helper API to create configuration for address-family unicast

    Parameters
    ----------
    * `tgen` : Topogen object (not used by this helper)
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router id to be configured.
    * `afi_test` : use when ipv6 needs to be tested over ipv4 or vice-versa
    * `config_data` : list of commands built so far; new commands are
                      appended to it (a new list is started when None)

    Returns
    -------
    `config_data` with the address-family unicast commands appended.
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    if config_data is None:
        config_data = []

    # The original code wrapped addresses in unicode() (text type expected by
    # the Python 2 ipaddress backport); Python 3 has no `unicode`, so pick the
    # matching text type once.
    text_type = str if sys.version_info[0] >= 3 else unicode  # noqa: F821

    add_neigh = True
    # NOTE(review): this is a list membership test, so it only matches an
    # element that is exactly "router bgp"; kept unchanged - confirm intent.
    if "router bgp" in config_data:
        add_neigh = False

    bgp_data = input_dict["address_family"]

    for addr_type, addr_dict in bgp_data.items():
        if not addr_dict:
            continue

        if not check_address_types(addr_type) and not afi_test:
            continue

        # Families without a "unicast" section (e.g. l2vpn) have nothing to
        # contribute here.
        if "unicast" not in addr_dict:
            continue

        addr_data = addr_dict["unicast"]
        if addr_data:
            config_data.append("address-family {} unicast".format(addr_type))

        for advertise_network_dict in addr_data.setdefault("advertise_networks", []):
            network = advertise_network_dict["network"]
            if not isinstance(network, list):
                network = [network]

            no_of_network = advertise_network_dict.get("no_of_network", 1)
            del_action = advertise_network_dict.setdefault("delete", False)

            # Generating IPs for verification
            prefix = str(ipaddress.ip_network(text_type(network[0])).prefixlen)
            for ip in generate_ips(network, no_of_network):
                ip = str(ipaddress.ip_network(text_type(ip)).network_address)

                cmd = "network {}/{}".format(ip, prefix)
                if del_action:
                    cmd = "no {}".format(cmd)

                config_data.append(cmd)

        max_paths = addr_data.setdefault("maximum_paths", {})
        if max_paths:
            ibgp = max_paths.setdefault("ibgp", None)
            ebgp = max_paths.setdefault("ebgp", None)
            if ibgp:
                config_data.append("maximum-paths ibgp {}".format(ibgp))
            if ebgp:
                config_data.append("maximum-paths {}".format(ebgp))

        for aggregate_address in addr_data.setdefault("aggregate_address", []):
            network = aggregate_address.setdefault("network", None)
            if not network:
                logger.debug(
                    "Router %s: 'network' not present in input_dict for BGP", router
                )
                continue

            cmd = "aggregate-address {}".format(network)

            as_set = aggregate_address.setdefault("as_set", False)
            summary = aggregate_address.setdefault("summary", False)
            del_action = aggregate_address.setdefault("delete", False)
            if as_set:
                cmd = "{} as-set".format(cmd)
            if summary:
                cmd = "{} summary".format(cmd)

            if del_action:
                cmd = "no {}".format(cmd)

            config_data.append(cmd)

        for redistribute in addr_data.setdefault("redistribute", {}):
            if "redist_type" not in redistribute:
                logger.debug(
                    "Router %s: 'redist_type' not present in input_dict", router
                )
                continue

            cmd = "redistribute {}".format(redistribute["redist_type"])
            redist_attr = redistribute.setdefault("attribute", None)
            if redist_attr:
                if isinstance(redist_attr, dict):
                    for key, value in redist_attr.items():
                        cmd = "{} {} {}".format(cmd, key, value)
                else:
                    cmd = "{} {}".format(cmd, redist_attr)

            del_action = redistribute.setdefault("delete", False)
            if del_action:
                cmd = "no {}".format(cmd)
            config_data.append(cmd)

        import_vrf_data = addr_data.setdefault("import", {})
        if import_vrf_data:
            cmd = "import vrf {}".format(import_vrf_data["vrf"])

            del_action = import_vrf_data.setdefault("delete", False)
            if del_action:
                cmd = "no {}".format(cmd)
            config_data.append(cmd)

        if "neighbor" in addr_data:
            neigh_data = __create_bgp_neighbor(
                topo, input_dict, router, addr_type, add_neigh
            )
            config_data.extend(neigh_data)

    # Second pass: per-neighbor address-family settings. Unlike the first
    # pass, this one does not honour `afi_test`.
    for addr_type, addr_dict in bgp_data.items():
        if not addr_dict or not check_address_types(addr_type):
            continue

        if "unicast" not in addr_dict:
            continue

        addr_data = addr_dict["unicast"]
        if "neighbor" in addr_data:
            neigh_addr_data = __create_bgp_unicast_address_family(
                topo, input_dict, router, addr_type, add_neigh
            )

            config_data.extend(neigh_addr_data)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return config_data
488
489
def _evpn_neighbor_action_config(neighbor_ip, action, router):
    """
    Return the l2vpn evpn commands for a single neighbor address.

    `action` is either a dict (optional "next_hop_self" and "route_maps"
    entries) or one of the strings "activate" / "deactivate"; any other
    value yields no command. `router` is only used in log messages.
    """
    cmds = []

    if isinstance(action, dict):
        next_hop_self = action.setdefault("next_hop_self", None)
        route_maps = action.setdefault("route_maps", {})

        if next_hop_self is True:
            cmds.append("neighbor {} next-hop-self".format(neighbor_ip))
        elif next_hop_self is False:
            cmds.append("no neighbor {} next-hop-self".format(neighbor_ip))

        for route_map in route_maps:
            name = route_map.setdefault("name", {})
            direction = route_map.setdefault("direction", "in")
            del_action = route_map.setdefault("delete", False)

            if not name:
                logger.info(
                    "Router %s: 'name' not present in input_dict for BGP "
                    "neighbor route name",
                    router,
                )
                continue

            cmd = "neighbor {} route-map {} {}".format(neighbor_ip, name, direction)
            if del_action:
                cmd = "no {}".format(cmd)
            cmds.append(cmd)

    elif action == "activate":
        cmds.append("neighbor {} activate".format(neighbor_ip))
    elif action == "deactivate":
        cmds.append("no neighbor {} activate".format(neighbor_ip))

    return cmds


def __create_l2vpn_evpn_address_family(
    tgen, topo, input_dict, router, config_data=None
):
    """
    Helper API to create configuration for l2vpn evpn address-family

    Parameters
    ----------
    * `tgen` : Topogen object (not used by this helper)
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring
                     from testcase
    * `router` : router id to be configured.
    * `config_data` : list of commands built so far; new commands are
                      appended to it (a new list is started when None)

    Returns
    -------
    `config_data` with the l2vpn evpn commands appended.
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    if config_data is None:
        config_data = []

    l2vpn_data = input_dict["address_family"].get("l2vpn") or {}
    family_data = l2vpn_data.get("evpn")

    if family_data:
        config_data.append("address-family l2vpn evpn")

        advertise_data = family_data.setdefault("advertise", {})
        neighbor_data = family_data.setdefault("neighbor", {})
        advertise_all_vni_data = family_data.setdefault("advertise-all-vni", None)
        rd_data = family_data.setdefault("rd", None)
        no_rd_data = family_data.setdefault("no rd", False)
        route_target_data = family_data.setdefault("route-target", {})

        for address_type, unicast_type in advertise_data.items():
            if not isinstance(unicast_type, dict):
                continue

            # One command per advertised sub-type.
            for key, value in unicast_type.items():
                cmd = "advertise {} {}".format(address_type, key)

                if value:
                    route_map = value.setdefault("route-map", {})
                    advertise_del_action = value.setdefault("delete", None)

                    if route_map:
                        cmd = "{} route-map {}".format(cmd, route_map)

                    if advertise_del_action:
                        cmd = "no {}".format(cmd)

                config_data.append(cmd)

        for neighbor, neighbor_links in neighbor_data.items():
            # ipv4 entries first, then ipv6, each resolved against the
            # neighbor's own link addresses of the same family.
            for afi in ("ipv4", "ipv6"):
                afi_links = neighbor_links.setdefault(afi, {})
                for neighbor_name, action in afi_links.items():
                    neighbor_ip = topo[neighbor]["links"][neighbor_name][afi].split(
                        "/"
                    )[0]
                    config_data.extend(
                        _evpn_neighbor_action_config(neighbor_ip, action, router)
                    )

        # None (absent) leaves the setting untouched.
        if advertise_all_vni_data is not None:
            if advertise_all_vni_data:
                config_data.append("advertise-all-vni")
            else:
                config_data.append("no advertise-all-vni")

        if rd_data:
            config_data.append("rd {}".format(rd_data))

        if no_rd_data:
            config_data.append("no rd {}".format(no_rd_data))

        for rt_type, rt_dict in route_target_data.items():
            for _rt_dict in rt_dict:
                rt_value = _rt_dict.setdefault("value", None)
                del_rt = _rt_dict.setdefault("delete", None)

                # Without a value there is no command to add or remove.
                if not rt_value:
                    continue

                cmd = "route-target {} {}".format(rt_type, rt_value)
                if del_rt:
                    cmd = "no {}".format(cmd)

                config_data.append(cmd)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))

    return config_data
648
649
def __create_bgp_neighbor(topo, input_dict, router, addr_type, add_neigh=True):
    """
    Helper API to create neighbor specific configuration

    Parameters
    ----------
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router id to be configured
    * `addr_type` : address family ("ipv4"/"ipv6") whose unicast neighbors
                    are configured
    * `add_neigh` : when True, a "remote-as" line is emitted per neighbor

    Returns
    -------
    List of neighbor configuration commands. Peers whose `dest_link` is not
    one of the neighbor's links are logged and skipped.
    """

    config_data = []
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    bgp_data = input_dict["address_family"]
    neigh_data = bgp_data[addr_type]["unicast"]["neighbor"]

    for name, peer_dict in neigh_data.items():
        for dest_link, peer in peer_dict["dest_link"].items():
            nh_details = topo[name]

            # The neighbor's "bgp" entry is a list when it runs several
            # instances; the first one provides the AS number.
            peer_bgp = nh_details["bgp"]
            if isinstance(peer_bgp, list):
                remote_as = peer_bgp[0]["local_as"]
            else:
                remote_as = peer_bgp["local_as"]

            if dest_link not in nh_details["links"]:
                # Never reuse the address of a previously handled peer.
                logger.error(
                    "Router %s: link '%s' not present on neighbor %s, "
                    "skipping neighbor configuration",
                    router,
                    dest_link,
                    name,
                )
                continue

            ip_addr = nh_details["links"][dest_link][addr_type].split("/")[0]

            update_source = None
            # Loopback interface
            if "source_link" in peer and peer["source_link"] == "lo":
                update_source = topo[router]["links"]["lo"][addr_type].split("/")[0]

            neigh_cxt = "neighbor {}".format(ip_addr)

            if add_neigh:
                config_data.append("{} remote-as {}".format(neigh_cxt, remote_as))
            if addr_type == "ipv6":
                config_data.append("address-family ipv6 unicast")
                config_data.append("{} activate".format(neigh_cxt))

            disable_connected = peer.setdefault("disable_connected_check", False)
            keep_alive = peer.setdefault("keepalivetimer", 60)
            hold_down = peer.setdefault("holddowntimer", 180)
            password = peer.setdefault("password", None)
            no_password = peer.setdefault("no_password", None)
            max_hop_limit = peer.setdefault("ebgp_multihop", 1)
            gr_knobs = [
                (knob, peer.setdefault(knob, None))
                for knob in (
                    "graceful-restart",
                    "graceful-restart-helper",
                    "graceful-restart-disable",
                )
            ]

            if update_source:
                config_data.append(
                    "{} update-source {}".format(neigh_cxt, update_source)
                )
            if disable_connected:
                config_data.append("{} disable-connected-check".format(neigh_cxt))

            # Configure timers as soon as either one differs from its default.
            if int(keep_alive) != 60 or int(hold_down) != 180:
                config_data.append(
                    "{} timers {} {}".format(neigh_cxt, keep_alive, hold_down)
                )

            # Tri-state knobs: None (absent) leaves the setting untouched, a
            # truthy value enables it and any other value emits the "no" form.
            for knob, knob_value in gr_knobs:
                if knob_value is None:
                    continue
                if knob_value:
                    config_data.append("{} {}".format(neigh_cxt, knob))
                else:
                    config_data.append("no {} {}".format(neigh_cxt, knob))

            if password:
                config_data.append("{} password {}".format(neigh_cxt, password))

            if no_password:
                config_data.append("no {} password {}".format(neigh_cxt, no_password))

            # int() so that a hop limit given as a string compares correctly.
            if int(max_hop_limit) > 1:
                config_data.append(
                    "{} ebgp-multihop {}".format(neigh_cxt, max_hop_limit)
                )
                config_data.append("{} enforce-multihop".format(neigh_cxt))

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return config_data
749
750
def __create_bgp_unicast_address_family(
    topo, input_dict, router, addr_type, add_neigh=True
):
    """
    Helper API to build the neighbor specific "address-family unicast"
    configuration lines of one router for the given address type.

    Parameters
    ----------
    * `topo` : per-router topology data, indexed by router name; the
               "links" of each neighbor are used to find its address
    * `input_dict` : BGP data of the router, read from
                     input_dict["address_family"][addr_type]["unicast"]
                     ["neighbor"]
    * `router` : router name, used in log messages only
    * `addr_type` : ip type, ipv4/ipv6
    * `add_neigh` : not used by this API, kept for compatibility

    Returns
    -------
    list of config command strings
    """

    config_data = []
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    bgp_data = input_dict["address_family"]
    neigh_data = bgp_data[addr_type]["unicast"]["neighbor"]

    # Work on a deep copy so nothing done below can alter the caller's data
    for peer_name, peer_dict in deepcopy(neigh_data).items():
        for dest_link, peer in peer_dict["dest_link"].items():
            deactivate = None
            # Reset for every link, otherwise an unresolved link would reuse
            # the address of the previously processed neighbor
            ip_addr = None
            nh_details = topo[peer_name]
            activate_addr_family = peer.get("activate")
            deactivate_addr_family = peer.get("deactivate")

            # Loopback interface
            if "source_link" in peer and peer["source_link"] == "lo":
                link_data = nh_details["links"].get(dest_link)
                if link_data is not None and link_data.get("type") == "loopback":
                    ip_addr = link_data[addr_type].split("/")[0]

            # Physical interface
            elif dest_link in nh_details["links"]:
                ip_addr = nh_details["links"][dest_link][addr_type].split("/")[0]
                # .get(): input with only ipv4 configured must not fail here
                if addr_type == "ipv4" and bgp_data.get("ipv6"):
                    deactivate = nh_details["links"][dest_link]["ipv6"].split("/")[0]

            if ip_addr is None:
                logger.warning(
                    "Router %s: address of neighbor %s on link %s could not "
                    "be found, skipping its address-family config",
                    router,
                    peer_name,
                    dest_link,
                )
                continue

            neigh_cxt = "neighbor {}".format(ip_addr)
            config_data.append("address-family {} unicast".format(addr_type))

            if activate_addr_family is not None:
                config_data.append(
                    "address-family {} unicast".format(activate_addr_family)
                )
                config_data.append("{} activate".format(neigh_cxt))

            # The neighbor's ipv6 address is deactivated under ipv4 unicast
            # unless an explicit "activate" was requested
            if deactivate and activate_addr_family is None:
                config_data.append("no neighbor {} activate".format(deactivate))

            if deactivate_addr_family is not None:
                config_data.append(
                    "address-family {} unicast".format(deactivate_addr_family)
                )
                config_data.append("no {} activate".format(neigh_cxt))

            next_hop_self = peer.get("next_hop_self")
            send_community = peer.get("send_community")
            prefix_lists = peer.get("prefix_lists", {})
            route_maps = peer.get("route_maps", {})
            no_send_community = peer.get("no_send_community")
            allowas_in = peer.get("allowas-in")

            # next-hop-self: None means "leave untouched"
            if next_hop_self is not None:
                if next_hop_self is True:
                    config_data.append("{} next-hop-self".format(neigh_cxt))
                else:
                    config_data.append("no {} next-hop-self".format(neigh_cxt))

            # send_community
            if send_community:
                config_data.append("{} send-community".format(neigh_cxt))

            # no_send_community
            if no_send_community:
                config_data.append(
                    "no {} send-community {}".format(neigh_cxt, no_send_community)
                )

            # Flat "allowas_in"/"no_allowas_in" keys carry the value directly
            if "allowas_in" in peer:
                config_data.append(
                    "{} allowas-in {}".format(neigh_cxt, peer["allowas_in"])
                )

            if "no_allowas_in" in peer:
                config_data.append(
                    "no {} allowas-in {}".format(neigh_cxt, peer["no_allowas_in"])
                )

            # Prefix lists and route maps share the same entry layout
            for keyword, entries, what in (
                ("prefix-list", prefix_lists, "prefix lists"),
                ("route-map", route_maps, "route name"),
            ):
                for entry in entries or ():
                    name = entry.get("name")
                    direction = entry.get("direction", "in")
                    if not name:
                        logger.info(
                            "Router %s: 'name' not present in "
                            "input_dict for BGP neighbor %s",
                            router,
                            what,
                        )
                        continue

                    cmd = "{} {} {} {}".format(neigh_cxt, keyword, name, direction)
                    if entry.get("delete", False):
                        cmd = "no {}".format(cmd)
                    config_data.append(cmd)

            # Dict form, stored under the "allowas-in" key
            if allowas_in:
                cmd = "{} allowas-in".format(neigh_cxt)
                number_occurences = allowas_in.get("number_occurences")
                # Add the count only when given, never a literal "{}"
                if number_occurences is not None and number_occurences != {}:
                    cmd = "{} {}".format(cmd, number_occurences)

                if allowas_in.get("delete", False):
                    cmd = "no {}".format(cmd)

                config_data.append(cmd)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return config_data
895
896
def modify_bgp_config_when_bgpd_down(tgen, topo, input_dict):
    """
    API calls create_router_bgp() with load_config=False and then appends
    the config file found under the test's log directory
    (LOGDIR/<tgen.modname>/<router>/FRRCFG_FILE) to /etc/frr/bgpd.conf on
    every router named in input_dict.

    Parameters
    ----------
    * `tgen`  : Topogen object
    * `topo`  : json file data
    * `input_dict` : defines for which router, and which config
                     needs to be modified

    Usage:
    ------
    # Modify graceful-restart config not to set f-bit
    # and write to /etc/frr

    # Api call to delete advertised networks
    input_dict = {
        "r5": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "advertise_networks": [
                                {
                                    "network": "101.0.20.1/32",
                                    "no_of_network": 5,
                                    "delete": True
                                }
                            ],
                        }
                    },
                    "ipv6": {
                        "unicast": {
                            "advertise_networks": [
                                {
                                    "network": "5::1/128",
                                    "no_of_network": 5,
                                    "delete": True
                                }
                            ],
                        }
                    }
                }
            }
        }
    }

    result = modify_bgp_config_when_bgpd_down(tgen, topo, input_dict)

    Returns
    -------
    True on success, otherwise the non-True result of create_router_bgp()
    or the traceback text of the exception that was caught
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    try:
        result = create_router_bgp(
            tgen, topo, input_dict, build=False, load_config=False
        )
        if result is not True:
            return result

        # Copy bgp config file to /etc/frr
        router_list = tgen.routers()
        for dut in input_dict:
            # Entries which are not routers of this topology are ignored
            if dut not in router_list:
                continue

            cfg_dir = os.path.join(LOGDIR, tgen.modname)

            logger.info("Delete BGP config when BGPd is down in {}".format(dut))
            # Reading the config from /tmp/topotests and
            # copy to /etc/frr/bgpd.conf
            cmd = "cat {}/{}/{} >> /etc/frr/bgpd.conf".format(
                cfg_dir, dut, FRRCFG_FILE
            )
            router_list[dut].run(cmd)

    except Exception as e:
        # handle any exception; log the exception object itself because
        # the "message" attribute does not exist on Python 3 exceptions
        logger.error("Error %s occured. Arguments %s.", e, e.args)

        # Traceback
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
989
990
991#############################################
992# Verification APIs
993#############################################
@retry(attempts=3, wait=2, return_is_str=True)
def verify_router_id(tgen, topo, input_dict):
    """
    Running command "show bgp summary json" for DUT and comparing the
    router-id of the ipv4Unicast section with the expected one.
    1. When "del_router_id" is not set, the expected router-id is
       input_dict[router]["bgp"]["router_id"]
    2. When "del_router_id" is set, the expected router-id is the
       address returned by find_interface_with_greater_ip(topo, router)

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `input_dict`: input dictionary, have details of Device Under Test, for
                    which user wants to test the data
    Usage
    -----
    # Verify if router-id for r1 is 12.12.12.12
    input_dict = {
        "r1": {
            "bgp": {
                "router_id": "12.12.12.12"
            }
        }
    }
    # Verify that router-id for r1 is highest interface ip
    input_dict = {
        "r1": {
            "bgp": {
                "del_router_id": True
            }
        }
    }
    result = verify_router_id(tgen, topo, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    routers = tgen.routers()
    for router in input_dict:
        if router not in routers:
            continue

        rnode = routers[router]
        bgp_input = input_dict[router]["bgp"]

        # .get(): reading the flag must not modify the caller's input_dict
        del_router_id = bgp_input.get("del_router_id", False)

        logger.info("Checking router %s router-id", router)
        show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
        router_id_out = show_bgp_json["ipv4Unicast"]["routerId"]
        # u"{}".format() gives a text string on Python 2 and Python 3, which
        # is what ipaddress expects on both
        router_id_out = ipaddress.IPv4Address(u"{}".format(router_id_out))

        # Once router-id is deleted, highest interface ip should become
        # router-id
        if del_router_id:
            router_id = find_interface_with_greater_ip(topo, router)
        else:
            router_id = bgp_input["router_id"]
        router_id = ipaddress.IPv4Address(u"{}".format(router_id))

        if router_id == router_id_out:
            logger.info("Found expected router-id %s for router %s", router_id, router)
        else:
            errormsg = (
                "Router-id for router:{} mismatch, expected:"
                " {} but found:{}".format(router, router_id, router_id_out)
            )
            return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
1062
1063
@retry(attempts=50, wait=3, return_is_str=True)
def verify_bgp_convergence(tgen, topo, dut=None):
    """
    API will verify if BGP is converged with in the given time frame.
    Running "show bgp vrf all summary json" command and verify that the
    state of every BGP neighbor configured in topo is Established.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `dut`: device under test, when None all routers having "bgp" data
             in topo are verified

    Usage
    -----
    # To verify if BGP is converged for all the routers used in
    topology
    results = verify_bgp_convergence(tgen, topo)
    # To verify if BGP is converged for router r1 only
    results = verify_bgp_convergence(tgen, topo, dut="r1")

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: verify_bgp_convergence()")
    for router, rnode in tgen.routers().items():
        if "bgp" not in topo["routers"][router]:
            continue

        if dut is not None and dut != router:
            continue

        logger.info("Verifying BGP Convergence on router %s:", router)
        show_bgp_json = run_frr_cmd(rnode, "show bgp vrf all summary json", isjson=True)
        # Verifying output dictionary show_bgp_json is empty or not
        if not show_bgp_json:
            errormsg = "BGP is not running"
            return errormsg

        # "bgp" may hold one instance (dict) or a list of instances
        bgp_data_list = topo["routers"][router]["bgp"]
        if not isinstance(bgp_data_list, list):
            bgp_data_list = [bgp_data_list]

        for bgp_data in bgp_data_list:
            # A missing or None "vrf" entry stands for the default VRF
            vrf = bgp_data.get("vrf")
            if vrf is None:
                vrf = "default"

            # To find neighbor ip type
            bgp_addr_type = bgp_data["address_family"]
            if "l2vpn" in bgp_addr_type:
                if "neighbor" not in bgp_addr_type["l2vpn"]["evpn"]:
                    continue

                bgp_neighbors = bgp_addr_type["l2vpn"]["evpn"]["neighbor"]
                total_evpn_peer = len(bgp_neighbors)

                no_of_evpn_peer = 0
                for bgp_neighbor, peer_data in bgp_neighbors.items():
                    for _addr_type, dest_link_dict in peer_data.items():
                        data = topo["routers"][bgp_neighbor]["links"]
                        for dest_link in dest_link_dict.keys():
                            if dest_link not in data:
                                continue

                            neighbor_ip = data[dest_link][_addr_type].split("/")[0]

                            if (
                                "ipv4Unicast" in show_bgp_json[vrf]
                                or "ipv6Unicast" in show_bgp_json[vrf]
                            ):
                                errormsg = (
                                    "[DUT: %s] VRF: %s, "
                                    "ipv4Unicast/ipv6Unicast"
                                    " address-family present"
                                    " under l2vpn" % (router, vrf)
                                )
                                return errormsg

                            l2VpnEvpn_data = show_bgp_json[vrf]["l2VpnEvpn"]["peers"]
                            nh_state = l2VpnEvpn_data[neighbor_ip]["state"]

                            if nh_state == "Established":
                                no_of_evpn_peer += 1

                if no_of_evpn_peer == total_evpn_peer:
                    logger.info(
                        "[DUT: %s] VRF: %s, BGP is Converged for " "epvn peers",
                        router,
                        vrf,
                    )
                else:
                    errormsg = (
                        "[DUT: %s] VRF: %s, BGP is not converged "
                        "for evpn peers" % (router, vrf)
                    )
                    return errormsg
            else:
                for addr_type in bgp_addr_type.keys():
                    if not check_address_types(addr_type):
                        continue
                    bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]

                    # Expected peers are counted per address family, so each
                    # family is compared against its own number of links
                    total_peer = 0
                    for bgp_neighbor in bgp_neighbors:
                        total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"])

                    no_of_peer = 0
                    for bgp_neighbor, peer_data in bgp_neighbors.items():
                        for dest_link in peer_data["dest_link"].keys():
                            data = topo["routers"][bgp_neighbor]["links"]
                            if dest_link not in data:
                                continue

                            peer_details = peer_data["dest_link"][dest_link]
                            neighbor_type = peer_details.get("neighbor_type")

                            # The key of the peer in the summary output
                            # depends on how the neighbor is configured
                            if neighbor_type == "link-local":
                                # for link local neighbors
                                neighbor_ip = get_ipv6_linklocal_address(
                                    topo["routers"], bgp_neighbor, dest_link
                                )
                            elif "source_link" in peer_details:
                                source_link = peer_details["source_link"]
                                neighbor_ip = data[source_link][addr_type].split("/")[0]
                            elif neighbor_type == "unnumbered":
                                neighbor_ip = data[dest_link]["peer-interface"]
                            else:
                                neighbor_ip = data[dest_link][addr_type].split("/")[0]

                            if addr_type == "ipv4":
                                peers = show_bgp_json[vrf]["ipv4Unicast"]["peers"]
                            else:
                                peers = show_bgp_json[vrf]["ipv6Unicast"]["peers"]
                            nh_state = peers[neighbor_ip]["state"]

                            if nh_state == "Established":
                                no_of_peer += 1

                    if no_of_peer == total_peer:
                        logger.info(
                            "[DUT: %s] VRF: %s, BGP is Converged for %s address-family",
                            router,
                            vrf,
                            addr_type,
                        )
                    else:
                        errormsg = (
                            "[DUT: %s] VRF: %s, BGP is not converged for %s address-family"
                            % (router, vrf, addr_type)
                        )
                        return errormsg

    logger.debug("Exiting API: verify_bgp_convergence()")
    return True
1247
1248
@retry(attempts=3, wait=4, return_is_str=True)
def verify_bgp_community(
    tgen, addr_type, router, network, input_dict=None, vrf=None, bestpath=False
):
    """
    API to verify BGP community/large community is attached in route for
    any given DUT by running "show bgp ipv4/6 {route address} json"
    command.

    Parameters
    ----------
    * `tgen`: topogen object
    * `addr_type` : ip type, ipv4/ipv6
    * `router`: Device Under Test
    * `network`: list of networks for which set criteria needs to be
            verified
    * `input_dict`: attribute name ("community"/"largeCommunity") mapped
            to the expected value string, when None only the presence of
            a community attribute is verified
    * `vrf`: VRF name, when given `bestpath` is not used
    * `bestpath`: To check best path cli

    Usage
    -----
    networks = ["200.50.2.0/32"]
    input_dict = {
        "largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5"
    }
    result = verify_bgp_community(tgen, "ipv4", dut, networks, input_dict)

    Returns
    -------
    errormsg(str) or True, False when router is not part of tgen
    """

    logger.debug("Entering lib API: verify_bgp_community()")
    if router not in tgen.routers():
        return False

    rnode = tgen.routers()[router]

    logger.info(
        "Verifying BGP community attributes on dut %s: for %s " "network %s",
        router,
        addr_type,
        network,
    )

    command = "show bgp"

    sleep(5)
    for net in network:
        if vrf:
            cmd = "{} vrf {} {} {} json".format(command, vrf, addr_type, net)
        elif bestpath:
            cmd = "{} {} {} bestpath json".format(command, addr_type, net)
        else:
            cmd = "{} {} {} json".format(command, addr_type, net)

        show_bgp_json = run_frr_cmd(rnode, cmd, isjson=True)
        if "paths" not in show_bgp_json:
            return "Prefix {} not found in BGP table of router: {}".format(net, router)

        found = False
        for path in show_bgp_json["paths"]:
            if "largeCommunity" not in path and "community" not in path:
                continue

            found = True
            logger.info(
                "Large Community attribute is found for route:" " %s in router: %s",
                net,
                router,
            )
            if input_dict is None:
                continue

            for criteria, comm_val in input_dict.items():
                # A path may carry only one of the two attributes; report
                # a missing one as a mismatch instead of raising KeyError
                if criteria in path:
                    show_val = path[criteria]["string"]
                else:
                    show_val = None

                if comm_val == show_val:
                    logger.info(
                        "Verifying BGP %s for prefix: %s"
                        " in router: %s, found expected"
                        " value: %s",
                        criteria,
                        net,
                        router,
                        comm_val,
                    )
                else:
                    errormsg = (
                        "Failed: Verifying BGP attribute"
                        " {} for route: {} in router: {}"
                        ", expected  value: {} but found"
                        ": {}".format(criteria, net, router, comm_val, show_val)
                    )
                    return errormsg

        if not found:
            errormsg = (
                "Large Community attribute is not found for route: "
                "{} in router: {} ".format(net, router)
            )
            return errormsg

    logger.debug("Exiting lib API: verify_bgp_community()")
    return True
1353
1354
def modify_as_number(tgen, topo, input_dict):
    """
    API reads local_as from user defined input_dict and modifies the
    router's ASN accordingly: the bgp configuration of every router in
    input_dict is removed first, then the bgp configuration of a copy of
    topo["routers"] carrying the new local_as values is applied.

    Parameters
    ----------
    * `tgen`  : Topogen object
    * `topo`  : json file data
    * `input_dict` :  defines for which router ASNs needs to be modified

    Usage
    -----
    To modify ASNs for router r1
    input_dict = {
        "r1": {
            "bgp": {
                "local_as": 131079
            }
        }
    }
    result = modify_as_number(tgen, topo, input_dict)

    Returns
    -------
    True on success, otherwise the non-True result of create_router_bgp()
    or the traceback text of the exception that was caught
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    try:

        # topo itself stays untouched, the new ASNs go into a copy
        new_topo = deepcopy(topo["routers"])
        router_dict = {}
        for router in input_dict:
            # Remove bgp configuration
            router_dict[router] = {"bgp": {"delete": True}}

            new_topo[router]["bgp"]["local_as"] = input_dict[router]["bgp"]["local_as"]

        logger.info("Removing bgp configuration")
        result = create_router_bgp(tgen, topo, router_dict)
        # A failed step must not be reported as success to the testcase
        if result is not True:
            return result

        logger.info("Applying modified bgp configuration")
        result = create_router_bgp(tgen, new_topo)
        if result is not True:
            return result

    except Exception as e:
        # handle any exception; log the exception object itself because
        # the "message" attribute does not exist on Python 3 exceptions
        logger.error("Error %s occured. Arguments %s.", e, e.args)

        # Traceback
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
1412
1413
@retry(attempts=3, wait=2, return_is_str=True)
def verify_as_numbers(tgen, topo, input_dict):
    """
    This API is to verify AS numbers for given DUT by running
    "show ip bgp neighbor json" command. Local AS and Remote AS
    will be verified with input_dict data and command output.

    The expected remote AS of a neighbor is its "local_as" from
    input_dict; for a neighbor which is not part of input_dict the
    "local_as" of topo is used. AS numbers are compared by their text
    form, so 100 and "100" are treated as equal.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `input_dict`: defines - for which router, AS numbers needs to be verified

    Usage
    -----
    input_dict = {
        "r1": {
            "bgp": {
                "local_as": 131079
            }
        }
    }
    result = verify_as_numbers(tgen, topo, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    routers = tgen.routers()
    for router in input_dict:
        if router not in routers:
            continue

        rnode = routers[router]

        logger.info("Verifying AS numbers for  dut %s:", router)

        show_ip_bgp_neighbor_json = run_frr_cmd(
            rnode, "show ip bgp neighbor json", isjson=True
        )
        local_as = input_dict[router]["bgp"]["local_as"]
        bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]

        for addr_type in bgp_addr_type:
            if not check_address_types(addr_type):
                continue

            bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]

            for bgp_neighbor, peer_data in bgp_neighbors.items():
                # Neighbors whose ASN is not being changed are not required
                # to be listed in input_dict
                if bgp_neighbor in input_dict:
                    remote_as = input_dict[bgp_neighbor]["bgp"]["local_as"]
                else:
                    remote_as = topo["routers"][bgp_neighbor]["bgp"]["local_as"]

                for dest_link in peer_data["dest_link"]:
                    data = topo["routers"][bgp_neighbor]["links"]

                    if dest_link not in data:
                        errormsg = (
                            "Failed: link {} not present on dut {}'s "
                            "neighbor {}".format(dest_link, router, bgp_neighbor)
                        )
                        return errormsg

                    neighbor_ip = data[dest_link][addr_type].split("/")[0]
                    if neighbor_ip not in show_ip_bgp_neighbor_json:
                        errormsg = (
                            "Failed: neighbor {} ({}) not found in BGP "
                            "neighbor output of dut {}".format(
                                bgp_neighbor, neighbor_ip, router
                            )
                        )
                        return errormsg

                    neigh_data = show_ip_bgp_neighbor_json[neighbor_ip]
                    # Verify Local AS for router
                    if str(neigh_data["localAs"]) != str(local_as):
                        errormsg = (
                            "Failed: Verify local_as for dut {},"
                            " found: {} but expected: {}".format(
                                router, neigh_data["localAs"], local_as
                            )
                        )
                        return errormsg
                    else:
                        logger.info(
                            "Verified local_as for dut %s, found" " expected: %s",
                            router,
                            local_as,
                        )

                    # Verify Remote AS for neighbor
                    if str(neigh_data["remoteAs"]) != str(remote_as):
                        errormsg = (
                            "Failed: Verify remote_as for dut "
                            "{}'s neighbor {}, found: {} but "
                            "expected: {}".format(
                                router, bgp_neighbor, neigh_data["remoteAs"], remote_as
                            )
                        )
                        return errormsg
                    else:
                        logger.info(
                            "Verified remote_as for dut %s's "
                            "neighbor %s, found expected: %s",
                            router,
                            bgp_neighbor,
                            remote_as,
                        )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
1511
1512
@retry(attempts=50, wait=3, return_is_str=True)
def verify_bgp_convergence_from_running_config(tgen, dut=None):
    """
    API to verify BGP convergence from the output of
    "show bgp vrf all summary json": every peer listed for every VRF and
    address-family must be in Established state. The peers are taken from
    the command output, not from the topology data.
    This API would be used when routers have BGP neighborship is loopback
    to physical or vice-versa

    Parameters
    ----------
    * `tgen`: topogen object
    * `dut`: device under test, when None all routers are verified

    Usage
    -----
    results = verify_bgp_convergence_from_running_config(tgen, dut="r1")

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    for router, rnode in tgen.routers().items():
        if dut is not None and dut != router:
            continue

        logger.info("Verifying BGP Convergence on router %s:", router)
        show_bgp_json = run_frr_cmd(rnode, "show bgp vrf all summary json", isjson=True)
        # Verifying output dictionary show_bgp_json is empty or not
        if not show_bgp_json:
            errormsg = "BGP is not running"
            return errormsg

        for vrf, addr_family_data in show_bgp_json.items():
            for neighborship_data in addr_family_data.values():
                # Collect the peers which are really down, so that the
                # error message names them and not an arbitrary peer
                down_peers = sorted(
                    peer
                    for peer, peer_data in neighborship_data["peers"].items()
                    if peer_data["state"] != "Established"
                )

                if down_peers:
                    errormsg = (
                        "[DUT: %s] VRF: %s, BGP is not converged"
                        " for peer: %s" % (router, vrf, ", ".join(down_peers))
                    )
                    return errormsg

            logger.info("[DUT: %s]: vrf: %s, BGP is Converged", router, vrf)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))

    return True
1571
1572
def clear_bgp(tgen, addr_type, router, vrf=None):
    """
    API to reset the BGP sessions of a router with the clear command
    matching the address type, followed by a 5 seconds wait.

    Commands used:
    * ipv4 : "clear ip bgp *" or "clear ip bgp vrf <vrf> *" per vrf
    * ipv6 : "clear bgp ipv6 *" or "clear bgp vrf <vrf> ipv6 *" per vrf
    * any other addr_type : "clear bgp *" (vrf is not used)

    Parameters
    ----------
    * `tgen`: topogen object
    * `addr_type`: ip type ipv4/ipv6
    * `router`: device under test
    * `vrf`: vrf name or list of vrf names

    Usage
    -----
    clear_bgp(tgen, addr_type, "r1")

    Returns
    -------
    False when router is not part of tgen, otherwise None
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    all_routers = tgen.routers()
    if router not in all_routers:
        return False
    node = all_routers[router]

    # A single vrf name is handled as a list with one entry
    vrf_names = vrf
    if vrf_names and type(vrf_names) is not list:
        vrf_names = [vrf_names]

    # Clearing BGP
    logger.info("Clearing BGP neighborship for router %s..", router)

    # Build the list of commands first, run them afterwards
    if addr_type == "ipv4":
        if vrf_names:
            commands = ["clear ip bgp vrf {} *".format(name) for name in vrf_names]
        else:
            commands = ["clear ip bgp *"]
    elif addr_type == "ipv6":
        if vrf_names:
            commands = ["clear bgp vrf {} ipv6 *".format(name) for name in vrf_names]
        else:
            commands = ["clear bgp ipv6 *"]
    else:
        commands = ["clear bgp *"]

    for command in commands:
        run_frr_cmd(node, command)

    sleep(5)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1621
1622
def _collect_bgp_peer_status(topo, router, addr_types, show_bgp_json):
    """
    Match the BGP neighbors configured for `router` in `topo` against the
    output of "show bgp summary json".

    Parameters
    ----------
    * `topo`: input json file data
    * `router`: device under test
    * `addr_types`: address types (ipv4/ipv6) to take into account
    * `show_bgp_json`: parsed "show bgp summary json" output of `router`

    Returns
    -------
    (expected, established, peer_uptime): number of configured neighbor
    links, number of those found in "Established" state, and a dict
    mapping neighbor name to its "peerUptimeEstablishedEpoch" value
    """

    bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]

    expected = 0
    established = 0
    peer_uptime = {}
    for addr_type in addr_types:
        bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
        af_key = "ipv4Unicast" if addr_type == "ipv4" else "ipv6Unicast"
        # An address family or peer missing from the summary is counted as
        # "not established" so that the caller keeps polling
        peers = show_bgp_json.get(af_key, {}).get("peers", {})

        for bgp_neighbor, peer_data in bgp_neighbors.items():
            links = topo["routers"][bgp_neighbor]["links"]

            for dest_link in peer_data["dest_link"]:
                expected += 1
                if dest_link not in links:
                    continue

                neighbor_ip = links[dest_link][addr_type].split("/")[0]
                peer = peers.get(neighbor_ip)
                if peer is None:
                    continue

                # NOTE: keyed by neighbor name, so with several sessions to
                # the same neighbor only the last one visited is kept
                peer_uptime[bgp_neighbor] = peer.get("peerUptimeEstablishedEpoch")
                if peer.get("state") == "Established":
                    established += 1

    return expected, established, peer_uptime


def _wait_for_bgp_convergence(
    rnode, topo, router, addr_types, stage, sleeptime=3, attempts=50
):
    """
    Poll "show bgp summary json" until all configured neighbors of `router`
    are in "Established" state.

    Parameters
    ----------
    * `rnode`: router object of the device under test
    * `topo`: input json file data
    * `router`: device under test
    * `addr_types`: address types (ipv4/ipv6) to take into account
    * `stage`: "before" or "after", only used in log messages
    * `sleeptime`: seconds to wait before each poll
    * `attempts`: maximum number of polls

    Returns
    -------
    (errormsg, peer_uptime): `errormsg` is None on success, in which case
    `peer_uptime` holds the uptime epoch per neighbor; on failure
    `peer_uptime` is None
    """

    for _ in range(attempts):
        logger.info(
            "Waiting for %s sec for BGP to converge on router %s...",
            sleeptime,
            router,
        )
        sleep(sleeptime)

        show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
        # An empty output means BGP is not running on the router
        if not show_bgp_json:
            return "BGP is not running", None

        expected, established, peer_uptime = _collect_bgp_peer_status(
            topo, router, addr_types, show_bgp_json
        )
        if established == expected:
            logger.info("BGP is Converged for router %s %s bgp clear", router, stage)
            return None, peer_uptime

        logger.info(
            "BGP is not yet Converged for router %s %s bgp clear", router, stage
        )

    errormsg = (
        "TIMEOUT!! BGP is not converged in {} seconds for"
        " router {}".format(attempts * sleeptime, router)
    )
    return errormsg, None


def clear_bgp_and_verify(tgen, topo, router):
    """
    This API is to clear bgp neighborship and verify bgp neighborship
    is coming up (BGP is converged) using "show bgp summary json" command.
    It also verifies that the neighbor uptime recorded before and after
    the clear is different, as the uptime must change once bgp sessions
    are cleared using "clear ip bgp */clear bgp ipv6 *" cmd.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `router`: device under test

    Usage
    -----
    result = clear_bgp_and_verify(tgen, topo, dut)

    Returns
    -------
    errormsg(str) or True; False when `router` is not part of the topology
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    if router not in tgen.routers():
        return False

    rnode = tgen.routers()[router]

    bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
    # Only the enabled address types are used, both for the expected and
    # for the established neighbor count
    addr_types = [
        addr_type for addr_type in bgp_addr_type if check_address_types(addr_type)
    ]

    # Verifying BGP convergence before bgp clear command
    errormsg, peer_uptime_before_clear_bgp = _wait_for_bgp_convergence(
        rnode, topo, router, addr_types, "before"
    )
    if errormsg is not None:
        return errormsg

    # Clearing BGP
    logger.info("Clearing BGP neighborship for router %s..", router)
    for addr_type in bgp_addr_type:
        if addr_type == "ipv4":
            run_frr_cmd(rnode, "clear ip bgp *")
        elif addr_type == "ipv6":
            run_frr_cmd(rnode, "clear bgp ipv6 *")

    # Verifying BGP convergence after bgp clear command
    errormsg, peer_uptime_after_clear_bgp = _wait_for_bgp_convergence(
        rnode, topo, router, addr_types, "after"
    )
    if errormsg is not None:
        return errormsg

    # Comparing peerUptimeEstablishedEpoch dictionaries
    if peer_uptime_before_clear_bgp == peer_uptime_after_clear_bgp:
        errormsg = (
            "BGP neighborship is not reset after clear bgp on router"
            " {}".format(router)
        )
        return errormsg

    logger.info("BGP neighborship is reset after clear BGP on router %s", router)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
1820
1821
def _bgp_summary_peer_state(rnode, addr_type, neighbor_ip):
    """
    Run "show bgp summary json" on `rnode` and return the "state" of
    `neighbor_ip`. ipv4 neighbors are read from the "ipv4Unicast" section,
    any other address type from "ipv6Unicast".
    """

    show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
    af_key = "ipv4Unicast" if addr_type == "ipv4" else "ipv6Unicast"
    return show_bgp_json[af_key]["peers"][neighbor_ip]["state"]


def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
    """
    To verify BGP timer config, execute "show ip bgp neighbor json" command
    and verify bgp timers with input_dict data.
    To verify bgp timers functionality, the peer interface is shut down
    and BGP neighborship status is verified:

    * Scenario 1: peer interface is shut down and brought up again after
      5 sec, neighborship must stay "Established"
    * Scenario 2: peer interface is shut down, neighborship must leave the
      "Established" state. The interface is left shut down afterwards.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `input_dict`: defines for which router, bgp timers needs to be verified

    Usage:
    # To verify BGP timers for neighbor r2 of router r1; the "dest_link"
    # key must be a link name listed under the neighbor's "links" in topo
    input_dict = {
        "r1": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "neighbor": {
                                "r2": {
                                    "dest_link": {
                                        "r1": {
                                            "keepalivetimer": 5,
                                            "holddowntimer": 15
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }
    result = verify_bgp_timers_and_functionality(tgen, topo, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    sleep(5)
    router_list = tgen.routers()
    for router in input_dict.keys():
        if router not in router_list:
            continue

        rnode = router_list[router]

        logger.info("Verifying bgp timers functionality, DUT is %s:", router)

        show_ip_bgp_neighbor_json = run_frr_cmd(
            rnode, "show ip bgp neighbor json", isjson=True
        )

        bgp_addr_type = input_dict[router]["bgp"]["address_family"]

        for addr_type in bgp_addr_type:
            if not check_address_types(addr_type):
                continue

            bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
            for bgp_neighbor, peer_data in bgp_neighbors.items():
                # Reset per neighbor so that values of a previous neighbor
                # are never reused by the verification below
                neighbor_ip = None
                neighbor_intf = None

                for dest_link, peer_dict in peer_data["dest_link"].items():
                    data = topo["routers"][bgp_neighbor]["links"]

                    # Without a matching link neither the neighbor address
                    # nor the peer interface can be resolved
                    if dest_link not in data:
                        errormsg = (
                            "Link {} is not present in topology for bgp "
                            "neighbor {}, unable to verify bgp timers "
                            "under dut {}".format(dest_link, bgp_neighbor, router)
                        )
                        return errormsg

                    keepalivetimer = peer_dict["keepalivetimer"]
                    holddowntimer = peer_dict["holddowntimer"]

                    neighbor_ip = data[dest_link][addr_type].split("/")[0]
                    neighbor_intf = data[dest_link]["interface"]

                    # Verify HoldDownTimer for neighbor
                    bgpHoldTimeMsecs = show_ip_bgp_neighbor_json[neighbor_ip][
                        "bgpTimerHoldTimeMsecs"
                    ]
                    if bgpHoldTimeMsecs != holddowntimer * 1000:
                        errormsg = (
                            "Verifying holddowntimer for bgp "
                            "neighbor {} under dut {}, found: {} "
                            "but expected: {}".format(
                                neighbor_ip,
                                router,
                                bgpHoldTimeMsecs,
                                holddowntimer * 1000,
                            )
                        )
                        return errormsg

                    # Verify KeepAliveTimer for neighbor
                    bgpKeepAliveTimeMsecs = show_ip_bgp_neighbor_json[neighbor_ip][
                        "bgpTimerKeepAliveIntervalMsecs"
                    ]
                    if bgpKeepAliveTimeMsecs != keepalivetimer * 1000:
                        errormsg = (
                            "Verifying keepalivetimer for bgp "
                            "neighbor {} under dut {}, found: {} "
                            "but expected: {}".format(
                                neighbor_ip,
                                router,
                                bgpKeepAliveTimeMsecs,
                                keepalivetimer * 1000,
                            )
                        )
                        return errormsg

                    ####################
                    # Shutting down peer interface after keepalive time and
                    # after some time bringing up peer interface.
                    # verifying BGP neighborship in (hold down-keep alive)
                    # time, it should not go down
                    ####################

                    # Wait till keep alive time
                    logger.info("=" * 20)
                    logger.info("Scenario 1:")
                    logger.info(
                        "Shutdown and bring up peer interface: %s "
                        "in keep alive time : %s sec and verify "
                        " BGP neighborship  is intact in %s sec ",
                        neighbor_intf,
                        keepalivetimer,
                        (holddowntimer - keepalivetimer),
                    )
                    logger.info("=" * 20)
                    logger.info("Waiting for %s sec..", keepalivetimer)
                    sleep(keepalivetimer)

                    # Shutting down peer interface
                    logger.info(
                        "Shutting down interface %s on router %s",
                        neighbor_intf,
                        bgp_neighbor,
                    )
                    topotest.interface_set_status(
                        router_list[bgp_neighbor], neighbor_intf, ifaceaction=False
                    )

                    # Bringing up peer interface
                    sleep(5)
                    logger.info(
                        "Bringing up interface %s on router %s..",
                        neighbor_intf,
                        bgp_neighbor,
                    )
                    topotest.interface_set_status(
                        router_list[bgp_neighbor], neighbor_intf, ifaceaction=True
                    )

                # Nothing to verify for a neighbor without any dest_link
                if neighbor_ip is None:
                    continue

                # Verifying BGP neighborship is intact in
                # (holddown - keepalive) time.
                # NOTE(review): the state is only checked when a loop value
                # equals (holddowntimer - keepalivetimer), which is the case
                # when keepalivetimer is a third of holddowntimer - confirm
                # for other timer ratios.
                for timer in range(
                    keepalivetimer, holddowntimer, int(holddowntimer / 3)
                ):
                    logger.info("Waiting for %s sec..", keepalivetimer)
                    sleep(keepalivetimer)
                    sleep(2)
                    nh_state = _bgp_summary_peer_state(rnode, addr_type, neighbor_ip)

                    if timer == (holddowntimer - keepalivetimer):
                        if nh_state != "Established":
                            errormsg = (
                                "BGP neighborship is not intact "
                                "in {} sec for neighbor {}".format(timer, bgp_neighbor)
                            )
                            return errormsg

                        logger.info(
                            "BGP neighborship is intact in %s"
                            " sec for neighbor %s",
                            timer,
                            bgp_neighbor,
                        )

                ####################
                # Shutting down peer interface and verifying that BGP
                # neighborship is going down in holddown time
                ####################
                logger.info("=" * 20)
                logger.info("Scenario 2:")
                logger.info(
                    "Shutdown peer interface: %s and verify BGP"
                    " neighborship has gone down in hold down "
                    "time %s sec",
                    neighbor_intf,
                    holddowntimer,
                )
                logger.info("=" * 20)

                logger.info(
                    "Shutting down interface %s on router %s..",
                    neighbor_intf,
                    bgp_neighbor,
                )
                topotest.interface_set_status(
                    router_list[bgp_neighbor], neighbor_intf, ifaceaction=False
                )

                # Verifying BGP neighborship is going down in holddown time;
                # the state is checked when the loop value equals
                # holddowntimer
                for timer in range(
                    keepalivetimer,
                    (holddowntimer + keepalivetimer),
                    int(holddowntimer / 3),
                ):
                    logger.info("Waiting for %s sec..", keepalivetimer)
                    sleep(keepalivetimer)
                    sleep(2)
                    nh_state = _bgp_summary_peer_state(rnode, addr_type, neighbor_ip)

                    if timer == holddowntimer:
                        if nh_state == "Established":
                            errormsg = (
                                "BGP neighborship has not gone "
                                "down in {} sec for neighbor {}".format(
                                    timer, bgp_neighbor
                                )
                            )
                            return errormsg

                        logger.info(
                            "BGP neighborship has gone down in"
                            " %s sec for neighbor %s",
                            timer,
                            bgp_neighbor,
                        )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
2063
2064
def _select_rmap_entries(rmap_entries, seq_ids):
    """
    Return the route-map entries whose "seq_id" is listed in `seq_ids`.
    All entries are returned when `seq_ids` is None or nothing matches.
    """

    if seq_ids is None:
        return rmap_entries

    selected = [
        rmap_dict
        for rmap_dict in rmap_entries
        if "seq_id" in rmap_dict and rmap_dict["seq_id"] in seq_ids
    ]
    return selected or rmap_entries


def _find_bgp_path_attribute(paths, criteria, expected):
    """
    Look for a BGP path carrying attribute `criteria` with value `expected`.
    For "aspath" the "string" member of the attribute is compared.

    Returns (found, value) where `value` is the last attribute value seen,
    or None when no path carries the attribute.
    """

    value = None
    for path in paths:
        if criteria not in path:
            continue

        if criteria == "aspath":
            value = path[criteria]["string"]
        else:
            value = path[criteria]

        if value == expected:
            return True, value

    return False, value


def _verify_route_rmap_attributes(
    rnode, addr_type, dut, static_route, rmap_name, input_dict, seq_ids
):
    """
    Verify, for one route, every attribute listed in the "set" section of
    the selected route-map entries against "show bgp <afi> <route> json".

    Returns errormsg(str) on the first mismatch, otherwise None.
    """

    cmd = "show bgp {} {} json".format(addr_type, static_route)
    show_bgp_json = run_frr_cmd(rnode, cmd, isjson=True)
    # A route without "paths" in the output is reported as a mismatch below
    paths = show_bgp_json.get("paths", [])

    for rmap_router in input_dict:
        route_maps = input_dict[rmap_router].get("route_maps", {})
        if rmap_name not in route_maps:
            continue

        for rmap_dict in _select_rmap_entries(route_maps[rmap_name], seq_ids):
            if "set" not in rmap_dict:
                continue

            for criteria, expected in rmap_dict["set"].items():
                found, value = _find_bgp_path_attribute(paths, criteria, expected)
                if not found:
                    return (
                        "Failed: Verifying BGP "
                        "attribute {} for route:"
                        " {} in router: {}, "
                        " expected value: {} but"
                        " found: {}".format(
                            criteria, static_route, dut, expected, value
                        )
                    )

                logger.info(
                    "Verifying BGP "
                    "attribute {} for"
                    " route: {} in "
                    "router: {}, found"
                    " expected value:"
                    " {}".format(criteria, static_route, dut, value)
                )

    return None


@retry(attempts=3, wait=4, return_is_str=True)
def verify_bgp_attributes(
    tgen,
    addr_type,
    dut,
    static_routes,
    rmap_name=None,
    input_dict=None,
    seq_id=None,
    nexthop=None,
):
    """
    API will verify BGP attributes set by Route-map for given prefix and
    DUT. it will run "show bgp ipv4/ipv6 {prefix_address} json" command
    in DUT to verify BGP attributes set by route-map, Set attributes
    values will be read from input_dict and verified with command output.

    * `tgen`: topogen object
    * `addr_type` : ip type, ipv4/ipv6
    * `dut`: Device Under Test
    * `static_routes`: Static Routes for which BGP set attributes needs to be
                       verified
    * `rmap_name`: route map name for which set criteria needs to be verified
    * `input_dict`: per router "route_maps" data; the "set" section of the
                    route-map `rmap_name` holds the expected attributes
    * `seq_id`: sequence number (or list of sequence numbers) of rmap
                entries to verify, default is None (all entries)
    * `nexthop`: currently not used

    Usage
    -----
    # To verify BGP attribute "localpref" set to 150 and "med" set to 30
    for prefix 10.0.20.1/32 in router r3.
    input_dict = {
        "r3": {
            "route_maps": {
                "rmap_match_pf_list1": [
                    {
                        "action": "PERMIT",
                        "match": {"prefix_list": "pf_list_1"},
                        "set": {"localpref": 150, "med": 30}
                    }
                ],
            },
            "as_path": "500 400"
        }
    }
    static_routes (list) = ["10.0.20.1/32"]
    result = verify_bgp_attributes(tgen, "ipv4", "r3", static_routes,
        "rmap_match_pf_list1", input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: verify_bgp_attributes()")

    # Nothing is verified when no input data is given
    input_dict = input_dict or {}

    # A single sequence number is handled as a one-element list
    seq_ids = seq_id
    if seq_ids is not None and not isinstance(seq_ids, (list, tuple)):
        seq_ids = [seq_ids]

    rnode = tgen.routers().get(dut)
    if rnode is not None:
        logger.info("Verifying BGP set attributes for dut {}:".format(dut))

        for static_route in static_routes:
            errormsg = _verify_route_rmap_attributes(
                rnode, addr_type, dut, static_route, rmap_name, input_dict, seq_ids
            )
            if errormsg is not None:
                return errormsg

    logger.debug("Exiting lib API: verify_bgp_attributes()")
    return True
2199
2200
2201@retry(attempts=4, wait=2, return_is_str=True, initial_wait=2)
2202def verify_best_path_as_per_bgp_attribute(
2203    tgen, addr_type, router, input_dict, attribute
2204):
2205    """
2206    API is to verify best path according to BGP attributes for given routes.
2207    "show bgp ipv4/6 json" command will be run and verify best path according
2208    to shortest as-path, highest local-preference and med, lowest weight and
2209    route origin IGP>EGP>INCOMPLETE.
2210    Parameters
2211    ----------
2212    * `tgen` : topogen object
2213    * `addr_type` : ip type, ipv4/ipv6
2214    * `tgen` : topogen object
2215    * `attribute` : calculate best path using this attribute
2216    * `input_dict`: defines different routes to calculate for which route
2217                    best path is selected
2218    Usage
2219    -----
2220    # To verify best path for routes 200.50.2.0/32 and 200.60.2.0/32 from
2221    router r7 to router r1(DUT) as per shortest as-path attribute
2222    input_dict = {
2223        "r7": {
2224            "bgp": {
2225                "address_family": {
2226                    "ipv4": {
2227                        "unicast": {
2228                            "advertise_networks": [
2229                                {
2230                                    "network": "200.50.2.0/32"
2231                                },
2232                                {
2233                                    "network": "200.60.2.0/32"
2234                                }
2235                            ]
2236                        }
2237                    }
2238                }
2239            }
2240        }
2241    }
2242    attribute = "locPrf"
2243    result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut, \
2244                         input_dict,  attribute)
2245    Returns
2246    -------
2247    errormsg(str) or True
2248    """
2249
2250    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2251
2252    if router not in tgen.routers():
2253        return False
2254
2255    rnode = tgen.routers()[router]
2256
2257    # Verifying show bgp json
2258    command = "show bgp"
2259
2260    sleep(2)
2261    logger.info("Verifying router %s RIB for best path:", router)
2262
2263    static_route = False
2264    advertise_network = False
2265    for route_val in input_dict.values():
2266        if "static_routes" in route_val:
2267            static_route = True
2268            networks = route_val["static_routes"]
2269        else:
2270            advertise_network = True
2271            net_data = route_val["bgp"]["address_family"][addr_type]["unicast"]
2272            networks = net_data["advertise_networks"]
2273
2274        for network in networks:
2275            _network = network["network"]
2276            no_of_ip = network.setdefault("no_of_ip", 1)
2277            vrf = network.setdefault("vrf", None)
2278
2279            if vrf:
2280                cmd = "{} vrf {}".format(command, vrf)
2281            else:
2282                cmd = command
2283
2284            cmd = "{} {}".format(cmd, addr_type)
2285            cmd = "{} json".format(cmd)
2286            sh_ip_bgp_json = run_frr_cmd(rnode, cmd, isjson=True)
2287
2288            routes = generate_ips(_network, no_of_ip)
2289            for route in routes:
2290                route = str(ipaddress.ip_network(unicode(route)))
2291
2292                if route in sh_ip_bgp_json["routes"]:
2293                    route_attributes = sh_ip_bgp_json["routes"][route]
2294                    _next_hop = None
2295                    compare = None
2296                    attribute_dict = {}
2297                    for route_attribute in route_attributes:
2298                        next_hops = route_attribute["nexthops"]
2299                        for next_hop in next_hops:
2300                            next_hop_ip = next_hop["ip"]
2301                        attribute_dict[next_hop_ip] = route_attribute[attribute]
2302
2303                    # AS_PATH attribute
2304                    if attribute == "path":
2305                        # Find next_hop for the route have minimum as_path
2306                        _next_hop = min(
2307                            attribute_dict, key=lambda x: len(set(attribute_dict[x]))
2308                        )
2309                        compare = "SHORTEST"
2310
2311                    # LOCAL_PREF attribute
2312                    elif attribute == "locPrf":
2313                        # Find next_hop for the route have highest local preference
2314                        _next_hop = max(
2315                            attribute_dict, key=(lambda k: attribute_dict[k])
2316                        )
2317                        compare = "HIGHEST"
2318
2319                    # WEIGHT attribute
2320                    elif attribute == "weight":
2321                        # Find next_hop for the route have highest weight
2322                        _next_hop = max(
2323                            attribute_dict, key=(lambda k: attribute_dict[k])
2324                        )
2325                        compare = "HIGHEST"
2326
2327                    # ORIGIN attribute
2328                    elif attribute == "origin":
2329                        # Find next_hop for the route have IGP as origin, -
2330                        # - rule is IGP>EGP>INCOMPLETE
2331                        _next_hop = [
2332                            key
2333                            for (key, value) in attribute_dict.iteritems()
2334                            if value == "IGP"
2335                        ][0]
2336                        compare = ""
2337
2338                    # MED  attribute
2339                    elif attribute == "metric":
2340                        # Find next_hop for the route have LOWEST MED
2341                        _next_hop = min(
2342                            attribute_dict, key=(lambda k: attribute_dict[k])
2343                        )
2344                        compare = "LOWEST"
2345
2346                    # Show ip route
2347                    if addr_type == "ipv4":
2348                        command_1 = "show ip route"
2349                    else:
2350                        command_1 = "show ipv6 route"
2351
2352                    if vrf:
2353                        cmd = "{} vrf {} json".format(command_1, vrf)
2354                    else:
2355                        cmd = "{} json".format(command_1)
2356
2357                    rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
2358
2359                    # Verifying output dictionary rib_routes_json is not empty
2360                    if not bool(rib_routes_json):
2361                        errormsg = "No route found in RIB of router {}..".format(router)
2362                        return errormsg
2363
2364                    st_found = False
2365                    nh_found = False
2366                    # Find best is installed in RIB
2367                    if route in rib_routes_json:
2368                        st_found = True
2369                        # Verify next_hop in rib_routes_json
2370                        if (
2371                            rib_routes_json[route][0]["nexthops"][0]["ip"]
2372                            in attribute_dict
2373                        ):
2374                            nh_found = True
2375                        else:
2376                            errormsg = (
2377                                "Incorrect Nexthop for BGP route {} in "
2378                                "RIB of router {}, Expected: {}, Found:"
2379                                " {}\n".format(
2380                                    route,
2381                                    router,
2382                                    rib_routes_json[route][0]["nexthops"][0]["ip"],
2383                                    _next_hop,
2384                                )
2385                            )
2386                            return errormsg
2387
2388                    if st_found and nh_found:
2389                        logger.info(
2390                            "Best path for prefix: %s with next_hop: %s is "
2391                            "installed according to %s %s: (%s) in RIB of "
2392                            "router %s",
2393                            route,
2394                            _next_hop,
2395                            compare,
2396                            attribute,
2397                            attribute_dict[_next_hop],
2398                            router,
2399                        )
2400
2401    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2402    return True
2403
2404
def verify_best_path_as_per_admin_distance(
    tgen, addr_type, router, input_dict, attribute
):
    """
    API to verify the best path according to admin distance for the given
    routes. "show ip/ipv6 route json" is run on every router listed in
    input_dict to find, for each route, the next hop having the lowest
    admin distance. The same command is run on the DUT to verify that this
    next hop is the one present in its RIB.

    Parameters
    ----------
    * `tgen` : topogen object
    * `addr_type` : ip type, ipv4/ipv6
    * `router`: Device Under Test, router whose RIB is verified
    * `input_dict`: defines different routes with different admin distance
                    to calculate for which route best path is selected
    * `attribute` : name of the attribute under test, it is only used in
                    the success log message

    Usage
    -----
    # To verify best path for route 200.50.2.0/32 from  router r2 to
    router r1(DUT) as per shortest admin distance which is 60.
    input_dict = {
        "r2": {
            "static_routes": [{"network": "200.50.2.0/32", \
                 "admin_distance": 80, "next_hop": "10.0.0.14"},
                              {"network": "200.50.2.0/32", \
                 "admin_distance": 60, "next_hop": "10.0.0.18"}]
        }}
    attribute = "locPrf"
    result = verify_best_path_as_per_admin_distance(tgen, "ipv4", dut, \
                        input_dict, attribute)

    Returns
    -------
    errormsg(str) or True. False is returned when `router` is not part of
    tgen.routers().
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    router_list = tgen.routers()
    if router not in router_list:
        return False

    rnode = router_list[router]

    # Fixed delay before the RIB is read
    # NOTE(review): presumably gives the routes time to get installed
    sleep(5)
    logger.info("Verifying router %s RIB for best path:", router)

    # Show ip route cmd
    if addr_type == "ipv4":
        command = "show ip route json"
    else:
        command = "show ipv6 route json"

    for routes_from_router in input_dict.keys():
        sh_ip_route_json = router_list[routes_from_router].vtysh_cmd(
            command, isjson=True
        )

        # RIB of the DUT, the computed best next hop is checked against it
        rib_routes_json = run_frr_cmd(rnode, command, isjson=True)

        # Verifying output dictionary rib_routes_json is not empty
        if not rib_routes_json:
            errormsg = "No route found in RIB of router {}..".format(router)
            return errormsg

        # Every network is verified on its own, so that a failure for any
        # of them is reported (not only for the last one in the list)
        for network in input_dict[routes_from_router]["static_routes"]:
            route = network["network"]

            if not sh_ip_route_json or route not in sh_ip_route_json:
                errormsg = "Route {} is missing in RIB of router {}\n".format(
                    route, routes_from_router
                )
                return errormsg

            # Map the last listed next hop of every path to the admin
            # distance of that path
            attribute_dict = {}
            for route_attribute in sh_ip_route_json[route]:
                next_hops = route_attribute["nexthops"]
                if next_hops:
                    attribute_dict[next_hops[-1]["ip"]] = route_attribute[
                        "distance"
                    ]

            if not attribute_dict:
                errormsg = "No nexthop found for route {} in RIB of router {}\n".format(
                    route, routes_from_router
                )
                return errormsg

            # Find next_hop for the route have LOWEST Admin Distance
            _next_hop = min(attribute_dict, key=attribute_dict.get)
            compare = "LOWEST"

            # A route absent from the RIB of the DUT can not have its best
            # path installed, report it instead of silently passing
            if route not in rib_routes_json:
                errormsg = "Route {} is missing in RIB of router {}\n".format(
                    route, router
                )
                return errormsg

            # Verify next_hop in rib_routes_json
            if rib_routes_json[route][0]["nexthops"][0]["ip"] != _next_hop:
                errormsg = (
                    "Nexthop {} is Missing for BGP route {}"
                    " in RIB of router {}\n".format(_next_hop, route, router)
                )
                return errormsg

            logger.info(
                "Best path for prefix: %s is installed according"
                " to %s %s: (%s) in RIB of router %s",
                route,
                compare,
                attribute,
                attribute_dict[_next_hop],
                router,
            )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
2514
2515
@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None):
    """
    This API is to verify whether the bgp rib of the DUT has the routes
    given in input_dict, optionally with the expected nexthop(s) and
    AS path.

    For every router entry of input_dict either its "static_routes" or,
    when no static routes are given, the "advertise_networks" of its "bgp"
    data are looked up in the "show bgp [vrf <vrf>] <addr_type> json"
    output of the DUT.

    Parameters
    ----------
    * `tgen`: topogen object
    * `dut`: input dut router name
    * `addr_type` : ip type ipv4/ipv6
    * `input_dict` : input dict, has details of static routes
    * `next_hop`[optional]: next_hop (string) or list of next_hops which
       needs to be verified, only checked for static routes
    * 'aspath'[optional]: aspath which needs to be verified, only checked
       for static routes

    Usage
    -----
    dut = 'r1'
    next_hop = "192.168.1.10"
    input_dict = topo['routers']
    aspath = "100 200 300"
    result = verify_bgp_rib(tgen, addr_type, dut, input_dict,
                            next_hop, aspath)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: verify_bgp_rib()")

    router_list = tgen.routers()
    if dut not in router_list:
        # Unknown DUT: there is nothing to look at, same result as before
        logger.debug("Exiting lib API: verify_bgp_rib()")
        return True

    rnode = router_list[dut]

    # A single next hop and a list of next hops are handled the same way.
    # Earlier a next_hop passed as list was never compared with the RIB.
    if next_hop and not isinstance(next_hop, list):
        next_hop = [next_hop]

    # Verifying RIB routes
    command = "show bgp"

    for routerInput in input_dict.keys():
        sleep(2)
        logger.info("Checking router {} BGP RIB:".format(dut))

        # Static routes
        if "static_routes" in input_dict[routerInput]:
            static_routes = input_dict[routerInput]["static_routes"]

            for static_route in static_routes:
                found_routes = []
                missing_routes = []
                nh_found = False

                # setdefault is kept on purpose: the input data gets the
                # optional keys added, as it always did
                vrf = static_route.setdefault("vrf", None)
                community = static_route.setdefault("community", None)
                largeCommunity = static_route.setdefault("largeCommunity", None)

                if vrf:
                    cmd = "{} vrf {} {}".format(command, vrf, addr_type)

                    # NOTE(review): community filters are only applied
                    # when a vrf is given - confirm this is intended
                    if community:
                        cmd = "{} community {}".format(cmd, community)

                    if largeCommunity:
                        cmd = "{} large-community {}".format(cmd, largeCommunity)
                else:
                    cmd = "{} {}".format(command, addr_type)

                cmd = "{} json".format(cmd)

                rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)

                # Verifying output dictionary rib_routes_json is not empty
                if not rib_routes_json:
                    errormsg = "No route found in rib of router {}..".format(dut)
                    return errormsg

                network = static_route["network"]
                no_of_ip = static_route.get("no_of_ip", 1)

                # Generating IPs for verification
                ip_list = generate_ips(network, no_of_ip)

                for st_rt in ip_list:
                    st_rt = str(ipaddress.ip_network(unicode(st_rt)))

                    # Routes of the other address family are not verified
                    _addr_type = validate_ip_address(st_rt)
                    if _addr_type != addr_type:
                        continue

                    if st_rt not in rib_routes_json["routes"]:
                        missing_routes.append(st_rt)
                        continue

                    found_routes.append(st_rt)

                    if next_hop:
                        found_hops = [
                            rib_r["ip"]
                            for rib_r in rib_routes_json["routes"][st_rt][0][
                                "nexthops"
                            ]
                        ]

                        # Expected next hops which are not in the RIB
                        missing_nexthops = set(next_hop).difference(found_hops)

                        if found_hops:
                            if missing_nexthops:
                                logger.info(
                                    "Missing nexthop %s for route"
                                    " %s in RIB of router %s\n",
                                    missing_nexthops,
                                    st_rt,
                                    dut,
                                )
                                errormsg = (
                                    "Nexthop {} is Missing for "
                                    "route {} in RIB of router {}\n".format(
                                        missing_nexthops, st_rt, dut
                                    )
                                )
                                return errormsg

                            nh_found = True

                    if aspath:
                        found_paths = rib_routes_json["routes"][st_rt][0]["path"]
                        if aspath == found_paths:
                            logger.info(
                                "Found AS path {} for route"
                                " {} in RIB of router "
                                "{}\n".format(aspath, st_rt, dut)
                            )
                        else:
                            errormsg = (
                                "AS Path {} is missing for route"
                                " {} in RIB of router {}\n".format(
                                    aspath, st_rt, dut
                                )
                            )
                            return errormsg

                if nh_found:
                    logger.info(
                        "Found next_hop {} for all bgp"
                        " routes in RIB of"
                        " router {}\n".format(next_hop, dut)
                    )

                if missing_routes:
                    errormsg = (
                        "Missing route in RIB of router {}, "
                        "routes: {}\n".format(dut, missing_routes)
                    )
                    return errormsg

                if found_routes:
                    logger.info(
                        "Verified routes in router {} BGP RIB, "
                        "found routes are: {} \n".format(dut, found_routes)
                    )

            # Advertised networks are only verified for routers which do
            # not have static routes in input_dict
            continue

        if "bgp" not in input_dict[routerInput]:
            continue

        # Advertise networks
        bgp_data_list = input_dict[routerInput]["bgp"]

        if not isinstance(bgp_data_list, list):
            bgp_data_list = [bgp_data_list]

        for bgp_data in bgp_data_list:
            vrf_id = bgp_data.setdefault("vrf", None)
            if vrf_id:
                cmd = "{} vrf {} {}".format(command, vrf_id, addr_type)
            else:
                cmd = "{} {}".format(command, addr_type)

            cmd = "{} json".format(cmd)

            rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)

            # Verifying output dictionary rib_routes_json is not empty
            if not rib_routes_json:
                errormsg = "No route found in rib of router {}..".format(dut)
                return errormsg

            bgp_net_advertise = bgp_data["address_family"][addr_type]["unicast"]
            advertise_network = bgp_net_advertise.setdefault("advertise_networks", [])

            for advertise_network_dict in advertise_network:
                found_routes = []
                missing_routes = []

                network = advertise_network_dict["network"]
                no_of_network = advertise_network_dict.get("no_of_network", 1)

                # Generating IPs for verification
                ip_list = generate_ips(network, no_of_network)

                for st_rt in ip_list:
                    st_rt = str(ipaddress.ip_network(unicode(st_rt)))

                    # Routes of the other address family are not verified
                    _addr_type = validate_ip_address(st_rt)
                    if _addr_type != addr_type:
                        continue

                    if st_rt in rib_routes_json["routes"]:
                        found_routes.append(st_rt)
                    else:
                        missing_routes.append(st_rt)

                if missing_routes:
                    errormsg = (
                        "Missing route in BGP RIB of router {},"
                        " are: {}\n".format(dut, missing_routes)
                    )
                    return errormsg

                if found_routes:
                    logger.info(
                        "Verified routes in router {} BGP RIB, found "
                        "routes are: {}\n".format(dut, found_routes)
                    )

    logger.debug("Exiting lib API: verify_bgp_rib()")
    return True
2774
2775
def _gr_mode_from_input(bgp_data, addr_type, neighbor, global_suffix):
    """
    Derive the graceful-restart mode expected for `neighbor` from the "bgp"
    input data of a router.

    The neighbor (dest_link) level setting is used when there is one, else
    the router level "graceful-restart" data is used and `global_suffix`
    is appended to the mode name. "Helper" is the fallback mode.
    """

    mode = None

    # Neighbor level configuration; with several dest_links the last one
    # decides
    if "address_family" in bgp_data:
        dest_links = bgp_data["address_family"][addr_type]["unicast"]["neighbor"][
            neighbor
        ]["dest_link"]

        for data in dest_links.values():
            if "graceful-restart-helper" in data and data["graceful-restart-helper"]:
                mode = "Helper"
            elif "graceful-restart" in data and data["graceful-restart"]:
                mode = "Restart"
            elif (
                "graceful-restart-disable" in data
                and data["graceful-restart-disable"]
            ):
                mode = "Disable"
            else:
                mode = None

    if mode is not None:
        return mode

    # Router level configuration
    gr_data = bgp_data["graceful-restart"] if "graceful-restart" in bgp_data else {}

    if "graceful-restart" in gr_data and gr_data["graceful-restart"]:
        return "Restart" + global_suffix

    if "graceful-restart-disable" in gr_data and gr_data["graceful-restart-disable"]:
        return "Disable" + global_suffix

    return "Helper" + global_suffix


@retry(attempts=4, wait=2, return_is_str=True, initial_wait=2)
def verify_graceful_restart(tgen, topo, addr_type, input_dict, dut, peer):
    """
    This API is to verify the graceful-restart configuration of the DUT and
    cross verify the same from the peer bgp router.

    The local and remote GR modes expected from input_dict are compared
    with "localGrMode" and "remoteGrMode" of the
    "show bgp <addr_type> neighbor <ip> graceful-restart json" output of
    the DUT. When the expected local mode is "Disable"/"Disable*" the API
    returns True without comparing the modes.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `addr_type` : ip type ipv4/ipv6
    * `input_dict`: input dictionary, have details of Device Under Test, for
                    which user wants to test the data. It needs the "bgp"
                    data of both `dut` and `peer`.
    * `dut`: input dut router name
    * `peer`: input peer router name

    Usage
    -----
    input_dict = {
        "r1": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "neighbor": {
                                "r3": {
                                    "dest_link":{
                                        "r1": {
                                            "graceful-restart": True
                                        }
                                    }
                                }
                            }
                        }
                    },
                    "ipv6": {
                        "unicast": {
                            "neighbor": {
                                "r3": {
                                    "dest_link":{
                                        "r1": {
                                            "graceful-restart": True
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    result = verify_graceful_restart(tgen, topo, addr_type, input_dict,
                                     dut = "r1", peer = 'r2')
    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    router_list = tgen.routers()
    bgp_addr_type = {}
    if dut in router_list:
        bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]

    # Nothing to verify when the DUT is unknown or the address type is not
    # configured/enabled
    if addr_type not in bgp_addr_type or not check_address_types(addr_type):
        logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
        return True

    rnode = router_list[dut]

    # Address of the peer on the link towards the DUT
    neighbor_ip = None
    bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
    if peer in bgp_neighbors:
        peer_links = topo["routers"][peer]["links"]
        for dest_link in bgp_neighbors[peer]["dest_link"]:
            if dest_link in peer_links:
                neighbor_ip = peer_links[dest_link][addr_type].split("/")[0]

    if neighbor_ip is None:
        errormsg = "[DUT: {}]: Neighbor ip of peer {} not found in topology".format(
            dut, peer
        )
        return errormsg

    logger.info(
        "[DUT: {}]: Checking bgp graceful-restart show"
        " o/p {}".format(dut, neighbor_ip)
    )

    show_bgp_graceful_json = run_frr_cmd(
        rnode,
        "show bgp {} neighbor {} graceful-restart json".format(addr_type, neighbor_ip),
        isjson=True,
    )

    # Report a missing neighbor as an error string instead of a KeyError
    if not show_bgp_graceful_json or neighbor_ip not in show_bgp_graceful_json:
        errormsg = "[DUT: {}]: Neighbor {} not found in graceful-restart o/p".format(
            dut, neighbor_ip
        )
        return errormsg

    show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]

    if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
        logger.info("[DUT: {}]: Neighbor ip matched  {}".format(dut, neighbor_ip))
    else:
        errormsg = "[DUT: {}]: Neighbor ip NOT a matched {}".format(dut, neighbor_ip)
        return errormsg

    # Local GR mode, router level modes are expected with a trailing "*"
    lmode = _gr_mode_from_input(input_dict[dut]["bgp"], addr_type, peer, "*")

    if lmode == "Disable" or lmode == "Disable*":
        return True

    # Remote GR mode
    rmode = _gr_mode_from_input(input_dict[peer]["bgp"], addr_type, dut, "")

    found_lmode = show_bgp_graceful_json_out["localGrMode"]
    if found_lmode == lmode:
        logger.info("[DUT: {}]: localGrMode : {} ".format(dut, found_lmode))
    else:
        errormsg = (
            "[DUT: {}]: localGrMode is not correct"
            " Expected: {}, Found: {}".format(dut, lmode, found_lmode)
        )
        return errormsg

    # A peer with GR disabled is also accepted as "NotApplicable"
    found_rmode = show_bgp_graceful_json_out["remoteGrMode"]
    if found_rmode == rmode or (
        found_rmode == "NotApplicable" and rmode == "Disable"
    ):
        logger.info("[DUT: {}]: remoteGrMode : {} ".format(dut, found_rmode))
    else:
        errormsg = (
            "[DUT: {}]: remoteGrMode is not correct"
            " Expected: {}, Found: {}".format(dut, rmode, found_rmode)
        )
        return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
3023
3024
@retry(attempts=4, wait=2, return_is_str=True, initial_wait=2)
def verify_r_bit(tgen, topo, addr_type, input_dict, dut, peer):
    """
    This API is to verify r_bit in the BGP gr capability advertised
    by the neighbor router.

    "rBit" is read from the
    "show bgp <addr_type> neighbor <ip> graceful-restart json" output of
    the DUT. An output without "rBit" key is not treated as an error.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `addr_type` : ip type ipv4/ipv6
    * `input_dict`: input dictionary, have details of Device Under Test, for
                    which user wants to test the data (not read by this
                    API, the neighbor is taken from `topo`)
    * `dut`: input dut router name
    * `peer`: peer name
    Usage
    -----
    input_dict = {
        "r1": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "neighbor": {
                                "r3": {
                                    "dest_link":{
                                        "r1": {
                                            "graceful-restart": True
                                        }
                                    }
                                }
                            }
                        }
                    },
                    "ipv6": {
                        "unicast": {
                            "neighbor": {
                                "r3": {
                                    "dest_link":{
                                        "r1": {
                                            "graceful-restart": True
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }
    result = verify_r_bit(tgen, topo, addr_type, input_dict, dut, peer)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    router_list = tgen.routers()
    bgp_addr_type = {}
    if dut in router_list:
        bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]

    # Nothing to verify when the DUT is unknown or the address type is not
    # configured/enabled
    if addr_type not in bgp_addr_type or not check_address_types(addr_type):
        logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
        return True

    rnode = router_list[dut]

    # Address of the peer on the link towards the DUT
    neighbor_ip = None
    bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
    if peer in bgp_neighbors:
        peer_links = topo["routers"][peer]["links"]
        for dest_link in bgp_neighbors[peer]["dest_link"]:
            if dest_link in peer_links:
                neighbor_ip = peer_links[dest_link][addr_type].split("/")[0]

    if neighbor_ip is None:
        errormsg = "[DUT: {}]: Neighbor ip of peer {} not found in topology".format(
            dut, peer
        )
        return errormsg

    logger.info(
        "[DUT: {}]: Checking bgp graceful-restart show"
        " o/p  {}".format(dut, neighbor_ip)
    )

    show_bgp_graceful_json = run_frr_cmd(
        rnode,
        "show bgp {} neighbor {} graceful-restart json".format(addr_type, neighbor_ip),
        isjson=True,
    )

    # Report a missing neighbor as an error string instead of a KeyError
    if not show_bgp_graceful_json or neighbor_ip not in show_bgp_graceful_json:
        errormsg = "[DUT: {}]: Neighbor {} not found in graceful-restart o/p".format(
            dut, neighbor_ip
        )
        return errormsg

    show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]

    if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
        logger.info("[DUT: {}]: Neighbor ip matched  {}".format(dut, neighbor_ip))
    else:
        errormsg = "[DUT: {}]: Neighbor ip NOT a matched {}".format(dut, neighbor_ip)
        return errormsg

    if "rBit" in show_bgp_graceful_json_out:
        if show_bgp_graceful_json_out["rBit"]:
            logger.info("[DUT: {}]: Rbit true {}".format(dut, neighbor_ip))
        else:
            errormsg = "[DUT: {}]: Rbit false {}".format(dut, neighbor_ip)
            return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
3141
3142
@retry(attempts=4, wait=2, return_is_str=True, initial_wait=2)
def verify_eor(tgen, topo, addr_type, input_dict, dut, peer):
    """
    This API is to verify EOR (End-of-RIB) status of `peer` as reported by
    "show bgp <addr_type> neighbor <ip> graceful-restart json" on `dut`.

    Every one of the "endOfRibSend", "endOfRibRecv" and
    "endOfRibSentAfterUpdate" flags found under
    "<afi>" -> "endOfRibStatus" has to be true. A flag that is missing
    from the output is skipped.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `addr_type` : ip type ipv4/ipv6
    * `input_dict`: input dictionary, have details of DUT, for
                    which user wants to test the data
                    (part of the common signature, not read by this API)
    * `dut`: input dut router name
    * `peer`: peer name

    Usage
    -----
    input_dict = {
        "r1": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "neighbor": {
                                "r3": {
                                    "dest_link":{
                                        "r1": {
                                            "graceful-restart": True
                                        }
                                    }
                                }
                            }
                        }
                    },
                    "ipv6": {
                        "unicast": {
                            "neighbor": {
                                "r3": {
                                    "dest_link":{
                                        "r1": {
                                            "graceful-restart": True
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    result = verify_eor(tgen, topo, addr_type, input_dict, dut, peer)

    Returns
    -------
    errormsg(str) or True

    True is also returned without any check when `dut` is not one of
    tgen.routers(), when `addr_type` is not configured for `dut` in `topo`
    or when check_address_types(addr_type) is false.
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    # (flag in "endOfRibStatus", log format when set, error format when unset)
    eor_checks = (
        (
            "endOfRibSend",
            "[DUT: %s]: EOR Send true for %s %s",
            "[DUT: %s]: EOR Send false for %s %s",
        ),
        (
            "endOfRibRecv",
            "[DUT: %s]: EOR Recv true %s %s",
            "[DUT: %s]: EOR Recv false %s %s",
        ),
        (
            "endOfRibSentAfterUpdate",
            "[DUT: %s]: EOR SendTime true for %s %s",
            "[DUT: %s]: EOR SendTime false for %s %s",
        ),
    )
    afi_by_addr_type = {"ipv4": "ipv4Unicast", "ipv6": "ipv6Unicast"}

    # items() exists on both Python 2 and Python 3 dicts, iteritems() is
    # Python 2 only.
    for router, rnode in tgen.routers().items():
        if router != dut:
            continue

        bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
        if addr_type not in bgp_addr_type or not check_address_types(addr_type):
            continue

        bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]

        # Address of the peer on the link(s) listed under "dest_link"; when
        # several links match, the last one wins.
        neighbor_ip = None
        if peer in bgp_neighbors:
            for dest_link in bgp_neighbors[peer]["dest_link"]:
                peer_links = topo["routers"][peer]["links"]
                if dest_link in peer_links:
                    neighbor_ip = peer_links[dest_link][addr_type].split("/")[0]

        if neighbor_ip is None:
            # Without this guard the code below fails with an unbound
            # local variable instead of a readable error.
            errormsg = "[DUT: %s]: Unable to find %s address of neighbor %s" % (
                dut,
                addr_type,
                peer,
            )
            return errormsg

        logger.info(
            "[DUT: %s]: Checking bgp graceful-restart show o/p %s", dut, neighbor_ip
        )

        show_bgp_graceful_json = run_frr_cmd(
            rnode,
            "show bgp {} neighbor {}  graceful-restart json".format(
                addr_type, neighbor_ip
            ),
            isjson=True,
        )

        show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]

        if show_bgp_graceful_json_out["neighborAddr"] != neighbor_ip:
            errormsg = "[DUT: %s]: Neighbor ip is NOT matched %s" % (
                dut,
                neighbor_ip,
            )
            return errormsg
        logger.info("[DUT: %s]: Neighbor ip matched  %s", dut, neighbor_ip)

        afi = afi_by_addr_type.get(addr_type)
        if afi is None:
            errormsg = "Address type %s is not supported" % (addr_type)
            return errormsg

        eor_json = show_bgp_graceful_json_out[afi]["endOfRibStatus"]
        for flag, ok_fmt, fail_fmt in eor_checks:
            if flag not in eor_json:
                continue

            if not eor_json[flag]:
                errormsg = fail_fmt % (dut, neighbor_ip, afi)
                return errormsg

            logger.info(ok_fmt, dut, neighbor_ip, afi)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
3304
3305
@retry(attempts=4, wait=2, return_is_str=True, initial_wait=2)
def verify_f_bit(tgen, topo, addr_type, input_dict, dut, peer):
    """
    This API is to verify f_bit in the BGP gr capability advertised
    by the neighbor router

    The "fBit" flag is read from the "ipv4Unicast" section of
    "show bgp <addr_type> neighbor <ip> graceful-restart json" when that
    section is present, otherwise from the "ipv6Unicast" section.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `addr_type` : ip type ipv4/ipv6
    * `input_dict`: input dictionary, have details of Device Under Test, for
                    which user wants to test the data
                    (part of the common signature, not read by this API)
    * `dut`: input dut router name
    * `peer`: peer name

    Usage
    -----
    input_dict = {
        "r1": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "neighbor": {
                                "r3": {
                                    "dest_link":{
                                        "r1": {
                                            "graceful-restart": True
                                        }
                                    }
                                }
                            }
                        }
                    },
                    "ipv6": {
                        "unicast": {
                            "neighbor": {
                                "r3": {
                                    "dest_link":{
                                        "r1": {
                                            "graceful-restart": True
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    result = verify_f_bit(tgen, topo, 'ipv4', input_dict, dut, peer)

    Returns
    -------
    errormsg(str) or True

    True is also returned without any check when `dut` is not one of
    tgen.routers(), when `addr_type` is not configured for `dut` in `topo`
    or when check_address_types(addr_type) is false.
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    # items() exists on both Python 2 and Python 3 dicts, iteritems() is
    # Python 2 only.
    for router, rnode in tgen.routers().items():
        if router != dut:
            continue

        bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
        if addr_type not in bgp_addr_type or not check_address_types(addr_type):
            continue

        bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]

        # Address of the peer on the link(s) listed under "dest_link"; when
        # several links match, the last one wins.
        neighbor_ip = None
        if peer in bgp_neighbors:
            for dest_link in bgp_neighbors[peer]["dest_link"]:
                peer_links = topo["routers"][peer]["links"]
                if dest_link in peer_links:
                    neighbor_ip = peer_links[dest_link][addr_type].split("/")[0]

        if neighbor_ip is None:
            # Without this guard the code below fails with an unbound
            # local variable instead of a readable error.
            errormsg = "[DUT: {}]: Unable to find {} address of neighbor {}".format(
                dut, addr_type, peer
            )
            return errormsg

        logger.info(
            "[DUT: {}]: Checking bgp graceful-restart show"
            " o/p  {}".format(dut, neighbor_ip)
        )

        show_bgp_graceful_json = run_frr_cmd(
            rnode,
            "show bgp {} neighbor {} graceful-restart json".format(
                addr_type, neighbor_ip
            ),
            isjson=True,
        )

        show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]

        if show_bgp_graceful_json_out["neighborAddr"] != neighbor_ip:
            errormsg = "[DUT: {}]: Neighbor ip NOT a match {}".format(
                dut, neighbor_ip
            )
            return errormsg
        logger.info("[DUT: {}]: Neighbor ip matched  {}".format(dut, neighbor_ip))

        # NOTE(review): the section is picked by presence, IPv4 first, and
        # not by addr_type - confirm this is intended when both are present.
        if "ipv4Unicast" in show_bgp_graceful_json_out:
            afi, afi_label = "ipv4Unicast", "IPv4"
        elif "ipv6Unicast" in show_bgp_graceful_json_out:
            afi, afi_label = "ipv6Unicast", "IPv6"
        else:
            # Report the missing sections through the documented return
            # value rather than through an incidental KeyError.
            errormsg = (
                "[DUT: {}]: Neither ipv4Unicast nor ipv6Unicast is present"
                " for {}".format(dut, neighbor_ip)
            )
            return errormsg

        if not show_bgp_graceful_json_out[afi]["fBit"]:
            errormsg = "[DUT: {}]: Fbit False for {} {} Unicast".format(
                dut, neighbor_ip, afi_label
            )
            return errormsg

        logger.info(
            "[DUT: {}]: Fbit True for {} {} Unicast".format(
                dut, neighbor_ip, afi_label
            )
        )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
3444
3445
@retry(attempts=4, wait=2, return_is_str=True, initial_wait=2)
def verify_graceful_restart_timers(tgen, topo, addr_type, input_dict, dut, peer):
    """
    This API is to verify graceful restart timers, configured and received

    The "restart-time" found in `input_dict` for `peer` is compared with
    "timers" -> "receivedRestartTimer" of
    "show bgp <addr_type> neighbor <ip> graceful-restart json" on `dut`.
    When `input_dict` carries no "restart-time" for `peer`, only the
    neighbor address is checked.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `addr_type` : ip type ipv4/ipv6
    * `input_dict`: input dictionary, must have a "bgp" entry for `peer`;
                    the timer is read from
                    input_dict[peer]["bgp"]["graceful-restart"]["timer"]
    * `dut`: input dut router name
    * `peer`: peer name

    Usage
    -----
    # Restart time configured on the peer (r3)
    input_dict = {
        "r3": {
            "bgp": {
                "graceful-restart": {
                    "timer": {
                        "restart-time": 150
                    }
                }
            }
        }
    }

    result = verify_graceful_restart_timers(tgen, topo, 'ipv4', input_dict,
                                            'r1', 'r3')

    Returns
    -------
    errormsg(str) or True

    True is also returned without any check when `dut` is not one of
    tgen.routers(), when `addr_type` is not configured for `dut` in `topo`
    or when check_address_types(addr_type) is false.
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    # items() exists on both Python 2 and Python 3 dicts, iteritems() is
    # Python 2 only.
    for router, rnode in tgen.routers().items():
        if router != dut:
            continue

        bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]
        if addr_type not in bgp_addr_type or not check_address_types(addr_type):
            continue

        bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]

        # Address of the peer on the link(s) listed under "dest_link"; when
        # several links match, the last one wins.
        neighbor_ip = None
        if peer in bgp_neighbors:
            for dest_link in bgp_neighbors[peer]["dest_link"]:
                peer_links = topo["routers"][peer]["links"]
                if dest_link in peer_links:
                    neighbor_ip = peer_links[dest_link][addr_type].split("/")[0]

        if neighbor_ip is None:
            # Without this guard the code below fails with an unbound
            # local variable instead of a readable error.
            errormsg = "[DUT: {}]: Unable to find {} address of neighbor {}".format(
                dut, addr_type, peer
            )
            return errormsg

        logger.info(
            "[DUT: {}]: Checking bgp graceful-restart show"
            " o/p {}".format(dut, neighbor_ip)
        )

        show_bgp_graceful_json = run_frr_cmd(
            rnode,
            "show bgp {} neighbor {} graceful-restart json".format(
                addr_type, neighbor_ip
            ),
            isjson=True,
        )

        show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]
        if show_bgp_graceful_json_out["neighborAddr"] != neighbor_ip:
            errormsg = "[DUT: {}]: Neighbor ip is NOT matched {}".format(
                dut, neighbor_ip
            )
            return errormsg
        logger.info("[DUT: {}]: Neighbor ip matched  {}".format(dut, neighbor_ip))

        # Graceful-restart timer
        peer_bgp = input_dict[peer]["bgp"]
        if "graceful-restart" not in peer_bgp:
            continue
        if "timer" not in peer_bgp["graceful-restart"]:
            continue

        gr_timers = peer_bgp["graceful-restart"]["timer"]
        if "restart-time" not in gr_timers:
            continue

        receivedTimer = gr_timers["restart-time"]
        if show_bgp_graceful_json_out["timers"]["receivedRestartTimer"] != (
            receivedTimer
        ):
            errormsg = "receivedRestartTimer is not as expected {}".format(
                receivedTimer
            )
            return errormsg

        logger.info(
            "receivedRestartTimer is {}"
            " on {} from peer {}".format(receivedTimer, router, peer)
        )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
3570
3571
@retry(attempts=4, wait=2, return_is_str=True, initial_wait=2)
def verify_gr_address_family(tgen, topo, addr_type, addr_family, dut, peer=None):
    """
    This API is to verify gr_address_family in the BGP gr capability advertised
    by the neighbor router

    `addr_family` has to be a key of the neighbor entry in the output of
    "show bgp <addr_type> neighbor <ip> graceful-restart json" on `dut`.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `addr_type` : ip type ipv4/ipv6
    * `addr_family` : address family to look for in the output,
                      ipv4Unicast/ipv6Unicast
    * `dut`: input dut router name
    * `peer`: (optional) name of the neighbor to check. When it is not
              given, the last neighbor of `dut` found in `topo` with a
              matching link is used.

    Usage
    -----

    result = verify_gr_address_family(tgen, topo, "ipv4", "ipv4Unicast", "r1")

    Returns
    -------
    errormsg(str) or True

    An errormsg is also returned when nothing could be verified: `dut` is
    not one of tgen.routers(), `addr_type` is not configured for `dut` in
    `topo` or check_address_types(addr_type) is false.
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    # items() exists on both Python 2 and Python 3 dicts, iteritems() is
    # Python 2 only.
    for router, rnode in tgen.routers().items():
        if router != dut:
            continue

        bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
        if addr_type not in bgp_addr_type or not check_address_types(addr_type):
            continue

        bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]

        # Address of the neighbor on the link(s) listed under "dest_link";
        # when several neighbors/links match, the last one wins.
        neighbor_ip = None
        for bgp_neighbor, peer_data in bgp_neighbors.items():
            if peer is not None and bgp_neighbor != peer:
                continue

            for dest_link in peer_data["dest_link"]:
                neighbor_links = topo["routers"][bgp_neighbor]["links"]
                if dest_link in neighbor_links:
                    neighbor_ip = neighbor_links[dest_link][addr_type].split("/")[0]

        if neighbor_ip is None:
            # Without this guard the code below fails with an unbound
            # local variable instead of a readable error.
            errormsg = "[DUT: {}]: Unable to find {} address of any neighbor".format(
                dut, addr_type
            )
            return errormsg

        logger.info(
            "[DUT: {}]: Checking bgp graceful-restart"
            " show o/p  {}".format(dut, neighbor_ip)
        )

        show_bgp_graceful_json = run_frr_cmd(
            rnode,
            "show bgp {} neighbor {} graceful-restart json".format(
                addr_type, neighbor_ip
            ),
            isjson=True,
        )

        show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]

        if show_bgp_graceful_json_out["neighborAddr"] != neighbor_ip:
            errormsg = "Neighbor ip NOT a match {}".format(neighbor_ip)
            return errormsg
        logger.info("Neighbor ip matched  {}".format(neighbor_ip))

        if addr_family not in ("ipv4Unicast", "ipv6Unicast"):
            # Only these two address families are handled by this API.
            errormsg = "Address family: {} is not supported for {} ".format(
                addr_family, neighbor_ip
            )
            return errormsg

        if addr_family not in show_bgp_graceful_json_out:
            errormsg = "{} NOT present for {} ".format(addr_family, neighbor_ip)
            return errormsg

        logger.info("{} present for {} ".format(addr_family, neighbor_ip))
        return True

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))

    # Reached only when no check was run. Report it explicitly instead of
    # implicitly returning None, so callers always get errormsg(str) or True.
    errormsg = (
        "[DUT: {}]: Address family {} could not be verified"
        " for address type {}".format(dut, addr_family, addr_type)
    )
    return errormsg
3660
3661
3662@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
3663def verify_attributes_for_evpn_routes(
3664    tgen,
3665    topo,
3666    dut,
3667    input_dict,
3668    rd=None,
3669    rt=None,
3670    ethTag=None,
3671    ipLen=None,
3672    rd_peer=None,
3673    rt_peer=None,
3674):
3675    """
3676    API to verify rd and rt value using "sh bgp l2vpn evpn 10.1.1.1"
3677    command.
3678
3679    Parameters
3680    ----------
3681    * `tgen`: topogen object
3682    * `topo` : json file data
3683    * `dut` : device under test
3684    * `input_dict`: having details like - for which route, rd value
3685                    needs to be verified
3686    * `rd` : route distinguisher
3687    * `rt` : route target
3688    * `ethTag` : Ethernet Tag
3689    * `ipLen` : IP prefix length
3690    * `rd_peer` : Peer name from which RD will be auto-generated
3691    * `rt_peer` : Peer name from which RT will be auto-generated
3692
3693    Usage
3694    -----
3695        input_dict_1 = {
3696            "r1": {
3697                "static_routes": [{
3698                    "network": [NETWORK1_1[addr_type]],
3699                    "next_hop": NEXT_HOP_IP[addr_type],
3700                    "vrf": "RED"
3701                }]
3702            }
3703        }
3704
3705        result = verify_attributes_for_evpn_routes(tgen, topo,
3706                    input_dict, rd = "10.0.0.33:1")
3707
3708    Returns
3709    -------
3710    errormsg(str) or True
3711    """
3712
3713    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3714    for router in input_dict.keys():
3715        rnode = tgen.routers()[dut]
3716
3717        if "static_routes" in input_dict[router]:
3718            for static_route in input_dict[router]["static_routes"]:
3719                network = static_route["network"]
3720
3721                if "vrf" in static_route:
3722                    vrf = static_route["vrf"]
3723
3724                if type(network) is not list:
3725                    network = [network]
3726
3727                for route in network:
3728                    route = route.split("/")[0]
3729                    _addr_type = validate_ip_address(route)
3730                    if "v4" in _addr_type:
3731                        input_afi = "v4"
3732                    elif "v6" in _addr_type:
3733                        input_afi = "v6"
3734
3735                    cmd = "show bgp l2vpn evpn {} json".format(route)
3736                    evpn_rd_value_json = run_frr_cmd(rnode, cmd, isjson=True)
3737                    if not bool(evpn_rd_value_json):
3738                        errormsg = "No output for '{}' cli".format(cmd)
3739                        return errormsg
3740
3741                    if rd is not None and rd != "auto":
3742                        logger.info(
3743                            "[DUT: %s]: Verifying rd value for " "evpn route %s:",
3744                            dut,
3745                            route,
3746                        )
3747
3748                        if rd in evpn_rd_value_json:
3749                            rd_value_json = evpn_rd_value_json[rd]
3750                            if rd_value_json["rd"] != rd:
3751                                errormsg = (
3752                                    "[DUT: %s] Failed: Verifying"
3753                                    " RD value for EVPN route: %s"
3754                                    "[FAILED]!!, EXPECTED  : %s "
3755                                    " FOUND : %s"
3756                                    % (dut, route, rd, rd_value_json["rd"])
3757                                )
3758                                return errormsg
3759
3760                            else:
3761                                logger.info(
3762                                    "[DUT %s]: Verifying RD value for"
3763                                    " EVPN route: %s [PASSED]|| "
3764                                    "Found Exprected: %s",
3765                                    dut,
3766                                    route,
3767                                    rd,
3768                                )
3769                                return True
3770
3771                        else:
3772                            errormsg = (
3773                                "[DUT: %s] RD : %s is not present"
3774                                " in cli json output" % (dut, rd)
3775                            )
3776                            return errormsg
3777
3778                    if rd == "auto":
3779                        logger.info(
3780                            "[DUT: %s]: Verifying auto-rd value for " "evpn route %s:",
3781                            dut,
3782                            route,
3783                        )
3784
3785                        if rd_peer:
3786                            index = 1
3787                            vni_dict = {}
3788
3789                            rnode = tgen.routers()[rd_peer]
3790                            vrfs = topo["routers"][rd_peer]["vrfs"]
3791                            for vrf_dict in vrfs:
3792                                vni_dict[vrf_dict["name"]] = index
3793                                index += 1
3794
3795                        show_bgp_json = run_frr_cmd(
3796                            rnode, "show bgp vrf all summary json", isjson=True
3797                        )
3798
3799                        # Verifying output dictionary show_bgp_json is empty
3800                        if not bool(show_bgp_json):
3801                            errormsg = "BGP is not running"
3802                            return errormsg
3803
3804                        show_bgp_json_vrf = show_bgp_json[vrf]
3805                        for afi, afi_data in show_bgp_json_vrf.items():
3806                            if input_afi not in afi:
3807                                continue
3808                            router_id = afi_data["routerId"]
3809
3810                        rd = "{}:{}".format(router_id, vni_dict[vrf])
3811                        if rd in evpn_rd_value_json:
3812                            rd_value_json = evpn_rd_value_json[rd]
3813                            if rd_value_json["rd"] != rd:
3814                                errormsg = (
3815                                    "[DUT: %s] Failed: Verifying"
3816                                    " RD value for EVPN route: %s"
3817                                    "[FAILED]!!, EXPECTED  : %s "
3818                                    " FOUND : %s"
3819                                    % (dut, route, rd, rd_value_json["rd"])
3820                                )
3821                                return errormsg
3822
3823                            else:
3824                                logger.info(
3825                                    "[DUT %s]: Verifying RD value for"
3826                                    " EVPN route: %s [PASSED]|| "
3827                                    "Found Exprected: %s",
3828                                    dut,
3829                                    route,
3830                                    rd,
3831                                )
3832                                return True
3833
3834                        else:
3835                            errormsg = (
3836                                "[DUT: %s] RD : %s is not present"
3837                                " in cli json output" % (dut, rd)
3838                            )
3839                            return errormsg
3840
3841                    if rt == "auto":
3842                        logger.info(
3843                            "[DUT: %s]: Verifying auto-rt value for " "evpn route %s:",
3844                            dut,
3845                            route,
3846                        )
3847
3848                        if rt_peer:
3849                            vni_dict = {}
3850
3851                            rnode = tgen.routers()[rt_peer]
3852                            show_bgp_json = run_frr_cmd(
3853                                rnode, "show bgp vrf all summary json", isjson=True
3854                            )
3855
3856                            # Verifying output dictionary show_bgp_json is empty
3857                            if not bool(show_bgp_json):
3858                                errormsg = "BGP is not running"
3859                                return errormsg
3860
3861                            show_bgp_json_vrf = show_bgp_json[vrf]
3862                            for afi, afi_data in show_bgp_json_vrf.items():
3863                                if input_afi not in afi:
3864                                    continue
3865                                as_num = afi_data["as"]
3866
3867                            show_vrf_vni_json = run_frr_cmd(
3868                                rnode, "show vrf vni json", isjson=True
3869                            )
3870
3871                            vrfs = show_vrf_vni_json["vrfs"]
3872                            for vrf_dict in vrfs:
3873                                if vrf_dict["vrf"] == vrf:
3874                                    vni_dict[vrf_dict["vrf"]] = str(vrf_dict["vni"])
3875
3876                        # If AS is 4 byte, FRR uses only the lower 2 bytes of ASN+VNI
3877                        # for auto derived RT value.
3878                        if as_num > 65535:
3879                            as_bin = bin(as_num)
3880                            as_bin = as_bin[-16:]
3881                            as_num = int(as_bin, 2)
3882
3883                        rt = "{}:{}".format(str(as_num), vni_dict[vrf])
3884                        for _rd, route_data in evpn_rd_value_json.items():
3885                            if route_data["ip"] == route:
3886                                for rt_data in route_data["paths"]:
3887                                    if vni_dict[vrf] == rt_data["VNI"]:
3888                                        rt_string = rt_data["extendedCommunity"][
3889                                            "string"
3890                                        ]
3891                                        rt_input = "RT:{}".format(rt)
3892                                        if rt_input not in rt_string:
3893                                            errormsg = (
3894                                                "[DUT: %s] Failed:"
3895                                                " Verifying RT "
3896                                                "value for EVPN "
3897                                                " route: %s"
3898                                                "[FAILED]!!,"
3899                                                " EXPECTED  : %s "
3900                                                " FOUND : %s"
3901                                                % (dut, route, rt_input, rt_string)
3902                                            )
3903                                            return errormsg
3904
3905                                        else:
3906                                            logger.info(
3907                                                "[DUT %s]: Verifying "
3908                                                "RT value for EVPN "
3909                                                "route: %s [PASSED]||"
3910                                                "Found Exprected: %s",
3911                                                dut,
3912                                                route,
3913                                                rt_input,
3914                                            )
3915                                            return True
3916
3917                            else:
3918                                errormsg = (
3919                                    "[DUT: %s] Route : %s is not"
3920                                    " present in cli json output" % (dut, route)
3921                                )
3922                                return errormsg
3923
3924                    if rt is not None and rt != "auto":
3925                        logger.info(
3926                            "[DUT: %s]: Verifying rt value for " "evpn route %s:",
3927                            dut,
3928                            route,
3929                        )
3930
3931                        if type(rt) is not list:
3932                            rt = [rt]
3933
3934                        for _rt in rt:
3935                            for _rd, route_data in evpn_rd_value_json.items():
3936                                if route_data["ip"] == route:
3937                                    for rt_data in route_data["paths"]:
3938                                        rt_string = rt_data["extendedCommunity"][
3939                                            "string"
3940                                        ]
3941                                        rt_input = "RT:{}".format(_rt)
3942                                        if rt_input not in rt_string:
3943                                            errormsg = (
3944                                                "[DUT: %s] Failed: "
3945                                                "Verifying RT value "
3946                                                "for EVPN route: %s"
3947                                                "[FAILED]!!,"
3948                                                " EXPECTED  : %s "
3949                                                " FOUND : %s"
3950                                                % (dut, route, rt_input, rt_string)
3951                                            )
3952                                            return errormsg
3953
3954                                        else:
3955                                            logger.info(
3956                                                "[DUT %s]: Verifying RT"
3957                                                " value for EVPN route:"
3958                                                " %s [PASSED]|| "
3959                                                "Found Exprected: %s",
3960                                                dut,
3961                                                route,
3962                                                rt_input,
3963                                            )
3964                                            return True
3965
3966                                else:
3967                                    errormsg = (
3968                                        "[DUT: %s] Route : %s is not"
3969                                        " present in cli json output" % (dut, route)
3970                                    )
3971                                    return errormsg
3972
3973                    if ethTag is not None:
3974                        logger.info(
3975                            "[DUT: %s]: Verifying ethTag value for " "evpn route :", dut
3976                        )
3977
3978                        for _rd, route_data in evpn_rd_value_json.items():
3979                            if route_data["ip"] == route:
3980                                if route_data["ethTag"] != ethTag:
3981                                    errormsg = (
3982                                        "[DUT: %s] RD: %s, Failed: "
3983                                        "Verifying ethTag value "
3984                                        "for EVPN route: %s"
3985                                        "[FAILED]!!,"
3986                                        " EXPECTED  : %s "
3987                                        " FOUND : %s"
3988                                        % (
3989                                            dut,
3990                                            _rd,
3991                                            route,
3992                                            ethTag,
3993                                            route_data["ethTag"],
3994                                        )
3995                                    )
3996                                    return errormsg
3997
3998                                else:
3999                                    logger.info(
4000                                        "[DUT %s]: RD: %s, Verifying "
4001                                        "ethTag value for EVPN route:"
4002                                        " %s [PASSED]|| "
4003                                        "Found Exprected: %s",
4004                                        dut,
4005                                        _rd,
4006                                        route,
4007                                        ethTag,
4008                                    )
4009                                    return True
4010
4011                            else:
4012                                errormsg = (
4013                                    "[DUT: %s] RD: %s, Route : %s "
4014                                    "is not present in cli json "
4015                                    "output" % (dut, _rd, route)
4016                                )
4017                                return errormsg
4018
4019                    if ipLen is not None:
4020                        logger.info(
4021                            "[DUT: %s]: Verifying ipLen value for " "evpn route :", dut
4022                        )
4023
4024                        for _rd, route_data in evpn_rd_value_json.items():
4025                            if route_data["ip"] == route:
4026                                if route_data["ipLen"] != int(ipLen):
4027                                    errormsg = (
4028                                        "[DUT: %s] RD: %s, Failed: "
4029                                        "Verifying ipLen value "
4030                                        "for EVPN route: %s"
4031                                        "[FAILED]!!,"
4032                                        " EXPECTED  : %s "
4033                                        " FOUND : %s"
4034                                        % (dut, _rd, route, ipLen, route_data["ipLen"])
4035                                    )
4036                                    return errormsg
4037
4038                                else:
4039                                    logger.info(
4040                                        "[DUT %s]: RD: %s, Verifying "
4041                                        "ipLen value for EVPN route:"
4042                                        " %s [PASSED]|| "
4043                                        "Found Exprected: %s",
4044                                        dut,
4045                                        _rd,
4046                                        route,
4047                                        ipLen,
4048                                    )
4049                                    return True
4050
4051                            else:
4052                                errormsg = (
4053                                    "[DUT: %s] RD: %s, Route : %s "
4054                                    "is not present in cli json "
4055                                    "output " % (dut, route)
4056                                )
4057                                return errormsg
4058
4059    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4060    return False
4061
4062
@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
def verify_evpn_routes(
    tgen, topo, dut, input_dict, routeType=5, EthTag=0, next_hop=None
):
    """
    API to verify evpn routes using "sh bgp l2vpn evpn route json"
    command.

    Every network listed under "static_routes" of every router in
    `input_dict` is looked up on `dut`. A route is considered verified
    when its EVPN prefix is present under at least one RD and every
    path of it has the expected route type (and, when `next_hop` is
    given, only the expected nexthop).

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo` : json file data (not used by this API)
    * `dut` : device under test, the command is always run on this router
    * `input_dict`: having details like - for which route, rd value
                    needs to be verified
    * `routeType` : Route type 5 is supported as of now
    * `EthTag` : Ethernet tag, by-default is 0
    * `next_hop` : Prefered nexthop for the evpn routes

    Usage
    -----
        input_dict_1 = {
            "r1": {
                "static_routes": [{
                    "network": [NETWORK1_1[addr_type]],
                    "next_hop": NEXT_HOP_IP[addr_type],
                    "vrf": "RED"
                }]
            }
        }
        result = verify_evpn_routes(tgen, topo, dut, input_dict_1)

    Returns
    -------
    errormsg(str) on the first failed check, True when all requested
    routes are verified, False when `input_dict` has no static routes
    to verify.
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    cmd = "show bgp l2vpn evpn route json"

    # Becomes True once at least one route passed all checks; any failure
    # returns an error message immediately.
    verified = False

    for router in input_dict.keys():
        rnode = tgen.routers()[dut]

        logger.info("[DUT: %s]: Verifying evpn routes: ", dut)

        if "static_routes" not in input_dict[router]:
            continue

        for static_route in input_dict[router]["static_routes"]:
            network = static_route["network"]
            if not isinstance(network, list):
                network = [network]

            for route in network:
                route_parts = route.split("/")
                ip_len = route_parts[1]
                address = route_parts[0]

                prefix = "[{}]:[{}]:[{}]:[{}]".format(
                    routeType, EthTag, ip_len, address
                )

                evpn_value_json = run_frr_cmd(rnode, cmd, isjson=True)

                if not evpn_value_json:
                    errormsg = "No output for '{}' cli".format(cmd)
                    return errormsg

                if evpn_value_json["numPrefix"] == 0:
                    errormsg = "[DUT: %s]: No EVPN prefixes exist" % (dut)
                    return errormsg

                # Per-RD entries are the dict values of the output; scalar
                # values (counters such as "numPrefix") are skipped. The
                # list is rebuilt for every route so that RDs missing an
                # earlier route cannot make a later route look missing.
                rd_entries = [
                    rd_data[prefix]
                    for rd_data in evpn_value_json.values()
                    if isinstance(rd_data, dict) and prefix in rd_data
                ]

                if not rd_entries:
                    errormsg = (
                        "[DUT: %s]:  "
                        "Missing EVPN routes: "
                        "%s [FAILED]!!" % (dut, [prefix])
                    )
                    return errormsg

                paths_checked = 0
                for rd_entry in rd_entries:
                    for paths in rd_entry["paths"]:
                        for path in paths:
                            paths_checked += 1

                            if path["routeType"] != routeType:
                                errormsg = (
                                    "[DUT: %s]:  "
                                    "Verifying routeType "
                                    "for EVPN route: %s "
                                    "[FAILED]!! "
                                    "Expected: %s, "
                                    "Found: %s"
                                    % (
                                        dut,
                                        prefix,
                                        routeType,
                                        path["routeType"],
                                    )
                                )
                                return errormsg

                            if not next_hop:
                                continue

                            for nh_dict in path["nexthops"]:
                                if nh_dict["ip"] != next_hop:
                                    errormsg = (
                                        "[DUT: %s]: "
                                        "Verifying "
                                        "nexthop for "
                                        "EVPN route: %s"
                                        "[FAILED]!! "
                                        "Expected: %s,"
                                        " Found: %s"
                                        % (
                                            dut,
                                            prefix,
                                            next_hop,
                                            nh_dict["ip"],
                                        )
                                    )
                                    return errormsg

                # A prefix entry without any path gives nothing to verify,
                # so it must not be reported as a pass.
                if paths_checked == 0:
                    errormsg = (
                        "[DUT: %s]: No paths found for EVPN route: %s "
                        "[FAILED]!!" % (dut, prefix)
                    )
                    return errormsg

                logger.info(
                    "[DUT %s]: Verifying "
                    "EVPN route : %s, "
                    "routeType: %s is "
                    "installed "
                    "[PASSED]|| ",
                    dut,
                    prefix,
                    routeType,
                )
                verified = True

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))

    return verified
4209