]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/lib/ospf.py
23b647d0941394418796f2c1036762d89cc4b070
[mirror_frr.git] / tests / topotests / lib / ospf.py
1 #
2 # Copyright (c) 2020 by VMware, Inc. ("VMware")
3 # Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
4 # ("NetDEF") in this file.
5 #
6 # Permission to use, copy, modify, and/or distribute this software
7 # for any purpose with or without fee is hereby granted, provided
8 # that the above copyright notice and this permission notice appear
9 # in all copies.
10 #
11 # THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
12 # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
14 # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
15 # DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
16 # WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
17 # ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
18 # OF THIS SOFTWARE.
19 #
20
21 from copy import deepcopy
22 import traceback
23 from time import sleep
24 from lib.topolog import logger
25 import ipaddr
26 from lib.topotest import frr_unicode
27
28 # Import common_config to use commonly used APIs
29 from lib.common_config import (
30 create_common_configuration,
31 InvalidCLIError,
32 retry,
33 generate_ips,
34 check_address_types,
35 validate_ip_address,
36 run_frr_cmd,
37 )
38
# NOTE(review): presumably the base directory for topotest logs; it is not
# referenced anywhere in the visible part of this file - confirm usage.
39 LOGDIR = "/tmp/topotests/"
# NOTE(review): initialised to None and neither assigned nor read in the
# visible part of this file - confirm whether it is still needed.
40 TMPDIR = None
41
42 ################################
43 # Configure procs
44 ################################
45
46
def create_router_ospf(tgen, topo, input_dict=None, build=False, load_config=True):
    """
    API to configure ospf on router.

    Every router entry that carries an "ospf" section is handed to
    __create_ospf_global(); entries without one are skipped.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data, used as the configuration source when
               `input_dict` is empty
    * `input_dict` : Input dict data, required when configuring from testcase
    * `build` : Only for initial setup phase this is set as True.
    * `load_config` : Loading the config to router this is set as True.

    Usage
    -----
    input_dict = {
        "r1": {
            "ospf": {
                "router_id": "22.22.22.22",
                "area": [{"id": "0.0.0.0", "type": "nssa"}]
            }
        }
    }

    result = create_router_ospf(tgen, topo, input_dict)

    Returns
    -------
    True when every configured router succeeded. Otherwise the first
    non-True value returned by __create_ospf_global() (False or an error
    message). False when no router entry contains "ospf".
    """
    logger.debug("Entering lib API: create_router_ospf()")

    # Work on a private copy so the caller's data is never modified.
    input_dict = deepcopy(input_dict if input_dict else topo)

    result = False
    first_failure = None
    for router, router_data in input_dict.items():
        if "ospf" not in router_data:
            logger.debug("Router %s: 'ospf' not present in input_dict", router)
            continue

        result = __create_ospf_global(tgen, input_dict, router, build, load_config)
        # Remember the first failure: a later router that configures fine
        # must not hide it from the caller.
        if result is not True and first_failure is None:
            first_failure = result

    if first_failure is not None:
        result = first_failure

    logger.debug("Exiting lib API: create_router_ospf()")
    return result
