]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/lib/bgp.py
doc: Add `show ipv6 rpf X:X::X:X` command to docs
[mirror_frr.git] / tests / topotests / lib / bgp.py
1 #
2 # Copyright (c) 2019 by VMware, Inc. ("VMware")
3 # Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
4 # ("NetDEF") in this file.
5 #
6 # Permission to use, copy, modify, and/or distribute this software
7 # for any purpose with or without fee is hereby granted, provided
8 # that the above copyright notice and this permission notice appear
9 # in all copies.
10 #
11 # THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
12 # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
14 # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
15 # DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
16 # WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
17 # ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
18 # OF THIS SOFTWARE.
19 #
20
21 import ipaddress
22 import sys
23 import traceback
24 from copy import deepcopy
25 from time import sleep
26
27 # Import common_config to use commonly used APIs
28 from lib.common_config import (
29 create_common_configurations,
30 FRRCFG_FILE,
31 InvalidCLIError,
32 apply_raw_config,
33 check_address_types,
34 find_interface_with_greater_ip,
35 generate_ips,
36 get_frr_ipv6_linklocal,
37 retry,
38 run_frr_cmd,
39 validate_ip_address,
40 )
41 from lib.topogen import get_topogen
42 from lib.topolog import logger
43 from lib.topotest import frr_unicode
44
45 from lib import topotest
46
47
def create_router_bgp(tgen, topo=None, input_dict=None, build=False, load_config=True):
    """
    API to configure bgp on router

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data; when None, `tgen.json_topo` is used
    * `input_dict` : Input dict data, required when configuring from testcase.
                     When empty/None, a deep copy of `topo` is used instead.
    * `build` : Only for initial setup phase this is set as True.
    * `load_config` : Forwarded unchanged to create_common_configurations()

    Usage
    -----
    input_dict = {
        "r1": {
            "bgp": {
                "local_as": "200",
                "router_id": "22.22.22.22",
                "graceful-restart": {
                    "graceful-restart": True,
                    "preserve-fw-state": True,
                    "timer": {
                        "restart-time": 30,
                        "rib-stale-time": 30,
                        "select-defer-time": 30,
                    }
                },
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "default_originate": {
                                "r2": {
                                    "add_type": "lo",
                                    "route_map": "rm"
                                }
                            },
                            "redistribute": [{
                                "redist_type": "static",
                                    "attribute": {
                                        "metric" : 123
                                    }
                                },
                                {"redist_type": "connected"}
                            ],
                            "advertise_networks": [
                                {
                                    "network": "20.0.0.0/32",
                                    "no_of_network": 10
                                },
                                {
                                    "network": "30.0.0.0/32",
                                    "no_of_network": 10
                                }
                            ],
                            "neighbor": {
                                "r3": {
                                    "keepalivetimer": 60,
                                    "holddowntimer": 180,
                                    "dest_link": {
                                        "r4": {
                                            "allowas-in": {
                                                "number_occurences": 2
                                            },
                                            "prefix_lists": [
                                                {
                                                    "name": "pf_list_1",
                                                    "direction": "in"
                                                }
                                            ],
                                            "route_maps": [{
                                                "name": "RMAP_MED_R3",
                                                "direction": "in"
                                            }],
                                            "next_hop_self": True
                                        },
                                        "r1": {"graceful-restart-helper": True}
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }


    Returns
    -------
    Result of create_common_configurations(); False when that call raises
    InvalidCLIError.
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    if topo is None:
        topo = tgen.json_topo

    # Flag is used when testing ipv6 over ipv4 or vice-versa
    afi_test = False

    if not input_dict:
        input_dict = deepcopy(topo)
    else:
        topo = topo["routers"]
        # Work on a copy: the helpers below fill in defaults via setdefault().
        input_dict = deepcopy(input_dict)

    config_data_dict = {}

    for router, router_data in input_dict.items():
        if "bgp" not in router_data:
            logger.debug("Router %s: 'bgp' not present in input_dict", router)
            continue

        # A router may carry one BGP instance (dict) or several (list).
        bgp_data_list = router_data["bgp"]
        if not isinstance(bgp_data_list, list):
            bgp_data_list = [bgp_data_list]

        config_data = []

        for bgp_data in bgp_data_list:
            data_all_bgp = __create_bgp_global(tgen, bgp_data, router, build)
            if not data_all_bgp:
                continue

            bgp_addr_data = bgp_data.setdefault("address_family", {})

            if not bgp_addr_data:
                logger.debug(
                    "Router %s: 'address_family' not present in "
                    "input_dict for BGP",
                    router,
                )
            else:
                ipv4_data = bgp_addr_data.setdefault("ipv4", {})
                ipv6_data = bgp_addr_data.setdefault("ipv6", {})
                l2vpn_data = bgp_addr_data.setdefault("l2vpn", {})

                # The `or` short-circuit is kept on purpose: the ipv6
                # "unicast" default is only inserted when ipv4 has no
                # unicast data, so the truthiness of the ipv6 entry is
                # unchanged for code that inspects it later.
                neigh_unicast = bool(
                    ipv4_data.setdefault("unicast", {})
                    or ipv6_data.setdefault("unicast", {})
                )
                l2vpn_evpn = bool(l2vpn_data.setdefault("evpn", {}))

                if neigh_unicast:
                    data_all_bgp = __create_bgp_unicast_neighbor(
                        tgen,
                        topo,
                        bgp_data,
                        router,
                        afi_test,
                        config_data=data_all_bgp,
                    )

                if l2vpn_evpn:
                    data_all_bgp = __create_l2vpn_evpn_address_family(
                        tgen, topo, bgp_data, router, config_data=data_all_bgp
                    )

            # Global commands are kept even when no address family was given
            # (e.g. a plain "no router bgp ..." delete request).
            if data_all_bgp:
                config_data.extend(data_all_bgp)

        if config_data:
            config_data_dict[router] = config_data

    try:
        result = create_common_configurations(
            tgen, config_data_dict, "bgp", build, load_config
        )
    except InvalidCLIError:
        logger.error("create_router_bgp", exc_info=True)
        result = False

    logger.debug("Exiting lib API: create_router_bgp()")
    return result
224
225
def __create_bgp_global(tgen, input_dict, router, build=False):
    """
    Helper API to create bgp global configuration.

    `input_dict` is modified in place: missing optional keys ("delete",
    "local_as", "vrf", "router_id", ...) are filled with their defaults.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router id to be configured.
    * `build` : Only for initial setup phase this is set as True.

    Returns
    -------
    list of config commands (empty when `build` is set and 'local_as' is
    missing)
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    bgp_data = input_dict
    del_bgp_action = bgp_data.setdefault("delete", False)

    config_data = []

    if "local_as" not in bgp_data and build:
        logger.debug(
            "Router %s: 'local_as' not present in input_dict for BGP", router
        )
        return config_data

    local_as = bgp_data.setdefault("local_as", "")
    cmd = "router bgp {}".format(local_as)
    vrf_id = bgp_data.setdefault("vrf", None)
    if vrf_id:
        cmd = "{} vrf {}".format(cmd, vrf_id)

    # Deleting the instance needs nothing but the negated header line.
    if del_bgp_action:
        config_data.append("no {}".format(cmd))
        return config_data

    config_data.append(cmd)
    config_data.append("no bgp ebgp-requires-policy")

    router_id = bgp_data.setdefault("router_id", None)
    del_router_id = bgp_data.setdefault("del_router_id", False)
    if del_router_id:
        config_data.append("no bgp router-id")
    if router_id:
        config_data.append("bgp router-id {}".format(router_id))

    config_data.append("bgp log-neighbor-changes")
    config_data.append("no bgp network import-check")

    bgp_peer_grp_data = bgp_data.setdefault("peer-group", {})
    if bgp_peer_grp_data:
        peer_grp_data = __create_bgp_peer_group(tgen, bgp_peer_grp_data, router)
        config_data.extend(peer_grp_data)

    bst_path = bgp_data.setdefault("bestpath", None)
    if bst_path and "aspath" in bst_path:
        bestpath_cmd = "bgp bestpath as-path {}".format(bst_path["aspath"])
        # Presence of the "delete" key (whatever its value) negates the command.
        if "delete" in bst_path:
            bestpath_cmd = "no {}".format(bestpath_cmd)
        config_data.append(bestpath_cmd)

    if "graceful-restart" in bgp_data:
        graceful_config = bgp_data["graceful-restart"]

        # Tri-state flags: None -> leave untouched, truthy -> enable,
        # falsy -> disable. Each command is built from its own flag so a
        # previously built `cmd` can never be appended by accident.
        gr_flags = (
            ("graceful-restart", "bgp graceful-restart"),
            ("graceful-restart-disable", "bgp graceful-restart-disable"),
            ("preserve-fw-state", "bgp graceful-restart preserve-fw-state"),
            ("disable-eor", "bgp graceful-restart disable-eor"),
        )
        gr_values = [
            (graceful_config.setdefault(key, None), gr_cmd)
            for key, gr_cmd in gr_flags
        ]
        for flag, gr_cmd in gr_values:
            if flag is None:
                continue
            config_data.append(gr_cmd if flag else "no {}".format(gr_cmd))

        if "timer" in graceful_config:
            timer = graceful_config["timer"]
            del_action = timer.get("delete", False)

            for rs_timer, rs_timer_value in timer.items():
                # "delete" is a control key, not a timer; unset timers are
                # skipped instead of re-emitting an earlier command.
                if rs_timer == "delete" or not rs_timer_value:
                    continue

                timer_cmd = "bgp graceful-restart {} {}".format(
                    rs_timer, rs_timer_value
                )
                if del_action:
                    timer_cmd = "no {}".format(timer_cmd)
                config_data.append(timer_cmd)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return config_data
362
363
def __create_bgp_unicast_neighbor(
    tgen, topo, input_dict, router, afi_test, config_data=None
):
    """
    Helper API to create configuration for address-family unicast

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router id to be configured.
    * `afi_test` : use when ipv6 needs to be tested over ipv4 or vice-versa
    * `config_data` : list of commands generated so far; it is extended in
                      place (a new list is created when None)

    Returns
    -------
    list of config commands (`config_data`)
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    if config_data is None:
        config_data = []

    # NOTE(review): this is a list membership test, so it only matches an
    # element that is exactly "router bgp"; with the "router bgp <asn>" header
    # produced by the global helper add_neigh therefore stays True. Kept as-is
    # because callers rely on the resulting "remote-as" lines - confirm intent.
    add_neigh = "router bgp" not in config_data

    bgp_data = input_dict["address_family"]

    for addr_type, addr_dict in bgp_data.items():
        if not addr_dict:
            continue

        if not check_address_types(addr_type) and not afi_test:
            continue

        # A family may carry only non-unicast data; skip it instead of
        # failing with KeyError.
        addr_data = addr_dict.get("unicast")
        if addr_data is None:
            continue

        if addr_data:
            config_data.append("address-family {} unicast".format(addr_type))

        for advertise_network_dict in addr_data.setdefault("advertise_networks", []):
            network = advertise_network_dict["network"]
            if not isinstance(network, list):
                network = [network]

            no_of_network = advertise_network_dict.get("no_of_network", 1)
            del_action = advertise_network_dict.setdefault("delete", False)

            # Generating IPs for verification
            for ip in generate_ips(network, no_of_network):
                ip = str(ipaddress.ip_network(frr_unicode(ip)))

                cmd = "network {}".format(ip)
                if del_action:
                    cmd = "no {}".format(cmd)

                config_data.append(cmd)

        max_paths = addr_data.setdefault("maximum_paths", {})
        if max_paths:
            ibgp = max_paths.setdefault("ibgp", None)
            ebgp = max_paths.setdefault("ebgp", None)
            negate = "no " if max_paths.setdefault("delete", False) else ""
            if ibgp:
                config_data.append("{}maximum-paths ibgp {}".format(negate, ibgp))
            if ebgp:
                config_data.append("{}maximum-paths {}".format(negate, ebgp))

        for aggregate_address in addr_data.setdefault("aggregate_address", []):
            network = aggregate_address.setdefault("network", None)
            if not network:
                logger.debug(
                    "Router %s: 'network' not present in " "input_dict for BGP", router
                )
                continue

            cmd = "aggregate-address {}".format(network)

            as_set = aggregate_address.setdefault("as_set", False)
            summary = aggregate_address.setdefault("summary", False)
            del_action = aggregate_address.setdefault("delete", False)
            if as_set:
                cmd = "{} as-set".format(cmd)
            if summary:
                cmd = "{} summary".format(cmd)
            if del_action:
                cmd = "no {}".format(cmd)

            config_data.append(cmd)

        redistribute_data = addr_data.setdefault("redistribute", {})
        if redistribute_data:
            for redistribute in redistribute_data:
                if "redist_type" not in redistribute:
                    logger.debug(
                        "Router %s: 'redist_type' not present in " "input_dict", router
                    )
                    continue

                cmd = "redistribute {}".format(redistribute["redist_type"])
                redist_attr = redistribute.setdefault("attribute", None)
                if redist_attr:
                    if isinstance(redist_attr, dict):
                        for key, value in redist_attr.items():
                            cmd = "{} {} {}".format(cmd, key, value)
                    else:
                        cmd = "{} {}".format(cmd, redist_attr)

                if redistribute.setdefault("delete", False):
                    cmd = "no {}".format(cmd)
                config_data.append(cmd)

        admin_dist_data = addr_data.setdefault("distance", {})
        if admin_dist_data:
            if admin_dist_data.setdefault("delete", False):
                # Removal takes no values, so none are required here.
                config_data.append("no distance bgp")
            else:
                missing = [
                    key
                    for key in ("ebgp", "ibgp", "local")
                    if key not in admin_dist_data
                ]
                if missing:
                    logger.debug(
                        "Router %s: pass the admin distance values for "
                        "ebgp, ibgp and local routes",
                        router,
                    )
                # A missing value still raises KeyError, as before.
                config_data.append(
                    "distance bgp {} {} {}".format(
                        admin_dist_data["ebgp"],
                        admin_dist_data["ibgp"],
                        admin_dist_data["local"],
                    )
                )

        # "import vrf" is emitted once; "delete" negates it.
        import_vrf_data = addr_data.setdefault("import", {})
        if import_vrf_data:
            cmd = "import vrf {}".format(import_vrf_data["vrf"])
            if import_vrf_data.setdefault("delete", False):
                cmd = "no {}".format(cmd)
            config_data.append(cmd)

        if "neighbor" in addr_data:
            neigh_data = __create_bgp_neighbor(
                topo, input_dict, router, addr_type, add_neigh
            )
            config_data.extend(neigh_data)

        # configure default originate
        if "default_originate" in addr_data:
            default_originate_config = __create_bgp_default_originate_neighbor(
                topo, input_dict, router, addr_type, add_neigh
            )
            config_data.extend(default_originate_config)

    # Per-neighbor address-family settings come after all neighbors of every
    # family have been declared above.
    for addr_type, addr_dict in bgp_data.items():
        if not addr_dict or not check_address_types(addr_type):
            continue

        addr_data = addr_dict.get("unicast")
        if addr_data and "neighbor" in addr_data:
            neigh_addr_data = __create_bgp_unicast_address_family(
                topo, input_dict, router, addr_type, add_neigh
            )
            config_data.extend(neigh_addr_data)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return config_data
548
549
def __create_bgp_default_originate_neighbor(
    topo, input_dict, router, addr_type, add_neigh=True
):
    """
    Helper API to create neighbor default-originate configuration

    The neighbor address is taken from the peer's link named by "dest-link",
    else by "add_type", else from the peer's link towards `router`.

    Parameters
    ----------
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router id to be configured
    * `addr_type` : address family key used to look up link addresses
    * `add_neigh` : accepted for signature parity with the other neighbor
                    helpers; not used here

    Returns
    -------
    list of config commands
    """
    config_data = []
    logger.debug("Entering lib API: __create_bgp_default_originate_neighbor()")

    bgp_data = input_dict["address_family"]
    neigh_data = bgp_data[addr_type]["unicast"]["default_originate"]

    for name, peer_dict in neigh_data.items():
        nh_details = topo[name]

        if "dest-link" in peer_dict:
            link_name = peer_dict["dest-link"]
        elif "add_type" in peer_dict:
            link_name = peer_dict["add_type"]
        else:
            link_name = router
        neighbor_ip = nh_details["links"][link_name][addr_type].split("/")[0]

        config_data.append("address-family {} unicast".format(addr_type))

        cmd = "neighbor {} default-originate".format(neighbor_ip)
        if "route_map" in peer_dict:
            cmd = "{} route-map {}".format(cmd, peer_dict["route_map"])
        # An absent or falsy "delete" means configure, truthy means remove.
        if peer_dict.get("delete"):
            cmd = "no {}".format(cmd)
        config_data.append(cmd)

    logger.debug("Exiting lib API: __create_bgp_default_originate_neighbor()")
    return config_data
620
621
def __create_l2vpn_evpn_address_family(
    tgen, topo, input_dict, router, config_data=None
):
    """
    Helper API to create configuration for l2vpn evpn address-family

    Only the "l2vpn" entry of input_dict["address_family"] is processed; its
    "evpn" dict is filled with defaults for missing keys.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring
                     from testcase
    * `router` : router id to be configured.
    * `config_data` : list of commands generated so far; it is extended in
                      place (a new list is created when None)

    Returns
    -------
    list of config commands (`config_data`)
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    if config_data is None:
        config_data = []

    bgp_data = input_dict["address_family"]

    for family_type, family_dict in bgp_data.items():
        if family_type != "l2vpn":
            continue

        family_data = family_dict["evpn"]
        if family_data:
            config_data.append("address-family l2vpn evpn")

        advertise_data = family_data.setdefault("advertise", {})
        neighbor_data = family_data.setdefault("neighbor", {})
        advertise_all_vni_data = family_data.setdefault("advertise-all-vni", None)
        rd_data = family_data.setdefault("rd", None)
        no_rd_data = family_data.setdefault("no rd", False)
        route_target_data = family_data.setdefault("route-target", {})

        if advertise_data:
            for address_type, unicast_type in advertise_data.items():
                if not isinstance(unicast_type, dict):
                    continue

                for key, value in unicast_type.items():
                    cmd = "advertise {} {}".format(address_type, key)

                    if value:
                        route_map = value.setdefault("route-map", {})
                        advertise_del_action = value.setdefault("delete", None)

                        if route_map:
                            cmd = "{} route-map {}".format(cmd, route_map)

                        if advertise_del_action:
                            cmd = "no {}".format(cmd)

                    config_data.append(cmd)

        if neighbor_data:
            for neighbor, neighbor_afis in neighbor_data.items():
                ipv4_neighbor = neighbor_afis.setdefault("ipv4", {})
                ipv6_neighbor = neighbor_afis.setdefault("ipv6", {})

                if ipv4_neighbor:
                    for neighbor_name, action in ipv4_neighbor.items():
                        neighbor_ip = topo[neighbor]["links"][neighbor_name][
                            "ipv4"
                        ].split("/")[0]

                        if isinstance(action, dict):
                            next_hop_self = action.setdefault("next_hop_self", None)
                            route_maps = action.setdefault("route_maps", {})

                            if next_hop_self is True:
                                config_data.append(
                                    "neighbor {} next-hop-self".format(neighbor_ip)
                                )
                            elif next_hop_self is False:
                                config_data.append(
                                    "no neighbor {} next-hop-self".format(neighbor_ip)
                                )

                            if route_maps:
                                for route_map in route_maps:
                                    name = route_map.setdefault("name", {})
                                    direction = route_map.setdefault("direction", "in")
                                    del_action = route_map.setdefault("delete", False)

                                    if not name:
                                        logger.info(
                                            "Router %s: 'name' "
                                            "not present in "
                                            "input_dict for BGP "
                                            "neighbor route name",
                                            router,
                                        )
                                        continue

                                    cmd = "neighbor {} route-map {} {}".format(
                                        neighbor_ip, name, direction
                                    )
                                    if del_action:
                                        cmd = "no {}".format(cmd)

                                    config_data.append(cmd)

                        elif action == "activate":
                            config_data.append(
                                "neighbor {} activate".format(neighbor_ip)
                            )
                        elif action == "deactivate":
                            config_data.append(
                                "no neighbor {} activate".format(neighbor_ip)
                            )
                        else:
                            # Unknown action: emit nothing rather than a
                            # command left over from an earlier step.
                            logger.debug(
                                "Router %s: unsupported evpn neighbor action %s",
                                router,
                                action,
                            )

                if ipv6_neighbor:
                    # Walk the IPv6 neighbors themselves (not the IPv4 dict).
                    for neighbor_name, action in ipv6_neighbor.items():
                        neighbor_ip = topo[neighbor]["links"][neighbor_name][
                            "ipv6"
                        ].split("/")[0]

                        if action == "activate":
                            config_data.append(
                                "neighbor {} activate".format(neighbor_ip)
                            )
                        elif action == "deactivate":
                            config_data.append(
                                "no neighbor {} activate".format(neighbor_ip)
                            )
                        else:
                            logger.debug(
                                "Router %s: unsupported evpn neighbor action %s",
                                router,
                                action,
                            )

        # Tri-state: only values equal to True/False produce a command.
        if advertise_all_vni_data in (True, False):
            config_data.append(
                "advertise-all-vni"
                if advertise_all_vni_data
                else "no advertise-all-vni"
            )

        if rd_data:
            config_data.append("rd {}".format(rd_data))

        if no_rd_data:
            config_data.append("no rd {}".format(no_rd_data))

        if route_target_data:
            for rt_type, rt_dict in route_target_data.items():
                for _rt_dict in rt_dict:
                    rt_value = _rt_dict.setdefault("value", None)
                    del_rt = _rt_dict.setdefault("delete", None)

                    # Without a value there is no command to build.
                    if not rt_value:
                        continue

                    cmd = "route-target {} {}".format(rt_type, rt_value)
                    if del_rt:
                        cmd = "no {}".format(cmd)

                    config_data.append(cmd)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))

    return config_data
