1#!/usr/bin/python
2#
3# Copyright (c) 2020 by VMware, Inc. ("VMware")
4# Used Copyright (c) 2018 by Network Device Education Foundation,
5# Inc. ("NetDEF") in this file.
6#
7# Permission to use, copy, modify, and/or distribute this software
8# for any purpose with or without fee is hereby granted, provided
9# that the above copyright notice and this permission notice appear
10# in all copies.
11#
12# THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
13# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
14# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
15# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
16# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
17# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
18# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
19# OF THIS SOFTWARE.
20#
21
22"""
The following tests cover BGP recursive route and eBGP
multi-hop functionality:
25
261.  Verify that BGP routes are installed in iBGP peer, only when there
27    is a recursive route for next-hop reachability.
282.  Verify that any BGP prefix received with next hop as self-ip is
29    not installed in BGP RIB or FIB table.
303.  Verify password authentication for eBGP and iBGP peers.
314.  Verify that for a BGP prefix next-hop information doesn't change
32    when same prefix is received from another peer via recursive lookup.
335.  Verify that BGP path attributes are present in CLI outputs and
34    JSON format, even if set to default.
356.  Verifying the BGP peering between loopback and physical link's IP
36    of 2 peer routers.
7.  Verify BGP Active/Standby/Pre-emption/ECMP behaviour.
38"""
39
40import os
41import sys
42import time
43import json
44import pytest
45from time import sleep
46from copy import deepcopy
47
# Directory containing this test file. It is used below to locate the JSON
# topology file, and its parent directory is appended to the import path.
CWD = os.path.split(os.path.realpath(__file__))[0]
sys.path.append(os.path.join(CWD, "../"))
51
52# pylint: disable=C0413
53# Import topogen and topotest helpers
54from lib import topotest
55from mininet.topo import Topo
56from lib.topogen import Topogen, get_topogen
57
58# Import topoJson from lib, to create topology and initial configuration
59from lib.common_config import (
60    start_topology,
61    write_test_header,
62    apply_raw_config,
63    write_test_footer,
64    reset_config_on_routers,
65    verify_rib,
66    create_static_routes,
67    check_address_types,
68    step,
69    create_route_maps,
70    create_interface_in_kernel,
71    shutdown_bringup_interface,
72    addKernelRoute,
73    delete_route_maps,
74    kill_mininet_routers_process,
75)
76from lib.topolog import logger
77from lib.bgp import (
78    verify_bgp_convergence,
79    create_router_bgp,
80    clear_bgp_and_verify,
81    verify_bgp_rib,
82    verify_bgp_convergence_from_running_config,
83    modify_as_number,
84    verify_bgp_attributes,
85    clear_bgp,
86)
87from lib.topojson import build_topo_from_json, build_config_from_json
88
# Reading the data from JSON File for topology and configuration creation
jsonFile = "{}/bgp_recursive_route_ebgp_multi_hop.json".format(CWD)
try:
    with open(jsonFile, "r") as topoJson:
        topo = json.load(topoJson)
except IOError:
    # The path is passed as a %-style logging argument. The previous message
    # had no placeholder for it, so (assuming `logger` follows the standard
    # logging API) the record could not be formatted and the path was lost.
    # NOTE(review): `topo` stays undefined when the file cannot be read, so
    # any later use of it will raise NameError.
    logger.info("Could not read file: %s", jsonFile)
96
# Global variables

# Outcome of the initial BGP convergence check; overwritten by setup_module().
BGP_CONVERGENCE = False

# BGP timer values: used in the "neighbor ... timers" raw configuration and,
# for the hold-down value, as the basis of the wait before checking that a
# session went down.
KEEP_ALIVE_TIMER = 2
HOLD_DOWN_TIMER = 6

# Address families the tests iterate over; the values are used as keys
# ("ipv4"/"ipv6") into the dictionaries below and into the topology JSON.
# presumably derived from the running environment - verify in
# lib.common_config.check_address_types.
ADDR_TYPES = check_address_types()

# Prefixes configured as static routes on R1 and then redistributed into BGP.
NETWORK = {
    "ipv4": ["100.1.1.1/32", "100.1.1.2/32"],
    "ipv6": ["100::1/128", "100::2/128"],
}

# Chain of next-hop networks used to build the recursive static routes on R2
# (link network -> N1 -> N2 -> N3).
RECUR_NEXT_HOP = {
    "N1": {"ipv4": "20.20.20.20/24", "ipv6": "20:20::20:20/120"},
    "N2": {"ipv4": "30.30.30.30/24", "ipv6": "30:30::30:30/120"},
    "N3": {"ipv4": "40.40.40.40/24", "ipv6": "40:40::40:40/120"},
}

# Replacement next-hops for R1's static routes: "4thOctate" is used by the
# "changed next-hop from same subnet" step, "3rdOctate" by the "changed
# next-hop from different subnet" step.
CHANGED_NEXT_HOP = {
    "4thOctate": {"ipv4": "10.0.1.250/24", "ipv6": "fd00:0:0:1::100/64"},
    "3rdOctate": {"ipv4": "10.0.10.2/24", "ipv6": "fd00:0:0:10::2/64"},
}

# Loopback addresses for R1 and R4.
# NOTE(review): the name is misspelled ("Loopabck"); it is not referenced in
# the visible part of the file, so confirm all users before renaming it.
Loopabck_IP = {
    "Lo_R1": {"ipv4": "1.1.1.1/32", "ipv6": "1:1::1:1/128"},
    "Lo_R4": {"ipv4": "4.4.4.4/32", "ipv6": "4:4::4:4/128"},
}
122
123
class CreateTopo(Topo):
    """
    Topology for this test suite, described by the JSON data in `topo`.

    * `Topo`: Topology object
    """

    def build(self, *_args, **_opts):
        """Create the topology from the module-level JSON description."""
        # Look up the Topogen instance bound to this topology and let the
        # JSON helper populate it.
        topogen_instance = get_topogen(self)
        build_topo_from_json(topogen_instance, topo)
137
138
def setup_module(mod):
    """
    Sets up the pytest environment: builds and starts the topology, loads
    the JSON configuration and checks that BGP has converged.

    * `mod`: test module; its `__name__` is handed to Topogen
    """
    global BGP_CONVERGENCE

    start_stamp = time.asctime(time.localtime(time.time()))
    logger.info("Testsuite start time: {}".format(start_stamp))
    logger.info("=" * 40)

    logger.info("Running setup_module to create topology")

    # Build the topology through Topogen, using CreateTopo as its definition.
    tgen = Topogen(CreateTopo, mod.__name__)

    # Start the topology: create the tmp files which are loaded to the
    # routers to start daemons, and then start the routers.
    start_topology(tgen)

    # Apply the configuration described in the JSON file.
    build_config_from_json(tgen, topo)

    # The convergence result is kept in the module-level flag.
    BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
    assert BGP_CONVERGENCE is True, "setup_module : Failed \n Error : {}".format(
        BGP_CONVERGENCE
    )

    logger.info("Running setup_module() done")
170
171
def teardown_module():
    """Teardown the pytest environment: stop the topology and log the end time."""

    logger.info("Running teardown_module to delete topology")

    # Stop topology and remove tmp files
    get_topogen().stop_topology()

    end_stamp = time.asctime(time.localtime(time.time()))
    logger.info("Testsuite end time: {}".format(end_stamp))
    logger.info("=" * 40)
