]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/lib/bgp.py
tests: After clear give it more than 90 seconds to come up
[mirror_frr.git] / tests / topotests / lib / bgp.py
1 #
2 # Copyright (c) 2019 by VMware, Inc. ("VMware")
3 # Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
4 # ("NetDEF") in this file.
5 #
6 # Permission to use, copy, modify, and/or distribute this software
7 # for any purpose with or without fee is hereby granted, provided
8 # that the above copyright notice and this permission notice appear
9 # in all copies.
10 #
11 # THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
12 # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
14 # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
15 # DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
16 # WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
17 # ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
18 # OF THIS SOFTWARE.
19 #
20
21 from copy import deepcopy
22 from time import sleep
23 import traceback
24 import ipaddr
25 import os
26 import sys
27 from lib import topotest
28 from lib.topolog import logger
29
# Import common_config to use commonly used APIs
31 from lib.common_config import (
32 create_common_configuration,
33 InvalidCLIError,
34 load_config_to_router,
35 check_address_types,
36 generate_ips,
37 validate_ip_address,
38 find_interface_with_greater_ip,
39 run_frr_cmd,
40 FRRCFG_FILE,
41 retry,
42 )
43
44 LOGDIR = "/tmp/topotests/"
45 TMPDIR = None
46
47
def create_router_bgp(tgen, topo, input_dict=None, build=False, load_config=True):
    """
    API to configure bgp on router

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring from testcase
    * `build` : Only for initial setup phase this is set as True.
    * `load_config` : Passed through unchanged to
                      create_common_configuration()

    Usage
    -----
    input_dict = {
        "r1": {
            "bgp": {
                "local_as": "200",
                "router_id": "22.22.22.22",
                "graceful-restart": {
                    "graceful-restart": True,
                    "preserve-fw-state": True,
                    "timer": {
                        "restart-time": 30,
                        "rib-stale-time": 30,
                        "select-defer-time": 30,
                    }
                },
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "redistribute": [{
                                "redist_type": "static",
                                    "attribute": {
                                        "metric" : 123
                                    }
                                },
                                {"redist_type": "connected"}
                            ],
                            "advertise_networks": [
                                {
                                    "network": "20.0.0.0/32",
                                    "no_of_network": 10
                                },
                                {
                                    "network": "30.0.0.0/32",
                                    "no_of_network": 10
                                }
                            ],
                            "neighbor": {
                                "r3": {
                                    "keepalivetimer": 60,
                                    "holddowntimer": 180,
                                    "dest_link": {
                                        "r4": {
                                            "allowas-in": {
                                                "number_occurences": 2
                                            },
                                            "prefix_lists": [
                                                {
                                                    "name": "pf_list_1",
                                                    "direction": "in"
                                                }
                                            ],
                                            "route_maps": [{
                                                "name": "RMAP_MED_R3",
                                                "direction": "in"
                                            }],
                                            "next_hop_self": True
                                        },
                                        "r1": {"graceful-restart-helper": True}
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }


    Returns
    -------
    Result of the last create_common_configuration() call (False when no
    router in input_dict carries a "bgp" section), or the traceback
    text (str) when InvalidCLIError is raised.
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    result = False

    # Flag is used when testing ipv6 over ipv4 or vice-versa
    afi_test = False

    if not input_dict:
        input_dict = deepcopy(topo)
    else:
        topo = topo["routers"]
        # Work on a copy: the helpers below fill in defaults with
        # setdefault() and must not modify the caller's dictionary.
        input_dict = deepcopy(input_dict)

    for router in input_dict:
        if "bgp" not in input_dict[router]:
            logger.debug("Router %s: 'bgp' not present in input_dict", router)
            continue

        # A router may carry one BGP instance (dict) or several (list).
        bgp_data_list = input_dict[router]["bgp"]
        if not isinstance(bgp_data_list, list):
            bgp_data_list = [bgp_data_list]

        for bgp_data in bgp_data_list:
            data_all_bgp = __create_bgp_global(tgen, bgp_data, router, build)
            if data_all_bgp:
                bgp_addr_data = bgp_data.setdefault("address_family", {})

                if not bgp_addr_data:
                    logger.debug(
                        "Router %s: 'address_family' not present in "
                        "input_dict for BGP",
                        router,
                    )
                else:
                    # Evaluate both defaults up front so that neither
                    # address family is left without a "unicast" key.
                    ipv4_unicast = bgp_addr_data.setdefault("ipv4", {}).setdefault(
                        "unicast", {}
                    )
                    ipv6_unicast = bgp_addr_data.setdefault("ipv6", {}).setdefault(
                        "unicast", {}
                    )

                    if ipv4_unicast or ipv6_unicast:
                        data_all_bgp = __create_bgp_unicast_neighbor(
                            tgen,
                            topo,
                            bgp_data,
                            router,
                            afi_test,
                            config_data=data_all_bgp,
                        )

            try:
                result = create_common_configuration(
                    tgen, router, data_all_bgp, "bgp", build, load_config
                )
            except InvalidCLIError:
                # Traceback
                errormsg = traceback.format_exc()
                logger.error(errormsg)
                return errormsg

    logger.debug("Exiting lib API: create_router_bgp()")
    return result
200
201
def __create_bgp_global(tgen, input_dict, router, build=False):
    """
    Helper API to create bgp global configuration.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router id to be configured.
    * `build` : Only for initial setup phase this is set as True.

    Returns
    -------
    list of configuration lines, or False when `build` is set and
    'local_as' is not present in input_dict
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    bgp_data = input_dict
    del_bgp_action = bgp_data.setdefault("delete", False)

    config_data = []

    if "local_as" not in bgp_data and build:
        logger.debug(
            "Router %s: 'local_as' not present in input_dict for BGP", router
        )
        return False

    local_as = bgp_data.setdefault("local_as", "")
    cmd = "router bgp {}".format(local_as)
    vrf_id = bgp_data.setdefault("vrf", None)
    if vrf_id:
        cmd = "{} vrf {}".format(cmd, vrf_id)

    if del_bgp_action:
        # Removing the whole instance: nothing else needs to be emitted.
        config_data.append("no {}".format(cmd))
        return config_data

    config_data.append(cmd)
    config_data.append("no bgp ebgp-requires-policy")

    router_id = bgp_data.setdefault("router_id", None)
    del_router_id = bgp_data.setdefault("del_router_id", False)
    if del_router_id:
        config_data.append("no bgp router-id")
    if router_id:
        config_data.append("bgp router-id {}".format(router_id))

    config_data.append("no bgp network import-check")

    if "graceful-restart" in bgp_data:
        graceful_config = bgp_data["graceful-restart"]

        # Tri-state knobs: None -> leave untouched, truthy -> enable,
        # anything else -> emit the "no" form.
        for key, command in (
            ("graceful-restart", "bgp graceful-restart"),
            ("graceful-restart-disable", "bgp graceful-restart-disable"),
            ("preserve-fw-state", "bgp graceful-restart preserve-fw-state"),
            ("disable-eor", "bgp graceful-restart disable-eor"),
        ):
            flag = graceful_config.setdefault(key, None)
            if flag is None:
                continue
            config_data.append(command if flag else "no {}".format(command))

        if "timer" in graceful_config:
            timer = graceful_config["timer"]
            del_action = timer.get("delete", False)

            for rs_timer, rs_timer_value in timer.items():
                # "delete" is the action flag, not a timer; timers without
                # a value are skipped so no stale command is re-emitted.
                if rs_timer == "delete" or not rs_timer_value:
                    continue

                cmd = "bgp graceful-restart {} {}".format(rs_timer, rs_timer_value)
                if del_action:
                    cmd = "no {}".format(cmd)

                config_data.append(cmd)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return config_data
