]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/lib/bgp.py
Merge pull request #13649 from donaldsharp/unlock_the_node_or_else
[mirror_frr.git] / tests / topotests / lib / bgp.py
1 # SPDX-License-Identifier: ISC
2 #
3 # Copyright (c) 2019 by VMware, Inc. ("VMware")
4 # Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
5 # ("NetDEF") in this file.
6 #
7
8 import ipaddress
9 import sys
10 import traceback
11 from copy import deepcopy
12 from time import sleep
13
14 # Import common_config to use commonly used APIs
15 from lib.common_config import (
16 create_common_configurations,
17 FRRCFG_FILE,
18 InvalidCLIError,
19 apply_raw_config,
20 check_address_types,
21 find_interface_with_greater_ip,
22 generate_ips,
23 get_frr_ipv6_linklocal,
24 retry,
25 run_frr_cmd,
26 validate_ip_address,
27 )
28 from lib.topogen import get_topogen
29 from lib.topolog import logger
30 from lib.topotest import frr_unicode
31
32 from lib import topotest
33
34
def create_router_bgp(tgen, topo=None, input_dict=None, build=False, load_config=True):
    """
    API to configure bgp on router

    For every router carrying a "bgp" entry, the global BGP commands are
    generated first, then the unicast and l2vpn/evpn address-family commands
    (when that data is present), and the collected commands are handed to
    create_common_configurations().

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data, `tgen.json_topo` is used when it is None
    * `input_dict` : Input dict data, required when configuring from testcase.
                     When it is empty a copy of `topo` is used instead.
    * `build` : Only for initial setup phase this is set as True.
    * `load_config` : Passed through unchanged to
                      create_common_configurations()

    Usage
    -----
    input_dict = {
        "r1": {
            "bgp": {
                "local_as": "200",
                "router_id": "22.22.22.22",
                "graceful-restart": {
                    "graceful-restart": True,
                    "preserve-fw-state": True,
                    "timer": {
                        "restart-time": 30,
                        "rib-stale-time": 30,
                        "select-defer-time": 30,
                    }
                },
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "default_originate": {
                                "r2": {
                                    "add_type": "lo",
                                    "route_map": "rm"
                                }
                            },
                            "redistribute": [
                                {
                                    "redist_type": "static",
                                    "attribute": {"metric": 123}
                                },
                                {"redist_type": "connected"}
                            ],
                            "advertise_networks": [
                                {"network": "20.0.0.0/32", "no_of_network": 10},
                                {"network": "30.0.0.0/32", "no_of_network": 10}
                            ],
                            "neighbor": {
                                "r3": {
                                    "dest_link": {
                                        "r4": {
                                            "keepalivetimer": 60,
                                            "holddowntimer": 180,
                                            "prefix_lists": [
                                                {"name": "pf_list_1", "direction": "in"}
                                            ],
                                            "route_maps": [
                                                {"name": "RMAP_MED_R3", "direction": "in"}
                                            ],
                                            "next_hop_self": True
                                        },
                                        "r1": {"graceful-restart-helper": True}
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    Returns
    -------
    Value returned by create_common_configurations(), or False when it
    raises InvalidCLIError.
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    if topo is None:
        topo = tgen.json_topo

    # Flag is used when testing ipv6 over ipv4 or vice-versa
    afi_test = False

    if input_dict:
        # Testcase-supplied data: the helpers look neighbors up in the
        # "routers" section of the topology.
        topo = topo["routers"]
        input_dict = deepcopy(input_dict)
    else:
        input_dict = deepcopy(topo)

    config_data_dict = {}

    for router, router_data in input_dict.items():
        if "bgp" not in router_data:
            logger.debug("Router %s: 'bgp' not present in input_dict", router)
            continue

        # A router may carry a single BGP instance (dict) or several (list).
        bgp_instances = router_data["bgp"]
        if type(bgp_instances) is not list:
            bgp_instances = [bgp_instances]

        router_cmds = []

        for bgp_data in bgp_instances:
            instance_cmds = __create_bgp_global(tgen, bgp_data, router, build)

            if instance_cmds:
                address_families = bgp_data.setdefault("address_family", {})

                if address_families:
                    ipv4_data = address_families.setdefault("ipv4", {})
                    ipv6_data = address_families.setdefault("ipv6", {})
                    l2vpn_data = address_families.setdefault("l2vpn", {})

                    # Both flags are evaluated before any helper runs, as the
                    # setdefault() calls shape the data the helpers iterate.
                    has_unicast = bool(
                        ipv4_data.setdefault("unicast", {})
                        or ipv6_data.setdefault("unicast", {})
                    )
                    has_evpn = bool(l2vpn_data.setdefault("evpn", {}))

                    if has_unicast:
                        instance_cmds = __create_bgp_unicast_neighbor(
                            tgen,
                            topo,
                            bgp_data,
                            router,
                            afi_test,
                            config_data=instance_cmds,
                        )

                    if has_evpn:
                        instance_cmds = __create_l2vpn_evpn_address_family(
                            tgen, topo, bgp_data, router, config_data=instance_cmds
                        )
                else:
                    logger.debug(
                        "Router %s: 'address_family' not present in input_dict for BGP",
                        router,
                    )

            if instance_cmds:
                router_cmds.extend(instance_cmds)

        if router_cmds:
            config_data_dict[router] = router_cmds

    try:
        result = create_common_configurations(
            tgen, config_data_dict, "bgp", build, load_config
        )
    except InvalidCLIError:
        logger.error("create_router_bgp", exc_info=True)
        result = False

    logger.debug("Exiting lib API: create_router_bgp()")
    return result
210
211
def __create_bgp_global(tgen, input_dict, router, build=False):
    """
    Helper API to create bgp global configuration.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `input_dict` : BGP data of one instance, required when configuring
                     from testcase. Missing keys are filled in with
                     defaults via setdefault(), so the dict is modified.
    * `router` : router id to be configured.
    * `build` : Only for initial setup phase this is set as True.

    Returns
    -------
    list of config commands. The list is empty when `build` is set and
    'local_as' is missing; it holds only the "no router bgp ..." command
    when the "delete" flag is set.
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    bgp_data = input_dict
    del_bgp_action = bgp_data.setdefault("delete", False)

    config_data = []

    if "local_as" not in bgp_data and build:
        logger.debug(
            "Router %s: 'local_as' not present in input_dict for BGP", router
        )
        return config_data

    local_as = bgp_data.setdefault("local_as", "")
    cmd = "router bgp {}".format(local_as)
    vrf_id = bgp_data.setdefault("vrf", None)
    if vrf_id:
        cmd = "{} vrf {}".format(cmd, vrf_id)

    if del_bgp_action:
        # Removing the instance: nothing else is configured.
        config_data.append("no {}".format(cmd))
        return config_data

    config_data.append(cmd)
    config_data.append("no bgp ebgp-requires-policy")

    router_id = bgp_data.setdefault("router_id", None)
    del_router_id = bgp_data.setdefault("del_router_id", False)
    if del_router_id:
        config_data.append("no bgp router-id")
    if router_id:
        config_data.append("bgp router-id {}".format(router_id))

    config_data.append("bgp log-neighbor-changes")
    config_data.append("no bgp network import-check")

    bgp_peer_grp_data = bgp_data.setdefault("peer-group", {})
    if bgp_peer_grp_data:
        peer_grp_data = __create_bgp_peer_group(tgen, bgp_peer_grp_data, router)
        config_data.extend(peer_grp_data)

    bst_path = bgp_data.setdefault("bestpath", None)
    if bst_path and "aspath" in bst_path:
        bestpath_cmd = "bgp bestpath as-path {}".format(bst_path["aspath"])
        # Honour the value of the flag, like every other "delete" flag here.
        if bst_path.get("delete"):
            bestpath_cmd = "no {}".format(bestpath_cmd)
        config_data.append(bestpath_cmd)

    if "graceful-restart" in bgp_data:
        graceful_config = bgp_data["graceful-restart"]

        # (input key, command); None means "leave untouched", any other
        # falsy value negates the command.
        gr_flags = (
            ("graceful-restart", "bgp graceful-restart"),
            ("graceful-restart-disable", "bgp graceful-restart-disable"),
            ("preserve-fw-state", "bgp graceful-restart preserve-fw-state"),
            ("disable-eor", "bgp graceful-restart disable-eor"),
        )
        for key, gr_cmd in gr_flags:
            flag = graceful_config.setdefault(key, None)
            if flag is None:
                continue
            config_data.append(gr_cmd if flag else "no {}".format(gr_cmd))

        if "timer" in graceful_config:
            timer = graceful_config["timer"]
            del_action = timer.get("delete", False)

            for rs_timer, rs_timer_value in timer.items():
                # "delete" is a flag, not a timer; unset timers are skipped so
                # that no stale command is appended for them.
                if rs_timer == "delete" or not rs_timer_value:
                    continue

                timer_cmd = "bgp graceful-restart {} {}".format(
                    rs_timer, rs_timer_value
                )
                if del_action:
                    timer_cmd = "no {}".format(timer_cmd)

                config_data.append(timer_cmd)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return config_data
348
349
def __create_bgp_unicast_neighbor(
    tgen, topo, input_dict, router, afi_test, config_data=None
):
    """
    Helper API to create configuration for address-family unicast

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data
    * `input_dict` : BGP data of one instance (must hold "address_family"),
                     required when configuring from testcase
    * `router` : router id to be configured.
    * `afi_test` : use when ipv6 needs to be tested over ipv4 or vice-versa
    * `config_data` : list of commands generated so far; the new commands
                      are appended to it. A new list is started when None.

    Returns
    -------
    `config_data` with the unicast commands and a closing "exit" appended
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    if config_data is None:
        config_data = []

    add_neigh = True
    # NOTE(review): this is a list-membership test for the exact element
    # "router bgp"; confirm whether a prefix match was intended.
    if "router bgp" in config_data:
        add_neigh = False

    bgp_data = input_dict["address_family"]

    for addr_type, addr_dict in bgp_data.items():
        if not addr_dict:
            continue

        if not check_address_types(addr_type) and not afi_test:
            continue

        addr_data = addr_dict["unicast"]
        if addr_data:
            config_data.append("address-family {} unicast".format(addr_type))

        advertise_network = addr_data.setdefault("advertise_networks", [])
        for advertise_network_dict in advertise_network:
            network = advertise_network_dict["network"]
            if not isinstance(network, list):
                network = [network]

            no_of_network = advertise_network_dict.get("no_of_network", 1)
            del_action = advertise_network_dict.setdefault("delete", False)

            # Generating IPs for verification
            network_list = generate_ips(network, no_of_network)
            for ip in network_list:
                ip = str(ipaddress.ip_network(frr_unicode(ip)))

                cmd = "network {}".format(ip)
                if del_action:
                    cmd = "no {}".format(cmd)

                config_data.append(cmd)

        max_paths = addr_data.setdefault("maximum_paths", {})
        if max_paths:
            ibgp = max_paths.setdefault("ibgp", None)
            ebgp = max_paths.setdefault("ebgp", None)
            del_cmd = max_paths.setdefault("delete", False)
            negate = "no " if del_cmd else ""
            if ibgp:
                config_data.append("{}maximum-paths ibgp {}".format(negate, ibgp))
            if ebgp:
                config_data.append("{}maximum-paths {}".format(negate, ebgp))

        aggregate_addresses = addr_data.setdefault("aggregate_address", [])
        for aggregate_address in aggregate_addresses:
            network = aggregate_address.setdefault("network", None)
            if not network:
                logger.debug(
                    "Router %s: 'network' not present in input_dict for BGP", router
                )
                continue

            cmd = "aggregate-address {}".format(network)

            as_set = aggregate_address.setdefault("as_set", False)
            summary = aggregate_address.setdefault("summary", False)
            del_action = aggregate_address.setdefault("delete", False)
            if as_set:
                cmd = "{} as-set".format(cmd)
            if summary:
                cmd = "{} summary".format(cmd)

            if del_action:
                cmd = "no {}".format(cmd)

            config_data.append(cmd)

        redistribute_data = addr_data.setdefault("redistribute", {})
        if redistribute_data:
            for redistribute in redistribute_data:
                if "redist_type" not in redistribute:
                    logger.debug(
                        "Router %s: 'redist_type' not present in input_dict", router
                    )
                    continue

                cmd = "redistribute {}".format(redistribute["redist_type"])
                redist_attr = redistribute.setdefault("attribute", None)
                if redist_attr:
                    if isinstance(redist_attr, dict):
                        for key, value in redist_attr.items():
                            cmd = "{} {} {}".format(cmd, key, value)
                    else:
                        cmd = "{} {}".format(cmd, redist_attr)

                del_action = redistribute.setdefault("delete", False)
                if del_action:
                    cmd = "no {}".format(cmd)
                config_data.append(cmd)

        admin_dist_data = addr_data.setdefault("distance", {})
        if admin_dist_data:
            if admin_dist_data.setdefault("delete", False):
                # Removal does not use the distance values, so they are not
                # required here.
                config_data.append("no distance bgp")
            else:
                if not all(
                    key in admin_dist_data for key in ("ebgp", "ibgp", "local")
                ):
                    logger.debug(
                        "Router %s: pass the admin distance values for "
                        "ebgp, ibgp and local routes",
                        router,
                    )
                # A missing value still raises KeyError below, as before.
                config_data.append(
                    "distance bgp {} {} {}".format(
                        admin_dist_data["ebgp"],
                        admin_dist_data["ibgp"],
                        admin_dist_data["local"],
                    )
                )

        # "import" is handled exactly once so the command is not duplicated.
        import_vrf_data = addr_data.setdefault("import", {})
        if import_vrf_data:
            cmd = "import vrf {}".format(import_vrf_data["vrf"])

            del_action = import_vrf_data.setdefault("delete", False)
            if del_action:
                cmd = "no {}".format(cmd)
            config_data.append(cmd)

        if "neighbor" in addr_data:
            neigh_data = __create_bgp_neighbor(
                topo, input_dict, router, addr_type, add_neigh
            )
            config_data.extend(neigh_data)

        # configure default originate
        if "default_originate" in addr_data:
            default_originate_config = __create_bgp_default_originate_neighbor(
                topo, input_dict, router, addr_type, add_neigh
            )
            config_data.extend(default_originate_config)

    # Second pass: per-neighbor address-family settings are generated only
    # after every address family above has been processed.
    for addr_type, addr_dict in bgp_data.items():
        if not addr_dict or not check_address_types(addr_type):
            continue

        addr_data = addr_dict["unicast"]
        if "neighbor" in addr_data:
            neigh_addr_data = __create_bgp_unicast_address_family(
                topo, input_dict, router, addr_type, add_neigh
            )

            config_data.extend(neigh_addr_data)

    config_data.append("exit")
    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return config_data
535
536
def __create_bgp_default_originate_neighbor(
    topo, input_dict, router, addr_type, add_neigh=True
):
    """
    Helper API to create neighbor default - originate configuration

    Parameters
    ----------
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router id to be configured
    * `addr_type` : address family whose "default_originate" data is read
    * `add_neigh` : accepted but not used by this helper

    Returns
    -------
    list of config commands
    """
    tgen = get_topogen()
    config_data = []
    logger.debug("Entering lib API: __create_bgp_default_originate_neighbor()")

    originate_peers = input_dict["address_family"][addr_type]["unicast"][
        "default_originate"
    ]

    for name, peer_dict in originate_peers.items():
        peer_links = topo[name]["links"]

        # Link whose address identifies the neighbor: an explicit
        # "dest-link" wins, then "add_type", otherwise the link named after
        # the router being configured.
        if "dest-link" in peer_dict:
            link_name = peer_dict["dest-link"]
        elif "add_type" in peer_dict:
            link_name = peer_dict["add_type"]
        else:
            link_name = router
        neighbor_ip = peer_links[link_name][addr_type].split("/")[0]

        config_data.append("address-family {} unicast".format(addr_type))

        removing = "delete" in peer_dict and peer_dict["delete"]

        if "route_map" in peer_dict:
            if removing:
                template = "no neighbor {} default-originate route-map {}"
            else:
                template = " neighbor {} default-originate route-map {}"
            config_data.append(template.format(neighbor_ip, peer_dict["route_map"]))
        else:
            if removing:
                template = "no neighbor {} default-originate"
            else:
                template = "neighbor {} default-originate"
            config_data.append(template.format(neighbor_ip))

    logger.debug("Exiting lib API: __create_bgp_default_originate_neighbor()")
    return config_data
607
608
def _l2vpn_evpn_neighbor_cmds(router, neighbor_ip, action):
    """Translate one l2vpn evpn neighbor action into config commands."""
    cmds = []

    if isinstance(action, dict):
        next_hop_self = action.setdefault("next_hop_self", None)
        route_maps = action.setdefault("route_maps", {})

        if next_hop_self is True:
            cmds.append("neighbor {} next-hop-self".format(neighbor_ip))
        elif next_hop_self is False:
            cmds.append("no neighbor {} next-hop-self".format(neighbor_ip))

        for route_map in route_maps or []:
            name = route_map.setdefault("name", {})
            direction = route_map.setdefault("direction", "in")
            del_action = route_map.setdefault("delete", False)

            if not name:
                logger.info(
                    "Router %s: 'name' not present in input_dict for BGP "
                    "neighbor route name",
                    router,
                )
                continue

            cmd = "neighbor {} route-map {} {}".format(neighbor_ip, name, direction)
            if del_action:
                cmd = "no {}".format(cmd)
            cmds.append(cmd)
    elif action == "activate":
        cmds.append("neighbor {} activate".format(neighbor_ip))
    elif action == "deactivate":
        cmds.append("no neighbor {} activate".format(neighbor_ip))
    else:
        # Nothing sensible can be generated; do not re-emit an older command.
        logger.debug(
            "Router %s: unsupported l2vpn evpn neighbor action %s", router, action
        )

    return cmds


def __create_l2vpn_evpn_address_family(
    tgen, topo, input_dict, router, config_data=None
):
    """
    Helper API to create configuration for l2vpn evpn address-family

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data
    * `input_dict` : BGP data of one instance (must hold "address_family"),
                     required when configuring from testcase
    * `router` : router id to be configured.
    * `config_data` : list of commands generated so far; the new commands
                      are appended to it. A new list is started when None.

    Returns
    -------
    `config_data` with the l2vpn evpn commands appended
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    if config_data is None:
        config_data = []

    bgp_data = input_dict["address_family"]

    for family_type, family_dict in bgp_data.items():
        if family_type != "l2vpn":
            continue

        family_data = family_dict["evpn"]
        if family_data:
            config_data.append("address-family l2vpn evpn")

        advertise_data = family_data.setdefault("advertise", {})
        neighbor_data = family_data.setdefault("neighbor", {})
        advertise_all_vni_data = family_data.setdefault("advertise-all-vni", None)
        rd_data = family_data.setdefault("rd", None)
        no_rd_data = family_data.setdefault("no rd", False)
        route_target_data = family_data.setdefault("route-target", {})

        if advertise_data:
            for address_type, unicast_type in advertise_data.items():
                if not isinstance(unicast_type, dict):
                    continue

                for key, value in unicast_type.items():
                    cmd = "advertise {} {}".format(address_type, key)

                    if value:
                        route_map = value.setdefault("route-map", {})
                        advertise_del_action = value.setdefault("delete", None)

                        if route_map:
                            cmd = "{} route-map {}".format(cmd, route_map)

                        if advertise_del_action:
                            cmd = "no {}".format(cmd)

                    config_data.append(cmd)

        if neighbor_data:
            for neighbor, neighbor_families in neighbor_data.items():
                ipv4_neighbor = neighbor_families.setdefault("ipv4", {})
                ipv6_neighbor = neighbor_families.setdefault("ipv6", {})

                # Each family is walked over its own entries; the neighbor
                # address is read from the matching key of the link.
                for ip_family, family_neighbors in (
                    ("ipv4", ipv4_neighbor),
                    ("ipv6", ipv6_neighbor),
                ):
                    if not family_neighbors:
                        continue

                    for neighbor_name, action in family_neighbors.items():
                        neighbor_ip = topo[neighbor]["links"][neighbor_name][
                            ip_family
                        ].split("/")[0]
                        config_data.extend(
                            _l2vpn_evpn_neighbor_cmds(router, neighbor_ip, action)
                        )

        if advertise_all_vni_data == True:
            config_data.append("advertise-all-vni")
        elif advertise_all_vni_data == False:
            config_data.append("no advertise-all-vni")

        if rd_data:
            config_data.append("rd {}".format(rd_data))

        if no_rd_data:
            config_data.append("no rd {}".format(no_rd_data))

        if route_target_data:
            for rt_type, rt_dict in route_target_data.items():
                for _rt_dict in rt_dict:
                    rt_value = _rt_dict.setdefault("value", None)
                    del_rt = _rt_dict.setdefault("delete", None)

                    # Without a value there is no command to emit.
                    if not rt_value:
                        continue

                    cmd = "route-target {} {}".format(rt_type, rt_value)
                    if del_rt:
                        cmd = "no {}".format(cmd)

                    config_data.append(cmd)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))

    return config_data