186
187
188#####################################################
189#
190#   Tests starting
191#
192#####################################################
193
194
def test_recursive_routes_iBGP_peer_p1(request):
    """
    Verify that BGP routes are installed in iBGP peer, only
    when there is a recursive route for next-hop reachability.

    * `request`: pytest request fixture; its node name is used as the
      testcase name in the header, footer and failure messages.

    Outline of the steps driven below:
    1. Static routes for NETWORK are configured on R1 via R3's address on
       the R1-R3 link and redistributed into BGP.
    2. A chain of recursive static routes towards that next-hop is
       configured on R2, then one link of the chain is removed and re-added,
       checking R2's RIB each time.
    3. The next-hop of R1's static routes is changed, first within the same
       subnet and then to a different subnet, checking R2's RIB each time.
    """

    tc_name = request.node.name
    write_test_header(tc_name)
    tgen = get_topogen()

    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    step("Initial config :Configure BGP neighborship between R1 and R3.")
    reset_config_on_routers(tgen)

    dut = "r1"
    protocol = "static"

    # Single failure template with BOTH placeholders. Several assertions used
    # to format two values into a string with one placeholder, which printed
    # the testcase name as the "error" and dropped the real result.
    fail_msg = "Testcase {} : Failed \n Error : {}"

    def r3_link_ip(addr_type):
        """Return R3's address (without mask) on the R1-R3 link."""
        return topo["routers"]["r3"]["links"]["r1"][addr_type].split("/")[0]

    def r1_static_via_r3(addr_type):
        """Return a fresh input dict: NETWORK routes on R1 via R3's link IP."""
        return {
            "r1": {
                "static_routes": [
                    {
                        "network": NETWORK[addr_type],
                        "next_hop": r3_link_ip(addr_type),
                    }
                ]
            }
        }

    step(
        "Configure static routes on R1 pointing next-hop as connected "
        "link between R1 & R3's IP"
    )
    for addr_type in ADDR_TYPES:
        input_dict_4 = r1_static_via_r3(addr_type)
        result = create_static_routes(tgen, input_dict_4)
        # The configuration result used to be overwritten without a check.
        assert result is True, fail_msg.format(tc_name, result)

        step(
            "Verify on router R1 that these static routes are "
            "installed in RIB+FIB of R1"
        )
        result = verify_rib(
            tgen,
            addr_type,
            dut,
            input_dict_4,
            next_hop=r3_link_ip(addr_type),
            protocol=protocol,
        )
        assert result is True, fail_msg.format(tc_name, result)

    step("Redistribute these static routes in BGP on router R1")
    input_dict_2 = {
        "r1": {
            "bgp": {
                "address_family": {
                    "ipv4": {"unicast": {"redistribute": [{"redist_type": "static"}]}},
                    "ipv6": {"unicast": {"redistribute": [{"redist_type": "static"}]}},
                }
            }
        }
    }
    result = create_router_bgp(tgen, topo, input_dict_2)
    # The configuration result used to be ignored here.
    assert result is True, fail_msg.format(tc_name, result)

    step(
        "Verify on router R1 that these static routes are installed "
        "in RIB table as well"
    )
    for addr_type in ADDR_TYPES:
        input_dict_4 = r1_static_via_r3(addr_type)
        result = verify_bgp_rib(
            tgen,
            addr_type,
            dut,
            input_dict_4,
            next_hop=r3_link_ip(addr_type),
        )
        assert result is True, fail_msg.format(tc_name, result)

    # NOTE(review): the three R2 verification loops below reuse `input_dict_4`
    # as left by the LAST iteration of the loop above, i.e. it carries the
    # networks of the last entry in ADDR_TYPES for every address family.
    # That behaviour is preserved as-is; confirm whether a per-family
    # dictionary was intended.

    step(
        "Configure a static routes for next hop IP on R2 via multiple "
        "recursive static routes"
    )
    dut = "r2"
    create_interface_in_kernel(
        tgen, dut, "lo", "40.40.40.50", netmask="255.255.255.0", create=True
    )
    create_interface_in_kernel(
        tgen, dut, "lo", "40:40::40:50", netmask="120", create=True
    )
    for addr_type in ADDR_TYPES:
        # Recursive chain: R1-R3 link network -> N1 -> N2 -> N3.
        input_dict_3 = {
            "r2": {
                "static_routes": [
                    {
                        "network": topo["routers"]["r3"]["links"]["r1"][addr_type],
                        "next_hop": RECUR_NEXT_HOP["N1"][addr_type].split("/")[0],
                    },
                    {
                        "network": RECUR_NEXT_HOP["N1"][addr_type],
                        "next_hop": RECUR_NEXT_HOP["N2"][addr_type].split("/")[0],
                    },
                    {
                        "network": RECUR_NEXT_HOP["N2"][addr_type],
                        "next_hop": RECUR_NEXT_HOP["N3"][addr_type].split("/")[0],
                    },
                ]
            }
        }
        result = create_static_routes(tgen, input_dict_3)
        assert result is True, fail_msg.format(tc_name, result)

        step("verify if redistributed routes are now installed in FIB of R2")
        result = verify_rib(
            tgen,
            addr_type,
            "r2",
            input_dict_4,
            next_hop=r3_link_ip(addr_type),
            protocol="bgp",
        )
        assert result is True, fail_msg.format(tc_name, result)

    step("Delete 1 route from static recursive for the next-hop IP")
    dut = "r2"
    for addr_type in ADDR_TYPES:
        # Remove the N1 -> N2 link of the chain.
        input_dict_3 = {
            "r2": {
                "static_routes": [
                    {
                        "network": RECUR_NEXT_HOP["N1"][addr_type],
                        "next_hop": RECUR_NEXT_HOP["N2"][addr_type].split("/")[0],
                        "delete": True,
                    }
                ]
            }
        }
        result = create_static_routes(tgen, input_dict_3)
        assert result is True, fail_msg.format(tc_name, result)

        step("Verify that redistributed routes are withdrawn from FIB of R2")
        result = verify_rib(
            tgen,
            addr_type,
            dut,
            input_dict_4,
            next_hop=r3_link_ip(addr_type),
            protocol="bgp",
            expected=False,
        )
        # Negative check: anything other than True means the routes are gone.
        assert result is not True, fail_msg.format(tc_name, result)

    step("Reconfigure the same static route on R2 again")
    dut = "r2"
    for addr_type in ADDR_TYPES:
        input_dict_3 = {
            "r2": {
                "static_routes": [
                    {
                        "network": RECUR_NEXT_HOP["N1"][addr_type],
                        "next_hop": RECUR_NEXT_HOP["N2"][addr_type].split("/")[0],
                    }
                ]
            }
        }
        result = create_static_routes(tgen, input_dict_3)
        assert result is True, fail_msg.format(tc_name, result)

        step("Verify that redistributed routes are again installed in FIB of R2")
        result = verify_rib(
            tgen,
            addr_type,
            dut,
            input_dict_4,
            next_hop=r3_link_ip(addr_type),
            protocol="bgp",
        )
        assert result is True, fail_msg.format(tc_name, result)

    step("Configure static route with changed next-hop from same subnet")
    for addr_type in ADDR_TYPES:
        # Replace the original next-hop with the "4thOctate" one.
        input_dict_4 = {
            "r1": {
                "static_routes": [
                    {
                        "network": NETWORK[addr_type],
                        "next_hop": r3_link_ip(addr_type),
                        "delete": True,
                    },
                    {
                        "network": NETWORK[addr_type],
                        "next_hop": CHANGED_NEXT_HOP["4thOctate"][addr_type].split("/")[
                            0
                        ],
                    },
                ]
            }
        }
        result = create_static_routes(tgen, input_dict_4)
        assert result is True, fail_msg.format(tc_name, result)

        result = verify_rib(tgen, addr_type, "r1", input_dict_4, protocol="static")
        assert result is True, fail_msg.format(tc_name, result)

        step(
            "Verify that redistributed routes are not withdrawn as changed "
            "next-hop IP, belongs to the same subnet"
        )
        result = verify_rib(tgen, addr_type, "r2", input_dict_4, protocol="bgp")
        assert result is True, fail_msg.format(tc_name, result)

    step("Configure static route with changed next-hop from different subnet")
    dut = "r1"
    create_interface_in_kernel(
        tgen, dut, "lo10", "10.0.10.10", netmask="255.255.255.0", create=True
    )
    create_interface_in_kernel(
        tgen, dut, "lo10", "fd00:0:0:10::104", netmask="64", create=True
    )
    for addr_type in ADDR_TYPES:
        # Replace the "4thOctate" next-hop with the "3rdOctate" one.
        input_dict_4 = {
            "r1": {
                "static_routes": [
                    {
                        "network": NETWORK[addr_type],
                        "next_hop": CHANGED_NEXT_HOP["4thOctate"][addr_type].split("/")[
                            0
                        ],
                        "delete": True,
                    },
                    {
                        "network": NETWORK[addr_type],
                        "next_hop": CHANGED_NEXT_HOP["3rdOctate"][addr_type].split("/")[
                            0
                        ],
                    },
                ]
            }
        }
        result = create_static_routes(tgen, input_dict_4)
        assert result is True, fail_msg.format(tc_name, result)

        result = verify_rib(tgen, addr_type, "r1", input_dict_4, protocol="static")
        assert result is True, fail_msg.format(tc_name, result)

        step(
            "Verify that redistributed routes are withdrawn as changed "
            "next-hop IP, belongs to different subnet"
        )
        result = verify_rib(
            tgen, addr_type, "r2", input_dict_4, protocol="bgp", expected=False
        )
        assert result is not True, fail_msg.format(tc_name, result)

    write_test_footer(tc_name)