322
323
def __create_bgp_unicast_neighbor(
    tgen, topo, input_dict, router, afi_test, config_data=None
):
    """
    Helper API to create configuration for address-family unicast

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router id to be configured.
    * `afi_test` : use when ipv6 needs to be tested over ipv4 or vice-versa
    * `config_data` : list of configuration lines generated so far; new
                      lines are appended to it (a new list is created
                      when None)

    Returns
    -------
    list of configuration lines (`config_data` with the unicast
    configuration appended)
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    if config_data is None:
        config_data = []

    # NOTE(review): config_data holds complete command lines, so this is an
    # exact-match test against the bare string "router bgp" - presumably a
    # prefix match was intended; confirm before changing, since it decides
    # whether "remote-as" lines are generated.
    add_neigh = "router bgp" not in config_data

    bgp_data = input_dict["address_family"]

    for addr_type, addr_dict in bgp_data.items():
        if not addr_dict:
            continue

        if not check_address_types(addr_type) and not afi_test:
            continue

        addr_data = addr_dict.get("unicast")
        if not addr_data:
            # Nothing configured under unicast for this address family.
            continue

        config_data.append("address-family {} unicast".format(addr_type))

        for advertise_network_dict in addr_data.setdefault("advertise_networks", []):
            network = advertise_network_dict["network"]
            if not isinstance(network, list):
                network = [network]

            no_of_network = advertise_network_dict.get("no_of_network", 1)
            del_action = advertise_network_dict.setdefault("delete", False)

            # Generating IPs for verification
            prefix = str(ipaddr.IPNetwork(unicode(network[0])).prefixlen)
            network_list = generate_ips(network, no_of_network)
            for ip in network_list:
                ip = str(ipaddr.IPNetwork(unicode(ip)).network)

                cmd = "network {}/{}".format(ip, prefix)
                if del_action:
                    cmd = "no {}".format(cmd)

                config_data.append(cmd)

        max_paths = addr_data.setdefault("maximum_paths", {})
        if max_paths:
            ibgp = max_paths.setdefault("ibgp", None)
            ebgp = max_paths.setdefault("ebgp", None)
            if ibgp:
                config_data.append("maximum-paths ibgp {}".format(ibgp))
            if ebgp:
                config_data.append("maximum-paths {}".format(ebgp))

        for aggregate_address in addr_data.setdefault("aggregate_address", []):
            network = aggregate_address.setdefault("network", None)
            if not network:
                logger.debug(
                    "Router %s: 'network' not present in input_dict for BGP", router
                )
                continue

            cmd = "aggregate-address {}".format(network)

            as_set = aggregate_address.setdefault("as_set", False)
            summary = aggregate_address.setdefault("summary", False)
            del_action = aggregate_address.setdefault("delete", False)
            if as_set:
                cmd = "{} as-set".format(cmd)
            if summary:
                cmd = "{} summary".format(cmd)

            if del_action:
                cmd = "no {}".format(cmd)

            config_data.append(cmd)

        for redistribute in addr_data.setdefault("redistribute", []):
            if "redist_type" not in redistribute:
                logger.debug(
                    "Router %s: 'redist_type' not present in input_dict", router
                )
                continue

            cmd = "redistribute {}".format(redistribute["redist_type"])
            redist_attr = redistribute.setdefault("attribute", None)
            if redist_attr:
                if isinstance(redist_attr, dict):
                    for key, value in redist_attr.items():
                        cmd = "{} {} {}".format(cmd, key, value)
                else:
                    cmd = "{} {}".format(cmd, redist_attr)

            del_action = redistribute.setdefault("delete", False)
            if del_action:
                cmd = "no {}".format(cmd)
            config_data.append(cmd)

        if "neighbor" in addr_data:
            neigh_data = __create_bgp_neighbor(
                topo, input_dict, router, addr_type, add_neigh
            )
            config_data.extend(neigh_data)

    # Second pass: per-neighbor settings that live under the address-family
    # node are emitted after all neighbors have been declared.
    for addr_type, addr_dict in bgp_data.items():
        if not addr_dict or not check_address_types(addr_type):
            continue

        addr_data = addr_dict.get("unicast") or {}
        if "neighbor" in addr_data:
            neigh_addr_data = __create_bgp_unicast_address_family(
                topo, input_dict, router, addr_type, add_neigh
            )

            config_data.extend(neigh_addr_data)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return config_data
459
460
def __create_bgp_neighbor(topo, input_dict, router, addr_type, add_neigh=True):
    """
    Helper API to create neighbor specific configuration

    Parameters
    ----------
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router id to be configured
    * `addr_type` : ip type, ipv4/ipv6
    * `add_neigh` : when True the "neighbor <ip> remote-as <as>" line is
                    generated for every neighbor

    Returns
    -------
    list of configuration lines
    """

    config_data = []
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    bgp_data = input_dict["address_family"]
    neigh_data = bgp_data[addr_type]["unicast"]["neighbor"]

    for name, peer_dict in neigh_data.items():
        for dest_link, peer in peer_dict["dest_link"].items():
            nh_details = topo[name]

            # With VRFs the neighbor's "bgp" section is a list of instances.
            if "vrfs" in topo[router]:
                remote_as = nh_details["bgp"][0]["local_as"]
            else:
                remote_as = nh_details["bgp"]["local_as"]

            if dest_link not in nh_details["links"]:
                # Without the link there is no peer address; skipping avoids
                # reusing the address of the previously processed neighbor.
                logger.debug(
                    "Router %s: link %s not present on neighbor %s",
                    router,
                    dest_link,
                    name,
                )
                continue

            ip_addr = nh_details["links"][dest_link][addr_type].split("/")[0]

            # Loopback interface
            update_source = None
            if peer.get("source_link") == "lo":
                update_source = topo[router]["links"]["lo"][addr_type].split("/")[0]

            neigh_cxt = "neighbor {}".format(ip_addr)

            if add_neigh:
                config_data.append("{} remote-as {}".format(neigh_cxt, remote_as))
            if addr_type == "ipv6":
                config_data.append("address-family ipv6 unicast")
                config_data.append("{} activate".format(neigh_cxt))

            disable_connected = peer.get("disable_connected_check", False)
            keep_alive = peer.get("keepalivetimer", 60)
            hold_down = peer.get("holddowntimer", 180)
            password = peer.get("password")
            max_hop_limit = peer.get("ebgp_multihop", 1)

            if update_source:
                config_data.append(
                    "{} update-source {}".format(neigh_cxt, update_source)
                )
            if disable_connected:
                config_data.append("{} disable-connected-check".format(neigh_cxt))

            # Emit timers as soon as either value differs from the defaults.
            if int(keep_alive) != 60 or int(hold_down) != 180:
                config_data.append(
                    "{} timers {} {}".format(neigh_cxt, keep_alive, hold_down)
                )

            # Tri-state knobs: None -> leave untouched, truthy -> enable,
            # anything else -> emit the "no" form. The CLI keyword equals
            # the input_dict key.
            for knob in (
                "graceful-restart",
                "graceful-restart-helper",
                "graceful-restart-disable",
            ):
                flag = peer.get(knob)
                if flag is None:
                    continue
                if flag:
                    config_data.append("{} {}".format(neigh_cxt, knob))
                else:
                    config_data.append("no {} {}".format(neigh_cxt, knob))

            if password:
                config_data.append("{} password {}".format(neigh_cxt, password))

            if max_hop_limit > 1:
                config_data.append(
                    "{} ebgp-multihop {}".format(neigh_cxt, max_hop_limit)
                )
                config_data.append("{} enforce-multihop".format(neigh_cxt))

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return config_data
556
557
def __create_bgp_unicast_address_family(
    topo, input_dict, router, addr_type, add_neigh=True
):
    """
    Helper API to create the per-neighbor configuration that lives under
    "address-family <addr_type> unicast" (activate/deactivate,
    next-hop-self, send-community, allowas-in, prefix-lists, route-maps).

    Parameters
    ----------
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router id to be configured (used in log messages)
    * `addr_type` : ip type, ipv4/ipv6
    * `add_neigh` : accepted for signature parity with
                    __create_bgp_neighbor(); not used here

    Returns
    -------
    list of configuration lines
    """

    config_data = []
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    bgp_data = input_dict["address_family"]
    neigh_data = bgp_data[addr_type]["unicast"]["neighbor"]

    # Iterate over a copy so the setdefault() calls below do not modify
    # input_dict.
    for peer_name, peer_dict in deepcopy(neigh_data).items():
        for dest_link, peer in peer_dict["dest_link"].items():
            deactivate = None
            ip_addr = None
            nh_details = topo[peer_name]
            activate_addr_family = peer.setdefault("activate", None)
            deactivate_addr_family = peer.setdefault("deactivate", None)

            link = nh_details["links"].get(dest_link)
            if peer.get("source_link") == "lo":
                # Loopback interface: the link must be of type "loopback"
                if link is not None and link.get("type") == "loopback":
                    ip_addr = link[addr_type].split("/")[0]
            elif link is not None:
                # Physical interface
                ip_addr = link[addr_type].split("/")[0]
                if addr_type == "ipv4" and bgp_data.get("ipv6") and "ipv6" in link:
                    deactivate = link["ipv6"].split("/")[0]

            if ip_addr is None:
                # No peer address could be resolved; skipping avoids reusing
                # the address of the previously processed neighbor.
                logger.debug(
                    "Router %s: link %s not present on neighbor %s",
                    router,
                    dest_link,
                    peer_name,
                )
                continue

            neigh_cxt = "neighbor {}".format(ip_addr)
            config_data.append("address-family {} unicast".format(addr_type))

            if activate_addr_family is not None:
                config_data.append(
                    "address-family {} unicast".format(activate_addr_family)
                )
                config_data.append("{} activate".format(neigh_cxt))

            if deactivate and activate_addr_family is None:
                config_data.append("no neighbor {} activate".format(deactivate))

            if deactivate_addr_family is not None:
                config_data.append(
                    "address-family {} unicast".format(deactivate_addr_family)
                )
                config_data.append("no {} activate".format(neigh_cxt))

            next_hop_self = peer.get("next_hop_self")
            send_community = peer.get("send_community")
            no_send_community = peer.get("no_send_community")
            allowas_in = peer.get("allowas-in")

            # next-hop-self
            if next_hop_self is not None:
                if next_hop_self:
                    config_data.append("{} next-hop-self".format(neigh_cxt))
                else:
                    config_data.append("no {} next-hop-self".format(neigh_cxt))

            # send_community
            if send_community:
                config_data.append("{} send-community".format(neigh_cxt))

            # no_send_community
            if no_send_community:
                config_data.append(
                    "no {} send-community {}".format(neigh_cxt, no_send_community)
                )

            if "allowas_in" in peer:
                allow_as_in = peer["allowas_in"]
                config_data.append("{} allowas-in {}".format(neigh_cxt, allow_as_in))

            if "no_allowas_in" in peer:
                allow_as_in = peer["no_allowas_in"]
                config_data.append("no {} allowas-in {}".format(neigh_cxt, allow_as_in))

            # prefix-lists and route-maps share the same
            # "<neighbor> <keyword> <name> <direction>" shape.
            for key, keyword, missing_name_msg in (
                (
                    "prefix_lists",
                    "prefix-list",
                    "Router %s: 'name' not present in "
                    "input_dict for BGP neighbor prefix lists",
                ),
                (
                    "route_maps",
                    "route-map",
                    "Router %s: 'name' not present in "
                    "input_dict for BGP neighbor route name",
                ),
            ):
                for entry in peer.get(key) or []:
                    name = entry.get("name")
                    direction = entry.get("direction", "in")
                    del_action = entry.get("delete", False)
                    if not name:
                        logger.info(missing_name_msg, router)
                        continue

                    cmd = "{} {} {} {}".format(neigh_cxt, keyword, name, direction)
                    if del_action:
                        cmd = "no {}".format(cmd)
                    config_data.append(cmd)

            if allowas_in:
                number_occurences = allowas_in.get("number_occurences")
                del_action = allowas_in.get("delete", False)

                cmd = "{} allowas-in".format(neigh_cxt)
                # Only append the count when one was actually supplied.
                if number_occurences:
                    cmd = "{} {}".format(cmd, number_occurences)

                if del_action:
                    cmd = "no {}".format(cmd)

                config_data.append(cmd)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return config_data
702
703
def modify_bgp_config_when_bgpd_down(tgen, topo, input_dict):
    """
    API will save the current config to router's /etc/frr/ for BGPd
    daemon(bgpd.conf file)

    The configuration is first generated with create_router_bgp()
    (load_config=False) and the generated file is then appended to
    /etc/frr/bgpd.conf on every router named in input_dict.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data
    * `input_dict` : defines for which router, and which config
                     needs to be modified

    Usage:
    ------
    # Modify graceful-restart config not to set f-bit
    # and write to /etc/frr

    # Api call to delete advertised networks
    input_dict_2 = {
        "r5": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "advertise_networks": [
                                {
                                    "network": "101.0.20.1/32",
                                    "no_of_network": 5,
                                    "delete": True
                                }
                            ],
                        }
                    },
                    "ipv6": {
                        "unicast": {
                            "advertise_networks": [
                                {
                                    "network": "5::1/128",
                                    "no_of_network": 5,
                                    "delete": True
                                }
                            ],
                        }
                    }
                }
            }
        }
    }

    result = modify_bgp_config_when_bgpd_down(tgen, topo, input_dict)

    Returns
    -------
    True on success; otherwise the non-True result of create_router_bgp()
    or the traceback text (str) of the exception that was caught
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    try:
        result = create_router_bgp(
            tgen, topo, input_dict, build=False, load_config=False
        )
        if result is not True:
            return result

        # Copy bgp config file to /etc/frr
        router_list = tgen.routers()
        for dut in input_dict:
            if dut not in router_list:
                continue

            tmpdir = os.path.join(LOGDIR, tgen.modname)

            logger.info("Delete BGP config when BGPd is down in {}".format(dut))
            # Reading the config from /tmp/topotests and
            # copy to /etc/frr/bgpd.conf
            cmd = "cat {}/{}/{} >> /etc/frr/bgpd.conf".format(
                tmpdir, dut, FRRCFG_FILE
            )
            router_list[dut].run(cmd)

    except Exception as e:
        # handle any exception; log the exception itself because
        # `e.message` does not exist on every exception type
        logger.error("Error %s occured. Arguments %s.", e, e.args)

        # Traceback
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
796
797
798 #############################################
799 # Verification APIs
800 #############################################
@retry(attempts=3, wait=2, return_is_str=True)
def verify_router_id(tgen, topo, input_dict):
    """
    Running command "show bgp summary json" for DUT and reading router-id
    from input_dict and verifying with command output.
    1. Statically modified router-id should take place
    2. When static router-id is deleted highest loopback should
       become router-id
    3. When loopback intf is down then highest physical intf
       should become router-id

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `input_dict`: input dictionary, have details of Device Under Test, for
                    which user wants to test the data
    Usage
    -----
    # Verify if router-id for r1 is 12.12.12.12
    input_dict = {
        "r1": {
            "bgp": {
                "router_id": "12.12.12.12"
            }
        }
    }
    # Verify that router-id for r1 is highest interface ip
    input_dict = {
        "r1": {
            "bgp": {
                "del_router_id": True
            }
        }
    }
    result = verify_router_id(tgen, topo, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    router_list = tgen.routers()
    for router in input_dict:
        # Keys that do not name a router of the topology are ignored.
        if router not in router_list:
            continue

        rnode = router_list[router]
        bgp_input = input_dict[router]["bgp"]
        del_router_id = bgp_input.get("del_router_id", False)

        logger.info("Checking router %s router-id", router)
        show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
        router_id_out = show_bgp_json["ipv4Unicast"]["routerId"]
        router_id_out = ipaddr.IPv4Address(unicode(router_id_out))

        # Once router-id is deleted, highest interface ip should become
        # router-id
        if del_router_id:
            router_id = find_interface_with_greater_ip(topo, router)
        else:
            router_id = bgp_input["router_id"]
        # Normalise the expected value on both paths so the comparison is
        # always made between two IPv4Address objects.
        router_id = ipaddr.IPv4Address(unicode(router_id))

        if router_id == router_id_out:
            logger.info("Found expected router-id %s for router %s", router_id, router)
        else:
            errormsg = (
                "Router-id for router:{} mismatch, expected:"
                " {} but found:{}".format(router, router_id, router_id_out)
            )
            return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
869
870
@retry(attempts=44, wait=3, return_is_str=True)
def verify_bgp_convergence(tgen, topo, dut=None):
    """
    API will verify if BGP is converged with in the given time frame.
    Running "show bgp vrf all summary json" command and verify bgp
    neighbor state is established,

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `dut`: device under test; when None every router of the topology
             that has a "bgp" section is verified

    Usage
    -----
    # To verify if BGP is converged for all the routers used in
    topology
    results = verify_bgp_convergence(tgen, topo, dut="r1")

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: verify_bgp_convergence()")
    for router, rnode in tgen.routers().items():
        if "bgp" not in topo["routers"][router]:
            continue

        if dut is not None and dut != router:
            continue

        logger.info("Verifying BGP Convergence on router %s:", router)
        show_bgp_json = run_frr_cmd(rnode, "show bgp vrf all summary json", isjson=True)
        # Verifying output dictionary show_bgp_json is empty or not
        if not show_bgp_json:
            errormsg = "BGP is not running"
            return errormsg

        # A router may carry one BGP instance (dict) or several (list).
        bgp_data_list = topo["routers"][router]["bgp"]
        if not isinstance(bgp_data_list, list):
            bgp_data_list = [bgp_data_list]

        for bgp_data in bgp_data_list:
            vrf = bgp_data.get("vrf")
            if vrf is None:
                vrf = "default"

            # Summary of this VRF; absent keys are treated as "no peer is
            # established yet" so the caller simply retries.
            vrf_summary = show_bgp_json.get(vrf, {})

            # To find neighbor ip type
            bgp_addr_type = bgp_data["address_family"]
            if "l2vpn" in bgp_addr_type:
                if "neighbor" not in bgp_addr_type["l2vpn"]["evpn"]:
                    continue

                bgp_neighbors = bgp_addr_type["l2vpn"]["evpn"]["neighbor"]
                total_evpn_peer = len(bgp_neighbors)

                no_of_evpn_peer = 0
                for bgp_neighbor, peer_data in bgp_neighbors.items():
                    data = topo["routers"][bgp_neighbor]["links"]
                    for _addr_type, dest_link_dict in peer_data.items():
                        for dest_link in dest_link_dict:
                            if dest_link not in data:
                                continue

                            neighbor_ip = data[dest_link][_addr_type].split("/")[0]

                            if (
                                "ipv4Unicast" in vrf_summary
                                or "ipv6Unicast" in vrf_summary
                            ):
                                errormsg = (
                                    "[DUT: %s] VRF: %s, "
                                    "ipv4Unicast/ipv6Unicast"
                                    " address-family present"
                                    " under l2vpn" % (router, vrf)
                                )
                                return errormsg

                            evpn_peers = vrf_summary.get("l2VpnEvpn", {}).get(
                                "peers", {}
                            )
                            nh_state = evpn_peers.get(neighbor_ip, {}).get("state")

                            if nh_state == "Established":
                                no_of_evpn_peer += 1

                if no_of_evpn_peer == total_evpn_peer:
                    logger.info(
                        "[DUT: %s] VRF: %s, BGP is Converged for " "epvn peers",
                        router,
                        vrf,
                    )
                else:
                    errormsg = (
                        "[DUT: %s] VRF: %s, BGP is not converged "
                        "for evpn peers" % (router, vrf)
                    )
                    return errormsg
            else:
                # Counters span every valid address family so that a peer
                # that is down in any of them is detected.
                total_peer = 0
                no_of_peer = 0

                for addr_type in bgp_addr_type:
                    if not check_address_types(addr_type):
                        continue

                    bgp_neighbors = (
                        bgp_addr_type[addr_type].get("unicast", {}).get("neighbor", {})
                    )
                    summary_key = (
                        "ipv4Unicast" if addr_type == "ipv4" else "ipv6Unicast"
                    )
                    peers = vrf_summary.get(summary_key, {}).get("peers", {})

                    for bgp_neighbor, peer_data in bgp_neighbors.items():
                        data = topo["routers"][bgp_neighbor]["links"]
                        total_peer += len(peer_data["dest_link"])

                        for dest_link, peer_details in peer_data["dest_link"].items():
                            if dest_link not in data:
                                continue

                            # for link local neighbors
                            if (
                                "neighbor_type" in peer_details
                                and peer_details["neighbor_type"] == "link-local"
                            ):
                                neighbor_ip = get_ipv6_linklocal_address(
                                    topo["routers"], bgp_neighbor, dest_link
                                )
                            elif "source_link" in peer_details:
                                neighbor_ip = topo["routers"][bgp_neighbor]["links"][
                                    peer_details["source_link"]
                                ][addr_type].split("/")[0]
                            elif (
                                "neighbor_type" in peer_details
                                and peer_details["neighbor_type"] == "unnumbered"
                            ):
                                neighbor_ip = data[dest_link]["peer-interface"]
                            else:
                                neighbor_ip = data[dest_link][addr_type].split("/")[0]

                            nh_state = peers.get(neighbor_ip, {}).get("state")
                            if nh_state == "Established":
                                no_of_peer += 1

                if no_of_peer == total_peer:
                    logger.info("[DUT: %s] VRF: %s, BGP is Converged", router, vrf)
                else:
                    errormsg = "[DUT: %s] VRF: %s, BGP is not converged" % (router, vrf)
                    return errormsg

    logger.debug("Exiting API: verify_bgp_convergence()")
    return True