95
96
def __create_ospf_global(tgen, input_dict, router, build=False, load_config=True):
    """
    Helper API to create ospf global configuration.

    Builds the list of "router ospf" commands described by
    input_dict[router]["ospf"] and passes it to
    create_common_configuration() with config type "ospf".

    Keys read from the "ospf" section:
    * "delete" : when true only "no router ospf" is sent
    * "router_id" / "del_router_id"
    * "redistribute" : list of dicts with "redist_type" and optional
                       "route_map" and "delete"
    * "area" : list of dicts with "id" and optional "type" and "delete"
    * "summary-address" : list of dicts with "prefix", "mask" and optional
                          "tag", "advertise" and "delete"

    Parameters
    ----------
    * `tgen` : Topogen object
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router to be configured.
    * `build` : Only for initial setup phase this is set as True.
    * `load_config` : Loading the config to router this is set as True.

    Returns
    -------
    The value returned by create_common_configuration(), or the formatted
    traceback string when InvalidCLIError is raised.
    """

    result = False
    logger.debug("Entering lib API: __create_ospf_global()")
    try:
        ospf_data = input_dict[router]["ospf"]

        # Removing the whole OSPF instance needs no further commands.
        if ospf_data.get("delete", False):
            return create_common_configuration(
                tgen, router, ["no router ospf"], "ospf", build, load_config
            )

        config_data = ["router ospf"]

        # router id
        if ospf_data.get("del_router_id", False):
            config_data.append("no ospf router-id")
        router_id = ospf_data.get("router_id")
        if router_id:
            config_data.append("ospf router-id {}".format(router_id))

        # redistribute command
        for redistribute in ospf_data.get("redistribute") or []:
            if "redist_type" not in redistribute:
                logger.debug(
                    "Router %s: 'redist_type' not present in " "input_dict", router
                )
                continue

            cmd = "redistribute {}".format(redistribute["redist_type"])
            # Only this entry's own route-map belongs on its command; the
            # route-maps of the other redistribute entries must not leak in.
            if "route_map" in redistribute:
                cmd = "{} route-map {}".format(cmd, redistribute["route_map"])
            if redistribute.get("delete", False):
                cmd = "no {}".format(cmd)
            config_data.append(cmd)

        # area information
        for area in ospf_data.get("area") or []:
            if "id" not in area:
                logger.debug(
                    "Router %s: 'area id' not present in " "input_dict", router
                )
                continue

            cmd = "area {}".format(area["id"])
            if "type" in area:
                cmd = "{} {}".format(cmd, area["type"])
            if area.get("delete", False):
                cmd = "no {}".format(cmd)
            config_data.append(cmd)

        # summary information
        for summary in ospf_data.get("summary-address") or []:
            if "prefix" not in summary:
                logger.debug(
                    "Router %s: 'summary-address' not present in " "input_dict",
                    router,
                )
                continue

            cmd = "summary {}/{}".format(summary["prefix"], summary["mask"])

            _tag = summary.get("tag")
            if _tag:
                cmd = "{} tag {}".format(cmd, _tag)

            # "advertise" defaults to True; only an explicit false value
            # adds the no-advertise keyword.
            if not summary.get("advertise", True):
                cmd = "{} no-advertise".format(cmd)

            if summary.get("delete", False):
                cmd = "no {}".format(cmd)
            config_data.append(cmd)

        result = create_common_configuration(
            tgen, router, config_data, "ospf", build, load_config
        )

    except InvalidCLIError:
        # Traceback
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    logger.debug("Exiting lib API: create_ospf_global()")
    return result
212
213
def create_router_ospf6(tgen, topo, input_dict=None, build=False, load_config=True):
    """
    API to configure ospf6 (OSPFv3) on router.

    Every router entry that carries an "ospf6" section is handed to
    __create_ospf6_global(); entries without one are skipped.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data, used as the configuration source when
               `input_dict` is empty
    * `input_dict` : Input dict data, required when configuring from testcase
    * `build` : Only for initial setup phase this is set as True.
    * `load_config` : Loading the config to router this is set as True.

    Usage
    -----
    input_dict = {
        "r1": {
            "ospf6": {
                "router_id": "22.22.22.22",
            }
        }
    }

    result = create_router_ospf6(tgen, topo, input_dict)

    Returns
    -------
    True when every configured router succeeded. Otherwise the first
    non-True value returned by __create_ospf6_global() (False or an error
    message). False when no router entry contains "ospf6".
    """
    logger.debug("Entering lib API: create_router_ospf6()")

    # Work on a private copy so the caller's data is never modified.
    input_dict = deepcopy(input_dict if input_dict else topo)

    result = False
    first_failure = None
    for router, router_data in input_dict.items():
        # The section documented for this API is "ospf6", not "ospf".
        if "ospf6" not in router_data:
            logger.debug("Router %s: 'ospf6' not present in input_dict", router)
            continue

        result = __create_ospf6_global(tgen, input_dict, router, build, load_config)
        # Remember the first failure: a later router that configures fine
        # must not hide it from the caller.
        if result is not True and first_failure is None:
            first_failure = result

    if first_failure is not None:
        result = first_failure

    logger.debug("Exiting lib API: create_router_ospf6()")
    return result