477
478
def test_next_hop_as_self_ip_p1(request):
    """
    Verify that any BGP prefix received with next hop as
    self-ip is not installed in BGP RIB or FIB table.

    * `request`: pytest request fixture; its node name is used as the
      testcase name in the header, footer and failure messages.

    Outline of the steps driven below:
    1. Static routes for NETWORK are configured on R1 with R2's address on
       the R2-R4 link as next-hop, and redistributed into BGP.
    2. R2's BGP RIB is checked to not hold those prefixes.
    3. The R2-R4 interface on R2 is shut, BGP is cleared on R2 and the
       prefixes are expected in R2's BGP RIB.
    4. The interface is brought back up; the prefixes are expected in R2's
       BGP RIB but not in its RIB.
    """

    tc_name = request.node.name
    write_test_header(tc_name)
    tgen = get_topogen()

    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    step("Initial config :Configure BGP neighborship between R1 and R3.")
    reset_config_on_routers(tgen)

    # Single failure template with BOTH placeholders. Several assertions used
    # to format two values into a string with one placeholder, which printed
    # the testcase name as the "error" and dropped the real result.
    fail_msg = "Testcase {} : Failed \n Error : {}"

    def r2_link_ip(addr_type):
        """Return R2's address (without mask) on the R2-R4 link."""
        return topo["routers"]["r2"]["links"]["r4"][addr_type].split("/")[0]

    def r1_static_via_r2(addr_type):
        """Return a fresh input dict: NETWORK routes on R1 via R2's link IP."""
        return {
            "r1": {
                "static_routes": [
                    {
                        "network": NETWORK[addr_type],
                        "next_hop": r2_link_ip(addr_type),
                    }
                ]
            }
        }

    step(
        "Configure static routes on R1 with a next-hop IP belonging "
        "to the same subnet of R2's link IP."
    )
    dut = "r1"
    # R4's addresses on the R2-R4 link are added to lo10 on R1.
    create_interface_in_kernel(
        tgen,
        dut,
        "lo10",
        topo["routers"]["r4"]["links"]["r2"]["ipv4"].split("/")[0],
        netmask="255.255.255.0",
        create=True,
    )
    create_interface_in_kernel(
        tgen,
        dut,
        "lo10",
        topo["routers"]["r4"]["links"]["r2"]["ipv6"].split("/")[0],
        netmask="64",
        create=True,
    )
    for addr_type in ADDR_TYPES:
        input_dict_4 = r1_static_via_r2(addr_type)
        result = create_static_routes(tgen, input_dict_4)
        # The configuration result used to be overwritten without a check.
        assert result is True, fail_msg.format(tc_name, result)

        step("Verify that static routes are installed in RIB and FIB of R1")
        result = verify_rib(
            tgen,
            addr_type,
            dut,
            input_dict_4,
            next_hop=r2_link_ip(addr_type),
            protocol="static",
        )
        assert result is True, fail_msg.format(tc_name, result)

    step("Redistribute static routes into BGP on R1")
    input_dict_2 = {
        "r1": {
            "bgp": {
                "address_family": {
                    "ipv4": {"unicast": {"redistribute": [{"redist_type": "static"}]}},
                    "ipv6": {"unicast": {"redistribute": [{"redist_type": "static"}]}},
                }
            }
        }
    }
    result = create_router_bgp(tgen, topo, input_dict_2)
    # The configuration result used to be ignored here.
    assert result is True, fail_msg.format(tc_name, result)

    step(
        "Verify that R2 denies the prefixes received in update message, "
        "as next-hop IP belongs to connected interface"
    )
    for addr_type in ADDR_TYPES:
        input_dict_4 = r1_static_via_r2(addr_type)
        result = verify_bgp_rib(
            tgen,
            addr_type,
            "r2",
            input_dict_4,
            next_hop=r2_link_ip(addr_type),
            expected=False,
        )
        # Negative check: anything other than True means the prefixes are absent.
        assert result is not True, fail_msg.format(tc_name, result)

    step("Shut interface on R2 that has IP from the subnet as BGP next-hop")
    intf_r2_r4 = topo["routers"]["r2"]["links"]["r4"]["interface"]
    shutdown_bringup_interface(tgen, "r2", intf_r2_r4)

    for addr_type in ADDR_TYPES:
        clear_bgp(tgen, addr_type, "r2")
    step(
        "Verify that redistributed routes now appear only in BGP table, "
        "as next-hop IP is no more active on R2"
    )
    for addr_type in ADDR_TYPES:
        input_dict_4 = r1_static_via_r2(addr_type)
        result = verify_bgp_rib(
            tgen,
            addr_type,
            "r2",
            input_dict_4,
            next_hop=r2_link_ip(addr_type),
        )
        assert result is True, fail_msg.format(tc_name, result)

    step("No shutdown interface on R2 which was shut in previous step")
    intf_r2_r4 = topo["routers"]["r2"]["links"]["r4"]["interface"]
    shutdown_bringup_interface(tgen, "r2", intf_r2_r4, ifaceaction=True)

    step(
        "Verify that R2 doesn't install prefixes RIB to FIB as next-hop "
        "interface is up now"
    )
    for addr_type in ADDR_TYPES:
        input_dict_4 = r1_static_via_r2(addr_type)
        result = verify_bgp_rib(
            tgen,
            addr_type,
            "r2",
            input_dict_4,
            next_hop=r2_link_ip(addr_type),
        )
        assert result is True, fail_msg.format(tc_name, result)
        result = verify_rib(
            tgen,
            addr_type,
            "r2",
            input_dict_4,
            next_hop=r2_link_ip(addr_type),
            expected=False,
        )
        assert result is not True, fail_msg.format(tc_name, result)

    write_test_footer(tc_name)