1049
1050
@retry(attempts=3, wait=4, return_is_str=True)
def verify_bgp_community(
    tgen, addr_type, router, network, input_dict=None, vrf=None, bestpath=False
):
    """
    API to verify BGP (large) community is attached in route for any given
    DUT by running "show bgp ipv4/6 {route address} json" command.

    Parameters
    ----------
    * `tgen`: topogen object
    * `addr_type` : ip type, ipv4/ipv6
    * `router`: Device Under Test
    * `network`: list of networks for which set criteria needs to be
                 verified
    * `input_dict`: having details like - for which router, community and
                    values needs to be verified
    * `vrf`: VRF name; takes precedence over `bestpath` when both are given
    * `bestpath`: To check best path cli

    Usage
    -----
    networks = ["200.50.2.0/32"]
    input_dict = {
        "largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5"
    }
    result = verify_bgp_community(tgen, "ipv4", dut, networks, input_dict)

    Returns
    -------
    errormsg(str) or True (False when `router` is not part of the topology)
    """

    logger.debug("Entering lib API: verify_bgp_community()")
    if router not in tgen.routers():
        return False

    rnode = tgen.routers()[router]

    logger.info(
        "Verifying BGP community attributes on dut %s: for %s " "network %s",
        router,
        addr_type,
        network,
    )

    command = "show bgp"

    sleep(5)
    for net in network:
        if vrf:
            cmd = "{} vrf {} {} {} json".format(command, vrf, addr_type, net)
        elif bestpath:
            cmd = "{} {} {} bestpath json".format(command, addr_type, net)
        else:
            cmd = "{} {} {} json".format(command, addr_type, net)

        show_bgp_json = run_frr_cmd(rnode, cmd, isjson=True)
        if "paths" not in show_bgp_json:
            return "Prefix {} not found in BGP table of router: {}".format(net, router)

        found = False
        for path in show_bgp_json["paths"]:
            if "largeCommunity" not in path and "community" not in path:
                continue

            found = True
            logger.info(
                "Large Community attribute is found for route:" " %s in router: %s",
                net,
                router,
            )
            if input_dict is None:
                continue

            for criteria, comm_val in input_dict.items():
                # A path may carry only one of the two attributes; a missing
                # one is reported as a mismatch rather than raising.
                show_val = path.get(criteria, {}).get("string")
                if comm_val == show_val:
                    logger.info(
                        "Verifying BGP %s for prefix: %s"
                        " in router: %s, found expected"
                        " value: %s",
                        criteria,
                        net,
                        router,
                        comm_val,
                    )
                else:
                    errormsg = (
                        "Failed: Verifying BGP attribute"
                        " {} for route: {} in router: {}"
                        ", expected value: {} but found"
                        ": {}".format(criteria, net, router, comm_val, show_val)
                    )
                    return errormsg

        if not found:
            errormsg = (
                "Large Community attribute is not found for route: "
                "{} in router: {} ".format(net, router)
            )
            return errormsg

    logger.debug("Exiting lib API: verify_bgp_community()")
    return True
1155
1156
def modify_as_number(tgen, topo, input_dict):
    """
    API reads local_as from user defined input_dict and modifies the
    router's ASN accordingly. The existing bgp configuration of every
    router named in input_dict is removed and the modified configuration
    is then applied.

    Parameters
    ----------
    * `tgen`  : Topogen object
    * `topo`  : json file data
    * `input_dict` :  defines for which router ASNs needs to be modified

    Usage
    -----
    To modify ASNs for router r1
    input_dict = {
        "r1": {
            "bgp": {
                "local_as": 131079
            }
        }
    }
    result = modify_as_number(tgen, topo, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    try:

        # Work on a copy so the caller's topology is left untouched
        new_topo = deepcopy(topo["routers"])
        router_dict = {}
        for router in input_dict.keys():
            # Remove bgp configuration
            router_dict.update({router: {"bgp": {"delete": True}}})

            new_topo[router]["bgp"]["local_as"] = input_dict[router]["bgp"]["local_as"]

        logger.info("Removing bgp configuration")
        create_router_bgp(tgen, topo, router_dict)

        logger.info("Applying modified bgp configuration")
        create_router_bgp(tgen, new_topo)

    except Exception as e:
        # Log the exception itself rather than `e.message`: that attribute
        # does not exist on Python 3 exceptions, so reading it here would
        # raise AttributeError and hide the original failure.
        logger.error("Error %s occured. Arguments %s.", e, e.args)

        # Traceback
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
1214
1215
@retry(attempts=3, wait=2, return_is_str=True)
def verify_as_numbers(tgen, topo, input_dict):
    """
    This API is to verify AS numbers for given DUT by running
    "show ip bgp neighbor json" command. Local AS and Remote AS
    will be verified with input_dict data and command output.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `input_dict`: defines - for which router, AS numbers needs to be
                    verified. The expected remote AS of a neighbor is read
                    from that neighbor's own "local_as" entry, so every
                    BGP neighbor of a listed router must be listed too.

    Usage
    -----
    input_dict = {
        "r1": {
            "bgp": {
                "local_as": 131079
            }
        }
    }
    result = verify_as_numbers(tgen, topo, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    for router in input_dict.keys():
        if router not in tgen.routers():
            continue

        rnode = tgen.routers()[router]

        logger.info("Verifying AS numbers for dut %s:", router)

        show_ip_bgp_neighbor_json = run_frr_cmd(
            rnode, "show ip bgp neighbor json", isjson=True
        )
        local_as = input_dict[router]["bgp"]["local_as"]
        bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]

        for addr_type in bgp_addr_type:
            if not check_address_types(addr_type):
                continue

            bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]

            for bgp_neighbor, peer_data in bgp_neighbors.items():
                remote_as = input_dict[bgp_neighbor]["bgp"]["local_as"]
                for dest_link in peer_data["dest_link"]:
                    data = topo["routers"][bgp_neighbor]["links"]

                    # Without a matching link there is no neighbor address
                    # to look up; skip instead of indexing the output
                    # with None.
                    if dest_link not in data:
                        continue

                    neighbor_ip = data[dest_link][addr_type].split("/")[0]
                    neigh_data = show_ip_bgp_neighbor_json[neighbor_ip]

                    # Verify Local AS for router
                    if neigh_data["localAs"] != local_as:
                        errormsg = (
                            "Failed: Verify local_as for dut {},"
                            " found: {} but expected: {}".format(
                                router, neigh_data["localAs"], local_as
                            )
                        )
                        return errormsg
                    else:
                        logger.info(
                            "Verified local_as for dut %s, found" " expected: %s",
                            router,
                            local_as,
                        )

                    # Verify Remote AS for neighbor
                    if neigh_data["remoteAs"] != remote_as:
                        errormsg = (
                            "Failed: Verify remote_as for dut "
                            "{}'s neighbor {}, found: {} but "
                            "expected: {}".format(
                                router, bgp_neighbor, neigh_data["remoteAs"], remote_as
                            )
                        )
                        return errormsg
                    else:
                        logger.info(
                            "Verified remote_as for dut %s's "
                            "neighbor %s, found expected: %s",
                            router,
                            bgp_neighbor,
                            remote_as,
                        )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
