2 # Copyright (c) 2020 by VMware, Inc. ("VMware")
3 # Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
4 # ("NetDEF") in this file.
6 # Permission to use, copy, modify, and/or distribute this software
7 # for any purpose with or without fee is hereby granted, provided
8 # that the above copyright notice and this permission notice appear
11 # THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
12 # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
14 # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
15 # DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
16 # WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
17 # ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
21 from copy
import deepcopy
23 from time
import sleep
24 from lib
.topolog
import logger
26 from lib
.topotest
import frr_unicode
# Import common_config to use commonly used APIs
29 from lib
.common_config
import (
30 create_common_configuration
,
39 LOGDIR
= "/tmp/topotests/"
42 ################################
44 ################################
def create_router_ospf(tgen, topo, input_dict=None, build=False, load_config=True):
    """
    API to configure ospf on router.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data; used as the configuration source when
               `input_dict` is empty
    * `input_dict` : Input dict data, required when configuring from testcase
    * `build` : Only for initial setup phase this is set as True.
    * `load_config` : Loading the config to router this is set as True.

    Usage
    -----
    input_dict = {
        "r1": {
            "ospf": {
                "router_id": "22.22.22.22",
                "area": [{"id": "0.0.0.0", "type": "nssa"}]
            }
        }
    }

    result = create_router_ospf(tgen, topo, input_dict)

    Returns
    -------
    Whatever __create_ospf_global() returned for the last router that
    carries an "ospf" section, or False when no router has one.
    """
    logger.debug("Entering lib API: create_router_ospf()")
    result = False

    # Always work on a private copy so the caller's data is never modified.
    input_dict = deepcopy(input_dict if input_dict else topo)

    for router in input_dict:
        if "ospf" not in input_dict[router]:
            logger.debug("Router %s: 'ospf' not present in input_dict", router)
            continue

        result = __create_ospf_global(tgen, input_dict, router, build, load_config)

    logger.debug("Exiting lib API: create_router_ospf()")
    return result
def __create_ospf_global(tgen, input_dict, router, build=False, load_config=True):
    """
    Helper API to create ospf global configuration.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router to be configured.
    * `build` : Only for initial setup phase this is set as True.
    * `load_config` : Loading the config to router this is set as True.

    Recognised keys of input_dict[router]["ospf"]
    ---------------------------------------------
    * `delete` : remove the whole OSPF instance ("no router ospf")
    * `router_id` / `del_router_id` : set / remove the router-id
    * `redistribute` : list of {"redist_type", "route_map", "delete"}
    * `area` : list of {"id", "type", "delete"}
    * `summary-address` : list of {"prefix", "mask", "tag", "advertise",
                          "delete"}

    Returns
    -------
    Result of create_common_configuration(), or the traceback text when
    an InvalidCLIError is raised.
    """
    result = False
    logger.debug("Entering lib API: __create_ospf_global()")
    try:
        ospf_data = input_dict[router]["ospf"]

        # Deleting the instance makes every other setting irrelevant.
        if ospf_data.get("delete", False):
            config_data = ["no router ospf"]
            result = create_common_configuration(
                tgen, router, config_data, "ospf", build, load_config
            )
            return result

        config_data = ["router ospf"]

        # router id
        if ospf_data.get("del_router_id", False):
            config_data.append("no ospf router-id")
        router_id = ospf_data.get("router_id")
        if router_id:
            config_data.append("ospf router-id {}".format(router_id))

        # redistribute command
        for redistribute in ospf_data.get("redistribute") or []:
            if "redist_type" not in redistribute:
                logger.debug(
                    "Router %s: 'redist_type' not present in " "input_dict", router
                )
                continue

            cmd = "redistribute {}".format(redistribute["redist_type"])
            # Only the route-map of *this* entry belongs to this command;
            # walking all entries here would glue foreign route-maps onto it.
            if "route_map" in redistribute:
                cmd = "{} route-map {}".format(cmd, redistribute["route_map"])
            if redistribute.get("delete", False):
                cmd = "no {}".format(cmd)
            config_data.append(cmd)

        # area information
        for area in ospf_data.get("area") or []:
            if "id" not in area:
                logger.debug(
                    "Router %s: 'area id' not present in " "input_dict", router
                )
                continue

            cmd = "area {}".format(area["id"])
            if "type" in area:
                cmd = "{} {}".format(cmd, area["type"])
            if area.get("delete", False):
                cmd = "no {}".format(cmd)
            config_data.append(cmd)

        # summary information
        for summary in ospf_data.get("summary-address") or []:
            if "prefix" not in summary:
                logger.debug(
                    "Router %s: 'summary-address' not present in " "input_dict",
                    router,
                )
                continue

            cmd = "summary {}/{}".format(summary["prefix"], summary["mask"])

            _tag = summary.get("tag")
            if _tag:
                cmd = "{} tag {}".format(cmd, _tag)

            # Summaries are advertised unless explicitly switched off.
            if not summary.get("advertise", True):
                cmd = "{} no-advertise".format(cmd)

            if summary.get("delete", False):
                cmd = "no {}".format(cmd)
            config_data.append(cmd)

        result = create_common_configuration(
            tgen, router, config_data, "ospf", build, load_config
        )

    except InvalidCLIError:
        # Traceback
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    logger.debug("Exiting lib API: __create_ospf_global()")
    return result
def create_router_ospf6(tgen, topo, input_dict=None, build=False, load_config=True):
    """
    API to configure ospf6 on router

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data; used as the configuration source when
               `input_dict` is empty
    * `input_dict` : Input dict data, required when configuring from testcase
    * `build` : Only for initial setup phase this is set as True.
    * `load_config` : Loading the config to router this is set as True.

    Usage
    -----
    input_dict = {
        "r1": {
            "ospf6": {
                "router_id": "22.22.22.22",
            }
        }
    }

    result = create_router_ospf6(tgen, topo, input_dict)

    Returns
    -------
    Whatever __create_ospf6_global() returned for the last router that
    carries an "ospf6" section, or False when no router has one.
    """
    logger.debug("Entering lib API: create_router_ospf6()")
    result = False

    # Always work on a private copy so the caller's data is never modified.
    input_dict = deepcopy(input_dict if input_dict else topo)

    for router in input_dict:
        # The ospf6 helper reads input_dict[router]["ospf6"], so that is the
        # section whose presence must be checked here.
        if "ospf6" not in input_dict[router]:
            logger.debug("Router %s: 'ospf6' not present in input_dict", router)
            continue

        result = __create_ospf6_global(tgen, input_dict, router, build, load_config)

    logger.debug("Exiting lib API: create_router_ospf6()")
    return result
def __create_ospf6_global(tgen, input_dict, router, build=False, load_config=True):
    """
    Helper API to create ospf6 global configuration.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router id to be configured.
    * `build` : Only for initial setup phase this is set as True.
    * `load_config` : Loading the config to router this is set as True.

    Recognised keys of input_dict[router]["ospf6"]
    ----------------------------------------------
    * `delete` : remove the whole OSPFv3 instance
    * `router_id` / `del_router_id` : set / remove the router-id

    Returns
    -------
    Result of create_common_configuration(), or the traceback text when
    an InvalidCLIError is raised.
    """
    result = False
    logger.debug("Entering lib API: __create_ospf6_global()")
    try:
        ospf_data = input_dict[router]["ospf6"]

        # NOTE(review): the config type handed to create_common_configuration()
        # is left as "ospf"; confirm a dedicated ospf6 type exists before
        # changing it.
        if ospf_data.get("delete", False):
            config_data = ["no router ospf6"]
            result = create_common_configuration(
                tgen, router, config_data, "ospf", build, load_config
            )
            return result

        # OSPFv3 lives under "router ospf6" and its router-id command is
        # prefixed with "ospf6", unlike the OSPFv2 equivalents.
        config_data = ["router ospf6"]

        if ospf_data.get("del_router_id", False):
            config_data.append("no ospf6 router-id")
        router_id = ospf_data.get("router_id")
        if router_id:
            config_data.append("ospf6 router-id {}".format(router_id))

        result = create_common_configuration(
            tgen, router, config_data, "ospf", build, load_config
        )

    except InvalidCLIError:
        # Traceback
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    logger.debug("Exiting lib API: __create_ospf6_global()")
    return result
def config_ospf_interface(tgen, topo, input_dict=None, build=False, load_config=True):
    """
    API to configure ospf on the interfaces of a router.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring from testcase
    * `build` : Only for initial setup phase this is set as True.
    * `load_config` : Loading the config to router this is set as True.
                      (not used by this function)

    Usage
    -----
    r1_ospf_auth = {
        "r1": {
            "links": {
                "r2": {
                    "ospf": {
                        "authentication": "message-digest",
                        "authentication-key": "ospf",
                        "message-digest-key": "10"
                    }
                }
            }
        }
    }
    result = config_ospf_interface(tgen, topo, r1_ospf_auth)

    Per-link "ospf" keys that are read: "area", "authentication",
    "authentication-key", "message-digest-key", "priority", "cost" and
    "del_action" (its mere presence turns every command into its "no" form).

    Returns
    -------
    With `build` set: the list of commands generated so far, returned as
    soon as the first link with an "ospf" section has been processed.
    Otherwise the result of the last create_common_configuration() call,
    or False when no link carries an "ospf" section.
    """
    logger.debug("Enter lib config_ospf_interface")
    result = False

    # Always work on a private copy; setdefault() below would otherwise
    # write into the caller's dict.
    input_dict = deepcopy(input_dict if input_dict else topo)

    for router in input_dict:
        config_data = []
        for lnk, link_data in input_dict[router]["links"].items():
            if "ospf" not in link_data:
                logger.debug(
                    "Router %s: ospf configs is not present in "
                    "input_dict, passed input_dict",
                    router,
                )
                continue

            ospf_data = link_data["ospf"]
            data_ospf_area = ospf_data.setdefault("area", None)
            data_ospf_auth = ospf_data.setdefault("authentication", None)
            data_ospf_dr_priority = ospf_data.setdefault("priority", None)
            data_ospf_cost = ospf_data.setdefault("cost", None)
            negate = "del_action" in ospf_data

            # The device may be described under "switches" rather than
            # "routers" in the topology.
            try:
                intf = topo["routers"][router]["links"][lnk]["interface"]
            except KeyError:
                intf = topo["switches"][router]["links"][lnk]["interface"]

            # interface
            config_data.append("interface {}".format(intf))

            # interface area config
            if data_ospf_area:
                config_data.append("ip ospf area {}".format(data_ospf_area))

            # interface ospf auth
            if data_ospf_auth:
                if data_ospf_auth == "null":
                    cmd = "ip ospf authentication null"
                elif data_ospf_auth == "message-digest":
                    cmd = "ip ospf authentication message-digest"
                else:
                    cmd = "ip ospf authentication"
                if negate:
                    cmd = "no {}".format(cmd)
                config_data.append(cmd)

                if "message-digest-key" in ospf_data:
                    cmd = "ip ospf message-digest-key {} md5 {}".format(
                        ospf_data["message-digest-key"], ospf_data["authentication-key"]
                    )
                    if negate:
                        cmd = "no {}".format(cmd)
                    config_data.append(cmd)
                elif "authentication-key" in ospf_data:
                    cmd = "ip ospf authentication-key {}".format(
                        ospf_data["authentication-key"]
                    )
                    if negate:
                        cmd = "no {}".format(cmd)
                    config_data.append(cmd)

            # interface ospf dr priority
            # setdefault() above guarantees the key exists, so presence must be
            # judged by the value; "is not None" keeps priority 0 configurable.
            if data_ospf_dr_priority is not None:
                cmd = "ip ospf priority {}".format(data_ospf_dr_priority)
                if negate:
                    cmd = "no {}".format(cmd)
                config_data.append(cmd)

            # interface ospf cost
            if data_ospf_cost is not None:
                cmd = "ip ospf cost {}".format(data_ospf_cost)
                if negate:
                    cmd = "no {}".format(cmd)
                config_data.append(cmd)

            # NOTE(review): in build mode the command list is handed back after
            # the first OSPF-enabled link instead of being applied; kept as is
            # because callers may depend on it.
            if build:
                return config_data

            result = create_common_configuration(
                tgen, router, config_data, "interface_config", build=build
            )

    logger.debug("Exiting lib API: config_ospf_interface()")
    return result
def clear_ospf(tgen, router):
    """
    Reset OSPF neighborship on one router by running the
    "clear ip ospf interface" command on it.

    Parameters
    ----------
    * `tgen`: topogen object
    * `router`: device under test

    Usage
    -----
    clear_ospf(tgen, "r1")

    Returns
    -------
    False when `router` is not one of tgen.routers(); None otherwise.
    """
    logger.debug("Entering lib API: clear_ospf()")

    if router in tgen.routers():
        target = tgen.routers()[router]

        # Clearing OSPF
        logger.info("Clearing ospf process for router %s..", router)
        run_frr_cmd(target, "clear ip ospf interface ")

        logger.debug("Exiting lib API: clear_ospf()")
        return None

    return False
464 ################################
466 ################################
@retry(attempts=40, wait=2, return_is_str=True)
def verify_ospf_neighbor(tgen, topo, dut=None, input_dict=None, lan=False):
    """
    This API is to verify ospf neighborship by running
    show ip ospf neighbor command,

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data
    * `dut`: device under test; when None every router with an "ospf"
             section in `topo` is checked
    * `input_dict` : Input dict data, required when configuring from testcase
    * `lan` : verify neighbors in lan topology. Neighbors are matched by
              router-id, so this flag does not alter the check; it is kept
              for backward compatibility.

    Usage
    -----
    1. To check FULL neighbors (expected peers are taken from
       topo["routers"][<router>]["ospf"]["neighbors"]).
        verify_ospf_neighbor(tgen, topo, dut=dut)

    2. To check neighbors with their roles.
        input_dict = {
            "r0": {
                "ospf": {
                    "neighbors": {
                        "r1": {"state": "Full", "role": "DR"}
                    }
                }
            }
        }
        result = verify_ospf_neighbor(tgen, topo, dut, input_dict, lan=True)

    Returns
    -------
    True or False (Error Message)
    """
    logger.debug("Entering lib API: verify_ospf_neighbor()")
    result = False

    for router, rnode in tgen.routers().items():
        if "ospf" not in topo["routers"][router]:
            continue

        if dut is not None and dut != router:
            continue

        logger.info("Verifying OSPF neighborship on router %s:", router)
        show_ospf_json = run_frr_cmd(
            rnode, "show ip ospf neighbor all json", isjson=True
        )

        # Verifying output dictionary show_ospf_json is empty or not
        if not show_ospf_json:
            errormsg = "OSPF is not running"
            return errormsg

        if input_dict:
            # Explicit expectations: state and/or role per neighbor.
            ospf_nbr_list = input_dict[router]["ospf"]["neighbors"]

            for ospf_nbr, nbr_data in ospf_nbr_list.items():
                nbr_rid = topo["routers"][ospf_nbr]["ospf"]["router_id"]

                try:
                    nbr_status = show_ospf_json[nbr_rid][0]["state"]
                except KeyError:
                    errormsg = "[DUT: {}] OSPF peer {} missing".format(router, nbr_rid)
                    return errormsg

                # The state is reported as "<state>/<role>", e.g. "Full/DR".
                status_parts = nbr_status.split("/")
                nh_state = status_parts[0]
                intf_state = status_parts[1] if len(status_parts) > 1 else None

                nbr_state = nbr_data.get("state")
                nbr_role = nbr_data.get("role")

                if nbr_state:
                    if nbr_state == nh_state:
                        logger.info(
                            "[DUT: {}] OSPF Nbr is {}:{} State {}".format(
                                router, ospf_nbr, nbr_rid, nh_state
                            )
                        )
                        result = True
                    else:
                        errormsg = (
                            "[DUT: {}] OSPF is not Converged, neighbor"
                            " state is {}".format(router, nh_state)
                        )
                        return errormsg

                if nbr_role:
                    if nbr_role == intf_state:
                        logger.info(
                            "[DUT: {}] OSPF Nbr is {}: {} Role {}".format(
                                router, ospf_nbr, nbr_rid, nbr_role
                            )
                        )
                    else:
                        errormsg = (
                            "[DUT: {}] OSPF is not Converged with rid"
                            "{}, role is {}".format(router, nbr_rid, intf_state)
                        )
                        return errormsg
        else:
            # No expectations given: every neighbor listed in the topology
            # must have reached Full.
            ospf_nbr_list = topo["routers"][router]["ospf"]["neighbors"]
            total_peer = len(ospf_nbr_list)
            no_of_peer = 0

            for ospf_nbr, nbr_data in ospf_nbr_list.items():
                # A non-empty entry names the peer router through its "nbr"
                # key; otherwise the dictionary key itself is the peer name.
                peer = nbr_data["nbr"] if nbr_data else ospf_nbr
                nbr_rid = topo["routers"][peer]["ospf"]["router_id"]

                try:
                    nh_state = show_ospf_json[nbr_rid][0]["state"].split("/")[0]
                except KeyError:
                    errormsg = "[DUT: {}] OSPF peer {} missing,from " "{} ".format(
                        router, nbr_rid, ospf_nbr
                    )
                    return errormsg

                if nh_state == "Full":
                    no_of_peer += 1

            if no_of_peer == total_peer:
                logger.info("[DUT: {}] OSPF is Converged".format(router))
                result = True
            else:
                errormsg = "[DUT: {}] OSPF is not Converged".format(router)
                return errormsg

    logger.debug("Exiting API: verify_ospf_neighbor()")
    return result
@retry(attempts=21, wait=2, return_is_str=True)
def verify_ospf_rib(
    tgen, dut, input_dict, next_hop=None, tag=None, metric=None, fib=None
):
    """
    This API is to verify ospf routes by running
    show ip ospf route command.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `dut`: device under test
    * `input_dict` : Input dict data, required when configuring from testcase.
                     Each entry must carry a "static_routes" or "prefix" list
                     whose items have a "network" and optionally "no_of_ip",
                     "tag" and "routeType".
    * `next_hop` : next hop (single value or list) to be verified
    * `tag` : tag to be verified
    * `metric` : metric to be verified
    * `fib` : True if the route is installed in FIB.

    Usage
    -----
    input_dict = {
        "r1": {
            "static_routes": [
                {"network": "10.0.20.1/32", "no_of_ip": 1}
            ]
        }
    }
    result = verify_ospf_rib(tgen, dut, input_dict, next_hop=nh)

    Returns
    -------
    True or False (Error Message)
    """
    logger.info("Entering lib API: verify_ospf_rib()")
    result = False
    router_list = tgen.routers()

    # Compare against a list even when a single next hop was passed.
    if next_hop and not isinstance(next_hop, list):
        next_hop = [next_hop]

    for routerInput in input_dict.keys():
        for router, rnode in router_list.items():
            if router != dut:
                continue

            logger.info("Checking router %s RIB:", router)

            # Verifying RIB routes
            command = "show ip ospf route"

            found_routes = []
            missing_routes = []

            if "prefix" in input_dict[routerInput]:
                static_routes = input_dict[routerInput]["prefix"]
            elif "static_routes" in input_dict[routerInput]:
                static_routes = input_dict[routerInput]["static_routes"]
            else:
                continue

            for static_route in static_routes:
                cmd = "{} json".format(command)

                ospf_rib_json = run_frr_cmd(rnode, cmd, isjson=True)

                # Verifying output dictionary ospf_rib_json is not empty
                if bool(ospf_rib_json) is False:
                    errormsg = (
                        "[DUT: {}] No routes found in OSPF route "
                        "table".format(router)
                    )
                    return errormsg

                network = static_route["network"]
                no_of_ip = static_route.setdefault("no_of_ip", 1)
                _tag = static_route.setdefault("tag", None)
                _rtype = static_route.setdefault("routeType", None)

                # Generating IPs for verification
                ip_list = generate_ips(network, no_of_ip)
                nh_found = False

                for st_rt in ip_list:
                    st_rt = str(ipaddr.IPNetwork(frr_unicode(st_rt)))

                    _addr_type = validate_ip_address(st_rt)
                    if _addr_type != "ipv4":
                        continue

                    if st_rt not in ospf_rib_json:
                        missing_routes.append(st_rt)
                        continue

                    found_routes.append(st_rt)
                    route_entry = ospf_rib_json[st_rt]

                    if fib and next_hop:
                        # Next hops are gathered per route, so one route is
                        # never judged against hops collected for another.
                        # NOTE(review): this branch indexes the route entry
                        # by position while the other checks use it as a
                        # dict; confirm the JSON shape.
                        fib_hops = []
                        for mnh in range(0, len(route_entry)):
                            if "fib" in route_entry[mnh]["nexthops"][0]:
                                fib_hops.append(
                                    [
                                        rib_r["ip"]
                                        for rib_r in route_entry[mnh]["nexthops"]
                                    ]
                                )

                        if fib_hops and fib_hops[0]:
                            additional_nexthops_in_required_nhs = set(
                                next_hop
                            ).difference(fib_hops[0])

                            if additional_nexthops_in_required_nhs:
                                logger.info(
                                    "Nexthop "
                                    "%s is not active for route %s in "
                                    "RIB of router %s\n",
                                    additional_nexthops_in_required_nhs,
                                    st_rt,
                                    dut,
                                )
                                errormsg = (
                                    "Nexthop {} is not active"
                                    " for route {} in RIB of router"
                                    " {}\n".format(
                                        additional_nexthops_in_required_nhs,
                                        st_rt,
                                        dut,
                                    )
                                )
                                return errormsg

                            nh_found = True

                    elif next_hop and fib is None:
                        found_hops = [rib_r["ip"] for rib_r in route_entry["nexthops"]]

                        if found_hops:
                            additional_nexthops_in_required_nhs = set(
                                next_hop
                            ).difference(found_hops)

                            if additional_nexthops_in_required_nhs:
                                logger.info(
                                    "Missing nexthop %s for route"
                                    " %s in RIB of router %s\n",
                                    additional_nexthops_in_required_nhs,
                                    st_rt,
                                    dut,
                                )
                                errormsg = (
                                    "Nexthop {} is Missing for "
                                    "route {} in RIB of router {}\n".format(
                                        additional_nexthops_in_required_nhs,
                                        st_rt,
                                        dut,
                                    )
                                )
                                return errormsg

                            nh_found = True

                    if _rtype:
                        if "routeType" not in route_entry:
                            errormsg = (
                                "[DUT: {}]: routeType missing"
                                "for route {} in OSPF RIB \n".format(dut, st_rt)
                            )
                            return errormsg
                        elif _rtype != route_entry["routeType"]:
                            errormsg = (
                                "[DUT: {}]: routeType mismatch"
                                "for route {} in OSPF RIB \n".format(dut, st_rt)
                            )
                            return errormsg
                        else:
                            logger.info(
                                "DUT: {}]: Found routeType {}"
                                "for route {}".format(dut, _rtype, st_rt)
                            )

                    # NOTE(review): the `tag` argument only enables this
                    # check; the value compared is the route's own "tag".
                    if tag:
                        if "tag" not in route_entry:
                            errormsg = (
                                "[DUT: {}]: tag is not"
                                " present for"
                                " route {} in RIB \n".format(dut, st_rt)
                            )
                            return errormsg

                        if _tag != route_entry["tag"]:
                            errormsg = (
                                "[DUT: {}]: tag value {}"
                                " is not matched for"
                                " route {} in RIB \n".format(
                                    dut,
                                    _tag,
                                    st_rt,
                                )
                            )
                            return errormsg

                    if metric is not None:
                        if "type2cost" not in route_entry:
                            errormsg = (
                                "[DUT: {}]: metric is"
                                " not present for"
                                " route {} in RIB \n".format(dut, st_rt)
                            )
                            return errormsg

                        if metric != route_entry["type2cost"]:
                            errormsg = (
                                "[DUT: {}]: metric value "
                                "{} is not matched for "
                                "route {} in RIB \n".format(
                                    dut,
                                    metric,
                                    st_rt,
                                )
                            )
                            return errormsg

                if nh_found:
                    logger.info(
                        "[DUT: {}]: Found next_hop {} for all OSPF"
                        " routes in RIB".format(router, next_hop)
                    )

                if len(missing_routes) > 0:
                    errormsg = "[DUT: {}]: Missing route in RIB, " "routes: {}".format(
                        dut, missing_routes
                    )
                    return errormsg

                if found_routes:
                    logger.info(
                        "[DUT: %s]: Verified routes in RIB, found" " routes are: %s\n",
                        dut,
                        found_routes,
                    )
                    result = True

    logger.info("Exiting lib API: verify_ospf_rib()")
    return result
@retry(attempts=10, wait=2, return_is_str=True)
def verify_ospf_interface(tgen, topo, dut=None, lan=False, input_dict=None):
    """
    This API is to verify ospf interface attributes by running
    show ip ospf interface command.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : topology descriptions
    * `dut`: device under test; when None every router with an "ospf"
             section in `topo` is checked
    * `lan`: if set to true this interface belongs to LAN.
             (not used by the comparison)
    * `input_dict` : Input dict data, required when configuring from testcase

    Usage
    -----
    input_dict = {
        "r0": {
            "links": {
                "s1": {
                    "ospf": {
                        "mcastMemberOspfDesignatedRouters": True,
                        "mcastMemberOspfAllRouters": True,
                    }
                }
            }
        }
    }
    result = verify_ospf_interface(tgen, topo, dut=dut, input_dict=input_dict)

    Returns
    -------
    True or False (Error Message)
    """
    logger.debug("Entering lib API: verify_ospf_interface()")
    result = False

    for router, rnode in tgen.routers().items():
        if "ospf" not in topo["routers"][router]:
            continue

        if dut is not None and dut != router:
            continue

        logger.info("Verifying OSPF interface on router %s:", router)
        show_ospf_json = run_frr_cmd(rnode, "show ip ospf interface json", isjson=True)

        # Verifying output dictionary show_ospf_json is empty or not
        if not show_ospf_json:
            errormsg = "OSPF is not running"
            return errormsg

        reported_intfs = show_ospf_json["interfaces"]

        for ospf_intf, intf_data in input_dict[router]["links"].items():
            # Translate the link name into the interface name the router
            # reports.
            intf = topo["routers"][router]["links"][ospf_intf]["interface"]
            if intf not in reported_intfs:
                continue

            for intf_attribute, expected in intf_data["ospf"].items():
                actual = reported_intfs[intf][intf_attribute]
                if expected != actual:
                    # The value seen on the router comes first and the
                    # value the test asked for second, as the text states.
                    errormsg = (
                        "[DUT: {}] OSPF interface {}: {} is {}, "
                        "Expected is {}".format(
                            router, intf, intf_attribute, actual, expected
                        )
                    )
                    return errormsg

                logger.info(
                    "[DUT: %s] OSPF interface %s: %s is %s",
                    router,
                    intf,
                    intf_attribute,
                    expected,
                )
        result = True

    logger.debug("Exiting API: verify_ospf_interface()")
    return result
@retry(attempts=11, wait=2, return_is_str=True)
def verify_ospf_database(tgen, topo, dut, input_dict):
    """
    This API is to verify ospf lsa's by running
    show ip ospf database command.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : topology descriptions
    * `dut`: device under test
    * `input_dict` : Input dict data, required when configuring from testcase.
                     Only the keys of each LSA section are compared; the
                     per-LSA details are not inspected.

    Usage
    -----
    input_dict = {
        "areas": {
            "0.0.0.0": {
                "Router Link States": {
                    "100.1.1.0-100.1.1.0": {
                        "LSID": "100.1.1.0",
                        "Advertised router": "100.1.1.0",
                        "Sequence Number": "80000006",
                    },
                },
                "Net Link States": {
                    "10.0.0.2-100.1.1.1": {
                        "Advertised router": "100.1.1.1",
                        "Sequence Number": "80000001",
                    },
                },
            },
        },
    }
    result = verify_ospf_database(tgen, topo, dut, input_dict)

    Returns
    -------
    True or False (Error Message)
    """
    result = False
    logger.debug("Entering lib API: verify_ospf_database()")

    if "ospf" not in topo["routers"][dut]:
        errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut)
        return errormsg

    rnode = tgen.routers()[dut]

    logger.info("Verifying OSPF database on router %s:", dut)
    show_ospf_json = run_frr_cmd(rnode, "show ip ospf database json", isjson=True)
    # Verifying output dictionary show_ospf_json is empty or not
    if not show_ospf_json:
        errormsg = "OSPF is not running"
        return errormsg

    # (section key in the JSON, label used in the messages), in check order
    lsa_sections = (
        ("Router Link States", "Router"),
        ("Net Link States", "Network"),
        ("Summary Link States", "Summary"),
        ("ASBR-Summary Link States", "ASBR Summary"),
    )

    # Read-only lookups: the caller's dict is left untouched.
    ospf_db_data = input_dict.get("areas")
    ospf_external_lsa = input_dict.get("AS External Link States")

    if ospf_db_data:
        for ospf_area, area_lsa in ospf_db_data.items():
            if ospf_area not in show_ospf_json["areas"]:
                continue

            area_db = show_ospf_json["areas"][ospf_area]
            for section, label in lsa_sections:
                if section not in area_lsa:
                    continue

                # A section missing from the router output is an ordinary
                # mismatch, not a KeyError.
                present_lsas = area_db.get(section, ())
                for lsa in area_lsa[section]:
                    if lsa not in present_lsas:
                        errormsg = (
                            "[DUT: {}] OSPF LSDB area {}: expected"
                            " {} LSA is {}".format(dut, ospf_area, label, lsa)
                        )
                        return errormsg

                    logger.info(
                        "[DUT: %s] OSPF LSDB area %s:%s LSA %s",
                        dut,
                        ospf_area,
                        label,
                        lsa,
                    )
                    result = True

    if ospf_external_lsa:
        present_external = show_ospf_json.get("AS External Link States", ())
        for ospf_ext_lsa in ospf_external_lsa:
            if ospf_ext_lsa not in present_external:
                errormsg = (
                    "[DUT: {}] OSPF LSDB : expected"
                    " External LSA is {}".format(dut, ospf_ext_lsa)
                )
                return errormsg

            logger.info("[DUT: %s] OSPF LSDB:External LSA %s", dut, ospf_ext_lsa)
            result = True

    logger.debug("Exiting API: verify_ospf_database()")
    return result