661
662
663def test_next_hop_with_recursive_lookup_p1(request):
664    """
665    Verify that for a BGP prefix next-hop information doesn't change
666    when same prefix is received from another peer via recursive lookup.
667    """
668
669    tc_name = request.node.name
670    write_test_header(tc_name)
671    tgen = get_topogen()
672
673    # Don"t run this test if we have any failure.
674    if tgen.routers_have_failure():
675        pytest.skip(tgen.errors)
676
677    step("Initial config :Configure BGP neighborship between R1 and R3.")
678    reset_config_on_routers(tgen)
679
680    step("Verify that BGP peering comes up.")
681
682    result = verify_bgp_convergence_from_running_config(tgen)
683    assert result is True, "Testcase {} : Failed \n Error : {}".format(tc_name, result)
684
685    step("Do redistribute connected on router R3.")
686    input_dict_1 = {
687        "r3": {
688            "bgp": {
689                "address_family": {
690                    "ipv4": {
691                        "unicast": {"redistribute": [{"redist_type": "connected"}]}
692                    },
693                    "ipv6": {
694                        "unicast": {"redistribute": [{"redist_type": "connected"}]}
695                    },
696                }
697            }
698        }
699    }
700
701    result = create_router_bgp(tgen, topo, input_dict_1)
702    assert result is True, "Testcase {} : Failed \n Error : {}".format(tc_name, result)
703
704    step("Verify that R1 receives all connected")
705    for addr_type in ADDR_TYPES:
706        routes = {
707            "ipv4": ["1.0.3.17/32", "10.0.1.0/24", "10.0.3.0/24"],
708            "ipv6": ["2001:db8:f::3:17/128", "fd00:0:0:1::/64", "fd00:0:0:3::/64"],
709        }
710        input_dict = {"r1": {"static_routes": [{"network": routes[addr_type]}]}}
711        result = verify_rib(tgen, addr_type, "r1", input_dict, protocol="bgp")
712        assert result is True, "Testcase {} : Failed \n Error : {}".format(
713            tc_name, result
714        )
715
716    step(
717        "Configure a BGP neighborship between R1 and R4, directly via "
718        "eBGP multi-hop."
719    )
720    r1_local_as = topo["routers"]["r1"]["bgp"]["local_as"]
721    r1_r3_addr = topo["routers"]["r1"]["links"]["r3"]
722    r4_local_as = topo["routers"]["r4"]["bgp"]["local_as"]
723    r4_r3_addr = topo["routers"]["r4"]["links"]["r3"]
724    ebgp_multi_hop = 3
725
726    for addr_type in ADDR_TYPES:
727        raw_config = {
728            "r1": {
729                "raw_config": [
730                    "router bgp {}".format(r1_local_as),
731                    "neighbor {} remote-as {}".format(
732                        r4_r3_addr[addr_type].split("/")[0], r4_local_as
733                    ),
734                    "neighbor {} timers {} {}".format(
735                        r4_r3_addr[addr_type].split("/")[0],
736                        KEEP_ALIVE_TIMER,
737                        HOLD_DOWN_TIMER,
738                    ),
739                    "neighbor {} ebgp-multihop {}".format(
740                        r4_r3_addr[addr_type].split("/")[0], ebgp_multi_hop
741                    ),
742                ]
743            },
744            "r4": {
745                "raw_config": [
746                    "router bgp {}".format(r4_local_as),
747                    "neighbor {} remote-as {}".format(
748                        r1_r3_addr[addr_type].split("/")[0], r1_local_as
749                    ),
750                    "neighbor {} timers {} {}".format(
751                        r1_r3_addr[addr_type].split("/")[0],
752                        KEEP_ALIVE_TIMER,
753                        HOLD_DOWN_TIMER,
754                    ),
755                    "neighbor {} ebgp-multihop {}".format(
756                        r1_r3_addr[addr_type].split("/")[0], ebgp_multi_hop
757                    ),
758                ]
759            },
760        }
761        result = apply_raw_config(tgen, raw_config)
762        assert result is True, "Testcase {} : Failed Error : {}".format(tc_name, result)
763
764    for addr_type in ADDR_TYPES:
765        if addr_type == "ipv4":
766            raw_config = {
767                "r1": {
768                    "raw_config": [
769                        "router bgp {}".format(r1_local_as),
770                        "address-family {} unicast".format(addr_type),
771                        "no neighbor {} activate".format(
772                            r4_r3_addr["ipv6"].split("/")[0]
773                        ),
774                    ]
775                },
776                "r4": {
777                    "raw_config": [
778                        "router bgp {}".format(r4_local_as),
779                        "address-family {} unicast".format(addr_type),
780                        "no neighbor {} activate".format(
781                            r1_r3_addr["ipv6"].split("/")[0]
782                        ),
783                    ]
784                },
785            }
786        else:
787            raw_config = {
788                "r1": {
789                    "raw_config": [
790                        "router bgp {}".format(r1_local_as),
791                        "address-family {} unicast".format(addr_type),
792                        "neighbor {} activate".format(
793                            r4_r3_addr[addr_type].split("/")[0]
794                        ),
795                    ]
796                },
797                "r4": {
798                    "raw_config": [
799                        "router bgp {}".format(r4_local_as),
800                        "address-family {} unicast".format(addr_type),
801                        "neighbor {} activate".format(
802                            r1_r3_addr[addr_type].split("/")[0]
803                        ),
804                    ]
805                },
806            }
807        result = apply_raw_config(tgen, raw_config)
808        assert result is True, "Testcase {} : Failed Error : {}".format(tc_name, result)
809
810    step("Verify that BGP session between R1 and R4 comes up" "(recursively via R3).")
811    result = verify_bgp_convergence_from_running_config(tgen)
812    assert result is True, "Testcase {} : Failed \n Error : {}".format(tc_name, result)
813
814    step("Do redistribute connected on router R4.")
815    input_dict_1 = {
816        "r4": {
817            "bgp": {
818                "address_family": {
819                    "ipv4": {
820                        "unicast": {"redistribute": [{"redist_type": "connected"}]}
821                    },
822                    "ipv6": {
823                        "unicast": {"redistribute": [{"redist_type": "connected"}]}
824                    },
825                }
826            }
827        }
828    }
829
830    result = create_router_bgp(tgen, topo, input_dict_1)
831    assert result is True, "Testcase {} : Failed \n Error : {}".format(tc_name, result)
832
833    step(
834        "Verify that R1 now receives BGP prefix of link r3-r4 via 2 "
835        "next-hops R3 and R4. however do not install with NHT R4 in FIB."
836    )
837    for addr_type in ADDR_TYPES:
838        routes = {"ipv4": ["10.0.3.0/24"], "ipv6": ["fd00:0:0:3::/64"]}
839
840        input_dict = {"r1": {"static_routes": [{"network": routes[addr_type]}]}}
841        next_hop = topo["routers"]["r3"]["links"]["r1"][addr_type].split("/")[0]
842
843        result = verify_rib(
844            tgen, addr_type, "r1", input_dict, protocol="bgp", next_hop=next_hop
845        )
846        assert result is True, "Testcase {} : Failed \n Error : {}".format(
847            tc_name, result
848        )
849
850    step("Clear bgp sessions from R1 using 'clear ip bgp *'")
851    for addr_type in ADDR_TYPES:
852        clear_bgp(tgen, addr_type, "r1")
853
854    step(
855        "Verify that prefix of link r3-r4 is again learned via 2 "
856        "next-hops (from R3 and R4 directly)"
857    )
858    for addr_type in ADDR_TYPES:
859        routes = {"ipv4": ["10.0.3.0/24"], "ipv6": ["fd00:0:0:3::/64"]}
860
861        input_dict = {"r1": {"static_routes": [{"network": routes[addr_type]}]}}
862        next_hop = topo["routers"]["r3"]["links"]["r1"][addr_type].split("/")[0]
863
864        result = verify_rib(
865            tgen, addr_type, "r1", input_dict, protocol="bgp", next_hop=next_hop
866        )
867        assert result is True, "Testcase {} : Failed \n Error : {}".format(
868            tc_name, result
869        )
870
871    step("Remove redistribution from router R3.")
872    input_dict_1 = {
873        "r3": {
874            "bgp": {
875                "local_as": "300",
876                "address_family": {
877                    "ipv4": {
878                        "unicast": {
879                            "redistribute": [
880                                {"redist_type": "connected", "delete": True}
881                            ]
882                        }
883                    },
884                    "ipv6": {
885                        "unicast": {
886                            "redistribute": [
887                                {"redist_type": "connected", "delete": True}
888                            ]
889                        }
890                    },
891                },
892            }
893        }
894    }
895
896    result = create_router_bgp(tgen, topo, input_dict_1)
897    assert result is True, "Testcase {} : Failed \n Error : {}".format(tc_name, result)
898
899    step(
900        "Verify that peering between R1-R4 goes down and prefix "
901        "of link r3-r4, with NHT R4 is withdrawn."
902    )
903
904    logger.info("Sleeping for holddowntimer: {}".format(HOLD_DOWN_TIMER))
905    sleep(HOLD_DOWN_TIMER + 1)
906
907    result = verify_bgp_convergence_from_running_config(tgen, expected=False)
908    assert (
909        result is not True
910    ), "Testcase {} : Failed \n" "BGP is converged \n Error : {}".format(tc_name, result)
911    logger.info("Expected behaviour: {}".format(result))
912
913    for addr_type in ADDR_TYPES:
914        routes = {"ipv4": ["10.0.3.0/24"], "ipv6": ["fd00:0:0:3::/64"]}
915
916        input_dict = {"r1": {"static_routes": [{"network": routes[addr_type]}]}}
917        next_hop = topo["routers"]["r3"]["links"]["r1"][addr_type].split("/")[0]
918
919        result = verify_rib(
920            tgen, addr_type, "r1", input_dict, protocol="bgp", next_hop=next_hop
921        )
922        assert result is True, "Testcase {} : Failed \n Error : {}".format(
923            tc_name, result
924        )
925
926    step("Re-apply redistribution on R3.")
927
928    input_dict_1 = {
929        "r3": {
930            "bgp": {
931                "local_as": "300",
932                "address_family": {
933                    "ipv4": {
934                        "unicast": {"redistribute": [{"redist_type": "connected"}]}
935                    },
936                    "ipv6": {
937                        "unicast": {"redistribute": [{"redist_type": "connected"}]}
938                    },
939                },
940            }
941        }
942    }
943
944    result = create_router_bgp(tgen, topo, input_dict_1)
945    assert result is True, "Testcase {} : Failed \n Error : {}".format(tc_name, result)
946
947    step(
948        "Verify that peering between R1-R4 goes down and prefix "
949        "of link r3-r4 with NHT R4 is withdrawn."
950    )
951
952    result = verify_bgp_convergence_from_running_config(tgen)
953    assert result is True, "Testcase {} : Failed \n Error : {}".format(tc_name, result)
954
955    for addr_type in ADDR_TYPES:
956        routes = {"ipv4": ["10.0.3.0/24"], "ipv6": ["fd00:0:0:3::/64"]}
957
958        input_dict = {"r1": {"static_routes": [{"network": routes[addr_type]}]}}
959        next_hop = topo["routers"]["r3"]["links"]["r1"][addr_type].split("/")[0]
960
961        result = verify_rib(
962            tgen, addr_type, "r1", input_dict, protocol="bgp", next_hop=next_hop
963        )
964        assert result is True, "Testcase {} : Failed \n Error : {}".format(
965            tc_name, result
966        )
967
968    step("Remove redistribution from router R4.")
969
970    input_dict_1 = {
971        "r4": {
972            "bgp": {
973                "local_as": "400",
974                "address_family": {
975                    "ipv4": {
976                        "unicast": {
977                            "redistribute": [
978                                {"redist_type": "connected", "delete": True}
979                            ]
980                        }
981                    },
982                    "ipv6": {
983                        "unicast": {
984                            "redistribute": [
985                                {"redist_type": "connected", "delete": True}
986                            ]
987                        }
988                    },
989                },
990            }
991        }
992    }
993
994    result = create_router_bgp(tgen, topo, input_dict_1)
995    assert result is True, "Testcase {} : Failed \n Error : {}".format(tc_name, result)
996
997    step(
998        "Verify that peering between R1-R4 doesn't go down but prefix "
999        "of link r3-r4 with NHT R4 is withdrawn."
1000    )
1001
1002    logger.info("Sleeping for holddowntimer: {}".format(HOLD_DOWN_TIMER))
1003    sleep(HOLD_DOWN_TIMER + 1)
1004
1005    result = verify_bgp_convergence_from_running_config(tgen)
1006    assert result is True, "Testcase {} : Failed \n Error : {}".format(tc_name, result)
1007
1008    for addr_type in ADDR_TYPES:
1009        routes = {"ipv4": ["10.0.3.0/24"], "ipv6": ["fd00:0:0:3::/64"]}
1010
1011        input_dict = {"r1": {"static_routes": [{"network": routes[addr_type]}]}}
1012        next_hop = topo["routers"]["r4"]["links"]["r3"][addr_type].split("/")[0]
1013
1014        result = verify_rib(
1015            tgen,
1016            addr_type,
1017            "r1",
1018            input_dict,
1019            protocol="bgp",
1020            next_hop=next_hop,
1021            expected=False
1022        )
1023        assert result is not True, (
1024            "Testcase {} : Failed \n "
1025            "Route is still present \n Error : {}".format(tc_name, result)
1026        )
1027
1028    step("Re-apply redistribution on R4.")
1029
1030    input_dict_1 = {
1031        "r4": {
1032            "bgp": {
1033                "local_as": "400",
1034                "address_family": {
1035                    "ipv4": {
1036                        "unicast": {
1037                            "redistribute": [
1038                                {"redist_type": "connected", "delete": True}
1039                            ]
1040                        }
1041                    },
1042                    "ipv6": {
1043                        "unicast": {
1044                            "redistribute": [
1045                                {"redist_type": "connected", "delete": True}
1046                            ]
1047                        }
1048                    },
1049                },
1050            }
1051        }
1052    }
1053
1054    result = create_router_bgp(tgen, topo, input_dict_1)
1055    assert result is True, "Testcase {} : Failed \n Error : {}".format(tc_name, result)
1056
1057    step("Verify that prefix of link r3-r4 is re-learned via NHT R4.")
1058
1059    for addr_type in ADDR_TYPES:
1060        routes = {"ipv4": ["10.0.3.0/24"], "ipv6": ["fd00:0:0:3::/64"]}
1061
1062        input_dict = {"r1": {"static_routes": [{"network": routes[addr_type]}]}}
1063        next_hop = topo["routers"]["r3"]["links"]["r1"][addr_type].split("/")[0]
1064
1065        result = verify_rib(
1066            tgen, addr_type, "r1", input_dict, protocol="bgp", next_hop=next_hop
1067        )
1068        assert result is True, "Testcase {} : Failed \n Error : {}".format(
1069            tc_name, result
1070        )
1071
1072    step("Toggle the interface on R3(ifconfig 192.34).")
1073
1074    intf_r3_r4 = topo["routers"]["r3"]["links"]["r4"]["interface"]
1075    shutdown_bringup_interface(tgen, "r3", intf_r3_r4)
1076
1077    step(
1078        "Verify that peering between R1-R4 goes down and comes up when "
1079        "interface is toggled. Also prefix of link r3-r4(via both NHTs) is"
1080        " withdrawn and re-learned accordingly."
1081    )
1082
1083    result = verify_bgp_convergence_from_running_config(tgen, expected=False)
1084    assert (
1085        result is not True
1086    ), "Testcase {} : Failed \n" "BGP is converged \n Error : {}".format(tc_name, result)
1087    logger.info("Expected behaviour: {}".format(result))
1088
1089    for addr_type in ADDR_TYPES:
1090        routes = {"ipv4": ["10.0.3.0/24"], "ipv6": ["fd00:0:0:3::/64"]}
1091
1092        input_dict = {"r1": {"static_routes": [{"network": routes[addr_type]}]}}
1093        next_hop = topo["routers"]["r4"]["links"]["r3"][addr_type].split("/")[0]
1094
1095        result = verify_rib(
1096            tgen,
1097            addr_type,
1098            "r1",
1099            input_dict,
1100            protocol="bgp",
1101            next_hop=next_hop,
1102            expected=False
1103        )
1104        assert result is not True, (
1105            "Testcase {} : Failed \n "
1106            "Route is still present \n Error : {}".format(tc_name, result)
1107        )
1108
1109    shutdown_bringup_interface(tgen, "r3", intf_r3_r4, True)
1110
1111    result = verify_bgp_convergence_from_running_config(tgen)
1112    assert result is True, "Testcase {} : Failed \n Error : {}".format(tc_name, result)
1113
1114    for addr_type in ADDR_TYPES:
1115        routes = {"ipv4": ["10.0.3.0/24"], "ipv6": ["fd00:0:0:3::/64"]}
1116
1117        input_dict = {"r1": {"static_routes": [{"network": routes[addr_type]}]}}
1118        next_hop = topo["routers"]["r3"]["links"]["r1"][addr_type].split("/")[0]
1119
1120        result = verify_rib(
1121            tgen, addr_type, "r1", input_dict, protocol="bgp", next_hop=next_hop
1122        )
1123        assert result is True, "Testcase {} : Failed \n Error : {}".format(
1124            tc_name, result
1125        )
1126
1127    step("Toggle the interface on R4(ifconfig 192.34).")
1128
1129    intf_r4_r3 = topo["routers"]["r4"]["links"]["r3"]["interface"]
1130    shutdown_bringup_interface(tgen, "r4", intf_r4_r3)
1131
1132    step(
1133        "Verify that peering between R1-R4 goes down and comes up when"
1134        "interface is toggled. Also prefix of link r3-r4(via R4)"
1135        " is withdrawn and re-learned accordingly."
1136    )
1137
1138    result = verify_bgp_convergence_from_running_config(tgen, expected=False)
1139    assert (
1140        result is not True
1141    ), "Testcase {} : Failed \n" "BGP is converged \n Error : {}".format(tc_name, result)
1142    logger.info("Expected behaviour: {}".format(result))
1143
1144    for addr_type in ADDR_TYPES:
1145        routes = {"ipv4": ["10.0.3.0/24"], "ipv6": ["fd00:0:0:3::/64"]}
1146
1147        input_dict = {"r1": {"static_routes": [{"network": routes[addr_type]}]}}
1148        next_hop = topo["routers"]["r4"]["links"]["r3"][addr_type].split("/")[0]
1149
1150        result = verify_rib(
1151            tgen,
1152            addr_type,
1153            "r1",
1154            input_dict,
1155            protocol="bgp",
1156            next_hop=next_hop,
1157            expected=False
1158        )
1159        assert result is not True, (
1160            "Testcase {} : Failed \n "
1161            "Route is still present \n Error : {}".format(tc_name, result)
1162        )
1163
1164    shutdown_bringup_interface(tgen, "r4", intf_r4_r3, True)
1165
1166    result = verify_bgp_convergence_from_running_config(tgen)
1167    assert result is True, "Testcase {} : Failed \n Error : {}".format(tc_name, result)
1168
1169    for addr_type in ADDR_TYPES:
1170        routes = {"ipv4": ["10.0.3.0/24"], "ipv6": ["fd00:0:0:3::/64"]}
1171
1172        input_dict = {"r1": {"static_routes": [{"network": routes[addr_type]}]}}
1173        next_hop = topo["routers"]["r3"]["links"]["r1"][addr_type].split("/")[0]
1174
1175        result = verify_rib(
1176            tgen, addr_type, "r1", input_dict, protocol="bgp", next_hop=next_hop
1177        )
1178        assert result is True, "Testcase {} : Failed \n Error : {}".format(
1179            tc_name, result
1180        )
1181
1182    write_test_footer(tc_name)
1183
1184
def _r4_inbound_route_map_config(rmap_name, delete=False):
    """
    Build the create_router_bgp() input that references route-map
    `rmap_name` in the "in" direction on R4's neighbor R3 (dest_link r4),
    for both the ipv4 and ipv6 unicast address families.

    Parameters
    ----------
    * `rmap_name` : name of the route-map to reference
    * `delete` : when True, `"delete": True` is added to the route-map
                 entry so the same structure can be used to remove it

    Returns
    -------
    Nested dict keyed by router name ("r4").
    """

    def _address_family_block():
        # Build fresh objects on every call so that the ipv4 and ipv6
        # sections never share nested dicts/lists.
        rmap_entry = {"name": rmap_name, "direction": "in"}
        if delete:
            rmap_entry["delete"] = True
        return {
            "unicast": {
                "neighbor": {
                    "r3": {"dest_link": {"r4": {"route_maps": [rmap_entry]}}}
                }
            }
        }

    return {
        "r4": {
            "bgp": {
                "address_family": {
                    "ipv4": _address_family_block(),
                    "ipv6": _address_family_block(),
                }
            }
        }
    }