255
256
def __create_ospf6_global(tgen, input_dict, router, build=False, load_config=True):
    """
    Helper API to create ospf6 (OSPFv3) global configuration.

    Builds the "router ospf6" commands described by
    input_dict[router]["ospf6"] and passes them to
    create_common_configuration().

    Keys read from the "ospf6" section:
    * "delete" : when true only "no router ospf6" is sent
    * "router_id" : router id to configure
    * "del_router_id" : when true the router id is removed first

    Parameters
    ----------
    * `tgen` : Topogen object
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router to be configured.
    * `build` : Only for initial setup phase this is set as True.
    * `load_config` : Loading the config to router this is set as True.

    Returns
    -------
    The value returned by create_common_configuration(), or the formatted
    traceback string when InvalidCLIError is raised.
    """

    result = False
    logger.debug("Entering lib API: __create_ospf6_global()")
    try:
        ospf_data = input_dict[router]["ospf6"]

        # NOTE(review): the config type stays "ospf" as in the original call;
        # confirm whether create_common_configuration() knows a dedicated
        # type for ospf6 before changing it.
        if ospf_data.get("delete", False):
            return create_common_configuration(
                tgen, router, ["no router ospf6"], "ospf", build, load_config
            )

        # OSPFv3 lives under "router ospf6" and its router id command is
        # "ospf6 router-id"; the OSPFv2 spellings do not configure ospf6d.
        config_data = ["router ospf6"]

        if ospf_data.get("del_router_id", False):
            config_data.append("no ospf6 router-id")
        router_id = ospf_data.get("router_id")
        if router_id:
            config_data.append("ospf6 router-id {}".format(router_id))

        result = create_common_configuration(
            tgen, router, config_data, "ospf", build, load_config
        )
    except InvalidCLIError:
        # Traceback
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    logger.debug("Exiting lib API: __create_ospf6_global()")
    return result
309
310
def config_ospf_interface(tgen, topo, input_dict=None, build=False, load_config=True):
    """
    API to configure ospf on the interfaces of a router.

    For every link of every router in `input_dict` that carries an "ospf"
    section, the matching "interface <name>" stanza is generated. The
    interface name is looked up in topo["routers"] and, failing that, in
    topo["switches"].

    Keys read from a link's "ospf" section: "area", "authentication",
    "message-digest-key", "authentication-key", "priority", "cost" and
    "del_action". When "del_action" is present, every generated command
    except "ip ospf area" is prefixed with "no".

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring from testcase
    * `build` : Only for initial setup phase this is set as True.
    * `load_config` : Loading the config to router this is set as True.

    Usage
    -----
    r1_ospf_auth = {
        "r1": {
            "links": {
                "r2": {
                    "ospf": {
                        "authentication": "message-digest",
                        "authentication-key": "ospf",
                        "message-digest-key": "10"
                    }
                }
            }
        }
    }
    result = config_ospf_interface(tgen, topo, r1_ospf_auth)

    Returns
    -------
    When `build` is true: the command list generated for the first router.
    Otherwise the value returned by create_common_configuration() for the
    last router, or False when there was no router to process.
    """
    logger.debug("Enter lib config_ospf_interface")
    # Defined up front so an empty input cannot leave it unbound.
    result = False

    if not input_dict:
        # Interface names are read from topo["routers"], so the per-router
        # data must come from the same level of the topology.
        input_dict = deepcopy(topo["routers"])
    else:
        input_dict = deepcopy(input_dict)

    for router in input_dict.keys():
        config_data = []
        for lnk, link_data in input_dict[router]["links"].items():
            if "ospf" not in link_data:
                logger.debug(
                    "Router %s: ospf configs is not present in "
                    "input_dict, passed input_dict %s",
                    router,
                    input_dict,
                )
                continue

            ospf_data = link_data["ospf"]
            # Presence of the key (whatever its value) requests removal.
            negate = "no " if "del_action" in ospf_data else ""

            try:
                intf = topo["routers"][router]["links"][lnk]["interface"]
            except KeyError:
                intf = topo["switches"][router]["links"][lnk]["interface"]

            # interface
            config_data.append("interface {}".format(intf))

            # interface area config
            data_ospf_area = ospf_data.get("area")
            if data_ospf_area:
                config_data.append("ip ospf area {}".format(data_ospf_area))

            # interface ospf auth
            data_ospf_auth = ospf_data.get("authentication")
            if data_ospf_auth:
                cmd = "ip ospf authentication"
                if data_ospf_auth in ("null", "message-digest"):
                    cmd = "{} {}".format(cmd, data_ospf_auth)
                config_data.append(negate + cmd)

            if "message-digest-key" in ospf_data:
                cmd = "ip ospf message-digest-key {} md5 {}".format(
                    ospf_data["message-digest-key"], ospf_data["authentication-key"]
                )
                config_data.append(negate + cmd)
            elif "authentication-key" in ospf_data:
                cmd = "ip ospf authentication-key {}".format(
                    ospf_data["authentication-key"]
                )
                config_data.append(negate + cmd)

            # interface ospf dr priority
            # Test the value itself; "is not None" keeps priority 0 usable.
            data_ospf_dr_priority = ospf_data.get("priority")
            if data_ospf_dr_priority is not None:
                cmd = "ip ospf priority {}".format(data_ospf_dr_priority)
                config_data.append(negate + cmd)

            # interface ospf cost
            data_ospf_cost = ospf_data.get("cost")
            if data_ospf_cost is not None:
                cmd = "ip ospf cost {}".format(data_ospf_cost)
                config_data.append(negate + cmd)

        if build:
            return config_data

        result = create_common_configuration(
            tgen,
            router,
            config_data,
            "interface_config",
            build=build,
            load_config=load_config,
        )

    logger.debug("Exiting lib API: config_ospf_interface()")
    return result