780
781
def __create_bgp_peer_group(topo, input_dict, router):
    """
    Helper API to create peer-group specific configuration

    Each group dict is filled with None defaults for "update-source",
    "remote-as" and "capability" when those keys are missing.

    Parameters
    ----------
    * `topo` : json file data (not used by this helper)
    * `input_dict` : mapping of peer-group name to its settings
    * `router` : router id to be configured (not used by this helper)

    Returns
    -------
    list of config commands
    """
    config_data = []
    logger.debug("Entering lib API: __create_bgp_peer_group()")

    for grp, grp_dict in input_dict.items():
        # No trailing space in the prefix, so the commands are single-spaced.
        neigh_cxt = "neighbor {}".format(grp)
        config_data.append("{} peer-group".format(neigh_cxt))

        # Options are emitted in a fixed order and only when set.
        for option in ("update-source", "remote-as", "capability"):
            value = grp_dict.setdefault(option, None)
            if value:
                config_data.append("{} {} {}".format(neigh_cxt, option, value))

    logger.debug("Exiting lib API: __create_bgp_peer_group()")
    return config_data
812
813
def __create_bgp_neighbor(topo, input_dict, router, addr_type, add_neigh=True):
    """
    Helper API to create neighbor specific configuration

    Peer dicts inside `input_dict` are filled with defaults for missing keys
    (keepalivetimer 3, holddowntimer 10, ebgp_multihop 1, ...).

    Parameters
    ----------
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router id to be configured
    * `addr_type` : address family key used to look up link addresses
    * `add_neigh` : when False the plain "neighbor X remote-as N" line is
                    not emitted

    Returns
    -------
    list of config commands

    Raises
    ------
    AssertionError when a peer's "local_asn" has equal local_as and remote_as
    """
    config_data = []
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    tgen = get_topogen()
    bgp_data = input_dict["address_family"]
    neigh_data = bgp_data[addr_type]["unicast"]["neighbor"]
    global_connect = input_dict.get("connecttimer", 5)

    for name, peer_dict in neigh_data.items():
        # remote_as is shared by all dest_links of the same peer.
        remote_as = 0
        for dest_link, peer in peer_dict["dest_link"].items():
            neighbor_type = peer.get("neighbor_type")

            local_asn = peer.setdefault("local_asn", {})
            if local_asn:
                local_as = local_asn.setdefault("local_as", 0)
                remote_as = local_asn.setdefault("remote_as", 0)
                no_prepend = local_asn.setdefault("no_prepend", False)
                replace_as = local_asn.setdefault("replace_as", False)
                if local_as == remote_as:
                    # Explicit raise so the check survives `python -O`.
                    raise AssertionError(
                        " Configuration Error : Router must not have "
                        "same AS-NUMBER as Local AS NUMBER"
                    )
            nh_details = topo[name]

            if "vrfs" in topo[router] or isinstance(nh_details["bgp"], list):
                # Pick the peer BGP instance matching the link's VRF, or the
                # default (vrf-less) instance for a link without a VRF.
                for vrf_data in nh_details["bgp"]:
                    if "vrf" in nh_details["links"][dest_link] and "vrf" in vrf_data:
                        if nh_details["links"][dest_link]["vrf"] == vrf_data["vrf"]:
                            if not remote_as:
                                remote_as = vrf_data["local_as"]
                            break
                    else:
                        if "vrf" not in vrf_data:
                            if not remote_as:
                                remote_as = vrf_data["local_as"]
                            break
            else:
                if not remote_as:
                    remote_as = nh_details["bgp"]["local_as"]

            update_source = None

            # NOTE(review): if none of the branches below matches, ip_addr
            # keeps its value from a previous iteration (or is unbound).
            if neighbor_type == "unnumbered":
                ip_addr = nh_details["links"][dest_link]["peer-interface"]
            elif neighbor_type == "link-local":
                intf = topo[name]["links"][dest_link]["interface"]
                ip_addr = get_frr_ipv6_linklocal(tgen, name, intf)
            elif dest_link in nh_details["links"]:
                try:
                    ip_addr = nh_details["links"][dest_link][addr_type].split("/")[0]
                except KeyError:
                    # No address of this family on the link: fall back to the
                    # interface's IPv6 link-local address.
                    intf = topo[name]["links"][dest_link]["interface"]
                    ip_addr = get_frr_ipv6_linklocal(tgen, name, intf)

            if "delete" in peer and peer["delete"]:
                # NOTE(review): returns after the first deleted neighbor, so
                # any remaining neighbors in input_dict are not processed.
                config_data.append("no neighbor {}".format(ip_addr))
                return config_data

            neigh_cxt = "neighbor {}".format(ip_addr)

            if "peer-group" in peer:
                config_data.append(
                    "neighbor {} interface peer-group {}".format(
                        ip_addr, peer["peer-group"]
                    )
                )

            # Loopback interface
            if "source_link" in peer:
                if peer["source_link"] == "lo":
                    update_source = topo[router]["links"]["lo"][addr_type].split("/")[0]
                else:
                    update_source = topo[router]["links"][peer["source_link"]][
                        "interface"
                    ]

            if "peer-group" not in peer:
                if neighbor_type == "unnumbered":
                    config_data.append(
                        "{} interface remote-as {}".format(neigh_cxt, remote_as)
                    )
                elif add_neigh:
                    config_data.append("{} remote-as {}".format(neigh_cxt, remote_as))

            if local_asn and local_as:
                cmd = "{} local-as {}".format(neigh_cxt, local_as)
                if no_prepend:
                    cmd = "{} no-prepend".format(cmd)
                if replace_as:
                    cmd = "{} replace-as".format(cmd)
                config_data.append(cmd)

            if addr_type == "ipv6":
                config_data.append("address-family ipv6 unicast")
                config_data.append("{} activate".format(neigh_cxt))

            if neighbor_type == "link-local":
                peer_intf = nh_details["links"][dest_link]["peer-interface"]
                config_data.append("{} update-source {}".format(neigh_cxt, peer_intf))
                config_data.append("{} interface {}".format(neigh_cxt, peer_intf))

            disable_connected = peer.setdefault("disable_connected_check", False)
            connect = peer.get("connecttimer", global_connect)
            keep_alive = peer.setdefault("keepalivetimer", 3)
            hold_down = peer.setdefault("holddowntimer", 10)
            password = peer.setdefault("password", None)
            no_password = peer.setdefault("no_password", None)
            capability = peer.setdefault("capability", None)
            max_hop_limit = peer.setdefault("ebgp_multihop", 1)

            graceful_restart = peer.setdefault("graceful-restart", None)
            graceful_restart_helper = peer.setdefault("graceful-restart-helper", None)
            graceful_restart_disable = peer.setdefault("graceful-restart-disable", None)

            if capability:
                config_data.append("{} capability {}".format(neigh_cxt, capability))

            # Emitted once.
            if update_source:
                config_data.append(
                    "{} update-source {}".format(neigh_cxt, update_source)
                )

            # The command takes the neighbor context, not the flag value.
            if disable_connected:
                config_data.append("{} disable-connected-check".format(neigh_cxt))

            # Skip only when BOTH timers equal 60/180; a single differing
            # value still has to be configured.
            if int(keep_alive) != 60 or int(hold_down) != 180:
                config_data.append(
                    "{} timers {} {}".format(neigh_cxt, keep_alive, hold_down)
                )
            if int(connect) != 120:
                config_data.append("{} timers connect {}".format(neigh_cxt, connect))

            # Tri-state flags: truthy -> enable, exactly False/0 -> disable,
            # None (unset) -> leave untouched.
            if graceful_restart:
                config_data.append("{} graceful-restart".format(neigh_cxt))
            elif graceful_restart == False:  # noqa: E712
                config_data.append("no {} graceful-restart".format(neigh_cxt))

            if graceful_restart_helper:
                config_data.append("{} graceful-restart-helper".format(neigh_cxt))
            elif graceful_restart_helper == False:  # noqa: E712
                config_data.append("no {} graceful-restart-helper".format(neigh_cxt))

            if graceful_restart_disable:
                config_data.append("{} graceful-restart-disable".format(neigh_cxt))
            elif graceful_restart_disable == False:  # noqa: E712
                config_data.append("no {} graceful-restart-disable".format(neigh_cxt))

            if password:
                config_data.append("{} password {}".format(neigh_cxt, password))

            if no_password:
                config_data.append("no {} password {}".format(neigh_cxt, no_password))

            if max_hop_limit > 1:
                config_data.append(
                    "{} ebgp-multihop {}".format(neigh_cxt, max_hop_limit)
                )
                config_data.append("{} enforce-multihop".format(neigh_cxt))

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return config_data
994
995
996 def __create_bgp_unicast_address_family(
997 topo, input_dict, router, addr_type, add_neigh=True
998 ):
999 """
1000 API prints bgp global config to bgp_json file.
1001
1002 Parameters
1003 ----------
1004 * `bgp_cfg` : BGP class variables have BGP config saved in it for
1005 particular router,
1006 * `local_as_no` : Local as number
1007 * `router_id` : Router-id
1008 * `ecmp_path` : ECMP max path
1009 * `gr_enable` : BGP global gracefull restart config
1010 """
1011
1012 config_data = []
1013 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1014 tgen = get_topogen()
1015 bgp_data = input_dict["address_family"]
1016 neigh_data = bgp_data[addr_type]["unicast"]["neighbor"]
1017
1018 for peer_name, peer_dict in deepcopy(neigh_data).items():
1019 for dest_link, peer in peer_dict["dest_link"].items():
1020 deactivate = None
1021 activate = None
1022 nh_details = topo[peer_name]
1023 activate_addr_family = peer.setdefault("activate", None)
1024 deactivate_addr_family = peer.setdefault("deactivate", None)
1025 # Loopback interface
1026 if "source_link" in peer and peer["source_link"] == "lo":
1027 for destRouterLink, data in sorted(nh_details["links"].items()):
1028 if "type" in data and data["type"] == "loopback":
1029 if dest_link == destRouterLink:
1030 ip_addr = (
1031 nh_details["links"][destRouterLink][addr_type]
1032 .split("/")[0]
1033 .lower()
1034 )
1035
1036 # Physical interface
1037 else:
1038 # check the neighbor type if un numbered nbr, use interface.
1039 if "neighbor_type" in peer and peer["neighbor_type"] == "unnumbered":
1040 ip_addr = nh_details["links"][dest_link]["peer-interface"]
1041 elif "neighbor_type" in peer and peer["neighbor_type"] == "link-local":
1042 intf = topo[peer_name]["links"][dest_link]["interface"]
1043 ip_addr = get_frr_ipv6_linklocal(tgen, peer_name, intf)
1044 elif dest_link in nh_details["links"].keys():
1045 try:
1046 ip_addr = nh_details["links"][dest_link][addr_type].split("/")[
1047 0
1048 ]
1049 except KeyError:
1050 intf = topo[peer_name]["links"][dest_link]["interface"]
1051 ip_addr = get_frr_ipv6_linklocal(tgen, peer_name, intf)
1052 if (
1053 addr_type == "ipv4"
1054 and bgp_data["ipv6"]
1055 and check_address_types("ipv6")
1056 and "ipv6" in nh_details["links"][dest_link]
1057 ):
1058 deactivate = nh_details["links"][dest_link]["ipv6"].split("/")[
1059 0
1060 ]
1061
1062 neigh_cxt = "neighbor {}".format(ip_addr)
1063 config_data.append("address-family {} unicast".format(addr_type))
1064
1065 if activate_addr_family is not None:
1066 config_data.append(
1067 "address-family {} unicast".format(activate_addr_family)
1068 )
1069
1070 config_data.append("{} activate".format(neigh_cxt))
1071
1072 if deactivate and activate_addr_family is None:
1073 config_data.append("no neighbor {} activate".format(deactivate))
1074
1075 if deactivate_addr_family is not None:
1076 config_data.append(
1077 "address-family {} unicast".format(deactivate_addr_family)
1078 )
1079 config_data.append("no {} activate".format(neigh_cxt))
1080
1081 next_hop_self = peer.setdefault("next_hop_self", None)
1082 send_community = peer.setdefault("send_community", None)
1083 prefix_lists = peer.setdefault("prefix_lists", {})
1084 route_maps = peer.setdefault("route_maps", {})
1085 no_send_community = peer.setdefault("no_send_community", None)
1086 capability = peer.setdefault("capability", None)
1087 allowas_in = peer.setdefault("allowas-in", None)
1088
1089 # next-hop-self
1090 if next_hop_self is not None:
1091 if next_hop_self is True:
1092 config_data.append("{} next-hop-self".format(neigh_cxt))
1093 else:
1094 config_data.append("no {} next-hop-self".format(neigh_cxt))
1095
1096 # send_community
1097 if send_community:
1098 config_data.append("{} send-community".format(neigh_cxt))
1099
1100 # no_send_community
1101 if no_send_community:
1102 config_data.append(
1103 "no {} send-community {}".format(neigh_cxt, no_send_community)
1104 )
1105
1106 # capability_ext_nh
1107 if capability and addr_type == "ipv6":
1108 config_data.append("address-family ipv4 unicast")
1109 config_data.append("{} activate".format(neigh_cxt))
1110
1111 if "allowas_in" in peer:
1112 allow_as_in = peer["allowas_in"]
1113 config_data.append("{} allowas-in {}".format(neigh_cxt, allow_as_in))
1114
1115 if "no_allowas_in" in peer:
1116 allow_as_in = peer["no_allowas_in"]
1117 config_data.append("no {} allowas-in {}".format(neigh_cxt, allow_as_in))
1118
1119 if "shutdown" in peer:
1120 config_data.append(
1121 "{} {} shutdown".format(
1122 "no" if not peer["shutdown"] else "", neigh_cxt
1123 )
1124 )
1125
1126 if prefix_lists:
1127 for prefix_list in prefix_lists:
1128 name = prefix_list.setdefault("name", {})
1129 direction = prefix_list.setdefault("direction", "in")
1130 del_action = prefix_list.setdefault("delete", False)
1131 if not name:
1132 logger.info(
1133 "Router %s: 'name' not present in "
1134 "input_dict for BGP neighbor prefix lists",
1135 router,
1136 )
1137 else:
1138 cmd = "{} prefix-list {} {}".format(neigh_cxt, name, direction)
1139 if del_action:
1140 cmd = "no {}".format(cmd)
1141 config_data.append(cmd)
1142
1143 if route_maps:
1144 for route_map in route_maps:
1145 name = route_map.setdefault("name", {})
1146 direction = route_map.setdefault("direction", "in")
1147 del_action = route_map.setdefault("delete", False)
1148 if not name:
1149 logger.info(
1150 "Router %s: 'name' not present in "
1151 "input_dict for BGP neighbor route name",
1152 router,
1153 )
1154 else:
1155 cmd = "{} route-map {} {}".format(neigh_cxt, name, direction)
1156 if del_action:
1157 cmd = "no {}".format(cmd)
1158 config_data.append(cmd)
1159
1160 if allowas_in:
1161 number_occurences = allowas_in.setdefault("number_occurences", {})
1162 del_action = allowas_in.setdefault("delete", False)
1163
1164 cmd = "{} allowas-in {}".format(neigh_cxt, number_occurences)
1165
1166 if del_action:
1167 cmd = "no {}".format(cmd)
1168
1169 config_data.append(cmd)
1170
1171 return config_data
1172
1173
def modify_bgp_config_when_bgpd_down(tgen, topo, input_dict):
    """
    Generate BGP config for the routers in input_dict without loading it
    (create_router_bgp() is called with load_config=False) and append the
    generated config file to /etc/frr/bgpd.conf on each of those routers.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data
    * `input_dict` : defines for which router, and which config
                     needs to be modified

    Usage
    -----
    # Delete advertised networks and write the config to /etc/frr
    input_dict = {
        "r5": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "advertise_networks": [
                                {
                                    "network": "101.0.20.1/32",
                                    "no_of_network": 5,
                                    "delete": True
                                }
                            ],
                        }
                    },
                    "ipv6": {
                        "unicast": {
                            "advertise_networks": [
                                {
                                    "network": "5::1/128",
                                    "no_of_network": 5,
                                    "delete": True
                                }
                            ],
                        }
                    }
                }
            }
        }
    }

    result = modify_bgp_config_when_bgpd_down(tgen, topo, input_dict)

    Returns
    -------
    True on success, the non-True result of create_router_bgp(), or a
    traceback string when an exception was raised.
    """

    api_name = sys._getframe().f_code.co_name
    logger.debug("Entering lib API: {}".format(api_name))
    try:
        build_result = create_router_bgp(
            tgen, topo, input_dict, build=False, load_config=False
        )
        if build_result is not True:
            return build_result

        # Append the generated config file to /etc/frr/bgpd.conf
        for dut in input_dict.keys():
            nodes = tgen.routers()
            if dut not in nodes:
                # Not a router of this topology, nothing to copy
                continue

            logger.info("Delete BGP config when BGPd is down in {}".format(dut))
            # The generated config lives under the per-router log directory
            append_cmd = "cat {}/{}/{} >> /etc/frr/bgpd.conf".format(
                tgen.logdir, dut, FRRCFG_FILE
            )
            nodes[dut].run(append_cmd)

    except Exception:
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    logger.debug("Exiting lib API: {}".format(api_name))
    return True
