]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/lib/ospf.py
pimd: Use macro for pimreg interface
[mirror_frr.git] / tests / topotests / lib / ospf.py
1 # SPDX-License-Identifier: ISC
2 #
3 # Copyright (c) 2020 by VMware, Inc. ("VMware")
4 # Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
5 # ("NetDEF") in this file.
6 #
7
8 import ipaddress
9 import sys
10 from copy import deepcopy
11 from time import sleep
12
13 # Import common_config to use commomnly used APIs
14 from lib.common_config import (
15 create_common_configurations,
16 InvalidCLIError,
17 generate_ips,
18 retry,
19 run_frr_cmd,
20 validate_ip_address,
21 )
22 from lib.topolog import logger
23 from lib.topotest import frr_unicode
24
25 ################################
26 # Configure procs
27 ################################
28
29
30 def create_router_ospf(tgen, topo=None, input_dict=None, build=False, load_config=True):
31 """
32 API to configure ospf on router.
33
34 Parameters
35 ----------
36 * `tgen` : Topogen object
37 * `topo` : json file data
38 * `input_dict` : Input dict data, required when configuring from testcase
39 * `build` : Only for initial setup phase this is set as True.
40 * `load_config` : Loading the config to router this is set as True.
41
42 Usage
43 -----
44 input_dict = {
45 "r1": {
46 "ospf": {
47 "router_id": "22.22.22.22",
48 "area": [{ "id": "0.0.0.0", "type": "nssa"}]
49 }
50 }
51
52 result = create_router_ospf(tgen, topo, input_dict)
53
54 Returns
55 -------
56 True or False
57 """
58 logger.debug("Entering lib API: create_router_ospf()")
59 result = False
60
61 if topo is None:
62 topo = tgen.json_topo
63
64 if not input_dict:
65 input_dict = deepcopy(topo)
66 else:
67 topo = topo["routers"]
68 input_dict = deepcopy(input_dict)
69
70 for ospf in ["ospf", "ospf6"]:
71 config_data_dict = {}
72
73 for router in input_dict.keys():
74 if ospf not in input_dict[router]:
75 logger.debug("Router %s: %s not present in input_dict", router, ospf)
76 continue
77
78 config_data = __create_ospf_global(
79 tgen, input_dict, router, build, load_config, ospf
80 )
81 if config_data:
82 if router not in config_data_dict:
83 config_data_dict[router] = config_data
84 else:
85 config_data_dict[router].extend(config_data)
86 try:
87 result = create_common_configurations(
88 tgen, config_data_dict, ospf, build, load_config
89 )
90 except InvalidCLIError:
91 logger.error("create_router_ospf (ipv4)", exc_info=True)
92 result = False
93
94 logger.debug("Exiting lib API: create_router_ospf()")
95 return result
96
97
98 def __create_ospf_global(tgen, input_dict, router, build, load_config, ospf):
99 """
100 Helper API to create ospf global configuration.
101
102 Parameters
103 ----------
104 * `tgen` : Topogen object
105 * `input_dict` : Input dict data, required when configuring from testcase
106 * `router` : router to be configured.
107 * `build` : Only for initial setup phase this is set as True.
108 * `load_config` : Loading the config to router this is set as True.
109 * `ospf` : either 'ospf' or 'ospf6'
110
111 Usage
112 -----
113 input_dict = {
114 "routers": {
115 "r1": {
116 "links": {
117 "r3": {
118 "ipv6": "2013:13::1/64",
119 "ospf6": {
120 "hello_interval": 1,
121 "dead_interval": 4,
122 "network": "point-to-point"
123 }
124 }
125 },
126 "ospf6": {
127 "router_id": "1.1.1.1",
128 "neighbors": {
129 "r3": {
130 "area": "1.1.1.1"
131 }
132 }
133 }
134 }
135 }
136
137 Returns
138 -------
139 list of configuration commands
140 """
141
142 config_data = []
143
144 if ospf not in input_dict[router]:
145 return config_data
146
147 logger.debug("Entering lib API: __create_ospf_global()")
148
149 ospf_data = input_dict[router][ospf]
150 del_ospf_action = ospf_data.setdefault("delete", False)
151 if del_ospf_action:
152 config_data = ["no router {}".format(ospf)]
153 return config_data
154
155 cmd = "router {}".format(ospf)
156
157 config_data.append(cmd)
158
159 # router id
160 router_id = ospf_data.setdefault("router_id", None)
161 del_router_id = ospf_data.setdefault("del_router_id", False)
162 if del_router_id:
163 config_data.append("no {} router-id".format(ospf))
164 if router_id:
165 config_data.append("{} router-id {}".format(ospf, router_id))
166
167 # log-adjacency-changes
168 log_adj_changes = ospf_data.setdefault("log_adj_changes", None)
169 del_log_adj_changes = ospf_data.setdefault("del_log_adj_changes", False)
170 if del_log_adj_changes:
171 config_data.append("no log-adjacency-changes detail")
172 if log_adj_changes:
173 config_data.append("log-adjacency-changes {}".format(log_adj_changes))
174
175 # aggregation timer
176 aggr_timer = ospf_data.setdefault("aggr_timer", None)
177 del_aggr_timer = ospf_data.setdefault("del_aggr_timer", False)
178 if del_aggr_timer:
179 config_data.append("no aggregation timer")
180 if aggr_timer:
181 config_data.append("aggregation timer {}".format(aggr_timer))
182
183 # maximum path information
184 ecmp_data = ospf_data.setdefault("maximum-paths", {})
185 if ecmp_data:
186 cmd = "maximum-paths {}".format(ecmp_data)
187 del_action = ospf_data.setdefault("del_max_path", False)
188 if del_action:
189 cmd = "no maximum-paths"
190 config_data.append(cmd)
191
192 # Flood reduction.
193 flood_data = ospf_data.setdefault("flood-reduction", {})
194 if flood_data:
195 cmd = "flood-reduction"
196 del_action = ospf_data.setdefault("del_flood_reduction", False)
197 if del_action:
198 cmd = "no flood-reduction"
199 config_data.append(cmd)
200
201 # LSA refresh timer - A hidden command.
202 refresh_data = ospf_data.setdefault("lsa-refresh", {})
203 if refresh_data:
204 cmd = "ospf lsa-refresh {}".format(refresh_data)
205 del_action = ospf_data.setdefault("del_lsa_refresh", False)
206 if del_action:
207 cmd = "no ospf lsa-refresh"
208 config_data.append(cmd)
209
210 # redistribute command
211 redistribute_data = ospf_data.setdefault("redistribute", {})
212 if redistribute_data:
213 for redistribute in redistribute_data:
214 if "redist_type" not in redistribute:
215 logger.debug(
216 "Router %s: 'redist_type' not present in " "input_dict", router
217 )
218 else:
219 cmd = "redistribute {}".format(redistribute["redist_type"])
220 for red_type in redistribute_data:
221 if "route_map" in red_type:
222 cmd = cmd + " route-map {}".format(red_type["route_map"])
223 del_action = redistribute.setdefault("delete", False)
224 if del_action:
225 cmd = "no {}".format(cmd)
226 config_data.append(cmd)
227
228 # area information
229 area_data = ospf_data.setdefault("area", {})
230 if area_data:
231 for area in area_data:
232 if "id" not in area:
233 logger.debug(
234 "Router %s: 'area id' not present in " "input_dict", router
235 )
236 else:
237 cmd = "area {}".format(area["id"])
238
239 if "type" in area:
240 cmd = cmd + " {}".format(area["type"])
241
242 if "flood-reduction" in area:
243 cmd = cmd + " flood-reduction"
244
245 del_action = area.setdefault("delete", False)
246 if del_action:
247 cmd = "no {}".format(cmd)
248 config_data.append(cmd)
249
250 # def route information
251 def_rte_data = ospf_data.setdefault("default-information", {})
252 if def_rte_data:
253 if "originate" not in def_rte_data:
254 logger.debug(
255 "Router %s: 'originate key' not present in " "input_dict", router
256 )
257 else:
258 cmd = "default-information originate"
259
260 if "always" in def_rte_data:
261 cmd = cmd + " always"
262
263 if "metric" in def_rte_data:
264 cmd = cmd + " metric {}".format(def_rte_data["metric"])
265
266 if "metric-type" in def_rte_data:
267 cmd = cmd + " metric-type {}".format(def_rte_data["metric-type"])
268
269 if "route-map" in def_rte_data:
270 cmd = cmd + " route-map {}".format(def_rte_data["route-map"])
271
272 del_action = def_rte_data.setdefault("delete", False)
273 if del_action:
274 cmd = "no {}".format(cmd)
275 config_data.append(cmd)
276
277 # summary information
278 summary_data = ospf_data.setdefault("summary-address", {})
279 if summary_data:
280 for summary in summary_data:
281 if "prefix" not in summary:
282 logger.debug(
283 "Router %s: 'summary-address' not present in " "input_dict",
284 router,
285 )
286 else:
287 cmd = "summary {}/{}".format(summary["prefix"], summary["mask"])
288
289 _tag = summary.setdefault("tag", None)
290 if _tag:
291 cmd = "{} tag {}".format(cmd, _tag)
292
293 _advertise = summary.setdefault("advertise", True)
294 if not _advertise:
295 cmd = "{} no-advertise".format(cmd)
296
297 del_action = summary.setdefault("delete", False)
298 if del_action:
299 cmd = "no {}".format(cmd)
300 config_data.append(cmd)
301
302 # ospf gr information
303 gr_data = ospf_data.setdefault("graceful-restart", {})
304 if gr_data:
305
306 if "opaque" in gr_data and gr_data["opaque"]:
307 cmd = "capability opaque"
308 if gr_data.setdefault("delete", False):
309 cmd = "no {}".format(cmd)
310 config_data.append(cmd)
311
312 if "helper enable" in gr_data and not gr_data["helper enable"]:
313 cmd = "graceful-restart helper enable"
314 if gr_data.setdefault("delete", False):
315 cmd = "no {}".format(cmd)
316 config_data.append(cmd)
317 elif "helper enable" in gr_data and type(gr_data["helper enable"]) is list:
318 for rtrs in gr_data["helper enable"]:
319 cmd = "graceful-restart helper enable {}".format(rtrs)
320 if gr_data.setdefault("delete", False):
321 cmd = "no {}".format(cmd)
322 config_data.append(cmd)
323
324 if "helper" in gr_data:
325 if type(gr_data["helper"]) is not list:
326 gr_data["helper"] = list(gr_data["helper"])
327 for helper_role in gr_data["helper"]:
328 cmd = "graceful-restart helper {}".format(helper_role)
329 if gr_data.setdefault("delete", False):
330 cmd = "no {}".format(cmd)
331 config_data.append(cmd)
332
333 if "supported-grace-time" in gr_data:
334 cmd = "graceful-restart helper supported-grace-time {}".format(
335 gr_data["supported-grace-time"]
336 )
337 if gr_data.setdefault("delete", False):
338 cmd = "no {}".format(cmd)
339 config_data.append(cmd)
340
341 logger.debug("Exiting lib API: create_ospf_global()")
342
343 return config_data
344
345
346 def config_ospf_interface(
347 tgen, topo=None, input_dict=None, build=False, load_config=True
348 ):
349 """
350 API to configure ospf on router.
351
352 Parameters
353 ----------
354 * `tgen` : Topogen object
355 * `topo` : json file data
356 * `input_dict` : Input dict data, required when configuring from testcase
357 * `build` : Only for initial setup phase this is set as True.
358 * `load_config` : Loading the config to router this is set as True.
359
360 Usage
361 -----
362 r1_ospf_auth = {
363 "r1": {
364 "links": {
365 "r2": {
366 "ospf": {
367 "authentication": "message-digest",
368 "authentication-key": "ospf",
369 "message-digest-key": "10"
370 }
371 }
372 }
373 }
374 }
375 result = config_ospf_interface(tgen, topo, r1_ospf_auth)
376
377 Returns
378 -------
379 True or False
380 """
381 logger.debug("Enter lib config_ospf_interface")
382 result = False
383
384 if topo is None:
385 topo = tgen.json_topo
386
387 if not input_dict:
388 input_dict = deepcopy(topo)
389 else:
390 input_dict = deepcopy(input_dict)
391
392 config_data_dict = {}
393
394 for router in input_dict.keys():
395 config_data = []
396 for lnk in input_dict[router]["links"].keys():
397 if "ospf" not in input_dict[router]["links"][lnk]:
398 logger.debug(
399 "Router %s: ospf config is not present in" "input_dict", router
400 )
401 continue
402 ospf_data = input_dict[router]["links"][lnk]["ospf"]
403 data_ospf_area = ospf_data.setdefault("area", None)
404 data_ospf_auth = ospf_data.setdefault("authentication", None)
405 data_ospf_dr_priority = ospf_data.setdefault("priority", None)
406 data_ospf_cost = ospf_data.setdefault("cost", None)
407 data_ospf_mtu = ospf_data.setdefault("mtu_ignore", None)
408
409 try:
410 intf = topo["routers"][router]["links"][lnk]["interface"]
411 except KeyError:
412 intf = topo["switches"][router]["links"][lnk]["interface"]
413
414 # interface
415 cmd = "interface {}".format(intf)
416
417 config_data.append(cmd)
418 # interface area config
419 if data_ospf_area:
420 cmd = "ip ospf area {}".format(data_ospf_area)
421 config_data.append(cmd)
422
423 # interface ospf auth
424 if data_ospf_auth:
425 if data_ospf_auth == "null":
426 cmd = "ip ospf authentication null"
427 elif data_ospf_auth == "message-digest":
428 cmd = "ip ospf authentication message-digest"
429 else:
430 cmd = "ip ospf authentication"
431
432 if "del_action" in ospf_data:
433 cmd = "no {}".format(cmd)
434 config_data.append(cmd)
435
436 if "message-digest-key" in ospf_data:
437 cmd = "ip ospf message-digest-key {} md5 {}".format(
438 ospf_data["message-digest-key"], ospf_data["authentication-key"]
439 )
440 if "del_action" in ospf_data:
441 cmd = "no {}".format(cmd)
442 config_data.append(cmd)
443
444 if (
445 "authentication-key" in ospf_data
446 and "message-digest-key" not in ospf_data
447 ):
448 cmd = "ip ospf authentication-key {}".format(
449 ospf_data["authentication-key"]
450 )
451 if "del_action" in ospf_data:
452 cmd = "no {}".format(cmd)
453 config_data.append(cmd)
454
455 # interface ospf dr priority
456 if data_ospf_dr_priority:
457 cmd = "ip ospf priority {}".format(ospf_data["priority"])
458 if "del_action" in ospf_data:
459 cmd = "no {}".format(cmd)
460 config_data.append(cmd)
461
462 # interface ospf cost
463 if data_ospf_cost:
464 cmd = "ip ospf cost {}".format(ospf_data["cost"])
465 if "del_action" in ospf_data:
466 cmd = "no {}".format(cmd)
467 config_data.append(cmd)
468
469 # interface ospf mtu
470 if data_ospf_mtu:
471 cmd = "ip ospf mtu-ignore"
472 if "del_action" in ospf_data:
473 cmd = "no {}".format(cmd)
474 config_data.append(cmd)
475
476 if build:
477 return config_data
478
479 if config_data:
480 config_data_dict[router] = config_data
481
482 result = create_common_configurations(
483 tgen, config_data_dict, "interface_config", build=build
484 )
485
486 logger.debug("Exiting lib API: config_ospf_interface()")
487 return result
488
489
490 def clear_ospf(tgen, router, ospf=None):
491 """
492 This API is to clear ospf neighborship by running
493 clear ip ospf interface * command,
494
495 Parameters
496 ----------
497 * `tgen`: topogen object
498 * `router`: device under test
499
500 Usage
501 -----
502 clear_ospf(tgen, "r1")
503 """
504
505 logger.debug("Entering lib API: clear_ospf()")
506 if router not in tgen.routers():
507 return False
508
509 rnode = tgen.routers()[router]
510 # Clearing OSPF
511 if ospf:
512 version = "ipv6"
513 else:
514 version = "ip"
515
516 cmd = "clear {} ospf interface".format(version)
517 logger.info("Clearing ospf process on router %s.. using command '%s'", router, cmd)
518 run_frr_cmd(rnode, cmd)
519
520 logger.debug("Exiting lib API: clear_ospf()")
521
522
523 def redistribute_ospf(tgen, topo, dut, route_type, **kwargs):
524 """
525 Redstribution of routes inside ospf.
526
527 Parameters
528 ----------
529 * `tgen`: Topogen object
530 * `topo` : json file data
531 * `dut`: device under test
532 * `route_type`: "static" or "connected" or ....
533 * `kwargs`: pass extra information (see below)
534
535 Usage
536 -----
537 redistribute_ospf(tgen, topo, "r0", "static", delete=True)
538 redistribute_ospf(tgen, topo, "r0", "static", route_map="rmap_ipv4")
539 """
540
541 ospf_red = {dut: {"ospf": {"redistribute": [{"redist_type": route_type}]}}}
542 for k, v in kwargs.items():
543 ospf_red[dut]["ospf"]["redistribute"][0][k] = v
544
545 result = create_router_ospf(tgen, topo, ospf_red)
546 assert result is True, "Testcase : Failed \n Error: {}".format(result)
547
548
549 ################################
550 # Verification procs
551 ################################
552 @retry(retry_timeout=80)
553 def verify_ospf_neighbor(
554 tgen, topo=None, dut=None, input_dict=None, lan=False, expected=True
555 ):
556 """
557 This API is to verify ospf neighborship by running
558 show ip ospf neighbour command,
559
560 Parameters
561 ----------
562 * `tgen` : Topogen object
563 * `topo` : json file data
564 * `dut`: device under test
565 * `input_dict` : Input dict data, required when configuring from testcase
566 * `lan` : verify neighbors in lan topology
567 * `expected` : expected results from API, by-default True
568
569 Usage
570 -----
571 1. To check FULL neighbors.
572 verify_ospf_neighbor(tgen, topo, dut=dut)
573
574 2. To check neighbors with their roles.
575 input_dict = {
576 "r0": {
577 "ospf": {
578 "neighbors": {
579 "r1": {
580 "nbrState": "Full",
581 "role": "DR"
582 },
583 "r2": {
584 "nbrState": "Full",
585 "role": "DROther"
586 },
587 "r3": {
588 "nbrState": "Full",
589 "role": "DROther"
590 }
591 }
592 }
593 }
594 }
595 result = verify_ospf_neighbor(tgen, topo, dut, input_dict, lan=True)
596
597 Returns
598 -------
599 True or False (Error Message)
600 """
601 logger.debug("Entering lib API: verify_ospf_neighbor()")
602 result = False
603 if topo is None:
604 topo = tgen.json_topo
605
606 if input_dict:
607 for router, rnode in tgen.routers().items():
608 if "ospf" not in topo["routers"][router]:
609 continue
610
611 if dut is not None and dut != router:
612 continue
613
614 logger.info("Verifying OSPF neighborship on router %s:", router)
615 show_ospf_json = run_frr_cmd(
616 rnode, "show ip ospf neighbor all json", isjson=True
617 )
618
619 # Verifying output dictionary show_ospf_json is empty or not
620 if not bool(show_ospf_json):
621 errormsg = "OSPF is not running"
622 return errormsg
623
624 ospf_data_list = input_dict[router]["ospf"]
625 ospf_nbr_list = ospf_data_list["neighbors"]
626
627 for ospf_nbr, nbr_data in ospf_nbr_list.items():
628 data_ip = topo["routers"][ospf_nbr]["links"]
629 data_rid = topo["routers"][ospf_nbr]["ospf"]["router_id"]
630 if ospf_nbr in data_ip:
631 nbr_details = nbr_data[ospf_nbr]
632 elif lan:
633 for switch in topo["switches"]:
634 if "ospf" in topo["switches"][switch]["links"][router]:
635 neighbor_ip = data_ip[switch]["ipv4"].split("/")[0]
636 else:
637 continue
638 else:
639 neighbor_ip = data_ip[router]["ipv4"].split("/")[0]
640
641 nh_state = None
642 neighbor_ip = neighbor_ip.lower()
643 nbr_rid = data_rid
644 try:
645 nh_state = show_ospf_json[nbr_rid][0]["nbrState"].split("/")[0]
646 intf_state = show_ospf_json[nbr_rid][0]["nbrState"].split("/")[1]
647 except KeyError:
648 errormsg = "[DUT: {}] OSPF peer {} missing".format(router, nbr_rid)
649 return errormsg
650
651 nbr_state = nbr_data.setdefault("nbrState", None)
652 nbr_role = nbr_data.setdefault("role", None)
653
654 if nbr_state:
655 if nbr_state == nh_state:
656 logger.info(
657 "[DUT: {}] OSPF Nbr is {}:{} State {}".format(
658 router, ospf_nbr, nbr_rid, nh_state
659 )
660 )
661 result = True
662 else:
663 errormsg = (
664 "[DUT: {}] OSPF is not Converged, neighbor"
665 " state is {}".format(router, nh_state)
666 )
667 return errormsg
668 if nbr_role:
669 if nbr_role == intf_state:
670 logger.info(
671 "[DUT: {}] OSPF Nbr is {}: {} Role {}".format(
672 router, ospf_nbr, nbr_rid, nbr_role
673 )
674 )
675 else:
676 errormsg = (
677 "[DUT: {}] OSPF is not Converged with rid"
678 "{}, role is {}".format(router, nbr_rid, intf_state)
679 )
680 return errormsg
681 continue
682 else:
683 for router, rnode in tgen.routers().items():
684 if "ospf" not in topo["routers"][router]:
685 continue
686
687 if dut is not None and dut != router:
688 continue
689
690 logger.info("Verifying OSPF neighborship on router %s:", router)
691 show_ospf_json = run_frr_cmd(
692 rnode, "show ip ospf neighbor all json", isjson=True
693 )
694 # Verifying output dictionary show_ospf_json is empty or not
695 if not bool(show_ospf_json):
696 errormsg = "OSPF is not running"
697 return errormsg
698
699 ospf_data_list = topo["routers"][router]["ospf"]
700 ospf_neighbors = ospf_data_list["neighbors"]
701 total_peer = 0
702 total_peer = len(ospf_neighbors.keys())
703 no_of_ospf_nbr = 0
704 ospf_nbr_list = ospf_data_list["neighbors"]
705 no_of_peer = 0
706 for ospf_nbr, nbr_data in ospf_nbr_list.items():
707 if nbr_data:
708 data_ip = topo["routers"][nbr_data["nbr"]]["links"]
709 data_rid = topo["routers"][nbr_data["nbr"]]["ospf"]["router_id"]
710 else:
711 data_ip = topo["routers"][ospf_nbr]["links"]
712 data_rid = topo["routers"][ospf_nbr]["ospf"]["router_id"]
713 if ospf_nbr in data_ip:
714 nbr_details = nbr_data[ospf_nbr]
715 elif lan:
716 for switch in topo["switches"]:
717 if "ospf" in topo["switches"][switch]["links"][router]:
718 neighbor_ip = data_ip[switch]["ipv4"].split("/")[0]
719 else:
720 continue
721 else:
722 neighbor_ip = data_ip[router]["ipv4"].split("/")[0]
723
724 nh_state = None
725 neighbor_ip = neighbor_ip.lower()
726 nbr_rid = data_rid
727
728 try:
729 nh_state = show_ospf_json[nbr_rid][0]["nbrState"].split("/")[0]
730 except KeyError:
731 errormsg = "[DUT: {}] OSPF peer {} missing,from " "{} ".format(
732 router, nbr_rid, ospf_nbr
733 )
734 return errormsg
735
736 if nh_state == "Full":
737 no_of_peer += 1
738
739 if no_of_peer == total_peer:
740 logger.info("[DUT: {}] OSPF is Converged".format(router))
741 result = True
742 else:
743 errormsg = "[DUT: {}] OSPF is not Converged".format(router)
744 return errormsg
745
746 logger.debug("Exiting API: verify_ospf_neighbor()")
747 return result
748
749
750 @retry(retry_timeout=50)
751 def verify_ospf6_neighbor(tgen, topo=None, dut=None, input_dict=None, lan=False):
752 """
753 This API is to verify ospf neighborship by running
754 show ipv6 ospf neighbour command,
755
756 Parameters
757 ----------
758 * `tgen` : Topogen object
759 * `topo` : json file data
760 * `dut`: device under test
761 * `input_dict` : Input dict data, required when configuring from testcase
762 * `lan` : verify neighbors in lan topology
763
764 Usage
765 -----
766 1. To check FULL neighbors.
767 verify_ospf_neighbor(tgen, topo, dut=dut)
768
769 2. To check neighbors with their roles.
770 input_dict = {
771 "r0": {
772 "ospf6": {
773 "neighbors": {
774 "r1": {
775 "state": "Full",
776 "role": "DR"
777 },
778 "r2": {
779 "state": "Full",
780 "role": "DROther"
781 },
782 "r3": {
783 "state": "Full",
784 "role": "DROther"
785 }
786 }
787 }
788 }
789 }
790 result = verify_ospf6_neighbor(tgen, topo, dut, input_dict, lan=True)
791
792 3. To check there are no neighbors.
793 input_dict = {
794 "r0": {
795 "ospf6": {
796 "neighbors": []
797 }
798 }
799 }
800 result = verify_ospf6_neighbor(tgen, topo, dut, input_dict)
801
802 Returns
803 -------
804 True or False (Error Message)
805 """
806 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
807 result = False
808
809 if topo is None:
810 topo = tgen.json_topo
811
812 if input_dict:
813 for router, rnode in tgen.routers().items():
814 if "ospf6" not in topo["routers"][router]:
815 continue
816
817 if dut is not None and dut != router:
818 continue
819
820 logger.info("Verifying OSPF neighborship on router %s:", router)
821 show_ospf_json = run_frr_cmd(
822 rnode, "show ipv6 ospf neighbor json", isjson=True
823 )
824 # Verifying output dictionary show_ospf_json is empty or not
825 if not bool(show_ospf_json):
826 errormsg = "OSPF6 is not running"
827 return errormsg
828
829 ospf_data_list = input_dict[router]["ospf6"]
830 ospf_nbr_list = ospf_data_list["neighbors"]
831
832 # Check if looking for no neighbors
833 if ospf_nbr_list == []:
834 if show_ospf_json["neighbors"] == []:
835 logger.info("[DUT: {}] OSPF6 no neighbors found".format(router))
836 return True
837 else:
838 errormsg = (
839 "[DUT: {}] OSPF6 active neighbors found, expected None".format(
840 router
841 )
842 )
843 return errormsg
844
845 for ospf_nbr, nbr_data in ospf_nbr_list.items():
846
847 try:
848 data_ip = data_rid = topo["routers"][ospf_nbr]["ospf6"]["router_id"]
849 except KeyError:
850 data_ip = data_rid = topo["routers"][nbr_data["nbr"]]["ospf6"][
851 "router_id"
852 ]
853
854 if ospf_nbr in data_ip:
855 nbr_details = nbr_data[ospf_nbr]
856 elif lan:
857 for switch in topo["switches"]:
858 if "ospf6" in topo["switches"][switch]["links"][router]:
859 neighbor_ip = data_ip
860 else:
861 continue
862 else:
863 neighbor_ip = data_ip[router]["ipv6"].split("/")[0]
864
865 nh_state = None
866 neighbor_ip = neighbor_ip.lower()
867 nbr_rid = data_rid
868 get_index_val = dict(
869 (d["neighborId"], dict(d, index=index))
870 for (index, d) in enumerate(show_ospf_json["neighbors"])
871 )
872 try:
873 nh_state = get_index_val.get(neighbor_ip)["state"]
874 intf_state = get_index_val.get(neighbor_ip)["ifState"]
875 except TypeError:
876 errormsg = "[DUT: {}] OSPF peer {} missing,from " "{} ".format(
877 router, nbr_rid, ospf_nbr
878 )
879 return errormsg
880
881 nbr_state = nbr_data.setdefault("state", None)
882 nbr_role = nbr_data.setdefault("role", None)
883
884 if nbr_state:
885 if nbr_state == nh_state:
886 logger.info(
887 "[DUT: {}] OSPF6 Nbr is {}:{} State {}".format(
888 router, ospf_nbr, nbr_rid, nh_state
889 )
890 )
891 result = True
892 else:
893 errormsg = (
894 "[DUT: {}] OSPF6 is not Converged, neighbor"
895 " state is {} , Expected state is {}".format(
896 router, nh_state, nbr_state
897 )
898 )
899 return errormsg
900 if nbr_role:
901 if nbr_role == intf_state:
902 logger.info(
903 "[DUT: {}] OSPF6 Nbr is {}: {} Role {}".format(
904 router, ospf_nbr, nbr_rid, nbr_role
905 )
906 )
907 else:
908 errormsg = (
909 "[DUT: {}] OSPF6 is not Converged with rid"
910 "{}, role is {}, Expected role is {}".format(
911 router, nbr_rid, intf_state, nbr_role
912 )
913 )
914 return errormsg
915 continue
916 else:
917
918 for router, rnode in tgen.routers().items():
919 if "ospf6" not in topo["routers"][router]:
920 continue
921
922 if dut is not None and dut != router:
923 continue
924
925 logger.info("Verifying OSPF6 neighborship on router %s:", router)
926 show_ospf_json = run_frr_cmd(
927 rnode, "show ipv6 ospf neighbor json", isjson=True
928 )
929 # Verifying output dictionary show_ospf_json is empty or not
930 if not bool(show_ospf_json):
931 errormsg = "OSPF6 is not running"
932 return errormsg
933
934 ospf_data_list = topo["routers"][router]["ospf6"]
935 ospf_neighbors = ospf_data_list["neighbors"]
936 total_peer = 0
937 total_peer = len(ospf_neighbors.keys())
938 no_of_ospf_nbr = 0
939 ospf_nbr_list = ospf_data_list["neighbors"]
940 no_of_peer = 0
941 for ospf_nbr, nbr_data in ospf_nbr_list.items():
942 try:
943 data_ip = data_rid = topo["routers"][ospf_nbr]["ospf6"]["router_id"]
944 except KeyError:
945 data_ip = data_rid = topo["routers"][nbr_data["nbr"]]["ospf6"][
946 "router_id"
947 ]
948
949 if ospf_nbr in data_ip:
950 nbr_details = nbr_data[ospf_nbr]
951 elif lan:
952 for switch in topo["switches"]:
953 if "ospf6" in topo["switches"][switch]["links"][router]:
954 neighbor_ip = data_ip
955 else:
956 continue
957 else:
958 neighbor_ip = data_ip
959
960 nh_state = None
961 neighbor_ip = neighbor_ip.lower()
962 nbr_rid = data_rid
963 get_index_val = dict(
964 (d["neighborId"], dict(d, index=index))
965 for (index, d) in enumerate(show_ospf_json["neighbors"])
966 )
967 try:
968 nh_state = get_index_val.get(neighbor_ip)["state"]
969 intf_state = get_index_val.get(neighbor_ip)["ifState"]
970 except TypeError:
971 errormsg = "[DUT: {}] OSPF peer {} missing,from " "{} ".format(
972 router, nbr_rid, ospf_nbr
973 )
974 return errormsg
975
976 if nh_state == "Full":
977 no_of_peer += 1
978
979 if no_of_peer == total_peer:
980 logger.info("[DUT: {}] OSPF6 is Converged".format(router))
981 result = True
982 else:
983 errormsg = "[DUT: {}] OSPF6 is not Converged".format(router)
984 return errormsg
985
986 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
987 return result
988
989
990 @retry(retry_timeout=40)
991 def verify_ospf_rib(
992 tgen, dut, input_dict, next_hop=None, tag=None, metric=None, fib=None, expected=True
993 ):
994 """
995 This API is to verify ospf routes by running
996 show ip ospf route command.
997
998 Parameters
999 ----------
1000 * `tgen` : Topogen object
1001 * `dut`: device under test
1002 * `input_dict` : Input dict data, required when configuring from testcase
1003 * `next_hop` : next to be verified
1004 * `tag` : tag to be verified
1005 * `metric` : metric to be verified
1006 * `fib` : True if the route is installed in FIB.
1007 * `expected` : expected results from API, by-default True
1008
1009 Usage
1010 -----
1011 input_dict = {
1012 "r1": {
1013 "static_routes": [
1014 {
1015 "network": ip_net,
1016 "no_of_ip": 1,
1017 "routeType": "N"
1018 }
1019 ]
1020 }
1021 }
1022
1023 result = verify_ospf_rib(tgen, dut, input_dict,next_hop=nh)
1024
1025 Returns
1026 -------
1027 True or False (Error Message)
1028 """
1029
1030 logger.info("Entering lib API: verify_ospf_rib()")
1031 result = False
1032 router_list = tgen.routers()
1033 additional_nexthops_in_required_nhs = []
1034 found_hops = []
1035 for routerInput in input_dict.keys():
1036 for router, rnode in router_list.items():
1037 if router != dut:
1038 continue
1039
1040 logger.info("Checking router %s RIB:", router)
1041
1042 # Verifying RIB routes
1043 command = "show ip ospf route"
1044
1045 found_routes = []
1046 missing_routes = []
1047
1048 if (
1049 "static_routes" in input_dict[routerInput]
1050 or "prefix" in input_dict[routerInput]
1051 ):
1052 if "prefix" in input_dict[routerInput]:
1053 static_routes = input_dict[routerInput]["prefix"]
1054 else:
1055 static_routes = input_dict[routerInput]["static_routes"]
1056
1057 for static_route in static_routes:
1058 cmd = "{}".format(command)
1059
1060 cmd = "{} json".format(cmd)
1061
1062 ospf_rib_json = run_frr_cmd(rnode, cmd, isjson=True)
1063
1064 # Verifying output dictionary ospf_rib_json is not empty
1065 if bool(ospf_rib_json) is False:
1066 errormsg = (
1067 "[DUT: {}] No routes found in OSPF route "
1068 "table".format(router)
1069 )
1070 return errormsg
1071
1072 network = static_route["network"]
1073 no_of_ip = static_route.setdefault("no_of_ip", 1)
1074 _tag = static_route.setdefault("tag", None)
1075 _rtype = static_route.setdefault("routeType", None)
1076
1077 # Generating IPs for verification
1078 ip_list = generate_ips(network, no_of_ip)
1079 st_found = False
1080 nh_found = False
1081
1082 for st_rt in ip_list:
1083 st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
1084
1085 _addr_type = validate_ip_address(st_rt)
1086 if _addr_type != "ipv4":
1087 continue
1088
1089 if st_rt in ospf_rib_json:
1090 st_found = True
1091 found_routes.append(st_rt)
1092
1093 if fib and next_hop:
1094 if type(next_hop) is not list:
1095 next_hop = [next_hop]
1096
1097 for mnh in range(0, len(ospf_rib_json[st_rt])):
1098 if (
1099 "fib"
1100 in ospf_rib_json[st_rt][mnh]["nexthops"][0]
1101 ):
1102 found_hops.append(
1103 [
1104 rib_r["ip"]
1105 for rib_r in ospf_rib_json[st_rt][mnh][
1106 "nexthops"
1107 ]
1108 ]
1109 )
1110
1111 if found_hops[0]:
1112 missing_list_of_nexthops = set(
1113 found_hops[0]
1114 ).difference(next_hop)
1115 additional_nexthops_in_required_nhs = set(
1116 next_hop
1117 ).difference(found_hops[0])
1118
1119 if additional_nexthops_in_required_nhs:
1120 logger.info(
1121 "Nexthop "
1122 "%s is not active for route %s in "
1123 "RIB of router %s\n",
1124 additional_nexthops_in_required_nhs,
1125 st_rt,
1126 dut,
1127 )
1128 errormsg = (
1129 "Nexthop {} is not active"
1130 " for route {} in RIB of router"
1131 " {}\n".format(
1132 additional_nexthops_in_required_nhs,
1133 st_rt,
1134 dut,
1135 )
1136 )
1137 return errormsg
1138 else:
1139 nh_found = True
1140
1141 elif next_hop and fib is None:
1142 if type(next_hop) is not list:
1143 next_hop = [next_hop]
1144 found_hops = [
1145 rib_r["ip"]
1146 for rib_r in ospf_rib_json[st_rt]["nexthops"]
1147 ]
1148
1149 if found_hops:
1150 missing_list_of_nexthops = set(
1151 found_hops
1152 ).difference(next_hop)
1153 additional_nexthops_in_required_nhs = set(
1154 next_hop
1155 ).difference(found_hops)
1156
1157 if additional_nexthops_in_required_nhs:
1158 logger.info(
1159 "Missing nexthop %s for route"
1160 " %s in RIB of router %s\n",
1161 additional_nexthops_in_required_nhs,
1162 st_rt,
1163 dut,
1164 )
1165 errormsg = (
1166 "Nexthop {} is Missing for "
1167 "route {} in RIB of router {}\n".format(
1168 additional_nexthops_in_required_nhs,
1169 st_rt,
1170 dut,
1171 )
1172 )
1173 return errormsg
1174 else:
1175 nh_found = True
1176 if _rtype:
1177 if "routeType" not in ospf_rib_json[st_rt]:
1178 errormsg = (
1179 "[DUT: {}]: routeType missing"
1180 " for route {} in OSPF RIB \n".format(
1181 dut, st_rt
1182 )
1183 )
1184 return errormsg
1185 elif _rtype != ospf_rib_json[st_rt]["routeType"]:
1186 errormsg = (
1187 "[DUT: {}]: routeType mismatch"
1188 " for route {} in OSPF RIB \n".format(
1189 dut, st_rt
1190 )
1191 )
1192 return errormsg
1193 else:
1194 logger.info(
1195 "[DUT: {}]: Found routeType {}"
1196 " for route {}".format(dut, _rtype, st_rt)
1197 )
1198 if tag:
1199 if "tag" not in ospf_rib_json[st_rt]:
1200 errormsg = (
1201 "[DUT: {}]: tag is not"
1202 " present for"
1203 " route {} in RIB \n".format(dut, st_rt)
1204 )
1205 return errormsg
1206
1207 if _tag != ospf_rib_json[st_rt]["tag"]:
1208 errormsg = (
1209 "[DUT: {}]: tag value {}"
1210 " is not matched for"
1211 " route {} in RIB \n".format(
1212 dut,
1213 _tag,
1214 st_rt,
1215 )
1216 )
1217 return errormsg
1218
1219 if metric is not None:
1220 if "type2cost" not in ospf_rib_json[st_rt]:
1221 errormsg = (
1222 "[DUT: {}]: metric is"
1223 " not present for"
1224 " route {} in RIB \n".format(dut, st_rt)
1225 )
1226 return errormsg
1227
1228 if metric != ospf_rib_json[st_rt]["type2cost"]:
1229 errormsg = (
1230 "[DUT: {}]: metric value "
1231 "{} is not matched for "
1232 "route {} in RIB \n".format(
1233 dut,
1234 metric,
1235 st_rt,
1236 )
1237 )
1238 return errormsg
1239
1240 else:
1241 missing_routes.append(st_rt)
1242
1243 if nh_found:
1244 logger.info(
1245 "[DUT: {}]: Found next_hop {} for all OSPF"
1246 " routes in RIB".format(router, next_hop)
1247 )
1248
1249 if len(missing_routes) > 0:
1250 errormsg = "[DUT: {}]: Missing route in RIB, " "routes: {}".format(
1251 dut, missing_routes
1252 )
1253 return errormsg
1254
1255 if found_routes:
1256 logger.info(
1257 "[DUT: %s]: Verified routes in RIB, found" " routes are: %s\n",
1258 dut,
1259 found_routes,
1260 )
1261 result = True
1262
1263 logger.info("Exiting lib API: verify_ospf_rib()")
1264 return result
1265
1266
1267 @retry(retry_timeout=20)
1268 def verify_ospf_interface(
1269 tgen, topo=None, dut=None, lan=False, input_dict=None, expected=True
1270 ):
1271 """
1272 This API is to verify ospf routes by running
1273 show ip ospf interface command.
1274
1275 Parameters
1276 ----------
1277 * `tgen` : Topogen object
1278 * `topo` : topology descriptions
1279 * `dut`: device under test
1280 * `lan`: if set to true this interface belongs to LAN.
1281 * `input_dict` : Input dict data, required when configuring from testcase
1282 * `expected` : expected results from API, by-default True
1283
1284 Usage
1285 -----
1286 input_dict= {
1287 'r0': {
1288 'links':{
1289 's1': {
1290 'ospf':{
1291 'priority':98,
1292 'timerDeadSecs': 4,
1293 'area': '0.0.0.3',
1294 'mcastMemberOspfDesignatedRouters': True,
1295 'mcastMemberOspfAllRouters': True,
1296 'ospfEnabled': True,
1297
1298 }
1299 }
1300 }
1301 }
1302 }
1303 result = verify_ospf_interface(tgen, topo, dut=dut, input_dict=input_dict)
1304
1305 Returns
1306 -------
1307 True or False (Error Message)
1308 """
1309
1310 logger.debug("Entering lib API: verify_ospf_interface()")
1311 result = False
1312 if topo is None:
1313 topo = tgen.json_topo
1314
1315 for router, rnode in tgen.routers().items():
1316 if "ospf" not in topo["routers"][router]:
1317 continue
1318
1319 if dut is not None and dut != router:
1320 continue
1321
1322 logger.info("Verifying OSPF interface on router %s:", router)
1323 show_ospf_json = run_frr_cmd(rnode, "show ip ospf interface json", isjson=True)
1324
1325 # Verifying output dictionary show_ospf_json is empty or not
1326 if not bool(show_ospf_json):
1327 errormsg = "OSPF is not running"
1328 return errormsg
1329
1330 # To find neighbor ip type
1331 ospf_intf_data = input_dict[router]["links"]
1332 for ospf_intf, intf_data in ospf_intf_data.items():
1333 intf = topo["routers"][router]["links"][ospf_intf]["interface"]
1334 if intf in show_ospf_json["interfaces"]:
1335 for intf_attribute in intf_data["ospf"]:
1336 if (
1337 intf_data["ospf"][intf_attribute]
1338 == show_ospf_json["interfaces"][intf][intf_attribute]
1339 ):
1340 logger.info(
1341 "[DUT: %s] OSPF interface %s: %s is %s",
1342 router,
1343 intf,
1344 intf_attribute,
1345 intf_data["ospf"][intf_attribute],
1346 )
1347 else:
1348 errormsg = "[DUT: {}] OSPF interface {}: {} is {}, \
1349 Expected is {}".format(
1350 router,
1351 intf,
1352 intf_attribute,
1353 intf_data["ospf"][intf_attribute],
1354 show_ospf_json["interfaces"][intf][intf_attribute],
1355 )
1356 return errormsg
1357 result = True
1358 logger.debug("Exiting API: verify_ospf_interface()")
1359 return result
1360
1361
1362 @retry(retry_timeout=40)
1363 def verify_ospf_database(
1364 tgen, topo, dut, input_dict, vrf=None, lsatype=None, rid=None, expected=True
1365 ):
1366 """
1367 This API is to verify ospf lsa's by running
1368 show ip ospf database command.
1369
1370 Parameters
1371 ----------
1372 * `tgen` : Topogen object
1373 * `dut`: device under test
1374 * `input_dict` : Input dict data, required when configuring from testcase
1375 * `topo` : next to be verified
1376 * `expected` : expected results from API, by-default True
1377
1378 Usage
1379 -----
1380 input_dict = {
1381 "areas": {
1382 "0.0.0.0": {
1383 "Router Link States": {
1384 "100.1.1.0-100.1.1.0": {
1385 "LSID": "100.1.1.0",
1386 "Advertised router": "100.1.1.0",
1387 "LSA Age": 130,
1388 "Sequence Number": "80000006",
1389 "Checksum": "a703",
1390 "Router links": 3
1391 }
1392 },
1393 "Net Link States": {
1394 "10.0.0.2-100.1.1.1": {
1395 "LSID": "10.0.0.2",
1396 "Advertised router": "100.1.1.1",
1397 "LSA Age": 137,
1398 "Sequence Number": "80000001",
1399 "Checksum": "9583"
1400 }
1401 },
1402 },
1403 }
1404 }
1405 result = verify_ospf_database(tgen, topo, dut, input_dict)
1406
1407 Returns
1408 -------
1409 True or False (Error Message)
1410 """
1411
1412 result = False
1413 router = dut
1414 logger.debug("Entering lib API: verify_ospf_database()")
1415
1416 if "ospf" not in topo["routers"][dut]:
1417 errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut)
1418 return errormsg
1419
1420 rnode = tgen.routers()[dut]
1421
1422 logger.info("Verifying OSPF interface on router %s:", dut)
1423
1424 if not rid:
1425 rid = "self-originate"
1426 if lsatype:
1427 if vrf is None:
1428 command = "show ip ospf database {} {} json".format(lsatype, rid)
1429 else:
1430 command = "show ip ospf database {} {} vrf {} json".format(
1431 lsatype, rid, vrf
1432 )
1433 else:
1434 if vrf is None:
1435 command = "show ip ospf database json"
1436 else:
1437 command = "show ip ospf database vrf {} json".format(vrf)
1438
1439 show_ospf_json = run_frr_cmd(rnode, command, isjson=True)
1440 # Verifying output dictionary show_ospf_json is empty or not
1441 if not bool(show_ospf_json):
1442 errormsg = "OSPF is not running"
1443 return errormsg
1444
1445 # for inter and inter lsa's
1446 ospf_db_data = input_dict.setdefault("areas", None)
1447 ospf_external_lsa = input_dict.setdefault("AS External Link States", None)
1448 # import pdb; pdb.set_trace()
1449 if ospf_db_data:
1450 for ospf_area, area_lsa in ospf_db_data.items():
1451 if ospf_area in show_ospf_json["routerLinkStates"]["areas"]:
1452 if "routerLinkStates" in area_lsa:
1453 for lsa in area_lsa["routerLinkStates"]:
1454 _advrtr = lsa.setdefault("advertisedRouter", None)
1455 _options = lsa.setdefault("options", None)
1456
1457 if (
1458 _options
1459 and lsa["lsaId"]
1460 == show_ospf_json["routerLinkStates"]["areas"][ospf_area][
1461 0
1462 ]["linkStateId"]
1463 and lsa["options"]
1464 == show_ospf_json["routerLinkStates"]["areas"][ospf_area][
1465 0
1466 ]["options"]
1467 ):
1468 result = True
1469 break
1470 else:
1471 errormsg = '[DUT: {}] OSPF LSA options: expected {}, Received Options are {} lsa["options"] {} OSPF LSAID: expected lsaid {}, Received lsaid {}'.format(
1472 dut,
1473 show_ospf_json["routerLinkStates"]["areas"][ospf_area][
1474 0
1475 ]["options"],
1476 _options,
1477 lsa["options"],
1478 show_ospf_json["routerLinkStates"]["areas"][ospf_area][
1479 0
1480 ]["linkStateId"],
1481 lsa["lsaId"],
1482 )
1483 return errormsg
1484 if "Net Link States" in area_lsa:
1485 for lsa in area_lsa["Net Link States"]:
1486 if lsa in show_ospf_json["areas"][ospf_area]["Net Link States"]:
1487 logger.info(
1488 "[DUT: %s] OSPF LSDB area %s:Network " "LSA %s",
1489 router,
1490 ospf_area,
1491 lsa,
1492 )
1493 result = True
1494 else:
1495 errormsg = (
1496 "[DUT: {}] OSPF LSDB area {}: expected"
1497 " Network LSA is {}".format(router, ospf_area, lsa)
1498 )
1499 return errormsg
1500 if "Summary Link States" in area_lsa:
1501 for lsa in area_lsa["Summary Link States"]:
1502 if (
1503 lsa
1504 in show_ospf_json["areas"][ospf_area]["Summary Link States"]
1505 ):
1506 logger.info(
1507 "[DUT: %s] OSPF LSDB area %s:Summary " "LSA %s",
1508 router,
1509 ospf_area,
1510 lsa,
1511 )
1512 result = True
1513 else:
1514 errormsg = (
1515 "[DUT: {}] OSPF LSDB area {}: expected"
1516 " Summary LSA is {}".format(router, ospf_area, lsa)
1517 )
1518 return errormsg
1519 if "ASBR-Summary Link States" in area_lsa:
1520 for lsa in area_lsa["ASBR-Summary Link States"]:
1521 if (
1522 lsa
1523 in show_ospf_json["areas"][ospf_area][
1524 "ASBR-Summary Link States"
1525 ]
1526 ):
1527 logger.info(
1528 "[DUT: %s] OSPF LSDB area %s:ASBR Summary " "LSA %s",
1529 router,
1530 ospf_area,
1531 lsa,
1532 )
1533 result = True
1534 else:
1535 errormsg = (
1536 "[DUT: {}] OSPF LSDB area {}: expected"
1537 " ASBR Summary LSA is {}".format(router, ospf_area, lsa)
1538 )
1539 return errormsg
1540 if ospf_external_lsa:
1541 for ospf_ext_lsa, ext_lsa_data in ospf_external_lsa.items():
1542 if ospf_ext_lsa in show_ospf_json["AS External Link States"]:
1543 logger.info(
1544 "[DUT: %s] OSPF LSDB:External LSA %s", router, ospf_ext_lsa
1545 )
1546 result = True
1547 else:
1548 errormsg = (
1549 "[DUT: {}] OSPF LSDB : expected"
1550 " External LSA is {}".format(router, ospf_ext_lsa)
1551 )
1552 return errormsg
1553
1554 logger.debug("Exiting API: verify_ospf_database()")
1555 return result
1556
1557
1558 @retry(retry_timeout=20)
1559 def verify_ospf_summary(tgen, topo, dut, input_dict, ospf=None, expected=True):
1560 """
1561 This API is to verify ospf routes by running
1562 show ip ospf interface command.
1563
1564 Parameters
1565 ----------
1566 * `tgen` : Topogen object
1567 * `topo` : topology descriptions
1568 * `dut`: device under test
1569 * `input_dict` : Input dict data, required when configuring from testcase
1570
1571 Usage
1572 -----
1573 input_dict = {
1574 "11.0.0.0/8": {
1575 "summaryAddress": "11.0.0.0/8",
1576 "metricType": "E2",
1577 "metric": 20,
1578 "tag": 0,
1579 "externalRouteCount": 5
1580 }
1581 }
1582 result = verify_ospf_summary(tgen, topo, dut, input_dict)
1583
1584 Returns
1585 -------
1586 True or False (Error Message)
1587 """
1588
1589 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1590 result = False
1591 router = dut
1592
1593 logger.info("Verifying OSPF summary on router %s:", router)
1594
1595 rnode = tgen.routers()[dut]
1596
1597 if ospf:
1598 if "ospf6" not in topo["routers"][dut]:
1599 errormsg = "[DUT: {}] OSPF6 is not configured on the router.".format(router)
1600 return errormsg
1601
1602 show_ospf_json = run_frr_cmd(
1603 rnode, "show ipv6 ospf summary detail json", isjson=True
1604 )
1605 else:
1606 if "ospf" not in topo["routers"][dut]:
1607 errormsg = "[DUT: {}] OSPF is not configured on the router.".format(router)
1608 return errormsg
1609
1610 show_ospf_json = run_frr_cmd(
1611 rnode, "show ip ospf summary detail json", isjson=True
1612 )
1613
1614 # Verifying output dictionary show_ospf_json is empty or not
1615 if not bool(show_ospf_json):
1616 errormsg = "OSPF is not running"
1617 return errormsg
1618
1619 # To find neighbor ip type
1620 ospf_summary_data = input_dict
1621
1622 if ospf:
1623 show_ospf_json = show_ospf_json["default"]
1624
1625 for ospf_summ, summ_data in ospf_summary_data.items():
1626 if ospf_summ not in show_ospf_json:
1627 continue
1628 summary = ospf_summary_data[ospf_summ]["summaryAddress"]
1629
1630 if summary in show_ospf_json:
1631 for summ in summ_data:
1632 if summ_data[summ] == show_ospf_json[summary][summ]:
1633 logger.info(
1634 "[DUT: %s] OSPF summary %s:%s is %s",
1635 router,
1636 summary,
1637 summ,
1638 summ_data[summ],
1639 )
1640 result = True
1641 else:
1642 errormsg = (
1643 "[DUT: {}] OSPF summary {} : {} is {}, "
1644 "Expected is {}".format(
1645 router,
1646 summary,
1647 summ,
1648 show_ospf_json[summary][summ],
1649 summ_data[summ],
1650 )
1651 )
1652 return errormsg
1653
1654 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1655 return result
1656
1657
1658 @retry(retry_timeout=30)
1659 def verify_ospf6_rib(
1660 tgen, dut, input_dict, next_hop=None, tag=None, metric=None, fib=None
1661 ):
1662 """
1663 This API is to verify ospf routes by running
1664 show ip ospf route command.
1665
1666 Parameters
1667 ----------
1668 * `tgen` : Topogen object
1669 * `dut`: device under test
1670 * `input_dict` : Input dict data, required when configuring from testcase
1671 * `next_hop` : next to be verified
1672 * `tag` : tag to be verified
1673 * `metric` : metric to be verified
1674 * `fib` : True if the route is installed in FIB.
1675
1676 Usage
1677 -----
1678 input_dict = {
1679 "r1": {
1680 "static_routes": [
1681 {
1682 "network": ip_net,
1683 "no_of_ip": 1,
1684 "routeType": "N"
1685 }
1686 ]
1687 }
1688 }
1689
1690 result = verify_ospf6_rib(tgen, dut, input_dict,next_hop=nh)
1691
1692 Returns
1693 -------
1694 True or False (Error Message)
1695 """
1696
1697 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1698 result = False
1699 router_list = tgen.routers()
1700 additional_nexthops_in_required_nhs = []
1701 found_hops = []
1702 for routerInput in input_dict.keys():
1703 for router, rnode in router_list.items():
1704 if router != dut:
1705 continue
1706
1707 logger.info("Checking router %s RIB:", router)
1708
1709 # Verifying RIB routes
1710 command = "show ipv6 ospf route detail"
1711
1712 found_routes = []
1713 missing_routes = []
1714
1715 if (
1716 "static_routes" in input_dict[routerInput]
1717 or "prefix" in input_dict[routerInput]
1718 ):
1719 if "prefix" in input_dict[routerInput]:
1720 static_routes = input_dict[routerInput]["prefix"]
1721 else:
1722 static_routes = input_dict[routerInput]["static_routes"]
1723
1724 for static_route in static_routes:
1725 cmd = "{}".format(command)
1726
1727 cmd = "{} json".format(cmd)
1728
1729 ospf_rib_json = run_frr_cmd(rnode, cmd, isjson=True)
1730
1731 # Fix for PR 2644182
1732 try:
1733 ospf_rib_json = ospf_rib_json["routes"]
1734 except KeyError:
1735 pass
1736
1737 # Verifying output dictionary ospf_rib_json is not empty
1738 if bool(ospf_rib_json) is False:
1739 errormsg = (
1740 "[DUT: {}] No routes found in OSPF6 route "
1741 "table".format(router)
1742 )
1743 return errormsg
1744
1745 network = static_route["network"]
1746 no_of_ip = static_route.setdefault("no_of_ip", 1)
1747 _tag = static_route.setdefault("tag", None)
1748 _rtype = static_route.setdefault("routeType", None)
1749
1750 # Generating IPs for verification
1751 ip_list = generate_ips(network, no_of_ip)
1752 if len(ip_list) == 1:
1753 ip_list = [network]
1754 st_found = False
1755 nh_found = False
1756 for st_rt in ip_list:
1757 st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
1758
1759 _addr_type = validate_ip_address(st_rt)
1760 if _addr_type != "ipv6":
1761 continue
1762
1763 if st_rt in ospf_rib_json:
1764
1765 st_found = True
1766 found_routes.append(st_rt)
1767
1768 if fib and next_hop:
1769 if type(next_hop) is not list:
1770 next_hop = [next_hop]
1771
1772 for mnh in range(0, len(ospf_rib_json[st_rt])):
1773 if (
1774 "fib"
1775 in ospf_rib_json[st_rt][mnh]["nextHops"][0]
1776 ):
1777 found_hops.append(
1778 [
1779 rib_r["ip"]
1780 for rib_r in ospf_rib_json[st_rt][mnh][
1781 "nextHops"
1782 ]
1783 ]
1784 )
1785
1786 if found_hops[0]:
1787 missing_list_of_nexthops = set(
1788 found_hops[0]
1789 ).difference(next_hop)
1790 additional_nexthops_in_required_nhs = set(
1791 next_hop
1792 ).difference(found_hops[0])
1793
1794 if additional_nexthops_in_required_nhs:
1795 logger.info(
1796 "Nexthop "
1797 "%s is not active for route %s in "
1798 "RIB of router %s\n",
1799 additional_nexthops_in_required_nhs,
1800 st_rt,
1801 dut,
1802 )
1803 errormsg = (
1804 "Nexthop {} is not active"
1805 " for route {} in RIB of router"
1806 " {}\n".format(
1807 additional_nexthops_in_required_nhs,
1808 st_rt,
1809 dut,
1810 )
1811 )
1812 return errormsg
1813 else:
1814 nh_found = True
1815
1816 elif next_hop and fib is None:
1817 if type(next_hop) is not list:
1818 next_hop = [next_hop]
1819 found_hops = [
1820 rib_r["nextHop"]
1821 for rib_r in ospf_rib_json[st_rt]["nextHops"]
1822 ]
1823
1824 if found_hops:
1825 missing_list_of_nexthops = set(
1826 found_hops
1827 ).difference(next_hop)
1828 additional_nexthops_in_required_nhs = set(
1829 next_hop
1830 ).difference(found_hops)
1831 if additional_nexthops_in_required_nhs:
1832 logger.info(
1833 "Missing nexthop %s for route"
1834 " %s in RIB of router %s\n",
1835 additional_nexthops_in_required_nhs,
1836 st_rt,
1837 dut,
1838 )
1839 errormsg = (
1840 "Nexthop {} is Missing for "
1841 "route {} in RIB of router {}\n".format(
1842 additional_nexthops_in_required_nhs,
1843 st_rt,
1844 dut,
1845 )
1846 )
1847 return errormsg
1848 else:
1849 nh_found = True
1850 if _rtype:
1851 if "destinationType" not in ospf_rib_json[st_rt]:
1852 errormsg = (
1853 "[DUT: {}]: destinationType missing"
1854 "for route {} in OSPF RIB \n".format(dut, st_rt)
1855 )
1856 return errormsg
1857 elif _rtype != ospf_rib_json[st_rt]["destinationType"]:
1858 errormsg = (
1859 "[DUT: {}]: destinationType mismatch"
1860 "for route {} in OSPF RIB \n".format(dut, st_rt)
1861 )
1862 return errormsg
1863 else:
1864 logger.info(
1865 "DUT: {}]: Found destinationType {}"
1866 "for route {}".format(dut, _rtype, st_rt)
1867 )
1868 if tag:
1869 if "tag" not in ospf_rib_json[st_rt]:
1870 errormsg = (
1871 "[DUT: {}]: tag is not"
1872 " present for"
1873 " route {} in RIB \n".format(dut, st_rt)
1874 )
1875 return errormsg
1876
1877 if _tag != ospf_rib_json[st_rt]["tag"]:
1878 errormsg = (
1879 "[DUT: {}]: tag value {}"
1880 " is not matched for"
1881 " route {} in RIB \n".format(
1882 dut,
1883 _tag,
1884 st_rt,
1885 )
1886 )
1887 return errormsg
1888
1889 if metric is not None:
1890 if "metricCostE2" not in ospf_rib_json[st_rt]:
1891 errormsg = (
1892 "[DUT: {}]: metric is"
1893 " not present for"
1894 " route {} in RIB \n".format(dut, st_rt)
1895 )
1896 return errormsg
1897
1898 if metric != ospf_rib_json[st_rt]["metricCostE2"]:
1899 errormsg = (
1900 "[DUT: {}]: metric value "
1901 "{} is not matched for "
1902 "route {} in RIB \n".format(
1903 dut,
1904 metric,
1905 st_rt,
1906 )
1907 )
1908 return errormsg
1909
1910 else:
1911 missing_routes.append(st_rt)
1912
1913 if nh_found:
1914 logger.info(
1915 "[DUT: {}]: Found next_hop {} for all OSPF"
1916 " routes in RIB".format(router, next_hop)
1917 )
1918
1919 if len(missing_routes) > 0:
1920 errormsg = "[DUT: {}]: Missing route in RIB, " "routes: {}".format(
1921 dut, missing_routes
1922 )
1923 return errormsg
1924
1925 if found_routes:
1926 logger.info(
1927 "[DUT: %s]: Verified routes in RIB, found" " routes are: %s\n",
1928 dut,
1929 found_routes,
1930 )
1931 result = True
1932
1933 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1934 return result
1935
1936
1937 @retry(retry_timeout=6)
1938 def verify_ospf6_interface(tgen, topo=None, dut=None, lan=False, input_dict=None):
1939 """
1940 This API is to verify ospf routes by running
1941 show ip ospf interface command.
1942
1943 Parameters
1944 ----------
1945 * `tgen` : Topogen object
1946 * `topo` : topology descriptions
1947 * `dut`: device under test
1948 * `lan`: if set to true this interface belongs to LAN.
1949 * `input_dict` : Input dict data, required when configuring from testcase
1950
1951 Usage
1952 -----
1953 input_dict= {
1954 'r0': {
1955 'links':{
1956 's1': {
1957 'ospf6':{
1958 'priority':98,
1959 'timerDeadSecs': 4,
1960 'area': '0.0.0.3',
1961 'mcastMemberOspfDesignatedRouters': True,
1962 'mcastMemberOspfAllRouters': True,
1963 'ospfEnabled': True,
1964
1965 }
1966 }
1967 }
1968 }
1969 }
1970 result = verify_ospf_interface(tgen, topo, dut=dut, input_dict=input_dict)
1971
1972 Returns
1973 -------
1974 True or False (Error Message)
1975 """
1976
1977 logger.debug("Entering lib API: verify_ospf6_interface")
1978 result = False
1979
1980 if topo is None:
1981 topo = tgen.json_topo
1982
1983 for router, rnode in tgen.routers().items():
1984 if "ospf6" not in topo["routers"][router]:
1985 continue
1986
1987 if dut is not None and dut != router:
1988 continue
1989
1990 logger.info("Verifying OSPF interface on router %s:", router)
1991 show_ospf_json = run_frr_cmd(
1992 rnode, "show ipv6 ospf interface json", isjson=True
1993 )
1994
1995 # Verifying output dictionary show_ospf_json is empty or not
1996 if not bool(show_ospf_json):
1997 errormsg = "OSPF6 is not running"
1998 return errormsg
1999
2000 # To find neighbor ip type
2001 ospf_intf_data = input_dict[router]["links"]
2002 for ospf_intf, intf_data in ospf_intf_data.items():
2003 intf = topo["routers"][router]["links"][ospf_intf]["interface"]
2004 if intf in show_ospf_json:
2005 for intf_attribute in intf_data["ospf6"]:
2006 if intf_data["ospf6"][intf_attribute] is not list:
2007 if (
2008 intf_data["ospf6"][intf_attribute]
2009 == show_ospf_json[intf][intf_attribute]
2010 ):
2011 logger.info(
2012 "[DUT: %s] OSPF6 interface %s: %s is %s",
2013 router,
2014 intf,
2015 intf_attribute,
2016 intf_data["ospf6"][intf_attribute],
2017 )
2018 elif intf_data["ospf6"][intf_attribute] is list:
2019 for addr_list in len(show_ospf_json[intf][intf_attribute]):
2020 if (
2021 show_ospf_json[intf][intf_attribute][addr_list][
2022 "address"
2023 ].split("/")[0]
2024 == intf_data["ospf6"]["internetAddress"][0]["address"]
2025 ):
2026 break
2027 else:
2028 errormsg = "[DUT: {}] OSPF6 interface {}: {} is {}, \
2029 Expected is {}".format(
2030 router,
2031 intf,
2032 intf_attribute,
2033 intf_data["ospf6"][intf_attribute],
2034 intf_data["ospf6"][intf_attribute],
2035 )
2036 return errormsg
2037 else:
2038 errormsg = "[DUT: {}] OSPF6 interface {}: {} is {}, \
2039 Expected is {}".format(
2040 router,
2041 intf,
2042 intf_attribute,
2043 intf_data["ospf6"][intf_attribute],
2044 intf_data["ospf6"][intf_attribute],
2045 )
2046 return errormsg
2047 result = True
2048 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2049 return result
2050
2051
2052 @retry(retry_timeout=20)
2053 def verify_ospf6_database(tgen, topo, dut, input_dict):
2054 """
2055 This API is to verify ospf lsa's by running
2056 show ip ospf database command.
2057
2058 Parameters
2059 ----------
2060 * `tgen` : Topogen object
2061 * `dut`: device under test
2062 * `input_dict` : Input dict data, required when configuring from testcase
2063 * `topo` : next to be verified
2064
2065 Usage
2066 -----
2067 input_dict = {
2068 "areas": {
2069 "0.0.0.0": {
2070 "routerLinkStates": {
2071 "100.1.1.0-100.1.1.0": {
2072 "LSID": "100.1.1.0",
2073 "Advertised router": "100.1.1.0",
2074 "LSA Age": 130,
2075 "Sequence Number": "80000006",
2076 "Checksum": "a703",
2077 "Router links": 3
2078 }
2079 },
2080 "networkLinkStates": {
2081 "10.0.0.2-100.1.1.1": {
2082 "LSID": "10.0.0.2",
2083 "Advertised router": "100.1.1.1",
2084 "LSA Age": 137,
2085 "Sequence Number": "80000001",
2086 "Checksum": "9583"
2087 }
2088 },
2089 },
2090 }
2091 }
2092 result = verify_ospf_database(tgen, topo, dut, input_dict)
2093
2094 Returns
2095 -------
2096 True or False (Error Message)
2097 """
2098
2099 result = False
2100 router = dut
2101 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2102
2103 if "ospf" not in topo["routers"][dut]:
2104 errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut)
2105 return errormsg
2106
2107 rnode = tgen.routers()[dut]
2108
2109 logger.info("Verifying OSPF interface on router %s:", dut)
2110 show_ospf_json = run_frr_cmd(rnode, "show ip ospf database json", isjson=True)
2111 # Verifying output dictionary show_ospf_json is empty or not
2112 if not bool(show_ospf_json):
2113 errormsg = "OSPF is not running"
2114 return errormsg
2115
2116 # for inter and inter lsa's
2117 ospf_db_data = input_dict.setdefault("areas", None)
2118 ospf_external_lsa = input_dict.setdefault("asExternalLinkStates", None)
2119
2120 if ospf_db_data:
2121 for ospf_area, area_lsa in ospf_db_data.items():
2122 if ospf_area in show_ospf_json["areas"]:
2123 if "routerLinkStates" in area_lsa:
2124 for lsa in area_lsa["routerLinkStates"]:
2125 for rtrlsa in show_ospf_json["areas"][ospf_area][
2126 "routerLinkStates"
2127 ]:
2128 if (
2129 lsa["lsaId"] == rtrlsa["lsaId"]
2130 and lsa["advertisedRouter"]
2131 == rtrlsa["advertisedRouter"]
2132 ):
2133 result = True
2134 break
2135 if result:
2136 logger.info(
2137 "[DUT: %s] OSPF LSDB area %s:Router " "LSA %s",
2138 router,
2139 ospf_area,
2140 lsa,
2141 )
2142 break
2143 else:
2144 errormsg = (
2145 "[DUT: {}] OSPF LSDB area {}: expected"
2146 " Router LSA is {}".format(router, ospf_area, lsa)
2147 )
2148 return errormsg
2149
2150 if "networkLinkStates" in area_lsa:
2151 for lsa in area_lsa["networkLinkStates"]:
2152 for netlsa in show_ospf_json["areas"][ospf_area][
2153 "networkLinkStates"
2154 ]:
2155 if (
2156 lsa
2157 in show_ospf_json["areas"][ospf_area][
2158 "networkLinkStates"
2159 ]
2160 ):
2161 if (
2162 lsa["lsaId"] == netlsa["lsaId"]
2163 and lsa["advertisedRouter"]
2164 == netlsa["advertisedRouter"]
2165 ):
2166 result = True
2167 break
2168 if result:
2169 logger.info(
2170 "[DUT: %s] OSPF LSDB area %s:Network " "LSA %s",
2171 router,
2172 ospf_area,
2173 lsa,
2174 )
2175 break
2176 else:
2177 errormsg = (
2178 "[DUT: {}] OSPF LSDB area {}: expected"
2179 " Network LSA is {}".format(router, ospf_area, lsa)
2180 )
2181 return errormsg
2182
2183 if "summaryLinkStates" in area_lsa:
2184 for lsa in area_lsa["summaryLinkStates"]:
2185 for t3lsa in show_ospf_json["areas"][ospf_area][
2186 "summaryLinkStates"
2187 ]:
2188 if (
2189 lsa["lsaId"] == t3lsa["lsaId"]
2190 and lsa["advertisedRouter"] == t3lsa["advertisedRouter"]
2191 ):
2192 result = True
2193 break
2194 if result:
2195 logger.info(
2196 "[DUT: %s] OSPF LSDB area %s:Summary " "LSA %s",
2197 router,
2198 ospf_area,
2199 lsa,
2200 )
2201 break
2202 else:
2203 errormsg = (
2204 "[DUT: {}] OSPF LSDB area {}: expected"
2205 " Summary LSA is {}".format(router, ospf_area, lsa)
2206 )
2207 return errormsg
2208
2209 if "nssaExternalLinkStates" in area_lsa:
2210 for lsa in area_lsa["nssaExternalLinkStates"]:
2211 for t7lsa in show_ospf_json["areas"][ospf_area][
2212 "nssaExternalLinkStates"
2213 ]:
2214 if (
2215 lsa["lsaId"] == t7lsa["lsaId"]
2216 and lsa["advertisedRouter"] == t7lsa["advertisedRouter"]
2217 ):
2218 result = True
2219 break
2220 if result:
2221 logger.info(
2222 "[DUT: %s] OSPF LSDB area %s:Type7 " "LSA %s",
2223 router,
2224 ospf_area,
2225 lsa,
2226 )
2227 break
2228 else:
2229 errormsg = (
2230 "[DUT: {}] OSPF LSDB area {}: expected"
2231 " Type7 LSA is {}".format(router, ospf_area, lsa)
2232 )
2233 return errormsg
2234
2235 if "asbrSummaryLinkStates" in area_lsa:
2236 for lsa in area_lsa["asbrSummaryLinkStates"]:
2237 for t4lsa in show_ospf_json["areas"][ospf_area][
2238 "asbrSummaryLinkStates"
2239 ]:
2240 if (
2241 lsa["lsaId"] == t4lsa["lsaId"]
2242 and lsa["advertisedRouter"] == t4lsa["advertisedRouter"]
2243 ):
2244 result = True
2245 break
2246 if result:
2247 logger.info(
2248 "[DUT: %s] OSPF LSDB area %s:ASBR Summary " "LSA %s",
2249 router,
2250 ospf_area,
2251 lsa,
2252 )
2253 result = True
2254 else:
2255 errormsg = (
2256 "[DUT: {}] OSPF LSDB area {}: expected"
2257 " ASBR Summary LSA is {}".format(router, ospf_area, lsa)
2258 )
2259 return errormsg
2260
2261 if "linkLocalOpaqueLsa" in area_lsa:
2262 for lsa in area_lsa["linkLocalOpaqueLsa"]:
2263 try:
2264 for lnklsa in show_ospf_json["areas"][ospf_area][
2265 "linkLocalOpaqueLsa"
2266 ]:
2267 if (
2268 lsa["lsaId"] in lnklsa["lsaId"]
2269 and "linkLocalOpaqueLsa"
2270 in show_ospf_json["areas"][ospf_area]
2271 ):
2272 logger.info(
2273 (
2274 "[DUT: FRR] OSPF LSDB area %s:Opaque-LSA"
2275 "%s",
2276 ospf_area,
2277 lsa,
2278 )
2279 )
2280 result = True
2281 else:
2282 errormsg = (
2283 "[DUT: FRR] OSPF LSDB area: {} "
2284 "expected Opaque-LSA is {}, Found is {}".format(
2285 ospf_area, lsa, show_ospf_json
2286 )
2287 )
2288 raise ValueError(errormsg)
2289 return errormsg
2290 except KeyError:
2291 errormsg = "[DUT: FRR] linkLocalOpaqueLsa Not " "present"
2292 return errormsg
2293
2294 if ospf_external_lsa:
2295 for lsa in ospf_external_lsa:
2296 try:
2297 for t5lsa in show_ospf_json["asExternalLinkStates"]:
2298 if (
2299 lsa["lsaId"] == t5lsa["lsaId"]
2300 and lsa["advertisedRouter"] == t5lsa["advertisedRouter"]
2301 ):
2302 result = True
2303 break
2304 except KeyError:
2305 result = False
2306 if result:
2307 logger.info("[DUT: %s] OSPF LSDB:External LSA %s", router, lsa)
2308 result = True
2309 else:
2310 errormsg = (
2311 "[DUT: {}] OSPF LSDB : expected"
2312 " External LSA is {}".format(router, lsa)
2313 )
2314 return errormsg
2315
2316 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2317 return result
2318
2319
2320 def config_ospf6_interface(
2321 tgen, topo=None, input_dict=None, build=False, load_config=True
2322 ):
2323 """
2324 API to configure ospf on router.
2325
2326 Parameters
2327 ----------
2328 * `tgen` : Topogen object
2329 * `topo` : json file data
2330 * `input_dict` : Input dict data, required when configuring from testcase
2331 * `build` : Only for initial setup phase this is set as True.
2332 * `load_config` : Loading the config to router this is set as True.
2333
2334 Usage
2335 -----
2336 r1_ospf_auth = {
2337 "r1": {
2338 "links": {
2339 "r2": {
2340 "ospf": {
2341 "authentication": 'message-digest',
2342 "authentication-key": "ospf",
2343 "message-digest-key": "10"
2344 }
2345 }
2346 }
2347 }
2348 }
2349 result = config_ospf6_interface(tgen, topo, r1_ospf_auth)
2350
2351 Returns
2352 -------
2353 True or False
2354 """
2355
2356 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2357 result = False
2358 if topo is None:
2359 topo = tgen.json_topo
2360
2361 if not input_dict:
2362 input_dict = deepcopy(topo)
2363 else:
2364 input_dict = deepcopy(input_dict)
2365
2366 config_data_dict = {}
2367
2368 for router in input_dict.keys():
2369 config_data = []
2370 for lnk in input_dict[router]["links"].keys():
2371 if "ospf6" not in input_dict[router]["links"][lnk]:
2372 logger.debug(
2373 "Router %s: ospf6 config is not present in"
2374 "input_dict, passed input_dict %s",
2375 router,
2376 str(input_dict),
2377 )
2378 continue
2379 ospf_data = input_dict[router]["links"][lnk]["ospf6"]
2380 data_ospf_area = ospf_data.setdefault("area", None)
2381 data_ospf_auth = ospf_data.setdefault("hash-algo", None)
2382 data_ospf_keychain = ospf_data.setdefault("keychain", None)
2383 data_ospf_dr_priority = ospf_data.setdefault("priority", None)
2384 data_ospf_cost = ospf_data.setdefault("cost", None)
2385 data_ospf_mtu = ospf_data.setdefault("mtu_ignore", None)
2386
2387 try:
2388 intf = topo["routers"][router]["links"][lnk]["interface"]
2389 except KeyError:
2390 intf = topo["switches"][router]["links"][lnk]["interface"]
2391
2392 # interface
2393 cmd = "interface {}".format(intf)
2394
2395 config_data.append(cmd)
2396 # interface area config
2397 if data_ospf_area:
2398 cmd = "ipv6 ospf area {}".format(data_ospf_area)
2399 config_data.append(cmd)
2400
2401 # interface ospf auth
2402 if data_ospf_auth:
2403 cmd = "ipv6 ospf6 authentication"
2404
2405 if "del_action" in ospf_data:
2406 cmd = "no {}".format(cmd)
2407
2408 if "hash-algo" in ospf_data:
2409 cmd = "{} key-id {} hash-algo {} key {}".format(
2410 cmd,
2411 ospf_data["key-id"],
2412 ospf_data["hash-algo"],
2413 ospf_data["key"],
2414 )
2415 config_data.append(cmd)
2416
2417 # interface ospf auth with keychain
2418 if data_ospf_keychain:
2419 cmd = "ipv6 ospf6 authentication"
2420
2421 if "del_action" in ospf_data:
2422 cmd = "no {}".format(cmd)
2423
2424 if "keychain" in ospf_data:
2425 cmd = "{} keychain {}".format(cmd, ospf_data["keychain"])
2426 config_data.append(cmd)
2427
2428 # interface ospf dr priority
2429 if data_ospf_dr_priority:
2430 cmd = "ipv6 ospf priority {}".format(ospf_data["priority"])
2431 if "del_action" in ospf_data:
2432 cmd = "no {}".format(cmd)
2433 config_data.append(cmd)
2434
2435 # interface ospf cost
2436 if data_ospf_cost:
2437 cmd = "ipv6 ospf cost {}".format(ospf_data["cost"])
2438 if "del_action" in ospf_data:
2439 cmd = "no {}".format(cmd)
2440 config_data.append(cmd)
2441
2442 # interface ospf mtu
2443 if data_ospf_mtu:
2444 cmd = "ipv6 ospf mtu-ignore"
2445 if "del_action" in ospf_data:
2446 cmd = "no {}".format(cmd)
2447 config_data.append(cmd)
2448
2449 if build:
2450 return config_data
2451
2452 if config_data:
2453 config_data_dict[router] = config_data
2454
2455 result = create_common_configurations(
2456 tgen, config_data_dict, "interface_config", build=build
2457 )
2458
2459 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2460 return result
2461
2462
2463 @retry(retry_timeout=20)
2464 def verify_ospf_gr_helper(tgen, topo, dut, input_dict=None):
2465 """
2466 This API is used to vreify gr helper using command
2467 show ip ospf graceful-restart helper
2468
2469 Parameters
2470 ----------
2471 * `tgen` : Topogen object
2472 * `topo` : topology descriptions
2473 * 'dut' : router
2474 * 'input_dict' - values to be verified
2475
2476 Usage:
2477 -------
2478 input_dict = {
2479 "helperSupport":"Disabled",
2480 "strictLsaCheck":"Enabled",
2481 "restartSupport":"Planned and Unplanned Restarts",
2482 "supportedGracePeriod":1800
2483 }
2484 result = verify_ospf_gr_helper(tgen, topo, dut, input_dict)
2485
2486 """
2487 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2488 result = False
2489
2490 if "ospf" not in topo["routers"][dut]:
2491 errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut)
2492 return errormsg
2493
2494 rnode = tgen.routers()[dut]
2495 logger.info("Verifying OSPF GR details on router %s:", dut)
2496 show_ospf_json = run_frr_cmd(
2497 rnode, "show ip ospf graceful-restart helper json", isjson=True
2498 )
2499
2500 # Verifying output dictionary show_ospf_json is empty or not
2501 if not bool(show_ospf_json):
2502 errormsg = "OSPF is not running"
2503 raise ValueError(errormsg)
2504 return errormsg
2505
2506 for ospf_gr, gr_data in input_dict.items():
2507 try:
2508 if input_dict[ospf_gr] == show_ospf_json[ospf_gr]:
2509 logger.info(
2510 "[DUT: FRR] OSPF GR Helper: %s is %s",
2511 ospf_gr,
2512 show_ospf_json[ospf_gr],
2513 )
2514 result = True
2515 else:
2516 errormsg = (
2517 "[DUT: FRR] OSPF GR Helper: {} expected is {}, Found "
2518 "is {}".format(
2519 ospf_gr, input_dict[ospf_gr], show_ospf_json[ospf_gr]
2520 )
2521 )
2522 raise ValueError(errormsg)
2523 return errormsg
2524
2525 except KeyError:
2526 errormsg = "[DUT: FRR] OSPF GR Helper: {}".format(ospf_gr)
2527 return errormsg
2528
2529 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2530 return result
2531
2532
2533 def get_ospf_database(tgen, topo, dut, input_dict, vrf=None, lsatype=None, rid=None):
2534 """
2535 This API is to return ospf lsa's by running
2536 show ip ospf database command.
2537
2538 Parameters
2539 ----------
2540 * `tgen` : Topogen object
2541 * `dut`: device under test
2542 * `input_dict` : Input dict data, required when configuring from testcase
2543 * `topo` : next to be verified
2544 * `vrf` : vrf to be checked
2545 * `lsatype` : type of lsa to be checked
2546 * `rid` : router id for lsa to be checked
2547 Usage
2548 -----
2549 input_dict = {
2550 "areas": {
2551 "0.0.0.0": {
2552 "routerLinkStates": {
2553 "100.1.1.0-100.1.1.0": {
2554 "LSID": "100.1.1.0",
2555 "Advertised router": "100.1.1.0",
2556 "LSA Age": 130,
2557 "Sequence Number": "80000006",
2558 "Checksum": "a703",
2559 "Router links": 3
2560 }
2561 },
2562 "networkLinkStates": {
2563 "10.0.0.2-100.1.1.1": {
2564 "LSID": "10.0.0.2",
2565 "Advertised router": "100.1.1.1",
2566 "LSA Age": 137,
2567 "Sequence Number": "80000001",
2568 "Checksum": "9583"
2569 }
2570 },
2571 },
2572 }
2573 }
2574 result = get_ospf_database(tgen, topo, dut, input_dict)
2575
2576 Returns
2577 -------
2578 True or False (Error Message)
2579 """
2580
2581 result = False
2582 router = dut
2583 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2584 sleep(10)
2585 if "ospf" not in topo["routers"][dut]:
2586 errormsg = "[DUT: {}] OSPF is not configured on the router.".format(dut)
2587 return errormsg
2588
2589 rnode = tgen.routers()[dut]
2590
2591 logger.info("Verifying OSPF interface on router %s:", dut)
2592 if not rid:
2593 rid = "self-originate"
2594 if lsatype:
2595 if vrf is None:
2596 command = "show ip ospf database {} {} json".format(lsatype, rid)
2597 else:
2598 command = "show ip ospf database {} {} vrf {} json".format(
2599 lsatype, rid, vrf
2600 )
2601 else:
2602 if vrf is None:
2603 command = "show ip ospf database json"
2604 else:
2605 command = "show ip ospf database vrf {} json".format(vrf)
2606
2607 show_ospf_json = run_frr_cmd(rnode, command, isjson=True)
2608 # Verifying output dictionary show_ospf_json is empty or not
2609 if not bool(show_ospf_json):
2610 errormsg = "OSPF is not running"
2611 return errormsg
2612
2613 # for inter and inter lsa's
2614 ospf_db_data = input_dict.setdefault("areas", None)
2615 ospf_external_lsa = input_dict.setdefault("asExternalLinkStates", None)
2616
2617 if ospf_db_data:
2618 for ospf_area, area_lsa in ospf_db_data.items():
2619 if "areas" in show_ospf_json and ospf_area in show_ospf_json["areas"]:
2620 if "routerLinkStates" in area_lsa:
2621 for lsa in area_lsa["routerLinkStates"]:
2622 for rtrlsa in show_ospf_json["areas"][ospf_area][
2623 "routerLinkStates"
2624 ]:
2625 _advrtr = lsa.setdefault("advertisedRouter", None)
2626 _options = lsa.setdefault("options", None)
2627 if (
2628 _advrtr
2629 and lsa["lsaId"] == rtrlsa["lsaId"]
2630 and lsa["advertisedRouter"]
2631 == rtrlsa["advertisedRouter"]
2632 ):
2633 result = True
2634 break
2635 if (
2636 _options
2637 and lsa["lsaId"] == rtrlsa["lsaId"]
2638 and lsa["options"] == rtrlsa["options"]
2639 ):
2640 result = True
2641 break
2642
2643 if result:
2644 logger.info(
2645 "[DUT: %s] OSPF LSDB area %s:Router " "LSA %s",
2646 router,
2647 ospf_area,
2648 lsa,
2649 )
2650 break
2651 else:
2652 errormsg = (
2653 "[DUT: {}] OSPF LSDB area {}: expected"
2654 " Router LSA is {}\n found Router LSA: {}".format(
2655 router, ospf_area, lsa, rtrlsa
2656 )
2657 )
2658 return errormsg
2659
2660 if "networkLinkStates" in area_lsa:
2661 for lsa in area_lsa["networkLinkStates"]:
2662 for netlsa in show_ospf_json["areas"][ospf_area][
2663 "networkLinkStates"
2664 ]:
2665 if (
2666 lsa
2667 in show_ospf_json["areas"][ospf_area][
2668 "networkLinkStates"
2669 ]
2670 ):
2671 if (
2672 lsa["lsaId"] == netlsa["lsaId"]
2673 and lsa["advertisedRouter"]
2674 == netlsa["advertisedRouter"]
2675 ):
2676 result = True
2677 break
2678 if result:
2679 logger.info(
2680 "[DUT: %s] OSPF LSDB area %s:Network " "LSA %s",
2681 router,
2682 ospf_area,
2683 lsa,
2684 )
2685 break
2686 else:
2687 errormsg = (
2688 "[DUT: {}] OSPF LSDB area {}: expected"
2689 " Network LSA is {}".format(router, ospf_area, lsa)
2690 )
2691 return errormsg
2692
2693 if "summaryLinkStates" in area_lsa:
2694 for lsa in area_lsa["summaryLinkStates"]:
2695 for t3lsa in show_ospf_json["areas"][ospf_area][
2696 "summaryLinkStates"
2697 ]:
2698 if (
2699 lsa["lsaId"] == t3lsa["lsaId"]
2700 and lsa["advertisedRouter"] == t3lsa["advertisedRouter"]
2701 ):
2702 result = True
2703 break
2704 if result:
2705 logger.info(
2706 "[DUT: %s] OSPF LSDB area %s:Summary " "LSA %s",
2707 router,
2708 ospf_area,
2709 lsa,
2710 )
2711 break
2712 else:
2713 errormsg = (
2714 "[DUT: {}] OSPF LSDB area {}: expected"
2715 " Summary LSA is {}".format(router, ospf_area, lsa)
2716 )
2717 return errormsg
2718
2719 if "nssaExternalLinkStates" in area_lsa:
2720 for lsa in area_lsa["nssaExternalLinkStates"]:
2721 for t7lsa in show_ospf_json["areas"][ospf_area][
2722 "nssaExternalLinkStates"
2723 ]:
2724 if (
2725 lsa["lsaId"] == t7lsa["lsaId"]
2726 and lsa["advertisedRouter"] == t7lsa["advertisedRouter"]
2727 ):
2728 result = True
2729 break
2730 if result:
2731 logger.info(
2732 "[DUT: %s] OSPF LSDB area %s:Type7 " "LSA %s",
2733 router,
2734 ospf_area,
2735 lsa,
2736 )
2737 break
2738 else:
2739 errormsg = (
2740 "[DUT: {}] OSPF LSDB area {}: expected"
2741 " Type7 LSA is {}".format(router, ospf_area, lsa)
2742 )
2743 return errormsg
2744
2745 if "asbrSummaryLinkStates" in area_lsa:
2746 for lsa in area_lsa["asbrSummaryLinkStates"]:
2747 for t4lsa in show_ospf_json["areas"][ospf_area][
2748 "asbrSummaryLinkStates"
2749 ]:
2750 if (
2751 lsa["lsaId"] == t4lsa["lsaId"]
2752 and lsa["advertisedRouter"] == t4lsa["advertisedRouter"]
2753 ):
2754 result = True
2755 break
2756 if result:
2757 logger.info(
2758 "[DUT: %s] OSPF LSDB area %s:ASBR Summary " "LSA %s",
2759 router,
2760 ospf_area,
2761 lsa,
2762 )
2763 result = True
2764 else:
2765 errormsg = (
2766 "[DUT: {}] OSPF LSDB area {}: expected"
2767 " ASBR Summary LSA is {}".format(router, ospf_area, lsa)
2768 )
2769 return errormsg
2770
2771 if "linkLocalOpaqueLsa" in area_lsa:
2772 for lsa in area_lsa["linkLocalOpaqueLsa"]:
2773 try:
2774 for lnklsa in show_ospf_json["areas"][ospf_area][
2775 "linkLocalOpaqueLsa"
2776 ]:
2777 if (
2778 lsa["lsaId"] in lnklsa["lsaId"]
2779 and "linkLocalOpaqueLsa"
2780 in show_ospf_json["areas"][ospf_area]
2781 ):
2782 logger.info(
2783 (
2784 "[DUT: FRR] OSPF LSDB area %s:Opaque-LSA"
2785 "%s",
2786 ospf_area,
2787 lsa,
2788 )
2789 )
2790 result = True
2791 else:
2792 errormsg = (
2793 "[DUT: FRR] OSPF LSDB area: {} "
2794 "expected Opaque-LSA is {}, Found is {}".format(
2795 ospf_area, lsa, show_ospf_json
2796 )
2797 )
2798 raise ValueError(errormsg)
2799 return errormsg
2800 except KeyError:
2801 errormsg = "[DUT: FRR] linkLocalOpaqueLsa Not " "present"
2802 return errormsg
2803 else:
2804 if "routerLinkStates" in area_lsa:
2805 for lsa in area_lsa["routerLinkStates"]:
2806 for rtrlsa in show_ospf_json["routerLinkStates"]:
2807 _advrtr = lsa.setdefault("advertisedRouter", None)
2808 _options = lsa.setdefault("options", None)
2809 _age = lsa.setdefault("lsaAge", None)
2810 if (
2811 _options
2812 and lsa["options"]
2813 == show_ospf_json["routerLinkStates"][rtrlsa][
2814 ospf_area
2815 ][0]["options"]
2816 ):
2817 result = True
2818 break
2819 if (
2820 _age != "get"
2821 and lsa["lsaAge"]
2822 == show_ospf_json["routerLinkStates"][rtrlsa][
2823 ospf_area
2824 ][0]["lsaAge"]
2825 ):
2826 result = True
2827 break
2828
2829 if _age == "get":
2830 return "{}".format(
2831 show_ospf_json["routerLinkStates"][rtrlsa][
2832 ospf_area
2833 ][0]["lsaAge"]
2834 )
2835 if result:
2836 logger.info(
2837 "[DUT: %s] OSPF LSDB area %s:Router " "LSA %s",
2838 router,
2839 ospf_area,
2840 lsa,
2841 )
2842 break
2843 else:
2844 errormsg = (
2845 "[DUT: {}] OSPF LSDB area {}: expected"
2846 " Router LSA is {}\n found Router LSA: {}".format(
2847 router,
2848 ospf_area,
2849 lsa,
2850 show_ospf_json["routerLinkStates"],
2851 )
2852 )
2853 return errormsg
2854
2855 if "networkLinkStates" in area_lsa:
2856 for lsa in area_lsa["networkLinkStates"]:
2857 for netlsa in show_ospf_json["areas"][ospf_area][
2858 "networkLinkStates"
2859 ]:
2860 if (
2861 lsa
2862 in show_ospf_json["areas"][ospf_area][
2863 "networkLinkStates"
2864 ]
2865 ):
2866 if (
2867 lsa["lsaId"] == netlsa["lsaId"]
2868 and lsa["advertisedRouter"]
2869 == netlsa["advertisedRouter"]
2870 ):
2871 result = True
2872 break
2873 if result:
2874 logger.info(
2875 "[DUT: %s] OSPF LSDB area %s:Network " "LSA %s",
2876 router,
2877 ospf_area,
2878 lsa,
2879 )
2880 break
2881 else:
2882 errormsg = (
2883 "[DUT: {}] OSPF LSDB area {}: expected"
2884 " Network LSA is {}".format(router, ospf_area, lsa)
2885 )
2886 return errormsg
2887
2888 if "summaryLinkStates" in area_lsa:
2889 for lsa in area_lsa["summaryLinkStates"]:
2890 for t3lsa in show_ospf_json["areas"][ospf_area][
2891 "summaryLinkStates"
2892 ]:
2893 if (
2894 lsa["lsaId"] == t3lsa["lsaId"]
2895 and lsa["advertisedRouter"] == t3lsa["advertisedRouter"]
2896 ):
2897 result = True
2898 break
2899 if result:
2900 logger.info(
2901 "[DUT: %s] OSPF LSDB area %s:Summary " "LSA %s",
2902 router,
2903 ospf_area,
2904 lsa,
2905 )
2906 break
2907 else:
2908 errormsg = (
2909 "[DUT: {}] OSPF LSDB area {}: expected"
2910 " Summary LSA is {}".format(router, ospf_area, lsa)
2911 )
2912 return errormsg
2913
2914 if "nssaExternalLinkStates" in area_lsa:
2915 for lsa in area_lsa["nssaExternalLinkStates"]:
2916 for t7lsa in show_ospf_json["areas"][ospf_area][
2917 "nssaExternalLinkStates"
2918 ]:
2919 if (
2920 lsa["lsaId"] == t7lsa["lsaId"]
2921 and lsa["advertisedRouter"] == t7lsa["advertisedRouter"]
2922 ):
2923 result = True
2924 break
2925 if result:
2926 logger.info(
2927 "[DUT: %s] OSPF LSDB area %s:Type7 " "LSA %s",
2928 router,
2929 ospf_area,
2930 lsa,
2931 )
2932 break
2933 else:
2934 errormsg = (
2935 "[DUT: {}] OSPF LSDB area {}: expected"
2936 " Type7 LSA is {}".format(router, ospf_area, lsa)
2937 )
2938 return errormsg
2939
2940 if "asbrSummaryLinkStates" in area_lsa:
2941 for lsa in area_lsa["asbrSummaryLinkStates"]:
2942 for t4lsa in show_ospf_json["areas"][ospf_area][
2943 "asbrSummaryLinkStates"
2944 ]:
2945 if (
2946 lsa["lsaId"] == t4lsa["lsaId"]
2947 and lsa["advertisedRouter"] == t4lsa["advertisedRouter"]
2948 ):
2949 result = True
2950 break
2951 if result:
2952 logger.info(
2953 "[DUT: %s] OSPF LSDB area %s:ASBR Summary " "LSA %s",
2954 router,
2955 ospf_area,
2956 lsa,
2957 )
2958 result = True
2959 else:
2960 errormsg = (
2961 "[DUT: {}] OSPF LSDB area {}: expected"
2962 " ASBR Summary LSA is {}".format(router, ospf_area, lsa)
2963 )
2964 return errormsg
2965
2966 if "linkLocalOpaqueLsa" in area_lsa:
2967 for lsa in area_lsa["linkLocalOpaqueLsa"]:
2968 try:
2969 for lnklsa in show_ospf_json["areas"][ospf_area][
2970 "linkLocalOpaqueLsa"
2971 ]:
2972 if (
2973 lsa["lsaId"] in lnklsa["lsaId"]
2974 and "linkLocalOpaqueLsa"
2975 in show_ospf_json["areas"][ospf_area]
2976 ):
2977 logger.info(
2978 (
2979 "[DUT: FRR] OSPF LSDB area %s:Opaque-LSA"
2980 "%s",
2981 ospf_area,
2982 lsa,
2983 )
2984 )
2985 result = True
2986 else:
2987 errormsg = (
2988 "[DUT: FRR] OSPF LSDB area: {} "
2989 "expected Opaque-LSA is {}, Found is {}".format(
2990 ospf_area, lsa, show_ospf_json
2991 )
2992 )
2993 raise ValueError(errormsg)
2994 return errormsg
2995 except KeyError:
2996 errormsg = "[DUT: FRR] linkLocalOpaqueLsa Not " "present"
2997 return errormsg
2998
2999 if ospf_external_lsa:
3000 for lsa in ospf_external_lsa:
3001 try:
3002 for t5lsa in show_ospf_json["asExternalLinkStates"]:
3003 if (
3004 lsa["lsaId"] == t5lsa["lsaId"]
3005 and lsa["advertisedRouter"] == t5lsa["advertisedRouter"]
3006 ):
3007 result = True
3008 break
3009 except KeyError:
3010 result = False
3011 if result:
3012 logger.info("[DUT: %s] OSPF LSDB:External LSA %s", router, lsa)
3013 result = True
3014 else:
3015 errormsg = (
3016 "[DUT: {}] OSPF LSDB : expected"
3017 " External LSA is {}".format(router, lsa)
3018 )
3019 return errormsg
3020
3021 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3022 return result