1313
1314
def clear_bgp(tgen, addr_type, router, vrf=None):
    """
    Reset the BGP sessions of a router by issuing the matching
    "clear ip bgp * / clear bgp ipv6 * / clear bgp *" command, then
    wait 5 seconds.

    Parameters
    ----------
    * `tgen`: topogen object
    * `addr_type`: ip type ipv4/ipv6; any other value clears all sessions
                   with "clear bgp *" (the vrf argument is not used then)
    * `router`: device under test
    * `vrf`: vrf name, or list of vrf names

    Usage
    -----
    clear_bgp(tgen, addr_type, "r1")

    Returns
    -------
    False when `router` is not part of the topology, otherwise None
    """

    api_name = sys._getframe().f_code.co_name
    logger.debug("Entering lib API: {}".format(api_name))

    if router not in tgen.routers():
        return False

    rnode = tgen.routers()[router]

    # A single vrf name is handled as a one-element list
    vrf_names = vrf
    if vrf_names and type(vrf_names) is not list:
        vrf_names = [vrf_names]

    # Pick the per-vrf command template and the default-vrf command
    if addr_type == "ipv4":
        per_vrf_cmd, default_cmd = "clear ip bgp vrf {} *", "clear ip bgp *"
    elif addr_type == "ipv6":
        per_vrf_cmd, default_cmd = "clear bgp vrf {} ipv6 *", "clear bgp ipv6 *"
    else:
        per_vrf_cmd, default_cmd = None, "clear bgp *"

    # Clearing BGP
    logger.info("Clearing BGP neighborship for router %s..", router)
    if per_vrf_cmd is not None and vrf_names:
        for vrf_name in vrf_names:
            run_frr_cmd(rnode, per_vrf_cmd.format(vrf_name))
    else:
        run_frr_cmd(rnode, default_cmd)

    sleep(5)

    logger.debug("Exiting lib API: {}".format(api_name))
1363
1364
def _wait_for_bgp_convergence(rnode, topo, router, phase, attempts=44, sleeptime=3):
    """
    Poll "show bgp summary json" on `rnode` until every configured peer of
    `router` is Established.

    `phase` ("before" / "after") is only used in the log messages.

    Returns a tuple (errormsg, peer_uptime): errormsg is None on success
    and peer_uptime maps neighbor name -> peerUptimeEstablishedEpoch as
    seen in the last poll.
    """

    bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
    peer_uptime = {}

    for _ in range(attempts):
        # Waiting for BGP to converge
        logger.info(
            "Waiting for %s sec for BGP to converge on router" " %s...",
            sleeptime,
            router,
        )
        sleep(sleeptime)

        show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
        # Verifying output dictionary show_bgp_json is empty or not
        if not bool(show_bgp_json):
            return "BGP is not running", peer_uptime

        total_peer = 0
        no_of_peer = 0
        for addr_type in bgp_addr_type:
            # The same filter is applied to the expected and the
            # established peer counts so both cover the same sessions.
            if not check_address_types(addr_type):
                continue

            bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]

            for bgp_neighbor, peer_data in bgp_neighbors.items():
                total_peer += len(peer_data["dest_link"])

                for dest_link in peer_data["dest_link"]:
                    data = topo["routers"][bgp_neighbor]["links"]
                    if dest_link not in data:
                        continue

                    neighbor_ip = data[dest_link][addr_type].split("/")[0]
                    if addr_type == "ipv4":
                        peers = show_bgp_json["ipv4Unicast"]["peers"]
                    else:
                        peers = show_bgp_json["ipv6Unicast"]["peers"]

                    nh_state = peers[neighbor_ip]["state"]

                    # Peer up time dictionary
                    peer_uptime[bgp_neighbor] = peers[neighbor_ip][
                        "peerUptimeEstablishedEpoch"
                    ]

                    if nh_state == "Established":
                        no_of_peer += 1

        if no_of_peer == total_peer:
            logger.info("BGP is Converged for router %s %s bgp clear", router, phase)
            return None, peer_uptime

        logger.info(
            "BGP is not yet Converged for router %s %s bgp clear", router, phase
        )

    errormsg = "TIMEOUT!! BGP is not converged in {} seconds for router {}".format(
        attempts * sleeptime, router
    )
    return errormsg, peer_uptime