1256
1257
1258 #############################################
1259 # Verification APIs
1260 #############################################
@retry(retry_timeout=8)
def verify_router_id(tgen, topo, input_dict, expected=True):
    """
    Running command "show bgp summary json" for DUT, reading the expected
    router-id from input_dict and verifying it against the command output.
    1. Statically modified router-id should take place
    2. When static router-id is deleted highest loopback should
       become router-id
    3. When loopback intf is down then highest physical intf
       should become router-id

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `input_dict`: input dictionary, have details of Device Under Test, for
                    which user wants to test the data
    * `expected` : expected results from API, by-default True
                   (not read by this function itself)

    Usage
    -----
    # Verify if router-id for r1 is 12.12.12.12
    input_dict = {
        "r1": {
            "bgp": {
                "router_id": "12.12.12.12"
            }
        }
    }
    # Verify that router-id for r1 is highest interface ip
    input_dict = {
        "r1": {
            "bgp": {
                "del_router_id": True
            }
        }
    }
    result = verify_router_id(tgen, topo, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    router_list = tgen.routers()
    for router in input_dict.keys():
        if router not in router_list:
            continue

        rnode = router_list[router]
        bgp_input = input_dict[router]["bgp"]

        # Read-only lookup: do not write a default back into the caller's
        # dictionary.
        del_router_id = bgp_input.get("del_router_id", False)

        logger.info("Checking router %s router-id", router)
        show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
        router_id_out = ipaddress.IPv4Address(
            frr_unicode(show_bgp_json["ipv4Unicast"]["routerId"])
        )

        # Once router-id is deleted, highest interface ip should become
        # router-id
        if del_router_id:
            router_id = find_interface_with_greater_ip(topo, router)
        else:
            router_id = bgp_input["router_id"]
        router_id = ipaddress.IPv4Address(frr_unicode(router_id))

        if router_id != router_id_out:
            errormsg = (
                "Router-id for router:{} mismatch, expected:"
                " {} but found:{}".format(router, router_id, router_id_out)
            )
            return errormsg

        logger.info("Found expected router-id %s for router %s", router_id, router)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
1331
1332
@retry(retry_timeout=150)
def verify_bgp_convergence(tgen, topo=None, dut=None, expected=True):
    """
    API will verify if BGP is converged: runs "show bgp vrf all summary json"
    on each router and checks that every neighbor configured in the topology
    data is in Established state.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data, defaults to tgen.json_topo
    * `dut`: device under test, when given only this router is verified
    * `expected` : expected results from API, by-default True
                   (not read by this function itself)

    Usage
    -----
    # To verify if BGP is converged for router r1
    results = verify_bgp_convergence(tgen, topo, dut="r1")

    Returns
    -------
    errormsg(str), or True when at least one BGP instance was verified
    (False when nothing was verified)
    """

    if topo is None:
        topo = tgen.json_topo

    result = False
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    tgen = get_topogen()
    for router, rnode in tgen.routers().items():
        if "bgp" not in topo["routers"][router] or (
            dut is not None and dut != router
        ):
            continue

        logger.info("Verifying BGP Convergence on router %s:", router)
        show_bgp_json = run_frr_cmd(rnode, "show bgp vrf all summary json", isjson=True)
        # An empty summary means bgpd gave us nothing to look at
        if not bool(show_bgp_json):
            return "BGP is not running"

        # "bgp" is either one instance (dict) or a list of instances
        bgp_instances = topo["routers"][router]["bgp"]
        if type(bgp_instances) is not list:
            bgp_instances = [bgp_instances]

        for bgp_data in bgp_instances:
            vrf = bgp_data.get("vrf")
            if vrf is None:
                vrf = "default"

            families = bgp_data["address_family"]
            if "l2vpn" in families:
                evpn_cfg = families["l2vpn"]["evpn"]
                if "neighbor" not in evpn_cfg:
                    continue

                evpn_neighbors = evpn_cfg["neighbor"]
                expected_evpn_peers = len(evpn_neighbors)

                established_evpn_peers = 0
                for peer_name, peer_data in evpn_neighbors.items():
                    for link_addr_type, dest_links in peer_data.items():
                        peer_links = topo["routers"][peer_name]["links"]
                        for dest_link in dest_links.keys():
                            if dest_link not in peer_links:
                                continue

                            neighbor_ip = peer_links[dest_link][link_addr_type].split(
                                "/"
                            )[0]

                            vrf_summary = show_bgp_json[vrf]
                            if (
                                "ipv4Unicast" in vrf_summary
                                or "ipv6Unicast" in vrf_summary
                            ):
                                return (
                                    "[DUT: %s] VRF: %s, "
                                    "ipv4Unicast/ipv6Unicast"
                                    " address-family present"
                                    " under l2vpn" % (router, vrf)
                                )

                            evpn_peers = show_bgp_json[vrf]["l2VpnEvpn"]["peers"]
                            if evpn_peers[neighbor_ip]["state"] == "Established":
                                established_evpn_peers += 1

                if established_evpn_peers != expected_evpn_peers:
                    return (
                        "[DUT: %s] VRF: %s, BGP is not converged "
                        "for evpn peers" % (router, vrf)
                    )

                logger.info(
                    "[DUT: %s] VRF: %s, BGP is Converged for epvn peers",
                    router,
                    vrf,
                )
                result = True
            else:
                # First pass: how many sessions does the topology expect
                expected_peers = 0
                for addr_type in families.keys():
                    if not check_address_types(addr_type):
                        continue

                    neighbors = families[addr_type]["unicast"]["neighbor"]
                    for peer_name in neighbors:
                        expected_peers += len(neighbors[peer_name]["dest_link"])

                # Second pass: how many of them are Established
                established_peers = 0
                for addr_type in families.keys():
                    if not check_address_types(addr_type):
                        continue

                    neighbors = families[addr_type]["unicast"]["neighbor"]
                    for peer_name, peer_data in neighbors.items():
                        for dest_link in peer_data["dest_link"].keys():
                            peer_links = topo["routers"][peer_name]["links"]
                            if dest_link not in peer_links:
                                continue

                            peer_cfg = peer_data["dest_link"][dest_link]
                            neighbor_type = peer_cfg.get("neighbor_type")

                            if neighbor_type == "link-local":
                                # link local neighbors are keyed by the
                                # peer's IPv6 link-local address
                                intf = peer_links[dest_link]["interface"]
                                neighbor_ip = get_frr_ipv6_linklocal(
                                    tgen, peer_name, intf
                                )
                            elif "source_link" in peer_cfg:
                                source_link = peer_cfg["source_link"]
                                neighbor_ip = peer_links[source_link][addr_type].split(
                                    "/"
                                )[0]
                            elif neighbor_type == "unnumbered":
                                neighbor_ip = peer_links[dest_link]["peer-interface"]
                            else:
                                neighbor_ip = peer_links[dest_link][addr_type].split(
                                    "/"
                                )[0]

                            neighbor_ip = neighbor_ip.lower()
                            if addr_type == "ipv4":
                                v4_peers = show_bgp_json[vrf]["ipv4Unicast"]["peers"]
                                nh_state = v4_peers[neighbor_ip]["state"]
                            else:
                                v6_peers = show_bgp_json[vrf]["ipv6Unicast"]["peers"]
                                # A missing IPv6 peer counts as not established
                                if neighbor_ip in v6_peers:
                                    nh_state = v6_peers[neighbor_ip]["state"]
                                else:
                                    nh_state = None

                            if nh_state == "Established":
                                established_peers += 1

                if established_peers != expected_peers or established_peers <= 0:
                    return "[DUT: %s] VRF: %s, BGP is not converged" % (router, vrf)

                logger.info("[DUT: %s] VRF: %s, BGP is Converged", router, vrf)
                result = True

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return result
1522
1523
@retry(retry_timeout=16)
def verify_bgp_community(
    tgen,
    addr_type,
    router,
    network,
    input_dict=None,
    vrf=None,
    bestpath=False,
    expected=True,
):
    """
    API to verify that a BGP community / large community is attached to the
    given routes on a router, by running
    "show bgp [vrf VRF] ipv4/6 {route address} [bestpath] json".

    Parameters
    ----------
    * `tgen`: topogen object
    * `addr_type` : ip type, ipv4/ipv6
    * `router`: Device Under Test
    * `network`: list of networks for which the attribute is verified
    * `input_dict`: optional, maps an attribute name of the path (for
                    example "largeCommunity") to its expected "string" value
    * `vrf`: VRF name
    * `bestpath`: To check best path cli (used only when `vrf` is not set)
    * `expected` : expected results from API, by-default True
                   (not read by this function itself)

    Usage
    -----
    networks = ["200.50.2.0/32"]
    input_dict = {
        "largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5"
    }
    result = verify_bgp_community(tgen, "ipv4", dut, networks, input_dict)

    Returns
    -------
    errormsg(str) or True (False when `router` is not part of the topology)
    """

    logger.debug("Entering lib API: verify_bgp_community()")
    if router not in tgen.routers():
        return False

    rnode = tgen.routers()[router]

    logger.info(
        "Verifying BGP community attributes on dut %s: for %s network %s",
        router,
        addr_type,
        network,
    )

    for net in network:
        if vrf:
            cmd = "show bgp vrf {} {} {} json".format(vrf, addr_type, net)
        elif bestpath:
            cmd = "show bgp {} {} bestpath json".format(addr_type, net)
        else:
            cmd = "show bgp {} {} json".format(addr_type, net)

        show_bgp_json = run_frr_cmd(rnode, cmd, isjson=True)
        if "paths" not in show_bgp_json:
            return "Prefix {} not found in BGP table of router: {}".format(net, router)

        attribute_seen = False
        for path in show_bgp_json["paths"]:
            if "largeCommunity" not in path and "community" not in path:
                continue

            attribute_seen = True
            logger.info(
                "Large Community attribute is found for route: %s in router: %s",
                net,
                router,
            )
            if input_dict is None:
                continue

            # Every requested attribute must match on this path
            for criteria, comm_val in input_dict.items():
                show_val = path[criteria]["string"]
                if comm_val != show_val:
                    return (
                        "Failed: Verifying BGP attribute"
                        " {} for route: {} in router: {}"
                        ", expected value: {} but found"
                        ": {}".format(criteria, net, router, comm_val, show_val)
                    )

                logger.info(
                    "Verifying BGP %s for prefix: %s"
                    " in router: %s, found expected"
                    " value: %s",
                    criteria,
                    net,
                    router,
                    comm_val,
                )

        if not attribute_seen:
            return (
                "Large Community attribute is not found for route: "
                "{} in router: {} ".format(net, router)
            )

    logger.debug("Exiting lib API: verify_bgp_community()")
    return True
1635
1636
def modify_as_number(tgen, topo, input_dict):
    """
    API reads local_as from the user defined input_dict and modifies the
    routers' ASNs accordingly: the existing BGP configuration of those
    routers is deleted and a copy of the topology carrying the new AS
    numbers is applied through create_router_bgp().

    Parameters
    ----------
    * `tgen`  : Topogen object
    * `topo`  : json file data
    * `input_dict` :  defines for which router ASNs needs to be modified

    Usage
    -----
    To modify ASNs for router r1
    input_dict = {
        "r1": {
            "bgp": {
                "local_as": 131079
            }
        }
    }
    result = modify_as_number(tgen, topo, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    try:
        updated_routers = deepcopy(topo["routers"])
        delete_request = {}
        for name in input_dict.keys():
            # Ask for removal of the current bgp configuration
            delete_request[name] = {"bgp": {"delete": True}}
            try:
                updated_routers[name]["bgp"]["local_as"] = input_dict[name]["bgp"][
                    "local_as"
                ]
            except TypeError:
                # "bgp" is a list of instances: patch the first one
                updated_routers[name]["bgp"][0]["local_as"] = input_dict[name][
                    "bgp"
                ]["local_as"]

        logger.info("Removing bgp configuration")
        create_router_bgp(tgen, topo, delete_request)

        logger.info("Applying modified bgp configuration")
        apply_result = create_router_bgp(tgen, updated_routers)
        if apply_result is True:
            result = apply_result
        else:
            result = "Error applying new AS number config"
    except Exception:
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return result
1696
1697
@retry(retry_timeout=8)
def verify_as_numbers(tgen, topo, input_dict, expected=True):
    """
    This API is to verify AS numbers for given DUT by running
    "show ip bgp neighbor json" command. Local AS and Remote AS
    will be verified with input_dict data and command output.

    The remote AS expected for a neighbor is that neighbor's own
    "local_as" entry in input_dict.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `input_dict`: defines - for which router, AS numbers needs to be verified
    * `expected` : expected results from API, by-default True
                   (not read by this function itself)

    Usage
    -----
    input_dict = {
        "r1": {
            "bgp": {
                "local_as": 131079
            }
        }
    }
    result = verify_as_numbers(tgen, topo, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    for router in input_dict.keys():
        if router not in tgen.routers():
            continue

        rnode = tgen.routers()[router]

        logger.info("Verifying AS numbers for dut %s:", router)

        neighbor_json = run_frr_cmd(rnode, "show ip bgp neighbor json", isjson=True)
        local_as = input_dict[router]["bgp"]["local_as"]
        families = topo["routers"][router]["bgp"]["address_family"]

        for addr_type in families:
            if not check_address_types(addr_type):
                continue

            neighbors = families[addr_type]["unicast"]["neighbor"]

            for peer_name, peer_data in neighbors.items():
                remote_as = input_dict[peer_name]["bgp"]["local_as"]
                for dest_link in peer_data["dest_link"].keys():
                    peer_links = topo["routers"][peer_name]["links"]

                    if dest_link in peer_links:
                        neighbor_ip = peer_links[dest_link][addr_type].split("/")[0]
                    else:
                        neighbor_ip = None
                    neigh_data = neighbor_json[neighbor_ip]

                    # Verify Local AS for router
                    if neigh_data["localAs"] != local_as:
                        return (
                            "Failed: Verify local_as for dut {},"
                            " found: {} but expected: {}".format(
                                router, neigh_data["localAs"], local_as
                            )
                        )
                    logger.info(
                        "Verified local_as for dut %s, found expected: %s",
                        router,
                        local_as,
                    )

                    # Verify Remote AS for neighbor
                    if neigh_data["remoteAs"] != remote_as:
                        return (
                            "Failed: Verify remote_as for dut "
                            "{}'s neighbor {}, found: {} but "
                            "expected: {}".format(
                                router, peer_name, neigh_data["remoteAs"], remote_as
                            )
                        )
                    logger.info(
                        "Verified remote_as for dut %s's "
                        "neighbor %s, found expected: %s",
                        router,
                        peer_name,
                        remote_as,
                    )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
1796
1797
@retry(retry_timeout=150)
def verify_bgp_convergence_from_running_config(tgen, dut=None, expected=True):
    """
    API to verify BGP convergence from the running state only: every peer
    listed by "show bgp vrf all summary json" must be Established.
    This API would be used when routers have BGP neighborship is loopback
    to physical or vice-versa

    Parameters
    ----------
    * `tgen`: topogen object
    * `dut`: device under test, when given only this router is verified
    * `expected` : expected results from API, by-default True
                   (not read by this function itself)

    Usage
    -----
    results = verify_bgp_convergence_from_running_config(tgen, dut="r1")

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    for router, rnode in tgen.routers().items():
        if dut is not None and dut != router:
            continue

        logger.info("Verifying BGP Convergence on router %s:", router)
        show_bgp_json = run_frr_cmd(rnode, "show bgp vrf all summary json", isjson=True)
        # Verifying output dictionary show_bgp_json is empty or not
        if not show_bgp_json:
            errormsg = "BGP is not running"
            return errormsg

        for vrf, addr_family_data in show_bgp_json.items():
            for neighborship_data in addr_family_data.values():
                # Collect the peers that are really not Established so the
                # error names them, instead of whichever peer happened to
                # be iterated last.
                pending_peers = [
                    str(peer)
                    for peer, peer_data in neighborship_data["peers"].items()
                    if peer_data["state"] != "Established"
                ]

                if pending_peers:
                    errormsg = (
                        "[DUT: %s] VRF: %s, BGP is not converged"
                        " for peer: %s" % (router, vrf, ", ".join(pending_peers))
                    )
                    return errormsg

            logger.info("[DUT: %s]: vrf: %s, BGP is Converged", router, vrf)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))

    return True
1857
1858
def clear_bgp(tgen, addr_type, router, vrf=None, neighbor=None):
    """
    This API is to clear bgp neighborship by running
    clear ip bgp */clear bgp ipv6 * command,

    Command selection for "ipv4"/"ipv6": `vrf` takes precedence over
    `neighbor`; when neither is given all sessions of that address type
    are cleared. Any other `addr_type` runs "clear bgp *".

    Parameters
    ----------
    * `tgen`: topogen object
    * `addr_type`: ip type ipv4/ipv6
    * `router`: device under test
    * `vrf`: vrf name, or list of vrf names
    * `neighbor`: Neighbor for which bgp needs to be cleared

    Usage
    -----
    clear_bgp(tgen, addr_type, "r1")

    Returns
    -------
    False when `router` is not part of the topology, otherwise None
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    if router not in tgen.routers():
        return False

    rnode = tgen.routers()[router]

    # A single vrf name is handled as a one element list
    if vrf and type(vrf) is not list:
        vrf = [vrf]

    # (per-vrf template, per-neighbor template, clear-everything command)
    if addr_type == "ipv4":
        templates = ("clear ip bgp vrf {} *", "clear bgp ipv4 {}", "clear ip bgp *")
    elif addr_type == "ipv6":
        templates = (
            "clear bgp vrf {} ipv6 *",
            "clear bgp ipv6 {}",
            "clear bgp ipv6 *",
        )
    else:
        templates = None

    # Clearing BGP
    logger.info("Clearing BGP neighborship for router %s..", router)
    if templates is None:
        commands = ["clear bgp *"]
    elif vrf:
        commands = [templates[0].format(vrf_name) for vrf_name in vrf]
    elif neighbor:
        commands = [templates[1].format(neighbor)]
    else:
        commands = [templates[2]]

    for command in commands:
        run_frr_cmd(rnode, command)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1910
1911
def _clear_bgp_verify_address_families(topo, router):
    """Return the "address_family" data of `router` from the topology."""
    try:
        return topo["routers"][router]["bgp"]["address_family"]
    except TypeError:
        # "bgp" is a list of instances: use the first one
        return topo["routers"][router]["bgp"][0]["address_family"]


def _clear_bgp_verify_peer_states(topo, router, show_bgp_json):
    """
    Compare the neighbors configured in the topology with a
    "show bgp summary json" output.

    Returns (total_peer, no_of_peer, peer_uptime): number of configured
    sessions, number of those in Established state, and a dict mapping
    neighbor name to its "peerUptimeEstablishedEpoch". Only address types
    accepted by check_address_types() are taken into account, for both
    counters.
    """
    bgp_addr_type = _clear_bgp_verify_address_families(topo, router)

    total_peer = 0
    no_of_peer = 0
    peer_uptime = {}
    for addr_type in bgp_addr_type:
        if not check_address_types(addr_type):
            continue

        summary_key = "ipv4Unicast" if addr_type == "ipv4" else "ipv6Unicast"
        bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]

        for bgp_neighbor, peer_data in bgp_neighbors.items():
            total_peer += len(peer_data["dest_link"])
            links = topo["routers"][bgp_neighbor]["links"]

            for dest_link in peer_data["dest_link"]:
                if dest_link not in links:
                    continue

                neighbor_ip = links[dest_link][addr_type].split("/")[0]
                peer_summary = show_bgp_json[summary_key]["peers"][neighbor_ip]
                nh_state = peer_summary["state"]

                # Peer up time dictionary
                peer_uptime[bgp_neighbor] = peer_summary["peerUptimeEstablishedEpoch"]

                if nh_state == "Established":
                    no_of_peer += 1

    return total_peer, no_of_peer, peer_uptime


def _clear_bgp_verify_wait(rnode, topo, router, phase, attempts=50, sleeptime=3):
    """
    Poll "show bgp summary json" on `rnode` until all configured peers of
    `router` are Established, sleeping `sleeptime` seconds before each of
    at most `attempts` polls. `phase` ("before"/"after") is only used in
    log messages.

    Returns (errormsg, peer_uptime); errormsg is None on convergence.
    """
    for _ in range(attempts):
        # Waiting for BGP to converge
        logger.info(
            "Waiting for %s sec for BGP to converge on router %s...",
            sleeptime,
            router,
        )
        sleep(sleeptime)

        show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
        # Verifying output dictionary show_bgp_json is empty or not
        if not show_bgp_json:
            return "BGP is not running", {}

        total_peer, no_of_peer, peer_uptime = _clear_bgp_verify_peer_states(
            topo, router, show_bgp_json
        )
        if no_of_peer == total_peer:
            logger.info("BGP is Converged for router %s %s bgp clear", router, phase)
            return None, peer_uptime

        logger.info(
            "BGP is not yet Converged for router %s %s bgp clear", router, phase
        )

    errormsg = (
        "TIMEOUT!! BGP is not converged in {} seconds for"
        " router {}".format(attempts * sleeptime, router)
    )
    return errormsg, {}


def clear_bgp_and_verify(tgen, topo, router, rid=None):
    """
    This API is to clear bgp neighborship and verify bgp neighborship
    is coming up (BGP is converged) using "show bgp summary json" command
    and also verifying for all bgp neighbors uptime before and after
    clear bgp sessions is different as the uptime must be changed once
    bgp sessions are cleared using "clear bgp ipv4 */clear bgp ipv6 *" cmd.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `router`: device under test
    * `rid`: optional, when set only this neighbor is cleared
             ("clear bgp ipv4|ipv6 <rid>") instead of all neighbors

    Usage
    -----
    result = clear_bgp_and_verify(tgen, topo, dut)

    Returns
    -------
    errormsg(str) or True (False when `router` is not part of the topology)
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    if router not in tgen.routers():
        return False

    rnode = tgen.routers()[router]

    # Verifying BGP convergence before bgp clear command
    errormsg, peer_uptime_before_clear_bgp = _clear_bgp_verify_wait(
        rnode, topo, router, "before"
    )
    if errormsg is not None:
        return errormsg

    # Clearing BGP
    logger.info("Clearing BGP neighborship for router %s..", router)
    clear_target = rid if rid else "*"
    for addr_type in _clear_bgp_verify_address_families(topo, router).keys():
        if addr_type in ("ipv4", "ipv6"):
            run_frr_cmd(rnode, "clear bgp {} {}".format(addr_type, clear_target))

    # Verifying BGP convergence after bgp clear command
    errormsg, peer_uptime_after_clear_bgp = _clear_bgp_verify_wait(
        rnode, topo, router, "after"
    )
    if errormsg is not None:
        return errormsg

    # Comparing peerUptimeEstablishedEpoch dictionaries: a reset session
    # gets a new established epoch
    if peer_uptime_before_clear_bgp == peer_uptime_after_clear_bgp:
        errormsg = (
            "BGP neighborship is not reset after clear bgp on router"
            " {}".format(router)
        )
        return errormsg

    logger.info("BGP neighborship is reset after clear BGP on router %s", router)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