def test_BGP_path_attributes_default_values_p1(request):
    """
    Verify that BGP path attributes are present in CLI
    outputs and JSON format, even if set to default.

    Test flow:
    1. Reset the routers to the initial config, create null0 static
       routes for NETWORK on R1 and redistribute static into BGP.
    2. Verify on R4 that the prefixes are in the BGP RIB with the R2 and
       R3 link addresses as next-hops, and that origin/aspath carry the
       expected values without any route-map.
    3. Apply an inbound route-map on R4 (neighbor R3) that prepends AS 500
       and sets locPrf 500 / origin egp, and verify the new values.
    4. Remove the route-map from the neighbor and verify that the
       original attribute values are seen again.

    Parameters
    ----------
    * `request` : pytest request fixture; only `request.node.name` is
                  used, as the testcase name in logs and assert messages
    """

    tc_name = request.node.name
    write_test_header(tc_name)
    tgen = get_topogen()

    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    step("Initial config: Configure BGP neighborship, between R1-R2 & R1-R3")
    reset_config_on_routers(tgen)

    step("Advertise a set of prefixes from R1 to both peers R2 and R3")
    for addr_type in ADDR_TYPES:
        input_dict_1 = {
            "r1": {
                "static_routes": [{"network": NETWORK[addr_type], "next_hop": "null0"}]
            }
        }
        result = create_static_routes(tgen, input_dict_1)
        # Fail right here if the setup step did not succeed, instead of
        # letting it show up later as a confusing verification failure.
        assert result is True, "Testcase {} : Failed \n Error : {}".format(
            tc_name, result
        )

    input_dict_2 = {
        "r1": {
            "bgp": {
                "address_family": {
                    "ipv4": {"unicast": {"redistribute": [{"redist_type": "static"}]}},
                    "ipv6": {"unicast": {"redistribute": [{"redist_type": "static"}]}},
                }
            }
        }
    }
    result = create_router_bgp(tgen, topo, input_dict_2)
    assert result is True, "Testcase {} : Failed \n Error : {}".format(tc_name, result)

    step(
        "Verify that advertised prefixes are received on R4 and well "
        "known attributes are present in the CLI and JSON outputs with "
        "default values without any route-map config."
    )
    for addr_type in ADDR_TYPES:
        input_dict_3 = {"r4": {"static_routes": [{"network": NETWORK[addr_type]}]}}
        result = verify_bgp_rib(
            tgen,
            addr_type,
            "r4",
            input_dict_3,
            next_hop=[
                topo["routers"]["r2"]["links"]["r4"][addr_type].split("/")[0],
                topo["routers"]["r3"]["links"]["r4"][addr_type].split("/")[0],
            ],
        )
        assert result is True, "Testcase {} : Failed \n Error : {}".format(
            tc_name, result
        )

    for addr_type in ADDR_TYPES:
        # "rmap_pf" is never configured on the router in this test; it is
        # presumably only the lookup key into the expected-values dict
        # handed to verify_bgp_attributes() -- TODO confirm against lib.
        input_dict_4 = {
            "r4": {
                "route_maps": {
                    "rmap_pf": [{"set": {"origin": "incomplete", "aspath": "300 100"}}]
                }
            }
        }

        result = verify_bgp_attributes(
            tgen,
            addr_type,
            "r4",
            NETWORK[addr_type],
            rmap_name="rmap_pf",
            input_dict=input_dict_4,
        )
        assert result is True, "Testcase {} : Failed \n Error : {}".format(
            tc_name, result
        )

    step(
        "Configure a route-map to set below attribute value as 500 "
        "and apply on R4 in an inbound direction"
    )
    # Route-map name as used on the router; the spelling is kept as-is
    # because it is only a label shared by the config steps below.
    rmap_name = "Path_Attribue"

    # The route-map definition does not depend on the address family, so
    # it is built and pushed exactly once.
    input_dict_4 = {
        "r4": {
            "route_maps": {
                rmap_name: [
                    {
                        "action": "permit",
                        "set": {
                            "path": {"as_num": 500, "as_action": "prepend"},
                            "locPrf": 500,
                            "origin": "egp",
                        },
                    }
                ]
            }
        }
    }
    result = create_route_maps(tgen, input_dict_4)
    assert result is True, "Testcase {} : Failed \n Error : {}".format(tc_name, result)

    input_dict_5 = _r4_inbound_route_map_config(rmap_name)
    result = create_router_bgp(tgen, topo, input_dict_5)
    assert result is True, "Testcase {} : Failed \n Error : {}".format(tc_name, result)

    step(
        "Verify that once the route-map is applied all the attributes "
        "part of route-map, changes value to 500"
    )
    for addr_type in ADDR_TYPES:
        input_dict_4 = {
            "r4": {
                "route_maps": {
                    "rmap_pf": [
                        {
                            "set": {
                                "locPrf": 500,
                                "aspath": "500 300 100",
                                "origin": "EGP",
                            }
                        }
                    ]
                }
            }
        }
        result = verify_bgp_attributes(
            tgen,
            addr_type,
            "r4",
            NETWORK[addr_type],
            rmap_name="rmap_pf",
            input_dict=input_dict_4,
        )
        assert result is True, "Testcase {} : Failed \n Error : {}".format(
            tc_name, result
        )

    step("Remove the route-map from R4")
    input_dict_5 = _r4_inbound_route_map_config(rmap_name, delete=True)
    result = create_router_bgp(tgen, topo, input_dict_5)
    assert result is True, "Testcase {} : Failed \n Error : {}".format(tc_name, result)

    step(
        "Verify on R4 that well known attributes are present in the CLI & "
        "JSON outputs again with default values without route-map config"
    )
    for addr_type in ADDR_TYPES:
        input_dict_4 = {
            "r4": {
                "route_maps": {
                    "rmap_pf": [{"set": {"aspath": "300 100", "origin": "incomplete"}}]
                }
            }
        }
        # NOTE(review): only this final check passes nexthop=None
        # explicitly; kept unchanged -- confirm whether it differs from
        # the helper's default.
        result = verify_bgp_attributes(
            tgen,
            addr_type,
            "r4",
            NETWORK[addr_type],
            rmap_name="rmap_pf",
            input_dict=input_dict_4,
            nexthop=None,
        )
        assert result is True, "Testcase {} : Failed \n Error : {}".format(
            tc_name, result
        )

    write_test_footer(tc_name)