def clear_bgp_and_verify(tgen, topo, router):
    """
    This API is to clear bgp neighborship and verify bgp neighborship
    is coming up (BGP is converged) using "show bgp summary json" command
    and also verifying for all bgp neighbors uptime before and after
    clear bgp sessions is different as the uptime must be changed once
    bgp sessions are cleared using "clear ip bgp */clear bgp ipv6 *" cmd.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `router`: device under test

    Usage
    -----
    result = clear_bgp_and_verify(tgen, topo, dut)

    Returns
    -------
    errormsg(str) or True; False when `router` is not part of the topology
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    if router not in tgen.routers():
        return False

    rnode = tgen.routers()[router]

    # Verifying BGP convergence before bgp clear command
    errormsg, peer_uptime_before_clear_bgp = _wait_for_bgp_convergence(
        rnode, topo, router, "before"
    )
    if errormsg:
        return errormsg

    # Clearing BGP
    bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
    logger.info("Clearing BGP neighborship for router %s..", router)
    for addr_type in bgp_addr_type.keys():
        if addr_type == "ipv4":
            run_frr_cmd(rnode, "clear ip bgp *")
        elif addr_type == "ipv6":
            run_frr_cmd(rnode, "clear bgp ipv6 *")

    # Verifying BGP convergence after bgp clear command
    errormsg, peer_uptime_after_clear_bgp = _wait_for_bgp_convergence(
        rnode, topo, router, "after"
    )
    if errormsg:
        return errormsg

    # Comparing peerUptimeEstablishedEpoch dictionaries
    if peer_uptime_before_clear_bgp != peer_uptime_after_clear_bgp:
        logger.info("BGP neighborship is reset after clear BGP on router %s", router)
    else:
        errormsg = (
            "BGP neighborship is not reset after clear bgp on router"
            " {}".format(router)
        )
        return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
1561
1562
def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
    """
    To verify BGP timer config, execute "show ip bgp neighbor json" command
    and verify bgp timers with input_dict data.
    To verify bgp timers functionality, the peer interface is shut down
    and the BGP neighborship status is checked:

    * Scenario 1: the peer interface is shut and brought back up; the
      session must still be Established when the loop counter reaches
      (holddowntimer - keepalivetimer).
    * Scenario 2: the peer interface is shut; the session must no longer
      be Established when the loop counter reaches holddowntimer.

    In both scenarios the loop counter advances in steps of
    int(holddowntimer / 3) and the state is only evaluated on the
    iteration whose counter equals the value named above.
    NOTE(review): the peer interface is not brought back up by this API
    after Scenario 2 - callers presumably restore it; confirm.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `input_dict`: defines for which router, bgp timers needs to be verified

    Usage:
    # To verify BGP timers for neighbor r2 of router r1
    input_dict = {
        "r1": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "neighbor": {
                                "r2": {
                                    "dest_link": {
                                        "r1": {
                                            "keepalivetimer": 5,
                                            "holddowntimer": 15,
                                        }}}}}}}}}}
    result = verify_bgp_timers_and_functionality(tgen, topo, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    sleep(5)
    router_list = tgen.routers()
    for router in input_dict.keys():
        if router not in router_list:
            continue

        rnode = router_list[router]

        logger.info("Verifying bgp timers functionality, DUT is %s:", router)

        show_ip_bgp_neighbor_json = run_frr_cmd(
            rnode, "show ip bgp neighbor json", isjson=True
        )

        bgp_addr_type = input_dict[router]["bgp"]["address_family"]

        for addr_type in bgp_addr_type:
            if not check_address_types(addr_type):
                continue

            # Section of "show bgp summary json" holding this family's peers
            if addr_type == "ipv4":
                summary_key = "ipv4Unicast"
            else:
                summary_key = "ipv6Unicast"

            bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
            for bgp_neighbor, peer_data in bgp_neighbors.items():
                for dest_link, peer_dict in peer_data["dest_link"].items():
                    data = topo["routers"][bgp_neighbor]["links"]

                    keepalivetimer = peer_dict["keepalivetimer"]
                    holddowntimer = peer_dict["holddowntimer"]

                    # Without a matching link neither the neighbor address
                    # nor its interface is known, so nothing can be checked.
                    if dest_link not in data:
                        continue

                    neighbor_ip = data[dest_link][addr_type].split("/")[0]
                    neighbor_intf = data[dest_link]["interface"]

                    # Verify HoldDownTimer for neighbor
                    bgpHoldTimeMsecs = show_ip_bgp_neighbor_json[neighbor_ip][
                        "bgpTimerHoldTimeMsecs"
                    ]
                    if bgpHoldTimeMsecs != holddowntimer * 1000:
                        errormsg = (
                            "Verifying holddowntimer for bgp "
                            "neighbor {} under dut {}, found: {} "
                            "but expected: {}".format(
                                neighbor_ip,
                                router,
                                bgpHoldTimeMsecs,
                                holddowntimer * 1000,
                            )
                        )
                        return errormsg

                    # Verify KeepAliveTimer for neighbor
                    bgpKeepAliveTimeMsecs = show_ip_bgp_neighbor_json[neighbor_ip][
                        "bgpTimerKeepAliveIntervalMsecs"
                    ]
                    if bgpKeepAliveTimeMsecs != keepalivetimer * 1000:
                        errormsg = (
                            "Verifying keepalivetimer for bgp "
                            "neighbor {} under dut {}, found: {} "
                            "but expected: {}".format(
                                neighbor_ip,
                                router,
                                bgpKeepAliveTimeMsecs,
                                keepalivetimer * 1000,
                            )
                        )
                        return errormsg

                    ####################
                    # Shutting down peer interface after keepalive time and
                    # after some time bringing up peer interface.
                    # verifying BGP neighborship in (hold down-keep alive)
                    # time, it should not go down
                    ####################

                    # Wait till keep alive time
                    logger.info("=" * 20)
                    logger.info("Scenario 1:")
                    logger.info(
                        "Shutdown and bring up peer interface: %s "
                        "in keep alive time : %s sec and verify "
                        " BGP neighborship is intact in %s sec ",
                        neighbor_intf,
                        keepalivetimer,
                        (holddowntimer - keepalivetimer),
                    )
                    logger.info("=" * 20)
                    logger.info("Waiting for %s sec..", keepalivetimer)
                    sleep(keepalivetimer)

                    # Shutting down peer interface
                    logger.info(
                        "Shutting down interface %s on router %s",
                        neighbor_intf,
                        bgp_neighbor,
                    )
                    topotest.interface_set_status(
                        router_list[bgp_neighbor], neighbor_intf, ifaceaction=False
                    )

                    # Bringing up peer interface
                    sleep(5)
                    logger.info(
                        "Bringing up interface %s on router %s..",
                        neighbor_intf,
                        bgp_neighbor,
                    )
                    topotest.interface_set_status(
                        router_list[bgp_neighbor], neighbor_intf, ifaceaction=True
                    )

                    # Verifying BGP neighborship is intact in
                    # (holddown - keepalive) time
                    for timer in range(
                        keepalivetimer, holddowntimer, int(holddowntimer / 3)
                    ):
                        logger.info("Waiting for %s sec..", keepalivetimer)
                        sleep(keepalivetimer)
                        sleep(2)
                        show_bgp_json = run_frr_cmd(
                            rnode, "show bgp summary json", isjson=True
                        )

                        nh_state = show_bgp_json[summary_key]["peers"][neighbor_ip][
                            "state"
                        ]

                        if timer == (holddowntimer - keepalivetimer):
                            if nh_state != "Established":
                                # The session was expected to survive the
                                # short outage but it went down.
                                errormsg = (
                                    "BGP neighborship has gone "
                                    "down in {} sec for neighbor {}".format(
                                        timer, bgp_neighbor
                                    )
                                )
                                return errormsg
                            else:
                                logger.info(
                                    "BGP neighborship is intact in %s"
                                    " sec for neighbor %s",
                                    timer,
                                    bgp_neighbor,
                                )

                    ####################
                    # Shutting down peer interface and verifying that BGP
                    # neighborship is going down in holddown time
                    ####################
                    logger.info("=" * 20)
                    logger.info("Scenario 2:")
                    logger.info(
                        "Shutdown peer interface: %s and verify BGP"
                        " neighborship has gone down in hold down "
                        "time %s sec",
                        neighbor_intf,
                        holddowntimer,
                    )
                    logger.info("=" * 20)

                    logger.info(
                        "Shutting down interface %s on router %s..",
                        neighbor_intf,
                        bgp_neighbor,
                    )
                    topotest.interface_set_status(
                        router_list[bgp_neighbor], neighbor_intf, ifaceaction=False
                    )

                    # Verifying BGP neighborship is going down in holddown time
                    for timer in range(
                        keepalivetimer,
                        (holddowntimer + keepalivetimer),
                        int(holddowntimer / 3),
                    ):
                        logger.info("Waiting for %s sec..", keepalivetimer)
                        sleep(keepalivetimer)
                        sleep(2)
                        show_bgp_json = run_frr_cmd(
                            rnode, "show bgp summary json", isjson=True
                        )

                        nh_state = show_bgp_json[summary_key]["peers"][neighbor_ip][
                            "state"
                        ]

                        if timer == holddowntimer:
                            if nh_state == "Established":
                                errormsg = (
                                    "BGP neighborship has not gone "
                                    "down in {} sec for neighbor {}".format(
                                        timer, bgp_neighbor
                                    )
                                )
                                return errormsg
                            else:
                                logger.info(
                                    "BGP neighborship has gone down in"
                                    " %s sec for neighbor %s",
                                    timer,
                                    bgp_neighbor,
                                )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
1804
1805
@retry(attempts=3, wait=4, return_is_str=True)
def verify_bgp_attributes(
    tgen, addr_type, dut, static_routes, rmap_name, input_dict, seq_id=None
):
    """
    API will verify BGP attributes set by Route-map for given prefix and
    DUT. it will run "show bgp ipv4/ipv6 {prefix_address} json" command
    in DUT to verify BGP attributes set by route-map, Set attributes
    values will be read from input_dict and verified with the first path
    of the command output.

    * `tgen`: topogen object
    * `addr_type` : ip type, ipv4/ipv6
    * `dut`: Device Under Test
    * `static_routes`: Static Routes for which BGP set attributes needs to be
                       verified
    * `rmap_name`: route map name for which set criteria needs to be verified
    * `input_dict`: per router, the route maps and their "set" attributes
    * `seq_id`: sequence number (or list of sequence numbers) of the rmap
                entries to verify, default is None. When no entry matches,
                every entry of the route map is verified.

    Usage
    -----
    input_dict = {
        "r3": {
            "route_maps": {
                "rmap_match_pf_1_ipv4": [{
                    "action": "permit",
                    'seq_id': '5',
                    "match": {
                        addr_type: {
                            "prefix_lists": "pf_list_1_" + addr_type
                        }
                    },
                    "set": {
                        "locPrf": 150,
                        "weight": 100
                    }
                }],
                "rmap_match_pf_2_ipv6": [{
                    "action": "permit",
                    'seq_id': '5',
                    "match": {
                        addr_type: {
                            "prefix_lists": "pf_list_1_" + addr_type
                        }
                    },
                    "set": {
                        "metric": 50
                    }
                }]
            }
        }
    }
    result = verify_bgp_attributes(tgen, 'ipv4', "r1", ["10.0.20.1/32"],
                                   "rmap_match_pf_1_ipv4", input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    # Accept a single sequence id as well as a list of them
    if seq_id is not None and not isinstance(seq_id, list):
        seq_id = [seq_id]

    for router, rnode in tgen.routers().items():
        if router != dut:
            continue

        logger.info("Verifying BGP set attributes for dut {}:".format(router))

        for static_route in static_routes:
            cmd = "show bgp {} {} json".format(addr_type, static_route)
            show_bgp_json = run_frr_cmd(rnode, cmd, isjson=True)

            dict_to_test = []
            tmp_list = []
            for rmap_router in input_dict.keys():
                for rmap, values in input_dict[rmap_router]["route_maps"].items():
                    if rmap != rmap_name:
                        continue

                    dict_to_test = values
                    if seq_id is not None:
                        # Keep only the entries with a requested seq_id
                        for rmap_dict in values:
                            if "seq_id" not in rmap_dict:
                                continue
                            rmap_seq_id = rmap_dict["seq_id"]
                            for _seq_id in seq_id:
                                if _seq_id == rmap_seq_id:
                                    tmp_list.append(rmap_dict)
                    if tmp_list:
                        dict_to_test = tmp_list

                    for rmap_dict in dict_to_test:
                        if "set" not in rmap_dict:
                            continue

                        # Report a missing prefix as an error string (as
                        # verify_bgp_community does) rather than failing
                        # with KeyError/IndexError on the lookup below.
                        if not show_bgp_json.get("paths"):
                            return (
                                "Prefix {} not found in BGP table of "
                                "router: {}".format(static_route, router)
                            )
                        first_path = show_bgp_json["paths"][0]

                        for criteria in rmap_dict["set"].keys():
                            if criteria not in first_path:
                                errormsg = (
                                    "BGP attribute: {}"
                                    " is not found in"
                                    " cli: {} output "
                                    "in router {}".format(criteria, cmd, router)
                                )
                                return errormsg

                            if rmap_dict["set"][criteria] == first_path[criteria]:
                                logger.info(
                                    "Verifying BGP "
                                    "attribute {} for"
                                    " route: {} in "
                                    "router: {}, found"
                                    " expected value:"
                                    " {}".format(
                                        criteria,
                                        static_route,
                                        dut,
                                        rmap_dict["set"][criteria],
                                    )
                                )
                            else:
                                errormsg = (
                                    "Failed: Verifying BGP "
                                    "attribute {} for route:"
                                    " {} in router: {}, "
                                    " expected value: {} but"
                                    " found: {}".format(
                                        criteria,
                                        static_route,
                                        dut,
                                        rmap_dict["set"][criteria],
                                        first_path[criteria],
                                    )
                                )
                                return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
1943
1944
@retry(attempts=4, wait=2, return_is_str=True, initial_wait=2)
def verify_best_path_as_per_bgp_attribute(
    tgen, addr_type, router, input_dict, attribute
):
    """
    API is to verify best path according to BGP attributes for given routes.
    "show bgp ipv4/6 json" command will be run and the expected best
    next hop is derived from the requested attribute: shortest "path",
    highest "locPrf", highest "weight", lowest "metric" (MED) or
    "origin" equal to IGP. The next hop installed in the RIB
    ("show ip/ipv6 route json") must be one of the BGP next hops.

    Parameters
    ----------
    * `tgen` : topogen object
    * `addr_type` : ip type, ipv4/ipv6
    * `router` : Device Under Test
    * `attribute` : calculate best path using this attribute
    * `input_dict`: defines different routes to calculate for which route
                    best path is selected

    Usage
    -----
    # To verify best path for routes 200.50.2.0/32 and 200.60.2.0/32 from
    router r7 to router r1(DUT) as per shortest as-path attribute
    input_dict = {
        "r7": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "advertise_networks": [
                                {
                                    "network": "200.50.2.0/32"
                                },
                                {
                                    "network": "200.60.2.0/32"
                                }
                            ]
                        }
                    }
                }
            }
        }
    }
    attribute = "locPrf"
    result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut, \
                                                   input_dict,  attribute)

    Returns
    -------
    errormsg(str) or True; False when `router` is not part of the topology
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    if router not in tgen.routers():
        return False

    rnode = tgen.routers()[router]

    # Verifying show bgp json
    command = "show bgp"

    sleep(2)
    logger.info("Verifying router %s RIB for best path:", router)

    for route_val in input_dict.values():
        if "static_routes" in route_val:
            networks = route_val["static_routes"]
        else:
            net_data = route_val["bgp"]["address_family"][addr_type]["unicast"]
            networks = net_data["advertise_networks"]

        for network in networks:
            _network = network["network"]
            # setdefault() also stores the defaults in the caller's dict
            no_of_ip = network.setdefault("no_of_ip", 1)
            vrf = network.setdefault("vrf", None)

            if vrf:
                cmd = "{} vrf {}".format(command, vrf)
            else:
                cmd = command

            cmd = "{} {}".format(cmd, addr_type)
            cmd = "{} json".format(cmd)
            sh_ip_bgp_json = run_frr_cmd(rnode, cmd, isjson=True)

            routes = generate_ips(_network, no_of_ip)
            for route in routes:
                # Normalise the prefix the same way it is keyed in the output.
                # NOTE: relies on the Python 2 `unicode` builtin.
                route = str(ipaddr.IPNetwork(unicode(route)))

                if route not in sh_ip_bgp_json["routes"]:
                    continue

                route_attributes = sh_ip_bgp_json["routes"][route]
                _next_hop = None
                compare = None

                # next hop ip -> value of the requested attribute
                attribute_dict = {}
                for route_attribute in route_attributes:
                    next_hops = route_attribute["nexthops"]
                    for next_hop in next_hops:
                        next_hop_ip = next_hop["ip"]
                        attribute_dict[next_hop_ip] = route_attribute[attribute]

                # AS_PATH attribute
                if attribute == "path":
                    # Find next_hop for the route have minimum as_path
                    # NOTE(review): len(set(...)) counts the distinct
                    # elements of the stored "path" value - confirm this
                    # is the intended measure of AS-path length.
                    _next_hop = min(
                        attribute_dict, key=lambda x: len(set(attribute_dict[x]))
                    )
                    compare = "SHORTEST"

                # LOCAL_PREF attribute
                elif attribute == "locPrf":
                    # Find next_hop for the route have highest local preference
                    _next_hop = max(attribute_dict, key=(lambda k: attribute_dict[k]))
                    compare = "HIGHEST"

                # WEIGHT attribute
                elif attribute == "weight":
                    # Find next_hop for the route have highest weight
                    _next_hop = max(attribute_dict, key=(lambda k: attribute_dict[k]))
                    compare = "HIGHEST"

                # ORIGIN attribute
                elif attribute == "origin":
                    # Find next_hop for the route have IGP as origin, -
                    # - rule is IGP>EGP>INCOMPLETE
                    igp_next_hops = [
                        key for (key, value) in attribute_dict.items() if value == "IGP"
                    ]
                    if not igp_next_hops:
                        # Return an error string instead of IndexError so
                        # the retry decorator can try again.
                        errormsg = (
                            "No path with origin IGP found for BGP route {} "
                            "in router {}".format(route, router)
                        )
                        return errormsg
                    _next_hop = igp_next_hops[0]
                    compare = ""

                # MED attribute
                elif attribute == "metric":
                    # Find next_hop for the route have LOWEST MED
                    _next_hop = min(attribute_dict, key=(lambda k: attribute_dict[k]))
                    compare = "LOWEST"

                # Show ip route
                if addr_type == "ipv4":
                    command_1 = "show ip route"
                else:
                    command_1 = "show ipv6 route"

                if vrf:
                    cmd = "{} vrf {} json".format(command_1, vrf)
                else:
                    cmd = "{} json".format(command_1)

                rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)

                # Verifying output dictionary rib_routes_json is not empty
                if not bool(rib_routes_json):
                    errormsg = "No route found in RIB of router {}..".format(router)
                    return errormsg

                st_found = False
                nh_found = False
                # Find best is installed in RIB
                if route in rib_routes_json:
                    st_found = True
                    rib_next_hop = rib_routes_json[route][0]["nexthops"][0]["ip"]
                    # Verify next_hop in rib_routes_json
                    if rib_next_hop in attribute_dict:
                        nh_found = True
                    else:
                        # Expected = best BGP next hop, Found = RIB next hop
                        errormsg = (
                            "Incorrect Nexthop for BGP route {} in "
                            "RIB of router {}, Expected: {}, Found:"
                            " {}\n".format(route, router, _next_hop, rib_next_hop)
                        )
                        return errormsg

                if st_found and nh_found:
                    logger.info(
                        "Best path for prefix: %s with next_hop: %s is "
                        "installed according to %s %s: (%s) in RIB of "
                        "router %s",
                        route,
                        _next_hop,
                        compare,
                        attribute,
                        attribute_dict[_next_hop],
                        router,
                    )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