766
767
def __create_bgp_peer_group(topo, input_dict, router):
    """
    Helper API to create peer-group specific configuration

    Parameters
    ----------
    * `topo` : accepted but not used by this helper
    * `input_dict` : peer-group data keyed by group name; the optional
                     "update-source", "remote-as" and "capability" keys are
                     read (and set to None when absent)
    * `router` : accepted but not used by this helper

    Returns
    -------
    list of config commands
    """
    config_data = []
    logger.debug("Entering lib API: __create_bgp_peer_group()")

    # (input key, command template) in the order the commands are emitted.
    option_templates = (
        ("update-source", "{} update-source {}"),
        ("remote-as", "{} remote-as {}"),
        ("capability", "{} capability {}"),
    )

    for grp, grp_dict in input_dict.items():
        config_data.append("neighbor {} peer-group".format(grp))
        neigh_cxt = "neighbor {} ".format(grp)

        # Fill in every default before any option command is generated.
        option_values = [
            (template, grp_dict.setdefault(key, None))
            for key, template in option_templates
        ]

        for template, option_value in option_values:
            if option_value:
                config_data.append(template.format(neigh_cxt, option_value))

    logger.debug("Exiting lib API: __create_bgp_peer_group()")
    return config_data
798
799
def __create_bgp_neighbor(topo, input_dict, router, addr_type, add_neigh=True):
    """
    Helper API to create neighbor specific configuration

    Parameters
    ----------
    * `topo` : json file data
    * `input_dict` : BGP data of one instance (must hold "address_family"),
                     required when configuring from testcase. Per-neighbor
                     defaults are filled in via setdefault().
    * `router` : router id to be configured
    * `addr_type` : address family whose "neighbor" data is read
    * `add_neigh` : when False the plain "remote-as" command is not emitted

    Returns
    -------
    list of config commands

    Raises
    ------
    AssertionError when "local_asn" holds the same value for "local_as" and
    "remote_as"
    """
    config_data = []
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    tgen = get_topogen()
    bgp_data = input_dict["address_family"]
    neigh_data = bgp_data[addr_type]["unicast"]["neighbor"]
    global_connect = input_dict.get("connecttimer", 5)

    for name, peer_dict in neigh_data.items():
        remote_as = 0
        for dest_link, peer in peer_dict["dest_link"].items():
            local_asn = peer.setdefault("local_asn", {})
            if local_asn:
                local_as = local_asn.setdefault("local_as", 0)
                remote_as = local_asn.setdefault("remote_as", 0)
                no_prepend = local_asn.setdefault("no_prepend", False)
                replace_as = local_asn.setdefault("replace_as", False)
                if local_as == remote_as:
                    # Raised explicitly so the check survives "python -O".
                    raise AssertionError(
                        " Configuration Error : Router must not have "
                        "same AS-NUMBER as Local AS NUMBER"
                    )
            nh_details = topo[name]

            # Without an explicit remote AS, use the neighbor's own local AS
            # (per VRF when the neighbor runs several BGP instances).
            if "vrfs" in topo[router] or isinstance(nh_details["bgp"], list):
                for vrf_data in nh_details["bgp"]:
                    if "vrf" in nh_details["links"][dest_link] and "vrf" in vrf_data:
                        if nh_details["links"][dest_link]["vrf"] == vrf_data["vrf"]:
                            if not remote_as:
                                remote_as = vrf_data["local_as"]
                            break
                    else:
                        if "vrf" not in vrf_data:
                            if not remote_as:
                                remote_as = vrf_data["local_as"]
                            break
            else:
                if not remote_as:
                    remote_as = nh_details["bgp"]["local_as"]

            update_source = None

            neighbor_type = peer.get("neighbor_type")
            if neighbor_type == "unnumbered":
                ip_addr = nh_details["links"][dest_link]["peer-interface"]
            elif neighbor_type == "link-local":
                intf = topo[name]["links"][dest_link]["interface"]
                ip_addr = get_frr_ipv6_linklocal(tgen, name, intf)
            elif dest_link in nh_details["links"]:
                try:
                    ip_addr = nh_details["links"][dest_link][addr_type].split("/")[0]
                except KeyError:
                    # No address of this family on the link: fall back to the
                    # link-local address of the neighbor's interface.
                    intf = topo[name]["links"][dest_link]["interface"]
                    ip_addr = get_frr_ipv6_linklocal(tgen, name, intf)

            if "delete" in peer and peer["delete"]:
                config_data.append("no neighbor {}".format(ip_addr))
                # NOTE(review): returning here skips every remaining neighbor;
                # confirm whether "continue" was intended.
                return config_data

            neigh_cxt = "neighbor {}".format(ip_addr)

            if "peer-group" in peer:
                config_data.append(
                    "neighbor {} interface peer-group {}".format(
                        ip_addr, peer["peer-group"]
                    )
                )

            # Loopback interface
            if "source_link" in peer:
                if peer["source_link"] == "lo":
                    update_source = topo[router]["links"]["lo"][addr_type].split("/")[0]
                else:
                    update_source = topo[router]["links"][peer["source_link"]][
                        "interface"
                    ]

            if "peer-group" not in peer:
                if neighbor_type == "unnumbered":
                    config_data.append(
                        "{} interface remote-as {}".format(neigh_cxt, remote_as)
                    )
                elif add_neigh:
                    config_data.append("{} remote-as {}".format(neigh_cxt, remote_as))

            if local_asn and local_as:
                cmd = "{} local-as {}".format(neigh_cxt, local_as)
                if no_prepend:
                    cmd = "{} no-prepend".format(cmd)
                if replace_as:
                    cmd = "{} replace-as".format(cmd)
                config_data.append(cmd)

            if addr_type == "ipv6":
                config_data.append("address-family ipv6 unicast")
                config_data.append("{} activate".format(neigh_cxt))

            if neighbor_type == "link-local":
                peer_interface = nh_details["links"][dest_link]["peer-interface"]
                config_data.append(
                    "{} update-source {}".format(neigh_cxt, peer_interface)
                )
                config_data.append("{} interface {}".format(neigh_cxt, peer_interface))

            disable_connected = peer.setdefault("disable_connected_check", False)
            connect = peer.get("connecttimer", global_connect)
            keep_alive = peer.setdefault("keepalivetimer", 3)
            hold_down = peer.setdefault("holddowntimer", 10)
            password = peer.setdefault("password", None)
            no_password = peer.setdefault("no_password", None)
            capability = peer.setdefault("capability", None)
            max_hop_limit = peer.setdefault("ebgp_multihop", 1)

            graceful_restart = peer.setdefault("graceful-restart", None)
            graceful_restart_helper = peer.setdefault("graceful-restart-helper", None)
            graceful_restart_disable = peer.setdefault("graceful-restart-disable", None)

            if capability:
                config_data.append("{} capability {}".format(neigh_cxt, capability))

            # update-source is emitted once (it used to be appended twice).
            if update_source:
                config_data.append(
                    "{} update-source {}".format(neigh_cxt, update_source)
                )

            if disable_connected:
                # The command is prefixed with the neighbor context, not with
                # the boolean flag.
                config_data.append("{} disable-connected-check".format(neigh_cxt))

            # NOTE(review): timers are emitted only when BOTH values differ
            # from 60/180; confirm whether "or" was intended.
            if int(keep_alive) != 60 and int(hold_down) != 180:
                config_data.append(
                    "{} timers {} {}".format(neigh_cxt, keep_alive, hold_down)
                )
            if int(connect) != 120:
                config_data.append("{} timers connect {}".format(neigh_cxt, connect))

            if graceful_restart:
                config_data.append("{} graceful-restart".format(neigh_cxt))
            elif graceful_restart == False:
                config_data.append("no {} graceful-restart".format(neigh_cxt))

            if graceful_restart_helper:
                config_data.append("{} graceful-restart-helper".format(neigh_cxt))
            elif graceful_restart_helper == False:
                config_data.append("no {} graceful-restart-helper".format(neigh_cxt))

            if graceful_restart_disable:
                config_data.append("{} graceful-restart-disable".format(neigh_cxt))
            elif graceful_restart_disable == False:
                config_data.append("no {} graceful-restart-disable".format(neigh_cxt))

            if password:
                config_data.append("{} password {}".format(neigh_cxt, password))

            if no_password:
                config_data.append("no {} password {}".format(neigh_cxt, no_password))

            if max_hop_limit > 1:
                config_data.append(
                    "{} ebgp-multihop {}".format(neigh_cxt, max_hop_limit)
                )
                config_data.append("{} enforce-multihop".format(neigh_cxt))

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return config_data
980
981
982 def __create_bgp_unicast_address_family(
983 topo, input_dict, router, addr_type, add_neigh=True
984 ):
985 """
986 API prints bgp global config to bgp_json file.
987
988 Parameters
989 ----------
990 * `bgp_cfg` : BGP class variables have BGP config saved in it for
991 particular router,
992 * `local_as_no` : Local as number
993 * `router_id` : Router-id
994 * `ecmp_path` : ECMP max path
995 * `gr_enable` : BGP global gracefull restart config
996 """
997
998 config_data = []
999 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1000 tgen = get_topogen()
1001 bgp_data = input_dict["address_family"]
1002 neigh_data = bgp_data[addr_type]["unicast"]["neighbor"]
1003
1004 for peer_name, peer_dict in deepcopy(neigh_data).items():
1005 for dest_link, peer in peer_dict["dest_link"].items():
1006 deactivate = None
1007 activate = None
1008 nh_details = topo[peer_name]
1009 activate_addr_family = peer.setdefault("activate", None)
1010 deactivate_addr_family = peer.setdefault("deactivate", None)
1011 # Loopback interface
1012 if "source_link" in peer and peer["source_link"] == "lo":
1013 for destRouterLink, data in sorted(nh_details["links"].items()):
1014 if "type" in data and data["type"] == "loopback":
1015 if dest_link == destRouterLink:
1016 ip_addr = (
1017 nh_details["links"][destRouterLink][addr_type]
1018 .split("/")[0]
1019 .lower()
1020 )
1021
1022 # Physical interface
1023 else:
1024 # check the neighbor type if un numbered nbr, use interface.
1025 if "neighbor_type" in peer and peer["neighbor_type"] == "unnumbered":
1026 ip_addr = nh_details["links"][dest_link]["peer-interface"]
1027 elif "neighbor_type" in peer and peer["neighbor_type"] == "link-local":
1028 intf = topo[peer_name]["links"][dest_link]["interface"]
1029 ip_addr = get_frr_ipv6_linklocal(tgen, peer_name, intf)
1030 elif dest_link in nh_details["links"].keys():
1031 try:
1032 ip_addr = nh_details["links"][dest_link][addr_type].split("/")[
1033 0
1034 ]
1035 except KeyError:
1036 intf = topo[peer_name]["links"][dest_link]["interface"]
1037 ip_addr = get_frr_ipv6_linklocal(tgen, peer_name, intf)
1038 if (
1039 addr_type == "ipv4"
1040 and bgp_data["ipv6"]
1041 and check_address_types("ipv6")
1042 and "ipv6" in nh_details["links"][dest_link]
1043 ):
1044 deactivate = nh_details["links"][dest_link]["ipv6"].split("/")[
1045 0
1046 ]
1047
1048 neigh_cxt = "neighbor {}".format(ip_addr)
1049 config_data.append("address-family {} unicast".format(addr_type))
1050
1051 if activate_addr_family is not None:
1052 config_data.append(
1053 "address-family {} unicast".format(activate_addr_family)
1054 )
1055
1056 config_data.append("{} activate".format(neigh_cxt))
1057
1058 if deactivate and activate_addr_family is None:
1059 config_data.append("no neighbor {} activate".format(deactivate))
1060
1061 if deactivate_addr_family is not None:
1062 config_data.append(
1063 "address-family {} unicast".format(deactivate_addr_family)
1064 )
1065 config_data.append("no {} activate".format(neigh_cxt))
1066
1067 next_hop_self = peer.setdefault("next_hop_self", None)
1068 send_community = peer.setdefault("send_community", None)
1069 prefix_lists = peer.setdefault("prefix_lists", {})
1070 route_maps = peer.setdefault("route_maps", {})
1071 no_send_community = peer.setdefault("no_send_community", None)
1072 capability = peer.setdefault("capability", None)
1073 allowas_in = peer.setdefault("allowas-in", None)
1074
1075 # next-hop-self
1076 if next_hop_self is not None:
1077 if next_hop_self is True:
1078 config_data.append("{} next-hop-self".format(neigh_cxt))
1079 else:
1080 config_data.append("no {} next-hop-self".format(neigh_cxt))
1081
1082 # send_community
1083 if send_community:
1084 config_data.append("{} send-community".format(neigh_cxt))
1085
1086 # no_send_community
1087 if no_send_community:
1088 config_data.append(
1089 "no {} send-community {}".format(neigh_cxt, no_send_community)
1090 )
1091
1092 # capability_ext_nh
1093 if capability and addr_type == "ipv6":
1094 config_data.append("address-family ipv4 unicast")
1095 config_data.append("{} activate".format(neigh_cxt))
1096
1097 if "allowas_in" in peer:
1098 allow_as_in = peer["allowas_in"]
1099 config_data.append("{} allowas-in {}".format(neigh_cxt, allow_as_in))
1100
1101 if "no_allowas_in" in peer:
1102 allow_as_in = peer["no_allowas_in"]
1103 config_data.append("no {} allowas-in {}".format(neigh_cxt, allow_as_in))
1104
1105 if "shutdown" in peer:
1106 config_data.append(
1107 "{} {} shutdown".format(
1108 "no" if not peer["shutdown"] else "", neigh_cxt
1109 )
1110 )
1111
1112 if prefix_lists:
1113 for prefix_list in prefix_lists:
1114 name = prefix_list.setdefault("name", {})
1115 direction = prefix_list.setdefault("direction", "in")
1116 del_action = prefix_list.setdefault("delete", False)
1117 if not name:
1118 logger.info(
1119 "Router %s: 'name' not present in "
1120 "input_dict for BGP neighbor prefix lists",
1121 router,
1122 )
1123 else:
1124 cmd = "{} prefix-list {} {}".format(neigh_cxt, name, direction)
1125 if del_action:
1126 cmd = "no {}".format(cmd)
1127 config_data.append(cmd)
1128
1129 if route_maps:
1130 for route_map in route_maps:
1131 name = route_map.setdefault("name", {})
1132 direction = route_map.setdefault("direction", "in")
1133 del_action = route_map.setdefault("delete", False)
1134 if not name:
1135 logger.info(
1136 "Router %s: 'name' not present in "
1137 "input_dict for BGP neighbor route name",
1138 router,
1139 )
1140 else:
1141 cmd = "{} route-map {} {}".format(neigh_cxt, name, direction)
1142 if del_action:
1143 cmd = "no {}".format(cmd)
1144 config_data.append(cmd)
1145
1146 if allowas_in:
1147 number_occurences = allowas_in.setdefault("number_occurences", {})
1148 del_action = allowas_in.setdefault("delete", False)
1149
1150 cmd = "{} allowas-in {}".format(neigh_cxt, number_occurences)
1151
1152 if del_action:
1153 cmd = "no {}".format(cmd)
1154
1155 config_data.append(cmd)
1156
1157 return config_data
1158
1159
def modify_bgp_config_when_bgpd_down(tgen, topo, input_dict):
    """
    API to generate BGP config for the given routers without loading it
    into the running daemon, and to append the generated config to the
    router's /etc/frr/bgpd.conf file.

    The config is produced by create_router_bgp() called with
    load_config=False; afterwards the file
    {tgen.logdir}/{router}/{FRRCFG_FILE} is appended (">>") to
    /etc/frr/bgpd.conf on every router named in input_dict.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data
    * `input_dict` : defines for which router, and which config
                     needs to be modified

    Usage
    -----
    # Api call to delete advertised networks
    input_dict_2 = {
        "r5": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "advertise_networks": [
                                {
                                    "network": "101.0.20.1/32",
                                    "no_of_network": 5,
                                    "delete": True
                                }
                            ],
                        }
                    },
                    "ipv6": {
                        "unicast": {
                            "advertise_networks": [
                                {
                                    "network": "5::1/128",
                                    "no_of_network": 5,
                                    "delete": True
                                }
                            ],
                        }
                    }
                }
            }
        }
    }

    result = modify_bgp_config_when_bgpd_down(tgen, topo, input_dict_2)

    Returns
    -------
    True on success; otherwise the non-True result of create_router_bgp(),
    or the formatted traceback (str) if an exception was raised
    """

    api_name = sys._getframe().f_code.co_name
    logger.debug("Entering lib API: {}".format(api_name))
    try:
        status = create_router_bgp(
            tgen, topo, input_dict, build=False, load_config=False
        )
        if status is not True:
            return status

        # Append the generated bgp config file to /etc/frr/bgpd.conf
        for dut in input_dict:
            routers = tgen.routers()
            if dut not in routers:
                # Entries that do not name a known router are ignored
                continue

            logger.info("Delete BGP config when BGPd is down in {}".format(dut))
            append_cmd = "cat {}/{}/{} >> /etc/frr/bgpd.conf".format(
                tgen.logdir, dut, FRRCFG_FILE
            )
            routers[dut].run(append_cmd)

    except Exception:
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    logger.debug("Exiting lib API: {}".format(api_name))
    return True