2122
2123
def _bgp_summary_peer_state(rnode, addr_type, neighbor_ip):
    """
    Run "show bgp summary json" on `rnode` and return the "state" value
    reported for `neighbor_ip`. The peer is looked up under "ipv4Unicast"
    when addr_type is "ipv4" and under "ipv6Unicast" otherwise.
    """

    show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
    afi_key = "ipv4Unicast" if addr_type == "ipv4" else "ipv6Unicast"
    return show_bgp_json[afi_key]["peers"][neighbor_ip]["state"]


def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
    """
    To verify BGP timer config, execute "show ip bgp neighbor json" command
    and verify bgp timers with input_dict data.
    To verify bgp timers functionality, shut down the peer interface
    and verify BGP neighborship status:

    * Scenario 1: the peer interface is shut and brought back up after
      5 sec; the session must still be "Established" when polled at
      (holddowntimer - keepalivetimer).
    * Scenario 2: the peer interface is shut; the session must no longer
      be "Established" when polled at holddowntimer. The interface is
      left shut down when the function returns.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `input_dict`: defines for which router, bgp timers needs to be verified

    Usage:
    # To verify BGP timers for neighbor r2 of router r1
    input_dict = {
        "r1": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "neighbor": {
                                "r2": {
                                    "dest_link": {
                                        "r1": {
                                            "keepalivetimer": 5,
                                            "holddowntimer": 15,
                                        }}}}}}}}}}
    result = verify_bgp_timers_and_functionality(tgen, topo, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    sleep(5)
    router_list = tgen.routers()
    for router in input_dict.keys():
        if router not in router_list:
            continue

        rnode = router_list[router]

        logger.info("Verifying bgp timers functionality, DUT is %s:", router)

        show_ip_bgp_neighbor_json = run_frr_cmd(
            rnode, "show ip bgp neighbor json", isjson=True
        )

        bgp_addr_type = input_dict[router]["bgp"]["address_family"]

        for addr_type in bgp_addr_type:
            if not check_address_types(addr_type):
                continue

            bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
            for bgp_neighbor, peer_data in bgp_neighbors.items():
                for dest_link, peer_dict in peer_data["dest_link"].items():
                    data = topo["routers"][bgp_neighbor]["links"]

                    keepalivetimer = peer_dict["keepalivetimer"]
                    holddowntimer = peer_dict["holddowntimer"]

                    # Without a matching link there is no neighbor address or
                    # interface to test; skip instead of reusing values left
                    # over from a previous iteration.
                    if dest_link not in data:
                        continue

                    neighbor_ip = data[dest_link][addr_type].split("/")[0]
                    neighbor_intf = data[dest_link]["interface"]
                    neighbor_json = show_ip_bgp_neighbor_json[neighbor_ip]

                    # Verify HoldDownTimer for neighbor
                    bgpHoldTimeMsecs = neighbor_json["bgpTimerHoldTimeMsecs"]
                    if bgpHoldTimeMsecs != holddowntimer * 1000:
                        errormsg = (
                            "Verifying holddowntimer for bgp "
                            "neighbor {} under dut {}, found: {} "
                            "but expected: {}".format(
                                neighbor_ip,
                                router,
                                bgpHoldTimeMsecs,
                                holddowntimer * 1000,
                            )
                        )
                        return errormsg

                    # Verify KeepAliveTimer for neighbor
                    bgpKeepAliveTimeMsecs = neighbor_json[
                        "bgpTimerKeepAliveIntervalMsecs"
                    ]
                    if bgpKeepAliveTimeMsecs != keepalivetimer * 1000:
                        errormsg = (
                            "Verifying keepalivetimer for bgp "
                            "neighbor {} under dut {}, found: {} "
                            "but expected: {}".format(
                                neighbor_ip,
                                router,
                                bgpKeepAliveTimeMsecs,
                                keepalivetimer * 1000,
                            )
                        )
                        return errormsg

                    # range() rejects a zero step, which int(holddowntimer / 3)
                    # yields for hold timers below 3 sec.
                    poll_step = max(1, int(holddowntimer / 3))

                    ####################
                    # Shutting down peer interface after keepalive time and
                    # after some time bringing up peer interface.
                    # verifying BGP neighborship in (hold down-keep alive)
                    # time, it should not go down
                    ####################

                    # Wait till keep alive time
                    logger.info("=" * 20)
                    logger.info("Scenario 1:")
                    logger.info(
                        "Shutdown and bring up peer interface: %s "
                        "in keep alive time : %s sec and verify "
                        " BGP neighborship is intact in %s sec ",
                        neighbor_intf,
                        keepalivetimer,
                        (holddowntimer - keepalivetimer),
                    )
                    logger.info("=" * 20)
                    logger.info("Waiting for %s sec..", keepalivetimer)
                    sleep(keepalivetimer)

                    # Shutting down peer interface
                    logger.info(
                        "Shutting down interface %s on router %s",
                        neighbor_intf,
                        bgp_neighbor,
                    )
                    topotest.interface_set_status(
                        router_list[bgp_neighbor], neighbor_intf, ifaceaction=False
                    )

                    # Bringing up peer interface
                    sleep(5)
                    logger.info(
                        "Bringing up interface %s on router %s..",
                        neighbor_intf,
                        bgp_neighbor,
                    )
                    topotest.interface_set_status(
                        router_list[bgp_neighbor], neighbor_intf, ifaceaction=True
                    )

                    # Verifying BGP neighborship is intact in
                    # (holddown - keepalive) time.
                    # NOTE(review): the state is only evaluated on the
                    # iteration where timer equals holddown - keepalive; if
                    # the step never lands on that value nothing is checked.
                    for timer in range(keepalivetimer, holddowntimer, poll_step):
                        logger.info("Waiting for %s sec..", keepalivetimer)
                        sleep(keepalivetimer)
                        sleep(2)
                        nh_state = _bgp_summary_peer_state(
                            rnode, addr_type, neighbor_ip
                        )

                        if timer == (holddowntimer - keepalivetimer):
                            if nh_state != "Established":
                                # The session was expected to survive the
                                # short interface flap, so report that it
                                # went down.
                                errormsg = (
                                    "BGP neighborship has gone "
                                    "down in {} sec for neighbor {}".format(
                                        timer, bgp_neighbor
                                    )
                                )
                                return errormsg
                            else:
                                logger.info(
                                    "BGP neighborship is intact in %s"
                                    " sec for neighbor %s",
                                    timer,
                                    bgp_neighbor,
                                )

                    ####################
                    # Shutting down peer interface and verifying that BGP
                    # neighborship is going down in holddown time
                    ####################
                    logger.info("=" * 20)
                    logger.info("Scenario 2:")
                    logger.info(
                        "Shutdown peer interface: %s and verify BGP"
                        " neighborship has gone down in hold down "
                        "time %s sec",
                        neighbor_intf,
                        holddowntimer,
                    )
                    logger.info("=" * 20)

                    logger.info(
                        "Shutting down interface %s on router %s..",
                        neighbor_intf,
                        bgp_neighbor,
                    )
                    topotest.interface_set_status(
                        router_list[bgp_neighbor], neighbor_intf, ifaceaction=False
                    )

                    # Verifying BGP neighborship is going down in holddown time
                    for timer in range(
                        keepalivetimer,
                        (holddowntimer + keepalivetimer),
                        poll_step,
                    ):
                        logger.info("Waiting for %s sec..", keepalivetimer)
                        sleep(keepalivetimer)
                        sleep(2)
                        nh_state = _bgp_summary_peer_state(
                            rnode, addr_type, neighbor_ip
                        )

                        if timer == holddowntimer:
                            if nh_state == "Established":
                                errormsg = (
                                    "BGP neighborship has not gone "
                                    "down in {} sec for neighbor {}".format(
                                        timer, bgp_neighbor
                                    )
                                )
                                return errormsg
                            else:
                                logger.info(
                                    "BGP neighborship has gone down in"
                                    " %s sec for neighbor %s",
                                    timer,
                                    bgp_neighbor,
                                )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
2365
2366
@retry(retry_timeout=16)
def verify_bgp_attributes(
    tgen,
    addr_type,
    dut,
    static_routes,
    rmap_name=None,
    input_dict=None,
    seq_id=None,
    vrf=None,
    nexthop=None,
    expected=True,
):
    """
    API will verify BGP attributes set by Route-map for given prefix and
    DUT. it will run "show bgp ipv4/ipv6 {prefix_address} json" command
    in DUT to verify BGP attributes set by route-map, Set attributes
    values will be read from input_dict and verified with command output.

    For every key under a route-map entry's "set" dict, at least one entry
    of the command output's "paths" list must carry that key with an equal
    value. For the "aspath" key the compared value is path["aspath"]["string"].

    * `tgen`: topogen object
    * `addr_type` : ip type, ipv4/ipv6
    * `dut`: Device Under Test
    * `static_routes`: Static Routes for which BGP set attributes needs to be
                       verified
    * `rmap_name`: route map name for which set criteria needs to be verified
    * `input_dict`: per-router dict holding the "route_maps" to verify;
                    routers without a "route_maps" key are skipped
    * `seq_id`: sequence number (or list of them) of rmap, default is None.
                When given and at least one entry of the route-map has a
                matching "seq_id", only the matching entries are verified;
                otherwise all entries of the route-map are verified.
    * `vrf`: when set, "show bgp vrf <vrf> ..." is used
    * `nexthop`: accepted but not read by this function
    * `expected` : expected results from API, by-default True (not read in
                   the body; presumably consumed by @retry - confirm)

    Usage
    -----
    # To verify BGP attribute "localpref" set to 150 and "med" set to 30
      for prefix 10.0.20.1/32 in router r3.
    input_dict = {
        "r3": {
            "route_maps": {
                "rmap_match_pf_list1": [
                    {
                        "action": "PERMIT",
                        "match": {"prefix_list": "pf_list_1"},
                        "set": {"localpref": 150, "med": 30}
                    }
                ],
            },
            "as_path": "500 400"
        }
    }
    static_routes (list) = ["10.0.20.1/32"]

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    # Normalise once so a single sequence number and a list behave alike.
    if seq_id is not None and not isinstance(seq_id, list):
        seq_id = [seq_id]

    for router, rnode in tgen.routers().items():
        if router != dut:
            continue

        logger.info("Verifying BGP set attributes for dut {}:".format(router))

        for static_route in static_routes:
            if vrf:
                cmd = "show bgp vrf {} {} {} json".format(vrf, addr_type, static_route)
            else:
                cmd = "show bgp {} {} json".format(addr_type, static_route)
            show_bgp_json = run_frr_cmd(rnode, cmd, isjson=True)

            for rmap_router_data in input_dict.values():
                # Not every router entry necessarily defines route-maps.
                route_maps = rmap_router_data.get("route_maps", {})

                for rmap, values in route_maps.items():
                    if rmap != rmap_name:
                        continue

                    # Restrict to the requested sequence ids when any entry
                    # matches; fall back to the whole route-map otherwise.
                    dict_to_test = values
                    if seq_id is not None:
                        selected = [
                            rmap_dict
                            for rmap_dict in values
                            if "seq_id" in rmap_dict and rmap_dict["seq_id"] in seq_id
                        ]
                        if selected:
                            dict_to_test = selected

                    for rmap_dict in dict_to_test:
                        if "set" not in rmap_dict:
                            continue

                        for criteria, wanted in rmap_dict["set"].items():
                            # Reset per attribute so the failure message never
                            # reports a value read for a different attribute.
                            value = None
                            found = False
                            for path in show_bgp_json["paths"]:
                                if criteria not in path:
                                    continue

                                if criteria == "aspath":
                                    value = path[criteria]["string"]
                                else:
                                    value = path[criteria]

                                if wanted == value:
                                    found = True
                                    logger.info(
                                        "Verifying BGP "
                                        "attribute {} for"
                                        " route: {} in "
                                        "router: {}, found"
                                        " expected value:"
                                        " {}".format(
                                            criteria,
                                            static_route,
                                            dut,
                                            value,
                                        )
                                    )
                                    break

                            if not found:
                                errormsg = (
                                    "Failed: Verifying BGP "
                                    "attribute {} for route:"
                                    " {} in router: {}, "
                                    " expected value: {} but"
                                    " found: {}".format(
                                        criteria,
                                        static_route,
                                        dut,
                                        wanted,
                                        value,
                                    )
                                )
                                return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
2509
2510
@retry(retry_timeout=8)
def verify_best_path_as_per_bgp_attribute(
    tgen, addr_type, router, input_dict, attribute, expected=True
):
    """
    API is to verify best path according to BGP attributes for given routes.
    "show bgp ipv4/6 json" command will be run and the preferred next hop
    is computed from the requested attribute:

    * "path"   : shortest AS path (fewest whitespace-separated entries)
    * "locPrf" : highest local preference
    * "weight" : highest weight
    * "origin" : IGP > EGP > INCOMPLETE
    * "metric" : lowest MED

    The first next hop of the route installed in "show ip/ipv6 route json"
    must then be one of the next hops BGP knows for that prefix.

    Parameters
    ----------
    * `tgen` : topogen object
    * `addr_type` : ip type, ipv4/ipv6
    * `router` : name of the router (DUT) whose BGP table and RIB are read
    * `attribute` : calculate best path using this attribute
    * `input_dict`: defines different routes to calculate for which route
                    best path is selected. Each router entry supplies either
                    "static_routes" or bgp "advertise_networks".
    * `expected` : expected results from API, by-default True (not read in
                   the body; presumably consumed by @retry - confirm)

    Usage
    -----
    # To verify best path for routes 200.50.2.0/32 and 200.60.2.0/32 from
    router r7 to router r1(DUT) as per shortest as-path attribute
    input_dict = {
        "r7": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "advertise_networks": [
                                {
                                    "network": "200.50.2.0/32"
                                },
                                {
                                    "network": "200.60.2.0/32"
                                }
                            ]
                        }
                    }
                }
            }
        }
    }
    attribute = "locPrf"
    result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut, \
                                                   input_dict,  attribute)
    Returns
    -------
    errormsg(str) or True; False when `router` is not part of the topology
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    if router not in tgen.routers():
        return False

    rnode = tgen.routers()[router]

    # Verifying show bgp json
    command = "show bgp"

    sleep(2)
    logger.info("Verifying router %s RIB for best path:", router)

    # Lower rank wins: IGP > EGP > INCOMPLETE. Unknown values sort last.
    origin_rank = {"IGP": 0, "EGP": 1, "INCOMPLETE": 2}

    for route_val in input_dict.values():
        if "static_routes" in route_val:
            networks = route_val["static_routes"]
        else:
            net_data = route_val["bgp"]["address_family"][addr_type]["unicast"]
            networks = net_data["advertise_networks"]

        for network in networks:
            _network = network["network"]
            no_of_ip = network.setdefault("no_of_ip", 1)
            vrf = network.setdefault("vrf", None)

            if vrf:
                cmd = "{} vrf {}".format(command, vrf)
            else:
                cmd = command

            cmd = "{} {}".format(cmd, addr_type)
            cmd = "{} json".format(cmd)
            sh_ip_bgp_json = run_frr_cmd(rnode, cmd, isjson=True)

            routes = generate_ips(_network, no_of_ip)
            for route in routes:
                route = str(ipaddress.ip_network(frr_unicode(route)))

                if route not in sh_ip_bgp_json["routes"]:
                    continue

                route_attributes = sh_ip_bgp_json["routes"][route]
                _next_hop = None
                compare = None

                # Map next hop -> attribute value, one entry per BGP path,
                # keyed by the last next hop listed for that path.
                attribute_dict = {}
                for route_attribute in route_attributes:
                    next_hop_ip = route_attribute["nexthops"][-1]["ip"]
                    attribute_dict[next_hop_ip] = route_attribute[attribute]

                # AS_PATH attribute
                if attribute == "path":
                    # Find next_hop for the route have minimum as_path.
                    # The path is a space separated string, so count its
                    # entries rather than its distinct characters.
                    _next_hop = min(
                        attribute_dict,
                        key=lambda k: len(attribute_dict[k].split()),
                    )
                    compare = "SHORTEST"

                # LOCAL_PREF attribute
                elif attribute == "locPrf":
                    # Find next_hop for the route have highest local preference
                    _next_hop = max(attribute_dict, key=(lambda k: attribute_dict[k]))
                    compare = "HIGHEST"

                # WEIGHT attribute
                elif attribute == "weight":
                    # Find next_hop for the route have highest weight
                    _next_hop = max(attribute_dict, key=(lambda k: attribute_dict[k]))
                    compare = "HIGHEST"

                # ORIGIN attribute
                elif attribute == "origin":
                    # Rule is IGP>EGP>INCOMPLETE; ranking (instead of picking
                    # only "IGP") still yields a result when no IGP path exists.
                    _next_hop = min(
                        attribute_dict,
                        key=lambda k: origin_rank.get(
                            str(attribute_dict[k]).upper(), len(origin_rank)
                        ),
                    )
                    compare = ""

                # MED attribute
                elif attribute == "metric":
                    # Find next_hop for the route have LOWEST MED
                    _next_hop = min(attribute_dict, key=(lambda k: attribute_dict[k]))
                    compare = "LOWEST"

                # Show ip route
                if addr_type == "ipv4":
                    command_1 = "show ip route"
                else:
                    command_1 = "show ipv6 route"

                if vrf:
                    cmd = "{} vrf {} json".format(command_1, vrf)
                else:
                    cmd = "{} json".format(command_1)

                rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)

                # Verifying output dictionary rib_routes_json is not empty
                if not rib_routes_json:
                    errormsg = "No route found in RIB of router {}..".format(router)
                    return errormsg

                st_found = False
                nh_found = False
                # Find best is installed in RIB
                if route in rib_routes_json:
                    st_found = True
                    rib_next_hop = rib_routes_json[route][0]["nexthops"][0]["ip"]

                    # Verify next_hop in rib_routes_json.
                    # NOTE(review): this accepts any next hop known to BGP for
                    # the prefix, not only the computed best one - confirm
                    # that the looser check is intended.
                    if rib_next_hop in attribute_dict:
                        nh_found = True
                    else:
                        errormsg = (
                            "Incorrect Nexthop for BGP route {} in "
                            "RIB of router {}, Expected: {}, Found:"
                            " {}\n".format(
                                route,
                                router,
                                _next_hop,
                                rib_next_hop,
                            )
                        )
                        return errormsg

                if st_found and nh_found:
                    logger.info(
                        "Best path for prefix: %s with next_hop: %s is "
                        "installed according to %s %s: (%s) in RIB of "
                        "router %s",
                        route,
                        _next_hop,
                        compare,
                        attribute,
                        attribute_dict[_next_hop],
                        router,
                    )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
2715
2716
@retry(retry_timeout=10)
def verify_best_path_as_per_admin_distance(
    tgen, addr_type, router, input_dict, attribute, expected=True, vrf=None
):
    """
    API is to verify best path according to admin distance for given
    route. "show ip/ipv6 route json" command will be run on each router
    named in input_dict to find the next hop with the lowest admin
    distance, and again on `router` to check that this next hop is among
    the next hops of the first RIB entry for the route.

    Parameters
    ----------
    * `tgen` : topogen object
    * `addr_type` : ip type, ipv4/ipv6
    * `router`: name of the router whose RIB is verified
    * `attribute` : label used in the success log message only
    * `input_dict`: defines different routes with different admin distance
                    to calculate for which route best path is selected
    * `expected` : expected results from API, by-default True (not read in
                   the body; presumably consumed by @retry - confirm)
    * `vrf`: Pass vrf name check for particular vrf.

    Usage
    -----
    # To verify best path for route 200.50.2.0/32 from  router r2 to
    router r1(DUT) as per shortest admin distance which is 60.
    input_dict = {
        "r2": {
            "static_routes": [{"network": "200.50.2.0/32", \
                 "admin_distance": 80, "next_hop": "10.0.0.14"},
                              {"network": "200.50.2.0/32", \
                 "admin_distance": 60, "next_hop": "10.0.0.18"}]
        }}
    attribute = "locPrf"
    result = verify_best_path_as_per_admin_distance(tgen, "ipv4", dut, \
                        input_dict, attribute)
    Returns
    -------
    errormsg(str) or True; False when `router` is not part of the topology
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    router_list = tgen.routers()
    if router not in router_list:
        return False

    rnode = tgen.routers()[router]

    sleep(5)
    logger.info("Verifying router %s RIB for best path:", router)

    # Show ip route cmd
    if addr_type == "ipv4":
        command = "show ip route"
    else:
        command = "show ipv6 route"

    if vrf:
        command = "{} vrf {} json".format(command, vrf)
    else:
        command = "{} json".format(command)

    for routes_from_router in input_dict.keys():
        sh_ip_route_json = router_list[routes_from_router].vtysh_cmd(
            command, isjson=True
        )
        networks = input_dict[routes_from_router]["static_routes"]
        for network in networks:
            route = network["network"]

            # Report a missing route as a normal verification failure
            # instead of raising KeyError.
            if route not in sh_ip_route_json:
                errormsg = "Route {} not found in RIB of router {}\n".format(
                    route, routes_from_router
                )
                return errormsg

            route_attributes = sh_ip_route_json[route]

            # Map next hop -> admin distance, one entry per RIB entry, keyed
            # by the last next hop listed for that entry.
            attribute_dict = {}
            for route_attribute in route_attributes:
                next_hop_ip = route_attribute["nexthops"][-1]["ip"]
                attribute_dict[next_hop_ip] = route_attribute["distance"]

            # Find next_hop for the route have LOWEST Admin Distance
            _next_hop = min(attribute_dict, key=(lambda k: attribute_dict[k]))
            compare = "LOWEST"

            # Show ip route
            rib_routes_json = run_frr_cmd(rnode, command, isjson=True)

            # Verifying output dictionary rib_routes_json is not empty
            if not rib_routes_json:
                errormsg = "No route found in RIB of router {}..".format(router)
                return errormsg

            st_found = False
            nh_found = False
            # Find best is installed in RIB
            if route in rib_routes_json:
                st_found = True
                # Verify next_hop in rib_routes_json
                installed_nexthops = rib_routes_json[route][0]["nexthops"]
                if any(nh["ip"] == _next_hop for nh in installed_nexthops):
                    nh_found = True
                else:
                    errormsg = (
                        "Nexthop {} is Missing for BGP route {}"
                        " in RIB of router {}\n".format(_next_hop, route, router)
                    )
                    return errormsg

            if st_found and nh_found:
                logger.info(
                    "Best path for prefix: %s is installed according"
                    " to %s %s: (%s) in RIB of router %s",
                    route,
                    compare,
                    attribute,
                    attribute_dict[_next_hop],
                    router,
                )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