2147
2148
def verify_best_path_as_per_admin_distance(
    tgen, addr_type, router, input_dict, attribute
):
    """
    Check that the DUT installed, for each given static route, the next
    hop that has the lowest admin distance. The candidate next hops and
    their distances are read from "show ip/ipv6 route json" on the router
    that owns the static routes; the installed next hop is read from the
    same command on the DUT.

    Parameters
    ----------
    * `tgen` : topogen object
    * `addr_type` : ip type, ipv4/ipv6
    * `router`: Device Under Test
    * `input_dict`: defines different routes with different admin distance
                    to calculate for which route best path is selected
    * `attribute` : attribute name, only used in the success log message

    Usage
    -----
    # To verify best path for route 200.50.2.0/32 from  router r2 to
    router r1(DUT) as per shortest admin distance which is 60.
    input_dict = {
        "r2": {
            "static_routes": [{"network": "200.50.2.0/32", \
                 "admin_distance": 80, "next_hop": "10.0.0.14"},
                              {"network": "200.50.2.0/32", \
                 "admin_distance": 60, "next_hop": "10.0.0.18"}]
        }}
    attribute = "locPrf"
    result = verify_best_path_as_per_admin_distance(tgen, "ipv4", dut, \
                            input_dict, attribute)

    Returns
    -------
    errormsg(str) or True; False when `router` is not part of the topology
    """

    api_name = sys._getframe().f_code.co_name
    logger.debug("Entering lib API: {}".format(api_name))

    router_list = tgen.routers()
    if router not in router_list:
        return False

    dut_node = router_list[router]

    sleep(5)
    logger.info("Verifying router %s RIB for best path:", router)

    # Show ip route cmd
    command = "show ip route json" if addr_type == "ipv4" else "show ipv6 route json"

    for source_router in input_dict.keys():
        source_rib = router_list[source_router].vtysh_cmd(command, isjson=True)

        for static_route in input_dict[source_router]["static_routes"]:
            prefix = static_route["network"]

            # next hop ip -> admin distance, as seen on the source router
            distance_by_nexthop = {}
            for rib_entry in source_rib[prefix]:
                for hop in rib_entry["nexthops"]:
                    distance_by_nexthop[hop["ip"]] = rib_entry["distance"]

            # Find next_hop for the route have LOWEST Admin Distance
            best_hop = min(distance_by_nexthop, key=distance_by_nexthop.get)

            # Show ip route on the DUT
            dut_rib = run_frr_cmd(dut_node, command, isjson=True)

            # Verifying output dictionary is not empty
            if not bool(dut_rib):
                return "No route found in RIB of router {}..".format(router)

            # Nothing to verify when the prefix is not installed on the DUT
            if prefix not in dut_rib:
                continue

            # Verify next_hop in the DUT RIB
            if dut_rib[prefix][0]["nexthops"][0]["ip"] != best_hop:
                return (
                    "Nexthop {} is Missing for BGP route {}"
                    " in RIB of router {}\n".format(best_hop, prefix, router)
                )

            logger.info(
                "Best path for prefix: %s is installed according"
                " to %s %s: (%s) in RIB of router %s",
                prefix,
                "LOWEST",
                attribute,
                distance_by_nexthop[best_hop],
                router,
            )

    logger.debug("Exiting lib API: {}".format(api_name))
    return True