433
434
def clear_ospf(tgen, router):
    """
    Clear OSPF on the interfaces of one router by running the
    "clear ip ospf interface" command on it.

    Parameters
    ----------
    * `tgen`: topogen object
    * `router`: device under test

    Usage
    -----
    clear_ospf(tgen, "r1")

    Returns
    -------
    False when `router` is not one of tgen.routers(), otherwise None.
    """

    logger.debug("Entering lib API: clear_ospf()")

    if router in tgen.routers():
        target = tgen.routers()[router]

        # Clearing OSPF
        logger.info("Clearing ospf process for router %s..", router)
        run_frr_cmd(target, "clear ip ospf interface ")

        logger.debug("Exiting lib API: clear_ospf()")
        return None

    return False
462
463
464 ################################
465 # Verification procs
466 ################################
@retry(attempts=40, wait=2, return_is_str=True)
def verify_ospf_neighbor(tgen, topo, dut=None, input_dict=None, lan=False):
    """
    This API is to verify ospf neighborship by running
    show ip ospf neighbor all json command.

    Neighbors are looked up in the command output by the router id found
    in topo["routers"][<neighbor>]["ospf"]["router_id"]. The "state" value
    of the first entry for that router id is split at "/" into the
    adjacency state and the role.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data
    * `dut`: device under test; when None every router with an "ospf"
             section in `topo` is checked
    * `input_dict` : Input dict data, required when configuring from testcase
    * `lan` : kept for interface compatibility; the verification relies on
              router ids only, so this flag does not influence the result

    Usage
    -----
    1. To check FULL neighbors.
    verify_ospf_neighbor(tgen, topo, dut=dut)

    2. To check neighbors with their roles.
    input_dict = {
        "r0": {
            "ospf": {
                "neighbors": {
                    "r1": {
                        "state": "Full",
                        "role": "DR"
                    },
                    "r2": {
                        "state": "Full",
                        "role": "DROther"
                    },
                    "r3": {
                        "state": "Full",
                        "role": "DROther"
                    }
                }
            }
        }
    }
    result = verify_ospf_neighbor(tgen, topo, dut, input_dict, lan=True)

    Returns
    -------
    True or False (Error Message)
    """
    logger.debug("Entering lib API: verify_ospf_neighbor()")
    result = False

    for router, rnode in tgen.routers().items():
        if "ospf" not in topo["routers"][router]:
            continue

        if dut is not None and dut != router:
            continue

        logger.info("Verifying OSPF neighborship on router %s:", router)
        show_ospf_json = run_frr_cmd(
            rnode, "show ip ospf neighbor all json", isjson=True
        )

        # Verifying output dictionary show_ospf_json is empty or not
        if not show_ospf_json:
            return "OSPF is not running"

        if input_dict:
            # Explicit expectations: state and/or role per neighbor.
            ospf_nbr_list = input_dict[router]["ospf"]["neighbors"]
            for ospf_nbr, nbr_data in ospf_nbr_list.items():
                nbr_rid = topo["routers"][ospf_nbr]["ospf"]["router_id"]
                try:
                    nbr_state_role = show_ospf_json[nbr_rid][0]["state"]
                except KeyError:
                    return "[DUT: {}] OSPF peer {} missing".format(router, nbr_rid)

                # The state string is "<adjacency state>/<role>".
                nh_state, _, intf_state = nbr_state_role.partition("/")

                # Read-only access: the caller's dict is not modified.
                nbr_state = nbr_data.get("state")
                nbr_role = nbr_data.get("role")

                if nbr_state:
                    if nbr_state != nh_state:
                        return (
                            "[DUT: {}] OSPF is not Converged, neighbor"
                            " state is {}".format(router, nh_state)
                        )
                    logger.info(
                        "[DUT: %s] OSPF Nbr is %s:%s State %s",
                        router,
                        ospf_nbr,
                        nbr_rid,
                        nh_state,
                    )
                    result = True

                if nbr_role:
                    if nbr_role != intf_state:
                        return (
                            "[DUT: {}] OSPF is not Converged with rid"
                            "{}, role is {}".format(router, nbr_rid, intf_state)
                        )
                    logger.info(
                        "[DUT: %s] OSPF Nbr is %s: %s Role %s",
                        router,
                        ospf_nbr,
                        nbr_rid,
                        nbr_role,
                    )
                    # A matching role is a successful check as well, so an
                    # expectation that lists only "role" can pass.
                    result = True
        else:
            # No expectations given: every neighbor listed in the topology
            # must be in state Full.
            ospf_nbr_list = topo["routers"][router]["ospf"]["neighbors"]
            no_of_peer = 0
            for ospf_nbr, nbr_data in ospf_nbr_list.items():
                # A non-empty neighbor entry names the peer router in "nbr".
                nbr_name = nbr_data["nbr"] if nbr_data else ospf_nbr
                nbr_rid = topo["routers"][nbr_name]["ospf"]["router_id"]
                try:
                    nh_state = show_ospf_json[nbr_rid][0]["state"].split("/")[0]
                except KeyError:
                    return "[DUT: {}] OSPF peer {} missing,from " "{} ".format(
                        router, nbr_rid, ospf_nbr
                    )

                if nh_state == "Full":
                    no_of_peer += 1

            if no_of_peer != len(ospf_nbr_list):
                return "[DUT: {}] OSPF is not Converged".format(router)

            logger.info("[DUT: %s] OSPF is Converged", router)
            result = True

    logger.debug("Exiting API: verify_ospf_neighbor()")
    return result
