]> git.proxmox.com Git - mirror_frr.git/commitdiff
Merge pull request #13670 from donaldsharp/coverity_99
authorDonatas Abraitis <donatas@opensourcerouting.org>
Mon, 5 Jun 2023 19:33:55 +0000 (22:33 +0300)
committerGitHub <noreply@github.com>
Mon, 5 Jun 2023 19:33:55 +0000 (22:33 +0300)
Coverity 99

bgpd/bgp_clist.c
bgpd/bgp_clist.h
bgpd/bgp_vty.c
mgmtd/mgmt_txn.c
tests/topotests/bgp_gr_functionality_topo3/bgp_gr_functionality_topo3.py [deleted file]
tests/topotests/bgp_gr_functionality_topo3/test_bgp_gr_functionality_topo3.py [new file with mode: 0644]

index ac5cdd6acba532ae8fd221fcab0d40616401248e..f3c308afb9e3e0f7a66ce9c6ad7c96a46a7f26d0 100644 (file)
@@ -659,9 +659,6 @@ bool community_list_match(struct community *com, struct community_list *list)
        struct community_entry *entry;
 
        for (entry = list->head; entry; entry = entry->next) {
-               if (entry->any)
-                       return entry->direct == COMMUNITY_PERMIT;
-
                if (entry->style == COMMUNITY_LIST_STANDARD) {
                        if (community_include(entry->u.com, COMMUNITY_INTERNET))
                                return entry->direct == COMMUNITY_PERMIT;
@@ -681,9 +678,6 @@ bool lcommunity_list_match(struct lcommunity *lcom, struct community_list *list)
        struct community_entry *entry;
 
        for (entry = list->head; entry; entry = entry->next) {
-               if (entry->any)
-                       return entry->direct == COMMUNITY_PERMIT;
-
                if (entry->style == LARGE_COMMUNITY_LIST_STANDARD) {
                        if (lcommunity_match(lcom, entry->u.lcom))
                                return entry->direct == COMMUNITY_PERMIT;
@@ -705,9 +699,6 @@ bool lcommunity_list_exact_match(struct lcommunity *lcom,
        struct community_entry *entry;
 
        for (entry = list->head; entry; entry = entry->next) {
-               if (entry->any)
-                       return entry->direct == COMMUNITY_PERMIT;
-
                if (entry->style == LARGE_COMMUNITY_LIST_STANDARD) {
                        if (lcommunity_cmp(lcom, entry->u.lcom))
                                return entry->direct == COMMUNITY_PERMIT;
@@ -724,9 +715,6 @@ bool ecommunity_list_match(struct ecommunity *ecom, struct community_list *list)
        struct community_entry *entry;
 
        for (entry = list->head; entry; entry = entry->next) {
-               if (entry->any)
-                       return entry->direct == COMMUNITY_PERMIT;
-
                if (entry->style == EXTCOMMUNITY_LIST_STANDARD) {
                        if (ecommunity_match(ecom, entry->u.ecom))
                                return entry->direct == COMMUNITY_PERMIT;
@@ -746,9 +734,6 @@ bool community_list_exact_match(struct community *com,
        struct community_entry *entry;
 
        for (entry = list->head; entry; entry = entry->next) {
-               if (entry->any)
-                       return entry->direct == COMMUNITY_PERMIT;
-
                if (entry->style == COMMUNITY_LIST_STANDARD) {
                        if (community_include(entry->u.com, COMMUNITY_INTERNET))
                                return entry->direct == COMMUNITY_PERMIT;
@@ -781,28 +766,18 @@ struct community *community_list_match_delete(struct community *com,
                val = community_val_get(com, i);
 
                for (entry = list->head; entry; entry = entry->next) {
-                       if (entry->any) {
-                               if (entry->direct == COMMUNITY_PERMIT) {
-                                       com_index_to_delete[delete_index] = i;
-                                       delete_index++;
-                               }
-                               break;
-                       }
-
-                       else if ((entry->style == COMMUNITY_LIST_STANDARD)
-                                && (community_include(entry->u.com,
-                                                      COMMUNITY_INTERNET)
-                                    || community_include(entry->u.com, val))) {
+                       if ((entry->style == COMMUNITY_LIST_STANDARD) &&
+                           (community_include(entry->u.com,
+                                              COMMUNITY_INTERNET) ||
+                            community_include(entry->u.com, val))) {
                                if (entry->direct == COMMUNITY_PERMIT) {
                                        com_index_to_delete[delete_index] = i;
                                        delete_index++;
                                }
                                break;
-                       }
-
-                       else if ((entry->style == COMMUNITY_LIST_EXPANDED)
-                                && community_regexp_include(entry->reg, com,
-                                                            i)) {
+                       } else if ((entry->style == COMMUNITY_LIST_EXPANDED) &&
+                                  community_regexp_include(entry->reg, com,
+                                                           i)) {
                                if (entry->direct == COMMUNITY_PERMIT) {
                                        com_index_to_delete[delete_index] = i;
                                        delete_index++;
@@ -836,12 +811,6 @@ static bool community_list_dup_check(struct community_list *list,
                if (entry->direct != new->direct)
                        continue;
 
-               if (entry->any != new->any)
-                       continue;
-
-               if (entry->any)
-                       return true;
-
                switch (entry->style) {
                case COMMUNITY_LIST_STANDARD:
                        if (community_cmp(entry->u.com, new->u.com))
@@ -910,7 +879,6 @@ int community_list_set(struct community_list_handler *ch, const char *name,
        entry = community_entry_new();
        entry->direct = direct;
        entry->style = style;
-       entry->any = (str ? false : true);
        entry->u.com = com;
        entry->reg = regex;
        entry->seq = seqnum;
@@ -987,16 +955,8 @@ struct lcommunity *lcommunity_list_match_delete(struct lcommunity *lcom,
        for (i = 0; i < lcom->size; i++) {
                ptr = lcom->val + (i * LCOMMUNITY_SIZE);
                for (entry = list->head; entry; entry = entry->next) {
-                       if (entry->any) {
-                               if (entry->direct == COMMUNITY_PERMIT) {
-                                       com_index_to_delete[delete_index] = i;
-                                       delete_index++;
-                               }
-                               break;
-                       }
-
-                       else if ((entry->style == LARGE_COMMUNITY_LIST_STANDARD)
-                                && lcommunity_include(entry->u.lcom, ptr)) {
+                       if ((entry->style == LARGE_COMMUNITY_LIST_STANDARD) &&
+                           lcommunity_include(entry->u.lcom, ptr)) {
                                if (entry->direct == COMMUNITY_PERMIT) {
                                        com_index_to_delete[delete_index] = i;
                                        delete_index++;
@@ -1004,9 +964,10 @@ struct lcommunity *lcommunity_list_match_delete(struct lcommunity *lcom,
                                break;
                        }
 
-                       else if ((entry->style == LARGE_COMMUNITY_LIST_EXPANDED)
-                                && lcommunity_regexp_include(entry->reg, lcom,
-                                                             i)) {
+                       else if ((entry->style ==
+                                 LARGE_COMMUNITY_LIST_EXPANDED) &&
+                                lcommunity_regexp_include(entry->reg, lcom,
+                                                          i)) {
                                if (entry->direct == COMMUNITY_PERMIT) {
                                        com_index_to_delete[delete_index] = i;
                                        delete_index++;
@@ -1125,7 +1086,6 @@ int lcommunity_list_set(struct community_list_handler *ch, const char *name,
        entry = community_entry_new();
        entry->direct = direct;
        entry->style = style;
-       entry->any = (str ? false : true);
        entry->u.lcom = lcom;
        entry->reg = regex;
        entry->seq = seqnum;
@@ -1246,7 +1206,6 @@ int extcommunity_list_set(struct community_list_handler *ch, const char *name,
        entry = community_entry_new();
        entry->direct = direct;
        entry->style = style;
-       entry->any = false;
        if (ecom)
                entry->config = ecommunity_ecom2str(
                        ecom, ECOMMUNITY_FORMAT_COMMUNITY_LIST, 0);
index 7a9b28038c2c1ae5d5749fa6fde9f2e32cbd9c60..8e5d637babee0cf85fdb4e6e29d4df718245b175 100644 (file)
@@ -65,9 +65,6 @@ struct community_entry {
        /* Standard or expanded.  */
        uint8_t style;
 
-       /* Any match.  */
-       bool any;
-
        /* Sequence number. */
        int64_t seq;
 
index 04bdba13454bb26c7987e42468be464cabd51f70..7063bc26ab2e402ea5467b6914a0b7b83b812f50 100644 (file)
@@ -20863,16 +20863,13 @@ static const char *community_list_config_str(struct community_entry *entry)
 {
        const char *str;
 
-       if (entry->any)
-               str = "";
-       else {
-               if (entry->style == COMMUNITY_LIST_STANDARD)
-                       str = community_str(entry->u.com, false, false);
-               else if (entry->style == LARGE_COMMUNITY_LIST_STANDARD)
-                       str = lcommunity_str(entry->u.lcom, false, false);
-               else
-                       str = entry->config;
-       }
+       if (entry->style == COMMUNITY_LIST_STANDARD)
+               str = community_str(entry->u.com, false, false);
+       else if (entry->style == LARGE_COMMUNITY_LIST_STANDARD)
+               str = lcommunity_str(entry->u.lcom, false, false);
+       else
+               str = entry->config;
+
        return str;
 }
 
@@ -20895,13 +20892,8 @@ static void community_list_show(struct vty *vty, struct community_list *list)
                                                : "expanded",
                                        list->name);
                }
-               if (entry->any)
-                       vty_out(vty, "    %s\n",
-                               community_direct_str(entry->direct));
-               else
-                       vty_out(vty, "    %s %s\n",
-                               community_direct_str(entry->direct),
-                               community_list_config_str(entry));
+               vty_out(vty, "    %s %s\n", community_direct_str(entry->direct),
+                       community_list_config_str(entry));
        }
 }
 
@@ -21260,13 +21252,8 @@ static void lcommunity_list_show(struct vty *vty, struct community_list *list)
                                                : "expanded",
                                        list->name);
                }
-               if (entry->any)
-                       vty_out(vty, "    %s\n",
-                               community_direct_str(entry->direct));
-               else
-                       vty_out(vty, "    %s %s\n",
-                               community_direct_str(entry->direct),
-                               community_list_config_str(entry));
+               vty_out(vty, "    %s %s\n", community_direct_str(entry->direct),
+                       community_list_config_str(entry));
        }
 }
 
@@ -21562,13 +21549,8 @@ static void extcommunity_list_show(struct vty *vty, struct community_list *list)
                                                : "expanded",
                                        list->name);
                }
-               if (entry->any)
-                       vty_out(vty, "    %s\n",
-                               community_direct_str(entry->direct));
-               else
-                       vty_out(vty, "    %s %s\n",
-                               community_direct_str(entry->direct),
-                               community_list_config_str(entry));
+               vty_out(vty, "    %s %s\n", community_direct_str(entry->direct),
+                       community_list_config_str(entry));
        }
 }
 
index 3d818cb4c2598ae8f82a3615aa151738d03c4aa8..a666422b7dbb79c0d959b08a1ab3111f17f44f0b 100644 (file)
@@ -1122,7 +1122,6 @@ static int mgmt_txn_create_config_batches(struct mgmt_txn_req *txn_req,
                }
 
                free(xpath);
-               xpath = NULL;
        }
 
        cmtcfg_req->cmt_stats->last_batch_cnt = num_chgs;
@@ -2542,7 +2541,7 @@ int mgmt_txn_notify_be_cfgdata_reply(
 {
        struct mgmt_txn_ctx *txn;
        struct mgmt_txn_be_cfg_batch *cfg_btch;
-       struct mgmt_commit_cfg_req *cmtcfg_req = NULL;
+       struct mgmt_commit_cfg_req *cmtcfg_req;
 
        txn = mgmt_txn_id2ctx(txn_id);
        if (!txn || txn->type != MGMTD_TXN_TYPE_CONFIG)
diff --git a/tests/topotests/bgp_gr_functionality_topo3/bgp_gr_functionality_topo3.py b/tests/topotests/bgp_gr_functionality_topo3/bgp_gr_functionality_topo3.py
deleted file mode 100644 (file)
index 593a8d6..0000000
+++ /dev/null
@@ -1,541 +0,0 @@
-#!/usr/bin/env python
-# SPDX-License-Identifier: ISC
-#
-# Copyright (c) 2019 by VMware, Inc. ("VMware")
-# Used Copyright (c) 2018 by Network Device Education Foundation, Inc. ("NetDEF")
-# in this file.
-#
-
-import os
-import sys
-import time
-import pytest
-from time import sleep
-
-import traceback
-import ipaddress
-
-# Save the Current Working Directory to find configuration files.
-CWD = os.path.dirname(os.path.realpath(__file__))
-sys.path.append(os.path.join("../"))
-sys.path.append(os.path.join("../lib/"))
-
-# pylint: disable=C0413
-# Import topogen and topotest helpers
-from lib.topogen import Topogen, get_topogen
-from lib.topolog import logger
-
-# Required to instantiate the topology builder class.
-
-# Import topoJson from lib, to create topology and initial configuration
-from lib.topojson import build_config_from_json
-from lib.bgp import (
-    clear_bgp,
-    verify_bgp_rib,
-    verify_graceful_restart,
-    create_router_bgp,
-    verify_r_bit,
-    verify_eor,
-    verify_f_bit,
-    verify_bgp_convergence,
-    verify_gr_address_family,
-    modify_bgp_config_when_bgpd_down,
-    verify_graceful_restart_timers,
-    verify_bgp_convergence_from_running_config,
-)
-
-# Import common_config to use commomnly used APIs
-from lib.common_config import (
-    create_common_configuration,
-    InvalidCLIError,
-    retry,
-    generate_ips,
-    FRRCFG_FILE,
-    find_interface_with_greater_ip,
-    check_address_types,
-    validate_ip_address,
-    run_frr_cmd,
-    get_frr_ipv6_linklocal,
-)
-
-from lib.common_config import (
-    write_test_header,
-    reset_config_on_routers,
-    start_topology,
-    kill_router_daemons,
-    start_router_daemons,
-    verify_rib,
-    check_address_types,
-    write_test_footer,
-    check_router_status,
-    step,
-    get_frr_ipv6_linklocal,
-    create_static_routes,
-    required_linux_kernel_version,
-)
-
-pytestmark = [pytest.mark.bgpd]
-
-
-# Global variables
-BGP_CONVERGENCE = False
-GR_RESTART_TIMER = 5
-GR_SELECT_DEFER_TIMER = 5
-GR_STALEPATH_TIMER = 5
-# Global variables
-# STATIC_ROUTES=[]
-NETWORK1_1 = {"ipv4": "192.0.2.1/32", "ipv6": "2001:DB8::1:1/128"}
-NETWORK1_2 = {"ipv4": "192.0.2.2/32", "ipv6": "2001:DB8::2:1/128"}
-NETWORK2_1 = {"ipv4": "192.0.2.3/32", "ipv6": "2001:DB8::3:1/128"}
-NETWORK2_2 = {"ipv4": "192.0.2.4/32", "ipv6": "2001:DB8::4:1/128"}
-NETWORK3_1 = {"ipv4": "192.0.2.5/32", "ipv6": "2001:DB8::5:1/128"}
-NETWORK3_2 = {"ipv4": "192.0.2.6/32", "ipv6": "2001:DB8::6:1/128"}
-NETWORK4_1 = {"ipv4": "192.0.2.7/32", "ipv6": "2001:DB8::7:1/128"}
-NETWORK4_2 = {"ipv4": "192.0.2.8/32", "ipv6": "2001:DB8::8:1/128"}
-NETWORK5_1 = {"ipv4": "192.0.2.9/32", "ipv6": "2001:DB8::9:1/128"}
-NETWORK5_2 = {"ipv4": "192.0.2.10/32", "ipv6": "2001:DB8::10:1/128"}
-
-NEXT_HOP_IP = {"ipv4": "Null0", "ipv6": "Null0"}
-
-PREFERRED_NEXT_HOP = "link_local"
-
-
-def configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut, peer):
-    """
-    result = configure_gr_followed_by_clear(tgen, topo, dut)
-    assert result is True, \
-        "Testcase {} :Failed \n Error {}". \
-            format(tc_name, result)
-    """
-
-    result = create_router_bgp(tgen, topo, input_dict)
-    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
-    for addr_type in ADDR_TYPES:
-        clear_bgp(tgen, addr_type, dut)
-
-    for addr_type in ADDR_TYPES:
-        clear_bgp(tgen, addr_type, peer)
-
-    result = verify_bgp_convergence_from_running_config(tgen)
-    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
-
-    return True
-
-
-def verify_stale_routes_list(tgen, addr_type, dut, input_dict):
-    """
-    This API is use verify Stale routes on refering the network with next hop value
-    Parameters
-    ----------
-    * `tgen`: topogen object
-    * `dut`: input dut router name
-    * `addr_type` : ip type ipv4/ipv6
-    * `input_dict` : input dict, has details of static routes
-    Usage
-    -----
-    dut = 'r1'
-    input_dict = {
-                "r3": {
-                    "static_routes": [
-
-                        {
-                            "network": [NETWORK1_1[addr_type]],
-                            "no_of_ip": 2,
-                            "vrf": "RED"
-                        }
-                    ]
-                }
-            }
-
-    result = verify_stale_routes_list(tgen, addr_type, dut, input_dict)
-    Returns
-    -------
-    errormsg(str) or True
-    """
-    logger.debug("Entering lib API: verify_stale_routes_list()")
-    router_list = tgen.routers()
-    additional_nexthops_in_required_nhs = []
-    list1 = []
-    list2 = []
-    found_hops = []
-    for routerInput in input_dict.keys():
-        for router, rnode in router_list.items():
-            if router != dut:
-                continue
-            # Verifying RIB routes
-            command = "show bgp"
-            # Static routes
-            sleep(2)
-            logger.info("Checking router {} BGP RIB:".format(dut))
-            if "static_routes" in input_dict[routerInput]:
-                static_routes = input_dict[routerInput]["static_routes"]
-                for static_route in static_routes:
-                    found_routes = []
-                    missing_routes = []
-                    st_found = False
-                    nh_found = False
-                    vrf = static_route.setdefault("vrf", None)
-                    community = static_route.setdefault("community", None)
-                    largeCommunity = static_route.setdefault("largeCommunity", None)
-                    if vrf:
-                        cmd = "{} vrf {} {}".format(command, vrf, addr_type)
-                        if community:
-                            cmd = "{} community {}".format(cmd, community)
-                        if largeCommunity:
-                            cmd = "{} large-community {}".format(cmd, largeCommunity)
-                    else:
-                        cmd = "{} {}".format(command, addr_type)
-                    cmd = "{} json".format(cmd)
-                    rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
-                    # Verifying output dictionary rib_routes_json is not empty
-                    if bool(rib_routes_json) == False:
-                        errormsg = "[DUT: {}]: No route found in rib of router".format(
-                            router
-                        )
-                        return errormsg
-                    elif "warning" in rib_routes_json:
-                        errormsg = "[DUT: {}]: {}".format(
-                            router, rib_routes_json["warning"]
-                        )
-                        return errormsg
-                    network = static_route["network"]
-                    if "no_of_ip" in static_route:
-                        no_of_ip = static_route["no_of_ip"]
-                    else:
-                        no_of_ip = 1
-                    # Generating IPs for verification
-                    ip_list = generate_ips(network, no_of_ip)
-
-                    for st_rt in ip_list:
-                        st_rt = str(ipaddress.ip_network(st_rt))
-                        _addr_type = validate_ip_address(st_rt)
-                        if _addr_type != addr_type:
-                            continue
-                        if st_rt in rib_routes_json["routes"]:
-                            st_found = True
-
-                            found_routes.append(st_rt)
-                            for mnh in range(0, len(rib_routes_json["routes"][st_rt])):
-                                found_hops.append(
-                                    [
-                                        rib_r["ip"]
-                                        for rib_r in rib_routes_json["routes"][st_rt][
-                                            mnh
-                                        ]["nexthops"]
-                                    ]
-                                )
-                            return found_hops
-                        else:
-                            return "error  msg - no hops found"
-
-
-def setup_module(mod):
-    """
-    Sets up the pytest environment
-
-    * `mod`: module name
-    """
-
-    # Required linux kernel version for this suite to run.
-    result = required_linux_kernel_version("4.16")
-    if result is not True:
-        pytest.skip("Kernel requirements are not met, kernel version should be >=4.16")
-
-    global ADDR_TYPES
-
-    testsuite_run_time = time.asctime(time.localtime(time.time()))
-    logger.info("Testsuite start time: {}".format(testsuite_run_time))
-    logger.info("=" * 40)
-
-    logger.info("Running setup_module to create topology")
-
-    # This function initiates the topology build with Topogen...
-    json_file = "{}/bgp_gr_functionality_topo3.json".format(CWD)
-    tgen = Topogen(json_file, mod.__name__)
-    global topo
-    topo = tgen.json_topo
-    # ... and here it calls Mininet initialization functions.
-
-    # Starting topology, create tmp files which are loaded to routers
-    #  to start daemons and then start routers
-    start_topology(tgen)
-
-    # Creating configuration from JSON
-    build_config_from_json(tgen, topo)
-
-    # Api call verify whether BGP is converged
-    ADDR_TYPES = check_address_types()
-
-    for addr_type in ADDR_TYPES:
-        BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
-        assert BGP_CONVERGENCE is True, "setup_module : Failed \n Error:" " {}".format(
-            BGP_CONVERGENCE
-        )
-
-    logger.info("Running setup_module() done")
-
-
-def teardown_module(mod):
-    """
-    Teardown the pytest environment
-
-    * `mod`: module name
-    """
-
-    logger.info("Running teardown_module to delete topology")
-
-    tgen = get_topogen()
-
-    # Stop toplogy and Remove tmp files
-    tgen.stop_topology()
-
-    logger.info(
-        "Testsuite end time: {}".format(time.asctime(time.localtime(time.time())))
-    )
-    logger.info("=" * 40)
-
-
-################################################################################
-#
-#                       TEST CASES
-#
-################################################################################
-def test_bgp_gr_stale_routes(request):
-    tgen = get_topogen()
-    tc_name = request.node.name
-    write_test_header(tc_name)
-
-    step("Verify the router failures")
-    if tgen.routers_have_failure():
-        check_router_status(tgen)
-
-    step("Creating 5 static Routes in Router R3 with NULL0 as Next hop")
-    for addr_type in ADDR_TYPES:
-        input_dict_1 = {
-            "r3": {
-                "static_routes": [
-                    {
-                        "network": [NETWORK1_1[addr_type]] + [NETWORK1_2[addr_type]],
-                        "next_hop": NEXT_HOP_IP[addr_type],
-                    },
-                    {
-                        "network": [NETWORK2_1[addr_type]] + [NETWORK2_2[addr_type]],
-                        "next_hop": NEXT_HOP_IP[addr_type],
-                    },
-                    {
-                        "network": [NETWORK3_1[addr_type]] + [NETWORK3_2[addr_type]],
-                        "next_hop": NEXT_HOP_IP[addr_type],
-                    },
-                    {
-                        "network": [NETWORK4_1[addr_type]] + [NETWORK4_2[addr_type]],
-                        "next_hop": NEXT_HOP_IP[addr_type],
-                    },
-                    {
-                        "network": [NETWORK5_1[addr_type]] + [NETWORK5_2[addr_type]],
-                        "next_hop": NEXT_HOP_IP[addr_type],
-                    },
-                ]
-            }
-        }
-        result = create_static_routes(tgen, input_dict_1)
-        assert result is True, "Testcase {} : Failed \n Error: {}".format(
-            tc_name, result
-        )
-    step("verifying Created  Route  at R3 in VRF default")
-    for addr_type in ADDR_TYPES:
-        dut = "r3"
-        input_dict_1 = {"r3": topo["routers"]["r3"]}
-        result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1)
-        assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
-    # done
-    step("verifying Created  Route  at R2 in VRF default")
-    for addr_type in ADDR_TYPES:
-        dut = "r2"
-        input_dict_1 = {"r2": topo["routers"]["r2"]}
-        result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1)
-        assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
-    step("importing vrf RED on R2 under Address Family")
-    for addr_type in ADDR_TYPES:
-        input_import_vrf = {
-            "r2": {
-                "bgp": [
-                    {
-                        "local_as": 200,
-                        "vrf": "RED",
-                        "address_family": {
-                            addr_type: {"unicast": {"import": {"vrf": "default"}}}
-                        },
-                    }
-                ]
-            }
-        }
-        result = create_router_bgp(tgen, topo, input_import_vrf)
-        assert result is True, "Testcase {} : Failed \n Error: {}".format(
-            tc_name, result
-        )
-    # done
-    step("verifying static  Routes  at R2 in VRF RED")
-    for addr_type in ADDR_TYPES:
-        dut = "r2"
-        input_dict_1 = {"r2": topo["routers"]["r2"]}
-        result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1)
-        assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
-
-    step("verifying static  Routes  at R1 in VRF RED")
-    for addr_type in ADDR_TYPES:
-        dut = "r1"
-        input_dict_1 = {"r1": topo["routers"]["r1"]}
-        result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1)
-        assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
-
-    step("Configuring Graceful restart at R2 and R3 ")
-    input_dict = {
-        "r2": {
-            "bgp": {
-                "local_as": "200",
-                "graceful-restart": {
-                    "graceful-restart": True,
-                },
-            }
-        },
-        "r3": {
-            "bgp": {"local_as": "300", "graceful-restart": {"graceful-restart": True}}
-        },
-    }
-
-    configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r2", peer="r3")
-
-    step("verify Graceful restart at R2")
-    for addr_type in ADDR_TYPES:
-        result = verify_graceful_restart(
-            tgen, topo, addr_type, input_dict, dut="r2", peer="r3"
-        )
-        assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
-
-    step("verify Graceful restart at R3")
-    for addr_type in ADDR_TYPES:
-        result = verify_graceful_restart(
-            tgen, topo, addr_type, input_dict, dut="r3", peer="r2"
-        )
-        assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
-
-    step("Configuring Graceful-restart-disable at R3")
-    input_dict = {
-        "r2": {
-            "bgp": {
-                "local_as": "200",
-                "graceful-restart": {
-                    "graceful-restart": False,
-                },
-            }
-        },
-        "r3": {
-            "bgp": {"local_as": "300", "graceful-restart": {"graceful-restart": False}}
-        },
-    }
-    configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r3", peer="r2")
-
-    step("Verify Graceful-restart-disable at R3")
-    for addr_type in ADDR_TYPES:
-        result = verify_graceful_restart(
-            tgen, topo, addr_type, input_dict, dut="r3", peer="r2"
-        )
-        assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
-
-    for iteration in range(5):
-        step("graceful-restart-disable:True  at R3")
-        input_dict = {
-            "r3": {
-                "bgp": {
-                    "graceful-restart": {
-                        "graceful-restart-disable": True,
-                    }
-                }
-            }
-        }
-        configure_gr_followed_by_clear(
-            tgen, topo, input_dict, tc_name, dut="r3", peer="r2"
-        )
-
-        step("Verifying  Routes at R2 on enabling GRD")
-        dut = "r2"
-        for addr_type in ADDR_TYPES:
-            input_dict_1 = {"r2": topo["routers"]["r2"]}
-            result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1)
-            assert result is True, "Testcase {} :Failed \n Error {}".format(
-                tc_name, result
-            )
-
-        step("Verify stale Routes in Router R2 enabling GRD")
-        for addr_type in ADDR_TYPES:
-            dut = "r2"
-            protocol = "bgp"
-            verify_nh_for_static_rtes = {
-                "r3": {
-                    "static_routes": [
-                        {
-                            "network": [NETWORK1_1[addr_type]],
-                            "no_of_ip": 2,
-                            "vrf": "RED",
-                        }
-                    ]
-                }
-            }
-            bgp_rib_next_hops = verify_stale_routes_list(
-                tgen, addr_type, dut, verify_nh_for_static_rtes
-            )
-            assert (
-                len(bgp_rib_next_hops) == 1
-            ) is True, "Testcase {} : Failed \n Error: {}".format(
-                tc_name, bgp_rib_next_hops, expected=True
-            )
-
-        step("graceful-restart-disable:False at R3")
-        input_dict = {
-            "r3": {
-                "bgp": {
-                    "graceful-restart": {
-                        "graceful-restart-disable": False,
-                    }
-                }
-            }
-        }
-        configure_gr_followed_by_clear(
-            tgen, topo, input_dict, tc_name, dut="r3", peer="r2"
-        )
-
-        step("Verifying  Routes at R2 on disabling GRD")
-        dut = "r2"
-        for addr_type in ADDR_TYPES:
-            input_dict_1 = {"r2": topo["routers"]["r2"]}
-            result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1)
-            assert result is True, "Testcase {} :Failed \n Error {}".format(
-                tc_name, result
-            )
-
-        step("Verify stale Routes in Router R2 on disabling GRD")
-        for addr_type in ADDR_TYPES:
-            dut = "r2"
-            protocol = "bgp"
-            verify_nh_for_static_rtes = {
-                "r3": {
-                    "static_routes": [
-                        {
-                            "network": [NETWORK1_1[addr_type]],
-                            "no_of_ip": 2,
-                            "vrf": "RED",
-                        }
-                    ]
-                }
-            }
-            bgp_rib_next_hops = verify_stale_routes_list(
-                tgen, addr_type, dut, verify_nh_for_static_rtes
-            )
-
-            stale_route_status = len(bgp_rib_next_hops) == 1
-            assert (
-                stale_route_status is True
-            ), "Testcase {} : Failed \n Error: {}".format(
-                tc_name, stale_route_status, expected=True
-            )
-    write_test_footer(tc_name)
diff --git a/tests/topotests/bgp_gr_functionality_topo3/test_bgp_gr_functionality_topo3.py b/tests/topotests/bgp_gr_functionality_topo3/test_bgp_gr_functionality_topo3.py
new file mode 100644 (file)
index 0000000..593a8d6
--- /dev/null
@@ -0,0 +1,541 @@
+#!/usr/bin/env python
+# SPDX-License-Identifier: ISC
+#
+# Copyright (c) 2019 by VMware, Inc. ("VMware")
+# Used Copyright (c) 2018 by Network Device Education Foundation, Inc. ("NetDEF")
+# in this file.
+#
+
+import os
+import sys
+import time
+import pytest
+from time import sleep
+
+import traceback
+import ipaddress
+
+# Save the Current Working Directory to find configuration files.
+CWD = os.path.dirname(os.path.realpath(__file__))
+sys.path.append(os.path.join("../"))
+sys.path.append(os.path.join("../lib/"))
+
+# pylint: disable=C0413
+# Import topogen and topotest helpers
+from lib.topogen import Topogen, get_topogen
+from lib.topolog import logger
+
+# Required to instantiate the topology builder class.
+
+# Import topoJson from lib, to create topology and initial configuration
+from lib.topojson import build_config_from_json
+from lib.bgp import (
+    clear_bgp,
+    verify_bgp_rib,
+    verify_graceful_restart,
+    create_router_bgp,
+    verify_r_bit,
+    verify_eor,
+    verify_f_bit,
+    verify_bgp_convergence,
+    verify_gr_address_family,
+    modify_bgp_config_when_bgpd_down,
+    verify_graceful_restart_timers,
+    verify_bgp_convergence_from_running_config,
+)
+
+# Import common_config to use commomnly used APIs
+from lib.common_config import (
+    create_common_configuration,
+    InvalidCLIError,
+    retry,
+    generate_ips,
+    FRRCFG_FILE,
+    find_interface_with_greater_ip,
+    check_address_types,
+    validate_ip_address,
+    run_frr_cmd,
+    get_frr_ipv6_linklocal,
+)
+
+from lib.common_config import (
+    write_test_header,
+    reset_config_on_routers,
+    start_topology,
+    kill_router_daemons,
+    start_router_daemons,
+    verify_rib,
+    check_address_types,
+    write_test_footer,
+    check_router_status,
+    step,
+    get_frr_ipv6_linklocal,
+    create_static_routes,
+    required_linux_kernel_version,
+)
+
+pytestmark = [pytest.mark.bgpd]
+
+
+# Global variables
+BGP_CONVERGENCE = False
+GR_RESTART_TIMER = 5
+GR_SELECT_DEFER_TIMER = 5
+GR_STALEPATH_TIMER = 5
+# Global variables
+# STATIC_ROUTES=[]
+NETWORK1_1 = {"ipv4": "192.0.2.1/32", "ipv6": "2001:DB8::1:1/128"}
+NETWORK1_2 = {"ipv4": "192.0.2.2/32", "ipv6": "2001:DB8::2:1/128"}
+NETWORK2_1 = {"ipv4": "192.0.2.3/32", "ipv6": "2001:DB8::3:1/128"}
+NETWORK2_2 = {"ipv4": "192.0.2.4/32", "ipv6": "2001:DB8::4:1/128"}
+NETWORK3_1 = {"ipv4": "192.0.2.5/32", "ipv6": "2001:DB8::5:1/128"}
+NETWORK3_2 = {"ipv4": "192.0.2.6/32", "ipv6": "2001:DB8::6:1/128"}
+NETWORK4_1 = {"ipv4": "192.0.2.7/32", "ipv6": "2001:DB8::7:1/128"}
+NETWORK4_2 = {"ipv4": "192.0.2.8/32", "ipv6": "2001:DB8::8:1/128"}
+NETWORK5_1 = {"ipv4": "192.0.2.9/32", "ipv6": "2001:DB8::9:1/128"}
+NETWORK5_2 = {"ipv4": "192.0.2.10/32", "ipv6": "2001:DB8::10:1/128"}
+
+NEXT_HOP_IP = {"ipv4": "Null0", "ipv6": "Null0"}
+
+PREFERRED_NEXT_HOP = "link_local"
+
+
+def configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut, peer):
+    """
+    result = configure_gr_followed_by_clear(tgen, topo, dut)
+    assert result is True, \
+        "Testcase {} :Failed \n Error {}". \
+            format(tc_name, result)
+    """
+
+    result = create_router_bgp(tgen, topo, input_dict)
+    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+    for addr_type in ADDR_TYPES:
+        clear_bgp(tgen, addr_type, dut)
+
+    for addr_type in ADDR_TYPES:
+        clear_bgp(tgen, addr_type, peer)
+
+    result = verify_bgp_convergence_from_running_config(tgen)
+    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
+
+    return True
+
+
+def verify_stale_routes_list(tgen, addr_type, dut, input_dict):
+    """
+    This API is use verify Stale routes on refering the network with next hop value
+    Parameters
+    ----------
+    * `tgen`: topogen object
+    * `dut`: input dut router name
+    * `addr_type` : ip type ipv4/ipv6
+    * `input_dict` : input dict, has details of static routes
+    Usage
+    -----
+    dut = 'r1'
+    input_dict = {
+                "r3": {
+                    "static_routes": [
+
+                        {
+                            "network": [NETWORK1_1[addr_type]],
+                            "no_of_ip": 2,
+                            "vrf": "RED"
+                        }
+                    ]
+                }
+            }
+
+    result = verify_stale_routes_list(tgen, addr_type, dut, input_dict)
+    Returns
+    -------
+    errormsg(str) or True
+    """
+    logger.debug("Entering lib API: verify_stale_routes_list()")
+    router_list = tgen.routers()
+    additional_nexthops_in_required_nhs = []
+    list1 = []
+    list2 = []
+    found_hops = []
+    for routerInput in input_dict.keys():
+        for router, rnode in router_list.items():
+            if router != dut:
+                continue
+            # Verifying RIB routes
+            command = "show bgp"
+            # Static routes
+            sleep(2)
+            logger.info("Checking router {} BGP RIB:".format(dut))
+            if "static_routes" in input_dict[routerInput]:
+                static_routes = input_dict[routerInput]["static_routes"]
+                for static_route in static_routes:
+                    found_routes = []
+                    missing_routes = []
+                    st_found = False
+                    nh_found = False
+                    vrf = static_route.setdefault("vrf", None)
+                    community = static_route.setdefault("community", None)
+                    largeCommunity = static_route.setdefault("largeCommunity", None)
+                    if vrf:
+                        cmd = "{} vrf {} {}".format(command, vrf, addr_type)
+                        if community:
+                            cmd = "{} community {}".format(cmd, community)
+                        if largeCommunity:
+                            cmd = "{} large-community {}".format(cmd, largeCommunity)
+                    else:
+                        cmd = "{} {}".format(command, addr_type)
+                    cmd = "{} json".format(cmd)
+                    rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
+                    # Verifying output dictionary rib_routes_json is not empty
+                    if bool(rib_routes_json) == False:
+                        errormsg = "[DUT: {}]: No route found in rib of router".format(
+                            router
+                        )
+                        return errormsg
+                    elif "warning" in rib_routes_json:
+                        errormsg = "[DUT: {}]: {}".format(
+                            router, rib_routes_json["warning"]
+                        )
+                        return errormsg
+                    network = static_route["network"]
+                    if "no_of_ip" in static_route:
+                        no_of_ip = static_route["no_of_ip"]
+                    else:
+                        no_of_ip = 1
+                    # Generating IPs for verification
+                    ip_list = generate_ips(network, no_of_ip)
+
+                    for st_rt in ip_list:
+                        st_rt = str(ipaddress.ip_network(st_rt))
+                        _addr_type = validate_ip_address(st_rt)
+                        if _addr_type != addr_type:
+                            continue
+                        if st_rt in rib_routes_json["routes"]:
+                            st_found = True
+
+                            found_routes.append(st_rt)
+                            for mnh in range(0, len(rib_routes_json["routes"][st_rt])):
+                                found_hops.append(
+                                    [
+                                        rib_r["ip"]
+                                        for rib_r in rib_routes_json["routes"][st_rt][
+                                            mnh
+                                        ]["nexthops"]
+                                    ]
+                                )
+                            return found_hops
+                        else:
+                            return "error  msg - no hops found"
+
+
+def setup_module(mod):
+    """
+    Sets up the pytest environment
+
+    * `mod`: module name
+    """
+
+    # Required linux kernel version for this suite to run.
+    result = required_linux_kernel_version("4.16")
+    if result is not True:
+        pytest.skip("Kernel requirements are not met, kernel version should be >=4.16")
+
+    global ADDR_TYPES
+
+    testsuite_run_time = time.asctime(time.localtime(time.time()))
+    logger.info("Testsuite start time: {}".format(testsuite_run_time))
+    logger.info("=" * 40)
+
+    logger.info("Running setup_module to create topology")
+
+    # This function initiates the topology build with Topogen...
+    json_file = "{}/bgp_gr_functionality_topo3.json".format(CWD)
+    tgen = Topogen(json_file, mod.__name__)
+    global topo
+    topo = tgen.json_topo
+    # ... and here it calls Mininet initialization functions.
+
+    # Starting topology, create tmp files which are loaded to routers
+    #  to start daemons and then start routers
+    start_topology(tgen)
+
+    # Creating configuration from JSON
+    build_config_from_json(tgen, topo)
+
+    # Api call verify whether BGP is converged
+    ADDR_TYPES = check_address_types()
+
+    for addr_type in ADDR_TYPES:
+        BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
+        assert BGP_CONVERGENCE is True, "setup_module : Failed \n Error:" " {}".format(
+            BGP_CONVERGENCE
+        )
+
+    logger.info("Running setup_module() done")
+
+
+def teardown_module(mod):
+    """
+    Teardown the pytest environment
+
+    * `mod`: module name
+    """
+
+    logger.info("Running teardown_module to delete topology")
+
+    tgen = get_topogen()
+
+    # Stop toplogy and Remove tmp files
+    tgen.stop_topology()
+
+    logger.info(
+        "Testsuite end time: {}".format(time.asctime(time.localtime(time.time())))
+    )
+    logger.info("=" * 40)
+
+
+################################################################################
+#
+#                       TEST CASES
+#
+################################################################################
+def test_bgp_gr_stale_routes(request):
+    tgen = get_topogen()
+    tc_name = request.node.name
+    write_test_header(tc_name)
+
+    step("Verify the router failures")
+    if tgen.routers_have_failure():
+        check_router_status(tgen)
+
+    step("Creating 5 static Routes in Router R3 with NULL0 as Next hop")
+    for addr_type in ADDR_TYPES:
+        input_dict_1 = {
+            "r3": {
+                "static_routes": [
+                    {
+                        "network": [NETWORK1_1[addr_type]] + [NETWORK1_2[addr_type]],
+                        "next_hop": NEXT_HOP_IP[addr_type],
+                    },
+                    {
+                        "network": [NETWORK2_1[addr_type]] + [NETWORK2_2[addr_type]],
+                        "next_hop": NEXT_HOP_IP[addr_type],
+                    },
+                    {
+                        "network": [NETWORK3_1[addr_type]] + [NETWORK3_2[addr_type]],
+                        "next_hop": NEXT_HOP_IP[addr_type],
+                    },
+                    {
+                        "network": [NETWORK4_1[addr_type]] + [NETWORK4_2[addr_type]],
+                        "next_hop": NEXT_HOP_IP[addr_type],
+                    },
+                    {
+                        "network": [NETWORK5_1[addr_type]] + [NETWORK5_2[addr_type]],
+                        "next_hop": NEXT_HOP_IP[addr_type],
+                    },
+                ]
+            }
+        }
+        result = create_static_routes(tgen, input_dict_1)
+        assert result is True, "Testcase {} : Failed \n Error: {}".format(
+            tc_name, result
+        )
+    step("verifying Created  Route  at R3 in VRF default")
+    for addr_type in ADDR_TYPES:
+        dut = "r3"
+        input_dict_1 = {"r3": topo["routers"]["r3"]}
+        result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1)
+        assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
+    # done
+    step("verifying Created  Route  at R2 in VRF default")
+    for addr_type in ADDR_TYPES:
+        dut = "r2"
+        input_dict_1 = {"r2": topo["routers"]["r2"]}
+        result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1)
+        assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
+    step("importing vrf RED on R2 under Address Family")
+    for addr_type in ADDR_TYPES:
+        input_import_vrf = {
+            "r2": {
+                "bgp": [
+                    {
+                        "local_as": 200,
+                        "vrf": "RED",
+                        "address_family": {
+                            addr_type: {"unicast": {"import": {"vrf": "default"}}}
+                        },
+                    }
+                ]
+            }
+        }
+        result = create_router_bgp(tgen, topo, input_import_vrf)
+        assert result is True, "Testcase {} : Failed \n Error: {}".format(
+            tc_name, result
+        )
+    # done
+    step("verifying static  Routes  at R2 in VRF RED")
+    for addr_type in ADDR_TYPES:
+        dut = "r2"
+        input_dict_1 = {"r2": topo["routers"]["r2"]}
+        result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1)
+        assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
+
+    step("verifying static  Routes  at R1 in VRF RED")
+    for addr_type in ADDR_TYPES:
+        dut = "r1"
+        input_dict_1 = {"r1": topo["routers"]["r1"]}
+        result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1)
+        assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
+
+    step("Configuring Graceful restart at R2 and R3 ")
+    input_dict = {
+        "r2": {
+            "bgp": {
+                "local_as": "200",
+                "graceful-restart": {
+                    "graceful-restart": True,
+                },
+            }
+        },
+        "r3": {
+            "bgp": {"local_as": "300", "graceful-restart": {"graceful-restart": True}}
+        },
+    }
+
+    configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r2", peer="r3")
+
+    step("verify Graceful restart at R2")
+    for addr_type in ADDR_TYPES:
+        result = verify_graceful_restart(
+            tgen, topo, addr_type, input_dict, dut="r2", peer="r3"
+        )
+        assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
+
+    step("verify Graceful restart at R3")
+    for addr_type in ADDR_TYPES:
+        result = verify_graceful_restart(
+            tgen, topo, addr_type, input_dict, dut="r3", peer="r2"
+        )
+        assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
+
+    step("Configuring Graceful-restart-disable at R3")
+    input_dict = {
+        "r2": {
+            "bgp": {
+                "local_as": "200",
+                "graceful-restart": {
+                    "graceful-restart": False,
+                },
+            }
+        },
+        "r3": {
+            "bgp": {"local_as": "300", "graceful-restart": {"graceful-restart": False}}
+        },
+    }
+    configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut="r3", peer="r2")
+
+    step("Verify Graceful-restart-disable at R3")
+    for addr_type in ADDR_TYPES:
+        result = verify_graceful_restart(
+            tgen, topo, addr_type, input_dict, dut="r3", peer="r2"
+        )
+        assert result is True, "Testcase {} :Failed \n Error {}".format(tc_name, result)
+
+    for iteration in range(5):
+        step("graceful-restart-disable:True  at R3")
+        input_dict = {
+            "r3": {
+                "bgp": {
+                    "graceful-restart": {
+                        "graceful-restart-disable": True,
+                    }
+                }
+            }
+        }
+        configure_gr_followed_by_clear(
+            tgen, topo, input_dict, tc_name, dut="r3", peer="r2"
+        )
+
+        step("Verifying  Routes at R2 on enabling GRD")
+        dut = "r2"
+        for addr_type in ADDR_TYPES:
+            input_dict_1 = {"r2": topo["routers"]["r2"]}
+            result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1)
+            assert result is True, "Testcase {} :Failed \n Error {}".format(
+                tc_name, result
+            )
+
+        step("Verify stale Routes in Router R2 enabling GRD")
+        for addr_type in ADDR_TYPES:
+            dut = "r2"
+            protocol = "bgp"
+            verify_nh_for_static_rtes = {
+                "r3": {
+                    "static_routes": [
+                        {
+                            "network": [NETWORK1_1[addr_type]],
+                            "no_of_ip": 2,
+                            "vrf": "RED",
+                        }
+                    ]
+                }
+            }
+            bgp_rib_next_hops = verify_stale_routes_list(
+                tgen, addr_type, dut, verify_nh_for_static_rtes
+            )
+            assert (
+                len(bgp_rib_next_hops) == 1
+            ) is True, "Testcase {} : Failed \n Error: {}".format(
+                tc_name, bgp_rib_next_hops, expected=True
+            )
+
+        step("graceful-restart-disable:False at R3")
+        input_dict = {
+            "r3": {
+                "bgp": {
+                    "graceful-restart": {
+                        "graceful-restart-disable": False,
+                    }
+                }
+            }
+        }
+        configure_gr_followed_by_clear(
+            tgen, topo, input_dict, tc_name, dut="r3", peer="r2"
+        )
+
+        step("Verifying  Routes at R2 on disabling GRD")
+        dut = "r2"
+        for addr_type in ADDR_TYPES:
+            input_dict_1 = {"r2": topo["routers"]["r2"]}
+            result = verify_bgp_rib(tgen, addr_type, dut, input_dict_1)
+            assert result is True, "Testcase {} :Failed \n Error {}".format(
+                tc_name, result
+            )
+
+        step("Verify stale Routes in Router R2 on disabling GRD")
+        for addr_type in ADDR_TYPES:
+            dut = "r2"
+            protocol = "bgp"
+            verify_nh_for_static_rtes = {
+                "r3": {
+                    "static_routes": [
+                        {
+                            "network": [NETWORK1_1[addr_type]],
+                            "no_of_ip": 2,
+                            "vrf": "RED",
+                        }
+                    ]
+                }
+            }
+            bgp_rib_next_hops = verify_stale_routes_list(
+                tgen, addr_type, dut, verify_nh_for_static_rtes
+            )
+
+            stale_route_status = len(bgp_rib_next_hops) == 1
+            assert (
+                stale_route_status is True
+            ), "Testcase {} : Failed \n Error: {}".format(
+                tc_name, stale_route_status, expected=True
+            )
+    write_test_footer(tc_name)