2258
2259
@retry(attempts=5, wait=2, return_is_str=True, initial_wait=2)
def verify_bgp_rib(tgen, addr_type, dut, input_dict, next_hop=None, aspath=None):
    """
    This API is to verify whether the BGP RIB of `dut` has the routes
    described by `input_dict`, optionally with the given next hop(s) and
    AS path.

    For every router entry of `input_dict` the "static_routes" section is
    verified when present; otherwise the "advertise_networks" of its "bgp"
    section are verified. Next hop and AS path are only checked for static
    routes.

    Parameters
    ----------
    * `tgen`: topogen object
    * `addr_type` : ip type ipv4/ipv6
    * `dut`: input dut router name
    * `input_dict` : input dict, has details of static routes and/or
                     advertised networks
    * `next_hop`[optional]: next hop (str) or list of next hops which must
                            all be present for the route
    * `aspath`[optional]: aspath which needs to be verified

    Usage
    -----
    dut = 'r1'
    next_hop = "192.168.1.10"
    input_dict = topo['routers']
    aspath = "100 200 300"
    result = verify_bgp_rib(tgen, addr_type, dut, input_dict,
                            next_hop, aspath)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: verify_bgp_rib()")

    router_list = tgen.routers()
    if dut not in router_list:
        errormsg = "Router {} is not part of the topology".format(dut)
        return errormsg
    rnode = router_list[dut]

    # Normalise the expected next hops once so that a single address and a
    # list of addresses are verified the same way.
    if next_hop and not isinstance(next_hop, list):
        next_hop = [next_hop]

    # Verifying RIB routes
    command = "show bgp"

    for routerInput in input_dict:
        router_data = input_dict[routerInput]

        sleep(2)
        logger.info("Checking router {} BGP RIB:".format(dut))

        # Static routes
        if "static_routes" in router_data:
            for static_route in router_data["static_routes"]:
                found_routes = []
                missing_routes = []
                nh_found = False

                # Read-only lookup: the caller's input is not modified
                vrf = static_route.get("vrf")
                if vrf:
                    cmd = "{} vrf {} {} json".format(command, vrf, addr_type)
                else:
                    cmd = "{} {} json".format(command, addr_type)

                rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)

                # Verifying output dictionary rib_routes_json is not empty
                if not rib_routes_json:
                    errormsg = "No route found in rib of router {}..".format(dut)
                    return errormsg

                # Generating IPs for verification
                ip_list = generate_ips(
                    static_route["network"], static_route.get("no_of_ip", 1)
                )

                for st_rt in ip_list:
                    st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))

                    if validate_ip_address(st_rt) != addr_type:
                        continue

                    if st_rt not in rib_routes_json["routes"]:
                        missing_routes.append(st_rt)
                        continue

                    found_routes.append(st_rt)

                    if next_hop:
                        found_hops = [
                            rib_r["ip"]
                            for rib_r in rib_routes_json["routes"][st_rt][0][
                                "nexthops"
                            ]
                        ]
                        # Expected next hops that the RIB entry does not have
                        missing_nexthops = set(next_hop).difference(found_hops)

                        if found_hops:
                            if missing_nexthops:
                                logger.info(
                                    "Missing nexthop %s for route"
                                    " %s in RIB of router %s\n",
                                    missing_nexthops,
                                    st_rt,
                                    dut,
                                )
                                errormsg = (
                                    "Nexthop {} is Missing for "
                                    "route {} in RIB of router {}\n".format(
                                        missing_nexthops, st_rt, dut,
                                    )
                                )
                                return errormsg
                            nh_found = True

                    if aspath:
                        found_paths = rib_routes_json["routes"][st_rt][0]["path"]
                        if aspath == found_paths:
                            logger.info(
                                "Found AS path {} for route"
                                " {} in RIB of router "
                                "{}\n".format(aspath, st_rt, dut)
                            )
                        else:
                            errormsg = (
                                "AS Path {} is missing for route {}"
                                " in RIB of router {}\n".format(aspath, st_rt, dut)
                            )
                            return errormsg

                if nh_found:
                    logger.info(
                        "Found next_hop {} for all bgp"
                        " routes in RIB of"
                        " router {}\n".format(next_hop, dut)
                    )

                if missing_routes:
                    errormsg = (
                        "Missing route in RIB of router {}, "
                        "routes: {}\n".format(dut, missing_routes)
                    )
                    return errormsg

                if found_routes:
                    logger.info(
                        "Verified routes in router {} BGP RIB, "
                        "found routes are: {} \n".format(dut, found_routes)
                    )

            # When static routes are given the "bgp" section of this
            # router entry is not verified.
            continue

        if "bgp" not in router_data:
            continue

        # Advertise networks
        bgp_data_list = router_data["bgp"]
        if not isinstance(bgp_data_list, list):
            bgp_data_list = [bgp_data_list]

        for bgp_data in bgp_data_list:
            vrf_id = bgp_data.get("vrf")
            if vrf_id:
                cmd = "{} vrf {} {} json".format(command, vrf_id, addr_type)
            else:
                cmd = "{} {} json".format(command, addr_type)

            rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)

            # Verifying output dictionary rib_routes_json is not empty
            if not rib_routes_json:
                errormsg = "No route found in rib of router {}..".format(dut)
                return errormsg

            bgp_net_advertise = bgp_data["address_family"][addr_type]["unicast"]

            for advertise_network_dict in bgp_net_advertise.get(
                "advertise_networks", []
            ):
                found_routes = []
                missing_routes = []

                # Generating IPs for verification
                ip_list = generate_ips(
                    advertise_network_dict["network"],
                    advertise_network_dict.get("no_of_network", 1),
                )

                for st_rt in ip_list:
                    st_rt = str(ipaddr.IPNetwork(unicode(st_rt)))

                    if validate_ip_address(st_rt) != addr_type:
                        continue

                    if st_rt in rib_routes_json["routes"]:
                        found_routes.append(st_rt)
                    else:
                        missing_routes.append(st_rt)

                if missing_routes:
                    errormsg = (
                        "Missing route in BGP RIB of router {},"
                        " are: {}\n".format(dut, missing_routes)
                    )
                    return errormsg

                if found_routes:
                    logger.info(
                        "Verified routes in router {} BGP RIB, found "
                        "routes are: {}\n".format(dut, found_routes)
                    )

    logger.debug("Exiting lib API: verify_bgp_rib()")
    return True
2507
2508
def _expected_gr_mode(bgp_data, addr_type, neighbor, global_suffix):
    """
    Derive the graceful-restart mode expected from the "bgp" input of one
    router towards `neighbor`.

    A neighbor-level setting ("Helper", "Restart" or "Disable") wins. When
    none is set the router-level "graceful-restart" section decides, with
    "Helper" as the fallback, and `global_suffix` is appended to the result.
    """
    mode = None
    if "address_family" in bgp_data:
        dest_links = bgp_data["address_family"][addr_type]["unicast"]["neighbor"][
            neighbor
        ]["dest_link"]

        # The last dest_link listed decides, including resetting to None
        for data in dest_links.values():
            if data.get("graceful-restart-helper"):
                mode = "Helper"
            elif data.get("graceful-restart"):
                mode = "Restart"
            elif data.get("graceful-restart-disable"):
                mode = "Disable"
            else:
                mode = None

    if mode is None:
        global_gr = bgp_data.get("graceful-restart", {})
        if global_gr.get("graceful-restart"):
            mode = "Restart"
        elif global_gr.get("graceful-restart-disable"):
            mode = "Disable"
        else:
            mode = "Helper"
        mode += global_suffix

    return mode


@retry(attempts=4, wait=2, return_is_str=True, initial_wait=2)
def verify_graceful_restart(tgen, topo, addr_type, input_dict, dut, peer):
    """
    This API is to verify the graceful restart configuration of the DUT and
    cross verify the same from the peer bgp router.

    The local and remote GR modes expected from `input_dict` are compared
    with "localGrMode" and "remoteGrMode" of
    "show bgp <addr_type> neighbor <ip> graceful-restart json" on the DUT.
    A local mode taken from the router-level configuration is expected with
    a trailing "*" (e.g. "Helper*"). When the expected local mode is
    Disable, True is returned without any further check.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `addr_type` : ip type ipv4/ipv6
    * `input_dict`: input dictionary, have details of Device Under Test, for
                    which user wants to test the data
    * `dut`: input dut router name
    * `peer`: input peer router name

    Usage
    -----
    input_dict = {
        "r1": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "neighbor": {
                                "r3": {
                                    "dest_link":{
                                        "r1": {
                                            "graceful-restart": True
                                        }
                                    }
                                }
                            }
                        }
                    },
                    "ipv6": {
                        "unicast": {
                            "neighbor": {
                                "r3": {
                                    "dest_link":{
                                        "r1": {
                                            "graceful-restart": True
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    result = verify_graceful_restart(tgen, topo, addr_type, input_dict,
                                     dut = "r1", peer = 'r2')
    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    routers = tgen.routers()
    if dut not in routers:
        errormsg = "[DUT: {}]: Router is not part of the topology".format(dut)
        return errormsg
    rnode = routers[dut]

    bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]

    # Nothing to verify when the address family is not configured or not
    # enabled for this run.
    if addr_type not in bgp_addr_type or not check_address_types(addr_type):
        logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
        return True

    bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]

    # Resolve the address of the peer on the link towards the DUT
    neighbor_ip = None
    if peer in bgp_neighbors:
        peer_links = topo["routers"][peer]["links"]
        for dest_link in bgp_neighbors[peer]["dest_link"]:
            if dest_link in peer_links:
                neighbor_ip = peer_links[dest_link][addr_type].split("/")[0]

    if neighbor_ip is None:
        errormsg = "[DUT: {}]: Unable to find {} neighbor ip of peer {}".format(
            dut, addr_type, peer
        )
        return errormsg

    logger.info(
        "[DUT: {}]: Checking bgp graceful-restart show"
        " o/p {}".format(dut, neighbor_ip)
    )

    show_bgp_graceful_json = run_frr_cmd(
        rnode,
        "show bgp {} neighbor {} graceful-restart json".format(
            addr_type, neighbor_ip
        ),
        isjson=True,
    )

    show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]

    if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
        logger.info("[DUT: {}]: Neighbor ip matched {}".format(dut, neighbor_ip))
    else:
        errormsg = "[DUT: {}]: Neighbor ip NOT a matched {}".format(dut, neighbor_ip)
        return errormsg

    # Local GR mode; a router-level mode is reported with a trailing "*"
    lmode = _expected_gr_mode(input_dict[dut]["bgp"], addr_type, peer, "*")

    if lmode in ("Disable", "Disable*"):
        return True

    # Remote GR mode
    rmode = _expected_gr_mode(input_dict[peer]["bgp"], addr_type, dut, "")

    found_lmode = show_bgp_graceful_json_out["localGrMode"]
    if found_lmode == lmode:
        logger.info("[DUT: {}]: localGrMode : {} ".format(dut, found_lmode))
    else:
        errormsg = (
            "[DUT: {}]: localGrMode is not correct"
            " Expected: {}, Found: {}".format(dut, lmode, found_lmode)
        )
        return errormsg

    found_rmode = show_bgp_graceful_json_out["remoteGrMode"]
    # A peer with GR disabled is reported as "NotApplicable"
    if found_rmode == rmode or (found_rmode == "NotApplicable" and rmode == "Disable"):
        logger.info("[DUT: {}]: remoteGrMode : {} ".format(dut, found_rmode))
    else:
        errormsg = (
            "[DUT: {}]: remoteGrMode is not correct"
            " Expected: {}, Found: {}".format(dut, rmode, found_rmode)
        )
        return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
2756
2757
@retry(attempts=4, wait=2, return_is_str=True, initial_wait=2)
def verify_r_bit(tgen, topo, addr_type, input_dict, dut, peer):
    """
    This API is to verify r_bit in the BGP gr capability advertised
    by the neighbor router

    "show bgp <addr_type> neighbor <ip> graceful-restart json" is run on the
    DUT; when the output for the peer has an "rBit" entry it must be true.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `addr_type` : ip type ipv4/ipv6
    * `input_dict`: input dictionary, have details of Device Under Test, for
                    which user wants to test the data (not read by this API)
    * `dut`: input dut router name
    * `peer`: peer name

    Usage
    -----
    input_dict = {
        "r1": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "neighbor": {
                                "r3": {
                                    "dest_link":{
                                        "r1": {
                                            "graceful-restart": True
                                        }
                                    }
                                }
                            }
                        }
                    },
                    "ipv6": {
                        "unicast": {
                            "neighbor": {
                                "r3": {
                                    "dest_link":{
                                        "r1": {
                                            "graceful-restart": True
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }
    result = verify_r_bit(tgen, topo, addr_type, input_dict, dut, peer)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    routers = tgen.routers()
    if dut not in routers:
        errormsg = "[DUT: {}]: Router is not part of the topology".format(dut)
        return errormsg
    rnode = routers[dut]

    bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]

    # Nothing to verify when the address family is not configured or not
    # enabled for this run.
    if addr_type not in bgp_addr_type or not check_address_types(addr_type):
        logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
        return True

    bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]

    # Resolve the address of the peer on the link towards the DUT
    neighbor_ip = None
    if peer in bgp_neighbors:
        peer_links = topo["routers"][peer]["links"]
        for dest_link in bgp_neighbors[peer]["dest_link"]:
            if dest_link in peer_links:
                neighbor_ip = peer_links[dest_link][addr_type].split("/")[0]

    if neighbor_ip is None:
        errormsg = "[DUT: {}]: Unable to find {} neighbor ip of peer {}".format(
            dut, addr_type, peer
        )
        return errormsg

    logger.info(
        "[DUT: {}]: Checking bgp graceful-restart show"
        " o/p {}".format(dut, neighbor_ip)
    )

    show_bgp_graceful_json = run_frr_cmd(
        rnode,
        "show bgp {} neighbor {} graceful-restart json".format(
            addr_type, neighbor_ip
        ),
        isjson=True,
    )

    show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]

    if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
        logger.info("[DUT: {}]: Neighbor ip matched {}".format(dut, neighbor_ip))
    else:
        errormsg = "[DUT: {}]: Neighbor ip NOT a matched {}".format(dut, neighbor_ip)
        return errormsg

    # An output without "rBit" is not treated as a failure
    if "rBit" in show_bgp_graceful_json_out:
        if show_bgp_graceful_json_out["rBit"]:
            logger.info("[DUT: {}]: Rbit true {}".format(dut, neighbor_ip))
        else:
            errormsg = "[DUT: {}]: Rbit false {}".format(dut, neighbor_ip)
            return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
2874
2875
@retry(attempts=4, wait=2, return_is_str=True, initial_wait=2)
def verify_eor(tgen, topo, addr_type, input_dict, dut, peer):
    """
    This API is to verify EOR (End-of-RIB) status towards a peer

    "show bgp <addr_type> neighbor <ip> graceful-restart json" is run on the
    DUT; each of "endOfRibSend", "endOfRibRecv" and
    "endOfRibSentAfterUpdate" that is present in the "endOfRibStatus" of the
    address family must be true.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `addr_type` : ip type ipv4/ipv6
    * `input_dict`: input dictionary, have details of DUT, for
                    which user wants to test the data (not read by this API)
    * `dut`: input dut router name
    * `peer`: peer name

    Usage
    -----
    input_dict = {
        "r1": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "neighbor": {
                                "r3": {
                                    "dest_link":{
                                        "r1": {
                                            "graceful-restart": True
                                        }
                                    }
                                }
                            }
                        }
                    },
                    "ipv6": {
                        "unicast": {
                            "neighbor": {
                                "r3": {
                                    "dest_link":{
                                        "r1": {
                                            "graceful-restart": True
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    result = verify_eor(tgen, topo, addr_type, input_dict, dut, peer)

    Returns
    -------
    errormsg(str) or True
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    routers = tgen.routers()
    if dut not in routers:
        errormsg = "[DUT: %s]: Router is not part of the topology" % (dut)
        return errormsg
    rnode = routers[dut]

    bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]

    # Nothing to verify when the address family is not configured or not
    # enabled for this run.
    if addr_type not in bgp_addr_type or not check_address_types(addr_type):
        logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
        return True

    bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]

    # Resolve the address of the peer on the link towards the DUT
    neighbor_ip = None
    if peer in bgp_neighbors:
        peer_links = topo["routers"][peer]["links"]
        for dest_link in bgp_neighbors[peer]["dest_link"]:
            if dest_link in peer_links:
                neighbor_ip = peer_links[dest_link][addr_type].split("/")[0]

    if neighbor_ip is None:
        errormsg = "[DUT: %s]: Unable to find %s neighbor ip of peer %s" % (
            dut,
            addr_type,
            peer,
        )
        return errormsg

    logger.info(
        "[DUT: %s]: Checking bgp graceful-restart show o/p %s", dut, neighbor_ip
    )

    show_bgp_graceful_json = run_frr_cmd(
        rnode,
        "show bgp {} neighbor {} graceful-restart json".format(
            addr_type, neighbor_ip
        ),
        isjson=True,
    )

    show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]

    if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
        logger.info("[DUT: %s]: Neighbor ip matched %s", dut, neighbor_ip)
    else:
        errormsg = "[DUT: %s]: Neighbor ip is NOT matched %s" % (dut, neighbor_ip)
        return errormsg

    if addr_type == "ipv4":
        afi = "ipv4Unicast"
    elif addr_type == "ipv6":
        afi = "ipv6Unicast"
    else:
        errormsg = "Address type %s is not supported" % (addr_type)
        return errormsg

    eor_json = show_bgp_graceful_json_out[afi]["endOfRibStatus"]

    # Flags that are absent from the output are not treated as a failure
    if "endOfRibSend" in eor_json:
        if eor_json["endOfRibSend"]:
            logger.info("[DUT: %s]: EOR Send true for %s %s", dut, neighbor_ip, afi)
        else:
            errormsg = "[DUT: %s]: EOR Send false for %s %s" % (
                dut,
                neighbor_ip,
                afi,
            )
            return errormsg

    if "endOfRibRecv" in eor_json:
        if eor_json["endOfRibRecv"]:
            logger.info("[DUT: %s]: EOR Recv true %s %s", dut, neighbor_ip, afi)
        else:
            errormsg = "[DUT: %s]: EOR Recv false %s %s" % (dut, neighbor_ip, afi)
            return errormsg

    if "endOfRibSentAfterUpdate" in eor_json:
        if eor_json["endOfRibSentAfterUpdate"]:
            logger.info(
                "[DUT: %s]: EOR SendTime true for %s %s", dut, neighbor_ip, afi
            )
        else:
            errormsg = "[DUT: %s]: EOR SendTime false for %s %s" % (
                dut,
                neighbor_ip,
                afi,
            )
            return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
3037
3038
@retry(attempts=4, wait=2, return_is_str=True, initial_wait=2)
def verify_f_bit(tgen, topo, addr_type, input_dict, dut, peer):
    """
    This API is to verify f_bit in the BGP gr capability advertised
    by the neighbor router

    "show bgp <addr_type> neighbor <ip> graceful-restart json" is run on the
    DUT and the "fBit" of the address family matching `addr_type` must be
    true. When the output has no entry for that address family, the other
    unicast address family is checked instead, if present.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `addr_type` : ip type ipv4/ipv6
    * `input_dict`: input dictionary, have details of Device Under Test, for
                    which user wants to test the data (not read by this API)
    * `dut`: input dut router name
    * `peer`: peer name

    Usage
    -----
    input_dict = {
        "r1": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "neighbor": {
                                "r3": {
                                    "dest_link":{
                                        "r1": {
                                            "graceful-restart": True
                                        }
                                    }
                                }
                            }
                        }
                    },
                    "ipv6": {
                        "unicast": {
                            "neighbor": {
                                "r3": {
                                    "dest_link":{
                                        "r1": {
                                            "graceful-restart": True
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }

    result = verify_f_bit(tgen, topo, 'ipv4', input_dict, dut, peer)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    routers = tgen.routers()
    if dut not in routers:
        errormsg = "[DUT: {}]: Router is not part of the topology".format(dut)
        return errormsg
    rnode = routers[dut]

    bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]

    # Nothing to verify when the address family is not configured or not
    # enabled for this run.
    if addr_type not in bgp_addr_type or not check_address_types(addr_type):
        logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
        return True

    bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]

    # Resolve the address of the peer on the link towards the DUT
    neighbor_ip = None
    if peer in bgp_neighbors:
        peer_links = topo["routers"][peer]["links"]
        for dest_link in bgp_neighbors[peer]["dest_link"]:
            if dest_link in peer_links:
                neighbor_ip = peer_links[dest_link][addr_type].split("/")[0]

    if neighbor_ip is None:
        errormsg = "[DUT: {}]: Unable to find {} neighbor ip of peer {}".format(
            dut, addr_type, peer
        )
        return errormsg

    logger.info(
        "[DUT: {}]: Checking bgp graceful-restart show"
        " o/p {}".format(dut, neighbor_ip)
    )

    show_bgp_graceful_json = run_frr_cmd(
        rnode,
        "show bgp {} neighbor {} graceful-restart json".format(
            addr_type, neighbor_ip
        ),
        isjson=True,
    )

    show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]

    if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
        logger.info("[DUT: {}]: Neighbor ip matched {}".format(dut, neighbor_ip))
    else:
        errormsg = "[DUT: {}]: Neighbor ip NOT a match {}".format(dut, neighbor_ip)
        return errormsg

    # Check the address family under test first, so that the IPv6 F-bit is
    # really verified when the output carries both address families.
    afi_labels = {"ipv4Unicast": "IPv4", "ipv6Unicast": "IPv6"}
    candidates = ["ipv4Unicast", "ipv6Unicast"]
    if addr_type == "ipv6":
        candidates.reverse()

    afi = None
    for candidate in candidates:
        if candidate in show_bgp_graceful_json_out:
            afi = candidate
            break

    if afi is None:
        errormsg = (
            "[DUT: {}]: Neither ipv4Unicast nor ipv6Unicast is present"
            " in graceful-restart o/p of {}".format(dut, neighbor_ip)
        )
        return errormsg

    if show_bgp_graceful_json_out[afi]["fBit"]:
        logger.info(
            "[DUT: {}]: Fbit True for {} {}"
            " Unicast".format(dut, neighbor_ip, afi_labels[afi])
        )
    else:
        errormsg = "[DUT: {}]: Fbit False for {} {} Unicast".format(
            dut, neighbor_ip, afi_labels[afi]
        )
        return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
3177
3178
@retry(attempts=4, wait=2, return_is_str=True, initial_wait=2)
def verify_graceful_restart_timers(tgen, topo, addr_type, input_dict, dut, peer):
    """
    This API is to verify graceful restart timers, configured and received

    The "restart-time" configured for `peer` in `input_dict` is compared
    with "receivedRestartTimer" in the "timers" of
    "show bgp <addr_type> neighbor <ip> graceful-restart json" on the DUT.
    When no "restart-time" is given for the peer nothing is compared.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `addr_type` : ip type ipv4/ipv6
    * `input_dict`: input dictionary, have details of Device Under Test,
                    for which user wants to test the data
    * `dut`: input dut router name
    * `peer`: peer name

    Usage
    -----
    # Restart time configured on the peer (r3), verified on the DUT (r1)
    input_dict = {
        "r3": {
            "bgp": {
                "graceful-restart": {
                    "timer": {
                        "restart-time": 150
                    }
                }
            }
        }
    }

    result = verify_graceful_restart_timers(tgen, topo, 'ipv4', input_dict,
                                            dut="r1", peer="r3")

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    routers = tgen.routers()
    if dut not in routers:
        errormsg = "[DUT: {}]: Router is not part of the topology".format(dut)
        return errormsg
    rnode = routers[dut]

    bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]

    # Nothing to verify when the address family is not configured or not
    # enabled for this run.
    if addr_type not in bgp_addr_type or not check_address_types(addr_type):
        logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
        return True

    bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]

    # Resolve the address of the peer on the link towards the DUT
    neighbor_ip = None
    if peer in bgp_neighbors:
        peer_links = topo["routers"][peer]["links"]
        for dest_link in bgp_neighbors[peer]["dest_link"]:
            if dest_link in peer_links:
                neighbor_ip = peer_links[dest_link][addr_type].split("/")[0]

    if neighbor_ip is None:
        errormsg = "[DUT: {}]: Unable to find {} neighbor ip of peer {}".format(
            dut, addr_type, peer
        )
        return errormsg

    logger.info(
        "[DUT: {}]: Checking bgp graceful-restart show"
        " o/p {}".format(dut, neighbor_ip)
    )

    show_bgp_graceful_json = run_frr_cmd(
        rnode,
        "show bgp {} neighbor {} graceful-restart json".format(
            addr_type, neighbor_ip
        ),
        isjson=True,
    )

    show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]
    if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
        logger.info("[DUT: {}]: Neighbor ip matched {}".format(dut, neighbor_ip))
    else:
        errormsg = "[DUT: {}]: Neighbor ip is NOT matched {}".format(
            dut, neighbor_ip
        )
        return errormsg

    # Graceful-restart timer: the restart time configured on the peer is the
    # one the DUT is expected to have received.
    peer_bgp = input_dict[peer]["bgp"]
    if "graceful-restart" in peer_bgp and "timer" in peer_bgp["graceful-restart"]:
        timers = peer_bgp["graceful-restart"]["timer"]

        if "restart-time" in timers:
            receivedTimer = timers["restart-time"]
            if (
                show_bgp_graceful_json_out["timers"]["receivedRestartTimer"]
                == receivedTimer
            ):
                logger.info(
                    "receivedRestartTimer is {}"
                    " on {} from peer {}".format(receivedTimer, dut, peer)
                )
            else:
                errormsg = (
                    "receivedRestartTimer is not"
                    " as expected {}".format(receivedTimer)
                )
                return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
3303
3304
3305 @retry(attempts=4, wait=2, return_is_str=True, initial_wait=2)
3306 def verify_gr_address_family(tgen, topo, addr_type, addr_family, dut):
3307 """
3308 This API is to verify gr_address_family in the BGP gr capability advertised
3309 by the neighbor router
3310
3311 Parameters
3312 ----------
3313 * `tgen`: topogen object
3314 * `topo`: input json file data
3315 * `addr_type` : ip type ipv4/ipv6
3316 * `addr_type` : ip type IPV4 Unicast/IPV6 Unicast
3317 * `dut`: input dut router name
3318
3319 Usage
3320 -----
3321
3322 result = verify_gr_address_family(tgen, topo, "ipv4", "ipv4Unicast", "r1")
3323
3324 Returns
3325 -------
3326 errormsg(str) or True
3327 """
3328
3329 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3330
3331 for router, rnode in tgen.routers().iteritems():
3332 if router != dut:
3333 continue
3334
3335 bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
3336
3337 if addr_type in bgp_addr_type:
3338 if not check_address_types(addr_type):
3339 continue
3340
3341 bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
3342
3343 for bgp_neighbor, peer_data in bgp_neighbors.items():
3344 for dest_link, peer_dict in peer_data["dest_link"].items():
3345 data = topo["routers"][bgp_neighbor]["links"]
3346
3347 if dest_link in data:
3348 neighbor_ip = data[dest_link][addr_type].split("/")[0]
3349
3350 logger.info(
3351 "[DUT: {}]: Checking bgp graceful-restart"
3352 " show o/p {}".format(dut, neighbor_ip)
3353 )
3354
3355 show_bgp_graceful_json = run_frr_cmd(
3356 rnode,
3357 "show bgp {} neighbor {} graceful-restart json".format(
3358 addr_type, neighbor_ip
3359 ),
3360 isjson=True,
3361 )
3362
3363 show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]
3364
3365 if show_bgp_graceful_json_out["neighborAddr"] == neighbor_ip:
3366 logger.info("Neighbor ip matched {}".format(neighbor_ip))
3367 else:
3368 errormsg = "Neighbor ip NOT a match {}".format(neighbor_ip)
3369 return errormsg
3370
3371 if addr_family == "ipv4Unicast":
3372 if "ipv4Unicast" in show_bgp_graceful_json_out:
3373 logger.info("ipv4Unicast present for {} ".format(neighbor_ip))
3374 return True
3375 else:
3376 errormsg = "ipv4Unicast NOT present for {} ".format(neighbor_ip)
3377 return errormsg
3378
3379 elif addr_family == "ipv6Unicast":
3380 if "ipv6Unicast" in show_bgp_graceful_json_out:
3381 logger.info("ipv6Unicast present for {} ".format(neighbor_ip))
3382 return True
3383 else:
3384 errormsg = "ipv6Unicast NOT present for {} ".format(neighbor_ip)
3385 return errormsg
3386 else:
3387 errormsg = "Aaddress family: {} present for {} ".format(
3388 addr_family, neighbor_ip
3389 )
3390 return errormsg
3391
3392 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))