1438
1439
1440def test_BGP_peering_bw_loopback_and_physical_p1(request):
1441    """
1442    Verifying the BGP peering between loopback and
1443    physical link's IP of 2 peer routers.
1444    """
1445
1446    tc_name = request.node.name
1447    write_test_header(tc_name)
1448    tgen = get_topogen()
1449
1450    # Don"t run this test if we have any failure.
1451    if tgen.routers_have_failure():
1452        pytest.skip(tgen.errors)
1453
1454    step("Initial config :Configure BGP neighborship between R1 and R3.")
1455    reset_config_on_routers(tgen)
1456
1457    step("Configure a loopback interface on R1")
1458    dut = "r1"
1459    create_interface_in_kernel(
1460        tgen, dut, "lo10", "1.1.1.1", netmask="255.255.255.255", create=True
1461    )
1462    create_interface_in_kernel(
1463        tgen, dut, "lo10", "1:1::1:1", netmask="128", create=True
1464    )
1465
1466    step("Configure BGP session between R1's loopbak & R3")
1467    for addr_type in ADDR_TYPES:
1468        input_dict_1 = {
1469            "r3": {
1470                "static_routes": [
1471                    {
1472                        "network": Loopabck_IP["Lo_R1"][addr_type],
1473                        "next_hop": topo["routers"]["r1"]["links"]["r3"][
1474                            addr_type
1475                        ].split("/")[0],
1476                    }
1477                ]
1478            }
1479        }
1480        result = create_static_routes(tgen, input_dict_1)
1481        assert result is True, "Testcase {} : Failed \n Error : {}".format(
1482            tc_name, result
1483        )
1484        result = verify_rib(
1485            tgen,
1486            addr_type,
1487            "r3",
1488            input_dict_1,
1489            protocol="static",
1490            next_hop=topo["routers"]["r1"]["links"]["r3"][addr_type].split("/")[0],
1491        )
1492        assert result is True, "Testcase {} : Failed \n Error : {}".format(
1493            tc_name, result
1494        )
1495
1496    for addr_type in ADDR_TYPES:
1497        raw_config = {
1498            "r1": {
1499                "raw_config": [
1500                    "router bgp {}".format(topo["routers"]["r1"]["bgp"]["local_as"]),
1501                    "address-family {} unicast".format(addr_type),
1502                    "neighbor {} update-source lo10".format(
1503                        topo["routers"]["r3"]["links"]["r1"][addr_type].split("/")[0]
1504                    ),
1505                    "neighbor {} timers 1 3".format(
1506                        topo["routers"]["r3"]["links"]["r1"][addr_type].split("/")[0]
1507                    ),
1508                ]
1509            },
1510            "r3": {
1511                "raw_config": [
1512                    "router bgp {}".format(topo["routers"]["r3"]["bgp"]["local_as"]),
1513                    "address-family {} unicast".format(addr_type),
1514                    "no neighbor {} remote-as {}".format(
1515                        topo["routers"]["r1"]["links"]["r3"][addr_type].split("/")[0],
1516                        topo["routers"]["r1"]["bgp"]["local_as"],
1517                    ),
1518                    "neighbor {} remote-as {}".format(
1519                        Loopabck_IP["Lo_R1"][addr_type].split("/")[0],
1520                        topo["routers"]["r1"]["bgp"]["local_as"],
1521                    ),
1522                    "neighbor {} ebgp-multihop 3".format(
1523                        Loopabck_IP["Lo_R1"][addr_type].split("/")[0]
1524                    ),
1525                ]
1526            },
1527        }
1528        result = apply_raw_config(tgen, raw_config)
1529        assert result is True, "Testcase {} : Failed Error : {}".format(tc_name, result)
1530
1531    for addr_type in ADDR_TYPES:
1532        if addr_type == "ipv6":
1533            raw_config = {
1534                "r3": {
1535                    "raw_config": [
1536                        "router bgp {}".format(
1537                            topo["routers"]["r3"]["bgp"]["local_as"]
1538                        ),
1539                        "address-family {} unicast".format(addr_type),
1540                        "neighbor {} activate".format(
1541                            Loopabck_IP["Lo_R1"][addr_type].split("/")[0]
1542                        ),
1543                    ]
1544                }
1545            }
1546        else:
1547            raw_config = {
1548                "r3": {
1549                    "raw_config": [
1550                        "router bgp {}".format(
1551                            topo["routers"]["r3"]["bgp"]["local_as"]
1552                        ),
1553                        "address-family {} unicast".format(addr_type),
1554                        "no neighbor {} activate".format(
1555                            Loopabck_IP["Lo_R1"]["ipv6"].split("/")[0]
1556                        ),
1557                    ]
1558                }
1559            }
1560        result = apply_raw_config(tgen, raw_config)
1561        assert result is True, "Testcase {} : Failed Error : {}".format(tc_name, result)
1562
1563    step("Verify that BGP neighborship between R1 and R3 comes up")
1564    result = verify_bgp_convergence_from_running_config(tgen)
1565    assert result is True, "Testcase {} : Failed \n Error : {}".format(tc_name, result)
1566
1567    step("Remove ebgp-multihop command from R3")
1568    for addr_type in ADDR_TYPES:
1569        raw_config = {
1570            "r3": {
1571                "raw_config": [
1572                    "router bgp {}".format(topo["routers"]["r3"]["bgp"]["local_as"]),
1573                    "no neighbor {} ebgp-multihop 3".format(
1574                        Loopabck_IP["Lo_R1"][addr_type].split("/")[0]
1575                    ),
1576                ]
1577            }
1578        }
1579        result = apply_raw_config(tgen, raw_config)
1580        assert result is True, "Testcase {} : Failed Error : {}".format(tc_name, result)
1581
1582    step("Verify that once eBGP multi-hop is removed, BGP session goes down")
1583    result = verify_bgp_convergence_from_running_config(tgen, expected=False)
1584    assert result is not True, "Testcase {} : Failed \n Error : {}".format(
1585        tc_name, result
1586    )
1587
1588    step("Add ebgp-multihop command on R3 again")
1589    for addr_type in ADDR_TYPES:
1590        raw_config = {
1591            "r3": {
1592                "raw_config": [
1593                    "router bgp {}".format(topo["routers"]["r3"]["bgp"]["local_as"]),
1594                    "neighbor {} ebgp-multihop 3".format(
1595                        Loopabck_IP["Lo_R1"][addr_type].split("/")[0]
1596                    ),
1597                ]
1598            }
1599        }
1600        result = apply_raw_config(tgen, raw_config)
1601        assert result is True, "Testcase {} : Failed Error : {}".format(tc_name, result)
1602
1603    step("Verify that BGP neighborship between R1 and R3 comes up")
1604    result = verify_bgp_convergence_from_running_config(tgen)
1605    assert result is True, "Testcase {} : Failed \n Error : {}".format(tc_name, result)
1606
1607    step("Remove update-source command from R1")
1608    for addr_type in ADDR_TYPES:
1609        raw_config = {
1610            "r1": {
1611                "raw_config": [
1612                    "router bgp {}".format(topo["routers"]["r1"]["bgp"]["local_as"]),
1613                    "no neighbor {} update-source lo10".format(
1614                        topo["routers"]["r3"]["links"]["r1"][addr_type].split("/")[0]
1615                    ),
1616                ]
1617            }
1618        }
1619        result = apply_raw_config(tgen, raw_config)
1620        assert result is True, "Testcase {} : Failed Error : {}".format(tc_name, result)
1621
1622    step("Verify that BGP session goes down, when update-source is removed")
1623    result = verify_bgp_convergence_from_running_config(tgen, expected=False)
1624    assert result is not True, "Testcase {} : Failed \n Error : {}".format(
1625        tc_name, result
1626    )
1627
1628    step("Add update-source command on R1 again")
1629    for addr_type in ADDR_TYPES:
1630        raw_config = {
1631            "r1": {
1632                "raw_config": [
1633                    "router bgp {}".format(topo["routers"]["r1"]["bgp"]["local_as"]),
1634                    "neighbor {} update-source lo10".format(
1635                        topo["routers"]["r3"]["links"]["r1"][addr_type].split("/")[0]
1636                    ),
1637                ]
1638            }
1639        }
1640        result = apply_raw_config(tgen, raw_config)
1641        assert result is True, "Testcase {} : Failed Error : {}".format(tc_name, result)
1642
1643    step("Verify that BGP neighborship between R1 and R3 comes up")
1644    result = verify_bgp_convergence_from_running_config(tgen)
1645    assert result is True, "Testcase {} : Failed \n Error : {}".format(tc_name, result)
1646
1647    step("Remove static route from R3")
1648    for addr_type in ADDR_TYPES:
1649        input_dict_1 = {
1650            "r3": {
1651                "static_routes": [
1652                    {
1653                        "network": Loopabck_IP["Lo_R1"][addr_type],
1654                        "next_hop": topo["routers"]["r1"]["links"]["r3"][
1655                            addr_type
1656                        ].split("/")[0],
1657                        "delete": True,
1658                    }
1659                ]
1660            }
1661        }
1662        result = create_static_routes(tgen, input_dict_1)
1663        assert result is True, "Testcase {} : Failed \n Error : {}".format(
1664            tc_name, result
1665        )
1666        result = verify_rib(
1667            tgen,
1668            addr_type,
1669            "r3",
1670            input_dict_1,
1671            protocol="static",
1672            next_hop=topo["routers"]["r1"]["links"]["r3"][addr_type].split("/")[0],
1673            expected=False
1674        )
1675        assert result is not True, "Testcase {} : Failed \n Error : {}".format(
1676            tc_name, result
1677        )
1678
1679    sleep(3)
1680    step("Verify that BGP session goes down, when static route is removed")
1681    result = verify_bgp_convergence_from_running_config(tgen, expected=False)
1682    assert result is not True, "Testcase {} : Failed \n Error : {}".format(
1683        tc_name, result
1684    )
1685
1686    step("Add static route on R3 again")
1687    for addr_type in ADDR_TYPES:
1688        input_dict_1 = {
1689            "r3": {
1690                "static_routes": [
1691                    {
1692                        "network": Loopabck_IP["Lo_R1"][addr_type],
1693                        "next_hop": topo["routers"]["r1"]["links"]["r3"][
1694                            addr_type
1695                        ].split("/")[0],
1696                    }
1697                ]
1698            }
1699        }
1700        result = create_static_routes(tgen, input_dict_1)
1701        assert result is True, "Testcase {} : Failed \n Error : {}".format(
1702            tc_name, result
1703        )
1704        result = verify_rib(
1705            tgen,
1706            addr_type,
1707            "r3",
1708            input_dict_1,
1709            protocol="static",
1710            next_hop=topo["routers"]["r1"]["links"]["r3"][addr_type].split("/")[0],
1711        )
1712        assert result is True, "Testcase {} : Failed \n Error : {}".format(
1713            tc_name, result
1714        )
1715
1716    step("Verify that BGP neighborship between R1 and R3 comes up")
1717    result = verify_bgp_convergence_from_running_config(tgen)
1718    assert result is True, "Testcase {} : Failed \n Error : {}".format(tc_name, result)
1719
1720    step("Toggle physical interface on R1")
1721    intf_r1_r3 = topo["routers"]["r1"]["links"]["r3"]["interface"]
1722    shutdown_bringup_interface(tgen, "r1", intf_r1_r3)
1723    sleep(3)
1724    step("Verify that BGP neighborship between R1 and R3 goes down")
1725    result = verify_bgp_convergence_from_running_config(tgen, expected=False)
1726    assert result is not True, "Testcase {} : Failed \n Error : {}".format(
1727        tc_name, result
1728    )
1729
1730    intf_r1_r3 = topo["routers"]["r1"]["links"]["r3"]["interface"]
1731    shutdown_bringup_interface(tgen, "r1", intf_r1_r3, True)
1732
1733    step("Verify that BGP neighborship between R1 and R3 comes up")
1734    result = verify_bgp_convergence_from_running_config(tgen)
1735    assert result is True, "Testcase {} : Failed \n Error : {}".format(tc_name, result)
1736
1737    write_test_footer(tc_name)
1738
1739
def test_BGP_active_standby_preemption_and_ecmp_p1(request):
    """
    Verify BGP Active/Standby/Pre-emption/ECMP behaviour as seen on R4.

    Sequence driven by this test:

    1. Reset the routers, change R2's AS number to 200 and wait for BGP
       convergence.
    2. Create null0 static routes for NETWORK on R1 and redistribute them
       into BGP; check that R4's BGP RIB lists both R2 and R3 as next hops.
    3. Apply an inbound as-path prepend route-map on R3 (neighbor R1) and
       check that R4 installs the routes via R2.
    4. Shut the R4-R2 interface and check the routes move to R3, then
       bring it back and check they return to R2 (pre-emption).
    5. Remove the route-map, configure maximum-paths 2 and multipath-relax
       on R4, and check ECMP via R2 and R3; then remove/re-add
       multipath-relax and maximum-paths, checking that ECMP disappears
       and reappears each time.

    Every RIB check is performed for each address family in ADDR_TYPES.

    :param request: pytest request fixture; only ``request.node.name`` is
        read, to label log and failure messages.
    """

    tc_name = request.node.name
    write_test_header(tc_name)
    tgen = get_topogen()

    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    dut = "r4"
    protocol = "bgp"
    fail_msg = "Testcase {} : Failed \n Error : {}"

    def next_hop_towards_r4(peer, addr_type):
        # Address of `peer` on its link to R4, with the prefix length removed.
        return topo["routers"][peer]["links"]["r4"][addr_type].split("/")[0]

    def advertised_prefixes(addr_type):
        # Expected-route input for R4: the prefixes R1 redistributes.
        return {"r4": {"static_routes": [{"network": NETWORK[addr_type]}]}}

    def verify_r4_rib(peers, with_protocol=False, expect_present=True):
        # Check R4's RIB for the advertised prefixes in every address
        # family.  A single peer is passed as a plain next-hop string, more
        # than one as a list.  `expected=False` is only forwarded for
        # negative checks, matching how verify_rib is called elsewhere in
        # this file.
        for addr_type in ADDR_TYPES:
            hops = [next_hop_towards_r4(peer, addr_type) for peer in peers]
            kwargs = {"next_hop": hops[0] if len(hops) == 1 else hops}
            if with_protocol:
                kwargs["protocol"] = protocol
            if not expect_present:
                kwargs["expected"] = False
            result = verify_rib(
                tgen, addr_type, dut, advertised_prefixes(addr_type), **kwargs
            )
            if expect_present:
                assert result is True, fail_msg.format(tc_name, result)
            else:
                assert result is not True, fail_msg.format(tc_name, result)

    def r3_inbound_route_map(delete=False):
        # BGP config attaching (or, with delete=True, detaching) the
        # "Path_Attribue" route-map inbound on R3's neighbor R1, for both
        # address families.
        def neighbor_entry():
            route_map = {"name": "Path_Attribue", "direction": "in"}
            if delete:
                route_map["delete"] = True
            return {
                "unicast": {
                    "neighbor": {
                        "r1": {"dest_link": {"r3": {"route_maps": [route_map]}}}
                    }
                }
            }

        return {
            "r3": {
                "bgp": {
                    "address_family": {
                        "ipv4": neighbor_entry(),
                        "ipv6": neighbor_entry(),
                    }
                }
            }
        }

    def r4_maximum_paths(count):
        # BGP config setting eBGP maximum-paths on R4 for both families.
        return {
            "r4": {
                "bgp": {
                    "address_family": {
                        "ipv4": {"unicast": {"maximum_paths": {"ebgp": count}}},
                        "ipv6": {"unicast": {"maximum_paths": {"ebgp": count}}},
                    }
                }
            }
        }

    step("Initial config :Configure BGP neighborship between R1 and R3.")
    reset_config_on_routers(tgen)

    step("Change the AS number on R2 as 200")
    input_dict = {"r2": {"bgp": {"local_as": 200}}}
    result = modify_as_number(tgen, topo, input_dict)
    assert result is True, fail_msg.format(tc_name, result)

    step("Verify BGP converge after changing the AS number on R2")
    result = verify_bgp_convergence_from_running_config(tgen)
    assert result is True, fail_msg.format(tc_name, result)

    step("Advertise a set of prefixes from R1 to both peers R2 & R3")
    for addr_type in ADDR_TYPES:
        static_cfg = {
            "r1": {
                "static_routes": [{"network": NETWORK[addr_type], "next_hop": "null0"}]
            }
        }
        result = create_static_routes(tgen, static_cfg)
        assert result is True, fail_msg.format(tc_name, result)

    redistribute_static = {
        "r1": {
            "bgp": {
                "address_family": {
                    "ipv4": {"unicast": {"redistribute": [{"redist_type": "static"}]}},
                    "ipv6": {"unicast": {"redistribute": [{"redist_type": "static"}]}},
                }
            }
        }
    }
    result = create_router_bgp(tgen, topo, redistribute_static)
    assert result is True, fail_msg.format(tc_name, result)

    step("Verify that R4 receives BGP prefixes via both peer routers R2 & R3")
    for addr_type in ADDR_TYPES:
        result = verify_bgp_rib(
            tgen,
            addr_type,
            dut,
            advertised_prefixes(addr_type),
            next_hop=[
                next_hop_towards_r4("r2", addr_type),
                next_hop_towards_r4("r3", addr_type),
            ],
        )
        # Both placeholders are required so the error text is reported
        # rather than silently dropped by str.format.
        assert result is True, fail_msg.format(tc_name, result)

    step(
        "Configure a route-map to set as-path attribute and"
        "apply on R3 in an inbound direction:"
    )

    route_map_cfg = {
        "r3": {
            "route_maps": {
                "Path_Attribue": [
                    {
                        "action": "permit",
                        "set": {"path": {"as_num": 123, "as_action": "prepend"}},
                    }
                ]
            }
        }
    }
    result = create_route_maps(tgen, route_map_cfg)
    assert result is True, fail_msg.format(tc_name, result)

    result = create_router_bgp(tgen, topo, r3_inbound_route_map())
    assert result is True, fail_msg.format(tc_name, result)

    step("Verify on R4, BGP routes with shorter as-path are installed in FIB")
    verify_r4_rib(["r2"], with_protocol=True)

    # NOTE(review): the step text says R1-R2, but the interface that is
    # toggled is R4's link towards R2.
    step("Shutdown BGP neighorship between R1-R2")
    intf_r4_r2 = topo["routers"]["r4"]["links"]["r2"]["interface"]
    shutdown_bringup_interface(tgen, dut, intf_r4_r2)

    step(
        "Verify that prefixes from next-hop via R2 are withdrawn"
        "and installed via next-hop as R3"
    )
    # Look up the advertised prefixes themselves, in every address family;
    # R1's redistribute config names no networks and so cannot describe
    # the routes expected on R4.
    verify_r4_rib(["r3"], with_protocol=True)

    step("Do a no shut for BGP neighorship between R2-R4")
    shutdown_bringup_interface(tgen, dut, intf_r4_r2, ifaceaction=True)

    step(
        "Verify that prefixes from next-hop via R3 are withdrawn"
        "from R4 and installed via next-hop as R2 (preemption)"
    )
    verify_r4_rib(["r2"], with_protocol=True)

    step("Remove the route-map from R3's neighbor statement")
    result = create_router_bgp(tgen, topo, r3_inbound_route_map(delete=True))
    assert result is True, fail_msg.format(tc_name, result)

    step("Configure multipath-relax and maximum-paths 2 on R4 for ECMP")
    result = create_router_bgp(tgen, topo, r4_maximum_paths(2))
    assert result is True, fail_msg.format(tc_name, result)

    maxpath_relax = {
        "r4": {"bgp": {"local_as": "400", "bestpath": {"aspath": "multipath-relax"}}}
    }

    result = create_router_bgp(tgen, topo, maxpath_relax)
    assert result is True, fail_msg.format(tc_name, result)

    step("Verify FIB of R4, BGP prefixes with ECMP next-hop via R2 and R3")
    verify_r4_rib(["r2", "r3"])

    step("Remove multipath-relax command from R4")

    del_maxpath_relax = {
        "r4": {
            "bgp": {
                "local_as": "400",
                "bestpath": {"aspath": "multipath-relax", "delete": True},
            }
        }
    }

    result = create_router_bgp(tgen, topo, del_maxpath_relax)
    assert result is True, fail_msg.format(tc_name, result)

    step("Verify that ECMP is no longer happening on R4.")
    verify_r4_rib(["r2", "r3"], expect_present=False)

    step("Reconfigure multipath-relax command on R4")
    result = create_router_bgp(tgen, topo, maxpath_relax)
    assert result is True, fail_msg.format(tc_name, result)

    step("Verify FIB of R4, BGP prefixes with ECMP next-hop via R2 and R3")
    verify_r4_rib(["r2", "r3"])

    step("Remove maximum-path 2 command from R4")
    result = create_router_bgp(tgen, topo, r4_maximum_paths(1))
    assert result is True, fail_msg.format(tc_name, result)

    step("Verify that ECMP is no longer happening on R4")
    verify_r4_rib(["r2", "r3"], expect_present=False)

    step("Re-configure maximum-path 2 command on R4")
    result = create_router_bgp(tgen, topo, r4_maximum_paths(2))
    assert result is True, fail_msg.format(tc_name, result)

    step("Verify FIB of R4, BGP prefixes with ECMP next-hop via R2 and R3")
    verify_r4_rib(["r2", "r3"])

    write_test_footer(tc_name)