@retry(attempts=10, wait=2, return_is_str=True)
def verify_ospf_summary(tgen, topo, dut, input_dict):
    """
    This API is to verify ospf summary addresses by running
    show ip ospf summary detail command.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : topology descriptions
    * `dut`: device under test
    * `input_dict` : Input dict data, required when configuring from testcase

    Usage
    -----
    input_dict = {
        "11.0.0.0/8": {
            "Summary address": "11.0.0.0/8",
            "Metric-type": "E2",
            "External route count": 5
        }
    }
    result = verify_ospf_summary(tgen, topo, dut, input_dict)

    Returns
    -------
    True or False (Error Message)
    """
    logger.debug("Entering lib API: verify_ospf_summary()")
    result = False

    logger.info("Verifying OSPF summary on router %s:", dut)

    if "ospf" not in topo["routers"][dut]:
        errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut)
        return errormsg

    rnode = tgen.routers()[dut]
    show_ospf_json = run_frr_cmd(rnode, "show ip ospf summary detail json", isjson=True)

    # Verifying output dictionary show_ospf_json is empty or not
    if not show_ospf_json:
        errormsg = "OSPF is not running"
        return errormsg

    for ospf_summ, summ_data in input_dict.items():
        # Entries the router does not report are skipped, not failed.
        if ospf_summ not in show_ospf_json:
            continue

        summary = summ_data["Summary address"]
        if summary not in show_ospf_json:
            continue

        for summ, expected in summ_data.items():
            actual = show_ospf_json[summary][summ]
            if expected != actual:
                # Report the value seen on the router and the value the
                # test asked for, each in its own slot.
                errormsg = (
                    "[DUT: {}] OSPF summary {}:{} is {}, "
                    "Expected is {}".format(dut, summary, summ, actual, expected)
                )
                return errormsg

            logger.info(
                "[DUT: %s] OSPF summary %s:%s is %s",
                dut,
                summary,
                summ,
                expected,
            )
            result = True

    logger.debug("Exiting API: verify_ospf_summary()")
    return result