1242
1243
1244 #############################################
1245 # Verification APIs
1246 #############################################
@retry(retry_timeout=8)
def verify_router_id(tgen, topo, input_dict, expected=True):
    """
    Runs "show bgp summary json" on each DUT and compares the reported
    ipv4Unicast routerId with the expected router-id.
    1. Statically modified router-id should take place
    2. When static router-id is deleted highest loopback should
       become router-id
    3. When loopback intf is down then highest physical intf
       should become router-id

    The expected value is input_dict[router]["bgp"]["router_id"], unless
    input_dict[router]["bgp"]["del_router_id"] is truthy, in which case it
    is the address returned by find_interface_with_greater_ip().

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `input_dict`: input dictionary, have details of Device Under Test, for
                    which user wants to test the data
    * `expected` : expected results from API, by-default True (not read in
                   this body; presumably consumed by the retry decorator)

    Usage
    -----
    # Verify if router-id for r1 is 12.12.12.12
    input_dict = {
        "r1": {
            "bgp": {
                "router_id": "12.12.12.12"
            }
        }
    }
    # Verify that router-id for r1 is highest interface ip
    input_dict = {
        "r1": {
            "bgp": {
                "del_router_id": True
            }
        }
    }
    result = verify_router_id(tgen, topo, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    api_name = sys._getframe().f_code.co_name
    logger.debug("Entering lib API: {}".format(api_name))
    for router in input_dict:
        routers = tgen.routers()
        if router not in routers:
            continue

        rnode = routers[router]
        bgp_input = input_dict[router]["bgp"]

        # NOTE: setdefault() stores the default back into the caller's dict
        del_router_id = bgp_input.setdefault("del_router_id", False)

        logger.info("Checking router %s router-id", router)
        summary = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
        found_id = ipaddress.IPv4Address(
            frr_unicode(summary["ipv4Unicast"]["routerId"])
        )

        # Once router-id is deleted, highest interface ip should become
        # router-id
        if del_router_id:
            wanted_id = find_interface_with_greater_ip(topo, router)
        else:
            wanted_id = bgp_input["router_id"]
        wanted_id = ipaddress.IPv4Address(frr_unicode(wanted_id))

        if wanted_id != found_id:
            errormsg = (
                "Router-id for router:{} mismatch, expected:"
                " {} but found:{}".format(router, wanted_id, found_id)
            )
            return errormsg

        logger.info("Found expected router-id %s for router %s", wanted_id, router)

    logger.debug("Exiting lib API: {}".format(api_name))
    return True
1317
1318
@retry(retry_timeout=150)
def verify_bgp_convergence(tgen, topo=None, dut=None, expected=True):
    """
    API will verify if BGP is converged with in the given time frame.
    Running "show bgp vrf all summary json" command and verify that every
    BGP neighbor configured in the topology is in Established state.

    For every router that has a "bgp" section in `topo` (optionally
    restricted to `dut`), each BGP instance (one per VRF) is checked:
    * l2vpn/evpn instances: the neighbors are looked up in the "l2VpnEvpn"
      peers of the summary; the presence of ipv4Unicast/ipv6Unicast data
      in the same VRF is reported as an error.
    * unicast instances: the neighbor address is derived from the topology
      (link-local, "source_link", unnumbered or plain interface address)
      and looked up in the ipv4Unicast/ipv6Unicast peers of the summary.

    A VRF, address-family or peer that is missing from the summary output
    is treated as "not established", so the function returns an error
    message in that case instead of raising KeyError.

    Parameters
    ----------
    * `tgen`: topogen object (only used to fetch `json_topo` when `topo` is
              None; the routers are taken from get_topogen())
    * `topo`: input json file data
    * `dut`: device under test
    * `expected` : expected results from API, by-default True (not read in
                   this body; presumably consumed by the retry decorator)

    Usage
    -----
    # To verify if BGP is converged for all the routers used in
    topology
    results = verify_bgp_convergence(tgen, topo, dut="r1")

    Returns
    -------
    errormsg(str) or True; False when no router was checked at all
    """

    if topo is None:
        topo = tgen.json_topo

    result = False
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    tgen = get_topogen()
    for router, rnode in tgen.routers().items():
        if "bgp" not in topo["routers"][router]:
            continue

        if dut is not None and dut != router:
            continue

        logger.info("Verifying BGP Convergence on router %s:", router)
        show_bgp_json = run_frr_cmd(rnode, "show bgp vrf all summary json", isjson=True)
        # Verifying output dictionary show_bgp_json is empty or not
        if not show_bgp_json:
            errormsg = "BGP is not running"
            return errormsg

        # A router may carry a single BGP instance (dict) or several (list)
        bgp_data_list = topo["routers"][router]["bgp"]
        if not isinstance(bgp_data_list, list):
            bgp_data_list = [bgp_data_list]

        for bgp_data in bgp_data_list:
            vrf = bgp_data.get("vrf")
            if vrf is None:
                vrf = "default"

            # Summary data of this VRF; empty when the VRF is not reported
            vrf_summary = show_bgp_json.get(vrf, {})

            # To find neighbor ip type
            bgp_addr_type = bgp_data["address_family"]
            if "l2vpn" in bgp_addr_type:
                if "neighbor" not in bgp_addr_type["l2vpn"]["evpn"]:
                    continue

                bgp_neighbors = bgp_addr_type["l2vpn"]["evpn"]["neighbor"]
                total_evpn_peer = len(bgp_neighbors)

                no_of_evpn_peer = 0
                for bgp_neighbor, peer_data in bgp_neighbors.items():
                    for _addr_type, dest_link_dict in peer_data.items():
                        data = topo["routers"][bgp_neighbor]["links"]
                        for dest_link in dest_link_dict:
                            if dest_link not in data:
                                continue

                            neighbor_ip = data[dest_link][_addr_type].split("/")[0]

                            if (
                                "ipv4Unicast" in vrf_summary
                                or "ipv6Unicast" in vrf_summary
                            ):
                                errormsg = (
                                    "[DUT: %s] VRF: %s, "
                                    "ipv4Unicast/ipv6Unicast"
                                    " address-family present"
                                    " under l2vpn" % (router, vrf)
                                )
                                return errormsg

                            evpn_peers = vrf_summary.get("l2VpnEvpn", {}).get(
                                "peers", {}
                            )
                            nh_state = evpn_peers.get(neighbor_ip, {}).get("state")

                            if nh_state == "Established":
                                no_of_evpn_peer += 1

                if no_of_evpn_peer == total_evpn_peer:
                    logger.info(
                        "[DUT: %s] VRF: %s, BGP is Converged for epvn peers",
                        router,
                        vrf,
                    )
                    result = True
                else:
                    errormsg = (
                        "[DUT: %s] VRF: %s, BGP is not converged "
                        "for evpn peers" % (router, vrf)
                    )
                    return errormsg
            else:
                total_peer = 0
                no_of_peer = 0
                for addr_type in bgp_addr_type:
                    if not check_address_types(addr_type):
                        continue

                    bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
                    if addr_type == "ipv4":
                        af_key = "ipv4Unicast"
                    else:
                        af_key = "ipv6Unicast"

                    for bgp_neighbor, peer_data in bgp_neighbors.items():
                        # Every configured dest_link counts as one expected peer
                        total_peer += len(peer_data["dest_link"])

                        for dest_link in peer_data["dest_link"]:
                            data = topo["routers"][bgp_neighbor]["links"]
                            if dest_link not in data:
                                continue

                            peer_details = peer_data["dest_link"][dest_link]
                            neighbor_type = peer_details.get("neighbor_type")

                            if neighbor_type == "link-local":
                                # for link local neighbors
                                intf = data[dest_link]["interface"]
                                neighbor_ip = get_frr_ipv6_linklocal(
                                    tgen, bgp_neighbor, intf
                                )
                            elif "source_link" in peer_details:
                                neighbor_ip = data[peer_details["source_link"]][
                                    addr_type
                                ].split("/")[0]
                            elif neighbor_type == "unnumbered":
                                neighbor_ip = data[dest_link]["peer-interface"]
                            else:
                                neighbor_ip = data[dest_link][addr_type].split("/")[0]

                            neighbor_ip = neighbor_ip.lower()

                            # Missing address-family/peer => state is None
                            af_peers = vrf_summary.get(af_key, {}).get("peers", {})
                            nh_state = af_peers.get(neighbor_ip, {}).get("state")

                            if nh_state == "Established":
                                no_of_peer += 1

                if no_of_peer == total_peer and no_of_peer > 0:
                    logger.info("[DUT: %s] VRF: %s, BGP is Converged", router, vrf)
                    result = True
                else:
                    errormsg = "[DUT: %s] VRF: %s, BGP is not converged" % (router, vrf)
                    return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return result
1508
1509
@retry(retry_timeout=16)
def verify_bgp_community(
    tgen,
    addr_type,
    router,
    network,
    input_dict=None,
    vrf=None,
    bestpath=False,
    expected=True,
):
    """
    API to verify BGP community / large community is attached in route for
    any given DUT by running "show bgp ipv4/6 {route address} json" command.

    For every prefix at least one path must carry a "community" or
    "largeCommunity" attribute. When `input_dict` is given, every path
    carrying such an attribute must additionally match each
    {attribute: expected string} pair of `input_dict`; a path that lacks a
    requested attribute is reported as a mismatch.

    Parameters
    ----------
    * `tgen`: topogen object
    * `addr_type` : ip type, ipv4/ipv6
    * `router`: Device Under Test
    * `network`: list of networks (a single network string is accepted too)
                 for which set criteria needs to be verified
    * `input_dict`: having details like - which community attribute and
                    values needs to be verified
    * `vrf`: VRF name
    * `bestpath`: To check best path cli (only used when `vrf` is not set)
    * `expected` : expected results from API, by-default True (not read in
                   this body; presumably consumed by the retry decorator)

    Usage
    -----
    networks = ["200.50.2.0/32"]
    input_dict = {
        "largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5"
    }
    result = verify_bgp_community(tgen, "ipv4", dut, networks,
                                  input_dict=input_dict)

    Returns
    -------
    errormsg(str) or True; False when `router` is not part of the topology
    """

    logger.debug("Entering lib API: verify_bgp_community()")
    if router not in tgen.routers():
        return False

    rnode = tgen.routers()[router]

    # A bare string would otherwise be iterated character by character
    if isinstance(network, str):
        network = [network]

    logger.info(
        "Verifying BGP community attributes on dut %s: for %s network %s",
        router,
        addr_type,
        network,
    )

    command = "show bgp"

    for net in network:
        # NOTE(review): `bestpath` is ignored when `vrf` is given - confirm
        # whether that is intended before changing it.
        if vrf:
            cmd = "{} vrf {} {} {} json".format(command, vrf, addr_type, net)
        elif bestpath:
            cmd = "{} {} {} bestpath json".format(command, addr_type, net)
        else:
            cmd = "{} {} {} json".format(command, addr_type, net)

        show_bgp_json = run_frr_cmd(rnode, cmd, isjson=True)
        if "paths" not in show_bgp_json:
            return "Prefix {} not found in BGP table of router: {}".format(net, router)

        found = False
        for path in show_bgp_json["paths"]:
            if "largeCommunity" not in path and "community" not in path:
                continue

            found = True
            logger.info(
                "Large Community attribute is found for route: %s in router: %s",
                net,
                router,
            )
            if input_dict is None:
                continue

            for criteria, comm_val in input_dict.items():
                # A path without the requested attribute yields None here
                # and is reported below as a mismatch.
                show_val = path.get(criteria, {}).get("string")
                if comm_val != show_val:
                    errormsg = (
                        "Failed: Verifying BGP attribute"
                        " {} for route: {} in router: {}"
                        ", expected value: {} but found"
                        ": {}".format(criteria, net, router, comm_val, show_val)
                    )
                    return errormsg

                logger.info(
                    "Verifying BGP %s for prefix: %s"
                    " in router: %s, found expected"
                    " value: %s",
                    criteria,
                    net,
                    router,
                    comm_val,
                )

        if not found:
            errormsg = (
                "Large Community attribute is not found for route: "
                "{} in router: {} ".format(net, router)
            )
            return errormsg

    logger.debug("Exiting lib API: verify_bgp_community()")
    return True
1621
1622
def modify_as_number(tgen, topo, input_dict):
    """
    API reads local_as from user defined input_dict and modifies the
    routers' ASNs accordingly: the existing BGP configuration of every
    router in input_dict is deleted, then the topology configuration with
    the new local_as values is applied via create_router_bgp().

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data
    * `input_dict` : defines for which router ASNs needs to be modified

    Usage
    -----
    To modify ASNs for router r1
    input_dict = {
        "r1": {
            "bgp": {
                "local_as": 131079
            }
        }
    }
    result = modify_as_number(tgen, topo, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    api_name = sys._getframe().f_code.co_name
    logger.debug("Entering lib API: {}".format(api_name))
    try:
        updated_routers = deepcopy(topo["routers"])
        delete_request = {}
        for router in input_dict:
            # Schedule removal of the current bgp configuration
            delete_request[router] = {"bgp": {"delete": True}}

            # "bgp" is either a dict or a list of instances; in the list
            # case the first instance receives the new AS number
            try:
                updated_routers[router]["bgp"]["local_as"] = input_dict[router][
                    "bgp"
                ]["local_as"]
            except TypeError:
                updated_routers[router]["bgp"][0]["local_as"] = input_dict[router][
                    "bgp"
                ]["local_as"]

        logger.info("Removing bgp configuration")
        create_router_bgp(tgen, topo, delete_request)

        logger.info("Applying modified bgp configuration")
        result = create_router_bgp(tgen, updated_routers)
        if result is not True:
            result = "Error applying new AS number config"
    except Exception:
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    logger.debug("Exiting lib API: {}".format(api_name))
    return result
1681
1682
@retry(retry_timeout=8)
def verify_as_numbers(tgen, topo, input_dict, expected=True):
    """
    This API is to verify AS numbers for given DUT by running
    "show ip bgp neighbor json" command. Local AS and Remote AS
    will be verified with input_dict data and command output.

    For every neighbor configured for the DUT in `topo`, the expected
    remote AS is read from input_dict[neighbor]["bgp"]["local_as"], so
    input_dict must contain an entry for each neighbor of every DUT it
    lists.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `input_dict`: defines - for which router, AS numbers needs to be verified
    * `expected` : expected results from API, by-default True (not read in
                   this body; presumably consumed by the retry decorator)

    Usage
    -----
    input_dict = {
        "r1": {
            "bgp": {
                "local_as": 131079
            }
        },
        "r2": {
            "bgp": {
                "local_as": 131080
            }
        }
    }
    result = verify_as_numbers(tgen, topo, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    api_name = sys._getframe().f_code.co_name
    logger.debug("Entering lib API: {}".format(api_name))
    for router in input_dict:
        routers = tgen.routers()
        if router not in routers:
            continue

        rnode = routers[router]

        logger.info("Verifying AS numbers for dut %s:", router)

        neighbors_json = run_frr_cmd(rnode, "show ip bgp neighbor json", isjson=True)
        local_as = input_dict[router]["bgp"]["local_as"]
        address_families = topo["routers"][router]["bgp"]["address_family"]

        for addr_type in address_families:
            if not check_address_types(addr_type):
                continue

            configured_neighbors = address_families[addr_type]["unicast"]["neighbor"]

            for bgp_neighbor, peer_data in configured_neighbors.items():
                remote_as = input_dict[bgp_neighbor]["bgp"]["local_as"]
                for dest_link in peer_data["dest_link"]:
                    neighbor_links = topo["routers"][bgp_neighbor]["links"]
                    if dest_link not in neighbor_links:
                        continue

                    neighbor_ip = neighbor_links[dest_link][addr_type].split("/")[0]
                    neigh_data = neighbors_json[neighbor_ip]

                    # Verify Local AS for router
                    if neigh_data["localAs"] != local_as:
                        errormsg = (
                            "Failed: Verify local_as for dut {},"
                            " found: {} but expected: {}".format(
                                router, neigh_data["localAs"], local_as
                            )
                        )
                        return errormsg

                    logger.info(
                        "Verified local_as for dut %s, found expected: %s",
                        router,
                        local_as,
                    )

                    # Verify Remote AS for neighbor
                    if neigh_data["remoteAs"] != remote_as:
                        errormsg = (
                            "Failed: Verify remote_as for dut "
                            "{}'s neighbor {}, found: {} but "
                            "expected: {}".format(
                                router, bgp_neighbor, neigh_data["remoteAs"], remote_as
                            )
                        )
                        return errormsg

                    logger.info(
                        "Verified remote_as for dut %s's "
                        "neighbor %s, found expected: %s",
                        router,
                        bgp_neighbor,
                        remote_as,
                    )

    logger.debug("Exiting lib API: {}".format(api_name))
    return True
1781
1782
@retry(retry_timeout=150)
def verify_bgp_convergence_from_running_config(tgen, dut=None, expected=True):
    """
    API to verify BGP convergence b/w loopback and physical interface.
    This API would be used when routers have BGP neighborship is loopback
    to physical or vice-versa

    Unlike verify_bgp_convergence() the expected peers are not taken from
    the topology: every peer reported by "show bgp vrf all summary json"
    on the router must be in Established state.

    Parameters
    ----------
    * `tgen`: topogen object
    * `dut`: device under test; when None all routers are verified
    * `expected` : expected results from API, by-default True (not read in
                   this body; presumably consumed by the retry decorator)

    Usage
    -----
    results = verify_bgp_convergence_from_running_config(tgen, dut="r1")

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    for router, rnode in tgen.routers().items():
        if dut is not None and dut != router:
            continue

        logger.info("Verifying BGP Convergence on router %s:", router)
        show_bgp_json = run_frr_cmd(rnode, "show bgp vrf all summary json", isjson=True)
        # Verifying output dictionary show_bgp_json is empty or not
        if not show_bgp_json:
            errormsg = "BGP is not running"
            return errormsg

        for vrf, addr_family_data in show_bgp_json.items():
            for neighborship_data in addr_family_data.values():
                # Collect the peers that are really down so the error
                # message names them instead of the last peer iterated.
                down_peers = [
                    peer
                    for peer, peer_data in neighborship_data["peers"].items()
                    if peer_data["state"] != "Established"
                ]

                if down_peers:
                    errormsg = (
                        "[DUT: %s] VRF: %s, BGP is not converged"
                        " for peer: %s" % (router, vrf, ", ".join(down_peers))
                    )
                    return errormsg

            logger.info("[DUT: %s]: vrf: %s, BGP is Converged", router, vrf)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))

    return True
1842
1843
def clear_bgp(tgen, addr_type, router, vrf=None, neighbor=None):
    """
    This API is to clear bgp neighborship by running
    clear ip bgp */clear bgp ipv6 * command,

    Command selection for addr_type "ipv4"/"ipv6":
    * `vrf` given: all sessions of each listed vrf are cleared
      (`neighbor` is not considered in this case)
    * else `neighbor` given: only that neighbor is cleared
    * else: all sessions of the address family are cleared
    Any other addr_type runs "clear bgp *".

    Parameters
    ----------
    * `tgen`: topogen object
    * `addr_type`: ip type ipv4/ipv6
    * `router`: device under test
    * `vrf`: vrf name, or list of vrf names
    * `neighbor`: Neighbor for which bgp needs to be cleared

    Usage
    -----
    clear_bgp(tgen, addr_type, "r1")

    Returns
    -------
    False when `router` is not part of the topology, otherwise None
    """

    api_name = sys._getframe().f_code.co_name
    logger.debug("Entering lib API: {}".format(api_name))

    if router not in tgen.routers():
        return False

    rnode = tgen.routers()[router]

    # A single vrf name is handled like a one-element list
    if vrf and type(vrf) is not list:
        vrf = [vrf]

    # Clearing BGP
    logger.info("Clearing BGP neighborship for router %s..", router)

    # Pick the command templates of the address family first ...
    if addr_type == "ipv4":
        vrf_cmd, neighbor_cmd, all_cmd = (
            "clear ip bgp vrf {} *",
            "clear bgp ipv4 {}",
            "clear ip bgp *",
        )
    elif addr_type == "ipv6":
        vrf_cmd, neighbor_cmd, all_cmd = (
            "clear bgp vrf {} ipv6 *",
            "clear bgp ipv6 {}",
            "clear bgp ipv6 *",
        )
    else:
        vrf_cmd = neighbor_cmd = None
        all_cmd = "clear bgp *"

    # ... then decide which of them apply
    if vrf_cmd is not None and vrf:
        commands = [vrf_cmd.format(_vrf) for _vrf in vrf]
    elif neighbor_cmd is not None and neighbor:
        commands = [neighbor_cmd.format(neighbor)]
    else:
        commands = [all_cmd]

    for command in commands:
        run_frr_cmd(rnode, command)

    logger.debug("Exiting lib API: {}".format(api_name))
1895
1896
def _wait_for_bgp_peers_established(
    rnode, topo, router, phase, peer_uptime, attempts=50, sleeptime=3
):
    """
    Poll "show bgp summary json" on `rnode` until every unicast neighbor
    configured for `router` in `topo` is Established.

    `phase` ("before"/"after") is only used in the log messages.
    `peer_uptime` is filled with {neighbor: peerUptimeEstablishedEpoch} as
    read from the summary output.

    Returns a tuple (errormsg, bgp_addr_type): errormsg is None on success
    and a string otherwise; bgp_addr_type is the "address_family" section
    of the router's topology data (None when BGP is not running).
    """

    for _attempt in range(attempts):
        # Waiting for BGP to converge
        logger.info(
            "Waiting for %s sec for BGP to converge on router %s...",
            sleeptime,
            router,
        )
        sleep(sleeptime)

        show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
        # Verifying output dictionary show_bgp_json is empty or not
        if not show_bgp_json:
            return "BGP is not running", None

        # "bgp" is either a dict or a list of instances (first one is used)
        try:
            bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
        except TypeError:
            bgp_addr_type = topo["routers"][router]["bgp"][0]["address_family"]

        total_peer = 0
        no_of_peer = 0
        for addr_type in bgp_addr_type:
            # Same filter for the expected and the established count, so
            # both always refer to the same set of peers
            if not check_address_types(addr_type):
                continue

            bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
            if addr_type == "ipv4":
                af_key = "ipv4Unicast"
            else:
                af_key = "ipv6Unicast"

            for bgp_neighbor, peer_data in bgp_neighbors.items():
                total_peer += len(peer_data["dest_link"])

                for dest_link in peer_data["dest_link"]:
                    data = topo["routers"][bgp_neighbor]["links"]
                    if dest_link not in data:
                        continue

                    neighbor_ip = data[dest_link][addr_type].split("/")[0]

                    # A peer not (yet) listed is simply "not established";
                    # keep polling instead of failing with KeyError
                    peer_info = (
                        show_bgp_json.get(af_key, {})
                        .get("peers", {})
                        .get(neighbor_ip, {})
                    )

                    # Peer up time dictionary
                    if "peerUptimeEstablishedEpoch" in peer_info:
                        peer_uptime[bgp_neighbor] = peer_info[
                            "peerUptimeEstablishedEpoch"
                        ]

                    if peer_info.get("state") == "Established":
                        no_of_peer += 1

        if no_of_peer == total_peer:
            logger.info(
                "BGP is Converged for router %s {} bgp clear".format(phase), router
            )
            return None, bgp_addr_type

        logger.info(
            "BGP is not yet Converged for router %s {} bgp clear".format(phase),
            router,
        )

    errormsg = (
        "TIMEOUT!! BGP is not converged in {} seconds for"
        " router {}".format(attempts * sleeptime, router)
    )
    return errormsg, bgp_addr_type