656
657
@retry(attempts=21, wait=2, return_is_str=True)
def verify_ospf_rib(
    tgen, dut, input_dict, next_hop=None, tag=None, metric=None, fib=None
):
    """
    This API is to verify ospf routes by running
    show ip ospf route json command on `dut`.

    For every entry of `input_dict` the routes listed under "prefix" (or,
    when absent, "static_routes") are expanded with generate_ips() and
    each IPv4 prefix is looked up in the command output.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `dut`: device under test
    * `input_dict` : Input dict data, required when configuring from testcase
    * `next_hop` : next hop or list of next hops to be verified
    * `tag` : when truthy, the route's "tag" value from `input_dict` is
              compared with the "tag" of the route in the output
    * `metric` : metric to be verified against "type2cost"
    * `fib` : when truthy together with `next_hop`, next hops are taken
              only from entries whose first nexthop carries a "fib" key

    Usage
    -----
    input_dict = {
        "r1": {
            "static_routes": [
                {
                    "network": ip_net,
                    "no_of_ip": 1,
                    "routeType": "N"
                }
            ]
        }
    }

    result = verify_ospf_rib(tgen, dut, input_dict, next_hop=nh)

    Returns
    -------
    True or False (Error Message)
    """

    logger.info("Entering lib API: verify_ospf_rib()")
    result = False
    router_list = tgen.routers()

    if dut not in router_list:
        logger.info("Exiting lib API: verify_ospf_rib()")
        return result
    rnode = router_list[dut]

    # Compare against a list of next hops from here on.
    if next_hop and not isinstance(next_hop, list):
        next_hop = [next_hop]

    for routerInput in input_dict.keys():
        logger.info("Checking router %s RIB:", dut)

        if "prefix" in input_dict[routerInput]:
            static_routes = input_dict[routerInput]["prefix"]
        elif "static_routes" in input_dict[routerInput]:
            static_routes = input_dict[routerInput]["static_routes"]
        else:
            continue

        # Nothing to look up, so there is no need to query the router.
        if not static_routes:
            continue

        found_routes = []
        missing_routes = []
        # Defined before the loops so it is always bound afterwards.
        nh_found = False

        # The command does not depend on the route; query once per input.
        ospf_rib_json = run_frr_cmd(rnode, "show ip ospf route json", isjson=True)

        # Verifying output dictionary ospf_rib_json is not empty
        if not ospf_rib_json:
            errormsg = "[DUT: {}] No routes found in OSPF route " "table".format(dut)
            return errormsg

        for static_route in static_routes:
            network = static_route["network"]
            no_of_ip = static_route.get("no_of_ip", 1)
            _tag = static_route.get("tag")
            _rtype = static_route.get("routeType")

            # Generating IPs for verification
            for st_rt in generate_ips(network, no_of_ip):
                st_rt = str(ipaddr.IPNetwork(frr_unicode(st_rt)))

                if validate_ip_address(st_rt) != "ipv4":
                    continue

                if st_rt not in ospf_rib_json:
                    missing_routes.append(st_rt)
                    continue

                found_routes.append(st_rt)
                route_entry = ospf_rib_json[st_rt]

                if fib and next_hop:
                    # NOTE(review): this branch indexes the route entry like
                    # a list while every other check below reads it as a
                    # dict - confirm the JSON shape this was written for.
                    # Collected per route so that hops of a previously
                    # checked route are never compared again.
                    fib_hops = []
                    for mnh in range(len(route_entry)):
                        if "fib" in route_entry[mnh]["nexthops"][0]:
                            fib_hops.append(
                                [rib_r["ip"] for rib_r in route_entry[mnh]["nexthops"]]
                            )

                    if fib_hops and fib_hops[0]:
                        absent_hops = set(next_hop).difference(fib_hops[0])
                        if absent_hops:
                            logger.info(
                                "Nexthop "
                                "%s is not active for route %s in "
                                "RIB of router %s\n",
                                absent_hops,
                                st_rt,
                                dut,
                            )
                            errormsg = (
                                "Nexthop {} is not active"
                                " for route {} in RIB of router"
                                " {}\n".format(absent_hops, st_rt, dut)
                            )
                            return errormsg
                        nh_found = True

                elif next_hop and fib is None:
                    rib_hops = [rib_r["ip"] for rib_r in route_entry["nexthops"]]

                    if rib_hops:
                        absent_hops = set(next_hop).difference(rib_hops)
                        if absent_hops:
                            logger.info(
                                "Missing nexthop %s for route"
                                " %s in RIB of router %s\n",
                                absent_hops,
                                st_rt,
                                dut,
                            )
                            errormsg = (
                                "Nexthop {} is Missing for "
                                "route {} in RIB of router {}\n".format(
                                    absent_hops, st_rt, dut
                                )
                            )
                            return errormsg
                        nh_found = True

                if _rtype:
                    if "routeType" not in route_entry:
                        errormsg = (
                            "[DUT: {}]: routeType missing"
                            "for route {} in OSPF RIB \n".format(dut, st_rt)
                        )
                        return errormsg
                    if _rtype != route_entry["routeType"]:
                        errormsg = (
                            "[DUT: {}]: routeType mismatch"
                            "for route {} in OSPF RIB \n".format(dut, st_rt)
                        )
                        return errormsg
                    logger.info(
                        "DUT: {}]: Found routeType {}"
                        "for route {}".format(dut, _rtype, st_rt)
                    )

                # The `tag` argument only switches the check on; the value
                # compared is the route's own "tag" entry from input_dict.
                if tag:
                    if "tag" not in route_entry:
                        errormsg = (
                            "[DUT: {}]: tag is not"
                            " present for"
                            " route {} in RIB \n".format(dut, st_rt)
                        )
                        return errormsg

                    if _tag != route_entry["tag"]:
                        errormsg = (
                            "[DUT: {}]: tag value {}"
                            " is not matched for"
                            " route {} in RIB \n".format(dut, _tag, st_rt)
                        )
                        return errormsg

                if metric is not None:
                    if "type2cost" not in route_entry:
                        errormsg = (
                            "[DUT: {}]: metric is"
                            " not present for"
                            " route {} in RIB \n".format(dut, st_rt)
                        )
                        return errormsg

                    if metric != route_entry["type2cost"]:
                        errormsg = (
                            "[DUT: {}]: metric value "
                            "{} is not matched for "
                            "route {} in RIB \n".format(dut, metric, st_rt)
                        )
                        return errormsg

        if nh_found:
            logger.info(
                "[DUT: {}]: Found next_hop {} for all OSPF"
                " routes in RIB".format(dut, next_hop)
            )

        if missing_routes:
            errormsg = "[DUT: {}]: Missing route in RIB, " "routes: {}".format(
                dut, missing_routes
            )
            return errormsg

        if found_routes:
            logger.info(
                "[DUT: %s]: Verified routes in RIB, found" " routes are: %s\n",
                dut,
                found_routes,
            )
            result = True

    logger.info("Exiting lib API: verify_ospf_rib()")
    return result
