]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/lib/bgp.py
Merge pull request #12798 from donaldsharp/rib_match_multicast
[mirror_frr.git] / tests / topotests / lib / bgp.py
1 # SPDX-License-Identifier: ISC
2 #
3 # Copyright (c) 2019 by VMware, Inc. ("VMware")
4 # Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
5 # ("NetDEF") in this file.
6 #
7
8 import ipaddress
9 import sys
10 import traceback
11 from copy import deepcopy
12 from time import sleep
13
14 # Import common_config to use commomnly used APIs
15 from lib.common_config import (
16 create_common_configurations,
17 FRRCFG_FILE,
18 InvalidCLIError,
19 apply_raw_config,
20 check_address_types,
21 find_interface_with_greater_ip,
22 generate_ips,
23 get_frr_ipv6_linklocal,
24 retry,
25 run_frr_cmd,
26 validate_ip_address,
27 )
28 from lib.topogen import get_topogen
29 from lib.topolog import logger
30 from lib.topotest import frr_unicode
31
32 from lib import topotest
33
34
35 def create_router_bgp(tgen, topo=None, input_dict=None, build=False, load_config=True):
36 """
37 API to configure bgp on router
38
39 Parameters
40 ----------
41 * `tgen` : Topogen object
42 * `topo` : json file data
43 * `input_dict` : Input dict data, required when configuring from testcase
44 * `build` : Only for initial setup phase this is set as True.
45
46 Usage
47 -----
48 input_dict = {
49 "r1": {
50 "bgp": {
51 "local_as": "200",
52 "router_id": "22.22.22.22",
53 "graceful-restart": {
54 "graceful-restart": True,
55 "preserve-fw-state": True,
56 "timer": {
57 "restart-time": 30,
58 "rib-stale-time": 30,
59 "select-defer-time": 30,
60 }
61 },
62 "address_family": {
63 "ipv4": {
64 "unicast": {
65 "default_originate":{
66 "neighbor":"R2",
67 "add_type":"lo"
68 "route_map":"rm"
69
70 },
71 "redistribute": [{
72 "redist_type": "static",
73 "attribute": {
74 "metric" : 123
75 }
76 },
77 {"redist_type": "connected"}
78 ],
79 "advertise_networks": [
80 {
81 "network": "20.0.0.0/32",
82 "no_of_network": 10
83 },
84 {
85 "network": "30.0.0.0/32",
86 "no_of_network": 10
87 }
88 ],
89 "neighbor": {
90 "r3": {
91 "keepalivetimer": 60,
92 "holddowntimer": 180,
93 "dest_link": {
94 "r4": {
95 "allowas-in": {
96 "number_occurences": 2
97 },
98 "prefix_lists": [
99 {
100 "name": "pf_list_1",
101 "direction": "in"
102 }
103 ],
104 "route_maps": [{
105 "name": "RMAP_MED_R3",
106 "direction": "in"
107 }],
108 "next_hop_self": True
109 },
110 "r1": {"graceful-restart-helper": True}
111 }
112 }
113 }
114 }
115 }
116 }
117 }
118 }
119 }
120
121
122 Returns
123 -------
124 True or False
125 """
126 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
127 result = False
128
129 if topo is None:
130 topo = tgen.json_topo
131
132 # Flag is used when testing ipv6 over ipv4 or vice-versa
133 afi_test = False
134
135 if not input_dict:
136 input_dict = deepcopy(topo)
137 else:
138 topo = topo["routers"]
139 input_dict = deepcopy(input_dict)
140
141 config_data_dict = {}
142
143 for router in input_dict.keys():
144 if "bgp" not in input_dict[router]:
145 logger.debug("Router %s: 'bgp' not present in input_dict", router)
146 continue
147
148 bgp_data_list = input_dict[router]["bgp"]
149
150 if type(bgp_data_list) is not list:
151 bgp_data_list = [bgp_data_list]
152
153 config_data = []
154
155 for bgp_data in bgp_data_list:
156 data_all_bgp = __create_bgp_global(tgen, bgp_data, router, build)
157 if data_all_bgp:
158 bgp_addr_data = bgp_data.setdefault("address_family", {})
159
160 if not bgp_addr_data:
161 logger.debug(
162 "Router %s: 'address_family' not present in "
163 "input_dict for BGP",
164 router,
165 )
166 else:
167
168 ipv4_data = bgp_addr_data.setdefault("ipv4", {})
169 ipv6_data = bgp_addr_data.setdefault("ipv6", {})
170 l2vpn_data = bgp_addr_data.setdefault("l2vpn", {})
171
172 neigh_unicast = (
173 True
174 if ipv4_data.setdefault("unicast", {})
175 or ipv6_data.setdefault("unicast", {})
176 else False
177 )
178
179 l2vpn_evpn = True if l2vpn_data.setdefault("evpn", {}) else False
180
181 if neigh_unicast:
182 data_all_bgp = __create_bgp_unicast_neighbor(
183 tgen,
184 topo,
185 bgp_data,
186 router,
187 afi_test,
188 config_data=data_all_bgp,
189 )
190
191 if l2vpn_evpn:
192 data_all_bgp = __create_l2vpn_evpn_address_family(
193 tgen, topo, bgp_data, router, config_data=data_all_bgp
194 )
195 if data_all_bgp:
196 config_data.extend(data_all_bgp)
197
198 if config_data:
199 config_data_dict[router] = config_data
200
201 try:
202 result = create_common_configurations(
203 tgen, config_data_dict, "bgp", build, load_config
204 )
205 except InvalidCLIError:
206 logger.error("create_router_bgp", exc_info=True)
207 result = False
208
209 logger.debug("Exiting lib API: create_router_bgp()")
210 return result
211
212
213 def __create_bgp_global(tgen, input_dict, router, build=False):
214 """
215 Helper API to create bgp global configuration.
216
217 Parameters
218 ----------
219 * `tgen` : Topogen object
220 * `input_dict` : Input dict data, required when configuring from testcase
221 * `router` : router id to be configured.
222 * `build` : Only for initial setup phase this is set as True.
223
224 Returns
225 -------
226 list of config commands
227 """
228
229 result = False
230 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
231
232 bgp_data = input_dict
233 del_bgp_action = bgp_data.setdefault("delete", False)
234
235 config_data = []
236
237 if "local_as" not in bgp_data and build:
238 logger.debug(
239 "Router %s: 'local_as' not present in input_dict" "for BGP", router
240 )
241 return config_data
242
243 local_as = bgp_data.setdefault("local_as", "")
244 cmd = "router bgp {}".format(local_as)
245 vrf_id = bgp_data.setdefault("vrf", None)
246 if vrf_id:
247 cmd = "{} vrf {}".format(cmd, vrf_id)
248
249 if del_bgp_action:
250 cmd = "no {}".format(cmd)
251 config_data.append(cmd)
252
253 return config_data
254
255 config_data.append(cmd)
256 config_data.append("no bgp ebgp-requires-policy")
257
258 router_id = bgp_data.setdefault("router_id", None)
259 del_router_id = bgp_data.setdefault("del_router_id", False)
260 if del_router_id:
261 config_data.append("no bgp router-id")
262 if router_id:
263 config_data.append("bgp router-id {}".format(router_id))
264
265 config_data.append("bgp log-neighbor-changes")
266 config_data.append("no bgp network import-check")
267 bgp_peer_grp_data = bgp_data.setdefault("peer-group", {})
268
269 if "peer-group" in bgp_data and bgp_peer_grp_data:
270 peer_grp_data = __create_bgp_peer_group(tgen, bgp_peer_grp_data, router)
271 config_data.extend(peer_grp_data)
272
273 bst_path = bgp_data.setdefault("bestpath", None)
274 if bst_path:
275 if "aspath" in bst_path:
276 if "delete" in bst_path:
277 config_data.append(
278 "no bgp bestpath as-path {}".format(bst_path["aspath"])
279 )
280 else:
281 config_data.append("bgp bestpath as-path {}".format(bst_path["aspath"]))
282
283 if "graceful-restart" in bgp_data:
284 graceful_config = bgp_data["graceful-restart"]
285
286 graceful_restart = graceful_config.setdefault("graceful-restart", None)
287
288 graceful_restart_disable = graceful_config.setdefault(
289 "graceful-restart-disable", None
290 )
291
292 preserve_fw_state = graceful_config.setdefault("preserve-fw-state", None)
293
294 disable_eor = graceful_config.setdefault("disable-eor", None)
295
296 if graceful_restart == False:
297 cmd = "no bgp graceful-restart"
298 if graceful_restart:
299 cmd = "bgp graceful-restart"
300
301 if graceful_restart is not None:
302 config_data.append(cmd)
303
304 if graceful_restart_disable == False:
305 cmd = "no bgp graceful-restart-disable"
306 if graceful_restart_disable:
307 cmd = "bgp graceful-restart-disable"
308
309 if graceful_restart_disable is not None:
310 config_data.append(cmd)
311
312 if preserve_fw_state == False:
313 cmd = "no bgp graceful-restart preserve-fw-state"
314 if preserve_fw_state:
315 cmd = "bgp graceful-restart preserve-fw-state"
316
317 if preserve_fw_state is not None:
318 config_data.append(cmd)
319
320 if disable_eor == False:
321 cmd = "no bgp graceful-restart disable-eor"
322 if disable_eor:
323 cmd = "bgp graceful-restart disable-eor"
324
325 if disable_eor is not None:
326 config_data.append(cmd)
327
328 if "timer" in bgp_data["graceful-restart"]:
329 timer = bgp_data["graceful-restart"]["timer"]
330
331 if "delete" in timer:
332 del_action = timer["delete"]
333 else:
334 del_action = False
335
336 for rs_timer, value in timer.items():
337 rs_timer_value = timer.setdefault(rs_timer, None)
338
339 if rs_timer_value and rs_timer != "delete":
340 cmd = "bgp graceful-restart {} {}".format(rs_timer, rs_timer_value)
341
342 if del_action:
343 cmd = "no {}".format(cmd)
344
345 config_data.append(cmd)
346
347 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
348 return config_data
349
350
351 def __create_bgp_unicast_neighbor(
352 tgen, topo, input_dict, router, afi_test, config_data=None
353 ):
354 """
355 Helper API to create configuration for address-family unicast
356
357 Parameters
358 ----------
359 * `tgen` : Topogen object
360 * `topo` : json file data
361 * `input_dict` : Input dict data, required when configuring from testcase
362 * `router` : router id to be configured.
363 * `afi_test` : use when ipv6 needs to be tested over ipv4 or vice-versa
364 * `build` : Only for initial setup phase this is set as True.
365 """
366
367 result = False
368 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
369
370 add_neigh = True
371 bgp_data = input_dict
372 if "router bgp" in config_data:
373 add_neigh = False
374
375 bgp_data = input_dict["address_family"]
376
377 for addr_type, addr_dict in bgp_data.items():
378 if not addr_dict:
379 continue
380
381 if not check_address_types(addr_type) and not afi_test:
382 continue
383
384 addr_data = addr_dict["unicast"]
385 if addr_data:
386 config_data.append("address-family {} unicast".format(addr_type))
387
388 advertise_network = addr_data.setdefault("advertise_networks", [])
389 for advertise_network_dict in advertise_network:
390 network = advertise_network_dict["network"]
391 if type(network) is not list:
392 network = [network]
393
394 if "no_of_network" in advertise_network_dict:
395 no_of_network = advertise_network_dict["no_of_network"]
396 else:
397 no_of_network = 1
398
399 del_action = advertise_network_dict.setdefault("delete", False)
400
401 # Generating IPs for verification
402 network_list = generate_ips(network, no_of_network)
403 for ip in network_list:
404 ip = str(ipaddress.ip_network(frr_unicode(ip)))
405
406 cmd = "network {}".format(ip)
407 if del_action:
408 cmd = "no {}".format(cmd)
409
410 config_data.append(cmd)
411
412 import_cmd = addr_data.setdefault("import", {})
413 if import_cmd:
414 try:
415 if import_cmd["delete"]:
416 config_data.append("no import vrf {}".format(import_cmd["vrf"]))
417 except KeyError:
418 config_data.append("import vrf {}".format(import_cmd["vrf"]))
419
420 max_paths = addr_data.setdefault("maximum_paths", {})
421 if max_paths:
422 ibgp = max_paths.setdefault("ibgp", None)
423 ebgp = max_paths.setdefault("ebgp", None)
424 del_cmd = max_paths.setdefault("delete", False)
425 if ibgp:
426 if del_cmd:
427 config_data.append("no maximum-paths ibgp {}".format(ibgp))
428 else:
429 config_data.append("maximum-paths ibgp {}".format(ibgp))
430 if ebgp:
431 if del_cmd:
432 config_data.append("no maximum-paths {}".format(ebgp))
433 else:
434 config_data.append("maximum-paths {}".format(ebgp))
435
436 aggregate_addresses = addr_data.setdefault("aggregate_address", [])
437 for aggregate_address in aggregate_addresses:
438 network = aggregate_address.setdefault("network", None)
439 if not network:
440 logger.debug(
441 "Router %s: 'network' not present in " "input_dict for BGP", router
442 )
443 else:
444 cmd = "aggregate-address {}".format(network)
445
446 as_set = aggregate_address.setdefault("as_set", False)
447 summary = aggregate_address.setdefault("summary", False)
448 del_action = aggregate_address.setdefault("delete", False)
449 if as_set:
450 cmd = "{} as-set".format(cmd)
451 if summary:
452 cmd = "{} summary".format(cmd)
453
454 if del_action:
455 cmd = "no {}".format(cmd)
456
457 config_data.append(cmd)
458
459 redistribute_data = addr_data.setdefault("redistribute", {})
460 if redistribute_data:
461 for redistribute in redistribute_data:
462 if "redist_type" not in redistribute:
463 logger.debug(
464 "Router %s: 'redist_type' not present in " "input_dict", router
465 )
466 else:
467 cmd = "redistribute {}".format(redistribute["redist_type"])
468 redist_attr = redistribute.setdefault("attribute", None)
469 if redist_attr:
470 if type(redist_attr) is dict:
471 for key, value in redist_attr.items():
472 cmd = "{} {} {}".format(cmd, key, value)
473 else:
474 cmd = "{} {}".format(cmd, redist_attr)
475
476 del_action = redistribute.setdefault("delete", False)
477 if del_action:
478 cmd = "no {}".format(cmd)
479 config_data.append(cmd)
480
481 admin_dist_data = addr_data.setdefault("distance", {})
482 if admin_dist_data:
483 if len(admin_dist_data) < 2:
484 logger.debug(
485 "Router %s: pass the admin distance values for "
486 "ebgp, ibgp and local routes",
487 router,
488 )
489 cmd = "distance bgp {} {} {}".format(
490 admin_dist_data["ebgp"],
491 admin_dist_data["ibgp"],
492 admin_dist_data["local"],
493 )
494
495 del_action = admin_dist_data.setdefault("delete", False)
496 if del_action:
497 cmd = "no distance bgp"
498 config_data.append(cmd)
499
500 import_vrf_data = addr_data.setdefault("import", {})
501 if import_vrf_data:
502 cmd = "import vrf {}".format(import_vrf_data["vrf"])
503
504 del_action = import_vrf_data.setdefault("delete", False)
505 if del_action:
506 cmd = "no {}".format(cmd)
507 config_data.append(cmd)
508
509 if "neighbor" in addr_data:
510 neigh_data = __create_bgp_neighbor(
511 topo, input_dict, router, addr_type, add_neigh
512 )
513 config_data.extend(neigh_data)
514 # configure default originate
515 if "default_originate" in addr_data:
516 default_originate_config = __create_bgp_default_originate_neighbor(
517 topo, input_dict, router, addr_type, add_neigh
518 )
519 config_data.extend(default_originate_config)
520
521 for addr_type, addr_dict in bgp_data.items():
522 if not addr_dict or not check_address_types(addr_type):
523 continue
524
525 addr_data = addr_dict["unicast"]
526 if "neighbor" in addr_data:
527 neigh_addr_data = __create_bgp_unicast_address_family(
528 topo, input_dict, router, addr_type, add_neigh
529 )
530
531 config_data.extend(neigh_addr_data)
532
533 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
534 return config_data
535
536
537 def __create_bgp_default_originate_neighbor(
538 topo, input_dict, router, addr_type, add_neigh=True
539 ):
540 """
541 Helper API to create neighbor default - originate configuration
542
543 Parameters
544 ----------
545 * `tgen` : Topogen object
546 * `topo` : json file data
547 * `input_dict` : Input dict data, required when configuring from testcase
548 * `router` : router id to be configured
549 """
550 tgen = get_topogen()
551 config_data = []
552 logger.debug("Entering lib API: __create_bgp_default_originate_neighbor()")
553
554 bgp_data = input_dict["address_family"]
555 neigh_data = bgp_data[addr_type]["unicast"]["default_originate"]
556 for name, peer_dict in neigh_data.items():
557 nh_details = topo[name]
558
559 neighbor_ip = None
560 if "dest-link" in neigh_data[name]:
561 dest_link = neigh_data[name]["dest-link"]
562 neighbor_ip = nh_details["links"][dest_link][addr_type].split("/")[0]
563 elif "add_type" in neigh_data[name]:
564 add_type = neigh_data[name]["add_type"]
565 neighbor_ip = nh_details["links"][add_type][addr_type].split("/")[0]
566 else:
567 neighbor_ip = nh_details["links"][router][addr_type].split("/")[0]
568
569 config_data.append("address-family {} unicast".format(addr_type))
570 if "route_map" in peer_dict:
571 route_map = peer_dict["route_map"]
572 if "delete" in peer_dict:
573 if peer_dict["delete"]:
574 config_data.append(
575 "no neighbor {} default-originate route-map {}".format(
576 neighbor_ip, route_map
577 )
578 )
579 else:
580 config_data.append(
581 " neighbor {} default-originate route-map {}".format(
582 neighbor_ip, route_map
583 )
584 )
585 else:
586 config_data.append(
587 " neighbor {} default-originate route-map {}".format(
588 neighbor_ip, route_map
589 )
590 )
591
592 else:
593 if "delete" in peer_dict:
594 if peer_dict["delete"]:
595 config_data.append(
596 "no neighbor {} default-originate".format(neighbor_ip)
597 )
598 else:
599 config_data.append(
600 "neighbor {} default-originate".format(neighbor_ip)
601 )
602 else:
603 config_data.append("neighbor {} default-originate".format(neighbor_ip))
604
605 logger.debug("Exiting lib API: __create_bgp_default_originate_neighbor()")
606 return config_data
607
608
609 def __create_l2vpn_evpn_address_family(
610 tgen, topo, input_dict, router, config_data=None
611 ):
612 """
613 Helper API to create configuration for l2vpn evpn address-family
614
615 Parameters
616 ----------
617 * `tgen` : Topogen object
618 * `topo` : json file data
619 * `input_dict` : Input dict data, required when configuring
620 from testcase
621 * `router` : router id to be configured.
622 * `build` : Only for initial setup phase this is set as True.
623 """
624
625 result = False
626
627 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
628
629 bgp_data = input_dict["address_family"]
630
631 for family_type, family_dict in bgp_data.items():
632 if family_type != "l2vpn":
633 continue
634
635 family_data = family_dict["evpn"]
636 if family_data:
637 config_data.append("address-family l2vpn evpn")
638
639 advertise_data = family_data.setdefault("advertise", {})
640 neighbor_data = family_data.setdefault("neighbor", {})
641 advertise_all_vni_data = family_data.setdefault("advertise-all-vni", None)
642 rd_data = family_data.setdefault("rd", None)
643 no_rd_data = family_data.setdefault("no rd", False)
644 route_target_data = family_data.setdefault("route-target", {})
645
646 if advertise_data:
647 for address_type, unicast_type in advertise_data.items():
648
649 if type(unicast_type) is dict:
650 for key, value in unicast_type.items():
651 cmd = "advertise {} {}".format(address_type, key)
652
653 if value:
654 route_map = value.setdefault("route-map", {})
655 advertise_del_action = value.setdefault("delete", None)
656
657 if route_map:
658 cmd = "{} route-map {}".format(cmd, route_map)
659
660 if advertise_del_action:
661 cmd = "no {}".format(cmd)
662
663 config_data.append(cmd)
664
665 if neighbor_data:
666 for neighbor, neighbor_data in neighbor_data.items():
667 ipv4_neighbor = neighbor_data.setdefault("ipv4", {})
668 ipv6_neighbor = neighbor_data.setdefault("ipv6", {})
669
670 if ipv4_neighbor:
671 for neighbor_name, action in ipv4_neighbor.items():
672 neighbor_ip = topo[neighbor]["links"][neighbor_name][
673 "ipv4"
674 ].split("/")[0]
675
676 if type(action) is dict:
677 next_hop_self = action.setdefault("next_hop_self", None)
678 route_maps = action.setdefault("route_maps", {})
679
680 if next_hop_self is not None:
681 if next_hop_self is True:
682 config_data.append(
683 "neighbor {} "
684 "next-hop-self".format(neighbor_ip)
685 )
686 elif next_hop_self is False:
687 config_data.append(
688 "no neighbor {} "
689 "next-hop-self".format(neighbor_ip)
690 )
691
692 if route_maps:
693 for route_map in route_maps:
694 name = route_map.setdefault("name", {})
695 direction = route_map.setdefault("direction", "in")
696 del_action = route_map.setdefault("delete", False)
697
698 if not name:
699 logger.info(
700 "Router %s: 'name' "
701 "not present in "
702 "input_dict for BGP "
703 "neighbor route name",
704 router,
705 )
706 else:
707 cmd = "neighbor {} route-map {} " "{}".format(
708 neighbor_ip, name, direction
709 )
710
711 if del_action:
712 cmd = "no {}".format(cmd)
713
714 config_data.append(cmd)
715
716 else:
717 if action == "activate":
718 cmd = "neighbor {} activate".format(neighbor_ip)
719 elif action == "deactivate":
720 cmd = "no neighbor {} activate".format(neighbor_ip)
721
722 config_data.append(cmd)
723
724 if ipv6_neighbor:
725 for neighbor_name, action in ipv4_neighbor.items():
726 neighbor_ip = topo[neighbor]["links"][neighbor_name][
727 "ipv6"
728 ].split("/")[0]
729 if action == "activate":
730 cmd = "neighbor {} activate".format(neighbor_ip)
731 elif action == "deactivate":
732 cmd = "no neighbor {} activate".format(neighbor_ip)
733
734 config_data.append(cmd)
735
736 if advertise_all_vni_data == True:
737 cmd = "advertise-all-vni"
738 config_data.append(cmd)
739 elif advertise_all_vni_data == False:
740 cmd = "no advertise-all-vni"
741 config_data.append(cmd)
742
743 if rd_data:
744 cmd = "rd {}".format(rd_data)
745 config_data.append(cmd)
746
747 if no_rd_data:
748 cmd = "no rd {}".format(no_rd_data)
749 config_data.append(cmd)
750
751 if route_target_data:
752 for rt_type, rt_dict in route_target_data.items():
753 for _rt_dict in rt_dict:
754 rt_value = _rt_dict.setdefault("value", None)
755 del_rt = _rt_dict.setdefault("delete", None)
756
757 if rt_value:
758 cmd = "route-target {} {}".format(rt_type, rt_value)
759 if del_rt:
760 cmd = "no {}".format(cmd)
761
762 config_data.append(cmd)
763
764 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
765
766 return config_data
767
768
769 def __create_bgp_peer_group(topo, input_dict, router):
770 """
771 Helper API to create neighbor specific configuration
772
773 Parameters
774 ----------
775 * `topo` : json file data
776 * `input_dict` : Input dict data, required when configuring from testcase
777 * `router` : router id to be configured
778 """
779 config_data = []
780 logger.debug("Entering lib API: __create_bgp_peer_group()")
781
782 for grp, grp_dict in input_dict.items():
783 config_data.append("neighbor {} peer-group".format(grp))
784 neigh_cxt = "neighbor {} ".format(grp)
785 update_source = grp_dict.setdefault("update-source", None)
786 remote_as = grp_dict.setdefault("remote-as", None)
787 capability = grp_dict.setdefault("capability", None)
788 if update_source:
789 config_data.append("{} update-source {}".format(neigh_cxt, update_source))
790
791 if remote_as:
792 config_data.append("{} remote-as {}".format(neigh_cxt, remote_as))
793
794 if capability:
795 config_data.append("{} capability {}".format(neigh_cxt, capability))
796
797 logger.debug("Exiting lib API: __create_bgp_peer_group()")
798 return config_data
799
800
801 def __create_bgp_neighbor(topo, input_dict, router, addr_type, add_neigh=True):
802 """
803 Helper API to create neighbor specific configuration
804
805 Parameters
806 ----------
807 * `tgen` : Topogen object
808 * `topo` : json file data
809 * `input_dict` : Input dict data, required when configuring from testcase
810 * `router` : router id to be configured
811 """
812 config_data = []
813 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
814 tgen = get_topogen()
815 bgp_data = input_dict["address_family"]
816 neigh_data = bgp_data[addr_type]["unicast"]["neighbor"]
817 global_connect = input_dict.get("connecttimer", 5)
818
819 for name, peer_dict in neigh_data.items():
820 remote_as = 0
821 for dest_link, peer in peer_dict["dest_link"].items():
822 local_asn = peer.setdefault("local_asn", {})
823 if local_asn:
824 local_as = local_asn.setdefault("local_as", 0)
825 remote_as = local_asn.setdefault("remote_as", 0)
826 no_prepend = local_asn.setdefault("no_prepend", False)
827 replace_as = local_asn.setdefault("replace_as", False)
828 if local_as == remote_as:
829 assert False is True, (
830 " Configuration Error : Router must not have "
831 "same AS-NUMBER as Local AS NUMBER"
832 )
833 nh_details = topo[name]
834
835 if "vrfs" in topo[router] or type(nh_details["bgp"]) is list:
836 for vrf_data in nh_details["bgp"]:
837 if "vrf" in nh_details["links"][dest_link] and "vrf" in vrf_data:
838 if nh_details["links"][dest_link]["vrf"] == vrf_data["vrf"]:
839 if not remote_as:
840 remote_as = vrf_data["local_as"]
841 break
842 else:
843 if "vrf" not in vrf_data:
844 if not remote_as:
845 remote_as = vrf_data["local_as"]
846 break
847 else:
848 if not remote_as:
849 remote_as = nh_details["bgp"]["local_as"]
850
851 update_source = None
852
853 if "neighbor_type" in peer and peer["neighbor_type"] == "unnumbered":
854 ip_addr = nh_details["links"][dest_link]["peer-interface"]
855 elif "neighbor_type" in peer and peer["neighbor_type"] == "link-local":
856 intf = topo[name]["links"][dest_link]["interface"]
857 ip_addr = get_frr_ipv6_linklocal(tgen, name, intf)
858 elif dest_link in nh_details["links"].keys():
859 try:
860 ip_addr = nh_details["links"][dest_link][addr_type].split("/")[0]
861 except KeyError:
862 intf = topo[name]["links"][dest_link]["interface"]
863 ip_addr = get_frr_ipv6_linklocal(tgen, name, intf)
864 if "delete" in peer and peer["delete"]:
865 neigh_cxt = "no neighbor {}".format(ip_addr)
866 config_data.append("{}".format(neigh_cxt))
867 return config_data
868 else:
869 neigh_cxt = "neighbor {}".format(ip_addr)
870
871 if "peer-group" in peer:
872 config_data.append(
873 "neighbor {} interface peer-group {}".format(
874 ip_addr, peer["peer-group"]
875 )
876 )
877
878 # Loopback interface
879 if "source_link" in peer:
880 if peer["source_link"] == "lo":
881 update_source = topo[router]["links"]["lo"][addr_type].split("/")[0]
882 else:
883 update_source = topo[router]["links"][peer["source_link"]][
884 "interface"
885 ]
886 if "peer-group" not in peer:
887 if "neighbor_type" in peer and peer["neighbor_type"] == "unnumbered":
888 config_data.append(
889 "{} interface remote-as {}".format(neigh_cxt, remote_as)
890 )
891 elif add_neigh:
892 config_data.append("{} remote-as {}".format(neigh_cxt, remote_as))
893
894 if local_asn and local_as:
895 cmd = "{} local-as {}".format(neigh_cxt, local_as)
896 if no_prepend:
897 cmd = "{} no-prepend".format(cmd)
898 if replace_as:
899 cmd = "{} replace-as".format(cmd)
900 config_data.append("{}".format(cmd))
901
902 if addr_type == "ipv6":
903 config_data.append("address-family ipv6 unicast")
904 config_data.append("{} activate".format(neigh_cxt))
905
906 if "neighbor_type" in peer and peer["neighbor_type"] == "link-local":
907 config_data.append(
908 "{} update-source {}".format(
909 neigh_cxt, nh_details["links"][dest_link]["peer-interface"]
910 )
911 )
912 config_data.append(
913 "{} interface {}".format(
914 neigh_cxt, nh_details["links"][dest_link]["peer-interface"]
915 )
916 )
917
918 disable_connected = peer.setdefault("disable_connected_check", False)
919 connect = peer.get("connecttimer", global_connect)
920 keep_alive = peer.setdefault("keepalivetimer", 3)
921 hold_down = peer.setdefault("holddowntimer", 10)
922 password = peer.setdefault("password", None)
923 no_password = peer.setdefault("no_password", None)
924 capability = peer.setdefault("capability", None)
925 max_hop_limit = peer.setdefault("ebgp_multihop", 1)
926
927 graceful_restart = peer.setdefault("graceful-restart", None)
928 graceful_restart_helper = peer.setdefault("graceful-restart-helper", None)
929 graceful_restart_disable = peer.setdefault("graceful-restart-disable", None)
930 if capability:
931 config_data.append("{} capability {}".format(neigh_cxt, capability))
932
933 if update_source:
934 config_data.append(
935 "{} update-source {}".format(neigh_cxt, update_source)
936 )
937 if disable_connected:
938 config_data.append(
939 "{} disable-connected-check".format(disable_connected)
940 )
941 if update_source:
942 config_data.append(
943 "{} update-source {}".format(neigh_cxt, update_source)
944 )
945 if int(keep_alive) != 60 and int(hold_down) != 180:
946 config_data.append(
947 "{} timers {} {}".format(neigh_cxt, keep_alive, hold_down)
948 )
949 if int(connect) != 120:
950 config_data.append("{} timers connect {}".format(neigh_cxt, connect))
951
952 if graceful_restart:
953 config_data.append("{} graceful-restart".format(neigh_cxt))
954 elif graceful_restart == False:
955 config_data.append("no {} graceful-restart".format(neigh_cxt))
956
957 if graceful_restart_helper:
958 config_data.append("{} graceful-restart-helper".format(neigh_cxt))
959 elif graceful_restart_helper == False:
960 config_data.append("no {} graceful-restart-helper".format(neigh_cxt))
961
962 if graceful_restart_disable:
963 config_data.append("{} graceful-restart-disable".format(neigh_cxt))
964 elif graceful_restart_disable == False:
965 config_data.append("no {} graceful-restart-disable".format(neigh_cxt))
966
967 if password:
968 config_data.append("{} password {}".format(neigh_cxt, password))
969
970 if no_password:
971 config_data.append("no {} password {}".format(neigh_cxt, no_password))
972
973 if max_hop_limit > 1:
974 config_data.append(
975 "{} ebgp-multihop {}".format(neigh_cxt, max_hop_limit)
976 )
977 config_data.append("{} enforce-multihop".format(neigh_cxt))
978
979 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
980 return config_data
981
982
983 def __create_bgp_unicast_address_family(
984 topo, input_dict, router, addr_type, add_neigh=True
985 ):
986 """
987 API prints bgp global config to bgp_json file.
988
989 Parameters
990 ----------
991 * `bgp_cfg` : BGP class variables have BGP config saved in it for
992 particular router,
993 * `local_as_no` : Local as number
994 * `router_id` : Router-id
995 * `ecmp_path` : ECMP max path
996 * `gr_enable` : BGP global gracefull restart config
997 """
998
999 config_data = []
1000 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1001 tgen = get_topogen()
1002 bgp_data = input_dict["address_family"]
1003 neigh_data = bgp_data[addr_type]["unicast"]["neighbor"]
1004
1005 for peer_name, peer_dict in deepcopy(neigh_data).items():
1006 for dest_link, peer in peer_dict["dest_link"].items():
1007 deactivate = None
1008 activate = None
1009 nh_details = topo[peer_name]
1010 activate_addr_family = peer.setdefault("activate", None)
1011 deactivate_addr_family = peer.setdefault("deactivate", None)
1012 # Loopback interface
1013 if "source_link" in peer and peer["source_link"] == "lo":
1014 for destRouterLink, data in sorted(nh_details["links"].items()):
1015 if "type" in data and data["type"] == "loopback":
1016 if dest_link == destRouterLink:
1017 ip_addr = (
1018 nh_details["links"][destRouterLink][addr_type]
1019 .split("/")[0]
1020 .lower()
1021 )
1022
1023 # Physical interface
1024 else:
1025 # check the neighbor type if un numbered nbr, use interface.
1026 if "neighbor_type" in peer and peer["neighbor_type"] == "unnumbered":
1027 ip_addr = nh_details["links"][dest_link]["peer-interface"]
1028 elif "neighbor_type" in peer and peer["neighbor_type"] == "link-local":
1029 intf = topo[peer_name]["links"][dest_link]["interface"]
1030 ip_addr = get_frr_ipv6_linklocal(tgen, peer_name, intf)
1031 elif dest_link in nh_details["links"].keys():
1032 try:
1033 ip_addr = nh_details["links"][dest_link][addr_type].split("/")[
1034 0
1035 ]
1036 except KeyError:
1037 intf = topo[peer_name]["links"][dest_link]["interface"]
1038 ip_addr = get_frr_ipv6_linklocal(tgen, peer_name, intf)
1039 if (
1040 addr_type == "ipv4"
1041 and bgp_data["ipv6"]
1042 and check_address_types("ipv6")
1043 and "ipv6" in nh_details["links"][dest_link]
1044 ):
1045 deactivate = nh_details["links"][dest_link]["ipv6"].split("/")[
1046 0
1047 ]
1048
1049 neigh_cxt = "neighbor {}".format(ip_addr)
1050 config_data.append("address-family {} unicast".format(addr_type))
1051
1052 if activate_addr_family is not None:
1053 config_data.append(
1054 "address-family {} unicast".format(activate_addr_family)
1055 )
1056
1057 config_data.append("{} activate".format(neigh_cxt))
1058
1059 if deactivate and activate_addr_family is None:
1060 config_data.append("no neighbor {} activate".format(deactivate))
1061
1062 if deactivate_addr_family is not None:
1063 config_data.append(
1064 "address-family {} unicast".format(deactivate_addr_family)
1065 )
1066 config_data.append("no {} activate".format(neigh_cxt))
1067
1068 next_hop_self = peer.setdefault("next_hop_self", None)
1069 send_community = peer.setdefault("send_community", None)
1070 prefix_lists = peer.setdefault("prefix_lists", {})
1071 route_maps = peer.setdefault("route_maps", {})
1072 no_send_community = peer.setdefault("no_send_community", None)
1073 capability = peer.setdefault("capability", None)
1074 allowas_in = peer.setdefault("allowas-in", None)
1075
1076 # next-hop-self
1077 if next_hop_self is not None:
1078 if next_hop_self is True:
1079 config_data.append("{} next-hop-self".format(neigh_cxt))
1080 else:
1081 config_data.append("no {} next-hop-self".format(neigh_cxt))
1082
1083 # send_community
1084 if send_community:
1085 config_data.append("{} send-community".format(neigh_cxt))
1086
1087 # no_send_community
1088 if no_send_community:
1089 config_data.append(
1090 "no {} send-community {}".format(neigh_cxt, no_send_community)
1091 )
1092
1093 # capability_ext_nh
1094 if capability and addr_type == "ipv6":
1095 config_data.append("address-family ipv4 unicast")
1096 config_data.append("{} activate".format(neigh_cxt))
1097
1098 if "allowas_in" in peer:
1099 allow_as_in = peer["allowas_in"]
1100 config_data.append("{} allowas-in {}".format(neigh_cxt, allow_as_in))
1101
1102 if "no_allowas_in" in peer:
1103 allow_as_in = peer["no_allowas_in"]
1104 config_data.append("no {} allowas-in {}".format(neigh_cxt, allow_as_in))
1105
1106 if "shutdown" in peer:
1107 config_data.append(
1108 "{} {} shutdown".format(
1109 "no" if not peer["shutdown"] else "", neigh_cxt
1110 )
1111 )
1112
1113 if prefix_lists:
1114 for prefix_list in prefix_lists:
1115 name = prefix_list.setdefault("name", {})
1116 direction = prefix_list.setdefault("direction", "in")
1117 del_action = prefix_list.setdefault("delete", False)
1118 if not name:
1119 logger.info(
1120 "Router %s: 'name' not present in "
1121 "input_dict for BGP neighbor prefix lists",
1122 router,
1123 )
1124 else:
1125 cmd = "{} prefix-list {} {}".format(neigh_cxt, name, direction)
1126 if del_action:
1127 cmd = "no {}".format(cmd)
1128 config_data.append(cmd)
1129
1130 if route_maps:
1131 for route_map in route_maps:
1132 name = route_map.setdefault("name", {})
1133 direction = route_map.setdefault("direction", "in")
1134 del_action = route_map.setdefault("delete", False)
1135 if not name:
1136 logger.info(
1137 "Router %s: 'name' not present in "
1138 "input_dict for BGP neighbor route name",
1139 router,
1140 )
1141 else:
1142 cmd = "{} route-map {} {}".format(neigh_cxt, name, direction)
1143 if del_action:
1144 cmd = "no {}".format(cmd)
1145 config_data.append(cmd)
1146
1147 if allowas_in:
1148 number_occurences = allowas_in.setdefault("number_occurences", {})
1149 del_action = allowas_in.setdefault("delete", False)
1150
1151 cmd = "{} allowas-in {}".format(neigh_cxt, number_occurences)
1152
1153 if del_action:
1154 cmd = "no {}".format(cmd)
1155
1156 config_data.append(cmd)
1157
1158 return config_data
1159
1160
1161 def modify_bgp_config_when_bgpd_down(tgen, topo, input_dict):
1162 """
1163 API will save the current config to router's /etc/frr/ for BGPd
1164 daemon(bgpd.conf file)
1165
1166 Paramters
1167 ---------
1168 * `tgen` : Topogen object
1169 * `topo` : json file data
1170 * `input_dict` : defines for which router, and which config
1171 needs to be modified
1172
1173 Usage:
1174 ------
1175 # Modify graceful-restart config not to set f-bit
1176 # and write to /etc/frr
1177
1178 # Api call to delete advertised networks
1179 input_dict_2 = {
1180 "r5": {
1181 "bgp": {
1182 "address_family": {
1183 "ipv4": {
1184 "unicast": {
1185 "advertise_networks": [
1186 {
1187 "network": "101.0.20.1/32",
1188 "no_of_network": 5,
1189 "delete": True
1190 }
1191 ],
1192 }
1193 },
1194 "ipv6": {
1195 "unicast": {
1196 "advertise_networks": [
1197 {
1198 "network": "5::1/128",
1199 "no_of_network": 5,
1200 "delete": True
1201 }
1202 ],
1203 }
1204 }
1205 }
1206 }
1207 }
1208 }
1209
1210 result = modify_bgp_config_when_bgpd_down(tgen, topo, input_dict)
1211
1212 """
1213
1214 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1215 try:
1216 result = create_router_bgp(
1217 tgen, topo, input_dict, build=False, load_config=False
1218 )
1219 if result is not True:
1220 return result
1221
1222 # Copy bgp config file to /etc/frr
1223 for dut in input_dict.keys():
1224 router_list = tgen.routers()
1225 for router, rnode in router_list.items():
1226 if router != dut:
1227 continue
1228
1229 logger.info("Delete BGP config when BGPd is down in {}".format(router))
1230 # Reading the config from "rundir" and copy to /etc/frr/bgpd.conf
1231 cmd = "cat {}/{}/{} >> /etc/frr/bgpd.conf".format(
1232 tgen.logdir, router, FRRCFG_FILE
1233 )
1234 router_list[router].run(cmd)
1235
1236 except Exception as e:
1237 errormsg = traceback.format_exc()
1238 logger.error(errormsg)
1239 return errormsg
1240
1241 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1242 return True
1243
1244
1245 #############################################
1246 # Verification APIs
1247 #############################################
1248 @retry(retry_timeout=8)
1249 def verify_router_id(tgen, topo, input_dict, expected=True):
1250 """
1251 Running command "show ip bgp json" for DUT and reading router-id
1252 from input_dict and verifying with command output.
1253 1. Statically modfified router-id should take place
1254 2. When static router-id is deleted highest loopback should
1255 become router-id
1256 3. When loopback intf is down then highest physcial intf
1257 should become router-id
1258
1259 Parameters
1260 ----------
1261 * `tgen`: topogen object
1262 * `topo`: input json file data
1263 * `input_dict`: input dictionary, have details of Device Under Test, for
1264 which user wants to test the data
1265 * `expected` : expected results from API, by-default True
1266
1267 Usage
1268 -----
1269 # Verify if router-id for r1 is 12.12.12.12
1270 input_dict = {
1271 "r1":{
1272 "router_id": "12.12.12.12"
1273 }
1274 # Verify that router-id for r1 is highest interface ip
1275 input_dict = {
1276 "routers": ["r1"]
1277 }
1278 result = verify_router_id(tgen, topo, input_dict)
1279
1280 Returns
1281 -------
1282 errormsg(str) or True
1283 """
1284
1285 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1286 for router in input_dict.keys():
1287 if router not in tgen.routers():
1288 continue
1289
1290 rnode = tgen.routers()[router]
1291
1292 del_router_id = input_dict[router]["bgp"].setdefault("del_router_id", False)
1293
1294 logger.info("Checking router %s router-id", router)
1295 show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
1296 router_id_out = show_bgp_json["ipv4Unicast"]["routerId"]
1297 router_id_out = ipaddress.IPv4Address(frr_unicode(router_id_out))
1298
1299 # Once router-id is deleted, highest interface ip should become
1300 # router-id
1301 if del_router_id:
1302 router_id = find_interface_with_greater_ip(topo, router)
1303 else:
1304 router_id = input_dict[router]["bgp"]["router_id"]
1305 router_id = ipaddress.IPv4Address(frr_unicode(router_id))
1306
1307 if router_id == router_id_out:
1308 logger.info("Found expected router-id %s for router %s", router_id, router)
1309 else:
1310 errormsg = (
1311 "Router-id for router:{} mismatch, expected:"
1312 " {} but found:{}".format(router, router_id, router_id_out)
1313 )
1314 return errormsg
1315
1316 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1317 return True
1318
1319
1320 @retry(retry_timeout=150)
1321 def verify_bgp_convergence(tgen, topo=None, dut=None, expected=True):
1322 """
1323 API will verify if BGP is converged with in the given time frame.
1324 Running "show bgp summary json" command and verify bgp neighbor
1325 state is established,
1326
1327 Parameters
1328 ----------
1329 * `tgen`: topogen object
1330 * `topo`: input json file data
1331 * `dut`: device under test
1332
1333 Usage
1334 -----
1335 # To veriry is BGP is converged for all the routers used in
1336 topology
1337 results = verify_bgp_convergence(tgen, topo, dut="r1")
1338
1339 Returns
1340 -------
1341 errormsg(str) or True
1342 """
1343
1344 if topo is None:
1345 topo = tgen.json_topo
1346
1347 result = False
1348 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1349 tgen = get_topogen()
1350 for router, rnode in tgen.routers().items():
1351 if "bgp" not in topo["routers"][router]:
1352 continue
1353
1354 if dut is not None and dut != router:
1355 continue
1356
1357 logger.info("Verifying BGP Convergence on router %s:", router)
1358 show_bgp_json = run_frr_cmd(rnode, "show bgp vrf all summary json", isjson=True)
1359 # Verifying output dictionary show_bgp_json is empty or not
1360 if not bool(show_bgp_json):
1361 errormsg = "BGP is not running"
1362 return errormsg
1363
1364 # To find neighbor ip type
1365 bgp_data_list = topo["routers"][router]["bgp"]
1366
1367 if type(bgp_data_list) is not list:
1368 bgp_data_list = [bgp_data_list]
1369
1370 for bgp_data in bgp_data_list:
1371 if "vrf" in bgp_data:
1372 vrf = bgp_data["vrf"]
1373 if vrf is None:
1374 vrf = "default"
1375 else:
1376 vrf = "default"
1377
1378 # To find neighbor ip type
1379 bgp_addr_type = bgp_data["address_family"]
1380 if "l2vpn" in bgp_addr_type:
1381 total_evpn_peer = 0
1382
1383 if "neighbor" not in bgp_addr_type["l2vpn"]["evpn"]:
1384 continue
1385
1386 bgp_neighbors = bgp_addr_type["l2vpn"]["evpn"]["neighbor"]
1387 total_evpn_peer += len(bgp_neighbors)
1388
1389 no_of_evpn_peer = 0
1390 for bgp_neighbor, peer_data in bgp_neighbors.items():
1391 for _addr_type, dest_link_dict in peer_data.items():
1392 data = topo["routers"][bgp_neighbor]["links"]
1393 for dest_link in dest_link_dict.keys():
1394 if dest_link in data:
1395 peer_details = peer_data[_addr_type][dest_link]
1396
1397 neighbor_ip = data[dest_link][_addr_type].split("/")[0]
1398 nh_state = None
1399
1400 if (
1401 "ipv4Unicast" in show_bgp_json[vrf]
1402 or "ipv6Unicast" in show_bgp_json[vrf]
1403 ):
1404 errormsg = (
1405 "[DUT: %s] VRF: %s, "
1406 "ipv4Unicast/ipv6Unicast"
1407 " address-family present"
1408 " under l2vpn" % (router, vrf)
1409 )
1410 return errormsg
1411
1412 l2VpnEvpn_data = show_bgp_json[vrf]["l2VpnEvpn"][
1413 "peers"
1414 ]
1415 nh_state = l2VpnEvpn_data[neighbor_ip]["state"]
1416
1417 if nh_state == "Established":
1418 no_of_evpn_peer += 1
1419
1420 if no_of_evpn_peer == total_evpn_peer:
1421 logger.info(
1422 "[DUT: %s] VRF: %s, BGP is Converged for " "epvn peers",
1423 router,
1424 vrf,
1425 )
1426 result = True
1427 else:
1428 errormsg = (
1429 "[DUT: %s] VRF: %s, BGP is not converged "
1430 "for evpn peers" % (router, vrf)
1431 )
1432 return errormsg
1433 else:
1434 total_peer = 0
1435 for addr_type in bgp_addr_type.keys():
1436 if not check_address_types(addr_type):
1437 continue
1438
1439 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
1440
1441 for bgp_neighbor in bgp_neighbors:
1442 total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"])
1443
1444 no_of_peer = 0
1445 for addr_type in bgp_addr_type.keys():
1446 if not check_address_types(addr_type):
1447 continue
1448 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
1449
1450 for bgp_neighbor, peer_data in bgp_neighbors.items():
1451 for dest_link in peer_data["dest_link"].keys():
1452 data = topo["routers"][bgp_neighbor]["links"]
1453 if dest_link in data:
1454 peer_details = peer_data["dest_link"][dest_link]
1455 # for link local neighbors
1456 if (
1457 "neighbor_type" in peer_details
1458 and peer_details["neighbor_type"] == "link-local"
1459 ):
1460 intf = topo["routers"][bgp_neighbor]["links"][
1461 dest_link
1462 ]["interface"]
1463 neighbor_ip = get_frr_ipv6_linklocal(
1464 tgen, bgp_neighbor, intf
1465 )
1466 elif "source_link" in peer_details:
1467 neighbor_ip = topo["routers"][bgp_neighbor][
1468 "links"
1469 ][peer_details["source_link"]][addr_type].split(
1470 "/"
1471 )[
1472 0
1473 ]
1474 elif (
1475 "neighbor_type" in peer_details
1476 and peer_details["neighbor_type"] == "unnumbered"
1477 ):
1478 neighbor_ip = data[dest_link]["peer-interface"]
1479 else:
1480 neighbor_ip = data[dest_link][addr_type].split("/")[
1481 0
1482 ]
1483 nh_state = None
1484 neighbor_ip = neighbor_ip.lower()
1485 if addr_type == "ipv4":
1486 ipv4_data = show_bgp_json[vrf]["ipv4Unicast"][
1487 "peers"
1488 ]
1489 nh_state = ipv4_data[neighbor_ip]["state"]
1490 else:
1491 ipv6_data = show_bgp_json[vrf]["ipv6Unicast"][
1492 "peers"
1493 ]
1494 if neighbor_ip in ipv6_data:
1495 nh_state = ipv6_data[neighbor_ip]["state"]
1496
1497 if nh_state == "Established":
1498 no_of_peer += 1
1499
1500 if no_of_peer == total_peer and no_of_peer > 0:
1501 logger.info("[DUT: %s] VRF: %s, BGP is Converged", router, vrf)
1502 result = True
1503 else:
1504 errormsg = "[DUT: %s] VRF: %s, BGP is not converged" % (router, vrf)
1505 return errormsg
1506
1507 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1508 return result
1509
1510
1511 @retry(retry_timeout=16)
1512 def verify_bgp_community(
1513 tgen,
1514 addr_type,
1515 router,
1516 network,
1517 input_dict=None,
1518 vrf=None,
1519 bestpath=False,
1520 expected=True,
1521 ):
1522 """
1523 API to veiryf BGP large community is attached in route for any given
1524 DUT by running "show bgp ipv4/6 {route address} json" command.
1525
1526 Parameters
1527 ----------
1528 * `tgen`: topogen object
1529 * `addr_type` : ip type, ipv4/ipv6
1530 * `dut`: Device Under Test
1531 * `network`: network for which set criteria needs to be verified
1532 * `input_dict`: having details like - for which router, community and
1533 values needs to be verified
1534 * `vrf`: VRF name
1535 * `bestpath`: To check best path cli
1536 * `expected` : expected results from API, by-default True
1537
1538 Usage
1539 -----
1540 networks = ["200.50.2.0/32"]
1541 input_dict = {
1542 "largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5"
1543 }
1544 result = verify_bgp_community(tgen, "ipv4", dut, network, input_dict=None)
1545
1546 Returns
1547 -------
1548 errormsg(str) or True
1549 """
1550
1551 logger.debug("Entering lib API: verify_bgp_community()")
1552 if router not in tgen.routers():
1553 return False
1554
1555 rnode = tgen.routers()[router]
1556
1557 logger.info(
1558 "Verifying BGP community attributes on dut %s: for %s " "network %s",
1559 router,
1560 addr_type,
1561 network,
1562 )
1563
1564 command = "show bgp"
1565
1566 for net in network:
1567 if vrf:
1568 cmd = "{} vrf {} {} {} json".format(command, vrf, addr_type, net)
1569 elif bestpath:
1570 cmd = "{} {} {} bestpath json".format(command, addr_type, net)
1571 else:
1572 cmd = "{} {} {} json".format(command, addr_type, net)
1573
1574 show_bgp_json = run_frr_cmd(rnode, cmd, isjson=True)
1575 if "paths" not in show_bgp_json:
1576 return "Prefix {} not found in BGP table of router: {}".format(net, router)
1577
1578 as_paths = show_bgp_json["paths"]
1579 found = False
1580 for i in range(len(as_paths)):
1581 if (
1582 "largeCommunity" in show_bgp_json["paths"][i]
1583 or "community" in show_bgp_json["paths"][i]
1584 ):
1585 found = True
1586 logger.info(
1587 "Large Community attribute is found for route:" " %s in router: %s",
1588 net,
1589 router,
1590 )
1591 if input_dict is not None:
1592 for criteria, comm_val in input_dict.items():
1593 show_val = show_bgp_json["paths"][i][criteria]["string"]
1594 if comm_val == show_val:
1595 logger.info(
1596 "Verifying BGP %s for prefix: %s"
1597 " in router: %s, found expected"
1598 " value: %s",
1599 criteria,
1600 net,
1601 router,
1602 comm_val,
1603 )
1604 else:
1605 errormsg = (
1606 "Failed: Verifying BGP attribute"
1607 " {} for route: {} in router: {}"
1608 ", expected value: {} but found"
1609 ": {}".format(criteria, net, router, comm_val, show_val)
1610 )
1611 return errormsg
1612
1613 if not found:
1614 errormsg = (
1615 "Large Community attribute is not found for route: "
1616 "{} in router: {} ".format(net, router)
1617 )
1618 return errormsg
1619
1620 logger.debug("Exiting lib API: verify_bgp_community()")
1621 return True
1622
1623
1624 def modify_as_number(tgen, topo, input_dict):
1625 """
1626 API reads local_as and remote_as from user defined input_dict and
1627 modify router"s ASNs accordingly. Router"s config is modified and
1628 recent/changed config is loadeded to router.
1629
1630 Parameters
1631 ----------
1632 * `tgen` : Topogen object
1633 * `topo` : json file data
1634 * `input_dict` : defines for which router ASNs needs to be modified
1635
1636 Usage
1637 -----
1638 To modify ASNs for router r1
1639 input_dict = {
1640 "r1": {
1641 "bgp": {
1642 "local_as": 131079
1643 }
1644 }
1645 result = modify_as_number(tgen, topo, input_dict)
1646
1647 Returns
1648 -------
1649 errormsg(str) or True
1650 """
1651
1652 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1653 try:
1654
1655 new_topo = deepcopy(topo["routers"])
1656 router_dict = {}
1657 for router in input_dict.keys():
1658 # Remove bgp configuration
1659
1660 router_dict.update({router: {"bgp": {"delete": True}}})
1661 try:
1662 new_topo[router]["bgp"]["local_as"] = input_dict[router]["bgp"][
1663 "local_as"
1664 ]
1665 except TypeError:
1666 new_topo[router]["bgp"][0]["local_as"] = input_dict[router]["bgp"][
1667 "local_as"
1668 ]
1669 logger.info("Removing bgp configuration")
1670 create_router_bgp(tgen, topo, router_dict)
1671
1672 logger.info("Applying modified bgp configuration")
1673 result = create_router_bgp(tgen, new_topo)
1674 if result is not True:
1675 result = "Error applying new AS number config"
1676 except Exception as e:
1677 errormsg = traceback.format_exc()
1678 logger.error(errormsg)
1679 return errormsg
1680
1681 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1682 return result
1683
1684
1685 @retry(retry_timeout=8)
1686 def verify_as_numbers(tgen, topo, input_dict, expected=True):
1687 """
1688 This API is to verify AS numbers for given DUT by running
1689 "show ip bgp neighbor json" command. Local AS and Remote AS
1690 will ve verified with input_dict data and command output.
1691
1692 Parameters
1693 ----------
1694 * `tgen`: topogen object
1695 * `topo`: input json file data
1696 * `addr_type` : ip type, ipv4/ipv6
1697 * `input_dict`: defines - for which router, AS numbers needs to be verified
1698 * `expected` : expected results from API, by-default True
1699
1700 Usage
1701 -----
1702 input_dict = {
1703 "r1": {
1704 "bgp": {
1705 "local_as": 131079
1706 }
1707 }
1708 }
1709 result = verify_as_numbers(tgen, topo, addr_type, input_dict)
1710
1711 Returns
1712 -------
1713 errormsg(str) or True
1714 """
1715
1716 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1717 for router in input_dict.keys():
1718 if router not in tgen.routers():
1719 continue
1720
1721 rnode = tgen.routers()[router]
1722
1723 logger.info("Verifying AS numbers for dut %s:", router)
1724
1725 show_ip_bgp_neighbor_json = run_frr_cmd(
1726 rnode, "show ip bgp neighbor json", isjson=True
1727 )
1728 local_as = input_dict[router]["bgp"]["local_as"]
1729 bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
1730
1731 for addr_type in bgp_addr_type:
1732 if not check_address_types(addr_type):
1733 continue
1734
1735 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
1736
1737 for bgp_neighbor, peer_data in bgp_neighbors.items():
1738 remote_as = input_dict[bgp_neighbor]["bgp"]["local_as"]
1739 for dest_link, peer_dict in peer_data["dest_link"].items():
1740 neighbor_ip = None
1741 data = topo["routers"][bgp_neighbor]["links"]
1742
1743 if dest_link in data:
1744 neighbor_ip = data[dest_link][addr_type].split("/")[0]
1745 neigh_data = show_ip_bgp_neighbor_json[neighbor_ip]
1746 # Verify Local AS for router
1747 if neigh_data["localAs"] != local_as:
1748 errormsg = (
1749 "Failed: Verify local_as for dut {},"
1750 " found: {} but expected: {}".format(
1751 router, neigh_data["localAs"], local_as
1752 )
1753 )
1754 return errormsg
1755 else:
1756 logger.info(
1757 "Verified local_as for dut %s, found" " expected: %s",
1758 router,
1759 local_as,
1760 )
1761
1762 # Verify Remote AS for neighbor
1763 if neigh_data["remoteAs"] != remote_as:
1764 errormsg = (
1765 "Failed: Verify remote_as for dut "
1766 "{}'s neighbor {}, found: {} but "
1767 "expected: {}".format(
1768 router, bgp_neighbor, neigh_data["remoteAs"], remote_as
1769 )
1770 )
1771 return errormsg
1772 else:
1773 logger.info(
1774 "Verified remote_as for dut %s's "
1775 "neighbor %s, found expected: %s",
1776 router,
1777 bgp_neighbor,
1778 remote_as,
1779 )
1780
1781 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1782 return True
1783
1784
1785 @retry(retry_timeout=150)
1786 def verify_bgp_convergence_from_running_config(tgen, dut=None, expected=True):
1787 """
1788 API to verify BGP convergence b/w loopback and physical interface.
1789 This API would be used when routers have BGP neighborship is loopback
1790 to physical or vice-versa
1791
1792 Parameters
1793 ----------
1794 * `tgen`: topogen object
1795 * `dut`: device under test
1796 * `expected` : expected results from API, by-default True
1797
1798 Usage
1799 -----
1800 results = verify_bgp_convergence_bw_lo_and_phy_intf(tgen, topo,
1801 dut="r1")
1802
1803 Returns
1804 -------
1805 errormsg(str) or True
1806 """
1807
1808 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1809
1810 for router, rnode in tgen.routers().items():
1811 if dut is not None and dut != router:
1812 continue
1813
1814 logger.info("Verifying BGP Convergence on router %s:", router)
1815 show_bgp_json = run_frr_cmd(rnode, "show bgp vrf all summary json", isjson=True)
1816 # Verifying output dictionary show_bgp_json is empty or not
1817 if not bool(show_bgp_json):
1818 errormsg = "BGP is not running"
1819 return errormsg
1820
1821 for vrf, addr_family_data in show_bgp_json.items():
1822 for address_family, neighborship_data in addr_family_data.items():
1823 total_peer = 0
1824 no_of_peer = 0
1825
1826 total_peer = len(neighborship_data["peers"].keys())
1827
1828 for peer, peer_data in neighborship_data["peers"].items():
1829 if peer_data["state"] == "Established":
1830 no_of_peer += 1
1831
1832 if total_peer != no_of_peer:
1833 errormsg = (
1834 "[DUT: %s] VRF: %s, BGP is not converged"
1835 " for peer: %s" % (router, vrf, peer)
1836 )
1837 return errormsg
1838
1839 logger.info("[DUT: %s]: vrf: %s, BGP is Converged", router, vrf)
1840
1841 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1842
1843 return True
1844
1845
1846 def clear_bgp(tgen, addr_type, router, vrf=None, neighbor=None):
1847 """
1848 This API is to clear bgp neighborship by running
1849 clear ip bgp */clear bgp ipv6 * command,
1850
1851 Parameters
1852 ----------
1853 * `tgen`: topogen object
1854 * `addr_type`: ip type ipv4/ipv6
1855 * `router`: device under test
1856 * `vrf`: vrf name
1857 * `neighbor`: Neighbor for which bgp needs to be cleared
1858
1859 Usage
1860 -----
1861 clear_bgp(tgen, addr_type, "r1")
1862 """
1863
1864 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1865
1866 if router not in tgen.routers():
1867 return False
1868
1869 rnode = tgen.routers()[router]
1870
1871 if vrf:
1872 if type(vrf) is not list:
1873 vrf = [vrf]
1874
1875 # Clearing BGP
1876 logger.info("Clearing BGP neighborship for router %s..", router)
1877 if addr_type == "ipv4":
1878 if vrf:
1879 for _vrf in vrf:
1880 run_frr_cmd(rnode, "clear ip bgp vrf {} *".format(_vrf))
1881 elif neighbor:
1882 run_frr_cmd(rnode, "clear bgp ipv4 {}".format(neighbor))
1883 else:
1884 run_frr_cmd(rnode, "clear ip bgp *")
1885 elif addr_type == "ipv6":
1886 if vrf:
1887 for _vrf in vrf:
1888 run_frr_cmd(rnode, "clear bgp vrf {} ipv6 *".format(_vrf))
1889 elif neighbor:
1890 run_frr_cmd(rnode, "clear bgp ipv6 {}".format(neighbor))
1891 else:
1892 run_frr_cmd(rnode, "clear bgp ipv6 *")
1893 else:
1894 run_frr_cmd(rnode, "clear bgp *")
1895
1896 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1897
1898
1899 def clear_bgp_and_verify(tgen, topo, router, rid=None):
1900 """
1901 This API is to clear bgp neighborship and verify bgp neighborship
1902 is coming up(BGP is converged) usinf "show bgp summary json" command
1903 and also verifying for all bgp neighbors uptime before and after
1904 clear bgp sessions is different as the uptime must be changed once
1905 bgp sessions are cleared using "clear ip bgp */clear bgp ipv6 *" cmd.
1906
1907 Parameters
1908 ----------
1909 * `tgen`: topogen object
1910 * `topo`: input json file data
1911 * `router`: device under test
1912
1913 Usage
1914 -----
1915 result = clear_bgp_and_verify(tgen, topo, addr_type, dut)
1916
1917 Returns
1918 -------
1919 errormsg(str) or True
1920 """
1921
1922 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1923
1924 if router not in tgen.routers():
1925 return False
1926
1927 rnode = tgen.routers()[router]
1928
1929 peer_uptime_before_clear_bgp = {}
1930 sleeptime = 3
1931
1932 # Verifying BGP convergence before bgp clear command
1933 for retry in range(50):
1934 # Waiting for BGP to converge
1935 logger.info(
1936 "Waiting for %s sec for BGP to converge on router" " %s...",
1937 sleeptime,
1938 router,
1939 )
1940 sleep(sleeptime)
1941
1942 show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
1943 # Verifying output dictionary show_bgp_json is empty or not
1944 if not bool(show_bgp_json):
1945 errormsg = "BGP is not running"
1946 return errormsg
1947
1948 # To find neighbor ip type
1949 try:
1950 bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
1951 except TypeError:
1952 bgp_addr_type = topo["routers"][router]["bgp"][0]["address_family"]
1953
1954 total_peer = 0
1955 for addr_type in bgp_addr_type.keys():
1956
1957 if not check_address_types(addr_type):
1958 continue
1959
1960 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
1961
1962 for bgp_neighbor in bgp_neighbors:
1963 total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"])
1964
1965 no_of_peer = 0
1966 for addr_type in bgp_addr_type:
1967 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
1968
1969 for bgp_neighbor, peer_data in bgp_neighbors.items():
1970 for dest_link, peer_dict in peer_data["dest_link"].items():
1971 data = topo["routers"][bgp_neighbor]["links"]
1972
1973 if dest_link in data:
1974 neighbor_ip = data[dest_link][addr_type].split("/")[0]
1975 if addr_type == "ipv4":
1976 ipv4_data = show_bgp_json["ipv4Unicast"]["peers"]
1977 nh_state = ipv4_data[neighbor_ip]["state"]
1978
1979 # Peer up time dictionary
1980 peer_uptime_before_clear_bgp[bgp_neighbor] = ipv4_data[
1981 neighbor_ip
1982 ]["peerUptimeEstablishedEpoch"]
1983 else:
1984 ipv6_data = show_bgp_json["ipv6Unicast"]["peers"]
1985 nh_state = ipv6_data[neighbor_ip]["state"]
1986
1987 # Peer up time dictionary
1988 peer_uptime_before_clear_bgp[bgp_neighbor] = ipv6_data[
1989 neighbor_ip
1990 ]["peerUptimeEstablishedEpoch"]
1991
1992 if nh_state == "Established":
1993 no_of_peer += 1
1994
1995 if no_of_peer == total_peer:
1996 logger.info("BGP is Converged for router %s before bgp" " clear", router)
1997 break
1998 else:
1999 logger.info(
2000 "BGP is not yet Converged for router %s " "before bgp clear", router
2001 )
2002 else:
2003 errormsg = (
2004 "TIMEOUT!! BGP is not converged in {} seconds for"
2005 " router {}".format(retry * sleeptime, router)
2006 )
2007 return errormsg
2008
2009 # Clearing BGP
2010 logger.info("Clearing BGP neighborship for router %s..", router)
2011 for addr_type in bgp_addr_type.keys():
2012 if addr_type == "ipv4":
2013 if rid:
2014 run_frr_cmd(rnode, "clear bgp ipv4 {}".format(rid))
2015 else:
2016 run_frr_cmd(rnode, "clear bgp ipv4 *")
2017 elif addr_type == "ipv6":
2018 if rid:
2019 run_frr_cmd(rnode, "clear bgp ipv6 {}".format(rid))
2020 else:
2021 run_frr_cmd(rnode, "clear bgp ipv6 *")
2022 peer_uptime_after_clear_bgp = {}
2023 # Verifying BGP convergence after bgp clear command
2024 for retry in range(50):
2025
2026 # Waiting for BGP to converge
2027 logger.info(
2028 "Waiting for %s sec for BGP to converge on router" " %s...",
2029 sleeptime,
2030 router,
2031 )
2032 sleep(sleeptime)
2033
2034 show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
2035 # Verifying output dictionary show_bgp_json is empty or not
2036 if not bool(show_bgp_json):
2037 errormsg = "BGP is not running"
2038 return errormsg
2039
2040 # To find neighbor ip type
2041 try:
2042 bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
2043 except TypeError:
2044 bgp_addr_type = topo["routers"][router]["bgp"][0]["address_family"]
2045
2046 total_peer = 0
2047 for addr_type in bgp_addr_type.keys():
2048 if not check_address_types(addr_type):
2049 continue
2050
2051 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
2052
2053 for bgp_neighbor in bgp_neighbors:
2054 total_peer += len(bgp_neighbors[bgp_neighbor]["dest_link"])
2055
2056 no_of_peer = 0
2057 for addr_type in bgp_addr_type:
2058 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
2059
2060 for bgp_neighbor, peer_data in bgp_neighbors.items():
2061 for dest_link, peer_dict in peer_data["dest_link"].items():
2062 data = topo["routers"][bgp_neighbor]["links"]
2063
2064 if dest_link in data:
2065 neighbor_ip = data[dest_link][addr_type].split("/")[0]
2066 if addr_type == "ipv4":
2067 ipv4_data = show_bgp_json["ipv4Unicast"]["peers"]
2068 nh_state = ipv4_data[neighbor_ip]["state"]
2069 peer_uptime_after_clear_bgp[bgp_neighbor] = ipv4_data[
2070 neighbor_ip
2071 ]["peerUptimeEstablishedEpoch"]
2072 else:
2073 ipv6_data = show_bgp_json["ipv6Unicast"]["peers"]
2074 nh_state = ipv6_data[neighbor_ip]["state"]
2075 # Peer up time dictionary
2076 peer_uptime_after_clear_bgp[bgp_neighbor] = ipv6_data[
2077 neighbor_ip
2078 ]["peerUptimeEstablishedEpoch"]
2079
2080 if nh_state == "Established":
2081 no_of_peer += 1
2082
2083 if no_of_peer == total_peer:
2084 logger.info("BGP is Converged for router %s after bgp clear", router)
2085 break
2086 else:
2087 logger.info(
2088 "BGP is not yet Converged for router %s after" " bgp clear", router
2089 )
2090 else:
2091 errormsg = (
2092 "TIMEOUT!! BGP is not converged in {} seconds for"
2093 " router {}".format(retry * sleeptime, router)
2094 )
2095 return errormsg
2096
2097 # Comparing peerUptimeEstablishedEpoch dictionaries
2098 if peer_uptime_before_clear_bgp != peer_uptime_after_clear_bgp:
2099 logger.info("BGP neighborship is reset after clear BGP on router %s", router)
2100 else:
2101 errormsg = (
2102 "BGP neighborship is not reset after clear bgp on router"
2103 " {}".format(router)
2104 )
2105 return errormsg
2106
2107 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2108 return True
2109
2110
2111 def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
2112 """
2113 To verify BGP timer config, execute "show ip bgp neighbor json" command
2114 and verify bgp timers with input_dict data.
2115 To veirfy bgp timers functonality, shutting down peer interface
2116 and verify BGP neighborship status.
2117
2118 Parameters
2119 ----------
2120 * `tgen`: topogen object
2121 * `topo`: input json file data
2122 * `addr_type`: ip type, ipv4/ipv6
2123 * `input_dict`: defines for which router, bgp timers needs to be verified
2124
2125 Usage:
2126 # To verify BGP timers for neighbor r2 of router r1
2127 input_dict = {
2128 "r1": {
2129 "bgp": {
2130 "bgp_neighbors":{
2131 "r2":{
2132 "keepalivetimer": 5,
2133 "holddowntimer": 15,
2134 }}}}}
2135 result = verify_bgp_timers_and_functionality(tgen, topo, "ipv4",
2136 input_dict)
2137
2138 Returns
2139 -------
2140 errormsg(str) or True
2141 """
2142
2143 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2144 sleep(5)
2145 router_list = tgen.routers()
2146 for router in input_dict.keys():
2147 if router not in router_list:
2148 continue
2149
2150 rnode = router_list[router]
2151
2152 logger.info("Verifying bgp timers functionality, DUT is %s:", router)
2153
2154 show_ip_bgp_neighbor_json = run_frr_cmd(
2155 rnode, "show ip bgp neighbor json", isjson=True
2156 )
2157
2158 bgp_addr_type = input_dict[router]["bgp"]["address_family"]
2159
2160 for addr_type in bgp_addr_type:
2161 if not check_address_types(addr_type):
2162 continue
2163
2164 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
2165 for bgp_neighbor, peer_data in bgp_neighbors.items():
2166 for dest_link, peer_dict in peer_data["dest_link"].items():
2167 data = topo["routers"][bgp_neighbor]["links"]
2168
2169 keepalivetimer = peer_dict["keepalivetimer"]
2170 holddowntimer = peer_dict["holddowntimer"]
2171
2172 if dest_link in data:
2173 neighbor_ip = data[dest_link][addr_type].split("/")[0]
2174 neighbor_intf = data[dest_link]["interface"]
2175
2176 # Verify HoldDownTimer for neighbor
2177 bgpHoldTimeMsecs = show_ip_bgp_neighbor_json[neighbor_ip][
2178 "bgpTimerHoldTimeMsecs"
2179 ]
2180 if bgpHoldTimeMsecs != holddowntimer * 1000:
2181 errormsg = (
2182 "Verifying holddowntimer for bgp "
2183 "neighbor {} under dut {}, found: {} "
2184 "but expected: {}".format(
2185 neighbor_ip,
2186 router,
2187 bgpHoldTimeMsecs,
2188 holddowntimer * 1000,
2189 )
2190 )
2191 return errormsg
2192
2193 # Verify KeepAliveTimer for neighbor
2194 bgpKeepAliveTimeMsecs = show_ip_bgp_neighbor_json[neighbor_ip][
2195 "bgpTimerKeepAliveIntervalMsecs"
2196 ]
2197 if bgpKeepAliveTimeMsecs != keepalivetimer * 1000:
2198 errormsg = (
2199 "Verifying keepalivetimer for bgp "
2200 "neighbor {} under dut {}, found: {} "
2201 "but expected: {}".format(
2202 neighbor_ip,
2203 router,
2204 bgpKeepAliveTimeMsecs,
2205 keepalivetimer * 1000,
2206 )
2207 )
2208 return errormsg
2209
2210 ####################
2211 # Shutting down peer interface after keepalive time and
2212 # after some time bringing up peer interface.
2213 # verifying BGP neighborship in (hold down-keep alive)
2214 # time, it should not go down
2215 ####################
2216
2217 # Wait till keep alive time
2218 logger.info("=" * 20)
2219 logger.info("Scenario 1:")
2220 logger.info(
2221 "Shutdown and bring up peer interface: %s "
2222 "in keep alive time : %s sec and verify "
2223 " BGP neighborship is intact in %s sec ",
2224 neighbor_intf,
2225 keepalivetimer,
2226 (holddowntimer - keepalivetimer),
2227 )
2228 logger.info("=" * 20)
2229 logger.info("Waiting for %s sec..", keepalivetimer)
2230 sleep(keepalivetimer)
2231
2232 # Shutting down peer ineterface
2233 logger.info(
2234 "Shutting down interface %s on router %s",
2235 neighbor_intf,
2236 bgp_neighbor,
2237 )
2238 topotest.interface_set_status(
2239 router_list[bgp_neighbor], neighbor_intf, ifaceaction=False
2240 )
2241
2242 # Bringing up peer interface
2243 sleep(5)
2244 logger.info(
2245 "Bringing up interface %s on router %s..",
2246 neighbor_intf,
2247 bgp_neighbor,
2248 )
2249 topotest.interface_set_status(
2250 router_list[bgp_neighbor], neighbor_intf, ifaceaction=True
2251 )
2252
2253 # Verifying BGP neighborship is intact in
2254 # (holddown - keepalive) time
2255 for timer in range(
2256 keepalivetimer, holddowntimer, int(holddowntimer / 3)
2257 ):
2258 logger.info("Waiting for %s sec..", keepalivetimer)
2259 sleep(keepalivetimer)
2260 sleep(2)
2261 show_bgp_json = run_frr_cmd(
2262 rnode, "show bgp summary json", isjson=True
2263 )
2264
2265 if addr_type == "ipv4":
2266 ipv4_data = show_bgp_json["ipv4Unicast"]["peers"]
2267 nh_state = ipv4_data[neighbor_ip]["state"]
2268 else:
2269 ipv6_data = show_bgp_json["ipv6Unicast"]["peers"]
2270 nh_state = ipv6_data[neighbor_ip]["state"]
2271
2272 if timer == (holddowntimer - keepalivetimer):
2273 if nh_state != "Established":
2274 errormsg = (
2275 "BGP neighborship has not gone "
2276 "down in {} sec for neighbor {}".format(
2277 timer, bgp_neighbor
2278 )
2279 )
2280 return errormsg
2281 else:
2282 logger.info(
2283 "BGP neighborship is intact in %s"
2284 " sec for neighbor %s",
2285 timer,
2286 bgp_neighbor,
2287 )
2288
2289 ####################
2290 # Shutting down peer interface and verifying that BGP
2291 # neighborship is going down in holddown time
2292 ####################
2293 logger.info("=" * 20)
2294 logger.info("Scenario 2:")
2295 logger.info(
2296 "Shutdown peer interface: %s and verify BGP"
2297 " neighborship has gone down in hold down "
2298 "time %s sec",
2299 neighbor_intf,
2300 holddowntimer,
2301 )
2302 logger.info("=" * 20)
2303
2304 logger.info(
2305 "Shutting down interface %s on router %s..",
2306 neighbor_intf,
2307 bgp_neighbor,
2308 )
2309 topotest.interface_set_status(
2310 router_list[bgp_neighbor], neighbor_intf, ifaceaction=False
2311 )
2312
2313 # Verifying BGP neighborship is going down in holddown time
2314 for timer in range(
2315 keepalivetimer,
2316 (holddowntimer + keepalivetimer),
2317 int(holddowntimer / 3),
2318 ):
2319 logger.info("Waiting for %s sec..", keepalivetimer)
2320 sleep(keepalivetimer)
2321 sleep(2)
2322 show_bgp_json = run_frr_cmd(
2323 rnode, "show bgp summary json", isjson=True
2324 )
2325
2326 if addr_type == "ipv4":
2327 ipv4_data = show_bgp_json["ipv4Unicast"]["peers"]
2328 nh_state = ipv4_data[neighbor_ip]["state"]
2329 else:
2330 ipv6_data = show_bgp_json["ipv6Unicast"]["peers"]
2331 nh_state = ipv6_data[neighbor_ip]["state"]
2332
2333 if timer == holddowntimer:
2334 if nh_state == "Established":
2335 errormsg = (
2336 "BGP neighborship has not gone "
2337 "down in {} sec for neighbor {}".format(
2338 timer, bgp_neighbor
2339 )
2340 )
2341 return errormsg
2342 else:
2343 logger.info(
2344 "BGP neighborship has gone down in"
2345 " %s sec for neighbor %s",
2346 timer,
2347 bgp_neighbor,
2348 )
2349
2350 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2351 return True
2352
2353
2354 @retry(retry_timeout=16)
2355 def verify_bgp_attributes(
2356 tgen,
2357 addr_type,
2358 dut,
2359 static_routes,
2360 rmap_name=None,
2361 input_dict=None,
2362 seq_id=None,
2363 vrf=None,
2364 nexthop=None,
2365 expected=True,
2366 ):
2367 """
2368 API will verify BGP attributes set by Route-map for given prefix and
2369 DUT. it will run "show bgp ipv4/ipv6 {prefix_address} json" command
2370 in DUT to verify BGP attributes set by route-map, Set attributes
2371 values will be read from input_dict and verified with command output.
2372
2373 * `tgen`: topogen object
2374 * `addr_type` : ip type, ipv4/ipv6
2375 * `dut`: Device Under Test
2376 * `static_routes`: Static Routes for which BGP set attributes needs to be
2377 verified
2378 * `rmap_name`: route map name for which set criteria needs to be verified
2379 * `input_dict`: defines for which router, AS numbers needs
2380 * `seq_id`: sequence number of rmap, default is None
2381 * `expected` : expected results from API, by-default True
2382
2383 Usage
2384 -----
2385 # To verify BGP attribute "localpref" set to 150 and "med" set to 30
2386 for prefix 10.0.20.1/32 in router r3.
2387 input_dict = {
2388 "r3": {
2389 "route_maps": {
2390 "rmap_match_pf_list1": [
2391 {
2392 "action": "PERMIT",
2393 "match": {"prefix_list": "pf_list_1"},
2394 "set": {"localpref": 150, "med": 30}
2395 }
2396 ],
2397 },
2398 "as_path": "500 400"
2399 }
2400 }
2401 static_routes (list) = ["10.0.20.1/32"]
2402
2403
2404
2405 Returns
2406 -------
2407 errormsg(str) or True
2408 """
2409
2410 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2411 for router, rnode in tgen.routers().items():
2412 if router != dut:
2413 continue
2414
2415 logger.info("Verifying BGP set attributes for dut {}:".format(router))
2416
2417 for static_route in static_routes:
2418 if vrf:
2419 cmd = "show bgp vrf {} {} {} json".format(vrf, addr_type, static_route)
2420 else:
2421 cmd = "show bgp {} {} json".format(addr_type, static_route)
2422 show_bgp_json = run_frr_cmd(rnode, cmd, isjson=True)
2423
2424 dict_to_test = []
2425 tmp_list = []
2426
2427 dict_list = list(input_dict.values())[0]
2428
2429 if "route_maps" in dict_list:
2430 for rmap_router in input_dict.keys():
2431 for rmap, values in input_dict[rmap_router]["route_maps"].items():
2432 if rmap == rmap_name:
2433 dict_to_test = values
2434 for rmap_dict in values:
2435 if seq_id is not None:
2436 if type(seq_id) is not list:
2437 seq_id = [seq_id]
2438
2439 if "seq_id" in rmap_dict:
2440 rmap_seq_id = rmap_dict["seq_id"]
2441 for _seq_id in seq_id:
2442 if _seq_id == rmap_seq_id:
2443 tmp_list.append(rmap_dict)
2444 if tmp_list:
2445 dict_to_test = tmp_list
2446
2447 value = None
2448 for rmap_dict in dict_to_test:
2449 if "set" in rmap_dict:
2450 for criteria in rmap_dict["set"].keys():
2451 found = False
2452 for path in show_bgp_json["paths"]:
2453 if criteria not in path:
2454 continue
2455
2456 if criteria == "aspath":
2457 value = path[criteria]["string"]
2458 else:
2459 value = path[criteria]
2460
2461 if rmap_dict["set"][criteria] == value:
2462 found = True
2463 logger.info(
2464 "Verifying BGP "
2465 "attribute {} for"
2466 " route: {} in "
2467 "router: {}, found"
2468 " expected value:"
2469 " {}".format(
2470 criteria,
2471 static_route,
2472 dut,
2473 value,
2474 )
2475 )
2476 break
2477
2478 if not found:
2479 errormsg = (
2480 "Failed: Verifying BGP "
2481 "attribute {} for route:"
2482 " {} in router: {}, "
2483 " expected value: {} but"
2484 " found: {}".format(
2485 criteria,
2486 static_route,
2487 dut,
2488 rmap_dict["set"][criteria],
2489 value,
2490 )
2491 )
2492 return errormsg
2493
2494 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2495 return True
2496
2497
2498 @retry(retry_timeout=8)
2499 def verify_best_path_as_per_bgp_attribute(
2500 tgen, addr_type, router, input_dict, attribute, expected=True
2501 ):
2502 """
2503 API is to verify best path according to BGP attributes for given routes.
2504 "show bgp ipv4/6 json" command will be run and verify best path according
2505 to shortest as-path, highest local-preference and med, lowest weight and
2506 route origin IGP>EGP>INCOMPLETE.
2507 Parameters
2508 ----------
2509 * `tgen` : topogen object
2510 * `addr_type` : ip type, ipv4/ipv6
2511 * `tgen` : topogen object
2512 * `attribute` : calculate best path using this attribute
2513 * `input_dict`: defines different routes to calculate for which route
2514 best path is selected
2515 * `expected` : expected results from API, by-default True
2516
2517 Usage
2518 -----
2519 # To verify best path for routes 200.50.2.0/32 and 200.60.2.0/32 from
2520 router r7 to router r1(DUT) as per shortest as-path attribute
2521 input_dict = {
2522 "r7": {
2523 "bgp": {
2524 "address_family": {
2525 "ipv4": {
2526 "unicast": {
2527 "advertise_networks": [
2528 {
2529 "network": "200.50.2.0/32"
2530 },
2531 {
2532 "network": "200.60.2.0/32"
2533 }
2534 ]
2535 }
2536 }
2537 }
2538 }
2539 }
2540 }
2541 attribute = "locPrf"
2542 result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut, \
2543 input_dict, attribute)
2544 Returns
2545 -------
2546 errormsg(str) or True
2547 """
2548
2549 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2550
2551 if router not in tgen.routers():
2552 return False
2553
2554 rnode = tgen.routers()[router]
2555
2556 # Verifying show bgp json
2557 command = "show bgp"
2558
2559 sleep(2)
2560 logger.info("Verifying router %s RIB for best path:", router)
2561
2562 static_route = False
2563 advertise_network = False
2564 for route_val in input_dict.values():
2565 if "static_routes" in route_val:
2566 static_route = True
2567 networks = route_val["static_routes"]
2568 else:
2569 advertise_network = True
2570 net_data = route_val["bgp"]["address_family"][addr_type]["unicast"]
2571 networks = net_data["advertise_networks"]
2572
2573 for network in networks:
2574 _network = network["network"]
2575 no_of_ip = network.setdefault("no_of_ip", 1)
2576 vrf = network.setdefault("vrf", None)
2577
2578 if vrf:
2579 cmd = "{} vrf {}".format(command, vrf)
2580 else:
2581 cmd = command
2582
2583 cmd = "{} {}".format(cmd, addr_type)
2584 cmd = "{} json".format(cmd)
2585 sh_ip_bgp_json = run_frr_cmd(rnode, cmd, isjson=True)
2586
2587 routes = generate_ips(_network, no_of_ip)
2588 for route in routes:
2589 route = str(ipaddress.ip_network(frr_unicode(route)))
2590
2591 if route in sh_ip_bgp_json["routes"]:
2592 route_attributes = sh_ip_bgp_json["routes"][route]
2593 _next_hop = None
2594 compare = None
2595 attribute_dict = {}
2596 for route_attribute in route_attributes:
2597 next_hops = route_attribute["nexthops"]
2598 for next_hop in next_hops:
2599 next_hop_ip = next_hop["ip"]
2600 attribute_dict[next_hop_ip] = route_attribute[attribute]
2601
2602 # AS_PATH attribute
2603 if attribute == "path":
2604 # Find next_hop for the route have minimum as_path
2605 _next_hop = min(
2606 attribute_dict, key=lambda x: len(set(attribute_dict[x]))
2607 )
2608 compare = "SHORTEST"
2609
2610 # LOCAL_PREF attribute
2611 elif attribute == "locPrf":
2612 # Find next_hop for the route have highest local preference
2613 _next_hop = max(
2614 attribute_dict, key=(lambda k: attribute_dict[k])
2615 )
2616 compare = "HIGHEST"
2617
2618 # WEIGHT attribute
2619 elif attribute == "weight":
2620 # Find next_hop for the route have highest weight
2621 _next_hop = max(
2622 attribute_dict, key=(lambda k: attribute_dict[k])
2623 )
2624 compare = "HIGHEST"
2625
2626 # ORIGIN attribute
2627 elif attribute == "origin":
2628 # Find next_hop for the route have IGP as origin, -
2629 # - rule is IGP>EGP>INCOMPLETE
2630 _next_hop = [
2631 key
2632 for (key, value) in attribute_dict.items()
2633 if value == "IGP"
2634 ][0]
2635 compare = ""
2636
2637 # MED attribute
2638 elif attribute == "metric":
2639 # Find next_hop for the route have LOWEST MED
2640 _next_hop = min(
2641 attribute_dict, key=(lambda k: attribute_dict[k])
2642 )
2643 compare = "LOWEST"
2644
2645 # Show ip route
2646 if addr_type == "ipv4":
2647 command_1 = "show ip route"
2648 else:
2649 command_1 = "show ipv6 route"
2650
2651 if vrf:
2652 cmd = "{} vrf {} json".format(command_1, vrf)
2653 else:
2654 cmd = "{} json".format(command_1)
2655
2656 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
2657
2658 # Verifying output dictionary rib_routes_json is not empty
2659 if not bool(rib_routes_json):
2660 errormsg = "No route found in RIB of router {}..".format(router)
2661 return errormsg
2662
2663 st_found = False
2664 nh_found = False
2665 # Find best is installed in RIB
2666 if route in rib_routes_json:
2667 st_found = True
2668 # Verify next_hop in rib_routes_json
2669 if (
2670 rib_routes_json[route][0]["nexthops"][0]["ip"]
2671 in attribute_dict
2672 ):
2673 nh_found = True
2674 else:
2675 errormsg = (
2676 "Incorrect Nexthop for BGP route {} in "
2677 "RIB of router {}, Expected: {}, Found:"
2678 " {}\n".format(
2679 route,
2680 router,
2681 rib_routes_json[route][0]["nexthops"][0]["ip"],
2682 _next_hop,
2683 )
2684 )
2685 return errormsg
2686
2687 if st_found and nh_found:
2688 logger.info(
2689 "Best path for prefix: %s with next_hop: %s is "
2690 "installed according to %s %s: (%s) in RIB of "
2691 "router %s",
2692 route,
2693 _next_hop,
2694 compare,
2695 attribute,
2696 attribute_dict[_next_hop],
2697 router,
2698 )
2699
2700 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2701 return True
2702
2703
2704 @retry(retry_timeout=10)
2705 def verify_best_path_as_per_admin_distance(
2706 tgen, addr_type, router, input_dict, attribute, expected=True, vrf=None
2707 ):
2708 """
2709 API is to verify best path according to admin distance for given
2710 route. "show ip/ipv6 route json" command will be run and verify
2711 best path accoring to shortest admin distanc.
2712
2713 Parameters
2714 ----------
2715 * `addr_type` : ip type, ipv4/ipv6
2716 * `dut`: Device Under Test
2717 * `tgen` : topogen object
2718 * `attribute` : calculate best path using admin distance
2719 * `input_dict`: defines different routes with different admin distance
2720 to calculate for which route best path is selected
2721 * `expected` : expected results from API, by-default True
2722 * `vrf`: Pass vrf name check for perticular vrf.
2723
2724 Usage
2725 -----
2726 # To verify best path for route 200.50.2.0/32 from router r2 to
2727 router r1(DUT) as per shortest admin distance which is 60.
2728 input_dict = {
2729 "r2": {
2730 "static_routes": [{"network": "200.50.2.0/32", \
2731 "admin_distance": 80, "next_hop": "10.0.0.14"},
2732 {"network": "200.50.2.0/32", \
2733 "admin_distance": 60, "next_hop": "10.0.0.18"}]
2734 }}
2735 attribute = "locPrf"
2736 result = verify_best_path_as_per_admin_distance(tgen, "ipv4", dut, \
2737 input_dict, attribute):
2738 Returns
2739 -------
2740 errormsg(str) or True
2741 """
2742
2743 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2744 router_list = tgen.routers()
2745 if router not in router_list:
2746 return False
2747
2748 rnode = tgen.routers()[router]
2749
2750 sleep(5)
2751 logger.info("Verifying router %s RIB for best path:", router)
2752
2753 # Show ip route cmd
2754 if addr_type == "ipv4":
2755 command = "show ip route"
2756 else:
2757 command = "show ipv6 route"
2758
2759 if vrf:
2760 command = "{} vrf {} json".format(command, vrf)
2761 else:
2762 command = "{} json".format(command)
2763
2764 for routes_from_router in input_dict.keys():
2765 sh_ip_route_json = router_list[routes_from_router].vtysh_cmd(
2766 command, isjson=True
2767 )
2768 networks = input_dict[routes_from_router]["static_routes"]
2769 for network in networks:
2770 route = network["network"]
2771
2772 route_attributes = sh_ip_route_json[route]
2773 _next_hop = None
2774 compare = None
2775 attribute_dict = {}
2776 for route_attribute in route_attributes:
2777 next_hops = route_attribute["nexthops"]
2778 for next_hop in next_hops:
2779 next_hop_ip = next_hop["ip"]
2780 attribute_dict[next_hop_ip] = route_attribute["distance"]
2781
2782 # Find next_hop for the route have LOWEST Admin Distance
2783 _next_hop = min(attribute_dict, key=(lambda k: attribute_dict[k]))
2784 compare = "LOWEST"
2785
2786 # Show ip route
2787 rib_routes_json = run_frr_cmd(rnode, command, isjson=True)
2788
2789 # Verifying output dictionary rib_routes_json is not empty
2790 if not bool(rib_routes_json):
2791 errormsg = "No route found in RIB of router {}..".format(router)
2792 return errormsg
2793
2794 st_found = False
2795 nh_found = False
2796 # Find best is installed in RIB
2797 if route in rib_routes_json:
2798 st_found = True
2799 # Verify next_hop in rib_routes_json
2800 if [
2801 nh
2802 for nh in rib_routes_json[route][0]["nexthops"]
2803 if nh["ip"] == _next_hop
2804 ]:
2805 nh_found = True
2806 else:
2807 errormsg = (
2808 "Nexthop {} is Missing for BGP route {}"
2809 " in RIB of router {}\n".format(_next_hop, route, router)
2810 )
2811 return errormsg
2812
2813 if st_found and nh_found:
2814 logger.info(
2815 "Best path for prefix: %s is installed according"
2816 " to %s %s: (%s) in RIB of router %s",
2817 route,
2818 compare,
2819 attribute,
2820 attribute_dict[_next_hop],
2821 router,
2822 )
2823
2824 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2825 return True
2826
2827
2828 @retry(retry_timeout=30)
2829 def verify_bgp_rib(
2830 tgen,
2831 addr_type,
2832 dut,
2833 input_dict,
2834 next_hop=None,
2835 aspath=None,
2836 multi_nh=None,
2837 expected=True,
2838 ):
2839 """
2840 This API is to verify whether bgp rib has any
2841 matching route for a nexthop.
2842
2843 Parameters
2844 ----------
2845 * `tgen`: topogen object
2846 * `dut`: input dut router name
2847 * `addr_type` : ip type ipv4/ipv6
2848 * `input_dict` : input dict, has details of static routes
2849 * `next_hop`[optional]: next_hop which needs to be verified,
2850 default = static
2851 * 'aspath'[optional]: aspath which needs to be verified
2852 * `expected` : expected results from API, by-default True
2853
2854 Usage
2855 -----
2856 dut = 'r1'
2857 next_hop = "192.168.1.10"
2858 input_dict = topo['routers']
2859 aspath = "100 200 300"
2860 result = verify_bgp_rib(tgen, addr_type, dut, tgen, input_dict,
2861 next_hop, aspath)
2862
2863 Returns
2864 -------
2865 errormsg(str) or True
2866 """
2867
2868 logger.debug("Entering lib API: verify_bgp_rib()")
2869
2870 router_list = tgen.routers()
2871 additional_nexthops_in_required_nhs = []
2872 list1 = []
2873 list2 = []
2874 found_hops = []
2875 for routerInput in input_dict.keys():
2876 for router, rnode in router_list.items():
2877 if router != dut:
2878 continue
2879
2880 # Verifying RIB routes
2881 command = "show bgp"
2882
2883 # Static routes
2884 sleep(2)
2885 logger.info("Checking router {} BGP RIB:".format(dut))
2886
2887 if "static_routes" in input_dict[routerInput]:
2888 static_routes = input_dict[routerInput]["static_routes"]
2889
2890 for static_route in static_routes:
2891 found_routes = []
2892 missing_routes = []
2893 st_found = False
2894 nh_found = False
2895
2896 vrf = static_route.setdefault("vrf", None)
2897 community = static_route.setdefault("community", None)
2898 largeCommunity = static_route.setdefault("largeCommunity", None)
2899
2900 if vrf:
2901 cmd = "{} vrf {} {}".format(command, vrf, addr_type)
2902
2903 if community:
2904 cmd = "{} community {}".format(cmd, community)
2905
2906 if largeCommunity:
2907 cmd = "{} large-community {}".format(cmd, largeCommunity)
2908 else:
2909 cmd = "{} {}".format(command, addr_type)
2910
2911 cmd = "{} json".format(cmd)
2912
2913 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
2914
2915 # Verifying output dictionary rib_routes_json is not empty
2916 if bool(rib_routes_json) == False:
2917 errormsg = "No route found in rib of router {}..".format(router)
2918 return errormsg
2919
2920 network = static_route["network"]
2921
2922 if "no_of_ip" in static_route:
2923 no_of_ip = static_route["no_of_ip"]
2924 else:
2925 no_of_ip = 1
2926
2927 # Generating IPs for verification
2928 ip_list = generate_ips(network, no_of_ip)
2929
2930 for st_rt in ip_list:
2931 st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
2932
2933 _addr_type = validate_ip_address(st_rt)
2934 if _addr_type != addr_type:
2935 continue
2936
2937 if st_rt in rib_routes_json["routes"]:
2938 st_found = True
2939 found_routes.append(st_rt)
2940
2941 if next_hop and multi_nh and st_found:
2942 if type(next_hop) is not list:
2943 next_hop = [next_hop]
2944 list1 = next_hop
2945
2946 for mnh in range(
2947 0, len(rib_routes_json["routes"][st_rt])
2948 ):
2949 found_hops.append(
2950 [
2951 rib_r["ip"]
2952 for rib_r in rib_routes_json["routes"][
2953 st_rt
2954 ][mnh]["nexthops"]
2955 ]
2956 )
2957 for mnh in found_hops:
2958 for each_nh_in_multipath in mnh:
2959 list2.append(each_nh_in_multipath)
2960 if found_hops[0]:
2961 missing_list_of_nexthops = set(list2).difference(
2962 list1
2963 )
2964 additional_nexthops_in_required_nhs = set(
2965 list1
2966 ).difference(list2)
2967
2968 if list2:
2969 if additional_nexthops_in_required_nhs:
2970 logger.info(
2971 "Missing nexthop %s for route"
2972 " %s in RIB of router %s\n",
2973 additional_nexthops_in_required_nhs,
2974 st_rt,
2975 dut,
2976 )
2977 else:
2978 nh_found = True
2979
2980 elif next_hop and multi_nh is None:
2981 if type(next_hop) is not list:
2982 next_hop = [next_hop]
2983 list1 = next_hop
2984 found_hops = [
2985 rib_r["ip"]
2986 for rib_r in rib_routes_json["routes"][st_rt][0][
2987 "nexthops"
2988 ]
2989 ]
2990 list2 = found_hops
2991 missing_list_of_nexthops = set(list2).difference(list1)
2992 additional_nexthops_in_required_nhs = set(
2993 list1
2994 ).difference(list2)
2995
2996 if list2:
2997 if additional_nexthops_in_required_nhs:
2998 logger.info(
2999 "Missing nexthop %s for route"
3000 " %s in RIB of router %s\n",
3001 additional_nexthops_in_required_nhs,
3002 st_rt,
3003 dut,
3004 )
3005 errormsg = (
3006 "Nexthop {} is Missing for "
3007 "route {} in RIB of router {}\n".format(
3008 additional_nexthops_in_required_nhs,
3009 st_rt,
3010 dut,
3011 )
3012 )
3013 return errormsg
3014 else:
3015 nh_found = True
3016
3017 if aspath:
3018 found_paths = rib_routes_json["routes"][st_rt][0][
3019 "path"
3020 ]
3021 if aspath == found_paths:
3022 aspath_found = True
3023 logger.info(
3024 "Found AS path {} for route"
3025 " {} in RIB of router "
3026 "{}\n".format(aspath, st_rt, dut)
3027 )
3028 else:
3029 errormsg = (
3030 "AS Path {} is missing for route"
3031 "for route {} in RIB of router {}\n".format(
3032 aspath, st_rt, dut
3033 )
3034 )
3035 return errormsg
3036
3037 else:
3038 missing_routes.append(st_rt)
3039
3040 if nh_found:
3041 logger.info(
3042 "Found next_hop {} for all bgp"
3043 " routes in RIB of"
3044 " router {}\n".format(next_hop, router)
3045 )
3046
3047 if len(missing_routes) > 0:
3048 errormsg = (
3049 "Missing route in RIB of router {}, "
3050 "routes: {}\n".format(dut, missing_routes)
3051 )
3052 return errormsg
3053
3054 if found_routes:
3055 logger.info(
3056 "Verified routes in router {} BGP RIB, "
3057 "found routes are: {} \n".format(dut, found_routes)
3058 )
3059 continue
3060
3061 if "bgp" not in input_dict[routerInput]:
3062 continue
3063
3064 # Advertise networks
3065 bgp_data_list = input_dict[routerInput]["bgp"]
3066
3067 if type(bgp_data_list) is not list:
3068 bgp_data_list = [bgp_data_list]
3069
3070 for bgp_data in bgp_data_list:
3071 vrf_id = bgp_data.setdefault("vrf", None)
3072 if vrf_id:
3073 cmd = "{} vrf {} {}".format(command, vrf_id, addr_type)
3074 else:
3075 cmd = "{} {}".format(command, addr_type)
3076
3077 cmd = "{} json".format(cmd)
3078
3079 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
3080
3081 # Verifying output dictionary rib_routes_json is not empty
3082 if bool(rib_routes_json) == False:
3083 errormsg = "No route found in rib of router {}..".format(router)
3084 return errormsg
3085
3086 bgp_net_advertise = bgp_data["address_family"][addr_type]["unicast"]
3087 advertise_network = bgp_net_advertise.setdefault(
3088 "advertise_networks", []
3089 )
3090
3091 for advertise_network_dict in advertise_network:
3092 found_routes = []
3093 missing_routes = []
3094 found = False
3095
3096 network = advertise_network_dict["network"]
3097
3098 if "no_of_network" in advertise_network_dict:
3099 no_of_network = advertise_network_dict["no_of_network"]
3100 else:
3101 no_of_network = 1
3102
3103 # Generating IPs for verification
3104 ip_list = generate_ips(network, no_of_network)
3105
3106 for st_rt in ip_list:
3107 st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))
3108
3109 _addr_type = validate_ip_address(st_rt)
3110 if _addr_type != addr_type:
3111 continue
3112
3113 if st_rt in rib_routes_json["routes"]:
3114 found = True
3115 found_routes.append(st_rt)
3116 else:
3117 found = False
3118 missing_routes.append(st_rt)
3119
3120 if len(missing_routes) > 0:
3121 errormsg = (
3122 "Missing route in BGP RIB of router {},"
3123 " are: {}\n".format(dut, missing_routes)
3124 )
3125 return errormsg
3126
3127 if found_routes:
3128 logger.info(
3129 "Verified routes in router {} BGP RIB, found "
3130 "routes are: {}\n".format(dut, found_routes)
3131 )
3132
3133 logger.debug("Exiting lib API: verify_bgp_rib()")
3134 return True
3135
3136
3137 @retry(retry_timeout=10)
3138 def verify_graceful_restart(
3139 tgen, topo, addr_type, input_dict, dut, peer, expected=True
3140 ):
3141 """
3142 This API is to verify verify_graceful_restart configuration of DUT and
3143 cross verify the same from the peer bgp routerrouter.
3144
3145 Parameters
3146 ----------
3147 * `tgen`: topogen object
3148 * `topo`: input json file data
3149 * `addr_type` : ip type ipv4/ipv6
3150 * `input_dict`: input dictionary, have details of Device Under Test, for
3151 which user wants to test the data
3152 * `dut`: input dut router name
3153 * `peer`: input peer router name
3154 * `expected` : expected results from API, by-default True
3155
3156 Usage
3157 -----
3158 "r1": {
3159 "bgp": {
3160 "address_family": {
3161 "ipv4": {
3162 "unicast": {
3163 "neighbor": {
3164 "r3": {
3165 "dest_link":{
3166 "r1": {
3167 "graceful-restart": True
3168 }
3169 }
3170 }
3171 }
3172 }
3173 },
3174 "ipv6": {
3175 "unicast": {
3176 "neighbor": {
3177 "r3": {
3178 "dest_link":{
3179 "r1": {
3180 "graceful-restart": True
3181 }
3182 }
3183 }
3184 }
3185 }
3186 }
3187 }
3188 }
3189 }
3190 }
3191
3192 result = verify_graceful_restart(tgen, topo, addr_type, input_dict,
3193 dut = "r1", peer = 'r2')
3194 Returns
3195 -------
3196 errormsg(str) or True
3197 """
3198
3199 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3200
3201 for router, rnode in tgen.routers().items():
3202 if router != dut:
3203 continue
3204
3205 try:
3206 bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]
3207 except TypeError:
3208 bgp_addr_type = topo["routers"][dut]["bgp"][0]["address_family"]
3209
3210 # bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]
3211
3212 if addr_type in bgp_addr_type:
3213 if not check_address_types(addr_type):
3214 continue
3215
3216 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
3217
3218 for bgp_neighbor, peer_data in bgp_neighbors.items():
3219 if bgp_neighbor != peer:
3220 continue
3221
3222 for dest_link, peer_dict in peer_data["dest_link"].items():
3223 data = topo["routers"][bgp_neighbor]["links"]
3224
3225 if dest_link in data:
3226 neighbor_ip = data[dest_link][addr_type].split("/")[0]
3227
3228 logger.info(
3229 "[DUT: {}]: Checking bgp graceful-restart show"
3230 " o/p {}".format(dut, neighbor_ip)
3231 )
3232
3233 show_bgp_graceful_json = None
3234
3235 show_bgp_graceful_json = run_frr_cmd(
3236 rnode,
3237 "show bgp {} neighbor {} graceful-restart json".format(
3238 addr_type, neighbor_ip
3239 ),
3240 isjson=True,
3241 )
3242
3243 show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]
3244
3245 if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
3246 logger.info(
3247 "[DUT: {}]: Neighbor ip matched {}".format(dut, neighbor_ip)
3248 )
3249 else:
3250 errormsg = "[DUT: {}]: Neighbor ip NOT a matched {}".format(
3251 dut, neighbor_ip
3252 )
3253 return errormsg
3254
3255 lmode = None
3256 rmode = None
3257 # Local GR mode
3258 if "address_family" in input_dict[dut]["bgp"]:
3259 bgp_neighbors = input_dict[dut]["bgp"]["address_family"][addr_type][
3260 "unicast"
3261 ]["neighbor"][peer]["dest_link"]
3262
3263 for dest_link, data in bgp_neighbors.items():
3264 if (
3265 "graceful-restart-helper" in data
3266 and data["graceful-restart-helper"]
3267 ):
3268 lmode = "Helper"
3269 elif "graceful-restart" in data and data["graceful-restart"]:
3270 lmode = "Restart"
3271 elif (
3272 "graceful-restart-disable" in data
3273 and data["graceful-restart-disable"]
3274 ):
3275 lmode = "Disable"
3276 else:
3277 lmode = None
3278
3279 if lmode is None:
3280 if "graceful-restart" in input_dict[dut]["bgp"]:
3281
3282 if (
3283 "graceful-restart" in input_dict[dut]["bgp"]["graceful-restart"]
3284 and input_dict[dut]["bgp"]["graceful-restart"][
3285 "graceful-restart"
3286 ]
3287 ):
3288 lmode = "Restart*"
3289 elif (
3290 "graceful-restart-disable"
3291 in input_dict[dut]["bgp"]["graceful-restart"]
3292 and input_dict[dut]["bgp"]["graceful-restart"][
3293 "graceful-restart-disable"
3294 ]
3295 ):
3296 lmode = "Disable*"
3297 else:
3298 lmode = "Helper*"
3299 else:
3300 lmode = "Helper*"
3301
3302 if lmode == "Disable" or lmode == "Disable*":
3303 return True
3304
3305 # Remote GR mode
3306 if "address_family" in input_dict[peer]["bgp"]:
3307 bgp_neighbors = input_dict[peer]["bgp"]["address_family"][addr_type][
3308 "unicast"
3309 ]["neighbor"][dut]["dest_link"]
3310
3311 for dest_link, data in bgp_neighbors.items():
3312 if (
3313 "graceful-restart-helper" in data
3314 and data["graceful-restart-helper"]
3315 ):
3316 rmode = "Helper"
3317 elif "graceful-restart" in data and data["graceful-restart"]:
3318 rmode = "Restart"
3319 elif (
3320 "graceful-restart-disable" in data
3321 and data["graceful-restart-disable"]
3322 ):
3323 rmode = "Disable"
3324 else:
3325 rmode = None
3326
3327 if rmode is None:
3328 if "graceful-restart" in input_dict[peer]["bgp"]:
3329
3330 if (
3331 "graceful-restart"
3332 in input_dict[peer]["bgp"]["graceful-restart"]
3333 and input_dict[peer]["bgp"]["graceful-restart"][
3334 "graceful-restart"
3335 ]
3336 ):
3337 rmode = "Restart"
3338 elif (
3339 "graceful-restart-disable"
3340 in input_dict[peer]["bgp"]["graceful-restart"]
3341 and input_dict[peer]["bgp"]["graceful-restart"][
3342 "graceful-restart-disable"
3343 ]
3344 ):
3345 rmode = "Disable"
3346 else:
3347 rmode = "Helper"
3348 else:
3349 rmode = "Helper"
3350
3351 if show_bgp_graceful_json_out["localGrMode"] == lmode:
3352 logger.info(
3353 "[DUT: {}]: localGrMode : {} ".format(
3354 dut, show_bgp_graceful_json_out["localGrMode"]
3355 )
3356 )
3357 else:
3358 errormsg = (
3359 "[DUT: {}]: localGrMode is not correct"
3360 " Expected: {}, Found: {}".format(
3361 dut, lmode, show_bgp_graceful_json_out["localGrMode"]
3362 )
3363 )
3364 return errormsg
3365
3366 if show_bgp_graceful_json_out["remoteGrMode"] == rmode:
3367 logger.info(
3368 "[DUT: {}]: remoteGrMode : {} ".format(
3369 dut, show_bgp_graceful_json_out["remoteGrMode"]
3370 )
3371 )
3372 elif (
3373 show_bgp_graceful_json_out["remoteGrMode"] == "NotApplicable"
3374 and rmode == "Disable"
3375 ):
3376 logger.info(
3377 "[DUT: {}]: remoteGrMode : {} ".format(
3378 dut, show_bgp_graceful_json_out["remoteGrMode"]
3379 )
3380 )
3381 else:
3382 errormsg = (
3383 "[DUT: {}]: remoteGrMode is not correct"
3384 " Expected: {}, Found: {}".format(
3385 dut, rmode, show_bgp_graceful_json_out["remoteGrMode"]
3386 )
3387 )
3388 return errormsg
3389
3390 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3391 return True
3392
3393
3394 @retry(retry_timeout=10)
3395 def verify_r_bit(tgen, topo, addr_type, input_dict, dut, peer, expected=True):
3396 """
3397 This API is to verify r_bit in the BGP gr capability advertised
3398 by the neighbor router
3399
3400 Parameters
3401 ----------
3402 * `tgen`: topogen object
3403 * `topo`: input json file data
3404 * `addr_type` : ip type ipv4/ipv6
3405 * `input_dict`: input dictionary, have details of Device Under Test, for
3406 which user wants to test the data
3407 * `dut`: input dut router name
3408 * `peer`: peer name
3409 * `expected` : expected results from API, by-default True
3410
3411 Usage
3412 -----
3413 input_dict = {
3414 "r1": {
3415 "bgp": {
3416 "address_family": {
3417 "ipv4": {
3418 "unicast": {
3419 "neighbor": {
3420 "r3": {
3421 "dest_link":{
3422 "r1": {
3423 "graceful-restart": True
3424 }
3425 }
3426 }
3427 }
3428 }
3429 },
3430 "ipv6": {
3431 "unicast": {
3432 "neighbor": {
3433 "r3": {
3434 "dest_link":{
3435 "r1": {
3436 "graceful-restart": True
3437 }
3438 }
3439 }
3440 }
3441 }
3442 }
3443 }
3444 }
3445 }
3446 }
3447 result = verify_r_bit(tgen, topo, addr_type, input_dict, dut, peer)
3448
3449 Returns
3450 -------
3451 errormsg(str) or True
3452 """
3453
3454 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3455
3456 for router, rnode in tgen.routers().items():
3457 if router != dut:
3458 continue
3459
3460 bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
3461
3462 if addr_type in bgp_addr_type:
3463 if not check_address_types(addr_type):
3464 continue
3465
3466 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
3467
3468 for bgp_neighbor, peer_data in bgp_neighbors.items():
3469 if bgp_neighbor != peer:
3470 continue
3471
3472 for dest_link, peer_dict in peer_data["dest_link"].items():
3473 data = topo["routers"][bgp_neighbor]["links"]
3474
3475 if dest_link in data:
3476 neighbor_ip = data[dest_link][addr_type].split("/")[0]
3477
3478 logger.info(
3479 "[DUT: {}]: Checking bgp graceful-restart show"
3480 " o/p {}".format(dut, neighbor_ip)
3481 )
3482
3483 show_bgp_graceful_json = run_frr_cmd(
3484 rnode,
3485 "show bgp {} neighbor {} graceful-restart json".format(
3486 addr_type, neighbor_ip
3487 ),
3488 isjson=True,
3489 )
3490
3491 show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]
3492
3493 if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
3494 logger.info(
3495 "[DUT: {}]: Neighbor ip matched {}".format(dut, neighbor_ip)
3496 )
3497 else:
3498 errormsg = "[DUT: {}]: Neighbor ip NOT a matched {}".format(
3499 dut, neighbor_ip
3500 )
3501 return errormsg
3502
3503 if "rBit" in show_bgp_graceful_json_out:
3504 if show_bgp_graceful_json_out["rBit"]:
3505 logger.info("[DUT: {}]: Rbit true {}".format(dut, neighbor_ip))
3506 else:
3507 errormsg = "[DUT: {}]: Rbit false {}".format(dut, neighbor_ip)
3508 return errormsg
3509
3510 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3511 return True
3512
3513
3514 @retry(retry_timeout=10)
3515 def verify_eor(tgen, topo, addr_type, input_dict, dut, peer, expected=True):
3516 """
3517 This API is to verify EOR
3518
3519 Parameters
3520 ----------
3521 * `tgen`: topogen object
3522 * `topo`: input json file data
3523 * `addr_type` : ip type ipv4/ipv6
3524 * `input_dict`: input dictionary, have details of DUT, for
3525 which user wants to test the data
3526 * `dut`: input dut router name
3527 * `peer`: peer name
3528 Usage
3529 -----
3530 input_dict = {
3531 input_dict = {
3532 "r1": {
3533 "bgp": {
3534 "address_family": {
3535 "ipv4": {
3536 "unicast": {
3537 "neighbor": {
3538 "r3": {
3539 "dest_link":{
3540 "r1": {
3541 "graceful-restart": True
3542 }
3543 }
3544 }
3545 }
3546 }
3547 },
3548 "ipv6": {
3549 "unicast": {
3550 "neighbor": {
3551 "r3": {
3552 "dest_link":{
3553 "r1": {
3554 "graceful-restart": True
3555 }
3556 }
3557 }
3558 }
3559 }
3560 }
3561 }
3562 }
3563 }
3564 }
3565
3566 result = verify_eor(tgen, topo, addr_type, input_dict, dut, peer)
3567
3568 Returns
3569 -------
3570 errormsg(str) or True
3571 """
3572 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3573
3574 for router, rnode in tgen.routers().items():
3575 if router != dut:
3576 continue
3577
3578 bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
3579
3580 if addr_type in bgp_addr_type:
3581 if not check_address_types(addr_type):
3582 continue
3583
3584 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
3585
3586 for bgp_neighbor, peer_data in bgp_neighbors.items():
3587 if bgp_neighbor != peer:
3588 continue
3589
3590 for dest_link, peer_dict in peer_data["dest_link"].items():
3591 data = topo["routers"][bgp_neighbor]["links"]
3592
3593 if dest_link in data:
3594 neighbor_ip = data[dest_link][addr_type].split("/")[0]
3595
3596 logger.info(
3597 "[DUT: %s]: Checking bgp graceful-restart" " show o/p %s",
3598 dut,
3599 neighbor_ip,
3600 )
3601
3602 show_bgp_graceful_json = run_frr_cmd(
3603 rnode,
3604 "show bgp {} neighbor {} graceful-restart json".format(
3605 addr_type, neighbor_ip
3606 ),
3607 isjson=True,
3608 )
3609
3610 show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]
3611
3612 if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
3613 logger.info("[DUT: %s]: Neighbor ip matched %s", dut, neighbor_ip)
3614 else:
3615 errormsg = "[DUT: %s]: Neighbor ip is NOT matched %s" % (
3616 dut,
3617 neighbor_ip,
3618 )
3619 return errormsg
3620
3621 if addr_type == "ipv4":
3622 afi = "ipv4Unicast"
3623 elif addr_type == "ipv6":
3624 afi = "ipv6Unicast"
3625 else:
3626 errormsg = "Address type %s is not supported" % (addr_type)
3627 return errormsg
3628
3629 eor_json = show_bgp_graceful_json_out[afi]["endOfRibStatus"]
3630 if "endOfRibSend" in eor_json:
3631
3632 if eor_json["endOfRibSend"]:
3633 logger.info(
3634 "[DUT: %s]: EOR Send true for %s " "%s", dut, neighbor_ip, afi
3635 )
3636 else:
3637 errormsg = "[DUT: %s]: EOR Send false for %s" " %s" % (
3638 dut,
3639 neighbor_ip,
3640 afi,
3641 )
3642 return errormsg
3643
3644 if "endOfRibRecv" in eor_json:
3645 if eor_json["endOfRibRecv"]:
3646 logger.info(
3647 "[DUT: %s]: EOR Recv true %s " "%s", dut, neighbor_ip, afi
3648 )
3649 else:
3650 errormsg = "[DUT: %s]: EOR Recv false %s " "%s" % (
3651 dut,
3652 neighbor_ip,
3653 afi,
3654 )
3655 return errormsg
3656
3657 if "endOfRibSentAfterUpdate" in eor_json:
3658 if eor_json["endOfRibSentAfterUpdate"]:
3659 logger.info(
3660 "[DUT: %s]: EOR SendTime true for %s" " %s",
3661 dut,
3662 neighbor_ip,
3663 afi,
3664 )
3665 else:
3666 errormsg = "[DUT: %s]: EOR SendTime false for " "%s %s" % (
3667 dut,
3668 neighbor_ip,
3669 afi,
3670 )
3671 return errormsg
3672
3673 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3674 return True
3675
3676
3677 @retry(retry_timeout=8)
3678 def verify_f_bit(tgen, topo, addr_type, input_dict, dut, peer, expected=True):
3679 """
3680 This API is to verify f_bit in the BGP gr capability advertised
3681 by the neighbor router
3682
3683 Parameters
3684 ----------
3685 * `tgen`: topogen object
3686 * `topo`: input json file data
3687 * `addr_type` : ip type ipv4/ipv6
3688 * `input_dict`: input dictionary, have details of Device Under Test, for
3689 which user wants to test the data
3690 * `dut`: input dut router name
3691 * `peer`: peer name
3692 * `expected` : expected results from API, by-default True
3693
3694 Usage
3695 -----
3696 input_dict = {
3697 "r1": {
3698 "bgp": {
3699 "address_family": {
3700 "ipv4": {
3701 "unicast": {
3702 "neighbor": {
3703 "r3": {
3704 "dest_link":{
3705 "r1": {
3706 "graceful-restart": True
3707 }
3708 }
3709 }
3710 }
3711 }
3712 },
3713 "ipv6": {
3714 "unicast": {
3715 "neighbor": {
3716 "r3": {
3717 "dest_link":{
3718 "r1": {
3719 "graceful-restart": True
3720 }
3721 }
3722 }
3723 }
3724 }
3725 }
3726 }
3727 }
3728 }
3729 }
3730
3731 result = verify_f_bit(tgen, topo, 'ipv4', input_dict, dut, peer)
3732
3733 Returns
3734 -------
3735 errormsg(str) or True
3736 """
3737
3738 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3739
3740 for router, rnode in tgen.routers().items():
3741 if router != dut:
3742 continue
3743
3744 bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
3745
3746 if addr_type in bgp_addr_type:
3747 if not check_address_types(addr_type):
3748 continue
3749
3750 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
3751
3752 for bgp_neighbor, peer_data in bgp_neighbors.items():
3753 if bgp_neighbor != peer:
3754 continue
3755
3756 for dest_link, peer_dict in peer_data["dest_link"].items():
3757 data = topo["routers"][bgp_neighbor]["links"]
3758
3759 if dest_link in data:
3760 neighbor_ip = data[dest_link][addr_type].split("/")[0]
3761
3762 logger.info(
3763 "[DUT: {}]: Checking bgp graceful-restart show"
3764 " o/p {}".format(dut, neighbor_ip)
3765 )
3766
3767 show_bgp_graceful_json = run_frr_cmd(
3768 rnode,
3769 "show bgp {} neighbor {} graceful-restart json".format(
3770 addr_type, neighbor_ip
3771 ),
3772 isjson=True,
3773 )
3774
3775 show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]
3776
3777 if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
3778 logger.info(
3779 "[DUT: {}]: Neighbor ip matched {}".format(dut, neighbor_ip)
3780 )
3781 else:
3782 errormsg = "[DUT: {}]: Neighbor ip NOT a match {}".format(
3783 dut, neighbor_ip
3784 )
3785 return errormsg
3786
3787 if "ipv4Unicast" in show_bgp_graceful_json_out:
3788 if show_bgp_graceful_json_out["ipv4Unicast"]["fBit"]:
3789 logger.info(
3790 "[DUT: {}]: Fbit True for {} IPv4"
3791 " Unicast".format(dut, neighbor_ip)
3792 )
3793 else:
3794 errormsg = "[DUT: {}]: Fbit False for {} IPv4" " Unicast".format(
3795 dut, neighbor_ip
3796 )
3797 return errormsg
3798
3799 elif "ipv6Unicast" in show_bgp_graceful_json_out:
3800 if show_bgp_graceful_json_out["ipv6Unicast"]["fBit"]:
3801 logger.info(
3802 "[DUT: {}]: Fbit True for {} IPv6"
3803 " Unicast".format(dut, neighbor_ip)
3804 )
3805 else:
3806 errormsg = "[DUT: {}]: Fbit False for {} IPv6" " Unicast".format(
3807 dut, neighbor_ip
3808 )
3809 return errormsg
3810 else:
3811 show_bgp_graceful_json_out["ipv4Unicast"]
3812 show_bgp_graceful_json_out["ipv6Unicast"]
3813
3814 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3815 return True
3816
3817
3818 @retry(retry_timeout=10)
3819 def verify_graceful_restart_timers(tgen, topo, addr_type, input_dict, dut, peer):
3820 """
3821 This API is to verify graceful restart timers, configured and received
3822
3823 Parameters
3824 ----------
3825 * `tgen`: topogen object
3826 * `topo`: input json file data
3827 * `addr_type` : ip type ipv4/ipv6
3828 * `input_dict`: input dictionary, have details of Device Under Test,
3829 for which user wants to test the data
3830 * `dut`: input dut router name
3831 * `peer`: peer name
3832 * `expected` : expected results from API, by-default True
3833
3834 Usage
3835 -----
3836 # Configure graceful-restart
3837 input_dict_1 = {
3838 "r1": {
3839 "bgp": {
3840 "bgp_neighbors": {
3841 "r3": {
3842 "graceful-restart": "graceful-restart-helper"
3843 }
3844 },
3845 "gracefulrestart": ["restart-time 150"]
3846 }
3847 },
3848 "r3": {
3849 "bgp": {
3850 "bgp_neighbors": {
3851 "r1": {
3852 "graceful-restart": "graceful-restart"
3853 }
3854 }
3855 }
3856 }
3857 }
3858
3859 result = verify_graceful_restart_timers(tgen, topo, 'ipv4', input_dict)
3860
3861 Returns
3862 -------
3863 errormsg(str) or True
3864 """
3865
3866 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3867
3868 for router, rnode in tgen.routers().items():
3869 if router != dut:
3870 continue
3871
3872 bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]
3873
3874 if addr_type in bgp_addr_type:
3875 if not check_address_types(addr_type):
3876 continue
3877
3878 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
3879
3880 for bgp_neighbor, peer_data in bgp_neighbors.items():
3881 if bgp_neighbor != peer:
3882 continue
3883
3884 for dest_link, peer_dict in peer_data["dest_link"].items():
3885 data = topo["routers"][bgp_neighbor]["links"]
3886
3887 if dest_link in data:
3888 neighbor_ip = data[dest_link][addr_type].split("/")[0]
3889
3890 logger.info(
3891 "[DUT: {}]: Checking bgp graceful-restart show"
3892 " o/p {}".format(dut, neighbor_ip)
3893 )
3894
3895 show_bgp_graceful_json = run_frr_cmd(
3896 rnode,
3897 "show bgp {} neighbor {} graceful-restart json".format(
3898 addr_type, neighbor_ip
3899 ),
3900 isjson=True,
3901 )
3902
3903 show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]
3904 if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
3905 logger.info(
3906 "[DUT: {}]: Neighbor ip matched {}".format(dut, neighbor_ip)
3907 )
3908 else:
3909 errormsg = "[DUT: {}]: Neighbor ip is NOT matched {}".format(
3910 dut, neighbor_ip
3911 )
3912 return errormsg
3913
3914 # Graceful-restart timer
3915 if "graceful-restart" in input_dict[peer]["bgp"]:
3916 if "timer" in input_dict[peer]["bgp"]["graceful-restart"]:
3917 for rs_timer, value in input_dict[peer]["bgp"]["graceful-restart"][
3918 "timer"
3919 ].items():
3920 if rs_timer == "restart-time":
3921
3922 receivedTimer = value
3923 if (
3924 show_bgp_graceful_json_out["timers"][
3925 "receivedRestartTimer"
3926 ]
3927 == receivedTimer
3928 ):
3929 logger.info(
3930 "receivedRestartTimer is {}"
3931 " on {} from peer {}".format(
3932 receivedTimer, router, peer
3933 )
3934 )
3935 else:
3936 errormsg = (
3937 "receivedRestartTimer is not"
3938 " as expected {}".format(receivedTimer)
3939 )
3940 return errormsg
3941
3942 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3943 return True
3944
3945
3946 @retry(retry_timeout=8)
3947 def verify_gr_address_family(
3948 tgen, topo, addr_type, addr_family, dut, peer, expected=True
3949 ):
3950 """
3951 This API is to verify gr_address_family in the BGP gr capability advertised
3952 by the neighbor router
3953
3954 Parameters
3955 ----------
3956 * `tgen`: topogen object
3957 * `topo`: input json file data
3958 * `addr_type` : ip type ipv4/ipv6
3959 * `addr_type` : ip type IPV4 Unicast/IPV6 Unicast
3960 * `dut`: input dut router name
3961 * `peer`: input peer router to check
3962 * `expected` : expected results from API, by-default True
3963
3964 Usage
3965 -----
3966
3967 result = verify_gr_address_family(tgen, topo, "ipv4", "ipv4Unicast", "r1", "r3")
3968
3969 Returns
3970 -------
3971 errormsg(str) or None
3972 """
3973
3974 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3975
3976 if not check_address_types(addr_type):
3977 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3978 return
3979
3980 routers = tgen.routers()
3981 if dut not in routers:
3982 return "{} not in routers".format(dut)
3983
3984 rnode = routers[dut]
3985 bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]
3986
3987 if addr_type not in bgp_addr_type:
3988 return "{} not in bgp_addr_types".format(addr_type)
3989
3990 if peer not in bgp_addr_type[addr_type]["unicast"]["neighbor"]:
3991 return "{} not a peer of {} over {}".format(peer, dut, addr_type)
3992
3993 nbr_links = topo["routers"][peer]["links"]
3994 if dut not in nbr_links or addr_type not in nbr_links[dut]:
3995 return "peer {} missing back link to {} over {}".format(peer, dut, addr_type)
3996
3997 neighbor_ip = nbr_links[dut][addr_type].split("/")[0]
3998
3999 logger.info(
4000 "[DUT: {}]: Checking bgp graceful-restart show o/p {} for {}".format(
4001 dut, neighbor_ip, addr_family
4002 )
4003 )
4004
4005 show_bgp_graceful_json = run_frr_cmd(
4006 rnode,
4007 "show bgp {} neighbor {} graceful-restart json".format(addr_type, neighbor_ip),
4008 isjson=True,
4009 )
4010
4011 show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]
4012
4013 if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
4014 logger.info("Neighbor ip matched {}".format(neighbor_ip))
4015 else:
4016 errormsg = "Neighbor ip NOT a match {}".format(neighbor_ip)
4017 return errormsg
4018
4019 if addr_family == "ipv4Unicast":
4020 if "ipv4Unicast" in show_bgp_graceful_json_out:
4021 logger.info("ipv4Unicast present for {} ".format(neighbor_ip))
4022 return True
4023 else:
4024 errormsg = "ipv4Unicast NOT present for {} ".format(neighbor_ip)
4025 return errormsg
4026
4027 elif addr_family == "ipv6Unicast":
4028 if "ipv6Unicast" in show_bgp_graceful_json_out:
4029 logger.info("ipv6Unicast present for {} ".format(neighbor_ip))
4030 return True
4031 else:
4032 errormsg = "ipv6Unicast NOT present for {} ".format(neighbor_ip)
4033 return errormsg
4034 else:
4035 errormsg = "Aaddress family: {} present for {} ".format(
4036 addr_family, neighbor_ip
4037 )
4038 return errormsg
4039
4040 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4041
4042
4043 @retry(retry_timeout=12)
4044 def verify_attributes_for_evpn_routes(
4045 tgen,
4046 topo,
4047 dut,
4048 input_dict,
4049 rd=None,
4050 rt=None,
4051 ethTag=None,
4052 ipLen=None,
4053 rd_peer=None,
4054 rt_peer=None,
4055 expected=True,
4056 ):
4057 """
4058 API to verify rd and rt value using "sh bgp l2vpn evpn 10.1.1.1"
4059 command.
4060 Parameters
4061 ----------
4062 * `tgen`: topogen object
4063 * `topo` : json file data
4064 * `dut` : device under test
4065 * `input_dict`: having details like - for which route, rd value
4066 needs to be verified
4067 * `rd` : route distinguisher
4068 * `rt` : route target
4069 * `ethTag` : Ethernet Tag
4070 * `ipLen` : IP prefix length
4071 * `rd_peer` : Peer name from which RD will be auto-generated
4072 * `rt_peer` : Peer name from which RT will be auto-generated
4073 * `expected` : expected results from API, by-default True
4074
4075 Usage
4076 -----
4077 input_dict_1 = {
4078 "r1": {
4079 "static_routes": [{
4080 "network": [NETWORK1_1[addr_type]],
4081 "next_hop": NEXT_HOP_IP[addr_type],
4082 "vrf": "RED"
4083 }]
4084 }
4085 }
4086
4087 result = verify_attributes_for_evpn_routes(tgen, topo,
4088 input_dict, rd = "10.0.0.33:1")
4089
4090 Returns
4091 -------
4092 errormsg(str) or True
4093 """
4094
4095 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4096 for router in input_dict.keys():
4097 rnode = tgen.routers()[dut]
4098
4099 if "static_routes" in input_dict[router]:
4100 for static_route in input_dict[router]["static_routes"]:
4101 network = static_route["network"]
4102
4103 if "vrf" in static_route:
4104 vrf = static_route["vrf"]
4105
4106 if type(network) is not list:
4107 network = [network]
4108
4109 for route in network:
4110 route = route.split("/")[0]
4111 _addr_type = validate_ip_address(route)
4112 if "v4" in _addr_type:
4113 input_afi = "v4"
4114 elif "v6" in _addr_type:
4115 input_afi = "v6"
4116
4117 cmd = "show bgp l2vpn evpn {} json".format(route)
4118 evpn_rd_value_json = run_frr_cmd(rnode, cmd, isjson=True)
4119 if not bool(evpn_rd_value_json):
4120 errormsg = "No output for '{}' cli".format(cmd)
4121 return errormsg
4122
4123 if rd is not None and rd != "auto":
4124 logger.info(
4125 "[DUT: %s]: Verifying rd value for " "evpn route %s:",
4126 dut,
4127 route,
4128 )
4129
4130 if rd in evpn_rd_value_json:
4131 rd_value_json = evpn_rd_value_json[rd]
4132 if rd_value_json["rd"] != rd:
4133 errormsg = (
4134 "[DUT: %s] Failed: Verifying"
4135 " RD value for EVPN route: %s"
4136 "[FAILED]!!, EXPECTED : %s "
4137 " FOUND : %s"
4138 % (dut, route, rd, rd_value_json["rd"])
4139 )
4140 return errormsg
4141
4142 else:
4143 logger.info(
4144 "[DUT %s]: Verifying RD value for"
4145 " EVPN route: %s [PASSED]|| "
4146 "Found Expected: %s",
4147 dut,
4148 route,
4149 rd,
4150 )
4151 return True
4152
4153 else:
4154 errormsg = (
4155 "[DUT: %s] RD : %s is not present"
4156 " in cli json output" % (dut, rd)
4157 )
4158 return errormsg
4159
4160 if rd == "auto":
4161 logger.info(
4162 "[DUT: %s]: Verifying auto-rd value for " "evpn route %s:",
4163 dut,
4164 route,
4165 )
4166
4167 if rd_peer:
4168 index = 1
4169 vni_dict = {}
4170
4171 rnode = tgen.routers()[rd_peer]
4172 vrfs = topo["routers"][rd_peer]["vrfs"]
4173 for vrf_dict in vrfs:
4174 vni_dict[vrf_dict["name"]] = index
4175 index += 1
4176
4177 show_bgp_json = run_frr_cmd(
4178 rnode, "show bgp vrf all summary json", isjson=True
4179 )
4180
4181 # Verifying output dictionary show_bgp_json is empty
4182 if not bool(show_bgp_json):
4183 errormsg = "BGP is not running"
4184 return errormsg
4185
4186 show_bgp_json_vrf = show_bgp_json[vrf]
4187 for afi, afi_data in show_bgp_json_vrf.items():
4188 if input_afi not in afi:
4189 continue
4190 router_id = afi_data["routerId"]
4191
4192 found = False
4193 rd = "{}:{}".format(router_id, vni_dict[vrf])
4194 for _rd, rd_value_json in evpn_rd_value_json.items():
4195 if (
4196 str(rd_value_json["rd"].split(":")[0])
4197 != rd.split(":")[0]
4198 ):
4199 continue
4200
4201 if int(rd_value_json["rd"].split(":")[1]) > 0:
4202 found = True
4203
4204 if found:
4205 logger.info(
4206 "[DUT %s]: Verifying RD value for"
4207 " EVPN route: %s "
4208 "Found Expected: %s",
4209 dut,
4210 route,
4211 rd_value_json["rd"],
4212 )
4213 return True
4214 else:
4215 errormsg = (
4216 "[DUT: %s] Failed: Verifying"
4217 " RD value for EVPN route: %s"
4218 " FOUND : %s" % (dut, route, rd_value_json["rd"])
4219 )
4220 return errormsg
4221
4222 if rt == "auto":
4223 vni_dict = {}
4224 logger.info(
4225 "[DUT: %s]: Verifying auto-rt value for " "evpn route %s:",
4226 dut,
4227 route,
4228 )
4229
4230 if rt_peer:
4231 rnode = tgen.routers()[rt_peer]
4232 show_bgp_json = run_frr_cmd(
4233 rnode, "show bgp vrf all summary json", isjson=True
4234 )
4235
4236 # Verifying output dictionary show_bgp_json is empty
4237 if not bool(show_bgp_json):
4238 errormsg = "BGP is not running"
4239 return errormsg
4240
4241 show_bgp_json_vrf = show_bgp_json[vrf]
4242 for afi, afi_data in show_bgp_json_vrf.items():
4243 if input_afi not in afi:
4244 continue
4245 as_num = afi_data["as"]
4246
4247 show_vrf_vni_json = run_frr_cmd(
4248 rnode, "show vrf vni json", isjson=True
4249 )
4250
4251 vrfs = show_vrf_vni_json["vrfs"]
4252 for vrf_dict in vrfs:
4253 if vrf_dict["vrf"] == vrf:
4254 vni_dict[vrf_dict["vrf"]] = str(vrf_dict["vni"])
4255
4256 # If AS is 4 byte, FRR uses only the lower 2 bytes of ASN+VNI
4257 # for auto derived RT value.
4258 if as_num > 65535:
4259 as_bin = bin(as_num)
4260 as_bin = as_bin[-16:]
4261 as_num = int(as_bin, 2)
4262
4263 rt = "{}:{}".format(str(as_num), vni_dict[vrf])
4264 for _rd, route_data in evpn_rd_value_json.items():
4265 if route_data["ip"] == route:
4266 for rt_data in route_data["paths"]:
4267 if vni_dict[vrf] == rt_data["vni"]:
4268 rt_string = rt_data["extendedCommunity"][
4269 "string"
4270 ]
4271 rt_input = "RT:{}".format(rt)
4272 if rt_input not in rt_string:
4273 errormsg = (
4274 "[DUT: %s] Failed:"
4275 " Verifying RT "
4276 "value for EVPN "
4277 " route: %s"
4278 "[FAILED]!!,"
4279 " EXPECTED : %s "
4280 " FOUND : %s"
4281 % (dut, route, rt_input, rt_string)
4282 )
4283 return errormsg
4284
4285 else:
4286 logger.info(
4287 "[DUT %s]: Verifying "
4288 "RT value for EVPN "
4289 "route: %s [PASSED]||"
4290 "Found Expected: %s",
4291 dut,
4292 route,
4293 rt_input,
4294 )
4295 return True
4296
4297 else:
4298 errormsg = (
4299 "[DUT: %s] Route : %s is not"
4300 " present in cli json output" % (dut, route)
4301 )
4302 return errormsg
4303
4304 if rt is not None and rt != "auto":
4305 logger.info(
4306 "[DUT: %s]: Verifying rt value for " "evpn route %s:",
4307 dut,
4308 route,
4309 )
4310
4311 if type(rt) is not list:
4312 rt = [rt]
4313
4314 for _rt in rt:
4315 for _rd, route_data in evpn_rd_value_json.items():
4316 if route_data["ip"] == route:
4317 for rt_data in route_data["paths"]:
4318 rt_string = rt_data["extendedCommunity"][
4319 "string"
4320 ]
4321 rt_input = "RT:{}".format(_rt)
4322 if rt_input not in rt_string:
4323 errormsg = (
4324 "[DUT: %s] Failed: "
4325 "Verifying RT value "
4326 "for EVPN route: %s"
4327 "[FAILED]!!,"
4328 " EXPECTED : %s "
4329 " FOUND : %s"
4330 % (dut, route, rt_input, rt_string)
4331 )
4332 return errormsg
4333
4334 else:
4335 logger.info(
4336 "[DUT %s]: Verifying RT"
4337 " value for EVPN route:"
4338 " %s [PASSED]|| "
4339 "Found Expected: %s",
4340 dut,
4341 route,
4342 rt_input,
4343 )
4344 return True
4345
4346 else:
4347 errormsg = (
4348 "[DUT: %s] Route : %s is not"
4349 " present in cli json output" % (dut, route)
4350 )
4351 return errormsg
4352
4353 if ethTag is not None:
4354 logger.info(
4355 "[DUT: %s]: Verifying ethTag value for " "evpn route :", dut
4356 )
4357
4358 for _rd, route_data in evpn_rd_value_json.items():
4359 if route_data["ip"] == route:
4360 if route_data["ethTag"] != ethTag:
4361 errormsg = (
4362 "[DUT: %s] RD: %s, Failed: "
4363 "Verifying ethTag value "
4364 "for EVPN route: %s"
4365 "[FAILED]!!,"
4366 " EXPECTED : %s "
4367 " FOUND : %s"
4368 % (
4369 dut,
4370 _rd,
4371 route,
4372 ethTag,
4373 route_data["ethTag"],
4374 )
4375 )
4376 return errormsg
4377
4378 else:
4379 logger.info(
4380 "[DUT %s]: RD: %s, Verifying "
4381 "ethTag value for EVPN route:"
4382 " %s [PASSED]|| "
4383 "Found Expected: %s",
4384 dut,
4385 _rd,
4386 route,
4387 ethTag,
4388 )
4389 return True
4390
4391 else:
4392 errormsg = (
4393 "[DUT: %s] RD: %s, Route : %s "
4394 "is not present in cli json "
4395 "output" % (dut, _rd, route)
4396 )
4397 return errormsg
4398
4399 if ipLen is not None:
4400 logger.info(
4401 "[DUT: %s]: Verifying ipLen value for " "evpn route :", dut
4402 )
4403
4404 for _rd, route_data in evpn_rd_value_json.items():
4405 if route_data["ip"] == route:
4406 if route_data["ipLen"] != int(ipLen):
4407 errormsg = (
4408 "[DUT: %s] RD: %s, Failed: "
4409 "Verifying ipLen value "
4410 "for EVPN route: %s"
4411 "[FAILED]!!,"
4412 " EXPECTED : %s "
4413 " FOUND : %s"
4414 % (dut, _rd, route, ipLen, route_data["ipLen"])
4415 )
4416 return errormsg
4417
4418 else:
4419 logger.info(
4420 "[DUT %s]: RD: %s, Verifying "
4421 "ipLen value for EVPN route:"
4422 " %s [PASSED]|| "
4423 "Found Expected: %s",
4424 dut,
4425 _rd,
4426 route,
4427 ipLen,
4428 )
4429 return True
4430
4431 else:
4432 errormsg = (
4433 "[DUT: %s] RD: %s, Route : %s "
4434 "is not present in cli json "
4435 "output " % (dut, _rd, route)
4436 )
4437 return errormsg
4438
4439 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4440 return False
4441
4442
4443 @retry(retry_timeout=10)
4444 def verify_evpn_routes(
4445 tgen, topo, dut, input_dict, routeType=5, EthTag=0, next_hop=None, expected=True
4446 ):
4447 """
4448 API to verify evpn routes using "sh bgp l2vpn evpn"
4449 command.
4450 Parameters
4451 ----------
4452 * `tgen`: topogen object
4453 * `topo` : json file data
4454 * `dut` : device under test
4455 * `input_dict`: having details like - for which route, rd value
4456 needs to be verified
4457 * `route_type` : Route type 5 is supported as of now
4458 * `EthTag` : Ethernet tag, by-default is 0
4459 * `next_hop` : Prefered nexthop for the evpn routes
4460 * `expected` : expected results from API, by-default True
4461
4462 Usage
4463 -----
4464 input_dict_1 = {
4465 "r1": {
4466 "static_routes": [{
4467 "network": [NETWORK1_1[addr_type]],
4468 "next_hop": NEXT_HOP_IP[addr_type],
4469 "vrf": "RED"
4470 }]
4471 }
4472 }
4473 result = verify_evpn_routes(tgen, topo, input_dict)
4474 Returns
4475 -------
4476 errormsg(str) or True
4477 """
4478
4479 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4480
4481 for router in input_dict.keys():
4482 rnode = tgen.routers()[dut]
4483
4484 logger.info("[DUT: %s]: Verifying evpn routes: ", dut)
4485
4486 if "static_routes" in input_dict[router]:
4487 for static_route in input_dict[router]["static_routes"]:
4488 network = static_route["network"]
4489
4490 if type(network) is not list:
4491 network = [network]
4492
4493 missing_routes = {}
4494 for route in network:
4495 rd_keys = 0
4496 ip_len = route.split("/")[1]
4497 route = route.split("/")[0]
4498
4499 prefix = "[{}]:[{}]:[{}]:[{}]".format(
4500 routeType, EthTag, ip_len, route
4501 )
4502
4503 cmd = "show bgp l2vpn evpn route json"
4504 evpn_value_json = run_frr_cmd(rnode, cmd, isjson=True)
4505
4506 if not bool(evpn_value_json):
4507 errormsg = "No output for '{}' cli".format(cmd)
4508 return errormsg
4509
4510 if evpn_value_json["numPrefix"] == 0:
4511 errormsg = "[DUT: %s]: No EVPN prefixes exist" % (dut)
4512 return errormsg
4513
4514 for key, route_data_json in evpn_value_json.items():
4515 if type(route_data_json) is dict:
4516 rd_keys += 1
4517 if prefix not in route_data_json:
4518 missing_routes[key] = prefix
4519
4520 if rd_keys == len(missing_routes.keys()):
4521 errormsg = (
4522 "[DUT: %s]: "
4523 "Missing EVPN routes: "
4524 "%s [FAILED]!!" % (dut, list(set(missing_routes.values())))
4525 )
4526 return errormsg
4527
4528 for key, route_data_json in evpn_value_json.items():
4529 if type(route_data_json) is dict:
4530 if prefix not in route_data_json:
4531 continue
4532
4533 for paths in route_data_json[prefix]["paths"]:
4534 for path in paths:
4535 if path["routeType"] != routeType:
4536 errormsg = (
4537 "[DUT: %s]: "
4538 "Verifying routeType "
4539 "for EVPN route: %s "
4540 "[FAILED]!! "
4541 "Expected: %s, "
4542 "Found: %s"
4543 % (
4544 dut,
4545 prefix,
4546 routeType,
4547 path["routeType"],
4548 )
4549 )
4550 return errormsg
4551
4552 elif next_hop:
4553 for nh_dict in path["nexthops"]:
4554 if nh_dict["ip"] != next_hop:
4555 errormsg = (
4556 "[DUT: %s]: "
4557 "Verifying "
4558 "nexthop for "
4559 "EVPN route: %s"
4560 "[FAILED]!! "
4561 "Expected: %s,"
4562 " Found: %s"
4563 % (
4564 dut,
4565 prefix,
4566 next_hop,
4567 nh_dict["ip"],
4568 )
4569 )
4570 return errormsg
4571
4572 else:
4573 logger.info(
4574 "[DUT %s]: Verifying "
4575 "EVPN route : %s, "
4576 "routeType: %s is "
4577 "installed "
4578 "[PASSED]|| ",
4579 dut,
4580 prefix,
4581 routeType,
4582 )
4583 return True
4584
4585 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4586
4587 return False
4588
4589
4590 @retry(retry_timeout=10)
4591 def verify_bgp_bestpath(tgen, addr_type, input_dict):
4592 """
4593 Verifies bgp next hop values in best-path output
4594
4595 * `dut` : device under test
4596 * `addr_type` : Address type ipv4/ipv6
4597 * `input_dict`: having details like multipath and bestpath
4598
4599 Usage
4600 -----
4601 input_dict_1 = {
4602 "r1": {
4603 "ipv4" : {
4604 "bestpath": "50.0.0.1",
4605 "multipath": ["50.0.0.1", "50.0.0.2"],
4606 "network": "100.0.0.0/24"
4607 }
4608 "ipv6" : {
4609 "bestpath": "1000::1",
4610 "multipath": ["1000::1", "1000::2"]
4611 "network": "2000::1/128"
4612 }
4613 }
4614 }
4615
4616 result = verify_bgp_bestpath(tgen, input_dict)
4617
4618 """
4619
4620 result = False
4621 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4622 for dut in input_dict.keys():
4623 rnode = tgen.routers()[dut]
4624
4625 logger.info("[DUT: %s]: Verifying bgp bestpath and multipath " "routes:", dut)
4626 result = False
4627 for network_dict in input_dict[dut][addr_type]:
4628 nw_addr = network_dict.setdefault("network", None)
4629 vrf = network_dict.setdefault("vrf", None)
4630 bestpath = network_dict.setdefault("bestpath", None)
4631
4632 if vrf:
4633 cmd = "show bgp vrf {} {} {} bestpath json".format(
4634 vrf, addr_type, nw_addr
4635 )
4636 else:
4637 cmd = "show bgp {} {} bestpath json".format(addr_type, nw_addr)
4638
4639 data = run_frr_cmd(rnode, cmd, isjson=True)
4640 route = data["paths"][0]
4641
4642 if "bestpath" in route:
4643 if route["bestpath"]["overall"] is True:
4644 _bestpath = route["nexthops"][0]["ip"]
4645
4646 if _bestpath != bestpath:
4647 return (
4648 "DUT:[{}] Bestpath do not match for"
4649 " network: {}, Expected "
4650 " {} as bgp bestpath found {}".format(
4651 dut, nw_addr, bestpath, _bestpath
4652 )
4653 )
4654
4655 logger.info(
4656 "DUT:[{}] Found expected bestpath: "
4657 " {} for network: {}".format(dut, _bestpath, nw_addr)
4658 )
4659 result = True
4660
4661 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4662 return result
4663
4664
4665 def verify_tcp_mss(tgen, dut, neighbour, configured_tcp_mss, vrf=None):
4666 """
4667 This api is used to verify the tcp-mss value assigned to a neigbour of DUT
4668
4669 Parameters
4670 ----------
4671 * `tgen` : topogen object
4672 * `dut`: device under test
4673 * `neighbour`:neigbout IP address
4674 * `configured_tcp_mss`:The TCP-MSS value to be verified
4675 * `vrf`:vrf
4676
4677 Usage
4678 -----
4679 result = verify_tcp_mss(tgen, dut,neighbour,configured_tcp_mss)
4680 Returns
4681 -------
4682 errormsg(str) or True
4683 """
4684
4685 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4686 rnode = tgen.routers()[dut]
4687 if vrf:
4688 cmd = "show bgp vrf {} neighbors {} json".format(vrf, neighbour)
4689 else:
4690 cmd = "show bgp neighbors {} json".format(neighbour)
4691
4692 # Execute the command
4693 show_vrf_stats = run_frr_cmd(rnode, cmd, isjson=True)
4694
4695 # Verify TCP-MSS on router
4696 logger.info("Verify that no core is observed")
4697 if tgen.routers_have_failure():
4698 errormsg = "Core observed while running CLI: %s" % (cmd)
4699 return errormsg
4700 else:
4701 if configured_tcp_mss == show_vrf_stats.get(neighbour).get(
4702 "bgpTcpMssConfigured"
4703 ):
4704 logger.debug(
4705 "Configured TCP - MSS Found: {}".format(sys._getframe().f_code.co_name)
4706 )
4707 return True
4708 else:
4709 logger.debug(
4710 "TCP-MSS Mismatch ,configured {} expecting {}".format(
4711 show_vrf_stats.get(neighbour).get("bgpTcpMssConfigured"),
4712 configured_tcp_mss,
4713 )
4714 )
4715 return "TCP-MSS Mismatch"
4716 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4717 return False
4718
4719
4720 def get_dut_as_number(tgen, dut):
4721 """
4722 API to get the Autonomous Number of the given DUT
4723
4724 params:
4725 =======
4726 dut : Device Under test
4727
4728 returns :
4729 =======
4730 Success : DUT Autonomous number
4731 Fail : Error message with Boolean False
4732 """
4733 tgen = get_topogen()
4734 for router, rnode in tgen.routers().items():
4735 if router == dut:
4736 show_bgp_json = run_frr_cmd(rnode, "sh ip bgp summary json ", isjson=True)
4737 as_number = show_bgp_json["ipv4Unicast"]["as"]
4738 if as_number:
4739 logger.info(
4740 "[dut {}] DUT contains Automnomous number :: {} ".format(
4741 dut, as_number
4742 )
4743 )
4744 return as_number
4745 else:
4746 logger.error(
4747 "[dut {}] ERROR....! DUT doesnot contain any Automnomous number ".format(
4748 dut
4749 )
4750 )
4751 return False
4752
4753
4754 def get_prefix_count_route(
4755 tgen, topo, dut, peer, vrf=None, link=None, sent=None, received=None
4756 ):
4757 """
4758 API to return the prefix count of default originate the given DUT
4759 dut : Device under test
4760 peer : neigbor on which you are expecting the route to be received
4761
4762 returns :
4763 prefix_count as dict with ipv4 and ipv6 value
4764 """
4765 # the neighbor IP address can be accessable by finding the neigborship (vice-versa)
4766
4767 if link:
4768 neighbor_ipv4_address = topo["routers"][peer]["links"][link]["ipv4"]
4769 neighbor_ipv6_address = topo["routers"][peer]["links"][link]["ipv6"]
4770 else:
4771 neighbor_ipv4_address = topo["routers"][peer]["links"][dut]["ipv4"]
4772 neighbor_ipv6_address = topo["routers"][peer]["links"][dut]["ipv6"]
4773
4774 neighbor_ipv4_address = neighbor_ipv4_address.split("/")[0]
4775 neighbor_ipv6_address = neighbor_ipv6_address.split("/")[0]
4776 prefix_count = {}
4777 tgen = get_topogen()
4778 for router, rnode in tgen.routers().items():
4779 if router == dut:
4780
4781 if vrf:
4782 ipv4_cmd = "sh ip bgp vrf {} summary json".format(vrf)
4783 show_bgp_json_ipv4 = run_frr_cmd(rnode, ipv4_cmd, isjson=True)
4784 ipv6_cmd = "sh ip bgp vrf {} ipv6 unicast summary json".format(vrf)
4785 show_bgp_json_ipv6 = run_frr_cmd(rnode, ipv6_cmd, isjson=True)
4786
4787 prefix_count["ipv4_count"] = show_bgp_json_ipv4["ipv4Unicast"]["peers"][
4788 neighbor_ipv4_address
4789 ]["pfxRcd"]
4790 prefix_count["ipv6_count"] = show_bgp_json_ipv6["peers"][
4791 neighbor_ipv6_address
4792 ]["pfxRcd"]
4793
4794 logger.info(
4795 "The Prefix Count of the [DUT:{} : vrf [{}] ] towards neighbor ipv4 : {} and ipv6 : {} is : {}".format(
4796 dut,
4797 vrf,
4798 neighbor_ipv4_address,
4799 neighbor_ipv6_address,
4800 prefix_count,
4801 )
4802 )
4803 return prefix_count
4804
4805 else:
4806 show_bgp_json_ipv4 = run_frr_cmd(
4807 rnode, "sh ip bgp summary json ", isjson=True
4808 )
4809 show_bgp_json_ipv6 = run_frr_cmd(
4810 rnode, "sh ip bgp ipv6 unicast summary json ", isjson=True
4811 )
4812 if received:
4813 prefix_count["ipv4_count"] = show_bgp_json_ipv4["ipv4Unicast"][
4814 "peers"
4815 ][neighbor_ipv4_address]["pfxRcd"]
4816 prefix_count["ipv6_count"] = show_bgp_json_ipv6["peers"][
4817 neighbor_ipv6_address
4818 ]["pfxRcd"]
4819
4820 elif sent:
4821 prefix_count["ipv4_count"] = show_bgp_json_ipv4["ipv4Unicast"][
4822 "peers"
4823 ][neighbor_ipv4_address]["pfxSnt"]
4824 prefix_count["ipv6_count"] = show_bgp_json_ipv6["peers"][
4825 neighbor_ipv6_address
4826 ]["pfxSnt"]
4827
4828 else:
4829 prefix_count["ipv4_count"] = show_bgp_json_ipv4["ipv4Unicast"][
4830 "peers"
4831 ][neighbor_ipv4_address]["pfxRcd"]
4832 prefix_count["ipv6_count"] = show_bgp_json_ipv6["peers"][
4833 neighbor_ipv6_address
4834 ]["pfxRcd"]
4835
4836 logger.info(
4837 "The Prefix Count of the DUT:{} towards neighbor ipv4 : {} and ipv6 : {} is : {}".format(
4838 dut, neighbor_ipv4_address, neighbor_ipv6_address, prefix_count
4839 )
4840 )
4841 return prefix_count
4842 else:
4843 logger.error("ERROR...! Unknown dut {} in topolgy".format(dut))
4844
4845
4846 @retry(retry_timeout=5)
4847 def verify_rib_default_route(
4848 tgen,
4849 topo,
4850 dut,
4851 routes,
4852 expected_nexthop,
4853 metric=None,
4854 origin=None,
4855 locPrf=None,
4856 expected_aspath=None,
4857 ):
4858 """
4859 API to verify the the 'Default route" in BGP RIB with the attributes the rout carries (metric , local preference, )
4860
4861 param
4862 =====
4863 dut : device under test
4864 routes : default route with expected nexthop
4865 expected_nexthop : the nexthop that is expected the deafult route
4866
4867 """
4868 result = False
4869 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4870 tgen = get_topogen()
4871 connected_routes = {}
4872 for router, rnode in tgen.routers().items():
4873 if router == dut:
4874
4875 ipv4_routes = run_frr_cmd(rnode, "sh ip bgp json", isjson=True)
4876 ipv6_routes = run_frr_cmd(rnode, "sh ip bgp ipv6 unicast json", isjson=True)
4877 is_ipv4_default_attrib_found = False
4878 is_ipv6_default_attrib_found = False
4879
4880 default_ipv4_route = routes["ipv4"]
4881 default_ipv6_route = "::/0"
4882 ipv4_route_Origin = False
4883 ipv4_route_local_pref = False
4884 ipv4_route_metric = False
4885
4886 if default_ipv4_route in ipv4_routes["routes"].keys():
4887 nxt_hop_count = len(ipv4_routes["routes"][default_ipv4_route])
4888 rib_next_hops = []
4889 for index in range(nxt_hop_count):
4890 rib_next_hops.append(
4891 ipv4_routes["routes"][default_ipv4_route][index]["nexthops"][0]["ip"]
4892 )
4893
4894 for nxt_hop in expected_nexthop.items():
4895 if nxt_hop[0] == "ipv4":
4896 if nxt_hop[1] in rib_next_hops:
4897 logger.info(
4898 "Default routes [{}] obtained from {} .....PASSED".format(
4899 default_ipv4_route, nxt_hop[1]
4900 )
4901 )
4902 else:
4903 logger.error(
4904 "ERROR ...! Default routes [{}] expected is missing {}".format(
4905 default_ipv4_route, nxt_hop[1]
4906 )
4907 )
4908 return False
4909
4910 else:
4911 pass
4912
4913 if "origin" in ipv4_routes["routes"][default_ipv4_route][0].keys():
4914 ipv4_route_Origin = ipv4_routes["routes"][default_ipv4_route][0]["origin"]
4915 if "locPrf" in ipv4_routes["routes"][default_ipv4_route][0].keys():
4916 ipv4_route_local_pref = ipv4_routes["routes"][default_ipv4_route][0][
4917 "locPrf"
4918 ]
4919 if "metric" in ipv4_routes["routes"][default_ipv4_route][0].keys():
4920 ipv4_route_metric = ipv4_routes["routes"][default_ipv4_route][0]["metric"]
4921 else:
4922 logger.error("ERROR [ DUT {}] : The Default Route Not found in RIB".format(dut))
4923 return False
4924
4925 origin_found = False
4926 locPrf_found = False
4927 metric_found = False
4928 as_path_found = False
4929
4930 if origin:
4931 if origin == ipv4_route_Origin:
4932 logger.info(
4933 "Dafault Route {} expected origin {} Found in RIB....PASSED".format(
4934 default_ipv4_route, origin
4935 )
4936 )
4937 origin_found = True
4938 else:
4939 logger.error(
4940 "ERROR... IPV4::! Expected Origin is {} obtained {}".format(
4941 origin, ipv4_route_Origin
4942 )
4943 )
4944 return False
4945 else:
4946 origin_found = True
4947
4948 if locPrf:
4949 if locPrf == ipv4_route_local_pref:
4950 logger.info(
4951 "Dafault Route {} expected local preference {} Found in RIB....PASSED".format(
4952 default_ipv4_route, locPrf
4953 )
4954 )
4955 locPrf_found = True
4956 else:
4957 logger.error(
4958 "ERROR... IPV4::! Expected Local preference is {} obtained {}".format(
4959 locPrf, ipv4_route_local_pref
4960 )
4961 )
4962 return False
4963 else:
4964 locPrf_found = True
4965
4966 if metric:
4967 if metric == ipv4_route_metric:
4968 logger.info(
4969 "Dafault Route {} expected metric {} Found in RIB....PASSED".format(
4970 default_ipv4_route, metric
4971 )
4972 )
4973
4974 metric_found = True
4975 else:
4976 logger.error(
4977 "ERROR... IPV4::! Expected metric is {} obtained {}".format(
4978 metric, ipv4_route_metric
4979 )
4980 )
4981 return False
4982 else:
4983 metric_found = True
4984
4985 if expected_aspath:
4986 obtained_aspath = ipv4_routes["routes"]["0.0.0.0/0"][0]["path"]
4987 if expected_aspath in obtained_aspath:
4988 as_path_found = True
4989 logger.info(
4990 "Dafault Route {} expected AS path {} Found in RIB....PASSED".format(
4991 default_ipv4_route, expected_aspath
4992 )
4993 )
4994 else:
4995 logger.error(
4996 "ERROR.....! Expected AS path {} obtained {}..... FAILED ".format(
4997 expected_aspath, obtained_aspath
4998 )
4999 )
5000 return False
5001 else:
5002 as_path_found = True
5003
5004 if origin_found and locPrf_found and metric_found and as_path_found:
5005 is_ipv4_default_attrib_found = True
5006 logger.info(
5007 "IPV4:: Expected origin ['{}'] , Local Preference ['{}'] , Metric ['{}'] and AS path [{}] is found in RIB".format(
5008 origin, locPrf, metric, expected_aspath
5009 )
5010 )
5011 else:
5012 is_ipv4_default_attrib_found = False
5013 logger.error(
5014 "IPV4:: Expected origin ['{}'] Obtained [{}]".format(
5015 origin, ipv4_route_Origin
5016 )
5017 )
5018 logger.error(
5019 "IPV4:: Expected locPrf ['{}'] Obtained [{}]".format(
5020 locPrf, ipv4_route_local_pref
5021 )
5022 )
5023 logger.error(
5024 "IPV4:: Expected metric ['{}'] Obtained [{}]".format(
5025 metric, ipv4_route_metric
5026 )
5027 )
5028 logger.error(
5029 "IPV4:: Expected metric ['{}'] Obtained [{}]".format(
5030 expected_aspath, obtained_aspath
5031 )
5032 )
5033
5034 route_Origin = False
5035 route_local_pref = False
5036 route_local_metric = False
5037 default_ipv6_route = ""
5038 try:
5039 ipv6_routes["routes"]["0::0/0"]
5040 default_ipv6_route = "0::0/0"
5041 except:
5042 ipv6_routes["routes"]["::/0"]
5043 default_ipv6_route = "::/0"
5044 if default_ipv6_route in ipv6_routes["routes"].keys():
5045 nxt_hop_count = len(ipv6_routes["routes"][default_ipv6_route])
5046 rib_next_hops = []
5047 for index in range(nxt_hop_count):
5048 rib_next_hops.append(
5049 ipv6_routes["routes"][default_ipv6_route][index]["nexthops"][0]["ip"]
5050 )
5051 try:
5052 rib_next_hops.append(
5053 ipv6_routes["routes"][default_ipv6_route][index]["nexthops"][1][
5054 "ip"
5055 ]
5056 )
5057 except (KeyError, IndexError) as e:
5058 logger.error("NO impact ..! Global IPV6 Address not found ")
5059
5060 for nxt_hop in expected_nexthop.items():
5061 if nxt_hop[0] == "ipv6":
5062 if nxt_hop[1] in rib_next_hops:
5063 logger.info(
5064 "Default routes [{}] obtained from {} .....PASSED".format(
5065 default_ipv6_route, nxt_hop[1]
5066 )
5067 )
5068 else:
5069 logger.error(
5070 "ERROR ...! Default routes [{}] expected from {} obtained {}".format(
5071 default_ipv6_route, nxt_hop[1], rib_next_hops
5072 )
5073 )
5074 return False
5075
5076 else:
5077 pass
5078 if "origin" in ipv6_routes["routes"][default_ipv6_route][0].keys():
5079 route_Origin = ipv6_routes["routes"][default_ipv6_route][0]["origin"]
5080 if "locPrf" in ipv6_routes["routes"][default_ipv6_route][0].keys():
5081 route_local_pref = ipv6_routes["routes"][default_ipv6_route][0]["locPrf"]
5082 if "metric" in ipv6_routes["routes"][default_ipv6_route][0].keys():
5083 route_local_metric = ipv6_routes["routes"][default_ipv6_route][0]["metric"]
5084
5085 origin_found = False
5086 locPrf_found = False
5087 metric_found = False
5088 as_path_found = False
5089
5090 if origin:
5091 if origin == route_Origin:
5092 logger.info(
5093 "Dafault Route {} expected origin {} Found in RIB....PASSED".format(
5094 default_ipv6_route, route_Origin
5095 )
5096 )
5097 origin_found = True
5098 else:
5099 logger.error(
5100 "ERROR... IPV6::! Expected Origin is {} obtained {}".format(
5101 origin, route_Origin
5102 )
5103 )
5104 return False
5105 else:
5106 origin_found = True
5107
5108 if locPrf:
5109 if locPrf == route_local_pref:
5110 logger.info(
5111 "Dafault Route {} expected Local Preference {} Found in RIB....PASSED".format(
5112 default_ipv6_route, route_local_pref
5113 )
5114 )
5115 locPrf_found = True
5116 else:
5117 logger.error(
5118 "ERROR... IPV6::! Expected Local Preference is {} obtained {}".format(
5119 locPrf, route_local_pref
5120 )
5121 )
5122 return False
5123 else:
5124 locPrf_found = True
5125
5126 if metric:
5127 if metric == route_local_metric:
5128 logger.info(
5129 "Dafault Route {} expected metric {} Found in RIB....PASSED".format(
5130 default_ipv4_route, metric
5131 )
5132 )
5133
5134 metric_found = True
5135 else:
5136 logger.error(
5137 "ERROR... IPV6::! Expected metric is {} obtained {}".format(
5138 metric, route_local_metric
5139 )
5140 )
5141 return False
5142 else:
5143 metric_found = True
5144
5145 if expected_aspath:
5146 obtained_aspath = ipv6_routes["routes"]["::/0"][0]["path"]
5147 if expected_aspath in obtained_aspath:
5148 as_path_found = True
5149 logger.info(
5150 "Dafault Route {} expected AS path {} Found in RIB....PASSED".format(
5151 default_ipv4_route, expected_aspath
5152 )
5153 )
5154 else:
5155 logger.error(
5156 "ERROR.....! Expected AS path {} obtained {}..... FAILED ".format(
5157 expected_aspath, obtained_aspath
5158 )
5159 )
5160 return False
5161 else:
5162 as_path_found = True
5163
5164 if origin_found and locPrf_found and metric_found and as_path_found:
5165 is_ipv6_default_attrib_found = True
5166 logger.info(
5167 "IPV6:: Expected origin ['{}'] , Local Preference ['{}'] , Metric ['{}'] and AS path [{}] is found in RIB".format(
5168 origin, locPrf, metric, expected_aspath
5169 )
5170 )
5171 else:
5172 is_ipv6_default_attrib_found = False
5173 logger.error(
5174 "IPV6:: Expected origin ['{}'] Obtained [{}]".format(origin, route_Origin)
5175 )
5176 logger.error(
5177 "IPV6:: Expected locPrf ['{}'] Obtained [{}]".format(
5178 locPrf, route_local_pref
5179 )
5180 )
5181 logger.error(
5182 "IPV6:: Expected metric ['{}'] Obtained [{}]".format(
5183 metric, route_local_metric
5184 )
5185 )
5186 logger.error(
5187 "IPV6:: Expected metric ['{}'] Obtained [{}]".format(
5188 expected_aspath, obtained_aspath
5189 )
5190 )
5191
5192 if is_ipv4_default_attrib_found and is_ipv6_default_attrib_found:
5193 logger.info("The attributes are found for default route in RIB ")
5194 return True
5195 else:
5196 return False
5197
5198
5199 @retry(retry_timeout=5)
5200 def verify_fib_default_route(tgen, topo, dut, routes, expected_nexthop):
5201 """
5202 API to verify the the 'Default route" in FIB
5203
5204 param
5205 =====
5206 dut : device under test
5207 routes : default route with expected nexthop
5208 expected_nexthop : the nexthop that is expected the deafult route
5209
5210 """
5211 result = False
5212 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
5213 tgen = get_topogen()
5214 connected_routes = {}
5215 for router, rnode in tgen.routers().items():
5216 if router == dut:
5217 ipv4_routes = run_frr_cmd(rnode, "sh ip route json", isjson=True)
5218 ipv6_routes = run_frr_cmd(rnode, "sh ipv6 route json", isjson=True)
5219
5220 is_ipv4_default_route_found = False
5221 is_ipv6_default_route_found = False
5222 if routes["ipv4"] in ipv4_routes.keys():
5223 rib_ipv4_nxt_hops = []
5224 ipv4_default_route = routes["ipv4"]
5225 nxt_hop_count = len(ipv4_routes[ipv4_default_route][0]["nexthops"])
5226 for index in range(nxt_hop_count):
5227 rib_ipv4_nxt_hops.append(
5228 ipv4_routes[ipv4_default_route][0]["nexthops"][index]["ip"]
5229 )
5230
5231 if expected_nexthop["ipv4"] in rib_ipv4_nxt_hops:
5232 is_ipv4_default_route_found = True
5233 logger.info(
5234 "{} default route with next hop {} is found in FIB ".format(
5235 ipv4_default_route, expected_nexthop
5236 )
5237 )
5238 else:
5239 logger.error(
5240 "ERROR .. ! {} default route with next hop {} is not found in FIB ".format(
5241 ipv4_default_route, expected_nexthop
5242 )
5243 )
5244 return False
5245
5246 if routes["ipv6"] in ipv6_routes.keys() or "::/0" in ipv6_routes.keys():
5247 rib_ipv6_nxt_hops = []
5248 if "::/0" in ipv6_routes.keys():
5249 ipv6_default_route = "::/0"
5250 elif routes["ipv6"] in ipv6_routes.keys():
5251 ipv6_default_route = routes["ipv6"]
5252
5253 nxt_hop_count = len(ipv6_routes[ipv6_default_route][0]["nexthops"])
5254 for index in range(nxt_hop_count):
5255 rib_ipv6_nxt_hops.append(
5256 ipv6_routes[ipv6_default_route][0]["nexthops"][index]["ip"]
5257 )
5258
5259 if expected_nexthop["ipv6"] in rib_ipv6_nxt_hops:
5260 is_ipv6_default_route_found = True
5261 logger.info(
5262 "{} default route with next hop {} is found in FIB ".format(
5263 ipv6_default_route, expected_nexthop
5264 )
5265 )
5266 else:
5267 logger.error(
5268 "ERROR .. ! {} default route with next hop {} is not found in FIB ".format(
5269 ipv6_default_route, expected_nexthop
5270 )
5271 )
5272 return False
5273
5274 if is_ipv4_default_route_found and is_ipv6_default_route_found:
5275 return True
5276 else:
5277 logger.error(
5278 "Default Route for ipv4 and ipv6 address family is not found in FIB "
5279 )
5280 return False
5281
5282
5283 @retry(retry_timeout=5)
5284 def verify_bgp_advertised_routes_from_neighbor(tgen, topo, dut, peer, expected_routes):
5285 """
5286 APi is verifies the the routes that are advertised from dut to peer
5287
5288 command used :
5289 "sh ip bgp neighbor <x.x.x.x> advertised-routes" and
5290 "sh ip bgp ipv6 unicast neighbor<x::x> advertised-routes"
5291
5292 dut : Device Under Tests
5293 Peer : Peer on which the routs is expected
5294 expected_routes : dual stack IPV4-and IPv6 routes to be verified
5295 expected_routes
5296
5297 returns: True / False
5298
5299 """
5300 result = False
5301 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
5302 tgen = get_topogen()
5303
5304 peer_ipv4_neighbor_ip = topo["routers"][peer]["links"][dut]["ipv4"].split("/")[0]
5305 peer_ipv6_neighbor_ip = topo["routers"][peer]["links"][dut]["ipv6"].split("/")[0]
5306
5307 for router, rnode in tgen.routers().items():
5308 if router == dut:
5309 ipv4_receieved_routes = run_frr_cmd(
5310 rnode,
5311 "sh ip bgp neighbor {} advertised-routes json".format(
5312 peer_ipv4_neighbor_ip
5313 ),
5314 isjson=True,
5315 )
5316 ipv6_receieved_routes = run_frr_cmd(
5317 rnode,
5318 "sh ip bgp ipv6 unicast neighbor {} advertised-routes json".format(
5319 peer_ipv6_neighbor_ip
5320 ),
5321 isjson=True,
5322 )
5323 ipv4_route_count = 0
5324 ipv6_route_count = 0
5325 if ipv4_receieved_routes:
5326 for index in range(len(expected_routes["ipv4"])):
5327 if (
5328 expected_routes["ipv4"][index]["network"]
5329 in ipv4_receieved_routes["advertisedRoutes"].keys()
5330 ):
5331 ipv4_route_count += 1
5332 logger.info(
5333 "Success [DUT : {}] The Expected Route {} is advertised to {} ".format(
5334 dut, expected_routes["ipv4"][index]["network"], peer
5335 )
5336 )
5337
5338 elif (
5339 expected_routes["ipv4"][index]["network"]
5340 in ipv4_receieved_routes["bgpOriginatingDefaultNetwork"]
5341 ):
5342 ipv4_route_count += 1
5343 logger.info(
5344 "Success [DUT : {}] The Expected Route {} is advertised to {} ".format(
5345 dut, expected_routes["ipv4"][index]["network"], peer
5346 )
5347 )
5348
5349 else:
5350 logger.error(
5351 "ERROR....![DUT : {}] The Expected Route {} is not advertised to {} ".format(
5352 dut, expected_routes["ipv4"][index]["network"], peer
5353 )
5354 )
5355 else:
5356 logger.error(ipv4_receieved_routes)
5357 logger.error(
5358 "ERROR...! [DUT : {}] No IPV4 Routes are advertised to the peer {}".format(
5359 dut, peer
5360 )
5361 )
5362 return False
5363
5364 if ipv6_receieved_routes:
5365 for index in range(len(expected_routes["ipv6"])):
5366 if (
5367 expected_routes["ipv6"][index]["network"]
5368 in ipv6_receieved_routes["advertisedRoutes"].keys()
5369 ):
5370 ipv6_route_count += 1
5371 logger.info(
5372 "Success [DUT : {}] The Expected Route {} is advertised to {} ".format(
5373 dut, expected_routes["ipv6"][index]["network"], peer
5374 )
5375 )
5376 elif (
5377 expected_routes["ipv6"][index]["network"]
5378 in ipv6_receieved_routes["bgpOriginatingDefaultNetwork"]
5379 ):
5380 ipv6_route_count += 1
5381 logger.info(
5382 "Success [DUT : {}] The Expected Route {} is advertised to {} ".format(
5383 dut, expected_routes["ipv6"][index]["network"], peer
5384 )
5385 )
5386 else:
5387 logger.error(
5388 "ERROR....![DUT : {}] The Expected Route {} is not advertised to {} ".format(
5389 dut, expected_routes["ipv6"][index]["network"], peer
5390 )
5391 )
5392 else:
5393 logger.error(ipv6_receieved_routes)
5394 logger.error(
5395 "ERROR...! [DUT : {}] No IPV6 Routes are advertised to the peer {}".format(
5396 dut, peer
5397 )
5398 )
5399 return False
5400
5401 if ipv4_route_count == len(expected_routes["ipv4"]) and ipv6_route_count == len(
5402 expected_routes["ipv6"]
5403 ):
5404 return True
5405 else:
5406 logger.error(
5407 "ERROR ....! IPV4 : Expected Routes -> {} obtained ->{} ".format(
5408 expected_routes["ipv4"], ipv4_receieved_routes["advertisedRoutes"]
5409 )
5410 )
5411 logger.error(
5412 "ERROR ....! IPV6 : Expected Routes -> {} obtained ->{} ".format(
5413 expected_routes["ipv6"], ipv6_receieved_routes["advertisedRoutes"]
5414 )
5415 )
5416 return False
5417
5418
5419 @retry(retry_timeout=5)
5420 def verify_bgp_received_routes_from_neighbor(tgen, topo, dut, peer, expected_routes):
5421 """
5422 API to verify the bgp received routes
5423
5424 commad used :
5425 =============
5426 show ip bgp neighbor <x.x.x.x> received-routes
5427 show ip bgp ipv6 unicast neighbor <x::x> received-routes
5428
5429 params
5430 =======
5431 dut : Device Under Tests
5432 Peer : Peer on which the routs is expected
5433 expected_routes : dual stack IPV4-and IPv6 routes to be verified
5434 expected_routes
5435
5436 returns:
5437 ========
5438 True / False
5439 """
5440 result = False
5441 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
5442 tgen = get_topogen()
5443
5444 peer_ipv4_neighbor_ip = topo["routers"][peer]["links"][dut]["ipv4"].split("/")[0]
5445 peer_ipv6_neighbor_ip = topo["routers"][peer]["links"][dut]["ipv6"].split("/")[0]
5446
5447 logger.info("Enabling Soft configuration to neighbor INBOUND ")
5448 neigbor_dict = {"ipv4": peer_ipv4_neighbor_ip, "ipv6": peer_ipv6_neighbor_ip}
5449 result = configure_bgp_soft_configuration(
5450 tgen, dut, neigbor_dict, direction="inbound"
5451 )
5452 assert (
5453 result is True
5454 ), " Failed to configure the soft configuration \n Error: {}".format(result)
5455
5456 """sleep of 10 sec is required to get the routes on peer after soft configuration"""
5457 sleep(10)
5458 for router, rnode in tgen.routers().items():
5459 if router == dut:
5460 ipv4_receieved_routes = run_frr_cmd(
5461 rnode,
5462 "sh ip bgp neighbor {} received-routes json".format(
5463 peer_ipv4_neighbor_ip
5464 ),
5465 isjson=True,
5466 )
5467 ipv6_receieved_routes = run_frr_cmd(
5468 rnode,
5469 "sh ip bgp ipv6 unicast neighbor {} received-routes json".format(
5470 peer_ipv6_neighbor_ip
5471 ),
5472 isjson=True,
5473 )
5474 ipv4_route_count = 0
5475 ipv6_route_count = 0
5476 if ipv4_receieved_routes:
5477 for index in range(len(expected_routes["ipv4"])):
5478 if (
5479 expected_routes["ipv4"][index]["network"]
5480 in ipv4_receieved_routes["receivedRoutes"].keys()
5481 ):
5482 ipv4_route_count += 1
5483 logger.info(
5484 "Success [DUT : {}] The Expected Route {} is received from {} ".format(
5485 dut, expected_routes["ipv4"][index]["network"], peer
5486 )
5487 )
5488 else:
5489 logger.error(
5490 "ERROR....![DUT : {}] The Expected Route {} is not received from {} ".format(
5491 dut, expected_routes["ipv4"][index]["network"], peer
5492 )
5493 )
5494 else:
5495 logger.error(ipv4_receieved_routes)
5496 logger.error(
5497 "ERROR...! [DUT : {}] No IPV4 Routes are received from the peer {}".format(
5498 dut, peer
5499 )
5500 )
5501 return False
5502
5503 if ipv6_receieved_routes:
5504 for index in range(len(expected_routes["ipv6"])):
5505 if (
5506 expected_routes["ipv6"][index]["network"]
5507 in ipv6_receieved_routes["receivedRoutes"].keys()
5508 ):
5509 ipv6_route_count += 1
5510 logger.info(
5511 "Success [DUT : {}] The Expected Route {} is received from {} ".format(
5512 dut, expected_routes["ipv6"][index]["network"], peer
5513 )
5514 )
5515 else:
5516 logger.error(
5517 "ERROR....![DUT : {}] The Expected Route {} is not received from {} ".format(
5518 dut, expected_routes["ipv6"][index]["network"], peer
5519 )
5520 )
5521 else:
5522 logger.error(ipv6_receieved_routes)
5523 logger.error(
5524 "ERROR...! [DUT : {}] No IPV6 Routes are received from the peer {}".format(
5525 dut, peer
5526 )
5527 )
5528 return False
5529
5530 if ipv4_route_count == len(expected_routes["ipv4"]) and ipv6_route_count == len(
5531 expected_routes["ipv6"]
5532 ):
5533 return True
5534 else:
5535 logger.error(
5536 "ERROR ....! IPV4 : Expected Routes -> {} obtained ->{} ".format(
5537 expected_routes["ipv4"], ipv4_receieved_routes["advertisedRoutes"]
5538 )
5539 )
5540 logger.error(
5541 "ERROR ....! IPV6 : Expected Routes -> {} obtained ->{} ".format(
5542 expected_routes["ipv6"], ipv6_receieved_routes["advertisedRoutes"]
5543 )
5544 )
5545 return False
5546
5547
5548 def configure_bgp_soft_configuration(tgen, dut, neighbor_dict, direction):
5549 """
5550 Api to configure the bgp soft configuration to show the received routes from peer
5551 params
5552 ======
5553 dut : device under test route on which the sonfiguration to be applied
5554 neighbor_dict : dict element contains ipv4 and ipv6 neigbor ip
5555 direction : Directionon which it should be applied in/out
5556
5557 returns:
5558 ========
5559 boolean
5560 """
5561 logger.info("Enabling Soft configuration to neighbor INBOUND ")
5562 local_as = get_dut_as_number(tgen, dut)
5563 ipv4_neighbor = neighbor_dict["ipv4"]
5564 ipv6_neighbor = neighbor_dict["ipv6"]
5565 direction = direction.lower()
5566 if ipv4_neighbor and ipv4_neighbor:
5567 raw_config = {
5568 dut: {
5569 "raw_config": [
5570 "router bgp {}".format(local_as),
5571 "address-family ipv4 unicast",
5572 "neighbor {} soft-reconfiguration {} ".format(
5573 ipv4_neighbor, direction
5574 ),
5575 "exit-address-family",
5576 "address-family ipv6 unicast",
5577 "neighbor {} soft-reconfiguration {} ".format(
5578 ipv6_neighbor, direction
5579 ),
5580 "exit-address-family",
5581 ]
5582 }
5583 }
5584 result = apply_raw_config(tgen, raw_config)
5585 logger.info(
5586 "Success... [DUT : {}] The soft configuration onis applied on neighbors {} ".format(
5587 dut, neighbor_dict
5588 )
5589 )
5590 return True