def clear_bgp_and_verify(tgen, topo, router, rid=None):
    """
    This API is to clear bgp neighborship and verify bgp neighborship
    is coming up(BGP is converged) using "show bgp summary json" command
    and also verifying for all bgp neighbors uptime before and after
    clear bgp sessions is different as the uptime must be changed once
    bgp sessions are cleared using "clear bgp ipv4 */clear bgp ipv6 *" cmd.

    Convergence is polled every 3 seconds, at most 50 times, both before
    and after the clear command.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `router`: device under test
    * `rid`: when given, only this neighbor is cleared instead of "*"

    Usage
    -----
    result = clear_bgp_and_verify(tgen, topo, dut)

    Returns
    -------
    errormsg(str) or True; False when `router` is not part of the topology
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    if router not in tgen.routers():
        return False

    rnode = tgen.routers()[router]

    # Verifying BGP convergence before bgp clear command
    peer_uptime_before_clear_bgp = {}
    errormsg, bgp_addr_type = _wait_for_bgp_peers_established(
        rnode, topo, router, "before", peer_uptime_before_clear_bgp
    )
    if errormsg is not None:
        return errormsg

    # Clearing BGP
    logger.info("Clearing BGP neighborship for router %s..", router)
    target = rid if rid else "*"
    for addr_type in bgp_addr_type:
        if addr_type in ("ipv4", "ipv6"):
            run_frr_cmd(rnode, "clear bgp {} {}".format(addr_type, target))

    # Verifying BGP convergence after bgp clear command
    peer_uptime_after_clear_bgp = {}
    errormsg, _ = _wait_for_bgp_peers_established(
        rnode, topo, router, "after", peer_uptime_after_clear_bgp
    )
    if errormsg is not None:
        return errormsg

    # Comparing peerUptimeEstablishedEpoch dictionaries
    if peer_uptime_before_clear_bgp == peer_uptime_after_clear_bgp:
        errormsg = (
            "BGP neighborship is not reset after clear bgp on router"
            " {}".format(router)
        )
        return errormsg

    logger.info("BGP neighborship is reset after clear BGP on router %s", router)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
2105
2106
def _bgp_summary_peer_state(rnode, addr_type, neighbor_ip):
    """
    Return the session state "show bgp summary json" reports for a peer.

    * `rnode`: router object the command is executed on
    * `addr_type`: ip type; "ipv4" reads the "ipv4Unicast" section, any
                   other value reads "ipv6Unicast"
    * `neighbor_ip`: peer address, used as key of the "peers" dictionary
    """
    show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
    afi_key = "ipv4Unicast" if addr_type == "ipv4" else "ipv6Unicast"
    return show_bgp_json[afi_key]["peers"][neighbor_ip]["state"]


def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
    """
    To verify BGP timer config, execute "show ip bgp neighbor json" command
    and verify bgp timers with input_dict data.
    To verify bgp timers functionality, the peer interface is shut down and
    the BGP neighborship status is polled with "show bgp summary json":

    * Scenario 1: peer interface is flapped (down, 5 sec, up) after one
      keepalive interval; the session must still be Established.
    * Scenario 2: peer interface is shut down and left down; the session
      must have left Established once the holddown time has elapsed.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `input_dict`: defines for which router, bgp timers needs to be verified

    Usage
    -----
    # To verify BGP timers for neighbor r2 of router r1
    input_dict = {
        "r1": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "neighbor": {
                                "r2": {
                                    "dest_link": {
                                        "r1": {
                                            "keepalivetimer": 5,
                                            "holddowntimer": 15,
                                        }}}}}}}}}}
    result = verify_bgp_timers_and_functionality(tgen, topo, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    sleep(5)
    router_list = tgen.routers()
    for router in input_dict.keys():
        if router not in router_list:
            continue

        rnode = router_list[router]

        logger.info("Verifying bgp timers functionality, DUT is %s:", router)

        show_ip_bgp_neighbor_json = run_frr_cmd(
            rnode, "show ip bgp neighbor json", isjson=True
        )

        bgp_addr_type = input_dict[router]["bgp"]["address_family"]

        for addr_type in bgp_addr_type:
            if not check_address_types(addr_type):
                continue

            bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
            for bgp_neighbor, peer_data in bgp_neighbors.items():
                for dest_link, peer_dict in peer_data["dest_link"].items():
                    data = topo["routers"][bgp_neighbor]["links"]

                    keepalivetimer = peer_dict["keepalivetimer"]
                    holddowntimer = peer_dict["holddowntimer"]

                    # Without a matching link in topo there is no neighbor
                    # address/interface to test; skip instead of reusing
                    # values left over from a previous iteration.
                    if dest_link not in data:
                        continue

                    neighbor_ip = data[dest_link][addr_type].split("/")[0]
                    neighbor_intf = data[dest_link]["interface"]
                    neighbor_json = show_ip_bgp_neighbor_json[neighbor_ip]

                    # Verify HoldDownTimer for neighbor
                    bgpHoldTimeMsecs = neighbor_json["bgpTimerHoldTimeMsecs"]
                    if bgpHoldTimeMsecs != holddowntimer * 1000:
                        errormsg = (
                            "Verifying holddowntimer for bgp "
                            "neighbor {} under dut {}, found: {} "
                            "but expected: {}".format(
                                neighbor_ip,
                                router,
                                bgpHoldTimeMsecs,
                                holddowntimer * 1000,
                            )
                        )
                        return errormsg

                    # Verify KeepAliveTimer for neighbor
                    bgpKeepAliveTimeMsecs = neighbor_json[
                        "bgpTimerKeepAliveIntervalMsecs"
                    ]
                    if bgpKeepAliveTimeMsecs != keepalivetimer * 1000:
                        errormsg = (
                            "Verifying keepalivetimer for bgp "
                            "neighbor {} under dut {}, found: {} "
                            "but expected: {}".format(
                                neighbor_ip,
                                router,
                                bgpKeepAliveTimeMsecs,
                                keepalivetimer * 1000,
                            )
                        )
                        return errormsg

                    # Polling step shared by both scenarios. The state is
                    # only asserted on the iteration whose `timer` equals
                    # the checkpoint, so the timers must be chosen such
                    # that the range actually hits it (e.g. 5/15).
                    poll_step = int(holddowntimer / 3)

                    ####################
                    # Shutting down peer interface after keepalive time and
                    # after some time bringing up peer interface.
                    # verifying BGP neighborship in (hold down-keep alive)
                    # time, it should not go down
                    ####################

                    # Wait till keep alive time
                    logger.info("=" * 20)
                    logger.info("Scenario 1:")
                    logger.info(
                        "Shutdown and bring up peer interface: %s "
                        "in keep alive time : %s sec and verify "
                        " BGP neighborship is intact in %s sec ",
                        neighbor_intf,
                        keepalivetimer,
                        (holddowntimer - keepalivetimer),
                    )
                    logger.info("=" * 20)
                    logger.info("Waiting for %s sec..", keepalivetimer)
                    sleep(keepalivetimer)

                    # Shutting down peer interface
                    logger.info(
                        "Shutting down interface %s on router %s",
                        neighbor_intf,
                        bgp_neighbor,
                    )
                    topotest.interface_set_status(
                        router_list[bgp_neighbor], neighbor_intf, ifaceaction=False
                    )

                    # Bringing up peer interface
                    sleep(5)
                    logger.info(
                        "Bringing up interface %s on router %s..",
                        neighbor_intf,
                        bgp_neighbor,
                    )
                    topotest.interface_set_status(
                        router_list[bgp_neighbor], neighbor_intf, ifaceaction=True
                    )

                    # Verifying BGP neighborship is intact in
                    # (holddown - keepalive) time
                    for timer in range(keepalivetimer, holddowntimer, poll_step):
                        logger.info("Waiting for %s sec..", keepalivetimer)
                        sleep(keepalivetimer)
                        sleep(2)
                        nh_state = _bgp_summary_peer_state(
                            rnode, addr_type, neighbor_ip
                        )

                        if timer == (holddowntimer - keepalivetimer):
                            if nh_state != "Established":
                                # The session dropped although the link
                                # was restored before holddown expired.
                                errormsg = (
                                    "BGP neighborship has gone down "
                                    "within {} sec for neighbor {}, "
                                    "expected it to stay "
                                    "Established".format(timer, bgp_neighbor)
                                )
                                return errormsg
                            else:
                                logger.info(
                                    "BGP neighborship is intact in %s"
                                    " sec for neighbor %s",
                                    timer,
                                    bgp_neighbor,
                                )

                    ####################
                    # Shutting down peer interface and verifying that BGP
                    # neighborship is going down in holddown time
                    ####################
                    logger.info("=" * 20)
                    logger.info("Scenario 2:")
                    logger.info(
                        "Shutdown peer interface: %s and verify BGP"
                        " neighborship has gone down in hold down "
                        "time %s sec",
                        neighbor_intf,
                        holddowntimer,
                    )
                    logger.info("=" * 20)

                    logger.info(
                        "Shutting down interface %s on router %s..",
                        neighbor_intf,
                        bgp_neighbor,
                    )
                    topotest.interface_set_status(
                        router_list[bgp_neighbor], neighbor_intf, ifaceaction=False
                    )

                    # Verifying BGP neighborship is going down in holddown time
                    for timer in range(
                        keepalivetimer,
                        (holddowntimer + keepalivetimer),
                        poll_step,
                    ):
                        logger.info("Waiting for %s sec..", keepalivetimer)
                        sleep(keepalivetimer)
                        sleep(2)
                        nh_state = _bgp_summary_peer_state(
                            rnode, addr_type, neighbor_ip
                        )

                        if timer == holddowntimer:
                            if nh_state == "Established":
                                errormsg = (
                                    "BGP neighborship has not gone "
                                    "down in {} sec for neighbor {}".format(
                                        timer, bgp_neighbor
                                    )
                                )
                                return errormsg
                            else:
                                logger.info(
                                    "BGP neighborship has gone down in"
                                    " %s sec for neighbor %s",
                                    timer,
                                    bgp_neighbor,
                                )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
2348
2349
@retry(retry_timeout=16)
def verify_bgp_attributes(
    tgen,
    addr_type,
    dut,
    static_routes,
    rmap_name=None,
    input_dict=None,
    seq_id=None,
    vrf=None,
    nexthop=None,
    expected=True,
):
    """
    API will verify BGP attributes set by Route-map for given prefix and
    DUT. it will run "show bgp ipv4/ipv6 {prefix_address} json" command
    in DUT to verify BGP attributes set by route-map, Set attributes
    values will be read from input_dict and verified with command output.

    Every key of a route-map entry's "set" dictionary is looked up by the
    same name in each path of the command output; for the key "aspath" the
    path's ["aspath"]["string"] value is compared.

    * `tgen`: topogen object
    * `addr_type` : ip type, ipv4/ipv6
    * `dut`: Device Under Test
    * `static_routes`: Static Routes for which BGP set attributes needs to be
                       verified
    * `rmap_name`: route map name for which set criteria needs to be verified
    * `input_dict`: route-map definitions per router; only consulted when
                    its first entry has a "route_maps" section
    * `seq_id`: sequence number (or list of them) of rmap, default is None.
                When given and at least one entry matches, only the
                matching entries are verified
    * `vrf`: vrf name to query, default is None
    * `nexthop`: not used by this function
    * `expected` : expected results from API, by-default True

    Usage
    -----
    # To verify BGP attribute "localpref" set to 150 and "med" set to 30
      for prefix 10.0.20.1/32 in router r3.
    input_dict = {
        "r3": {
            "route_maps": {
                "rmap_match_pf_list1": [
                    {
                        "action": "PERMIT",
                        "match": {"prefix_list": "pf_list_1"},
                        "set": {"localpref": 150, "med": 30}
                    }
                ],
            }
        }
    }
    static_routes (list) = ["10.0.20.1/32"]
    result = verify_bgp_attributes(tgen, "ipv4", "r3", static_routes,
                                   "rmap_match_pf_list1", input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    # Accept a single sequence number as well as a list of them
    if seq_id is not None and not isinstance(seq_id, list):
        seq_id = [seq_id]

    router_list = tgen.routers()
    if dut in router_list:
        rnode = router_list[dut]

        logger.info("Verifying BGP set attributes for dut {}:".format(dut))

        for static_route in static_routes:
            if vrf:
                cmd = "show bgp vrf {} {} {} json".format(vrf, addr_type, static_route)
            else:
                cmd = "show bgp {} {} json".format(addr_type, static_route)
            show_bgp_json = run_frr_cmd(rnode, cmd, isjson=True)

            dict_to_test = []
            # Entries selected through seq_id; collected across all routers
            # of input_dict for the current static route
            tmp_list = []

            if "route_maps" not in list(input_dict.values())[0]:
                continue

            for rmap_router in input_dict.keys():
                route_maps = input_dict[rmap_router]["route_maps"]
                for rmap, values in route_maps.items():
                    if rmap != rmap_name:
                        continue

                    dict_to_test = values
                    if seq_id is not None:
                        for rmap_dict in values:
                            if "seq_id" not in rmap_dict:
                                continue
                            if rmap_dict["seq_id"] in seq_id:
                                tmp_list.append(rmap_dict)
                    if tmp_list:
                        dict_to_test = tmp_list

                    for rmap_dict in dict_to_test:
                        if "set" not in rmap_dict:
                            continue

                        for criteria, wanted in rmap_dict["set"].items():
                            found = False
                            # Reset per attribute so a failure never reports
                            # the value seen for a previous attribute
                            value = None
                            for path in show_bgp_json["paths"]:
                                if criteria not in path:
                                    continue

                                if criteria == "aspath":
                                    value = path[criteria]["string"]
                                else:
                                    value = path[criteria]

                                if wanted == value:
                                    found = True
                                    logger.info(
                                        "Verifying BGP "
                                        "attribute {} for"
                                        " route: {} in "
                                        "router: {}, found"
                                        " expected value:"
                                        " {}".format(
                                            criteria,
                                            static_route,
                                            dut,
                                            value,
                                        )
                                    )
                                    break

                            if not found:
                                errormsg = (
                                    "Failed: Verifying BGP "
                                    "attribute {} for route:"
                                    " {} in router: {}, "
                                    " expected value: {} but"
                                    " found: {}".format(
                                        criteria,
                                        static_route,
                                        dut,
                                        wanted,
                                        value,
                                    )
                                )
                                return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
2492
2493
def _select_best_next_hop(attribute, attribute_dict):
    """
    Pick the next hop BGP best-path selection is expected to prefer.

    * `attribute`: one of "path", "locPrf", "weight", "origin", "metric"
    * `attribute_dict`: next-hop ip -> value of `attribute` on that path

    Returns a (next_hop, compare) tuple, `compare` being the word used in
    the log message. Both are None when `attribute` is not one of the
    supported names or `attribute_dict` is empty.
    """
    if not attribute_dict:
        return None, None

    # AS_PATH attribute: the path is a space separated string of AS
    # numbers, so count the AS hops rather than the characters
    if attribute == "path":
        shortest = min(
            attribute_dict, key=lambda nh: len(str(attribute_dict[nh]).split())
        )
        return shortest, "SHORTEST"

    # LOCAL_PREF and WEIGHT attributes: highest value wins
    if attribute in ("locPrf", "weight"):
        return max(attribute_dict, key=attribute_dict.get), "HIGHEST"

    # ORIGIN attribute: rule is IGP>EGP>INCOMPLETE; on a tie the first
    # next hop seen is kept
    if attribute == "origin":
        origin_rank = {"IGP": 0, "EGP": 1, "INCOMPLETE": 2}
        preferred = min(
            attribute_dict,
            key=lambda nh: origin_rank.get(
                str(attribute_dict[nh]).upper(), len(origin_rank)
            ),
        )
        return preferred, ""

    # MED attribute: lowest value wins
    if attribute == "metric":
        return min(attribute_dict, key=attribute_dict.get), "LOWEST"

    return None, None


@retry(retry_timeout=8)
def verify_best_path_as_per_bgp_attribute(
    tgen, addr_type, router, input_dict, attribute, expected=True
):
    """
    API is to verify best path according to BGP attributes for given routes.
    "show bgp ipv4/6 json" command will be run and verify best path according
    to shortest as-path, highest local-preference and weight, lowest med and
    route origin IGP>EGP>INCOMPLETE.

    For every route present in the BGP table, the first next hop of the
    first RIB entry ("show ip/ipv6 route json") must be one of the next
    hops BGP knows for that route. Routes absent from the BGP table or the
    RIB are skipped.

    Parameters
    ----------
    * `tgen` : topogen object
    * `addr_type` : ip type, ipv4/ipv6
    * `router` : name of the router (DUT) whose BGP table and RIB are read
    * `attribute` : calculate best path using this attribute
    * `input_dict`: defines different routes to calculate for which route
                    best path is selected
    * `expected` : expected results from API, by-default True

    Usage
    -----
    # To verify best path for routes 200.50.2.0/32 and 200.60.2.0/32 from
    router r7 to router r1(DUT) as per local-preference attribute
    input_dict = {
        "r7": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "advertise_networks": [
                                {
                                    "network": "200.50.2.0/32"
                                },
                                {
                                    "network": "200.60.2.0/32"
                                }
                            ]
                        }
                    }
                }
            }
        }
    }
    attribute = "locPrf"
    result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut, \
                         input_dict,  attribute)
    Returns
    -------
    errormsg(str) or True; False when `router` is not part of the topology
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    if router not in tgen.routers():
        return False

    rnode = tgen.routers()[router]

    # Verifying show bgp json
    command = "show bgp"

    sleep(2)
    logger.info("Verifying router %s RIB for best path:", router)

    # Show ip route
    if addr_type == "ipv4":
        command_1 = "show ip route"
    else:
        command_1 = "show ipv6 route"

    for route_val in input_dict.values():
        if "static_routes" in route_val:
            networks = route_val["static_routes"]
        else:
            net_data = route_val["bgp"]["address_family"][addr_type]["unicast"]
            networks = net_data["advertise_networks"]

        for network in networks:
            _network = network["network"]
            no_of_ip = network.setdefault("no_of_ip", 1)
            vrf = network.setdefault("vrf", None)

            if vrf:
                bgp_cmd = "{} vrf {}".format(command, vrf)
                rib_cmd = "{} vrf {} json".format(command_1, vrf)
            else:
                bgp_cmd = command
                rib_cmd = "{} json".format(command_1)

            bgp_cmd = "{} {} json".format(bgp_cmd, addr_type)
            sh_ip_bgp_json = run_frr_cmd(rnode, bgp_cmd, isjson=True)

            routes = generate_ips(_network, no_of_ip)
            for route in routes:
                route = str(ipaddress.ip_network(frr_unicode(route)))

                if route not in sh_ip_bgp_json["routes"]:
                    continue

                # next-hop ip -> attribute value, over all paths of the route
                attribute_dict = {}
                for route_attribute in sh_ip_bgp_json["routes"][route]:
                    for next_hop in route_attribute["nexthops"]:
                        attribute_dict[next_hop["ip"]] = route_attribute[attribute]

                _next_hop, compare = _select_best_next_hop(attribute, attribute_dict)

                rib_routes_json = run_frr_cmd(rnode, rib_cmd, isjson=True)

                # Verifying output dictionary rib_routes_json is not empty
                if not rib_routes_json:
                    errormsg = "No route found in RIB of router {}..".format(router)
                    return errormsg

                # Find best is installed in RIB
                if route not in rib_routes_json:
                    continue

                # Verify next_hop in rib_routes_json
                rib_next_hop = rib_routes_json[route][0]["nexthops"][0]["ip"]
                if rib_next_hop not in attribute_dict:
                    errormsg = (
                        "Incorrect Nexthop for BGP route {} in "
                        "RIB of router {}, Expected: {}, Found:"
                        " {}\n".format(
                            route,
                            router,
                            _next_hop,
                            rib_next_hop,
                        )
                    )
                    return errormsg

                logger.info(
                    "Best path for prefix: %s with next_hop: %s is "
                    "installed according to %s %s: (%s) in RIB of "
                    "router %s",
                    route,
                    _next_hop,
                    compare,
                    attribute,
                    attribute_dict.get(_next_hop),
                    router,
                )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