928
929
@retry(attempts=10, wait=2, return_is_str=True)
def verify_ospf_interface(tgen, topo, dut=None, lan=False, input_dict=None):
    """
    This API is to verify ospf interface attributes by running
    show ip ospf interface json command.

    For every link listed in input_dict[<router>]["links"] the interface
    name is taken from `topo` and each attribute of the link's "ospf"
    section is compared with the same attribute of that interface in the
    command output.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : topology descriptions
    * `dut`: device under test; when None every router with an "ospf"
             section in `topo` is checked
    * `lan`: if set to true this interface belongs to LAN (not read by
             this function)
    * `input_dict` : Input dict data, required when configuring from testcase

    Usage
    -----
    input_dict= {
        'r0': {
            'links':{
                's1': {
                    'ospf':{
                        'priority':98,
                        'timerDeadSecs': 4,
                        'area': '0.0.0.3',
                        'mcastMemberOspfDesignatedRouters': True,
                        'mcastMemberOspfAllRouters': True,
                        'ospfEnabled': True,

                    }
                }
            }
        }
    }
    result = verify_ospf_interface(tgen, topo, dut=dut, input_dict=input_dict)

    Returns
    -------
    True or False (Error Message)
    """

    logger.debug("Entering lib API: verify_ospf_interface()")
    result = False
    for router, rnode in tgen.routers().items():
        if "ospf" not in topo["routers"][router]:
            continue

        if dut is not None and dut != router:
            continue

        logger.info("Verifying OSPF interface on router %s:", router)
        show_ospf_json = run_frr_cmd(rnode, "show ip ospf interface json", isjson=True)

        # Verifying output dictionary show_ospf_json is empty or not
        if not show_ospf_json:
            return "OSPF is not running"

        for ospf_intf, intf_data in input_dict[router]["links"].items():
            intf = topo["routers"][router]["links"][ospf_intf]["interface"]

            # An interface that is expected but absent is a failed check,
            # not something to skip over.
            if intf not in show_ospf_json["interfaces"]:
                return (
                    "[DUT: {}] OSPF interface {} is not present in OSPF "
                    "interface output".format(router, intf)
                )

            actual_attrs = show_ospf_json["interfaces"][intf]
            for intf_attribute, expected in intf_data["ospf"].items():
                if intf_attribute not in actual_attrs:
                    return (
                        "[DUT: {}] OSPF interface {}: {} is not present in "
                        "OSPF interface output".format(router, intf, intf_attribute)
                    )

                actual = actual_attrs[intf_attribute]
                if expected != actual:
                    # Report the value seen first, then the value wanted.
                    return "[DUT: {}] OSPF interface {}: {} is {}, Expected is {}".format(
                        router, intf, intf_attribute, actual, expected
                    )

                logger.info(
                    "[DUT: %s] OSPF interface %s: %s is %s",
                    router,
                    intf,
                    intf_attribute,
                    expected,
                )
        result = True

    logger.debug("Exiting API: verify_ospf_interface()")
    return result