2121
2122
def test_password_authentication_for_eBGP_and_iBGP_peers_p1(request):
    """
    Verify password authentication for eBGP and iBGP peers.

    The test adds static routes for loopback reachability between R1 and
    its peers R2/R3, rewrites the "dest_link" entries in the shared `topo`
    dict so the sessions are described over the "lo" link, and then
    toggles neighbor passwords on R1, R2 and R3, checking after each
    change whether verify_bgp_convergence reports convergence or not.

    :param request: pytest request fixture; only ``request.node.name`` is
        read, to label log and failure messages.
    """

    tc_name = request.node.name
    write_test_header(tc_name)
    tgen = get_topogen()

    # Skip when the routers are already reported as failed.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    failure = "Testcase {} : Failed \n Error : {}"

    def neighbor_table(router, addr_type):
        # The neighbor dict of `router` for one address family inside topo.
        return topo["routers"][router]["bgp"]["address_family"][addr_type][
            "unicast"
        ]["neighbor"]

    def rebuild():
        # Re-apply the (modified) topo description to the routers.
        build_config_from_json(tgen, topo, save_bkup=False)

    def set_lo_option_on_r1(option, value):
        # Replace R1's dest_link towards R2 and R3 with a single "lo"
        # entry carrying one option, then re-apply the configuration.
        for peer in ("r2", "r3"):
            for family in ADDR_TYPES:
                neighbor_table("r1", family)[peer]["dest_link"] = {
                    "lo": {option: value}
                }
        rebuild()

    def set_lo_password_on_peers(secret):
        # Replace the dest_link of R2 and R3 towards R1 with a single
        # "lo" entry carrying the password, then re-apply the configuration.
        for router in ("r2", "r3"):
            for family in ADDR_TYPES:
                neighbor_table(router, family)["r1"]["dest_link"] = {
                    "lo": {"password": secret}
                }
        rebuild()

    def expect_converged():
        outcome = verify_bgp_convergence(tgen, topo)
        assert outcome is True, failure.format(tc_name, outcome)

    def expect_not_converged():
        outcome = verify_bgp_convergence(tgen, topo, expected=False)
        assert outcome is not True, failure.format(tc_name, outcome)

    step("Initial config :Configure BGP neighborship between R1 and R3.")
    reset_config_on_routers(tgen)

    step(
        "Add a static route on R1 for loopbacks IP's reachability of R2, R3"
        "and on R2 and R3 for loopback IP of R1"
    )
    # (router receiving the route, peer whose loopback is the destination);
    # the next hop is the peer's address on its link to that router.
    route_pairs = (("r1", "r3"), ("r2", "r1"), ("r3", "r1"), ("r1", "r2"))
    for addr_type in ADDR_TYPES:
        # Build every route description first, then apply them in order.
        planned = []
        for target, peer in route_pairs:
            peer_links = topo["routers"][peer]["links"]
            gateway = peer_links[target][addr_type].split("/")[0]
            route_cfg = {
                target: {
                    "static_routes": [
                        {
                            "network": peer_links["lo"][addr_type],
                            "next_hop": gateway,
                        }
                    ]
                }
            }
            planned.append((target, gateway, route_cfg))

        for target, gateway, route_cfg in planned:
            outcome = create_static_routes(tgen, route_cfg)
            assert outcome is True, failure.format(tc_name, outcome)

            step("Verify that static routes are installed in FIB of routers")
            outcome = verify_rib(
                tgen,
                addr_type,
                target,
                route_cfg,
                next_hop=gateway,
                protocol="static",
            )
            assert outcome is True, failure.format(tc_name, outcome)

    step("Configure BGP sessions between R1-R2 and R1-R3 over loopback IPs")
    for router, peer in (("r1", "r3"), ("r3", "r1")):
        for addr_type in ADDR_TYPES:
            neighbor_table(router, addr_type)[peer]["dest_link"] = {
                "lo": {"ebgp_multihop": 2, "source_link": "lo"}
            }
    rebuild()

    for router, peer in (("r1", "r2"), ("r2", "r1")):
        for addr_type in ADDR_TYPES:
            neighbor_table(router, addr_type)[peer]["dest_link"] = {
                "lo": {"source_link": "lo"}
            }
    rebuild()

    # Under each address family, mark the other family as "deactivate" on
    # every neighbor except R4.
    other_family = {"ipv4": "ipv6", "ipv6": "ipv4"}
    for router in ("r1", "r2", "r3"):
        for addr_type in ADDR_TYPES:
            peers = neighbor_table(router, addr_type)
            for peer in peers:
                if peer == "r4":
                    continue
                if addr_type in other_family:
                    peers[peer]["dest_link"] = {
                        "lo": {"deactivate": other_family[addr_type]}
                    }
    rebuild()

    expect_converged()

    step("Configure authentication password on R1 for neighbor statements")
    set_lo_option_on_r1("password", "vmware")

    step(
        "Verify that both sessions go down as only R1 has password"
        "configured but not peer routers"
    )
    expect_not_converged()

    step("configure same password on R2 and R3")
    set_lo_password_on_peers("vmware")

    step("Verify that all BGP sessions come up due to identical passwords")
    expect_converged()

    step("Configure same password on R2 and R3, but in CAPs.")
    set_lo_password_on_peers("VMWARE")

    step(
        "Verify that BGP sessions do not come up as password"
        "strings are in CAPs on R2 and R3"
    )
    expect_not_converged()

    step("Configure same password on R2 and R3 without CAPs")
    set_lo_password_on_peers("vmware")

    step("Verify all BGP sessions come up again due to identical passwords")
    expect_converged()

    step("Remove password from R1")
    set_lo_option_on_r1("no_password", "vmware")

    step("Verify if password is removed from R1, both sessions go down again")
    expect_not_converged()

    step("Configure alphanumeric password on R1 and peer routers R2,R3")
    set_lo_option_on_r1("password", "Vmware@123")
    set_lo_password_on_peers("Vmware@123")

    step(
        "Verify that sessions Come up irrespective of characters"
        "used in password string"
    )
    expect_converged()

    write_test_footer(tc_name)
2349
2350
if __name__ == "__main__":
    # When run as a script, hand the command-line arguments to pytest with
    # "-s" placed in front, and exit with pytest's return code.
    pytest_args = ["-s"]
    pytest_args.extend(sys.argv[1:])
    sys.exit(pytest.main(pytest_args))
2354