2698
2699
@retry(retry_timeout=10)
def verify_best_path_as_per_admin_distance(
    tgen, addr_type, router, input_dict, attribute, expected=True, vrf=None
):
    """
    Verify that the route installed in the RIB of `router` uses the next
    hop with the lowest admin distance.

    For every router named in input_dict, "show ip/ipv6 route json" is run
    on that router to learn the admin distance of each next hop of its
    static routes. The next hop with the lowest distance must then be among
    the next hops of the first RIB entry for that prefix on `router`.
    Prefixes missing from the RIB of `router` are skipped.

    Parameters
    ----------
    * `tgen` : topogen object
    * `addr_type` : ip type, ipv4/ipv6
    * `router`: Device Under Test, its RIB is verified
    * `input_dict`: defines different routes with different admin distance
                    to calculate for which route best path is selected
    * `attribute` : attribute name, only used in the log message
    * `expected` : expected results from API, by-default True
    * `vrf`: Pass vrf name to check for a particular vrf.

    Usage
    -----
    # To verify best path for route 200.50.2.0/32 from router r2 to
    router r1(DUT) as per shortest admin distance which is 60.
    input_dict = {
        "r2": {
            "static_routes": [{"network": "200.50.2.0/32", \
                 "admin_distance": 80, "next_hop": "10.0.0.14"},
                              {"network": "200.50.2.0/32", \
                 "admin_distance": 60, "next_hop": "10.0.0.18"}]
        }}
    attribute = "locPrf"
    result = verify_best_path_as_per_admin_distance(tgen, "ipv4", dut, \
                        input_dict, attribute)

    Returns
    -------
    errormsg(str) or True; False when `router` is not part of the topology
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    router_list = tgen.routers()
    if router not in router_list:
        return False

    rnode = tgen.routers()[router]

    sleep(5)
    logger.info("Verifying router %s RIB for best path:", router)

    # Show ip route cmd, optionally scoped to a vrf
    route_cmd = "show ip route" if addr_type == "ipv4" else "show ipv6 route"
    if vrf:
        command = "{} vrf {} json".format(route_cmd, vrf)
    else:
        command = "{} json".format(route_cmd)

    for source_router, source_cfg in input_dict.items():
        source_rib = router_list[source_router].vtysh_cmd(command, isjson=True)

        for static_route in source_cfg["static_routes"]:
            prefix = static_route["network"]

            # next-hop ip -> admin distance, as seen on the source router
            distance_by_nexthop = {
                nexthop["ip"]: rib_entry["distance"]
                for rib_entry in source_rib[prefix]
                for nexthop in rib_entry["nexthops"]
            }

            # Next hop with the LOWEST admin distance is the expected one
            best_nexthop = min(distance_by_nexthop, key=distance_by_nexthop.get)
            comparison = "LOWEST"

            dut_rib = run_frr_cmd(rnode, command, isjson=True)

            # An empty RIB on the DUT is an error on its own
            if not dut_rib:
                return "No route found in RIB of router {}..".format(router)

            # Nothing to verify when the prefix is not installed
            if prefix not in dut_rib:
                continue

            installed = [
                nexthop
                for nexthop in dut_rib[prefix][0]["nexthops"]
                if nexthop["ip"] == best_nexthop
            ]
            if not installed:
                return (
                    "Nexthop {} is Missing for BGP route {}"
                    " in RIB of router {}\n".format(best_nexthop, prefix, router)
                )

            logger.info(
                "Best path for prefix: %s is installed according"
                " to %s %s: (%s) in RIB of router %s",
                prefix,
                comparison,
                attribute,
                distance_by_nexthop[best_nexthop],
                router,
            )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
2822
2823
@retry(retry_timeout=30)
def verify_bgp_rib(
    tgen,
    addr_type,
    dut,
    input_dict,
    next_hop=None,
    aspath=None,
    multi_nh=None,
    expected=True,
):
    """
    This API is to verify whether bgp rib has any
    matching route for a nexthop.

    For every router entry of input_dict the "show bgp [vrf X] <afi> json"
    output of `dut` is searched for the generated prefixes. An entry with
    "static_routes" is verified through its static routes only; otherwise
    the "advertise_networks" of its "bgp" section are verified.

    Parameters
    ----------
    * `tgen`: topogen object
    * `dut`: input dut router name
    * `addr_type` : ip type ipv4/ipv6
    * `input_dict` : input dict, has details of static routes
    * `next_hop`[optional]: next_hop (string or list) which needs to be
                            verified for static routes
    * 'aspath'[optional]: aspath which needs to be verified against the
                          first path of each static route
    * `multi_nh`[optional]: when set, next hops of all paths of a route are
                            considered and a missing next hop is only
                            logged; when None, only the first path is
                            checked and a missing next hop is an error
    * `expected` : expected results from API, by-default True

    Usage
    -----
    dut = 'r1'
    next_hop = "192.168.1.10"
    input_dict = topo['routers']
    aspath = "100 200 300"
    result = verify_bgp_rib(tgen, addr_type, dut, input_dict,
                            next_hop, aspath)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: verify_bgp_rib()")

    router_list = tgen.routers()
    if dut not in router_list:
        logger.debug("Exiting lib API: verify_bgp_rib()")
        return True
    rnode = router_list[dut]

    # Verifying RIB routes
    command = "show bgp"

    # Normalise once, so that a next_hop given as a list is compared exactly
    # like one given as a single string
    if next_hop and not isinstance(next_hop, list):
        next_hop = [next_hop]
    required_nhs = next_hop if next_hop else []

    for routerInput in input_dict.keys():
        # Static routes
        sleep(2)
        logger.info("Checking router {} BGP RIB:".format(dut))

        if "static_routes" in input_dict[routerInput]:
            static_routes = input_dict[routerInput]["static_routes"]

            for static_route in static_routes:
                found_routes = []
                missing_routes = []
                nh_found = False

                vrf = static_route.setdefault("vrf", None)
                community = static_route.setdefault("community", None)
                largeCommunity = static_route.setdefault("largeCommunity", None)

                if vrf:
                    cmd = "{} vrf {} {}".format(command, vrf, addr_type)

                    if community:
                        cmd = "{} community {}".format(cmd, community)

                    if largeCommunity:
                        cmd = "{} large-community {}".format(cmd, largeCommunity)
                else:
                    cmd = "{} {}".format(command, addr_type)

                cmd = "{} json".format(cmd)

                rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)

                # Verifying output dictionary rib_routes_json is not empty
                if not rib_routes_json:
                    errormsg = "No route found in rib of router {}..".format(dut)
                    return errormsg

                network = static_route["network"]
                no_of_ip = static_route.get("no_of_ip", 1)

                # Generating IPs for verification
                ip_list = generate_ips(network, no_of_ip)

                for st_rt in ip_list:
                    st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))

                    _addr_type = validate_ip_address(st_rt)
                    if _addr_type != addr_type:
                        continue

                    if st_rt not in rib_routes_json["routes"]:
                        missing_routes.append(st_rt)
                        continue

                    found_routes.append(st_rt)
                    rib_paths = rib_routes_json["routes"][st_rt]

                    if next_hop and multi_nh:
                        # Next hops of every path of this route
                        found_hops = [
                            [rib_r["ip"] for rib_r in rib_path["nexthops"]]
                            for rib_path in rib_paths
                        ]
                        installed_nhs = [nh for hops in found_hops for nh in hops]

                        if found_hops and found_hops[0]:
                            missing_nhs = set(required_nhs).difference(installed_nhs)
                            if missing_nhs:
                                # Multipath mode only reports, it does not fail
                                logger.info(
                                    "Missing nexthop %s for route"
                                    " %s in RIB of router %s\n",
                                    missing_nhs,
                                    st_rt,
                                    dut,
                                )
                            else:
                                nh_found = True

                    elif next_hop and multi_nh is None:
                        # Only the first path of the route is considered
                        installed_nhs = [
                            rib_r["ip"] for rib_r in rib_paths[0]["nexthops"]
                        ]
                        missing_nhs = set(required_nhs).difference(installed_nhs)

                        if installed_nhs:
                            if missing_nhs:
                                logger.info(
                                    "Missing nexthop %s for route"
                                    " %s in RIB of router %s\n",
                                    missing_nhs,
                                    st_rt,
                                    dut,
                                )
                                errormsg = (
                                    "Nexthop {} is Missing for "
                                    "route {} in RIB of router {}\n".format(
                                        missing_nhs,
                                        st_rt,
                                        dut,
                                    )
                                )
                                return errormsg
                            else:
                                nh_found = True

                    if aspath:
                        found_paths = rib_paths[0]["path"]
                        if aspath == found_paths:
                            logger.info(
                                "Found AS path {} for route"
                                " {} in RIB of router "
                                "{}\n".format(aspath, st_rt, dut)
                            )
                        else:
                            errormsg = (
                                "AS Path {} is missing for route {} "
                                "in RIB of router {}, found: {}\n".format(
                                    aspath, st_rt, dut, found_paths
                                )
                            )
                            return errormsg

                if nh_found:
                    logger.info(
                        "Found next_hop {} for all bgp"
                        " routes in RIB of"
                        " router {}\n".format(next_hop, dut)
                    )

                if missing_routes:
                    errormsg = (
                        "Missing route in RIB of router {}, "
                        "routes: {}\n".format(dut, missing_routes)
                    )
                    return errormsg

                if found_routes:
                    logger.info(
                        "Verified routes in router {} BGP RIB, "
                        "found routes are: {} \n".format(dut, found_routes)
                    )
            # An entry with static routes is not checked for advertised
            # networks as well
            continue

        if "bgp" not in input_dict[routerInput]:
            continue

        # Advertise networks
        bgp_data_list = input_dict[routerInput]["bgp"]

        if not isinstance(bgp_data_list, list):
            bgp_data_list = [bgp_data_list]

        for bgp_data in bgp_data_list:
            vrf_id = bgp_data.setdefault("vrf", None)
            if vrf_id:
                cmd = "{} vrf {} {}".format(command, vrf_id, addr_type)
            else:
                cmd = "{} {}".format(command, addr_type)

            cmd = "{} json".format(cmd)

            rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)

            # Verifying output dictionary rib_routes_json is not empty
            if not rib_routes_json:
                errormsg = "No route found in rib of router {}..".format(dut)
                return errormsg

            bgp_net_advertise = bgp_data["address_family"][addr_type]["unicast"]
            advertise_network = bgp_net_advertise.setdefault("advertise_networks", [])

            for advertise_network_dict in advertise_network:
                found_routes = []
                missing_routes = []

                network = advertise_network_dict["network"]
                no_of_network = advertise_network_dict.get("no_of_network", 1)

                # Generating IPs for verification
                ip_list = generate_ips(network, no_of_network)

                for st_rt in ip_list:
                    st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))

                    _addr_type = validate_ip_address(st_rt)
                    if _addr_type != addr_type:
                        continue

                    if st_rt in rib_routes_json["routes"]:
                        found_routes.append(st_rt)
                    else:
                        missing_routes.append(st_rt)

                if missing_routes:
                    errormsg = (
                        "Missing route in BGP RIB of router {},"
                        " are: {}\n".format(dut, missing_routes)
                    )
                    return errormsg

                if found_routes:
                    logger.info(
                        "Verified routes in router {} BGP RIB, found "
                        "routes are: {}\n".format(dut, found_routes)
                    )

    logger.debug("Exiting lib API: verify_bgp_rib()")
    return True
3131
3132
def _configured_gr_mode(bgp_config, addr_type, neighbor, global_suffix=""):
    """
    Derive the graceful-restart mode a "bgp" config dict describes.

    The per-neighbor settings under
    address_family -> addr_type -> unicast -> neighbor -> dest_link are
    read first; when several dest_links exist the last one decides. If no
    per-neighbor mode is set, the router level "graceful-restart" section
    is used, defaulting to Helper.

    * `bgp_config`: the "bgp" dictionary of one router from input_dict
    * `addr_type`: ip type ipv4/ipv6
    * `neighbor`: neighbor router name inside `bgp_config`
    * `global_suffix`: text appended to a mode taken from the router level
                       section (the local mode is compared with a
                       trailing "*")

    Returns "Helper", "Restart" or "Disable", with `global_suffix` appended
    for router level modes.
    """

    def _enabled(config, key):
        return key in config and config[key]

    mode = None
    if "address_family" in bgp_config:
        dest_links = bgp_config["address_family"][addr_type]["unicast"][
            "neighbor"
        ][neighbor]["dest_link"]

        for link_config in dest_links.values():
            if _enabled(link_config, "graceful-restart-helper"):
                mode = "Helper"
            elif _enabled(link_config, "graceful-restart"):
                mode = "Restart"
            elif _enabled(link_config, "graceful-restart-disable"):
                mode = "Disable"
            else:
                mode = None

    if mode is None:
        mode = "Helper"
        if "graceful-restart" in bgp_config:
            global_config = bgp_config["graceful-restart"]
            if _enabled(global_config, "graceful-restart"):
                mode = "Restart"
            elif _enabled(global_config, "graceful-restart-disable"):
                mode = "Disable"
        mode = "{}{}".format(mode, global_suffix)

    return mode


