]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/ospf_topo1/test_ospf_topo1.py
Merge pull request #12830 from anlancs/fix/doc-ripd-rst
[mirror_frr.git] / tests / topotests / ospf_topo1 / test_ospf_topo1.py
1 #!/usr/bin/env python
2 # SPDX-License-Identifier: ISC
3
4 #
5 # test_ospf_topo1.py
6 # Part of NetDEF Topology Tests
7 #
8 # Copyright (c) 2017 by
9 # Network Device Education Foundation, Inc. ("NetDEF")
10 #
11
12 """
13 test_ospf_topo1.py: Test the FRR OSPF routing daemon.
14 """
15
16 import os
17 import re
18 import sys
19 from functools import partial
20 from time import sleep
21 import pytest
22
23 # Save the Current Working Directory to find configuration files.
24 CWD = os.path.dirname(os.path.realpath(__file__))
25 sys.path.append(os.path.join(CWD, "../"))
26
27 # pylint: disable=C0413
28 # Import topogen and topotest helpers
29 from lib import topotest
30 from lib.topogen import Topogen, TopoRouter, get_topogen
31 from lib.topolog import logger
32
33 # Required to instantiate the topology builder class.
34
35 pytestmark = [pytest.mark.ospfd]
36
37
38 def build_topo(tgen):
39 "Build function"
40
41 # Create 4 routers
42 for routern in range(1, 5):
43 tgen.add_router("r{}".format(routern))
44
45 # Create a empty network for router 1
46 switch = tgen.add_switch("s1")
47 switch.add_link(tgen.gears["r1"])
48
49 # Create a empty network for router 2
50 switch = tgen.add_switch("s2")
51 switch.add_link(tgen.gears["r2"])
52
53 # Interconect router 1, 2 and 3
54 switch = tgen.add_switch("s3")
55 switch.add_link(tgen.gears["r1"])
56 switch.add_link(tgen.gears["r2"])
57 switch.add_link(tgen.gears["r3"])
58
59 # Create empty netowrk for router3
60 switch = tgen.add_switch("s4")
61 switch.add_link(tgen.gears["r3"])
62
63 # Interconect router 3 and 4
64 switch = tgen.add_switch("s5")
65 switch.add_link(tgen.gears["r3"])
66 switch.add_link(tgen.gears["r4"])
67
68 # Create a empty network for router 4
69 switch = tgen.add_switch("s6")
70 switch.add_link(tgen.gears["r4"])
71
72
73 def setup_module(mod):
74 "Sets up the pytest environment"
75 tgen = Topogen(build_topo, mod.__name__)
76 tgen.start_topology()
77
78 ospf6_config = "ospf6d.conf"
79
80 router_list = tgen.routers()
81 for rname, router in router_list.items():
82 router.load_config(
83 TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format(rname))
84 )
85 router.load_config(
86 TopoRouter.RD_OSPF, os.path.join(CWD, "{}/ospfd.conf".format(rname))
87 )
88 router.load_config(
89 TopoRouter.RD_OSPF6, os.path.join(CWD, "{}/{}".format(rname, ospf6_config))
90 )
91
92 # Initialize all routers.
93 tgen.start_router()
94
95
96 def teardown_module(mod):
97 "Teardown the pytest environment"
98 tgen = get_topogen()
99 tgen.stop_topology()
100
101
102 def test_wait_protocol_convergence():
103 "Wait for OSPFv2/OSPFv3 to converge"
104 tgen = get_topogen()
105 if tgen.routers_have_failure():
106 pytest.skip(tgen.errors)
107
108 logger.info("waiting for protocols to converge")
109
110 def expect_ospfv2_neighbor_full(router, neighbor):
111 "Wait until OSPFv2 convergence."
112 logger.info("waiting OSPFv2 router '{}'".format(router))
113
114 def run_command_and_expect():
115 """
116 Function that runs command and expect the following outcomes:
117 * Full/DR
118 * Full/DROther
119 * Full/Backup
120 """
121 result = tgen.gears[router].vtysh_cmd(
122 "show ip ospf neighbor json", isjson=True
123 )
124 if (
125 topotest.json_cmp(
126 result, {"neighbors": {neighbor: [{"converged": "Full"}]}}
127 )
128 is None
129 ):
130 return None
131
132 if (
133 topotest.json_cmp(
134 result, {"neighbors": {neighbor: [{"converged": "Full"}]}}
135 )
136 is None
137 ):
138 return None
139
140 return topotest.json_cmp(
141 result, {"neighbors": {neighbor: [{"converged": "Full"}]}}
142 )
143
144 _, result = topotest.run_and_expect(
145 run_command_and_expect, None, count=130, wait=1
146 )
147 assertmsg = '"{}" convergence failure'.format(router)
148 assert result is None, assertmsg
149
150 def expect_ospfv3_neighbor_full(router, neighbor):
151 "Wait until OSPFv3 convergence."
152 logger.info("waiting OSPFv3 router '{}'".format(router))
153 test_func = partial(
154 topotest.router_json_cmp,
155 tgen.gears[router],
156 "show ipv6 ospf6 neighbor json",
157 {"neighbors": [{"neighborId": neighbor, "state": "Full"}]},
158 )
159 _, result = topotest.run_and_expect(test_func, None, count=130, wait=1)
160 assertmsg = '"{}" convergence failure'.format(router)
161 assert result is None, assertmsg
162
163 # Wait for OSPFv2 convergence
164 expect_ospfv2_neighbor_full("r1", "10.0.255.2")
165 expect_ospfv2_neighbor_full("r1", "10.0.255.3")
166 expect_ospfv2_neighbor_full("r2", "10.0.255.1")
167 expect_ospfv2_neighbor_full("r2", "10.0.255.3")
168 expect_ospfv2_neighbor_full("r3", "10.0.255.1")
169 expect_ospfv2_neighbor_full("r3", "10.0.255.2")
170 expect_ospfv2_neighbor_full("r3", "10.0.255.4")
171 expect_ospfv2_neighbor_full("r4", "10.0.255.3")
172
173 # Wait for OSPFv3 convergence
174 expect_ospfv3_neighbor_full("r1", "10.0.255.2")
175 expect_ospfv3_neighbor_full("r1", "10.0.255.3")
176 expect_ospfv3_neighbor_full("r2", "10.0.255.1")
177 expect_ospfv3_neighbor_full("r2", "10.0.255.3")
178 expect_ospfv3_neighbor_full("r3", "10.0.255.1")
179 expect_ospfv3_neighbor_full("r3", "10.0.255.2")
180 expect_ospfv3_neighbor_full("r3", "10.0.255.4")
181 expect_ospfv3_neighbor_full("r4", "10.0.255.3")
182
183
184 def compare_show_ipv6_ospf6(rname, expected):
185 """
186 Calls 'show ipv6 ospf6 route' for router `rname` and compare the obtained
187 result with the expected output.
188 """
189 tgen = get_topogen()
190 current = tgen.gears[rname].vtysh_cmd("show ipv6 ospf6 route")
191
192 # Remove the link addresses
193 current = re.sub(r"fe80::[^ ]+", "fe80::xxxx:xxxx:xxxx:xxxx", current)
194 expected = re.sub(r"fe80::[^ ]+", "fe80::xxxx:xxxx:xxxx:xxxx", expected)
195
196 # Remove the time
197 current = re.sub(r"\d+:\d{2}:\d{2}", "", current)
198 expected = re.sub(r"\d+:\d{2}:\d{2}", "", expected)
199
200 return topotest.difflines(
201 topotest.normalize_text(current),
202 topotest.normalize_text(expected),
203 title1="Current output",
204 title2="Expected output",
205 )
206
207
208 def test_ospf_convergence():
209 "Test OSPF daemon convergence"
210 tgen = get_topogen()
211 if tgen.routers_have_failure():
212 pytest.skip("skipped because of router(s) failure")
213
214 for router, rnode in tgen.routers().items():
215 logger.info('Waiting for router "%s" convergence', router)
216
217 # Load expected results from the command
218 reffile = os.path.join(CWD, "{}/ospfroute.txt".format(router))
219 expected = open(reffile).read()
220
221 # Run test function until we get an result. Wait at most 80 seconds.
222 test_func = partial(
223 topotest.router_output_cmp, rnode, "show ip ospf route", expected
224 )
225 result, diff = topotest.run_and_expect(test_func, "", count=160, wait=0.5)
226 assert result, "OSPF did not converge on {}:\n{}".format(router, diff)
227
228
229 def test_ospf_kernel_route():
230 "Test OSPF kernel route installation"
231 tgen = get_topogen()
232 if tgen.routers_have_failure():
233 pytest.skip("skipped because of router(s) failure")
234
235 rlist = tgen.routers().values()
236 for router in rlist:
237 logger.info('Checking OSPF IPv4 kernel routes in "%s"', router.name)
238
239 routes = topotest.ip4_route(router)
240 expected = {
241 "10.0.1.0/24": {},
242 "10.0.2.0/24": {},
243 "10.0.3.0/24": {},
244 "10.0.10.0/24": {},
245 "172.16.0.0/24": {},
246 "172.16.1.0/24": {},
247 }
248 assertmsg = 'OSPF IPv4 route mismatch in router "{}"'.format(router.name)
249 assert topotest.json_cmp(routes, expected) is None, assertmsg
250
251
252 def test_ospf6_convergence():
253 "Test OSPF6 daemon convergence"
254 tgen = get_topogen()
255 if tgen.routers_have_failure():
256 pytest.skip("skipped because of router(s) failure")
257
258 ospf6route_file = "{}/ospf6route_ecmp.txt"
259 for rnum in range(1, 5):
260 router = "r{}".format(rnum)
261
262 logger.info('Waiting for router "%s" IPv6 OSPF convergence', router)
263
264 # Load expected results from the command
265 reffile = os.path.join(CWD, ospf6route_file.format(router))
266 expected = open(reffile).read()
267
268 # Run test function until we get an result. Wait at most 60 seconds.
269 test_func = partial(compare_show_ipv6_ospf6, router, expected)
270 result, diff = topotest.run_and_expect(test_func, "", count=25, wait=3)
271 if (not result) and (rnum == 1):
272 # Didn't match the new ECMP version - try the old pre-ECMP format
273 ospf6route_file = "{}/ospf6route.txt"
274
275 # Load expected results from the command
276 reffile = os.path.join(CWD, ospf6route_file.format(router))
277 expected = open(reffile).read()
278
279 test_func = partial(compare_show_ipv6_ospf6, router, expected)
280 result, diff = topotest.run_and_expect(test_func, "", count=1, wait=3)
281 if not result:
282 # Didn't match the old version - switch back to new ECMP version
283 # and fail
284 ospf6route_file = "{}/ospf6route_ecmp.txt"
285
286 # Load expected results from the command
287 reffile = os.path.join(CWD, ospf6route_file.format(router))
288 expected = open(reffile).read()
289
290 test_func = partial(compare_show_ipv6_ospf6, router, expected)
291 result, diff = topotest.run_and_expect(test_func, "", count=1, wait=3)
292
293 assert result, "OSPF6 did not converge on {}:\n{}".format(router, diff)
294
295
296 def test_ospf6_kernel_route():
297 "Test OSPF kernel route installation"
298 tgen = get_topogen()
299 if tgen.routers_have_failure():
300 pytest.skip("skipped because of router(s) failure")
301
302 rlist = tgen.routers().values()
303 for router in rlist:
304 logger.info('Checking OSPF IPv6 kernel routes in "%s"', router.name)
305
306 def _routes_in_fib6():
307 routes = topotest.ip6_route(router)
308 expected = {
309 "2001:db8:1::/64": {},
310 "2001:db8:2::/64": {},
311 "2001:db8:3::/64": {},
312 "2001:db8:100::/64": {},
313 "2001:db8:200::/64": {},
314 "2001:db8:300::/64": {},
315 }
316 logger.info("Routes:")
317 logger.info(routes)
318 logger.info(topotest.json_cmp(routes, expected))
319 logger.info("ENd:")
320 return topotest.json_cmp(routes, expected)
321
322 _, result = topotest.run_and_expect(_routes_in_fib6, None, count=20, wait=1)
323
324 assertmsg = 'OSPF IPv6 route mismatch in router "{}"'.format(router.name)
325 assert result is None, assertmsg
326
327
328 def test_ospf_json():
329 "Test 'show ip ospf json' output for coherency."
330 tgen = get_topogen()
331 if tgen.routers_have_failure():
332 pytest.skip("skipped because of router(s) failure")
333
334 for rnum in range(1, 5):
335 router = tgen.gears["r{}".format(rnum)]
336 logger.info(router.vtysh_cmd("show ip ospf database"))
337 logger.info('Comparing router "%s" "show ip ospf json" output', router.name)
338 expected = {
339 "routerId": "10.0.255.{}".format(rnum),
340 "tosRoutesOnly": True,
341 "rfc2328Conform": True,
342 "spfScheduleDelayMsecs": 0,
343 "holdtimeMinMsecs": 50,
344 "holdtimeMaxMsecs": 5000,
345 "lsaMinIntervalMsecs": 5000,
346 "lsaMinArrivalMsecs": 1000,
347 "writeMultiplier": 20,
348 "refreshTimerMsecs": 10000,
349 "asbrRouter": "injectingExternalRoutingInformation",
350 "attachedAreaCounter": 1,
351 "areas": {},
352 }
353 # Area specific additional checks
354 if router.name == "r1" or router.name == "r2" or router.name == "r3":
355 expected["areas"]["0.0.0.0"] = {
356 "areaIfActiveCounter": 2,
357 "areaIfTotalCounter": 2,
358 "authentication": "authenticationNone",
359 "backbone": True,
360 "lsaAsbrNumber": 1,
361 "lsaNetworkNumber": 1,
362 "lsaNssaNumber": 0,
363 "lsaNumber": 7,
364 "lsaOpaqueAreaNumber": 0,
365 "lsaOpaqueLinkNumber": 0,
366 "lsaRouterNumber": 3,
367 "lsaSummaryNumber": 2,
368 "nbrFullAdjacentCounter": 2,
369 }
370 if router.name == "r3" or router.name == "r4":
371 expected["areas"]["0.0.0.1"] = {
372 "areaIfActiveCounter": 1,
373 "areaIfTotalCounter": 1,
374 "authentication": "authenticationNone",
375 "lsaAsbrNumber": 2,
376 "lsaNetworkNumber": 1,
377 "lsaNssaNumber": 0,
378 "lsaNumber": 9,
379 "lsaOpaqueAreaNumber": 0,
380 "lsaOpaqueLinkNumber": 0,
381 "lsaRouterNumber": 2,
382 "lsaSummaryNumber": 4,
383 "nbrFullAdjacentCounter": 1,
384 }
385 # r4 has more interfaces for area 0.0.0.1
386 if router.name == "r4":
387 expected["areas"]["0.0.0.1"].update(
388 {
389 "areaIfActiveCounter": 2,
390 "areaIfTotalCounter": 2,
391 }
392 )
393
394 # router 3 has an additional area
395 if router.name == "r3":
396 expected["attachedAreaCounter"] = 2
397
398 output = router.vtysh_cmd("show ip ospf json", isjson=True)
399 result = topotest.json_cmp(output, expected)
400 assert result is None, '"{}" JSON output mismatches the expected result'.format(
401 router.name
402 )
403
404
405 def test_ospf_link_down():
406 "Test OSPF convergence after a link goes down"
407 tgen = get_topogen()
408 if tgen.routers_have_failure():
409 pytest.skip("skipped because of router(s) failure")
410
411 # Simulate a network down event on router3 switch3 interface.
412 router3 = tgen.gears["r3"]
413 router3.peer_link_enable("r3-eth0", False)
414
415 # Expect convergence on all routers
416 for router, rnode in tgen.routers().items():
417 logger.info('Waiting for router "%s" convergence after link failure', router)
418 # Load expected results from the command
419 reffile = os.path.join(CWD, "{}/ospfroute_down.txt".format(router))
420 expected = open(reffile).read()
421
422 # Run test function until we get an result. Wait at most 80 seconds.
423 test_func = partial(
424 topotest.router_output_cmp, rnode, "show ip ospf route", expected
425 )
426 result, diff = topotest.run_and_expect(test_func, "", count=140, wait=0.5)
427 assert result, "OSPF did not converge on {}:\n{}".format(router, diff)
428
429
430 def test_ospf_link_down_kernel_route():
431 "Test OSPF kernel route installation"
432 tgen = get_topogen()
433 if tgen.routers_have_failure():
434 pytest.skip("skipped because of router(s) failure")
435
436 rlist = tgen.routers().values()
437 for router in rlist:
438 logger.info(
439 'Checking OSPF IPv4 kernel routes in "%s" after link down', router.name
440 )
441
442 routes = topotest.ip4_route(router)
443 expected = {
444 "10.0.1.0/24": {},
445 "10.0.2.0/24": {},
446 "10.0.3.0/24": {},
447 "10.0.10.0/24": {},
448 "172.16.0.0/24": {},
449 "172.16.1.0/24": {},
450 }
451 if router.name == "r1" or router.name == "r2":
452 expected.update(
453 {
454 "10.0.10.0/24": None,
455 "172.16.0.0/24": None,
456 "172.16.1.0/24": None,
457 }
458 )
459 elif router.name == "r3" or router.name == "r4":
460 expected.update(
461 {
462 "10.0.1.0/24": None,
463 "10.0.2.0/24": None,
464 }
465 )
466 # Route '10.0.3.0' is no longer available for r4 since it is down.
467 if router.name == "r4":
468 expected.update(
469 {
470 "10.0.3.0/24": None,
471 }
472 )
473 assertmsg = 'OSPF IPv4 route mismatch in router "{}" after link down'.format(
474 router.name
475 )
476 count = 0
477 not_found = True
478 while not_found and count < 10:
479 not_found = topotest.json_cmp(routes, expected)
480 if not_found:
481 sleep(1)
482 routes = topotest.ip4_route(router)
483 count += 1
484 else:
485 not_found = False
486 break
487 assert not_found is False, assertmsg
488
489
490 def test_ospf6_link_down():
491 "Test OSPF6 daemon convergence after link goes down"
492 tgen = get_topogen()
493 if tgen.routers_have_failure():
494 pytest.skip("skipped because of router(s) failure")
495
496 for rnum in range(1, 5):
497 router = "r{}".format(rnum)
498
499 logger.info(
500 'Waiting for router "%s" IPv6 OSPF convergence after link down', router
501 )
502
503 # Load expected results from the command
504 reffile = os.path.join(CWD, "{}/ospf6route_down.txt".format(router))
505 expected = open(reffile).read()
506
507 # Run test function until we get an result. Wait at most 60 seconds.
508 test_func = partial(compare_show_ipv6_ospf6, router, expected)
509 result, diff = topotest.run_and_expect(test_func, "", count=25, wait=3)
510 assert result, "OSPF6 did not converge on {}:\n{}".format(router, diff)
511
512
513 def test_ospf6_link_down_kernel_route():
514 "Test OSPF kernel route installation"
515 tgen = get_topogen()
516 if tgen.routers_have_failure():
517 pytest.skip("skipped because of router(s) failure")
518
519 rlist = tgen.routers().values()
520 for router in rlist:
521 logger.info(
522 'Checking OSPF IPv6 kernel routes in "%s" after link down', router.name
523 )
524
525 routes = topotest.ip6_route(router)
526 expected = {
527 "2001:db8:1::/64": {},
528 "2001:db8:2::/64": {},
529 "2001:db8:3::/64": {},
530 "2001:db8:100::/64": {},
531 "2001:db8:200::/64": {},
532 "2001:db8:300::/64": {},
533 }
534 if router.name == "r1" or router.name == "r2":
535 expected.update(
536 {
537 "2001:db8:100::/64": None,
538 "2001:db8:200::/64": None,
539 "2001:db8:300::/64": None,
540 }
541 )
542 elif router.name == "r3" or router.name == "r4":
543 expected.update(
544 {
545 "2001:db8:1::/64": None,
546 "2001:db8:2::/64": None,
547 }
548 )
549 # Route '2001:db8:3::/64' is no longer available for r4 since it is down.
550 if router.name == "r4":
551 expected.update(
552 {
553 "2001:db8:3::/64": None,
554 }
555 )
556 assertmsg = 'OSPF IPv6 route mismatch in router "{}" after link down'.format(
557 router.name
558 )
559 count = 0
560 not_found = True
561 while not_found and count < 10:
562 not_found = topotest.json_cmp(routes, expected)
563 if not_found:
564 sleep(1)
565 routes = topotest.ip6_route(router)
566 count += 1
567 else:
568 not_found = False
569 break
570
571 assert not_found is False, assertmsg
572
573
574 def test_memory_leak():
575 "Run the memory leak test and report results."
576 tgen = get_topogen()
577 if not tgen.is_memleak_enabled():
578 pytest.skip("Memory leak test/report is disabled")
579
580 tgen.report_memory_leaks()
581
582
583 if __name__ == "__main__":
584 args = ["-s"] + sys.argv[1:]
585 sys.exit(pytest.main(args))