1017
1018
@retry(attempts=11, wait=2, return_is_str=True)
def verify_ospf_database(tgen, topo, dut, input_dict):
    """
    This API is to verify ospf lsa's by running
    show ip ospf database json command.

    Only the presence of each expected LSA key is checked; the attribute
    values given for an LSA in `input_dict` are not compared.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data; `dut` must have an "ospf" section in it
    * `dut`: device under test
    * `input_dict` : Input dict data, required when configuring from testcase

    Usage
    -----
    input_dict = {
        "areas": {
            "0.0.0.0": {
                "Router Link States": {
                    "100.1.1.0-100.1.1.0": {
                        "LSID": "100.1.1.0",
                        "Advertised router": "100.1.1.0",
                        "LSA Age": 130,
                        "Sequence Number": "80000006",
                        "Checksum": "a703",
                        "Router links": 3
                    }
                },
                "Net Link States": {
                    "10.0.0.2-100.1.1.1": {
                        "LSID": "10.0.0.2",
                        "Advertised router": "100.1.1.1",
                        "LSA Age": 137,
                        "Sequence Number": "80000001",
                        "Checksum": "9583"
                    }
                },
            },
        }
    }
    result = verify_ospf_database(tgen, topo, dut, input_dict)

    Returns
    -------
    True or False (Error Message)
    """

    result = False
    router = dut
    logger.debug("Entering lib API: verify_ospf_database()")

    if "ospf" not in topo["routers"][dut]:
        errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut)
        return errormsg

    rnode = tgen.routers()[dut]

    logger.info("Verifying OSPF database on router %s:", dut)
    show_ospf_json = run_frr_cmd(rnode, "show ip ospf database json", isjson=True)
    # Verifying output dictionary show_ospf_json is empty or not
    if not show_ospf_json:
        return "OSPF is not running"

    # Per-area LSA sections: key in the input/output and the label used
    # for it in log and error messages.
    area_lsa_types = (
        ("Router Link States", "Router"),
        ("Net Link States", "Network"),
        ("Summary Link States", "Summary"),
        ("ASBR-Summary Link States", "ASBR Summary"),
    )

    # Read-only access: the caller's dict is not modified.
    ospf_db_data = input_dict.get("areas")
    ospf_external_lsa = input_dict.get("AS External Link States")

    if ospf_db_data:
        lsdb_areas = show_ospf_json.get("areas", {})
        for ospf_area, area_lsa in ospf_db_data.items():
            # An expected area that is missing is a failed check.
            if ospf_area not in lsdb_areas:
                return "[DUT: {}] OSPF LSDB: expected area is {}".format(
                    router, ospf_area
                )

            for lsa_type, label in area_lsa_types:
                if lsa_type not in area_lsa:
                    continue

                # A section absent from the output holds none of the
                # expected LSAs.
                lsdb_lsas = lsdb_areas[ospf_area].get(lsa_type, {})
                for lsa in area_lsa[lsa_type]:
                    if lsa not in lsdb_lsas:
                        return (
                            "[DUT: {}] OSPF LSDB area {}: expected"
                            " {} LSA is {}".format(router, ospf_area, label, lsa)
                        )
                    logger.info(
                        "[DUT: %s] OSPF LSDB area %s:%s LSA %s",
                        router,
                        ospf_area,
                        label,
                        lsa,
                    )
                    result = True

    if ospf_external_lsa:
        lsdb_external = show_ospf_json.get("AS External Link States", {})
        for ospf_ext_lsa in ospf_external_lsa:
            if ospf_ext_lsa not in lsdb_external:
                return (
                    "[DUT: {}] OSPF LSDB : expected"
                    " External LSA is {}".format(router, ospf_ext_lsa)
                )
            logger.info("[DUT: %s] OSPF LSDB:External LSA %s", router, ospf_ext_lsa)
            result = True

    logger.debug("Exiting API: verify_ospf_database()")
    return result