@retry(retry_timeout=10)
def verify_graceful_restart(
    tgen, topo, addr_type, input_dict, dut, peer, expected=True
):
    """
    This API is to verify the graceful-restart configuration of DUT and
    cross verify the same from the peer bgp router.

    "show bgp <addr_type> neighbor <ip> graceful-restart json" is run on
    the DUT and its "localGrMode"/"remoteGrMode" are compared with the
    modes derived from input_dict[dut] and input_dict[peer]. When the
    local mode is Disable the API returns True without further checks.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `addr_type` : ip type ipv4/ipv6
    * `input_dict`: input dictionary, have details of Device Under Test, for
                    which user wants to test the data; needs a "bgp" section
                    for both `dut` and `peer`
    * `dut`: input dut router name
    * `peer`: input peer router name
    * `expected` : expected results from API, by-default True

    Usage
    -----
        "r1": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "neighbor": {
                                "r3": {
                                    "dest_link":{
                                        "r1": {
                                            "graceful-restart": True
                                        }
                                    }
                                }
                            }
                        }
                    },
                    "ipv6": {
                        "unicast": {
                            "neighbor": {
                                "r3": {
                                    "dest_link":{
                                        "r1": {
                                            "graceful-restart": True
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    result = verify_graceful_restart(tgen, topo, addr_type, input_dict,
                                     dut = "r1", peer = 'r2')
    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    router_list = tgen.routers()
    if dut not in router_list:
        logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
        return True
    rnode = router_list[dut]

    # topo may hold "bgp" either as a dict or as a list of dicts
    try:
        bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]
    except TypeError:
        bgp_addr_type = topo["routers"][dut]["bgp"][0]["address_family"]

    if addr_type not in bgp_addr_type or not check_address_types(addr_type):
        logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
        return True

    # Resolve the address of `peer` from the link(s) DUT peers over
    neighbor_ip = None
    bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
    for bgp_neighbor, peer_data in bgp_neighbors.items():
        if bgp_neighbor != peer:
            continue

        for dest_link in peer_data["dest_link"]:
            data = topo["routers"][bgp_neighbor]["links"]

            if dest_link in data:
                neighbor_ip = data[dest_link][addr_type].split("/")[0]

    if neighbor_ip is None:
        errormsg = (
            "[DUT: {}]: No {} address found in topology for "
            "bgp neighbor {}".format(dut, addr_type, peer)
        )
        return errormsg

    logger.info(
        "[DUT: {}]: Checking bgp graceful-restart show"
        " o/p {}".format(dut, neighbor_ip)
    )

    show_bgp_graceful_json = run_frr_cmd(
        rnode,
        "show bgp {} neighbor {} graceful-restart json".format(
            addr_type, neighbor_ip
        ),
        isjson=True,
    )

    show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]

    if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
        logger.info("[DUT: {}]: Neighbor ip matched {}".format(dut, neighbor_ip))
    else:
        errormsg = "[DUT: {}]: Neighbor ip NOT a matched {}".format(dut, neighbor_ip)
        return errormsg

    # Local GR mode; a router level setting is compared with a trailing "*"
    lmode = _configured_gr_mode(
        input_dict[dut]["bgp"], addr_type, peer, global_suffix="*"
    )

    if lmode in ("Disable", "Disable*"):
        return True

    # Remote GR mode
    rmode = _configured_gr_mode(input_dict[peer]["bgp"], addr_type, dut)

    if show_bgp_graceful_json_out["localGrMode"] == lmode:
        logger.info(
            "[DUT: {}]: localGrMode : {} ".format(
                dut, show_bgp_graceful_json_out["localGrMode"]
            )
        )
    else:
        errormsg = (
            "[DUT: {}]: localGrMode is not correct"
            " Expected: {}, Found: {}".format(
                dut, lmode, show_bgp_graceful_json_out["localGrMode"]
            )
        )
        return errormsg

    if show_bgp_graceful_json_out["remoteGrMode"] == rmode:
        logger.info(
            "[DUT: {}]: remoteGrMode : {} ".format(
                dut, show_bgp_graceful_json_out["remoteGrMode"]
            )
        )
    elif (
        show_bgp_graceful_json_out["remoteGrMode"] == "NotApplicable"
        and rmode == "Disable"
    ):
        # A peer with graceful-restart disabled shows up as NotApplicable
        logger.info(
            "[DUT: {}]: remoteGrMode : {} ".format(
                dut, show_bgp_graceful_json_out["remoteGrMode"]
            )
        )
    else:
        errormsg = (
            "[DUT: {}]: remoteGrMode is not correct"
            " Expected: {}, Found: {}".format(
                dut, rmode, show_bgp_graceful_json_out["remoteGrMode"]
            )
        )
        return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
3386
3387
@retry(retry_timeout=10)
def verify_r_bit(tgen, topo, addr_type, input_dict, dut, peer, expected=True):
    """
    This API is to verify r_bit in the BGP gr capability advertised
    by the neighbor router

    The neighbor address is taken from `topo` (the peer's link named by the
    DUT's "dest_link" entry) and the R-bit is read on the DUT from
    "show bgp <addr_type> neighbor <ip> graceful-restart json".
    When the output carries no "rBit" key the check is skipped.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `addr_type` : ip type ipv4/ipv6
    * `input_dict`: input dictionary, have details of Device Under Test, for
                    which user wants to test the data (not read by this API)
    * `dut`: input dut router name
    * `peer`: peer name
    * `expected` : expected results from API, by-default True

    Usage
    -----
    input_dict = {
        "r1": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "neighbor": {
                                "r3": {
                                    "dest_link":{
                                        "r1": {
                                            "graceful-restart": True
                                        }
                                    }
                                }
                            }
                        }
                    },
                    "ipv6": {
                        "unicast": {
                            "neighbor": {
                                "r3": {
                                    "dest_link":{
                                        "r1": {
                                            "graceful-restart": True
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }
    result = verify_r_bit(tgen, topo, addr_type, input_dict, dut, peer)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    routers = tgen.routers()
    if dut in routers:
        rnode = routers[dut]
        bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]

        # Nothing to verify when the address family is not configured on the
        # DUT or is not enabled for this run.
        if addr_type in bgp_addr_type and check_address_types(addr_type):
            bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]

            # Resolve the peer's address on the link facing the DUT. It stays
            # None when the peer is unknown or none of its links matches, so
            # that case is reported instead of failing on an unbound name.
            neighbor_ip = None
            if peer in bgp_neighbors:
                for dest_link in bgp_neighbors[peer]["dest_link"]:
                    peer_links = topo["routers"][peer]["links"]
                    if dest_link in peer_links:
                        neighbor_ip = peer_links[dest_link][addr_type].split("/")[0]

            if neighbor_ip is None:
                errormsg = (
                    "[DUT: {}]: Unable to find neighbor ip of peer {} "
                    "for address type {}".format(dut, peer, addr_type)
                )
                return errormsg

            logger.info(
                "[DUT: {}]: Checking bgp graceful-restart show"
                " o/p {}".format(dut, neighbor_ip)
            )

            show_bgp_graceful_json = run_frr_cmd(
                rnode,
                "show bgp {} neighbor {} graceful-restart json".format(
                    addr_type, neighbor_ip
                ),
                isjson=True,
            )

            show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]

            if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
                logger.info(
                    "[DUT: {}]: Neighbor ip matched {}".format(dut, neighbor_ip)
                )
            else:
                errormsg = "[DUT: {}]: Neighbor ip NOT a matched {}".format(
                    dut, neighbor_ip
                )
                return errormsg

            if "rBit" in show_bgp_graceful_json_out:
                if show_bgp_graceful_json_out["rBit"]:
                    logger.info("[DUT: {}]: Rbit true {}".format(dut, neighbor_ip))
                else:
                    errormsg = "[DUT: {}]: Rbit false {}".format(dut, neighbor_ip)
                    return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
3506
3507
@retry(retry_timeout=10)
def verify_eor(tgen, topo, addr_type, input_dict, dut, peer, expected=True):
    """
    This API is to verify EOR

    The neighbor address is taken from `topo` (the peer's link named by the
    DUT's "dest_link" entry). The "endOfRibStatus" section of
    "show bgp <addr_type> neighbor <ip> graceful-restart json" is then read
    on the DUT; every flag among endOfRibSend, endOfRibRecv and
    endOfRibSentAfterUpdate that is present must be true.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `addr_type` : ip type ipv4/ipv6
    * `input_dict`: input dictionary, have details of DUT, for
                    which user wants to test the data (not read by this API)
    * `dut`: input dut router name
    * `peer`: peer name
    * `expected` : expected results from API, by-default True

    Usage
    -----
    input_dict = {
        "r1": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "neighbor": {
                                "r3": {
                                    "dest_link":{
                                        "r1": {
                                            "graceful-restart": True
                                        }
                                    }
                                }
                            }
                        }
                    },
                    "ipv6": {
                        "unicast": {
                            "neighbor": {
                                "r3": {
                                    "dest_link":{
                                        "r1": {
                                            "graceful-restart": True
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    result = verify_eor(tgen, topo, addr_type, input_dict, dut, peer)

    Returns
    -------
    errormsg(str) or True
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    # (json key, message when true, message when false), checked in order.
    eor_flags = (
        (
            "endOfRibSend",
            "[DUT: %s]: EOR Send true for %s %s",
            "[DUT: %s]: EOR Send false for %s %s",
        ),
        (
            "endOfRibRecv",
            "[DUT: %s]: EOR Recv true %s %s",
            "[DUT: %s]: EOR Recv false %s %s",
        ),
        (
            "endOfRibSentAfterUpdate",
            "[DUT: %s]: EOR SendTime true for %s %s",
            "[DUT: %s]: EOR SendTime false for %s %s",
        ),
    )

    routers = tgen.routers()
    if dut in routers:
        rnode = routers[dut]
        bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]

        # Nothing to verify when the address family is not configured on the
        # DUT or is not enabled for this run.
        if addr_type in bgp_addr_type and check_address_types(addr_type):
            bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]

            # Resolve the peer's address on the link facing the DUT. It stays
            # None when the peer is unknown or none of its links matches, so
            # that case is reported instead of failing on an unbound name.
            neighbor_ip = None
            if peer in bgp_neighbors:
                for dest_link in bgp_neighbors[peer]["dest_link"]:
                    peer_links = topo["routers"][peer]["links"]
                    if dest_link in peer_links:
                        neighbor_ip = peer_links[dest_link][addr_type].split("/")[0]

            if neighbor_ip is None:
                errormsg = (
                    "[DUT: %s]: Unable to find neighbor ip of peer %s "
                    "for address type %s" % (dut, peer, addr_type)
                )
                return errormsg

            logger.info(
                "[DUT: %s]: Checking bgp graceful-restart show o/p %s",
                dut,
                neighbor_ip,
            )

            show_bgp_graceful_json = run_frr_cmd(
                rnode,
                "show bgp {} neighbor {} graceful-restart json".format(
                    addr_type, neighbor_ip
                ),
                isjson=True,
            )

            show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]

            if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
                logger.info("[DUT: %s]: Neighbor ip matched %s", dut, neighbor_ip)
            else:
                errormsg = "[DUT: %s]: Neighbor ip is NOT matched %s" % (
                    dut,
                    neighbor_ip,
                )
                return errormsg

            if addr_type == "ipv4":
                afi = "ipv4Unicast"
            elif addr_type == "ipv6":
                afi = "ipv6Unicast"
            else:
                errormsg = "Address type %s is not supported" % (addr_type)
                return errormsg

            eor_json = show_bgp_graceful_json_out[afi]["endOfRibStatus"]
            for flag, passed_fmt, failed_fmt in eor_flags:
                # A flag missing from the output is not treated as a failure.
                if flag not in eor_json:
                    continue

                if not eor_json[flag]:
                    errormsg = failed_fmt % (dut, neighbor_ip, afi)
                    return errormsg

                logger.info(passed_fmt, dut, neighbor_ip, afi)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
3668
3669
@retry(retry_timeout=8)
def verify_f_bit(tgen, topo, addr_type, input_dict, dut, peer, expected=True):
    """
    This API is to verify f_bit in the BGP gr capability advertised
    by the neighbor router

    The neighbor address is taken from `topo` (the peer's link named by the
    DUT's "dest_link" entry). The "fBit" of the first address family found in
    "show bgp <addr_type> neighbor <ip> graceful-restart json" is checked:
    "ipv4Unicast" when present, otherwise "ipv6Unicast".

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `addr_type` : ip type ipv4/ipv6
    * `input_dict`: input dictionary, have details of Device Under Test, for
                    which user wants to test the data (not read by this API)
    * `dut`: input dut router name
    * `peer`: peer name
    * `expected` : expected results from API, by-default True

    Usage
    -----
    input_dict = {
        "r1": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "neighbor": {
                                "r3": {
                                    "dest_link":{
                                        "r1": {
                                            "graceful-restart": True
                                        }
                                    }
                                }
                            }
                        }
                    },
                    "ipv6": {
                        "unicast": {
                            "neighbor": {
                                "r3": {
                                    "dest_link":{
                                        "r1": {
                                            "graceful-restart": True
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    result = verify_f_bit(tgen, topo, 'ipv4', input_dict, dut, peer)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    routers = tgen.routers()
    if dut in routers:
        rnode = routers[dut]
        bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]

        # Nothing to verify when the address family is not configured on the
        # DUT or is not enabled for this run.
        if addr_type in bgp_addr_type and check_address_types(addr_type):
            bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]

            # Resolve the peer's address on the link facing the DUT. It stays
            # None when the peer is unknown or none of its links matches, so
            # that case is reported instead of failing on an unbound name.
            neighbor_ip = None
            if peer in bgp_neighbors:
                for dest_link in bgp_neighbors[peer]["dest_link"]:
                    peer_links = topo["routers"][peer]["links"]
                    if dest_link in peer_links:
                        neighbor_ip = peer_links[dest_link][addr_type].split("/")[0]

            if neighbor_ip is None:
                errormsg = (
                    "[DUT: {}]: Unable to find neighbor ip of peer {} "
                    "for address type {}".format(dut, peer, addr_type)
                )
                return errormsg

            logger.info(
                "[DUT: {}]: Checking bgp graceful-restart show"
                " o/p {}".format(dut, neighbor_ip)
            )

            show_bgp_graceful_json = run_frr_cmd(
                rnode,
                "show bgp {} neighbor {} graceful-restart json".format(
                    addr_type, neighbor_ip
                ),
                isjson=True,
            )

            show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]

            if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
                logger.info(
                    "[DUT: {}]: Neighbor ip matched {}".format(dut, neighbor_ip)
                )
            else:
                errormsg = "[DUT: {}]: Neighbor ip NOT a match {}".format(
                    dut, neighbor_ip
                )
                return errormsg

            if "ipv4Unicast" in show_bgp_graceful_json_out:
                if show_bgp_graceful_json_out["ipv4Unicast"]["fBit"]:
                    logger.info(
                        "[DUT: {}]: Fbit True for {} IPv4"
                        " Unicast".format(dut, neighbor_ip)
                    )
                else:
                    errormsg = "[DUT: {}]: Fbit False for {} IPv4 Unicast".format(
                        dut, neighbor_ip
                    )
                    return errormsg

            elif "ipv6Unicast" in show_bgp_graceful_json_out:
                if show_bgp_graceful_json_out["ipv6Unicast"]["fBit"]:
                    logger.info(
                        "[DUT: {}]: Fbit True for {} IPv6"
                        " Unicast".format(dut, neighbor_ip)
                    )
                else:
                    errormsg = "[DUT: {}]: Fbit False for {} IPv6 Unicast".format(
                        dut, neighbor_ip
                    )
                    return errormsg
            else:
                # Neither address family was found in the output, so report
                # it explicitly rather than failing on a missing key.
                errormsg = (
                    "[DUT: {}]: Neither ipv4Unicast nor ipv6Unicast present "
                    "for {}".format(dut, neighbor_ip)
                )
                return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
3809
3810
@retry(retry_timeout=10)
def verify_graceful_restart_timers(
    tgen, topo, addr_type, input_dict, dut, peer, expected=True
):
    """
    This API is to verify graceful restart timers, configured and received

    The "restart-time" given for `peer` in `input_dict` is compared with the
    "receivedRestartTimer" that `dut` reports in
    "show bgp <addr_type> neighbor <ip> graceful-restart json".
    When `input_dict` holds no "restart-time" for the peer, only the
    neighbor address is checked.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `addr_type` : ip type ipv4/ipv6
    * `input_dict`: input dictionary, have details of Device Under Test,
                    for which user wants to test the data
    * `dut`: input dut router name
    * `peer`: peer name
    * `expected` : expected results from API, by-default True

    Usage
    -----
    # Restart timer configured on the peer (r1), verified on the dut (r3)
    input_dict = {
        "r1": {
            "bgp": {
                "graceful-restart": {
                    "timer": {
                        "restart-time": 150
                    }
                }
            }
        }
    }

    result = verify_graceful_restart_timers(
        tgen, topo, 'ipv4', input_dict, dut="r3", peer="r1"
    )

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    routers = tgen.routers()
    if dut in routers:
        rnode = routers[dut]
        bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]

        # Nothing to verify when the address family is not configured on the
        # DUT or is not enabled for this run.
        if addr_type in bgp_addr_type and check_address_types(addr_type):
            bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]

            # Resolve the peer's address on the link facing the DUT. It stays
            # None when the peer is unknown or none of its links matches, so
            # that case is reported instead of failing on an unbound name.
            neighbor_ip = None
            if peer in bgp_neighbors:
                for dest_link in bgp_neighbors[peer]["dest_link"]:
                    peer_links = topo["routers"][peer]["links"]
                    if dest_link in peer_links:
                        neighbor_ip = peer_links[dest_link][addr_type].split("/")[0]

            if neighbor_ip is None:
                errormsg = (
                    "[DUT: {}]: Unable to find neighbor ip of peer {} "
                    "for address type {}".format(dut, peer, addr_type)
                )
                return errormsg

            logger.info(
                "[DUT: {}]: Checking bgp graceful-restart show"
                " o/p {}".format(dut, neighbor_ip)
            )

            show_bgp_graceful_json = run_frr_cmd(
                rnode,
                "show bgp {} neighbor {} graceful-restart json".format(
                    addr_type, neighbor_ip
                ),
                isjson=True,
            )

            show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]
            if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
                logger.info(
                    "[DUT: {}]: Neighbor ip matched {}".format(dut, neighbor_ip)
                )
            else:
                errormsg = "[DUT: {}]: Neighbor ip is NOT matched {}".format(
                    dut, neighbor_ip
                )
                return errormsg

            # Graceful-restart timer: what the peer was configured with is
            # what the DUT must have received.
            gr_config = input_dict[peer]["bgp"].get("graceful-restart", {})
            gr_timers = gr_config.get("timer", {})
            if "restart-time" in gr_timers:
                receivedTimer = gr_timers["restart-time"]
                if (
                    show_bgp_graceful_json_out["timers"]["receivedRestartTimer"]
                    == receivedTimer
                ):
                    logger.info(
                        "receivedRestartTimer is {}"
                        " on {} from peer {}".format(receivedTimer, dut, peer)
                    )
                else:
                    errormsg = (
                        "receivedRestartTimer is not"
                        " as expected {}".format(receivedTimer)
                    )
                    return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
3936
3937
@retry(retry_timeout=8)
def verify_gr_address_family(
    tgen, topo, addr_type, addr_family, dut, peer, expected=True
):
    """
    This API is to verify gr_address_family in the BGP gr capability advertised
    by the neighbor router

    The neighbor address is the peer's address on its link towards the DUT.
    The check passes when `addr_family` is a key of the DUT's
    "show bgp <addr_type> neighbor <ip> graceful-restart json" output.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `addr_type` : ip type ipv4/ipv6
    * `addr_family` : address family to look for, ipv4Unicast/ipv6Unicast
    * `dut`: input dut router name
    * `peer`: input peer router to check
    * `expected` : expected results from API, by-default True

    Usage
    -----

    result = verify_gr_address_family(tgen, topo, "ipv4", "ipv4Unicast", "r1", "r3")

    Returns
    -------
    errormsg(str) or True; None when `addr_type` is not an enabled
    address type
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    if not check_address_types(addr_type):
        logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
        return

    routers = tgen.routers()
    if dut not in routers:
        return "{} not in routers".format(dut)

    rnode = routers[dut]
    bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]

    if addr_type not in bgp_addr_type:
        return "{} not in bgp_addr_types".format(addr_type)

    if peer not in bgp_addr_type[addr_type]["unicast"]["neighbor"]:
        return "{} not a peer of {} over {}".format(peer, dut, addr_type)

    nbr_links = topo["routers"][peer]["links"]
    if dut not in nbr_links or addr_type not in nbr_links[dut]:
        return "peer {} missing back link to {} over {}".format(peer, dut, addr_type)

    neighbor_ip = nbr_links[dut][addr_type].split("/")[0]

    logger.info(
        "[DUT: {}]: Checking bgp graceful-restart show o/p {} for {}".format(
            dut, neighbor_ip, addr_family
        )
    )

    show_bgp_graceful_json = run_frr_cmd(
        rnode,
        "show bgp {} neighbor {} graceful-restart json".format(addr_type, neighbor_ip),
        isjson=True,
    )

    show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]

    if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
        logger.info("Neighbor ip matched {}".format(neighbor_ip))
    else:
        errormsg = "Neighbor ip NOT a match {}".format(neighbor_ip)
        return errormsg

    # Only these two families are understood by this API; anything else is
    # rejected with a message that says so.
    if addr_family not in ("ipv4Unicast", "ipv6Unicast"):
        errormsg = "Address family: {} is not supported for {} ".format(
            addr_family, neighbor_ip
        )
        return errormsg

    if addr_family not in show_bgp_graceful_json_out:
        errormsg = "{} NOT present for {} ".format(addr_family, neighbor_ip)
        return errormsg

    logger.info("{} present for {} ".format(addr_family, neighbor_ip))

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
4033
4034
# verify_attributes_for_evpn_routes: for every router key of `input_dict` and
# every network of its "static_routes", run
# "show bgp l2vpn evpn <route> json" on `dut` and check, in this order, the
# attributes that were requested: rd (explicit or "auto"), rt ("auto" or
# explicit), ethTag, ipLen.
# Each check returns as soon as it has a verdict (True or an error string);
# False is returned only when no check produced one.
# NOTE(review): this listing has lost its leading indentation, so the exact
# nesting of the sections below must be confirmed against the upstream file
# before changing any code here.
4035 @retry(retry_timeout=12)
4036 def verify_attributes_for_evpn_routes(
4037 tgen,
4038 topo,
4039 dut,
4040 input_dict,
4041 rd=None,
4042 rt=None,
4043 ethTag=None,
4044 ipLen=None,
4045 rd_peer=None,
4046 rt_peer=None,
4047 expected=True,
4048 ):
4049 """
4050 API to verify rd and rt value using "sh bgp l2vpn evpn 10.1.1.1"
4051 command.
4052 Parameters
4053 ----------
4054 * `tgen`: topogen object
4055 * `topo` : json file data
4056 * `dut` : device under test
4057 * `input_dict`: having details like - for which route, rd value
4058 needs to be verified
4059 * `rd` : route distinguisher
4060 * `rt` : route target
4061 * `ethTag` : Ethernet Tag
4062 * `ipLen` : IP prefix length
4063 * `rd_peer` : Peer name from which RD will be auto-generated
4064 * `rt_peer` : Peer name from which RT will be auto-generated
4065 * `expected` : expected results from API, by-default True
4066
4067 Usage
4068 -----
4069 input_dict_1 = {
4070 "r1": {
4071 "static_routes": [{
4072 "network": [NETWORK1_1[addr_type]],
4073 "next_hop": NEXT_HOP_IP[addr_type],
4074 "vrf": "RED"
4075 }]
4076 }
4077 }
4078
4079 result = verify_attributes_for_evpn_routes(tgen, topo,
4080 input_dict, rd = "10.0.0.33:1")
4081
4082 Returns
4083 -------
4084 errormsg(str) or True
4085 """
4086
4087 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4088 for router in input_dict.keys():
4089 rnode = tgen.routers()[dut]
4090
4091 if "static_routes" in input_dict[router]:
4092 for static_route in input_dict[router]["static_routes"]:
4093 network = static_route["network"]
4094
# NOTE(review): `vrf` is bound only when the static route has a "vrf" key,
# yet the auto rd / auto rt sections index show_bgp_json[vrf]; confirm that
# callers always supply it for those modes.
4095 if "vrf" in static_route:
4096 vrf = static_route["vrf"]
4097
4098 if type(network) is not list:
4099 network = [network]
4100
4101 for route in network:
# Only the address part is used for the lookup; the prefix length is dropped.
4102 route = route.split("/")[0]
# input_afi selects the matching address-family entry of the BGP summary in
# the auto rd / auto rt sections; it is bound only when the validated type
# contains "v4" or "v6".
4103 _addr_type = validate_ip_address(route)
4104 if "v4" in _addr_type:
4105 input_afi = "v4"
4106 elif "v6" in _addr_type:
4107 input_afi = "v6"
4108
4109 cmd = "show bgp l2vpn evpn {} json".format(route)
4110 evpn_rd_value_json = run_frr_cmd(rnode, cmd, isjson=True)
4111 if not bool(evpn_rd_value_json):
4112 errormsg = "No output for '{}' cli".format(cmd)
4113 return errormsg
4114
# --- Explicit RD: `rd` must be a key of the json output and equal the "rd"
# field stored under that key.
4115 if rd is not None and rd != "auto":
4116 logger.info(
4117 "[DUT: %s]: Verifying rd value for " "evpn route %s:",
4118 dut,
4119 route,
4120 )
4121
4122 if rd in evpn_rd_value_json:
4123 rd_value_json = evpn_rd_value_json[rd]
4124 if rd_value_json["rd"] != rd:
4125 errormsg = (
4126 "[DUT: %s] Failed: Verifying"
4127 " RD value for EVPN route: %s"
4128 "[FAILED]!!, EXPECTED : %s "
4129 " FOUND : %s"
4130 % (dut, route, rd, rd_value_json["rd"])
4131 )
4132 return errormsg
4133
4134 else:
4135 logger.info(
4136 "[DUT %s]: Verifying RD value for"
4137 " EVPN route: %s [PASSED]|| "
4138 "Found Expected: %s",
4139 dut,
4140 route,
4141 rd,
4142 )
4143 return True
4144
4145 else:
4146 errormsg = (
4147 "[DUT: %s] RD : %s is not present"
4148 " in cli json output" % (dut, rd)
4149 )
4150 return errormsg
4151
# --- Auto RD: the expected value is built as "<routerId>:<index>", where the
# index is the 1-based position of the vrf in topo["routers"][rd_peer]["vrfs"].
# Only the part before ":" is compared with each entry of the output, and the
# part after ":" merely has to be an integer greater than 0.
# Note that `rnode` is rebound to `rd_peer` here, so the summary below is
# fetched from that router.
4152 if rd == "auto":
4153 logger.info(
4154 "[DUT: %s]: Verifying auto-rd value for " "evpn route %s:",
4155 dut,
4156 route,
4157 )
4158
4159 if rd_peer:
4160 index = 1
4161 vni_dict = {}
4162
4163 rnode = tgen.routers()[rd_peer]
4164 vrfs = topo["routers"][rd_peer]["vrfs"]
4165 for vrf_dict in vrfs:
4166 vni_dict[vrf_dict["name"]] = index
4167 index += 1
4168
4169 show_bgp_json = run_frr_cmd(
4170 rnode, "show bgp vrf all summary json", isjson=True
4171 )
4172
4173 # Verifying output dictionary show_bgp_json is empty
4174 if not bool(show_bgp_json):
4175 errormsg = "BGP is not running"
4176 return errormsg
4177
4178 show_bgp_json_vrf = show_bgp_json[vrf]
4179 for afi, afi_data in show_bgp_json_vrf.items():
4180 if input_afi not in afi:
4181 continue
4182 router_id = afi_data["routerId"]
4183
4184 found = False
4185 rd = "{}:{}".format(router_id, vni_dict[vrf])
4186 for _rd, rd_value_json in evpn_rd_value_json.items():
4187 if (
4188 str(rd_value_json["rd"].split(":")[0])
4189 != rd.split(":")[0]
4190 ):
4191 continue
4192
4193 if int(rd_value_json["rd"].split(":")[1]) > 0:
4194 found = True
4195
4196 if found:
4197 logger.info(
4198 "[DUT %s]: Verifying RD value for"
4199 " EVPN route: %s "
4200 "Found Expected: %s",
4201 dut,
4202 route,
4203 rd_value_json["rd"],
4204 )
4205 return True
4206 else:
4207 errormsg = (
4208 "[DUT: %s] Failed: Verifying"
4209 " RD value for EVPN route: %s"
4210 " FOUND : %s" % (dut, route, rd_value_json["rd"])
4211 )
4212 return errormsg
4213
# --- Auto RT: the expected value is "<as>:<vni>". The AS comes from the BGP
# summary of `rt_peer` and the VNI from "show vrf vni json"; an AS above 65535
# is reduced to its low 16 bits first. The result must appear as "RT:<value>"
# in the extended community string of the path whose "vni" matches.
4214 if rt == "auto":
4215 vni_dict = {}
4216 logger.info(
4217 "[DUT: %s]: Verifying auto-rt value for " "evpn route %s:",
4218 dut,
4219 route,
4220 )
4221
4222 if rt_peer:
4223 rnode = tgen.routers()[rt_peer]
4224 show_bgp_json = run_frr_cmd(
4225 rnode, "show bgp vrf all summary json", isjson=True
4226 )
4227
4228 # Verifying output dictionary show_bgp_json is empty
4229 if not bool(show_bgp_json):
4230 errormsg = "BGP is not running"
4231 return errormsg
4232
4233 show_bgp_json_vrf = show_bgp_json[vrf]
4234 for afi, afi_data in show_bgp_json_vrf.items():
4235 if input_afi not in afi:
4236 continue
4237 as_num = afi_data["as"]
4238
4239 show_vrf_vni_json = run_frr_cmd(
4240 rnode, "show vrf vni json", isjson=True
4241 )
4242
4243 vrfs = show_vrf_vni_json["vrfs"]
4244 for vrf_dict in vrfs:
4245 if vrf_dict["vrf"] == vrf:
4246 vni_dict[vrf_dict["vrf"]] = str(vrf_dict["vni"])
4247
4248 # If AS is 4 byte, FRR uses only the lower 2 bytes of ASN+VNI
4249 # for auto derived RT value.
4250 if as_num > 65535:
4251 as_bin = bin(as_num)
4252 as_bin = as_bin[-16:]
4253 as_num = int(as_bin, 2)
4254
4255 rt = "{}:{}".format(str(as_num), vni_dict[vrf])
4256 for _rd, route_data in evpn_rd_value_json.items():
4257 if route_data["ip"] == route:
4258 for rt_data in route_data["paths"]:
4259 if vni_dict[vrf] == rt_data["vni"]:
4260 rt_string = rt_data["extendedCommunity"][
4261 "string"
4262 ]
4263 rt_input = "RT:{}".format(rt)
4264 if rt_input not in rt_string:
4265 errormsg = (
4266 "[DUT: %s] Failed:"
4267 " Verifying RT "
4268 "value for EVPN "
4269 " route: %s"
4270 "[FAILED]!!,"
4271 " EXPECTED : %s "
4272 " FOUND : %s"
4273 % (dut, route, rt_input, rt_string)
4274 )
4275 return errormsg
4276
4277 else:
4278 logger.info(
4279 "[DUT %s]: Verifying "
4280 "RT value for EVPN "
4281 "route: %s [PASSED]||"
4282 "Found Expected: %s",
4283 dut,
4284 route,
4285 rt_input,
4286 )
4287 return True
4288
4289 else:
4290 errormsg = (
4291 "[DUT: %s] Route : %s is not"
4292 " present in cli json output" % (dut, route)
4293 )
4294 return errormsg
4295
# --- Explicit RT(s): a single value is wrapped into a list; each "RT:<value>"
# is looked up in the extended community string of the route's paths.
4296 if rt is not None and rt != "auto":
4297 logger.info(
4298 "[DUT: %s]: Verifying rt value for " "evpn route %s:",
4299 dut,
4300 route,
4301 )
4302
4303 if type(rt) is not list:
4304 rt = [rt]
4305
4306 for _rt in rt:
4307 for _rd, route_data in evpn_rd_value_json.items():
4308 if route_data["ip"] == route:
4309 for rt_data in route_data["paths"]:
4310 rt_string = rt_data["extendedCommunity"][
4311 "string"
4312 ]
4313 rt_input = "RT:{}".format(_rt)
4314 if rt_input not in rt_string:
4315 errormsg = (
4316 "[DUT: %s] Failed: "
4317 "Verifying RT value "
4318 "for EVPN route: %s"
4319 "[FAILED]!!,"
4320 " EXPECTED : %s "
4321 " FOUND : %s"
4322 % (dut, route, rt_input, rt_string)
4323 )
4324 return errormsg
4325
4326 else:
4327 logger.info(
4328 "[DUT %s]: Verifying RT"
4329 " value for EVPN route:"
4330 " %s [PASSED]|| "
4331 "Found Expected: %s",
4332 dut,
4333 route,
4334 rt_input,
4335 )
4336 return True
4337
4338 else:
4339 errormsg = (
4340 "[DUT: %s] Route : %s is not"
4341 " present in cli json output" % (dut, route)
4342 )
4343 return errormsg
4344
# --- Ethernet tag: compared as-is with the "ethTag" field of the entry whose
# "ip" equals the route.
4345 if ethTag is not None:
4346 logger.info(
4347 "[DUT: %s]: Verifying ethTag value for " "evpn route :", dut
4348 )
4349
4350 for _rd, route_data in evpn_rd_value_json.items():
4351 if route_data["ip"] == route:
4352 if route_data["ethTag"] != ethTag:
4353 errormsg = (
4354 "[DUT: %s] RD: %s, Failed: "
4355 "Verifying ethTag value "
4356 "for EVPN route: %s"
4357 "[FAILED]!!,"
4358 " EXPECTED : %s "
4359 " FOUND : %s"
4360 % (
4361 dut,
4362 _rd,
4363 route,
4364 ethTag,
4365 route_data["ethTag"],
4366 )
4367 )
4368 return errormsg
4369
4370 else:
4371 logger.info(
4372 "[DUT %s]: RD: %s, Verifying "
4373 "ethTag value for EVPN route:"
4374 " %s [PASSED]|| "
4375 "Found Expected: %s",
4376 dut,
4377 _rd,
4378 route,
4379 ethTag,
4380 )
4381 return True
4382
4383 else:
4384 errormsg = (
4385 "[DUT: %s] RD: %s, Route : %s "
4386 "is not present in cli json "
4387 "output" % (dut, _rd, route)
4388 )
4389 return errormsg
4390
# --- IP prefix length: `ipLen` is converted with int() before being compared
# with the "ipLen" field, so a numeric string is accepted.
4391 if ipLen is not None:
4392 logger.info(
4393 "[DUT: %s]: Verifying ipLen value for " "evpn route :", dut
4394 )
4395
4396 for _rd, route_data in evpn_rd_value_json.items():
4397 if route_data["ip"] == route:
4398 if route_data["ipLen"] != int(ipLen):
4399 errormsg = (
4400 "[DUT: %s] RD: %s, Failed: "
4401 "Verifying ipLen value "
4402 "for EVPN route: %s"
4403 "[FAILED]!!,"
4404 " EXPECTED : %s "
4405 " FOUND : %s"
4406 % (dut, _rd, route, ipLen, route_data["ipLen"])
4407 )
4408 return errormsg
4409
4410 else:
4411 logger.info(
4412 "[DUT %s]: RD: %s, Verifying "
4413 "ipLen value for EVPN route:"
4414 " %s [PASSED]|| "
4415 "Found Expected: %s",
4416 dut,
4417 _rd,
4418 route,
4419 ipLen,
4420 )
4421 return True
4422
4423 else:
4424 errormsg = (
4425 "[DUT: %s] RD: %s, Route : %s "
4426 "is not present in cli json "
4427 "output " % (dut, _rd, route)
4428 )
4429 return errormsg
4430
# Reached only when none of the checks above returned a verdict.
4431 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4432 return False
4433
4434
# verify_evpn_routes: for every router key of `input_dict` and every network
# of its "static_routes", run "show bgp l2vpn evpn route json" on `dut` and
# check that the EVPN prefix built from routeType / EthTag / prefix length /
# address is present, has the expected routeType and, when `next_hop` is
# given, the expected nexthop.
# Returns True or an error string from inside the loops, False when the end
# of the function is reached.
# NOTE(review): this listing has lost its leading indentation, so the exact
# nesting must be confirmed against the upstream file before changing any
# code here. The docstring names the parameter `route_type`, while the
# signature spells it `routeType`.
4435 @retry(retry_timeout=10)
4436 def verify_evpn_routes(
4437 tgen, topo, dut, input_dict, routeType=5, EthTag=0, next_hop=None, expected=True
4438 ):
4439 """
4440 API to verify evpn routes using "sh bgp l2vpn evpn"
4441 command.
4442 Parameters
4443 ----------
4444 * `tgen`: topogen object
4445 * `topo` : json file data
4446 * `dut` : device under test
4447 * `input_dict`: having details like - for which route, rd value
4448 needs to be verified
4449 * `route_type` : Route type 5 is supported as of now
4450 * `EthTag` : Ethernet tag, by-default is 0
4451 * `next_hop` : Prefered nexthop for the evpn routes
4452 * `expected` : expected results from API, by-default True
4453
4454 Usage
4455 -----
4456 input_dict_1 = {
4457 "r1": {
4458 "static_routes": [{
4459 "network": [NETWORK1_1[addr_type]],
4460 "next_hop": NEXT_HOP_IP[addr_type],
4461 "vrf": "RED"
4462 }]
4463 }
4464 }
4465 result = verify_evpn_routes(tgen, topo, input_dict)
4466 Returns
4467 -------
4468 errormsg(str) or True
4469 """
4470
4471 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4472
4473 for router in input_dict.keys():
4474 rnode = tgen.routers()[dut]
4475
4476 logger.info("[DUT: %s]: Verifying evpn routes: ", dut)
4477
4478 if "static_routes" in input_dict[router]:
4479 for static_route in input_dict[router]["static_routes"]:
4480 network = static_route["network"]
4481
4482 if type(network) is not list:
4483 network = [network]
4484
4485 missing_routes = {}
4486 for route in network:
4487 rd_keys = 0
# Each network must be written as <address>/<length>: the length is part of
# the lookup key, and a network without "/" fails on the index below.
4488 ip_len = route.split("/")[1]
4489 route = route.split("/")[0]
4490
# Key used to find the route in the json output:
# [routeType]:[EthTag]:[prefix length]:[address]
4491 prefix = "[{}]:[{}]:[{}]:[{}]".format(
4492 routeType, EthTag, ip_len, route
4493 )
4494
4495 cmd = "show bgp l2vpn evpn route json"
4496 evpn_value_json = run_frr_cmd(rnode, cmd, isjson=True)
4497
4498 if not bool(evpn_value_json):
4499 errormsg = "No output for '{}' cli".format(cmd)
4500 return errormsg
4501
4502 if evpn_value_json["numPrefix"] == 0:
4503 errormsg = "[DUT: %s]: No EVPN prefixes exist" % (dut)
4504 return errormsg
4505
# Only dict-valued entries of the output are inspected and counted in rd_keys
# (presumably one entry per RD - TODO confirm); those lacking the prefix are
# recorded in missing_routes.
4506 for key, route_data_json in evpn_value_json.items():
4507 if type(route_data_json) is dict:
4508 rd_keys += 1
4509 if prefix not in route_data_json:
4510 missing_routes[key] = prefix
4511
# The route is reported missing only when the number of recorded misses
# equals the number of inspected entries.
# NOTE(review): missing_routes is created before the loop over networks while
# rd_keys is reset per network - confirm this comparison is still what is
# intended when several networks are passed.
4512 if rd_keys == len(missing_routes.keys()):
4513 errormsg = (
4514 "[DUT: %s]: "
4515 "Missing EVPN routes: "
4516 "%s [FAILED]!!" % (dut, list(set(missing_routes.values())))
4517 )
4518 return errormsg
4519
4520 for key, route_data_json in evpn_value_json.items():
4521 if type(route_data_json) is dict:
4522 if prefix not in route_data_json:
4523 continue
4524
4525 for paths in route_data_json[prefix]["paths"]:
4526 for path in paths:
4527 if path["routeType"] != routeType:
4528 errormsg = (
4529 "[DUT: %s]: "
4530 "Verifying routeType "
4531 "for EVPN route: %s "
4532 "[FAILED]!! "
4533 "Expected: %s, "
4534 "Found: %s"
4535 % (
4536 dut,
4537 prefix,
4538 routeType,
4539 path["routeType"],
4540 )
4541 )
4542 return errormsg
4543
# NOTE(review): when `next_hop` is given and every nexthop matches, no
# `return True` is visible in this branch; confirm against the upstream
# indentation whether the function is meant to end at `return False` then.
4544 elif next_hop:
4545 for nh_dict in path["nexthops"]:
4546 if nh_dict["ip"] != next_hop:
4547 errormsg = (
4548 "[DUT: %s]: "
4549 "Verifying "
4550 "nexthop for "
4551 "EVPN route: %s"
4552 "[FAILED]!! "
4553 "Expected: %s,"
4554 " Found: %s"
4555 % (
4556 dut,
4557 prefix,
4558 next_hop,
4559 nh_dict["ip"],
4560 )
4561 )
4562 return errormsg
4563
4564 else:
4565 logger.info(
4566 "[DUT %s]: Verifying "
4567 "EVPN route : %s, "
4568 "routeType: %s is "
4569 "installed "
4570 "[PASSED]|| ",
4571 dut,
4572 prefix,
4573 routeType,
4574 )
4575 return True
4576
# Reached only when nothing above returned a verdict.
4577 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4578
4579 return False
4580
4581
@retry(retry_timeout=10)
def verify_bgp_bestpath(tgen, addr_type, input_dict):
    """
    Verifies bgp next hop values in best-path output

    For every network listed for a DUT, "show bgp [vrf <vrf>] <addr_type>
    <network> bestpath json" is executed and the next hop of the first
    returned path (when it is flagged as the overall bestpath) is compared
    with the expected "bestpath" value.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `addr_type` : Address type ipv4/ipv6
    * `input_dict`: per DUT and address type, a list of dicts holding
                    "network", "bestpath" and optionally "vrf"

    Usage
    -----
    input_dict_1 = {
        "r1": {
            "ipv4": [
                {"network": "100.0.0.0/24", "bestpath": "50.0.0.1"}
            ],
            "ipv6": [
                {"network": "2000::1/128", "bestpath": "1000::1"}
            ]
        }
    }

    result = verify_bgp_bestpath(tgen, "ipv4", input_dict_1)

    Returns
    -------
    errormsg(str) on the first mismatch, otherwise True. False is returned
    when the last DUT processed lists no network for `addr_type`.
    """

    result = False
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    for dut in input_dict.keys():
        rnode = tgen.routers()[dut]

        logger.info("[DUT: %s]: Verifying bgp bestpath and multipath " "routes:", dut)
        result = False
        for network_dict in input_dict[dut][addr_type]:
            # Plain lookups: the caller's dict is read, never modified.
            nw_addr = network_dict.get("network")
            vrf = network_dict.get("vrf")
            bestpath = network_dict.get("bestpath")

            if vrf:
                cmd = "show bgp vrf {} {} {} bestpath json".format(
                    vrf, addr_type, nw_addr
                )
            else:
                cmd = "show bgp {} {} bestpath json".format(addr_type, nw_addr)

            data = run_frr_cmd(rnode, cmd, isjson=True)
            route = data["paths"][0]

            # Reset for every network so that a path without the overall
            # bestpath flag is reported as a mismatch instead of raising
            # UnboundLocalError or reusing the previous network's next hop.
            _bestpath = None
            if "bestpath" in route and route["bestpath"].get("overall") is True:
                _bestpath = route["nexthops"][0]["ip"]

            if _bestpath != bestpath:
                return (
                    "DUT:[{}] Bestpath do not match for"
                    " network: {}, Expected "
                    " {} as bgp bestpath found {}".format(
                        dut, nw_addr, bestpath, _bestpath
                    )
                )

            logger.info(
                "DUT:[{}] Found expected bestpath: "
                " {} for network: {}".format(dut, _bestpath, nw_addr)
            )
            result = True

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return result
4655
4656
def verify_tcp_mss(tgen, dut, neighbour, configured_tcp_mss, vrf=None):
    """
    This api is used to verify the tcp-mss value assigned to a neighbour of DUT

    Parameters
    ----------
    * `tgen` : topogen object
    * `dut`: device under test
    * `neighbour`: neighbour IP address
    * `configured_tcp_mss`: The TCP-MSS value to be verified
    * `vrf`: vrf name, optional

    Usage
    -----
    result = verify_tcp_mss(tgen, dut, neighbour, configured_tcp_mss)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    rnode = tgen.routers()[dut]
    if vrf:
        cmd = "show bgp vrf {} neighbors {} json".format(vrf, neighbour)
    else:
        cmd = "show bgp neighbors {} json".format(neighbour)

    # Execute the command
    show_vrf_stats = run_frr_cmd(rnode, cmd, isjson=True)

    # Verify TCP-MSS on router
    logger.info("Verify that no core is observed")
    if tgen.routers_have_failure():
        errormsg = "Core observed while running CLI: %s" % (cmd)
        return errormsg

    # The neighbour may be missing from the output (e.g. not configured
    # yet); report that as an error string instead of failing with
    # AttributeError on a None entry.
    neighbour_stats = (show_vrf_stats or {}).get(neighbour)
    if neighbour_stats is None:
        errormsg = "[DUT: %s]: Neighbour %s not found in output of '%s'" % (
            dut,
            neighbour,
            cmd,
        )
        return errormsg

    obtained_tcp_mss = neighbour_stats.get("bgpTcpMssConfigured")
    if configured_tcp_mss == obtained_tcp_mss:
        logger.debug(
            "Configured TCP - MSS Found: {}".format(sys._getframe().f_code.co_name)
        )
        result = True
    else:
        logger.debug(
            "TCP-MSS Mismatch ,configured {} expecting {}".format(
                obtained_tcp_mss,
                configured_tcp_mss,
            )
        )
        result = "TCP-MSS Mismatch"

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return result
4710
4711
def get_dut_as_number(tgen, dut):
    """
    API to fetch the Autonomous System number configured on the given DUT

    The value is read from the "as" field of the "ipv4Unicast" section of
    the "sh ip bgp summary json" output.

    params:
    =======
    tgen : replaced by the object returned from get_topogen()
    dut : Device Under test

    returns :
    =======
    Success : DUT Autonomous number
    Fail : Boolean False (after logging an error) when the AS number is
           empty; None when `dut` is not among the routers
    """
    tgen = get_topogen()
    for name, node in tgen.routers().items():
        if name != dut:
            continue

        summary = run_frr_cmd(node, "sh ip bgp summary json ", isjson=True)
        local_as = summary["ipv4Unicast"]["as"]
        if not local_as:
            logger.error(
                "[dut {}] ERROR....! DUT doesnot contain any Automnomous number ".format(
                    dut
                )
            )
            return False

        logger.info(
            "[dut {}] DUT contains Automnomous number :: {} ".format(dut, local_as)
        )
        return local_as
4744
4745
def get_prefix_count_route(
    tgen, topo, dut, peer, vrf=None, link=None, sent=None, received=None
):
    """
    API to return the prefix count exchanged between the DUT and a peer

    params:
    =======
    tgen : replaced by the object returned from get_topogen()
    topo : json file data, used to find the peer's link addresses
    dut : Device under test
    peer : neighbor on which you are expecting the route to be received
    vrf : when set, the vrf specific summary commands are used
    link : name of the peer link to use; defaults to the link named `dut`
    sent : when truthy (and `received` is not), "pfxSnt" is reported
    received : when truthy, "pfxRcd" is reported (also the default)

    returns :
    =======
    prefix_count as dict with "ipv4_count" and "ipv6_count" values,
    None (after logging an error) when `dut` is not among the routers
    """
    # The neighbor address as seen from the DUT is the peer's own address
    # on the link that faces the DUT.
    peer_link = topo["routers"][peer]["links"][link if link else dut]
    neighbor_ipv4_address = peer_link["ipv4"].split("/")[0]
    neighbor_ipv6_address = peer_link["ipv6"].split("/")[0]

    tgen = get_topogen()
    routers = tgen.routers()
    if dut not in routers:
        # Logged once, and only when the DUT is really missing.
        logger.error("ERROR...! Unknown dut {} in topolgy".format(dut))
        return None
    rnode = routers[dut]

    if vrf:
        ipv4_cmd = "sh ip bgp vrf {} summary json".format(vrf)
        ipv6_cmd = "sh ip bgp vrf {} ipv6 unicast summary json".format(vrf)
        # NOTE(review): the vrf variant has always reported received
        # prefixes regardless of `sent`; kept as is, confirm with callers.
        counter = "pfxRcd"
    else:
        ipv4_cmd = "sh ip bgp summary json "
        ipv6_cmd = "sh ip bgp ipv6 unicast summary json "
        # `received` wins over `sent`; received is also the default.
        counter = "pfxSnt" if sent and not received else "pfxRcd"

    show_bgp_json_ipv4 = run_frr_cmd(rnode, ipv4_cmd, isjson=True)
    show_bgp_json_ipv6 = run_frr_cmd(rnode, ipv6_cmd, isjson=True)

    prefix_count = {
        "ipv4_count": show_bgp_json_ipv4["ipv4Unicast"]["peers"][
            neighbor_ipv4_address
        ][counter],
        "ipv6_count": show_bgp_json_ipv6["peers"][neighbor_ipv6_address][counter],
    }

    if vrf:
        logger.info(
            "The Prefix Count of the [DUT:{} : vrf [{}] ] towards neighbor ipv4 : {} and ipv6 : {} is : {}".format(
                dut,
                vrf,
                neighbor_ipv4_address,
                neighbor_ipv6_address,
                prefix_count,
            )
        )
    else:
        logger.info(
            "The Prefix Count of the DUT:{} towards neighbor ipv4 : {} and ipv6 : {} is : {}".format(
                dut, neighbor_ipv4_address, neighbor_ipv6_address, prefix_count
            )
        )
    return prefix_count
4835
4836
def _default_route_attrs_match(family, route, checks):
    """
    Compare expected default route attributes with the values found in RIB.

    * `family` : label used in failure logs ("IPV4" / "IPV6")
    * `route` : default route prefix, used in success logs
    * `checks` : iterable of (success_label, failure_label, expected, obtained)

    Attributes whose expected value is falsy are skipped.
    Returns False at the first mismatch (after logging it), True otherwise.
    """
    for success_label, failure_label, expected, obtained in checks:
        if not expected:
            continue
        if expected != obtained:
            logger.error(
                "ERROR... {}::! Expected {} is {} obtained {}".format(
                    family, failure_label, expected, obtained
                )
            )
            return False
        logger.info(
            "Dafault Route {} expected {} {} Found in RIB....PASSED".format(
                route, success_label, expected
            )
        )
    return True


def _default_route_aspath_match(route, expected_aspath, path_entry):
    """
    Check that `expected_aspath` is contained in the "path" of `path_entry`.

    The "path" key is only read when an AS path is actually expected.
    Returns True when nothing is expected or the AS path matches.
    """
    if not expected_aspath:
        return True

    obtained_aspath = path_entry["path"]
    if expected_aspath in obtained_aspath:
        logger.info(
            "Dafault Route {} expected AS path {} Found in RIB....PASSED".format(
                route, expected_aspath
            )
        )
        return True

    logger.error(
        "ERROR.....! Expected AS path {} obtained {}..... FAILED ".format(
            expected_aspath, obtained_aspath
        )
    )
    return False


@retry(retry_timeout=5)
def verify_rib_default_route(
    tgen,
    topo,
    dut,
    routes,
    expected_nexthop,
    metric=None,
    origin=None,
    locPrf=None,
    expected_aspath=None,
):
    """
    API to verify the 'Default route' in BGP RIB together with the
    attributes the route carries (origin, local preference, metric, AS path)

    The IPv4 table ("sh ip bgp json") is verified first, then the IPv6 table
    ("sh ip bgp ipv6 unicast json"). The attributes are read from the first
    entry listed for the default route.

    param
    =====
    tgen : replaced by the object returned from get_topogen()
    topo : not used by this API
    dut : device under test
    routes : dict, routes["ipv4"] is the IPv4 default route prefix; the IPv6
             default route is looked up as "0::0/0" or "::/0"
    expected_nexthop : dict keyed by "ipv4" / "ipv6" with the next hop that is
                       expected for the default route
    metric, origin, locPrf : expected attribute values, skipped when falsy
    expected_aspath : value expected to be contained in the route's AS path,
                      skipped when falsy

    returns
    =======
    True when every requested check passes for both address families,
    False otherwise
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    tgen = get_topogen()
    routers = tgen.routers()
    if dut not in routers:
        # Fail explicitly rather than with a NameError on the show output.
        logger.error("ERROR [ DUT {}] : Router not found in topology".format(dut))
        return False
    rnode = routers[dut]

    ipv4_routes = run_frr_cmd(rnode, "sh ip bgp json", isjson=True)
    ipv6_routes = run_frr_cmd(rnode, "sh ip bgp ipv6 unicast json", isjson=True)

    # ----------------------------- IPv4 -----------------------------
    default_ipv4_route = routes["ipv4"]
    if default_ipv4_route not in ipv4_routes["routes"]:
        logger.error("ERROR [ DUT {}] : The Default Route Not found in RIB".format(dut))
        return False

    ipv4_paths = ipv4_routes["routes"][default_ipv4_route]
    rib_next_hops = [path["nexthops"][0]["ip"] for path in ipv4_paths]

    for family, nxt_hop in expected_nexthop.items():
        if family != "ipv4":
            continue
        if nxt_hop not in rib_next_hops:
            logger.error(
                "ERROR ...! Default routes [{}] expected is missing {}".format(
                    default_ipv4_route, nxt_hop
                )
            )
            return False
        logger.info(
            "Default routes [{}] obtained from {} .....PASSED".format(
                default_ipv4_route, nxt_hop
            )
        )

    first_ipv4_path = ipv4_paths[0]
    ipv4_checks = (
        ("origin", "Origin", origin, first_ipv4_path.get("origin", False)),
        (
            "local preference",
            "Local preference",
            locPrf,
            first_ipv4_path.get("locPrf", False),
        ),
        ("metric", "metric", metric, first_ipv4_path.get("metric", False)),
    )
    if not _default_route_attrs_match("IPV4", default_ipv4_route, ipv4_checks):
        return False

    # The AS path is read from the detected default route entry rather
    # than from a hard-coded "0.0.0.0/0" key.
    if not _default_route_aspath_match(
        default_ipv4_route, expected_aspath, first_ipv4_path
    ):
        return False

    logger.info(
        "IPV4:: Expected origin ['{}'] , Local Preference ['{}'] , Metric ['{}'] and AS path [{}] is found in RIB".format(
            origin, locPrf, metric, expected_aspath
        )
    )

    # ----------------------------- IPv6 -----------------------------
    # The default route can be keyed either way; a missing route is
    # reported as a failure instead of escaping as a KeyError.
    ipv6_table = ipv6_routes["routes"]
    if "0::0/0" in ipv6_table:
        default_ipv6_route = "0::0/0"
    elif "::/0" in ipv6_table:
        default_ipv6_route = "::/0"
    else:
        logger.error(
            "ERROR [ DUT {}] : The IPv6 Default Route Not found in RIB".format(dut)
        )
        return False

    ipv6_paths = ipv6_table[default_ipv6_route]
    rib_next_hops = []
    for path in ipv6_paths:
        nexthops = path["nexthops"]
        rib_next_hops.append(nexthops[0]["ip"])
        # A second next hop entry is optional; its absence is not a failure.
        try:
            rib_next_hops.append(nexthops[1]["ip"])
        except (KeyError, IndexError):
            logger.error("NO impact ..! Global IPV6 Address not found ")

    for family, nxt_hop in expected_nexthop.items():
        if family != "ipv6":
            continue
        if nxt_hop not in rib_next_hops:
            logger.error(
                "ERROR ...! Default routes [{}] expected from {} obtained {}".format(
                    default_ipv6_route, nxt_hop, rib_next_hops
                )
            )
            return False
        logger.info(
            "Default routes [{}] obtained from {} .....PASSED".format(
                default_ipv6_route, nxt_hop
            )
        )

    first_ipv6_path = ipv6_paths[0]
    ipv6_checks = (
        ("origin", "Origin", origin, first_ipv6_path.get("origin", False)),
        (
            "Local Preference",
            "Local Preference",
            locPrf,
            first_ipv6_path.get("locPrf", False),
        ),
        ("metric", "metric", metric, first_ipv6_path.get("metric", False)),
    )
    # Success logs now show the IPv6 prefix (the IPv4 one was printed before).
    if not _default_route_attrs_match("IPV6", default_ipv6_route, ipv6_checks):
        return False

    if not _default_route_aspath_match(
        default_ipv6_route, expected_aspath, first_ipv6_path
    ):
        return False

    logger.info(
        "IPV6:: Expected origin ['{}'] , Local Preference ['{}'] , Metric ['{}'] and AS path [{}] is found in RIB".format(
            origin, locPrf, metric, expected_aspath
        )
    )

    logger.info("The attributes are found for default route in RIB ")
    return True
5187
5188
@retry(retry_timeout=5)
def verify_fib_default_route(tgen, topo, dut, routes, expected_nexthop):
    """
    API to verify the 'Default route' in FIB

    The next hops of the first entry listed for the default route in
    "sh ip route json" and "sh ipv6 route json" are compared with the
    expected next hops.

    param
    =====
    tgen : replaced by the object returned from get_topogen()
    topo : not used by this API
    dut : device under test
    routes : dict with the "ipv4" and "ipv6" default route prefixes; for
             IPv6 the "::/0" key is preferred when present in the output
    expected_nexthop : dict with the "ipv4" and "ipv6" next hop that is
                       expected for the default route

    returns
    =======
    True when the default route with the expected next hop is found for
    both address families, False otherwise
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    tgen = get_topogen()
    routers = tgen.routers()
    if dut not in routers:
        # Report an unknown DUT explicitly instead of falling through.
        logger.error("ERROR [ DUT {}] : Router not found in topology".format(dut))
        return False
    rnode = routers[dut]

    ipv4_routes = run_frr_cmd(rnode, "sh ip route json", isjson=True)
    ipv6_routes = run_frr_cmd(rnode, "sh ipv6 route json", isjson=True)

    is_ipv4_default_route_found = False
    is_ipv6_default_route_found = False

    ipv4_default_route = routes["ipv4"]
    if ipv4_default_route in ipv4_routes:
        # .get(): a next hop entry without an "ip" key simply never matches.
        rib_ipv4_nxt_hops = [
            nexthop.get("ip")
            for nexthop in ipv4_routes[ipv4_default_route][0]["nexthops"]
        ]

        if expected_nexthop["ipv4"] in rib_ipv4_nxt_hops:
            is_ipv4_default_route_found = True
            logger.info(
                "{} default route with next hop {} is found in FIB ".format(
                    ipv4_default_route, expected_nexthop
                )
            )
        else:
            logger.error(
                "ERROR .. ! {} default route with next hop {} is not found in FIB ".format(
                    ipv4_default_route, expected_nexthop
                )
            )
            return False

    # "::/0" takes precedence over the caller supplied IPv6 prefix.
    if "::/0" in ipv6_routes:
        ipv6_default_route = "::/0"
    elif routes["ipv6"] in ipv6_routes:
        ipv6_default_route = routes["ipv6"]
    else:
        ipv6_default_route = None

    if ipv6_default_route is not None:
        rib_ipv6_nxt_hops = [
            nexthop.get("ip")
            for nexthop in ipv6_routes[ipv6_default_route][0]["nexthops"]
        ]

        if expected_nexthop["ipv6"] in rib_ipv6_nxt_hops:
            is_ipv6_default_route_found = True
            logger.info(
                "{} default route with next hop {} is found in FIB ".format(
                    ipv6_default_route, expected_nexthop
                )
            )
        else:
            logger.error(
                "ERROR .. ! {} default route with next hop {} is not found in FIB ".format(
                    ipv6_default_route, expected_nexthop
                )
            )
            return False

    if is_ipv4_default_route_found and is_ipv6_default_route_found:
        return True

    logger.error("Default Route for ipv4 and ipv6 address family is not found in FIB ")
    return False
5271
5272
@retry(retry_timeout=5)
def verify_bgp_advertised_routes_from_neighbor(tgen, topo, dut, peer, expected_routes):
    """
    API verifies the routes that are advertised from dut to peer

    command used :
    "sh ip bgp neighbor <x.x.x.x> advertised-routes json" and
    "sh ip bgp ipv6 unicast neighbor <x::x> advertised-routes json"

    A route counts as advertised when it is a key of "advertisedRoutes" or
    is contained in "bgpOriginatingDefaultNetwork" of the command output.

    params
    =======
    tgen : replaced by the object returned from get_topogen()
    topo : json file data, used to find the peer's addresses facing the dut
    dut : Device Under Tests
    peer : Peer on which the routes are expected
    expected_routes : dual stack dict, "ipv4" and "ipv6" each holding a list
                      of {"network": <prefix>} entries to be verified

    returns: True / False
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    tgen = get_topogen()

    peer_link = topo["routers"][peer]["links"][dut]
    peer_ipv4_neighbor_ip = peer_link["ipv4"].split("/")[0]
    peer_ipv6_neighbor_ip = peer_link["ipv6"].split("/")[0]

    routers = tgen.routers()
    if dut not in routers:
        # Report an unknown DUT explicitly instead of returning None.
        logger.error("ERROR [ DUT {}] : Router not found in topology".format(dut))
        return False
    rnode = routers[dut]

    ipv4_advertised_routes = run_frr_cmd(
        rnode,
        "sh ip bgp neighbor {} advertised-routes json".format(peer_ipv4_neighbor_ip),
        isjson=True,
    )
    ipv6_advertised_routes = run_frr_cmd(
        rnode,
        "sh ip bgp ipv6 unicast neighbor {} advertised-routes json".format(
            peer_ipv6_neighbor_ip
        ),
        isjson=True,
    )

    all_routes_found = True
    for family, label, output in (
        ("ipv4", "IPV4", ipv4_advertised_routes),
        ("ipv6", "IPV6", ipv6_advertised_routes),
    ):
        if not output:
            logger.error(output)
            logger.error(
                "ERROR...! [DUT : {}] No {} Routes are advertised to the peer {}".format(
                    dut, label, peer
                )
            )
            return False

        advertised = output.get("advertisedRoutes", {})
        # Looked up with a default so that a missing route is reported as a
        # failure when the key is absent, rather than raising KeyError.
        default_network = output.get("bgpOriginatingDefaultNetwork", "")

        for route in expected_routes[family]:
            network = route["network"]
            if network in advertised or network in default_network:
                logger.info(
                    "Success [DUT : {}] The Expected Route {} is advertised to {} ".format(
                        dut, network, peer
                    )
                )
            else:
                all_routes_found = False
                logger.error(
                    "ERROR....![DUT : {}] The Expected Route {} is not advertised to {} ".format(
                        dut, network, peer
                    )
                )

    if all_routes_found:
        return True

    logger.error(
        "ERROR ....! IPV4 : Expected Routes -> {} obtained ->{} ".format(
            expected_routes["ipv4"], ipv4_advertised_routes.get("advertisedRoutes")
        )
    )
    logger.error(
        "ERROR ....! IPV6 : Expected Routes -> {} obtained ->{} ".format(
            expected_routes["ipv6"], ipv6_advertised_routes.get("advertisedRoutes")
        )
    )
    return False
5407
5408
@retry(retry_timeout=5)
def verify_bgp_received_routes_from_neighbor(tgen, topo, dut, peer, expected_routes):
    """
    API to verify the bgp received routes

    Inbound soft-reconfiguration is applied on the dut towards the peer
    before the received routes are read.

    command used :
    =============
    show ip bgp neighbor <x.x.x.x> received-routes json
    show ip bgp ipv6 unicast neighbor <x::x> received-routes json

    params
    =======
    tgen : replaced by the object returned from get_topogen()
    topo : json file data, used to find the peer's addresses facing the dut
    dut : Device Under Tests
    peer : Peer from which the routes are expected
    expected_routes : dual stack dict, "ipv4" and "ipv6" each holding a list
                      of {"network": <prefix>} entries to be verified

    returns:
    ========
    True / False

    raises:
    =======
    AssertionError when the soft configuration could not be applied
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    tgen = get_topogen()

    peer_link = topo["routers"][peer]["links"][dut]
    peer_ipv4_neighbor_ip = peer_link["ipv4"].split("/")[0]
    peer_ipv6_neighbor_ip = peer_link["ipv6"].split("/")[0]

    logger.info("Enabling Soft configuration to neighbor INBOUND ")
    neigbor_dict = {"ipv4": peer_ipv4_neighbor_ip, "ipv6": peer_ipv6_neighbor_ip}
    result = configure_bgp_soft_configuration(
        tgen, dut, neigbor_dict, direction="inbound"
    )
    assert (
        result is True
    ), " Failed to configure the soft configuration \n Error: {}".format(result)

    # sleep of 10 sec is required to get the routes on peer after soft
    # configuration
    sleep(10)

    routers = tgen.routers()
    if dut not in routers:
        # Report an unknown DUT explicitly instead of returning None.
        logger.error("ERROR [ DUT {}] : Router not found in topology".format(dut))
        return False
    rnode = routers[dut]

    ipv4_receieved_routes = run_frr_cmd(
        rnode,
        "sh ip bgp neighbor {} received-routes json".format(peer_ipv4_neighbor_ip),
        isjson=True,
    )
    ipv6_receieved_routes = run_frr_cmd(
        rnode,
        "sh ip bgp ipv6 unicast neighbor {} received-routes json".format(
            peer_ipv6_neighbor_ip
        ),
        isjson=True,
    )

    all_routes_found = True
    for family, label, output in (
        ("ipv4", "IPV4", ipv4_receieved_routes),
        ("ipv6", "IPV6", ipv6_receieved_routes),
    ):
        if not output:
            logger.error(output)
            logger.error(
                "ERROR...! [DUT : {}] No {} Routes are received from the peer {}".format(
                    dut, label, peer
                )
            )
            return False

        received = output.get("receivedRoutes", {})
        for route in expected_routes[family]:
            network = route["network"]
            if network in received:
                logger.info(
                    "Success [DUT : {}] The Expected Route {} is received from {} ".format(
                        dut, network, peer
                    )
                )
            else:
                all_routes_found = False
                logger.error(
                    "ERROR....![DUT : {}] The Expected Route {} is not received from {} ".format(
                        dut, network, peer
                    )
                )

    if all_routes_found:
        return True

    # The obtained routes live under "receivedRoutes"; reading
    # "advertisedRoutes" here used to turn a plain mismatch into a KeyError.
    logger.error(
        "ERROR ....! IPV4 : Expected Routes -> {} obtained ->{} ".format(
            expected_routes["ipv4"], ipv4_receieved_routes.get("receivedRoutes")
        )
    )
    logger.error(
        "ERROR ....! IPV6 : Expected Routes -> {} obtained ->{} ".format(
            expected_routes["ipv6"], ipv6_receieved_routes.get("receivedRoutes")
        )
    )
    return False
5536
5537
def configure_bgp_soft_configuration(tgen, dut, neighbor_dict, direction):
    """
    Api to configure the bgp soft configuration to show the received routes from peer

    params
    ======
    tgen : Topogen object
    dut : device under test router on which the configuration is to be applied
    neighbor_dict : dict element contains "ipv4" and "ipv6" neighbor ip; an
                    address family whose neighbor is missing or empty is
                    left unconfigured
    direction : Direction on which it should be applied, lower-cased before
                being used in the "soft-reconfiguration" command

    returns:
    ========
    True when the configuration was applied.
    False when the DUT's AS number or both neighbors are unavailable.
    Otherwise the non-True value returned by apply_raw_config.
    """
    direction = direction.lower()
    logger.info(
        "Enabling Soft configuration to neighbor {} ".format(direction.upper())
    )

    local_as = get_dut_as_number(tgen, dut)
    if not local_as:
        # Without an AS number the "router bgp" line would be invalid.
        logger.error(
            "ERROR...! [DUT : {}] Unable to find the AS number, soft configuration is not applied".format(
                dut
            )
        )
        return False

    config_lines = ["router bgp {}".format(local_as)]
    for family in ("ipv4", "ipv6"):
        neighbor = neighbor_dict.get(family)
        if not neighbor:
            continue
        config_lines.extend(
            [
                "address-family {} unicast".format(family),
                "neighbor {} soft-reconfiguration {} ".format(neighbor, direction),
                "exit-address-family",
            ]
        )

    if len(config_lines) == 1:
        logger.error(
            "ERROR...! [DUT : {}] No neighbor address given in {}".format(
                dut, neighbor_dict
            )
        )
        return False

    raw_config = {dut: {"raw_config": config_lines}}
    result = apply_raw_config(tgen, raw_config)
    # NOTE(review): assumes apply_raw_config returns True on success, as the
    # "result is True" assertion used by callers of this API suggests.
    if result is not True:
        logger.error(
            "ERROR...! [DUT : {}] Failed to apply the soft configuration on neighbors {} ".format(
                dut, neighbor_dict
            )
        )
        return result

    logger.info(
        "Success... [DUT : {}] The soft configuration onis applied on neighbors {} ".format(
            dut, neighbor_dict
        )
    )
    return True