2839
2840
@retry(retry_timeout=30)
def verify_bgp_rib(
    tgen,
    addr_type,
    dut,
    input_dict,
    next_hop=None,
    aspath=None,
    multi_nh=None,
    expected=True,
):
    """
    This API is to verify whether bgp rib has any
    matching route for a nexthop.

    For every router entry of input_dict, the "static_routes" (if present)
    or otherwise the bgp "advertise_networks" are expanded into prefixes
    and each prefix of the requested address family must be present in
    "show bgp [vrf <vrf>] <addr_type> json" on the DUT.

    Parameters
    ----------
    * `tgen`: topogen object
    * `dut`: input dut router name
    * `addr_type` : ip type ipv4/ipv6
    * `input_dict` : input dict, has details of static routes
    * `next_hop`[optional]: next_hop (or list of them) which needs to be
                            verified for static routes
    * 'aspath'[optional]: aspath which needs to be verified against the
                          first path of each static route
    * `multi_nh`[optional]: when truthy, next hops are collected from all
                            paths of a route instead of only the first one;
                            in this mode a missing next hop is only logged
    * `expected` : expected results from API, by-default True (not read in
                   the body; presumably consumed by @retry - confirm)

    Usage
    -----
    dut = 'r1'
    next_hop = "192.168.1.10"
    input_dict = topo['routers']
    aspath = "100 200 300"
    result = verify_bgp_rib(tgen, addr_type, dut, input_dict,
                            next_hop, aspath)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: verify_bgp_rib()")

    router_list = tgen.routers()

    # Normalise the required next hops once; an empty/None next_hop means
    # no next-hop verification is requested.
    required_nhs = []
    if next_hop:
        required_nhs = next_hop if isinstance(next_hop, list) else [next_hop]

    for routerInput in input_dict.keys():
        for router, rnode in router_list.items():
            if router != dut:
                continue

            # Verifying RIB routes
            command = "show bgp"

            # Static routes
            sleep(2)
            logger.info("Checking router {} BGP RIB:".format(dut))

            if "static_routes" in input_dict[routerInput]:
                static_routes = input_dict[routerInput]["static_routes"]

                for static_route in static_routes:
                    found_routes = []
                    missing_routes = []
                    nh_found = False

                    vrf = static_route.setdefault("vrf", None)
                    community = static_route.setdefault("community", None)
                    largeCommunity = static_route.setdefault("largeCommunity", None)

                    if vrf:
                        cmd = "{} vrf {} {}".format(command, vrf, addr_type)

                        # NOTE(review): community filters are only applied
                        # together with a vrf - confirm this is intended.
                        if community:
                            cmd = "{} community {}".format(cmd, community)

                        if largeCommunity:
                            cmd = "{} large-community {}".format(cmd, largeCommunity)
                    else:
                        cmd = "{} {}".format(command, addr_type)

                    cmd = "{} json".format(cmd)

                    rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)

                    # Verifying output dictionary rib_routes_json is not empty
                    if not rib_routes_json:
                        errormsg = "No route found in rib of router {}..".format(router)
                        return errormsg

                    network = static_route["network"]
                    no_of_ip = static_route.get("no_of_ip", 1)

                    # Generating IPs for verification
                    ip_list = generate_ips(network, no_of_ip)

                    for st_rt in ip_list:
                        st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))

                        _addr_type = validate_ip_address(st_rt)
                        if _addr_type != addr_type:
                            continue

                        if st_rt not in rib_routes_json["routes"]:
                            missing_routes.append(st_rt)
                            continue

                        found_routes.append(st_rt)
                        rib_paths = rib_routes_json["routes"][st_rt]

                        if required_nhs and multi_nh:
                            # Collect next hops of every path of THIS route
                            # only, so results of earlier routes cannot mask
                            # a next hop that is missing here.
                            rib_nhs = []
                            for rib_path in rib_paths:
                                for rib_nh in rib_path["nexthops"]:
                                    rib_nhs.append(rib_nh["ip"])

                            missing_nhs = set(required_nhs).difference(rib_nhs)

                            if rib_nhs:
                                if missing_nhs:
                                    # NOTE(review): only logged, not returned
                                    # as an error, in multi_nh mode.
                                    logger.info(
                                        "Missing nexthop %s for route"
                                        " %s in RIB of router %s\n",
                                        missing_nhs,
                                        st_rt,
                                        dut,
                                    )
                                else:
                                    nh_found = True

                        elif required_nhs and multi_nh is None:
                            rib_nhs = [
                                rib_nh["ip"] for rib_nh in rib_paths[0]["nexthops"]
                            ]
                            missing_nhs = set(required_nhs).difference(rib_nhs)

                            if rib_nhs:
                                if missing_nhs:
                                    logger.info(
                                        "Missing nexthop %s for route"
                                        " %s in RIB of router %s\n",
                                        missing_nhs,
                                        st_rt,
                                        dut,
                                    )
                                    errormsg = (
                                        "Nexthop {} is Missing for "
                                        "route {} in RIB of router {}\n".format(
                                            missing_nhs,
                                            st_rt,
                                            dut,
                                        )
                                    )
                                    return errormsg
                                else:
                                    nh_found = True

                        if aspath:
                            found_paths = rib_paths[0]["path"]
                            if aspath == found_paths:
                                logger.info(
                                    "Found AS path {} for route"
                                    " {} in RIB of router "
                                    "{}\n".format(aspath, st_rt, dut)
                                )
                            else:
                                errormsg = (
                                    "AS Path {} is missing "
                                    "for route {} in RIB of router {}\n".format(
                                        aspath, st_rt, dut
                                    )
                                )
                                return errormsg

                    if nh_found:
                        logger.info(
                            "Found next_hop {} for all bgp"
                            " routes in RIB of"
                            " router {}\n".format(required_nhs, router)
                        )

                    if missing_routes:
                        errormsg = (
                            "Missing route in RIB of router {}, "
                            "routes: {}\n".format(dut, missing_routes)
                        )
                        return errormsg

                    if found_routes:
                        logger.info(
                            "Verified routes in router {} BGP RIB, "
                            "found routes are: {} \n".format(dut, found_routes)
                        )
                # Static routes take precedence; advertised networks of this
                # router entry are not checked in addition.
                continue

            if "bgp" not in input_dict[routerInput]:
                continue

            # Advertise networks
            bgp_data_list = input_dict[routerInput]["bgp"]

            if not isinstance(bgp_data_list, list):
                bgp_data_list = [bgp_data_list]

            for bgp_data in bgp_data_list:
                vrf_id = bgp_data.setdefault("vrf", None)
                if vrf_id:
                    cmd = "{} vrf {} {}".format(command, vrf_id, addr_type)
                else:
                    cmd = "{} {}".format(command, addr_type)

                cmd = "{} json".format(cmd)

                rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)

                # Verifying output dictionary rib_routes_json is not empty
                if not rib_routes_json:
                    errormsg = "No route found in rib of router {}..".format(router)
                    return errormsg

                bgp_net_advertise = bgp_data["address_family"][addr_type]["unicast"]
                advertise_network = bgp_net_advertise.setdefault(
                    "advertise_networks", []
                )

                for advertise_network_dict in advertise_network:
                    found_routes = []
                    missing_routes = []

                    network = advertise_network_dict["network"]
                    no_of_network = advertise_network_dict.get("no_of_network", 1)

                    # Generating IPs for verification
                    ip_list = generate_ips(network, no_of_network)

                    for st_rt in ip_list:
                        st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))

                        _addr_type = validate_ip_address(st_rt)
                        if _addr_type != addr_type:
                            continue

                        if st_rt in rib_routes_json["routes"]:
                            found_routes.append(st_rt)
                        else:
                            missing_routes.append(st_rt)

                    if missing_routes:
                        errormsg = (
                            "Missing route in BGP RIB of router {},"
                            " are: {}\n".format(dut, missing_routes)
                        )
                        return errormsg

                    if found_routes:
                        logger.info(
                            "Verified routes in router {} BGP RIB, found "
                            "routes are: {}\n".format(dut, found_routes)
                        )

    logger.debug("Exiting lib API: verify_bgp_rib()")
    return True
3148
3149
3150 @retry(retry_timeout=10)
3151 def verify_graceful_restart(
3152 tgen, topo, addr_type, input_dict, dut, peer, expected=True
3153 ):
3154 """
3155 This API is to verify verify_graceful_restart configuration of DUT and
3156 cross verify the same from the peer bgp routerrouter.
3157
3158 Parameters
3159 ----------
3160 * `tgen`: topogen object
3161 * `topo`: input json file data
3162 * `addr_type` : ip type ipv4/ipv6
3163 * `input_dict`: input dictionary, have details of Device Under Test, for
3164 which user wants to test the data
3165 * `dut`: input dut router name
3166 * `peer`: input peer router name
3167 * `expected` : expected results from API, by-default True
3168
3169 Usage
3170 -----
3171 "r1": {
3172 "bgp": {
3173 "address_family": {
3174 "ipv4": {
3175 "unicast": {
3176 "neighbor": {
3177 "r3": {
3178 "dest_link":{
3179 "r1": {
3180 "graceful-restart": True
3181 }
3182 }
3183 }
3184 }
3185 }
3186 },
3187 "ipv6": {
3188 "unicast": {
3189 "neighbor": {
3190 "r3": {
3191 "dest_link":{
3192 "r1": {
3193 "graceful-restart": True
3194 }
3195 }
3196 }
3197 }
3198 }
3199 }
3200 }
3201 }
3202 }
3203 }
3204
3205 result = verify_graceful_restart(tgen, topo, addr_type, input_dict,
3206 dut = "r1", peer = 'r2')
3207 Returns
3208 -------
3209 errormsg(str) or True
3210 """
3211
3212 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3213
3214 for router, rnode in tgen.routers().items():
3215 if router != dut:
3216 continue
3217
3218 try:
3219 bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]
3220 except TypeError:
3221 bgp_addr_type = topo["routers"][dut]["bgp"][0]["address_family"]
3222
3223 # bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]
3224
3225 if addr_type in bgp_addr_type:
3226 if not check_address_types(addr_type):
3227 continue
3228
3229 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
3230
3231 for bgp_neighbor, peer_data in bgp_neighbors.items():
3232 if bgp_neighbor != peer:
3233 continue
3234
3235 for dest_link, peer_dict in peer_data["dest_link"].items():
3236 data = topo["routers"][bgp_neighbor]["links"]
3237
3238 if dest_link in data:
3239 neighbor_ip = data[dest_link][addr_type].split("/")[0]
3240
3241 logger.info(
3242 "[DUT: {}]: Checking bgp graceful-restart show"
3243 " o/p {}".format(dut, neighbor_ip)
3244 )
3245
3246 show_bgp_graceful_json = None
3247
3248 show_bgp_graceful_json = run_frr_cmd(
3249 rnode,
3250 "show bgp {} neighbor {} graceful-restart json".format(
3251 addr_type, neighbor_ip
3252 ),
3253 isjson=True,
3254 )
3255
3256 show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]
3257
3258 if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
3259 logger.info(
3260 "[DUT: {}]: Neighbor ip matched {}".format(dut, neighbor_ip)
3261 )
3262 else:
3263 errormsg = "[DUT: {}]: Neighbor ip NOT a matched {}".format(
3264 dut, neighbor_ip
3265 )
3266 return errormsg
3267
3268 lmode = None
3269 rmode = None
3270 # Local GR mode
3271 if "address_family" in input_dict[dut]["bgp"]:
3272 bgp_neighbors = input_dict[dut]["bgp"]["address_family"][addr_type][
3273 "unicast"
3274 ]["neighbor"][peer]["dest_link"]
3275
3276 for dest_link, data in bgp_neighbors.items():
3277 if (
3278 "graceful-restart-helper" in data
3279 and data["graceful-restart-helper"]
3280 ):
3281 lmode = "Helper"
3282 elif "graceful-restart" in data and data["graceful-restart"]:
3283 lmode = "Restart"
3284 elif (
3285 "graceful-restart-disable" in data
3286 and data["graceful-restart-disable"]
3287 ):
3288 lmode = "Disable"
3289 else:
3290 lmode = None
3291
3292 if lmode is None:
3293 if "graceful-restart" in input_dict[dut]["bgp"]:
3294
3295 if (
3296 "graceful-restart" in input_dict[dut]["bgp"]["graceful-restart"]
3297 and input_dict[dut]["bgp"]["graceful-restart"][
3298 "graceful-restart"
3299 ]
3300 ):
3301 lmode = "Restart*"
3302 elif (
3303 "graceful-restart-disable"
3304 in input_dict[dut]["bgp"]["graceful-restart"]
3305 and input_dict[dut]["bgp"]["graceful-restart"][
3306 "graceful-restart-disable"
3307 ]
3308 ):
3309 lmode = "Disable*"
3310 else:
3311 lmode = "Helper*"
3312 else:
3313 lmode = "Helper*"
3314
3315 if lmode == "Disable" or lmode == "Disable*":
3316 return True
3317
3318 # Remote GR mode
3319 if "address_family" in input_dict[peer]["bgp"]:
3320 bgp_neighbors = input_dict[peer]["bgp"]["address_family"][addr_type][
3321 "unicast"
3322 ]["neighbor"][dut]["dest_link"]
3323
3324 for dest_link, data in bgp_neighbors.items():
3325 if (
3326 "graceful-restart-helper" in data
3327 and data["graceful-restart-helper"]
3328 ):
3329 rmode = "Helper"
3330 elif "graceful-restart" in data and data["graceful-restart"]:
3331 rmode = "Restart"
3332 elif (
3333 "graceful-restart-disable" in data
3334 and data["graceful-restart-disable"]
3335 ):
3336 rmode = "Disable"
3337 else:
3338 rmode = None
3339
3340 if rmode is None:
3341 if "graceful-restart" in input_dict[peer]["bgp"]:
3342
3343 if (
3344 "graceful-restart"
3345 in input_dict[peer]["bgp"]["graceful-restart"]
3346 and input_dict[peer]["bgp"]["graceful-restart"][
3347 "graceful-restart"
3348 ]
3349 ):
3350 rmode = "Restart"
3351 elif (
3352 "graceful-restart-disable"
3353 in input_dict[peer]["bgp"]["graceful-restart"]
3354 and input_dict[peer]["bgp"]["graceful-restart"][
3355 "graceful-restart-disable"
3356 ]
3357 ):
3358 rmode = "Disable"
3359 else:
3360 rmode = "Helper"
3361 else:
3362 rmode = "Helper"
3363
3364 if show_bgp_graceful_json_out["localGrMode"] == lmode:
3365 logger.info(
3366 "[DUT: {}]: localGrMode : {} ".format(
3367 dut, show_bgp_graceful_json_out["localGrMode"]
3368 )
3369 )
3370 else:
3371 errormsg = (
3372 "[DUT: {}]: localGrMode is not correct"
3373 " Expected: {}, Found: {}".format(
3374 dut, lmode, show_bgp_graceful_json_out["localGrMode"]
3375 )
3376 )
3377 return errormsg
3378
3379 if show_bgp_graceful_json_out["remoteGrMode"] == rmode:
3380 logger.info(
3381 "[DUT: {}]: remoteGrMode : {} ".format(
3382 dut, show_bgp_graceful_json_out["remoteGrMode"]
3383 )
3384 )
3385 elif (
3386 show_bgp_graceful_json_out["remoteGrMode"] == "NotApplicable"
3387 and rmode == "Disable"
3388 ):
3389 logger.info(
3390 "[DUT: {}]: remoteGrMode : {} ".format(
3391 dut, show_bgp_graceful_json_out["remoteGrMode"]
3392 )
3393 )
3394 else:
3395 errormsg = (
3396 "[DUT: {}]: remoteGrMode is not correct"
3397 " Expected: {}, Found: {}".format(
3398 dut, rmode, show_bgp_graceful_json_out["remoteGrMode"]
3399 )
3400 )
3401 return errormsg
3402
3403 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3404 return True
3405
3406
@retry(retry_timeout=10)
def verify_r_bit(tgen, topo, addr_type, input_dict, dut, peer, expected=True):
    """
    This API is to verify r_bit in the BGP gr capability advertised
    by the neighbor router

    The check reads the "rBit" key of the `peer` entry in the DUT's
    "show bgp <addr_type> neighbor <ip> graceful-restart json" output.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `addr_type` : ip type ipv4/ipv6
    * `input_dict`: input dictionary, have details of Device Under Test, for
                    which user wants to test the data (not read by this API)
    * `dut`: input dut router name
    * `peer`: peer name
    * `expected` : expected results from API, by-default True (not read by
                   the function body)

    Usage
    -----
    input_dict = {
        "r1": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "neighbor": {
                                "r3": {
                                    "dest_link":{
                                        "r1": {
                                            "graceful-restart": True
                                        }
                                    }
                                }
                            }
                        }
                    },
                    "ipv6": {
                        "unicast": {
                            "neighbor": {
                                "r3": {
                                    "dest_link":{
                                        "r1": {
                                            "graceful-restart": True
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }
    result = verify_r_bit(tgen, topo, addr_type, input_dict, dut, peer)

    Returns
    -------
    errormsg(str) or True

    True is also returned when there is nothing to check: `dut` is not in
    tgen.routers(), `addr_type` is not configured for the DUT in `topo` or
    is rejected by check_address_types(), or the neighbor output has no
    "rBit" key.
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    for router, rnode in tgen.routers().items():
        if router != dut:
            continue

        bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]

        if addr_type not in bgp_addr_type:
            continue

        if not check_address_types(addr_type):
            continue

        bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]

        # Resolve the peer's address from the peer-side link named by each
        # dest_link of the DUT's neighbor entry. When several links match,
        # the last one wins.
        neighbor_ip = None
        if peer in bgp_neighbors:
            for dest_link in bgp_neighbors[peer]["dest_link"]:
                peer_links = topo["routers"][peer]["links"]
                if dest_link in peer_links:
                    neighbor_ip = peer_links[dest_link][addr_type].split("/")[0]

        # Without this guard an unknown peer/link would surface as an
        # UnboundLocalError on neighbor_ip instead of an error message.
        if neighbor_ip is None:
            errormsg = "[DUT: {}]: No {} address found for BGP neighbor {}".format(
                dut, addr_type, peer
            )
            return errormsg

        logger.info(
            "[DUT: {}]: Checking bgp graceful-restart show"
            " o/p {}".format(dut, neighbor_ip)
        )

        show_bgp_graceful_json = run_frr_cmd(
            rnode,
            "show bgp {} neighbor {} graceful-restart json".format(
                addr_type, neighbor_ip
            ),
            isjson=True,
        )

        show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]

        if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
            logger.info("[DUT: {}]: Neighbor ip matched {}".format(dut, neighbor_ip))
        else:
            errormsg = "[DUT: {}]: Neighbor ip NOT a matched {}".format(
                dut, neighbor_ip
            )
            return errormsg

        # A missing "rBit" key is not treated as a failure.
        if "rBit" in show_bgp_graceful_json_out:
            if show_bgp_graceful_json_out["rBit"]:
                logger.info("[DUT: {}]: Rbit true {}".format(dut, neighbor_ip))
            else:
                errormsg = "[DUT: {}]: Rbit false {}".format(dut, neighbor_ip)
                return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
3525
3526
@retry(retry_timeout=10)
def verify_eor(tgen, topo, addr_type, input_dict, dut, peer, expected=True):
    """
    This API is to verify EOR

    The End-of-RIB flags are read from the "endOfRibStatus" section of the
    ipv4Unicast/ipv6Unicast entry (chosen from `addr_type`) in the DUT's
    "show bgp <addr_type> neighbor <ip> graceful-restart json" output.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `addr_type` : ip type ipv4/ipv6
    * `input_dict`: input dictionary, have details of DUT, for
                    which user wants to test the data (not read by this API)
    * `dut`: input dut router name
    * `peer`: peer name
    * `expected` : expected results from API, by-default True (not read by
                   the function body)

    Usage
    -----
    input_dict = {
        "r1": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "neighbor": {
                                "r3": {
                                    "dest_link":{
                                        "r1": {
                                            "graceful-restart": True
                                        }
                                    }
                                }
                            }
                        }
                    },
                    "ipv6": {
                        "unicast": {
                            "neighbor": {
                                "r3": {
                                    "dest_link":{
                                        "r1": {
                                            "graceful-restart": True
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    result = verify_eor(tgen, topo, addr_type, input_dict, dut, peer)

    Returns
    -------
    errormsg(str) or True

    True is also returned when there is nothing to check: `dut` is not in
    tgen.routers(), or `addr_type` is not configured for the DUT in `topo`
    or is rejected by check_address_types(). EOR keys that are absent from
    the output are skipped rather than reported.
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    # (key under "endOfRibStatus", log format when set, error format when not).
    # The three checks run in this order and stop at the first failure.
    eor_checks = (
        (
            "endOfRibSend",
            "[DUT: %s]: EOR Send true for %s %s",
            "[DUT: %s]: EOR Send false for %s %s",
        ),
        (
            "endOfRibRecv",
            "[DUT: %s]: EOR Recv true %s %s",
            "[DUT: %s]: EOR Recv false %s %s",
        ),
        (
            "endOfRibSentAfterUpdate",
            "[DUT: %s]: EOR SendTime true for %s %s",
            "[DUT: %s]: EOR SendTime false for %s %s",
        ),
    )

    for router, rnode in tgen.routers().items():
        if router != dut:
            continue

        bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]

        if addr_type not in bgp_addr_type:
            continue

        if not check_address_types(addr_type):
            continue

        bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]

        # Resolve the peer's address from the peer-side link named by each
        # dest_link of the DUT's neighbor entry. When several links match,
        # the last one wins.
        neighbor_ip = None
        if peer in bgp_neighbors:
            for dest_link in bgp_neighbors[peer]["dest_link"]:
                peer_links = topo["routers"][peer]["links"]
                if dest_link in peer_links:
                    neighbor_ip = peer_links[dest_link][addr_type].split("/")[0]

        # Without this guard an unknown peer/link would surface as an
        # UnboundLocalError on neighbor_ip instead of an error message.
        if neighbor_ip is None:
            errormsg = "[DUT: %s]: No %s address found for BGP neighbor %s" % (
                dut,
                addr_type,
                peer,
            )
            return errormsg

        logger.info(
            "[DUT: %s]: Checking bgp graceful-restart show o/p %s",
            dut,
            neighbor_ip,
        )

        show_bgp_graceful_json = run_frr_cmd(
            rnode,
            "show bgp {} neighbor {} graceful-restart json".format(
                addr_type, neighbor_ip
            ),
            isjson=True,
        )

        show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]

        if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
            logger.info("[DUT: %s]: Neighbor ip matched %s", dut, neighbor_ip)
        else:
            errormsg = "[DUT: %s]: Neighbor ip is NOT matched %s" % (
                dut,
                neighbor_ip,
            )
            return errormsg

        if addr_type == "ipv4":
            afi = "ipv4Unicast"
        elif addr_type == "ipv6":
            afi = "ipv6Unicast"
        else:
            errormsg = "Address type %s is not supported" % (addr_type)
            return errormsg

        eor_json = show_bgp_graceful_json_out[afi]["endOfRibStatus"]
        for eor_key, pass_fmt, fail_fmt in eor_checks:
            if eor_key not in eor_json:
                continue

            if not eor_json[eor_key]:
                errormsg = fail_fmt % (dut, neighbor_ip, afi)
                return errormsg

            logger.info(pass_fmt, dut, neighbor_ip, afi)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
3688
3689
@retry(retry_timeout=8)
def verify_f_bit(tgen, topo, addr_type, input_dict, dut, peer, expected=True):
    """
    This API is to verify f_bit in the BGP gr capability advertised
    by the neighbor router

    The check reads the "fBit" key from the DUT's
    "show bgp <addr_type> neighbor <ip> graceful-restart json" output. The
    ipv4Unicast entry is used when present, otherwise the ipv6Unicast entry;
    `addr_type` does not influence which of the two is inspected.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `addr_type` : ip type ipv4/ipv6
    * `input_dict`: input dictionary, have details of Device Under Test, for
                    which user wants to test the data (not read by this API)
    * `dut`: input dut router name
    * `peer`: peer name
    * `expected` : expected results from API, by-default True (not read by
                   the function body)

    Usage
    -----
    input_dict = {
        "r1": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "neighbor": {
                                "r3": {
                                    "dest_link":{
                                        "r1": {
                                            "graceful-restart": True
                                        }
                                    }
                                }
                            }
                        }
                    },
                    "ipv6": {
                        "unicast": {
                            "neighbor": {
                                "r3": {
                                    "dest_link":{
                                        "r1": {
                                            "graceful-restart": True
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    result = verify_f_bit(tgen, topo, 'ipv4', input_dict, dut, peer)

    Returns
    -------
    errormsg(str) or True

    True is also returned when there is nothing to check: `dut` is not in
    tgen.routers(), or `addr_type` is not configured for the DUT in `topo`
    or is rejected by check_address_types().
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    for router, rnode in tgen.routers().items():
        if router != dut:
            continue

        bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]

        if addr_type not in bgp_addr_type:
            continue

        if not check_address_types(addr_type):
            continue

        bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]

        # Resolve the peer's address from the peer-side link named by each
        # dest_link of the DUT's neighbor entry. When several links match,
        # the last one wins.
        neighbor_ip = None
        if peer in bgp_neighbors:
            for dest_link in bgp_neighbors[peer]["dest_link"]:
                peer_links = topo["routers"][peer]["links"]
                if dest_link in peer_links:
                    neighbor_ip = peer_links[dest_link][addr_type].split("/")[0]

        # Without this guard an unknown peer/link would surface as an
        # UnboundLocalError on neighbor_ip instead of an error message.
        if neighbor_ip is None:
            errormsg = "[DUT: {}]: No {} address found for BGP neighbor {}".format(
                dut, addr_type, peer
            )
            return errormsg

        logger.info(
            "[DUT: {}]: Checking bgp graceful-restart show"
            " o/p {}".format(dut, neighbor_ip)
        )

        show_bgp_graceful_json = run_frr_cmd(
            rnode,
            "show bgp {} neighbor {} graceful-restart json".format(
                addr_type, neighbor_ip
            ),
            isjson=True,
        )

        show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]

        if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
            logger.info("[DUT: {}]: Neighbor ip matched {}".format(dut, neighbor_ip))
        else:
            errormsg = "[DUT: {}]: Neighbor ip NOT a match {}".format(
                dut, neighbor_ip
            )
            return errormsg

        # Pick the address-family entry to inspect: IPv4 first, then IPv6.
        if "ipv4Unicast" in show_bgp_graceful_json_out:
            afi_key, afi_label = "ipv4Unicast", "IPv4"
        elif "ipv6Unicast" in show_bgp_graceful_json_out:
            afi_key, afi_label = "ipv6Unicast", "IPv6"
        else:
            # Previously this case was two bare subscript expressions that
            # could only raise KeyError; report it as a normal failure.
            errormsg = (
                "[DUT: {}]: Neither ipv4Unicast nor ipv6Unicast present"
                " for {}".format(dut, neighbor_ip)
            )
            return errormsg

        if show_bgp_graceful_json_out[afi_key]["fBit"]:
            logger.info(
                "[DUT: {}]: Fbit True for {} {} Unicast".format(
                    dut, neighbor_ip, afi_label
                )
            )
        else:
            errormsg = "[DUT: {}]: Fbit False for {} {} Unicast".format(
                dut, neighbor_ip, afi_label
            )
            return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
3829
3830
@retry(retry_timeout=10)
def verify_graceful_restart_timers(
    tgen, topo, addr_type, input_dict, dut, peer, expected=True
):
    """
    This API is to verify graceful restart timers, configured and received

    The "restart-time" found under
    input_dict[peer]["bgp"]["graceful-restart"]["timer"] is compared with
    timers.receivedRestartTimer of the `peer` entry in the DUT's
    "show bgp <addr_type> neighbor <ip> graceful-restart json" output.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `addr_type` : ip type ipv4/ipv6
    * `input_dict`: input dictionary, have details of Device Under Test,
                    for which user wants to test the data
    * `dut`: input dut router name
    * `peer`: peer name
    * `expected` : expected results from API, by-default True (accepted for
                   parity with the sibling verify APIs; not read by the
                   function body)

    Usage
    -----
    # Restart time configured on the peer, as read by this API
    input_dict_1 = {
        "r1": {
            "bgp": {
                "graceful-restart": {
                    "timer": {
                        "restart-time": 150
                    }
                }
            }
        }
    }

    result = verify_graceful_restart_timers(
        tgen, topo, 'ipv4', input_dict_1, dut="r3", peer="r1"
    )

    Returns
    -------
    errormsg(str) or True

    True is also returned when there is nothing to check: `dut` is not in
    tgen.routers(), `addr_type` is not configured for the DUT in `topo` or
    is rejected by check_address_types(), or `input_dict` carries no
    "restart-time" for the peer.
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    for router, rnode in tgen.routers().items():
        if router != dut:
            continue

        bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]

        if addr_type not in bgp_addr_type:
            continue

        if not check_address_types(addr_type):
            continue

        bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]

        # Resolve the peer's address from the peer-side link named by each
        # dest_link of the DUT's neighbor entry. When several links match,
        # the last one wins.
        neighbor_ip = None
        if peer in bgp_neighbors:
            for dest_link in bgp_neighbors[peer]["dest_link"]:
                peer_links = topo["routers"][peer]["links"]
                if dest_link in peer_links:
                    neighbor_ip = peer_links[dest_link][addr_type].split("/")[0]

        # Without this guard an unknown peer/link would surface as an
        # UnboundLocalError on neighbor_ip instead of an error message.
        if neighbor_ip is None:
            errormsg = "[DUT: {}]: No {} address found for BGP neighbor {}".format(
                dut, addr_type, peer
            )
            return errormsg

        logger.info(
            "[DUT: {}]: Checking bgp graceful-restart show"
            " o/p {}".format(dut, neighbor_ip)
        )

        show_bgp_graceful_json = run_frr_cmd(
            rnode,
            "show bgp {} neighbor {} graceful-restart json".format(
                addr_type, neighbor_ip
            ),
            isjson=True,
        )

        show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]
        if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
            logger.info("[DUT: {}]: Neighbor ip matched {}".format(dut, neighbor_ip))
        else:
            errormsg = "[DUT: {}]: Neighbor ip is NOT matched {}".format(
                dut, neighbor_ip
            )
            return errormsg

        # Graceful-restart timer: only "restart-time" is verified.
        peer_bgp = input_dict[peer]["bgp"]
        if "graceful-restart" not in peer_bgp:
            continue
        if "timer" not in peer_bgp["graceful-restart"]:
            continue

        gr_timers = peer_bgp["graceful-restart"]["timer"]
        if "restart-time" not in gr_timers:
            continue

        receivedTimer = gr_timers["restart-time"]
        if show_bgp_graceful_json_out["timers"]["receivedRestartTimer"] == (
            receivedTimer
        ):
            logger.info(
                "receivedRestartTimer is {}"
                " on {} from peer {}".format(receivedTimer, router, peer)
            )
        else:
            errormsg = "receivedRestartTimer is not as expected {}".format(
                receivedTimer
            )
            return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
3957
3958
@retry(retry_timeout=8)
def verify_gr_address_family(
    tgen, topo, addr_type, addr_family, dut, peer, expected=True
):
    """
    This API is to verify gr_address_family in the BGP gr capability advertised
    by the neighbor router

    The check passes when `addr_family` is a key of the `peer` entry in the
    DUT's "show bgp <addr_type> neighbor <ip> graceful-restart json" output.
    The peer address is taken from the peer's link named after the DUT.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `addr_type` : ip type ipv4/ipv6
    * `addr_family` : address family to look for, ipv4Unicast/ipv6Unicast
    * `dut`: input dut router name
    * `peer`: input peer router to check
    * `expected` : expected results from API, by-default True (not read by
                   the function body)

    Usage
    -----

    result = verify_gr_address_family(tgen, topo, "ipv4", "ipv4Unicast", "r1", "r3")

    Returns
    -------
    errormsg(str) or True; None when check_address_types() rejects `addr_type`
    """

    api_name = sys._getframe().f_code.co_name
    logger.debug("Entering lib API: {}".format(api_name))

    if not check_address_types(addr_type):
        logger.debug("Exiting lib API: {}".format(api_name))
        return None

    routers = tgen.routers()
    if dut not in routers:
        return "{} not in routers".format(dut)

    rnode = routers[dut]
    bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]

    if addr_type not in bgp_addr_type:
        return "{} not in bgp_addr_types".format(addr_type)

    if peer not in bgp_addr_type[addr_type]["unicast"]["neighbor"]:
        return "{} not a peer of {} over {}".format(peer, dut, addr_type)

    nbr_links = topo["routers"][peer]["links"]
    if dut not in nbr_links or addr_type not in nbr_links[dut]:
        return "peer {} missing back link to {} over {}".format(peer, dut, addr_type)

    neighbor_ip = nbr_links[dut][addr_type].split("/")[0]

    logger.info(
        "[DUT: {}]: Checking bgp graceful-restart show o/p {} for {}".format(
            dut, neighbor_ip, addr_family
        )
    )

    show_bgp_graceful_json = run_frr_cmd(
        rnode,
        "show bgp {} neighbor {} graceful-restart json".format(addr_type, neighbor_ip),
        isjson=True,
    )

    show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]

    if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
        logger.info("Neighbor ip matched {}".format(neighbor_ip))
    else:
        errormsg = "Neighbor ip NOT a match {}".format(neighbor_ip)
        return errormsg

    # Only these two families are handled; anything else is a caller error.
    if addr_family not in ("ipv4Unicast", "ipv6Unicast"):
        errormsg = "Address family: {} is not supported, neighbor {}".format(
            addr_family, neighbor_ip
        )
        return errormsg

    if addr_family not in show_bgp_graceful_json_out:
        errormsg = "{} NOT present for {} ".format(addr_family, neighbor_ip)
        return errormsg

    logger.info("{} present for {} ".format(addr_family, neighbor_ip))

    # Logged before returning; the former trailing log line was unreachable.
    logger.debug("Exiting lib API: {}".format(api_name))
    return True
4054
4055
@retry(retry_timeout=12)
def verify_attributes_for_evpn_routes(
    tgen,
    topo,
    dut,
    input_dict,
    rd=None,
    rt=None,
    ethTag=None,
    ipLen=None,
    rd_peer=None,
    rt_peer=None,
    expected=True,
):
    """
    API to verify rd and rt value using "sh bgp l2vpn evpn 10.1.1.1"
    command.

    For every network of every static route in `input_dict` the DUT is
    queried with "show bgp l2vpn evpn <ip> json" and the requested checks
    are applied in this order: rd, rt, ethTag, ipLen. The API returns as
    soon as one check reaches a verdict, so in practice only the first
    route that triggers a check is evaluated.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo` : json file data
    * `dut` : device under test
    * `input_dict`: having details like - for which route, rd value
                    needs to be verified
    * `rd` : route distinguisher, or "auto" to derive it from `rd_peer`
    * `rt` : route target (string or list), or "auto" to derive it from
             `rt_peer`
    * `ethTag` : Ethernet Tag
    * `ipLen` : IP prefix length
    * `rd_peer` : Peer name from which RD will be auto-generated
    * `rt_peer` : Peer name from which RT will be auto-generated
    * `expected` : expected results from API, by-default True (not read by
                   the function body)

    Usage
    -----
    input_dict_1 = {
        "r1": {
            "static_routes": [{
                "network": [NETWORK1_1[addr_type]],
                "next_hop": NEXT_HOP_IP[addr_type],
                "vrf": "RED"
            }]
        }
    }

    result = verify_attributes_for_evpn_routes(tgen, topo, dut,
                    input_dict_1, rd = "10.0.0.33:1")

    Returns
    -------
    errormsg(str) or True; False when no check produced a verdict
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    for router in input_dict:
        rnode = tgen.routers()[dut]

        if "static_routes" not in input_dict[router]:
            continue

        for static_route in input_dict[router]["static_routes"]:
            network = static_route["network"]

            # Reset per static route so a route without "vrf" neither
            # inherits the previous route's VRF nor leaves the name unbound.
            vrf = static_route.get("vrf")

            if not isinstance(network, list):
                network = [network]

            for route in network:
                route = route.split("/")[0]
                _addr_type = validate_ip_address(route)
                if "v4" in _addr_type:
                    input_afi = "v4"
                elif "v6" in _addr_type:
                    input_afi = "v6"

                cmd = "show bgp l2vpn evpn {} json".format(route)
                evpn_rd_value_json = run_frr_cmd(rnode, cmd, isjson=True)
                if not bool(evpn_rd_value_json):
                    errormsg = "No output for '{}' cli".format(cmd)
                    return errormsg

                # ---- explicit RD -------------------------------------
                if rd is not None and rd != "auto":
                    logger.info(
                        "[DUT: %s]: Verifying rd value for " "evpn route %s:",
                        dut,
                        route,
                    )

                    if rd not in evpn_rd_value_json:
                        errormsg = (
                            "[DUT: %s] RD : %s is not present"
                            " in cli json output" % (dut, rd)
                        )
                        return errormsg

                    rd_value_json = evpn_rd_value_json[rd]
                    if rd_value_json["rd"] != rd:
                        errormsg = (
                            "[DUT: %s] Failed: Verifying"
                            " RD value for EVPN route: %s"
                            "[FAILED]!!, EXPECTED : %s "
                            " FOUND : %s" % (dut, route, rd, rd_value_json["rd"])
                        )
                        return errormsg

                    logger.info(
                        "[DUT %s]: Verifying RD value for"
                        " EVPN route: %s [PASSED]|| "
                        "Found Expected: %s",
                        dut,
                        route,
                        rd,
                    )
                    return True

                # ---- auto-derived RD ---------------------------------
                if rd == "auto":
                    logger.info(
                        "[DUT: %s]: Verifying auto-rd value for " "evpn route %s:",
                        dut,
                        route,
                    )

                    if rd_peer:
                        # Map each VRF name of rd_peer to its 1-based position
                        # in the topology's "vrfs" list.
                        vni_dict = {}
                        vrfs = topo["routers"][rd_peer]["vrfs"]
                        for index, vrf_dict in enumerate(vrfs, start=1):
                            vni_dict[vrf_dict["name"]] = index

                        # Query the peer through its own handle so that rnode
                        # keeps pointing at the DUT for later EVPN queries.
                        rd_node = tgen.routers()[rd_peer]
                        show_bgp_json = run_frr_cmd(
                            rd_node, "show bgp vrf all summary json", isjson=True
                        )

                        # Verifying output dictionary show_bgp_json is empty
                        if not bool(show_bgp_json):
                            errormsg = "BGP is not running"
                            return errormsg

                        if vrf not in show_bgp_json:
                            errormsg = (
                                "[DUT: %s] VRF : %s is not present in bgp"
                                " summary output of %s" % (dut, vrf, rd_peer)
                            )
                            return errormsg

                        router_id = None
                        for afi, afi_data in show_bgp_json[vrf].items():
                            if input_afi not in afi:
                                continue
                            router_id = afi_data["routerId"]

                        if router_id is None:
                            errormsg = (
                                "[DUT: %s] No router-id found for vrf %s"
                                " on %s" % (dut, vrf, rd_peer)
                            )
                            return errormsg

                        found = False
                        rd = "{}:{}".format(router_id, vni_dict[vrf])
                        for _rd, rd_value_json in evpn_rd_value_json.items():
                            if (
                                str(rd_value_json["rd"].split(":")[0])
                                != rd.split(":")[0]
                            ):
                                continue

                            if int(rd_value_json["rd"].split(":")[1]) > 0:
                                found = True

                        # rd_value_json is the last entry visited by the loop.
                        if found:
                            logger.info(
                                "[DUT %s]: Verifying RD value for"
                                " EVPN route: %s "
                                "Found Expected: %s",
                                dut,
                                route,
                                rd_value_json["rd"],
                            )
                            return True
                        else:
                            errormsg = (
                                "[DUT: %s] Failed: Verifying"
                                " RD value for EVPN route: %s"
                                " FOUND : %s" % (dut, route, rd_value_json["rd"])
                            )
                            return errormsg

                # ---- auto-derived RT ---------------------------------
                if rt == "auto":
                    vni_dict = {}
                    logger.info(
                        "[DUT: %s]: Verifying auto-rt value for " "evpn route %s:",
                        dut,
                        route,
                    )

                    if rt_peer:
                        # Separate handle for the same reason as rd_node.
                        rt_node = tgen.routers()[rt_peer]
                        show_bgp_json = run_frr_cmd(
                            rt_node, "show bgp vrf all summary json", isjson=True
                        )

                        # Verifying output dictionary show_bgp_json is empty
                        if not bool(show_bgp_json):
                            errormsg = "BGP is not running"
                            return errormsg

                        if vrf not in show_bgp_json:
                            errormsg = (
                                "[DUT: %s] VRF : %s is not present in bgp"
                                " summary output of %s" % (dut, vrf, rt_peer)
                            )
                            return errormsg

                        as_num = None
                        for afi, afi_data in show_bgp_json[vrf].items():
                            if input_afi not in afi:
                                continue
                            as_num = afi_data["as"]

                        if as_num is None:
                            errormsg = (
                                "[DUT: %s] No AS number found for vrf %s"
                                " on %s" % (dut, vrf, rt_peer)
                            )
                            return errormsg

                        show_vrf_vni_json = run_frr_cmd(
                            rt_node, "show vrf vni json", isjson=True
                        )

                        vrfs = show_vrf_vni_json["vrfs"]
                        for vrf_dict in vrfs:
                            if vrf_dict["vrf"] == vrf:
                                vni_dict[vrf_dict["vrf"]] = str(vrf_dict["vni"])

                        # If AS is 4 byte, FRR uses only the lower 2 bytes of ASN+VNI
                        # for auto derived RT value.
                        if as_num > 65535:
                            as_num &= 0xFFFF

                        # rt is rebound to the derived value; if no path below
                        # matches the VNI, the explicit-RT check that follows
                        # runs with it.
                        rt = "{}:{}".format(str(as_num), vni_dict[vrf])
                        for _rd, route_data in evpn_rd_value_json.items():
                            if route_data["ip"] != route:
                                errormsg = (
                                    "[DUT: %s] Route : %s is not"
                                    " present in cli json output" % (dut, route)
                                )
                                return errormsg

                            for rt_data in route_data["paths"]:
                                if vni_dict[vrf] != rt_data["vni"]:
                                    continue

                                rt_string = rt_data["extendedCommunity"]["string"]
                                rt_input = "RT:{}".format(rt)
                                if rt_input not in rt_string:
                                    errormsg = (
                                        "[DUT: %s] Failed:"
                                        " Verifying RT "
                                        "value for EVPN "
                                        " route: %s"
                                        "[FAILED]!!,"
                                        " EXPECTED : %s "
                                        " FOUND : %s"
                                        % (dut, route, rt_input, rt_string)
                                    )
                                    return errormsg

                                logger.info(
                                    "[DUT %s]: Verifying "
                                    "RT value for EVPN "
                                    "route: %s [PASSED]||"
                                    "Found Expected: %s",
                                    dut,
                                    route,
                                    rt_input,
                                )
                                return True

                # ---- explicit RT -------------------------------------
                if rt is not None and rt != "auto":
                    logger.info(
                        "[DUT: %s]: Verifying rt value for " "evpn route %s:",
                        dut,
                        route,
                    )

                    if not isinstance(rt, list):
                        rt = [rt]

                    for _rt in rt:
                        for _rd, route_data in evpn_rd_value_json.items():
                            if route_data["ip"] != route:
                                errormsg = (
                                    "[DUT: %s] Route : %s is not"
                                    " present in cli json output" % (dut, route)
                                )
                                return errormsg

                            for rt_data in route_data["paths"]:
                                rt_string = rt_data["extendedCommunity"]["string"]
                                rt_input = "RT:{}".format(_rt)
                                if rt_input not in rt_string:
                                    errormsg = (
                                        "[DUT: %s] Failed: "
                                        "Verifying RT value "
                                        "for EVPN route: %s"
                                        "[FAILED]!!,"
                                        " EXPECTED : %s "
                                        " FOUND : %s"
                                        % (dut, route, rt_input, rt_string)
                                    )
                                    return errormsg

                                logger.info(
                                    "[DUT %s]: Verifying RT"
                                    " value for EVPN route:"
                                    " %s [PASSED]|| "
                                    "Found Expected: %s",
                                    dut,
                                    route,
                                    rt_input,
                                )
                                return True

                # ---- Ethernet tag ------------------------------------
                if ethTag is not None:
                    logger.info(
                        "[DUT: %s]: Verifying ethTag value for " "evpn route :", dut
                    )

                    for _rd, route_data in evpn_rd_value_json.items():
                        if route_data["ip"] != route:
                            errormsg = (
                                "[DUT: %s] RD: %s, Route : %s "
                                "is not present in cli json "
                                "output" % (dut, _rd, route)
                            )
                            return errormsg

                        if route_data["ethTag"] != ethTag:
                            errormsg = (
                                "[DUT: %s] RD: %s, Failed: "
                                "Verifying ethTag value "
                                "for EVPN route: %s"
                                "[FAILED]!!,"
                                " EXPECTED : %s "
                                " FOUND : %s"
                                % (
                                    dut,
                                    _rd,
                                    route,
                                    ethTag,
                                    route_data["ethTag"],
                                )
                            )
                            return errormsg

                        logger.info(
                            "[DUT %s]: RD: %s, Verifying "
                            "ethTag value for EVPN route:"
                            " %s [PASSED]|| "
                            "Found Expected: %s",
                            dut,
                            _rd,
                            route,
                            ethTag,
                        )
                        return True

                # ---- IP prefix length --------------------------------
                if ipLen is not None:
                    logger.info(
                        "[DUT: %s]: Verifying ipLen value for " "evpn route :", dut
                    )

                    for _rd, route_data in evpn_rd_value_json.items():
                        if route_data["ip"] != route:
                            errormsg = (
                                "[DUT: %s] RD: %s, Route : %s "
                                "is not present in cli json "
                                "output " % (dut, _rd, route)
                            )
                            return errormsg

                        if route_data["ipLen"] != int(ipLen):
                            errormsg = (
                                "[DUT: %s] RD: %s, Failed: "
                                "Verifying ipLen value "
                                "for EVPN route: %s"
                                "[FAILED]!!,"
                                " EXPECTED : %s "
                                " FOUND : %s"
                                % (dut, _rd, route, ipLen, route_data["ipLen"])
                            )
                            return errormsg

                        logger.info(
                            "[DUT %s]: RD: %s, Verifying "
                            "ipLen value for EVPN route:"
                            " %s [PASSED]|| "
                            "Found Expected: %s",
                            dut,
                            _rd,
                            route,
                            ipLen,
                        )
                        return True

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    # Reached only when none of the requested checks produced a verdict.
    return False
4454
4455
@retry(retry_timeout=10)
def verify_evpn_routes(
    tgen, topo, dut, input_dict, routeType=5, EthTag=0, next_hop=None, expected=True
):
    """
    API to verify evpn routes using "sh bgp l2vpn evpn"
    command.

    Each network of each static route in `input_dict` is turned into the
    prefix key "[routeType]:[EthTag]:[ipLen]:[ip]" and looked up under every
    RD of the DUT's "show bgp l2vpn evpn route json" output.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo` : json file data (not read by this API)
    * `dut` : device under test
    * `input_dict`: having details like - for which route, rd value
                    needs to be verified
    * `routeType` : Route type 5 is supported as of now
    * `EthTag` : Ethernet tag, by-default is 0
    * `next_hop` : Prefered nexthop for the evpn routes
    * `expected` : expected results from API, by-default True (not read by
                   the function body)

    Usage
    -----
    input_dict_1 = {
        "r1": {
            "static_routes": [{
                "network": [NETWORK1_1[addr_type]],
                "next_hop": NEXT_HOP_IP[addr_type],
                "vrf": "RED"
            }]
        }
    }
    result = verify_evpn_routes(tgen, topo, dut, input_dict_1)

    Returns
    -------
    errormsg(str) or True; False when no path was verified

    Without `next_hop`, True is returned for the first path whose routeType
    matches. With `next_hop`, every path of every requested prefix is
    checked and True is returned once all of them matched.
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    # Set once at least one nexthop has been compared successfully. Without
    # it a fully matching next_hop run used to fall through to `return False`.
    nexthop_verified = False

    for router in input_dict:
        rnode = tgen.routers()[dut]

        logger.info("[DUT: %s]: Verifying evpn routes: ", dut)

        if "static_routes" not in input_dict[router]:
            continue

        for static_route in input_dict[router]["static_routes"]:
            network = static_route["network"]

            if not isinstance(network, list):
                network = [network]

            for route in network:
                ip_len = route.split("/")[1]
                route = route.split("/")[0]

                prefix = "[{}]:[{}]:[{}]:[{}]".format(
                    routeType, EthTag, ip_len, route
                )

                cmd = "show bgp l2vpn evpn route json"
                evpn_value_json = run_frr_cmd(rnode, cmd, isjson=True)

                if not bool(evpn_value_json):
                    errormsg = "No output for '{}' cli".format(cmd)
                    return errormsg

                if evpn_value_json["numPrefix"] == 0:
                    errormsg = "[DUT: %s]: No EVPN prefixes exist" % (dut)
                    return errormsg

                # Only dict-valued top-level entries are inspected for
                # prefixes; other values (e.g. numPrefix) are skipped.
                rd_tables = [
                    rd_data
                    for rd_data in evpn_value_json.values()
                    if isinstance(rd_data, dict)
                ]

                # The prefix is missing only if no RD holds it. This is
                # evaluated per prefix so that misses recorded for an earlier
                # prefix cannot make a later one look absent.
                if all(prefix not in rd_data for rd_data in rd_tables):
                    errormsg = (
                        "[DUT: %s]: "
                        "Missing EVPN routes: "
                        "%s [FAILED]!!" % (dut, [prefix])
                    )
                    return errormsg

                for rd_data in rd_tables:
                    if prefix not in rd_data:
                        continue

                    for paths in rd_data[prefix]["paths"]:
                        for path in paths:
                            if path["routeType"] != routeType:
                                errormsg = (
                                    "[DUT: %s]: "
                                    "Verifying routeType "
                                    "for EVPN route: %s "
                                    "[FAILED]!! "
                                    "Expected: %s, "
                                    "Found: %s"
                                    % (
                                        dut,
                                        prefix,
                                        routeType,
                                        path["routeType"],
                                    )
                                )
                                return errormsg

                            if not next_hop:
                                logger.info(
                                    "[DUT %s]: Verifying "
                                    "EVPN route : %s, "
                                    "routeType: %s is "
                                    "installed "
                                    "[PASSED]|| ",
                                    dut,
                                    prefix,
                                    routeType,
                                )
                                return True

                            for nh_dict in path["nexthops"]:
                                if nh_dict["ip"] != next_hop:
                                    errormsg = (
                                        "[DUT: %s]: "
                                        "Verifying "
                                        "nexthop for "
                                        "EVPN route: %s"
                                        "[FAILED]!! "
                                        "Expected: %s,"
                                        " Found: %s"
                                        % (
                                            dut,
                                            prefix,
                                            next_hop,
                                            nh_dict["ip"],
                                        )
                                    )
                                    return errormsg

                                nexthop_verified = True

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))

    if nexthop_verified:
        logger.info(
            "[DUT %s]: Verifying EVPN routes with nexthop %s [PASSED]|| ",
            dut,
            next_hop,
        )
        return True

    return False
4601
4602
@retry(retry_timeout=10)
def verify_bgp_bestpath(tgen, addr_type, input_dict):
    """
    Verify the overall BGP best-path next hop of the given networks.

    For every DUT in `input_dict`, each entry listed under `addr_type` is
    queried with "show bgp [vrf <vrf>] <addr_type> <network> bestpath json"
    and the first next hop of the first returned path is compared with the
    expected "bestpath" value.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `addr_type` : Address type ipv4/ipv6
    * `input_dict` : per-DUT, per-address-type list of dicts holding
                     "network", "bestpath" and, optionally, "vrf"

    Usage
    -----
    input_dict_1 = {
        "r1": {
            "ipv4": [
                {
                    "network": "100.0.0.0/24",
                    "bestpath": "50.0.0.1"
                }
            ]
        }
    }

    result = verify_bgp_bestpath(tgen, "ipv4", input_dict_1)

    Returns
    -------
    errormsg(str) on the first mismatch, True when every network of the last
    DUT matched, False when the last DUT had no network to verify
    """

    result = False
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    for dut in input_dict.keys():
        rnode = tgen.routers()[dut]

        logger.info("[DUT: %s]: Verifying bgp bestpath and multipath " "routes:", dut)
        result = False
        for network_dict in input_dict[dut][addr_type]:
            nw_addr = network_dict.get("network")
            vrf = network_dict.get("vrf")
            bestpath = network_dict.get("bestpath")

            if vrf:
                cmd = "show bgp vrf {} {} {} bestpath json".format(
                    vrf, addr_type, nw_addr
                )
            else:
                cmd = "show bgp {} {} bestpath json".format(addr_type, nw_addr)

            data = run_frr_cmd(rnode, cmd, isjson=True)
            paths = data.get("paths") if isinstance(data, dict) else None
            if not paths:
                return "DUT:[{}] No path found for network: {} in '{}'".format(
                    dut, nw_addr, cmd
                )
            route = paths[0]

            # Only a path flagged as the overall best path provides the
            # next hop to compare; otherwise there is nothing to match.
            _bestpath = None
            if "bestpath" in route and route["bestpath"].get("overall") is True:
                _bestpath = route["nexthops"][0]["ip"]

            if _bestpath != bestpath:
                return (
                    "DUT:[{}] Bestpath do not match for"
                    " network: {}, Expected "
                    " {} as bgp bestpath found {}".format(
                        dut, nw_addr, bestpath, _bestpath
                    )
                )

            logger.info(
                "DUT:[{}] Found expected bestpath: "
                " {} for network: {}".format(dut, _bestpath, nw_addr)
            )
            result = True

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return result
4676
4677
def verify_tcp_mss(tgen, dut, neighbour, configured_tcp_mss, vrf=None):
    """
    This api is used to verify the tcp-mss value assigned to a neighbour of DUT

    The value is read from the "bgpTcpMssConfigured" field of
    "show bgp [vrf <vrf>] neighbors <neighbour> json".

    Parameters
    ----------
    * `tgen` : topogen object
    * `dut`: device under test
    * `neighbour`: neighbour IP address
    * `configured_tcp_mss`: The TCP-MSS value to be verified
    * `vrf`: vrf name, optional

    Usage
    -----
    result = verify_tcp_mss(tgen, dut, neighbour, configured_tcp_mss)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    rnode = tgen.routers()[dut]
    if vrf:
        cmd = "show bgp vrf {} neighbors {} json".format(vrf, neighbour)
    else:
        cmd = "show bgp neighbors {} json".format(neighbour)

    # Execute the command
    show_vrf_stats = run_frr_cmd(rnode, cmd, isjson=True)

    # Verify TCP-MSS on router
    logger.info("Verify that no core is observed")
    if tgen.routers_have_failure():
        errormsg = "Core observed while running CLI: %s" % (cmd)
        return errormsg

    # The neighbour may be absent from the output; report that as an error
    # message instead of failing on a None lookup.
    neighbour_data = None
    if isinstance(show_vrf_stats, dict):
        neighbour_data = show_vrf_stats.get(neighbour)
    if not isinstance(neighbour_data, dict):
        return "[DUT: {}]: Neighbour {} not found in output of '{}'".format(
            dut, neighbour, cmd
        )

    found_tcp_mss = neighbour_data.get("bgpTcpMssConfigured")
    if configured_tcp_mss == found_tcp_mss:
        logger.debug(
            "Configured TCP - MSS Found: {}".format(sys._getframe().f_code.co_name)
        )
        result = True
    else:
        logger.debug(
            "TCP-MSS Mismatch ,configured {} expecting {}".format(
                found_tcp_mss,
                configured_tcp_mss,
            )
        )
        result = "TCP-MSS Mismatch"

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return result
4731
4732
def get_dut_as_number(tgen, dut):
    """
    API to get the Autonomous System number of the given DUT

    The number is read from the "as" field of the "ipv4Unicast" section of
    "sh ip bgp summary json". The topogen object is re-fetched with
    get_topogen(), so the `tgen` argument itself is not used.

    params:
    =======
    tgen : Topogen object
    dut : Device Under test

    returns :
    =======
    Success : DUT Autonomous System number
    Fail : False (an error is logged) when the reported number is empty;
           None when `dut` is not a router of the topology
    """
    tgen = get_topogen()
    for name, node in tgen.routers().items():
        if name != dut:
            continue

        summary = run_frr_cmd(node, "sh ip bgp summary json ", isjson=True)
        as_number = summary["ipv4Unicast"]["as"]
        if not as_number:
            logger.error(
                "[dut {}] ERROR....! DUT doesnot contain any Automnomous number ".format(
                    dut
                )
            )
            return False

        logger.info(
            "[dut {}] DUT contains Automnomous number :: {} ".format(dut, as_number)
        )
        return as_number
4765
4766
def get_prefix_count_route(
    tgen, topo, dut, peer, vrf=None, link=None, sent=None, received=None
):
    """
    API to return the BGP prefix counters of the DUT towards a peer

    The peer's addresses are taken from the peer's link towards the DUT (or
    towards `link` when given) and looked up in the DUT's IPv4 and IPv6
    "summary json" outputs.

    Parameters
    ----------
    * `tgen` : Topogen object (re-fetched with get_topogen())
    * `topo` : json file data
    * `dut` : Device under test
    * `peer` : neighbor on which you are expecting the route to be received
    * `vrf` : vrf name, optional; when set the received count ("pfxRcd") is
              always returned
    * `link` : name of the peer link to use instead of `dut`, optional
    * `sent` : return the sent count ("pfxSnt") when `received` is not set
    * `received` : return the received count ("pfxRcd"), which is also the
                   default

    Returns
    -------
    prefix_count as dict with "ipv4_count" and "ipv6_count" values, or None
    when `dut` is not a router of the topology
    """
    # The neighbor IP addresses come from the peer's side of the link.
    peer_link = topo["routers"][peer]["links"][link if link else dut]
    neighbor_ipv4_address = peer_link["ipv4"].split("/")[0]
    neighbor_ipv6_address = peer_link["ipv6"].split("/")[0]

    tgen = get_topogen()
    rnode = tgen.routers().get(dut)
    if rnode is None:
        # Reported once, not once per non-matching router.
        logger.error("ERROR...! Unknown dut {} in topolgy".format(dut))
        return None

    if vrf:
        ipv4_cmd = "sh ip bgp vrf {} summary json".format(vrf)
        ipv6_cmd = "sh ip bgp vrf {} ipv6 unicast summary json".format(vrf)
        counter = "pfxRcd"
    else:
        ipv4_cmd = "sh ip bgp summary json "
        ipv6_cmd = "sh ip bgp ipv6 unicast summary json "
        # `received` takes precedence over `sent`; received is the default.
        counter = "pfxSnt" if sent and not received else "pfxRcd"

    show_bgp_json_ipv4 = run_frr_cmd(rnode, ipv4_cmd, isjson=True)
    show_bgp_json_ipv6 = run_frr_cmd(rnode, ipv6_cmd, isjson=True)

    prefix_count = {
        "ipv4_count": show_bgp_json_ipv4["ipv4Unicast"]["peers"][
            neighbor_ipv4_address
        ][counter],
        "ipv6_count": show_bgp_json_ipv6["peers"][neighbor_ipv6_address][counter],
    }

    if vrf:
        logger.info(
            "The Prefix Count of the [DUT:{} : vrf [{}] ] towards neighbor ipv4 : {} and ipv6 : {} is : {}".format(
                dut,
                vrf,
                neighbor_ipv4_address,
                neighbor_ipv6_address,
                prefix_count,
            )
        )
    else:
        logger.info(
            "The Prefix Count of the DUT:{} towards neighbor ipv4 : {} and ipv6 : {} is : {}".format(
                dut, neighbor_ipv4_address, neighbor_ipv6_address, prefix_count
            )
        )
    return prefix_count
4857
4858
def _rib_default_route_family_ok(
    addr_type, route, paths, expected_nexthop, origin, locPrf, metric, expected_aspath
):
    """Check next hop and attributes of one address family's default route.

    `paths` is the list of BGP paths found under `route`; attributes are read
    from the first path. Returns True when every requested check passes,
    False (after logging the reason) otherwise.
    """
    family = addr_type.upper()

    rib_next_hops = []
    for path in paths:
        rib_next_hops.append(path["nexthops"][0]["ip"])
        if addr_type == "ipv6":
            # A second next-hop entry is optional for IPv6 paths; its
            # absence is logged but does not fail the verification.
            try:
                rib_next_hops.append(path["nexthops"][1]["ip"])
            except (KeyError, IndexError):
                logger.error("NO impact ..! Global IPV6 Address not found ")

    if addr_type in expected_nexthop:
        wanted_nexthop = expected_nexthop[addr_type]
        if wanted_nexthop not in rib_next_hops:
            logger.error(
                "ERROR ...! Default routes [{}] expected from {} obtained {}".format(
                    route, wanted_nexthop, rib_next_hops
                )
            )
            return False
        logger.info(
            "Default routes [{}] obtained from {} .....PASSED".format(
                route, wanted_nexthop
            )
        )

    first_path = paths[0]
    # (label in pass log, label in failure log, expected, obtained)
    attribute_checks = (
        ("origin", "Origin", origin, first_path.get("origin", False)),
        ("local preference", "Local preference", locPrf, first_path.get("locPrf", False)),
        ("metric", "metric", metric, first_path.get("metric", False)),
    )
    for pass_label, fail_label, expected, obtained in attribute_checks:
        if not expected:
            # Attribute not requested by the caller.
            continue
        if expected != obtained:
            logger.error(
                "ERROR... {}::! Expected {} is {} obtained {}".format(
                    family, fail_label, expected, obtained
                )
            )
            return False
        logger.info(
            "Dafault Route {} expected {} {} Found in RIB....PASSED".format(
                route, pass_label, expected
            )
        )

    if expected_aspath:
        # Read the AS path from the route that was actually matched rather
        # than from a hard-coded prefix key.
        obtained_aspath = first_path["path"]
        if expected_aspath not in obtained_aspath:
            logger.error(
                "ERROR.....! Expected AS path {} obtained {}..... FAILED ".format(
                    expected_aspath, obtained_aspath
                )
            )
            return False
        logger.info(
            "Dafault Route {} expected AS path {} Found in RIB....PASSED".format(
                route, expected_aspath
            )
        )

    logger.info(
        "{}:: Expected origin ['{}'] , Local Preference ['{}'] , Metric ['{}'] and AS path [{}] is found in RIB".format(
            family, origin, locPrf, metric, expected_aspath
        )
    )
    return True


@retry(retry_timeout=5)
def verify_rib_default_route(
    tgen,
    topo,
    dut,
    routes,
    expected_nexthop,
    metric=None,
    origin=None,
    locPrf=None,
    expected_aspath=None,
):
    """
    API to verify the "Default route" in the BGP RIB together with the
    attributes the route carries (origin, local preference, metric, AS path)

    Both "sh ip bgp json" and "sh ip bgp ipv6 unicast json" are checked. The
    IPv4 default route key is `routes["ipv4"]`; the IPv6 default route is
    looked up as "0::0/0" or "::/0".

    param
    =====
    tgen : Topogen object (re-fetched with get_topogen())
    topo : json file data (not used by this API)
    dut : device under test
    routes : dict holding the default route prefix under "ipv4"
    expected_nexthop : dict with the expected next hop under "ipv4"/"ipv6"
    metric : expected metric, optional
    origin : expected origin, optional
    locPrf : expected local preference, optional
    expected_aspath : AS path expected to be contained in the route's path,
                      optional

    returns
    =======
    True when both address families pass, False otherwise; None when `dut`
    is not a router of the topology
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    tgen = get_topogen()
    rnode = tgen.routers().get(dut)
    if rnode is None:
        return None

    ipv4_routes = run_frr_cmd(rnode, "sh ip bgp json", isjson=True)
    ipv6_routes = run_frr_cmd(rnode, "sh ip bgp ipv6 unicast json", isjson=True)

    default_ipv4_route = routes["ipv4"]
    ipv4_paths = ipv4_routes["routes"].get(default_ipv4_route)
    if not ipv4_paths:
        logger.error("ERROR [ DUT {}] : The Default Route Not found in RIB".format(dut))
        return False
    if not _rib_default_route_family_ok(
        "ipv4",
        default_ipv4_route,
        ipv4_paths,
        expected_nexthop,
        origin,
        locPrf,
        metric,
        expected_aspath,
    ):
        return False

    # The IPv6 default route may be keyed in either notation.
    ipv6_table = ipv6_routes["routes"]
    default_ipv6_route = next(
        (key for key in ("0::0/0", "::/0") if key in ipv6_table), None
    )
    ipv6_paths = ipv6_table.get(default_ipv6_route) if default_ipv6_route else None
    if not ipv6_paths:
        logger.error(
            "ERROR [ DUT {}] : The IPv6 Default Route Not found in RIB".format(dut)
        )
        return False
    if not _rib_default_route_family_ok(
        "ipv6",
        default_ipv6_route,
        ipv6_paths,
        expected_nexthop,
        origin,
        locPrf,
        metric,
        expected_aspath,
    ):
        return False

    logger.info("The attributes are found for default route in RIB ")
    return True
5210
5211
@retry(retry_timeout=5)
def verify_fib_default_route(tgen, topo, dut, routes, expected_nexthop):
    """
    API to verify the "Default route" in the FIB

    Reads "sh ip route json" and "sh ipv6 route json" on the DUT and checks
    that the expected next hops appear among the next hops of the first entry
    of each default route. The IPv6 default route is taken as "::/0" when
    present, otherwise as `routes["ipv6"]`.

    param
    =====
    tgen : Topogen object (re-fetched with get_topogen())
    topo : json file data (not used by this API)
    dut : device under test
    routes : default route prefixes under "ipv4" and "ipv6"
    expected_nexthop : expected next hops under "ipv4" and "ipv6"

    returns
    =======
    True when both default routes carry the expected next hop, False
    otherwise; None when `dut` is not a router of the topology
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    tgen = get_topogen()
    for name, node in tgen.routers().items():
        if name != dut:
            continue

        v4_table = run_frr_cmd(node, "sh ip route json", isjson=True)
        v6_table = run_frr_cmd(node, "sh ipv6 route json", isjson=True)

        v4_found = False
        v6_found = False

        v4_route = routes["ipv4"]
        if v4_route in v4_table.keys():
            v4_hops = [hop["ip"] for hop in v4_table[v4_route][0]["nexthops"]]
            if expected_nexthop["ipv4"] not in v4_hops:
                logger.error(
                    "ERROR .. ! {} default route with next hop {} is not found in FIB ".format(
                        v4_route, expected_nexthop
                    )
                )
                return False
            v4_found = True
            logger.info(
                "{} default route with next hop {} is found in FIB ".format(
                    v4_route, expected_nexthop
                )
            )

        v6_keys = v6_table.keys()
        if routes["ipv6"] in v6_keys or "::/0" in v6_keys:
            # "::/0" wins over the caller-supplied prefix when both exist.
            v6_route = "::/0" if "::/0" in v6_keys else routes["ipv6"]
            v6_hops = [hop["ip"] for hop in v6_table[v6_route][0]["nexthops"]]
            if expected_nexthop["ipv6"] not in v6_hops:
                logger.error(
                    "ERROR .. ! {} default route with next hop {} is not found in FIB ".format(
                        v6_route, expected_nexthop
                    )
                )
                return False
            v6_found = True
            logger.info(
                "{} default route with next hop {} is found in FIB ".format(
                    v6_route, expected_nexthop
                )
            )

        if not (v4_found and v6_found):
            logger.error(
                "Default Route for ipv4 and ipv6 address family is not found in FIB "
            )
            return False
        return True
5294
5295
@retry(retry_timeout=5)
def verify_bgp_advertised_routes_from_neighbor(tgen, topo, dut, peer, expected_routes):
    """
    API to verify the routes that are advertised from dut to peer

    command used :
    "sh ip bgp neighbor <x.x.x.x> advertised-routes" and
    "sh ip bgp ipv6 unicast neighbor<x::x> advertised-routes"

    A route counts as advertised when it is a key of "advertisedRoutes" or is
    contained in "bgpOriginatingDefaultNetwork" (when that field is present).

    dut : Device Under Tests
    Peer : Peer on which the routes are expected
    expected_routes : dual stack IPv4 and IPv6 routes to be verified, as
                      lists of dicts with a "network" key under "ipv4"/"ipv6"

    returns: True / False (None when `dut` is not a router of the topology)

    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    tgen = get_topogen()

    peer_links = topo["routers"][peer]["links"][dut]
    neighbor_ips = {
        "ipv4": peer_links["ipv4"].split("/")[0],
        "ipv6": peer_links["ipv6"].split("/")[0],
    }

    rnode = tgen.routers().get(dut)
    if rnode is None:
        return None

    # (address type, label used in logs, command template)
    families = (
        ("ipv4", "IPV4", "sh ip bgp neighbor {} advertised-routes json"),
        ("ipv6", "IPV6", "sh ip bgp ipv6 unicast neighbor {} advertised-routes json"),
    )
    outputs = {}
    for addr_type, _, cmd in families:
        outputs[addr_type] = run_frr_cmd(
            rnode, cmd.format(neighbor_ips[addr_type]), isjson=True
        )

    all_advertised = True
    for addr_type, label, _ in families:
        output = outputs[addr_type]
        if not output:
            logger.error(output)
            logger.error(
                "ERROR...! [DUT : {}] No {} Routes are advertised to the peer {}".format(
                    dut, label, peer
                )
            )
            return False

        advertised = output.get("advertisedRoutes", {})
        # The default-originate field may be absent; a missing field must
        # yield "not advertised", not a KeyError.
        default_network = output.get("bgpOriginatingDefaultNetwork", "")
        for route in expected_routes[addr_type]:
            network = route["network"]
            if network in advertised or network in default_network:
                logger.info(
                    "Success [DUT : {}] The Expected Route {} is advertised to {} ".format(
                        dut, network, peer
                    )
                )
            else:
                all_advertised = False
                logger.error(
                    "ERROR....![DUT : {}] The Expected Route {} is not advertised to {} ".format(
                        dut, network, peer
                    )
                )

    if all_advertised:
        return True

    for addr_type, label, _ in families:
        logger.error(
            "ERROR ....! {} : Expected Routes -> {} obtained ->{} ".format(
                label,
                expected_routes[addr_type],
                outputs[addr_type].get("advertisedRoutes"),
            )
        )
    return False
5430
5431
@retry(retry_timeout=5)
def verify_bgp_received_routes_from_neighbor(tgen, topo, dut, peer, expected_routes):
    """
    API to verify the bgp received routes

    Soft-reconfiguration inbound is first applied on the DUT for the peer's
    IPv4 and IPv6 addresses, then the received routes are read after a
    10 second wait.

    command used :
    =============
    show ip bgp neighbor <x.x.x.x> received-routes
    show ip bgp ipv6 unicast neighbor <x::x> received-routes

    params
    =======
    dut : Device Under Tests
    Peer : Peer on which the routes are expected
    expected_routes : dual stack IPv4 and IPv6 routes to be verified, as
                      lists of dicts with a "network" key under "ipv4"/"ipv6"

    returns:
    ========
    True / False (None when `dut` is not a router of the topology)

    raises:
    =======
    AssertionError when the soft configuration could not be applied
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    tgen = get_topogen()

    peer_links = topo["routers"][peer]["links"][dut]
    neighbor_ips = {
        "ipv4": peer_links["ipv4"].split("/")[0],
        "ipv6": peer_links["ipv6"].split("/")[0],
    }

    logger.info("Enabling Soft configuration to neighbor INBOUND ")
    result = configure_bgp_soft_configuration(
        tgen, dut, neighbor_ips, direction="inbound"
    )
    assert (
        result is True
    ), "  Failed to configure the soft configuration \n Error: {}".format(result)

    # sleep of 10 sec is required to get the routes on peer after soft configuration
    sleep(10)

    rnode = tgen.routers().get(dut)
    if rnode is None:
        return None

    # (address type, label used in logs, command template)
    families = (
        ("ipv4", "IPV4", "sh ip bgp neighbor {} received-routes json"),
        ("ipv6", "IPV6", "sh ip bgp ipv6 unicast neighbor {} received-routes json"),
    )
    outputs = {}
    for addr_type, _, cmd in families:
        outputs[addr_type] = run_frr_cmd(
            rnode, cmd.format(neighbor_ips[addr_type]), isjson=True
        )

    all_received = True
    for addr_type, label, _ in families:
        output = outputs[addr_type]
        if not output:
            logger.error(output)
            logger.error(
                "ERROR...! [DUT : {}] No {} Routes are received from the peer {}".format(
                    dut, label, peer
                )
            )
            return False

        received = output.get("receivedRoutes", {})
        for route in expected_routes[addr_type]:
            network = route["network"]
            if network in received:
                logger.info(
                    "Success [DUT : {}] The Expected Route {} is received from {} ".format(
                        dut, network, peer
                    )
                )
            else:
                all_received = False
                logger.error(
                    "ERROR....![DUT : {}] The Expected Route {} is not received from {} ".format(
                        dut, network, peer
                    )
                )

    if all_received:
        return True

    # Report what was actually received; this output is keyed by
    # "receivedRoutes", not "advertisedRoutes".
    for addr_type, label, _ in families:
        logger.error(
            "ERROR ....! {} : Expected Routes -> {} obtained ->{} ".format(
                label,
                expected_routes[addr_type],
                outputs[addr_type].get("receivedRoutes"),
            )
        )
    return False
5559
5560
def configure_bgp_soft_configuration(tgen, dut, neighbor_dict, direction):
    """
    Api to configure the bgp soft configuration to show the received routes from peer

    For every address family that has a neighbor in `neighbor_dict`, a
    "neighbor <ip> soft-reconfiguration <direction>" line is applied under
    "router bgp <local AS>" on the DUT.

    params
    ======
    tgen : Topogen object
    dut : device under test router on which the configuration is to be applied
    neighbor_dict : dict element contains ipv4 and ipv6 neighbor ip
    direction : Direction on which it should be applied in/out

    returns:
    ========
    boolean : True when the configuration was applied, False otherwise
    """
    logger.info("Enabling Soft configuration to neighbor INBOUND ")
    local_as = get_dut_as_number(tgen, dut)
    if not local_as:
        logger.error(
            "ERROR...! [DUT : {}] Unable to find the local AS number".format(dut)
        )
        return False

    direction = direction.lower()
    config_lines = ["router bgp {}".format(local_as)]
    for addr_type in ("ipv4", "ipv6"):
        neighbor = neighbor_dict.get(addr_type)
        if not neighbor:
            # Skip families without a neighbor instead of configuring "None".
            continue
        config_lines.extend(
            [
                "address-family {} unicast".format(addr_type),
                "neighbor {} soft-reconfiguration {} ".format(neighbor, direction),
                "exit-address-family",
            ]
        )

    if len(config_lines) == 1:
        logger.error(
            "ERROR...! [DUT : {}] No neighbor given for soft configuration {}".format(
                dut, neighbor_dict
            )
        )
        return False

    result = apply_raw_config(tgen, {dut: {"raw_config": config_lines}})
    if result is not True:
        # Only an exact True is treated as success, mirroring the
        # `result is True` check done by the caller in this file.
        logger.error(
            "ERROR...! [DUT : {}] Failed to apply the soft configuration: {}".format(
                dut, result
            )
        )
        return False

    logger.info(
        "Success... [DUT : {}] The soft configuration onis applied on neighbors {} ".format(
            dut, neighbor_dict
        )
    )
    return True