1180
1181
@retry(attempts=10, wait=2, return_is_str=True)
def verify_ospf_summary(tgen, topo, dut, input_dict):
    """
    This API is to verify ospf summary addresses by running
    show ip ospf summary detail json command.

    Each entry of `input_dict` is keyed by a summary prefix and carries
    the attributes to compare, including "Summary address", which names
    the entry to look up in the command output.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : topology descriptions; `dut` must have an "ospf" section
    * `dut`: device under test
    * `input_dict` : Input dict data, required when configuring from testcase

    Usage
    -----
    input_dict = {
        "11.0.0.0/8": {
            "Summary address": "11.0.0.0/8",
            "Metric-type": "E2",
            "Metric": 20,
            "Tag": 0,
            "External route count": 5
        }
    }
    result = verify_ospf_summary(tgen, topo, dut, input_dict)

    Returns
    -------
    True or False (Error Message)
    """

    logger.debug("Entering lib API: verify_ospf_summary()")
    result = False
    router = dut

    logger.info("Verifying OSPF summary on router %s:", router)

    if "ospf" not in topo["routers"][dut]:
        errormsg = "[DUT: {}] OSPF is not configured on the router.".format(router)
        return errormsg

    rnode = tgen.routers()[dut]
    show_ospf_json = run_frr_cmd(rnode, "show ip ospf summary detail json", isjson=True)

    # Verifying output dictionary show_ospf_json is empty or not
    if not show_ospf_json:
        return "OSPF is not running"

    for ospf_summ, summ_data in input_dict.items():
        summary = summ_data["Summary address"]

        # An expected summary that is missing is a failed check; it must
        # not be hidden by another summary that happens to match.
        if ospf_summ not in show_ospf_json or summary not in show_ospf_json:
            return "[DUT: {}] OSPF summary {} is not present".format(router, summary)

        actual_data = show_ospf_json[summary]
        for summ, expected in summ_data.items():
            if summ not in actual_data:
                return "[DUT: {}] OSPF summary {}: {} is not present".format(
                    router, summary, summ
                )

            actual = actual_data[summ]
            if expected != actual:
                # Report the value seen first, then the value wanted.
                return "[DUT: {}] OSPF summary {}:{} is {}, Expected is {}".format(
                    router, summary, summ, actual, expected
                )

            logger.info(
                "[DUT: %s] OSPF summary %s:%s is %s",
                router,
                summary,
                summ,
                expected,
            )
            result = True

    logger.debug("Exiting API: verify_ospf_summary()")
    return result