]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/ospf_topo1/test_ospf_topo1.py
tests: Fix test_ospf_topo1 as that it sometimes fails
[mirror_frr.git] / tests / topotests / ospf_topo1 / test_ospf_topo1.py
1 #!/usr/bin/env python
2
3 #
4 # test_ospf_topo1.py
5 # Part of NetDEF Topology Tests
6 #
7 # Copyright (c) 2017 by
8 # Network Device Education Foundation, Inc. ("NetDEF")
9 #
10 # Permission to use, copy, modify, and/or distribute this software
11 # for any purpose with or without fee is hereby granted, provided
12 # that the above copyright notice and this permission notice appear
13 # in all copies.
14 #
15 # THE SOFTWARE IS PROVIDED "AS IS" AND NETDEF DISCLAIMS ALL WARRANTIES
16 # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
17 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NETDEF BE LIABLE FOR
18 # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
19 # DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
20 # WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
21 # ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
22 # OF THIS SOFTWARE.
23 #
24
25 """
26 test_ospf_topo1.py: Test the FRR OSPF routing daemon.
27 """
28
29 import os
30 import re
31 import sys
32 from functools import partial
33 from time import sleep
34 import pytest
35
36 # Save the Current Working Directory to find configuration files.
37 CWD = os.path.dirname(os.path.realpath(__file__))
38 sys.path.append(os.path.join(CWD, "../"))
39
40 # pylint: disable=C0413
41 # Import topogen and topotest helpers
42 from lib import topotest
43 from lib.topogen import Topogen, TopoRouter, get_topogen
44 from lib.topolog import logger
45
# Tag every test in this module with the "ospfd" marker.

pytestmark = [pytest.mark.ospfd]
49
50
def build_topo(tgen):
    """Build the test topology: routers r1..r4 attached to switches s1..s6.

    Switches are created in the order s1..s6 and routers are linked to each
    switch in the order listed in `layout`.
    """
    # Create 4 routers
    for index in range(1, 5):
        tgen.add_router("r{}".format(index))

    # (switch name, routers linked to it), in creation order.
    layout = (
        ("s1", ("r1",)),  # empty network for router 1
        ("s2", ("r2",)),  # empty network for router 2
        ("s3", ("r1", "r2", "r3")),  # interconnects routers 1, 2 and 3
        ("s4", ("r3",)),  # empty network for router 3
        ("s5", ("r3", "r4")),  # interconnects routers 3 and 4
        ("s6", ("r4",)),  # empty network for router 4
    )
    for switch_name, members in layout:
        switch = tgen.add_switch(switch_name)
        for member in members:
            switch.add_link(tgen.gears[member])
84
85
def setup_module(mod):
    """Set up the pytest environment.

    Creates and starts the topology, loads the zebra, ospfd and ospf6d
    configuration of every router from its per-router directory under CWD,
    then starts the routers.
    """
    tgen = Topogen(build_topo, mod.__name__)
    tgen.start_topology()

    ospf6_config = "ospf6d.conf"

    for rname, router in tgen.routers().items():
        # (daemon, config path relative to CWD), loaded in this order.
        daemon_configs = (
            (TopoRouter.RD_ZEBRA, "{}/zebra.conf".format(rname)),
            (TopoRouter.RD_OSPF, "{}/ospfd.conf".format(rname)),
            (TopoRouter.RD_OSPF6, "{}/{}".format(rname, ospf6_config)),
        )
        for daemon, relpath in daemon_configs:
            router.load_config(daemon, os.path.join(CWD, relpath))

    # Initialize all routers.
    tgen.start_router()
107
108
def teardown_module(mod):
    """Tear down the pytest environment by stopping the topology."""
    # `mod` is not used by this function.
    get_topogen().stop_topology()
113
114
def test_wait_protocol_convergence():
    """Wait for OSPFv2/OSPFv3 to converge.

    Each expected adjacency is polled (up to 130 attempts, one second apart)
    until the router reports the given neighbor as Full. The test is skipped
    when any router has already reported a failure.
    """
    tgen = get_topogen()
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info("waiting for protocols to converge")

    def expect_ospfv2_neighbor_full(router, neighbor):
        "Wait until OSPFv2 convergence."
        logger.info("waiting OSPFv2 router '{}'".format(router))

        def run_command_and_expect():
            """
            Run 'show ip ospf neighbor json' and compare the neighbor's
            "converged" field against "Full".

            Only "converged" is compared, so the neighbor's role is not
            checked here. Returns the json_cmp result (None on a match).
            """
            result = tgen.gears[router].vtysh_cmd(
                "show ip ospf neighbor json", isjson=True
            )
            # One comparison is enough: the original repeated this exact
            # check three times on the same, unchanged result.
            return topotest.json_cmp(
                result, {"neighbors": {neighbor: [{"converged": "Full"}]}}
            )

        _, result = topotest.run_and_expect(
            run_command_and_expect, None, count=130, wait=1
        )
        assertmsg = '"{}" convergence failure'.format(router)
        assert result is None, assertmsg

    def expect_ospfv3_neighbor_full(router, neighbor):
        "Wait until OSPFv3 convergence."
        logger.info("waiting OSPFv3 router '{}'".format(router))
        test_func = partial(
            topotest.router_json_cmp,
            tgen.gears[router],
            "show ipv6 ospf6 neighbor json",
            {"neighbors": [{"neighborId": neighbor, "state": "Full"}]},
        )
        _, result = topotest.run_and_expect(test_func, None, count=130, wait=1)
        assertmsg = '"{}" convergence failure'.format(router)
        assert result is None, assertmsg

    # Wait for OSPFv2 convergence
    expect_ospfv2_neighbor_full("r1", "10.0.255.2")
    expect_ospfv2_neighbor_full("r1", "10.0.255.3")
    expect_ospfv2_neighbor_full("r2", "10.0.255.1")
    expect_ospfv2_neighbor_full("r2", "10.0.255.3")
    expect_ospfv2_neighbor_full("r3", "10.0.255.1")
    expect_ospfv2_neighbor_full("r3", "10.0.255.2")
    expect_ospfv2_neighbor_full("r3", "10.0.255.4")
    expect_ospfv2_neighbor_full("r4", "10.0.255.3")

    # Wait for OSPFv3 convergence
    expect_ospfv3_neighbor_full("r1", "10.0.255.2")
    expect_ospfv3_neighbor_full("r1", "10.0.255.3")
    expect_ospfv3_neighbor_full("r2", "10.0.255.1")
    expect_ospfv3_neighbor_full("r2", "10.0.255.3")
    expect_ospfv3_neighbor_full("r3", "10.0.255.1")
    expect_ospfv3_neighbor_full("r3", "10.0.255.2")
    expect_ospfv3_neighbor_full("r3", "10.0.255.4")
    expect_ospfv3_neighbor_full("r4", "10.0.255.3")
195
196
def compare_show_ipv6_ospf6(rname, expected):
    """
    Run 'show ipv6 ospf6 route' on router `rname` and diff its output
    against `expected`.

    Link-local (fe80::) addresses are masked and h:mm:ss style times are
    removed from both texts before they are normalized and compared.
    Returns the result of `topotest.difflines`; callers in this file poll
    until that result equals "".
    """

    def _scrub(text):
        "Mask the link addresses, then remove the time."
        masked = re.sub(r"fe80::[^ ]+", "fe80::xxxx:xxxx:xxxx:xxxx", text)
        return re.sub(r"\d+:\d{2}:\d{2}", "", masked)

    router = get_topogen().gears[rname]
    current = router.vtysh_cmd("show ipv6 ospf6 route")

    return topotest.difflines(
        topotest.normalize_text(_scrub(current)),
        topotest.normalize_text(_scrub(expected)),
        title1="Current output",
        title2="Expected output",
    )
219
220
def test_ospf_convergence():
    """Test OSPF daemon convergence.

    For every router, polls 'show ip ospf route' until its output matches
    the reference stored in `<router>/ospfroute.txt` under CWD.
    """
    tgen = get_topogen()
    if tgen.routers_have_failure():
        pytest.skip("skipped because of router(s) failure")

    for router, rnode in tgen.routers().items():
        logger.info('Waiting for router "%s" convergence', router)

        # Load expected results from the command. The context manager closes
        # the reference file instead of leaving that to garbage collection.
        reffile = os.path.join(CWD, "{}/ospfroute.txt".format(router))
        with open(reffile) as ref:
            expected = ref.read()

        # Run test function until we get a result. Wait at most 80 seconds
        # (160 attempts, 0.5 seconds apart).
        test_func = partial(
            topotest.router_output_cmp, rnode, "show ip ospf route", expected
        )
        result, diff = topotest.run_and_expect(test_func, "", count=160, wait=0.5)
        assert result, "OSPF did not converge on {}:\n{}".format(router, diff)
240
241
def test_ospf_kernel_route():
    """Test OSPF kernel route installation.

    For every router, polls the kernel IPv4 routing table until all the
    expected OSPF prefixes are present. Polling (as the IPv6 counterpart
    test_ospf6_kernel_route does) avoids a spurious failure when the check
    runs before the routes have reached the kernel.
    """
    tgen = get_topogen()
    if tgen.routers_have_failure():
        pytest.skip("skipped because of router(s) failure")

    def _routes_in_fib4(router):
        "Compare `router`'s kernel IPv4 routes to the expected set."
        routes = topotest.ip4_route(router)
        expected = {
            "10.0.1.0/24": {},
            "10.0.2.0/24": {},
            "10.0.3.0/24": {},
            "10.0.10.0/24": {},
            "172.16.0.0/24": {},
            "172.16.1.0/24": {},
        }
        return topotest.json_cmp(routes, expected)

    rlist = tgen.routers().values()
    for router in rlist:
        logger.info('Checking OSPF IPv4 kernel routes in "%s"', router.name)

        # Up to 20 attempts, one second apart; a match on the first attempt
        # behaves exactly like the former one-shot check.
        _, result = topotest.run_and_expect(
            partial(_routes_in_fib4, router), None, count=20, wait=1
        )

        assertmsg = 'OSPF IPv4 route mismatch in router "{}"'.format(router.name)
        assert result is None, assertmsg
263
264
def test_ospf6_convergence():
    """Test OSPF6 daemon convergence.

    Each router's 'show ipv6 ospf6 route' output is compared with a
    reference file. The ECMP-format reference is tried first; if r1 does not
    match it, the pre-ECMP reference is tried once and, when that matches,
    is used for the remaining routers as well.
    """
    tgen = get_topogen()
    if tgen.routers_have_failure():
        pytest.skip("skipped because of router(s) failure")

    def _poll_reference(router, template, count):
        """
        Load the reference named by `template` for `router` and poll until
        the router output matches it, at most `count` times 3 seconds apart.
        Returns the (result, diff) pair from topotest.run_and_expect.
        """
        reffile = os.path.join(CWD, template.format(router))
        # Close the reference file deterministically.
        with open(reffile) as ref:
            expected = ref.read()

        test_func = partial(compare_show_ipv6_ospf6, router, expected)
        return topotest.run_and_expect(test_func, "", count=count, wait=3)

    ospf6route_file = "{}/ospf6route_ecmp.txt"
    for rnum in range(1, 5):
        router = "r{}".format(rnum)

        logger.info('Waiting for router "%s" IPv6 OSPF convergence', router)

        # Run test function until we get a result. Wait at most 75 seconds
        # (25 attempts, 3 seconds apart).
        result, diff = _poll_reference(router, ospf6route_file, 25)
        if (not result) and (rnum == 1):
            # Didn't match the new ECMP version - try the old pre-ECMP format
            ospf6route_file = "{}/ospf6route.txt"
            result, diff = _poll_reference(router, ospf6route_file, 1)
            if not result:
                # Didn't match the old version - switch back to new ECMP version
                # and fail (the re-run makes the reported diff refer to the
                # ECMP reference).
                ospf6route_file = "{}/ospf6route_ecmp.txt"
                result, diff = _poll_reference(router, ospf6route_file, 1)

        assert result, "OSPF6 did not converge on {}:\n{}".format(router, diff)
307
308
def test_ospf6_kernel_route():
    """Test OSPF6 kernel route installation.

    For every router, polls the kernel IPv6 routing table (up to 20
    attempts, one second apart) until all expected prefixes are present.
    """
    tgen = get_topogen()
    if tgen.routers_have_failure():
        pytest.skip("skipped because of router(s) failure")

    rlist = tgen.routers().values()
    for router in rlist:
        logger.info('Checking OSPF IPv6 kernel routes in "%s"', router.name)

        def _routes_in_fib6():
            "Compare the kernel IPv6 routes of `router` to the expected set."
            routes = topotest.ip6_route(router)
            expected = {
                "2001:db8:1::/64": {},
                "2001:db8:2::/64": {},
                "2001:db8:3::/64": {},
                "2001:db8:100::/64": {},
                "2001:db8:200::/64": {},
                "2001:db8:300::/64": {},
            }
            logger.info("Routes:")
            logger.info(routes)
            # Compare once and reuse the result for both logging and return.
            mismatch = topotest.json_cmp(routes, expected)
            logger.info(mismatch)
            logger.info("ENd:")
            return mismatch

        # The helper is called only within this iteration, so it always sees
        # the current `router`.
        _, result = topotest.run_and_expect(_routes_in_fib6, None, count=20, wait=1)

        assertmsg = 'OSPF IPv6 route mismatch in router "{}"'.format(router.name)
        assert result is None, assertmsg
339
340
def test_ospf_json():
    """Test 'show ip ospf json' output for coherency.

    Builds the expected JSON for each of r1..r4 (global settings plus the
    areas that router is expected to report) and compares it with the
    router's actual 'show ip ospf json' output.
    """
    tgen = get_topogen()
    if tgen.routers_have_failure():
        pytest.skip("skipped because of router(s) failure")

    backbone_routers = ("r1", "r2", "r3")
    area1_routers = ("r3", "r4")

    for rnum in range(1, 5):
        router = tgen.gears["r{}".format(rnum)]
        name = router.name
        logger.info(router.vtysh_cmd("show ip ospf database"))
        logger.info('Comparing router "%s" "show ip ospf json" output', router.name)

        # Area specific additional checks
        areas = {}
        if name in backbone_routers:
            areas["0.0.0.0"] = {
                "areaIfActiveCounter": 2,
                "areaIfTotalCounter": 2,
                "authentication": "authenticationNone",
                "backbone": True,
                "lsaAsbrNumber": 1,
                "lsaNetworkNumber": 1,
                "lsaNssaNumber": 0,
                "lsaNumber": 7,
                "lsaOpaqueAreaNumber": 0,
                "lsaOpaqueLinkNumber": 0,
                "lsaRouterNumber": 3,
                "lsaSummaryNumber": 2,
                "nbrFullAdjacentCounter": 2,
            }
        if name in area1_routers:
            area1 = {
                "areaIfActiveCounter": 1,
                "areaIfTotalCounter": 1,
                "authentication": "authenticationNone",
                "lsaAsbrNumber": 2,
                "lsaNetworkNumber": 1,
                "lsaNssaNumber": 0,
                "lsaNumber": 9,
                "lsaOpaqueAreaNumber": 0,
                "lsaOpaqueLinkNumber": 0,
                "lsaRouterNumber": 2,
                "lsaSummaryNumber": 4,
                "nbrFullAdjacentCounter": 1,
            }
            # r4 has more interfaces for area 0.0.0.1
            if name == "r4":
                area1["areaIfActiveCounter"] = 2
                area1["areaIfTotalCounter"] = 2
            areas["0.0.0.1"] = area1

        expected = {
            "routerId": "10.0.255.{}".format(rnum),
            "tosRoutesOnly": True,
            "rfc2328Conform": True,
            "spfScheduleDelayMsecs": 0,
            "holdtimeMinMsecs": 50,
            "holdtimeMaxMsecs": 5000,
            "lsaMinIntervalMsecs": 5000,
            "lsaMinArrivalMsecs": 1000,
            "writeMultiplier": 20,
            "refreshTimerMsecs": 10000,
            "asbrRouter": "injectingExternalRoutingInformation",
            # router 3 has an additional area
            "attachedAreaCounter": 2 if name == "r3" else 1,
            "areas": areas,
        }

        output = router.vtysh_cmd("show ip ospf json", isjson=True)
        mismatch = topotest.json_cmp(output, expected)
        assertmsg = '"{}" JSON output mismatches the expected result'.format(
            router.name
        )
        assert mismatch is None, assertmsg
416
417
def test_ospf_link_down():
    """Test OSPF convergence after a link goes down.

    Disables r3's "r3-eth0" link, then polls 'show ip ospf route' on every
    router until the output matches `<router>/ospfroute_down.txt` under CWD.
    """
    tgen = get_topogen()
    if tgen.routers_have_failure():
        pytest.skip("skipped because of router(s) failure")

    # Simulate a network down event on router3 switch3 interface.
    router3 = tgen.gears["r3"]
    router3.peer_link_enable("r3-eth0", False)

    # Expect convergence on all routers
    for router, rnode in tgen.routers().items():
        logger.info('Waiting for router "%s" convergence after link failure', router)
        # Load expected results from the command. The context manager closes
        # the reference file instead of leaving that to garbage collection.
        reffile = os.path.join(CWD, "{}/ospfroute_down.txt".format(router))
        with open(reffile) as ref:
            expected = ref.read()

        # Run test function until we get a result. Wait at most 70 seconds
        # (140 attempts, 0.5 seconds apart).
        test_func = partial(
            topotest.router_output_cmp, rnode, "show ip ospf route", expected
        )
        result, diff = topotest.run_and_expect(test_func, "", count=140, wait=0.5)
        assert result, "OSPF did not converge on {}:\n{}".format(router, diff)
441
442
def test_ospf_link_down_kernel_route():
    """Test OSPF kernel route installation after a link goes down.

    For every router, polls the kernel IPv4 routing table (up to 10
    attempts, one second apart) until it matches the post-link-down
    expectation for that router.
    """
    tgen = get_topogen()
    if tgen.routers_have_failure():
        pytest.skip("skipped because of router(s) failure")

    def _fib4_mismatch(router, expected):
        "Return the json_cmp result for `router`'s current kernel IPv4 routes."
        return topotest.json_cmp(topotest.ip4_route(router), expected)

    rlist = tgen.routers().values()
    for router in rlist:
        logger.info(
            'Checking OSPF IPv4 kernel routes in "%s" after link down', router.name
        )

        expected = {
            "10.0.1.0/24": {},
            "10.0.2.0/24": {},
            "10.0.3.0/24": {},
            "10.0.10.0/24": {},
            "172.16.0.0/24": {},
            "172.16.1.0/24": {},
        }
        # NOTE(review): a None value presumably tells json_cmp the route must
        # be absent - confirm against topotest.json_cmp.
        if router.name in ("r1", "r2"):
            expected.update(
                {
                    "10.0.10.0/24": None,
                    "172.16.0.0/24": None,
                    "172.16.1.0/24": None,
                }
            )
        elif router.name in ("r3", "r4"):
            expected.update(
                {
                    "10.0.1.0/24": None,
                    "10.0.2.0/24": None,
                }
            )
        # Route '10.0.3.0' is no longer available for r4 since it is down.
        if router.name == "r4":
            expected.update(
                {
                    "10.0.3.0/24": None,
                }
            )
        assertmsg = 'OSPF IPv4 route mismatch in router "{}" after link down'.format(
            router.name
        )

        # Same polling helper the other tests in this file use, in place of
        # a hand-rolled sleep loop.
        _, result = topotest.run_and_expect(
            partial(_fib4_mismatch, router, expected), None, count=10, wait=1
        )
        assert result is None, assertmsg
501
502
def test_ospf6_link_down():
    """Test OSPF6 daemon convergence after link goes down.

    For each of r1..r4, polls 'show ipv6 ospf6 route' until it matches the
    reference stored in `<router>/ospf6route_down.txt` under CWD.
    """
    tgen = get_topogen()
    if tgen.routers_have_failure():
        pytest.skip("skipped because of router(s) failure")

    for rnum in range(1, 5):
        router = "r{}".format(rnum)

        logger.info(
            'Waiting for router "%s" IPv6 OSPF convergence after link down', router
        )

        # Load expected results from the command. The context manager closes
        # the reference file instead of leaving that to garbage collection.
        reffile = os.path.join(CWD, "{}/ospf6route_down.txt".format(router))
        with open(reffile) as ref:
            expected = ref.read()

        # Run test function until we get a result. Wait at most 75 seconds
        # (25 attempts, 3 seconds apart).
        test_func = partial(compare_show_ipv6_ospf6, router, expected)
        result, diff = topotest.run_and_expect(test_func, "", count=25, wait=3)
        assert result, "OSPF6 did not converge on {}:\n{}".format(router, diff)
524
525
def test_ospf6_link_down_kernel_route():
    """Test OSPF6 kernel route installation after a link goes down.

    For every router, polls the kernel IPv6 routing table (up to 10
    attempts, one second apart) until it matches the post-link-down
    expectation for that router.
    """
    tgen = get_topogen()
    if tgen.routers_have_failure():
        pytest.skip("skipped because of router(s) failure")

    def _fib6_mismatch(router, expected):
        "Return the json_cmp result for `router`'s current kernel IPv6 routes."
        return topotest.json_cmp(topotest.ip6_route(router), expected)

    rlist = tgen.routers().values()
    for router in rlist:
        logger.info(
            'Checking OSPF IPv6 kernel routes in "%s" after link down', router.name
        )

        expected = {
            "2001:db8:1::/64": {},
            "2001:db8:2::/64": {},
            "2001:db8:3::/64": {},
            "2001:db8:100::/64": {},
            "2001:db8:200::/64": {},
            "2001:db8:300::/64": {},
        }
        # NOTE(review): a None value presumably tells json_cmp the route must
        # be absent - confirm against topotest.json_cmp.
        if router.name in ("r1", "r2"):
            expected.update(
                {
                    "2001:db8:100::/64": None,
                    "2001:db8:200::/64": None,
                    "2001:db8:300::/64": None,
                }
            )
        elif router.name in ("r3", "r4"):
            expected.update(
                {
                    "2001:db8:1::/64": None,
                    "2001:db8:2::/64": None,
                }
            )
        # Route '2001:db8:3::/64' is no longer available for r4 since it is down.
        if router.name == "r4":
            expected.update(
                {
                    "2001:db8:3::/64": None,
                }
            )
        assertmsg = 'OSPF IPv6 route mismatch in router "{}" after link down'.format(
            router.name
        )

        # Same polling helper the other tests in this file use, in place of
        # a hand-rolled sleep loop.
        _, result = topotest.run_and_expect(
            partial(_fib6_mismatch, router, expected), None, count=10, wait=1
        )
        assert result is None, assertmsg
585
586
def test_memory_leak():
    """Run the memory leak test and report results.

    Skipped unless the topology reports memory leak checking as enabled.
    """
    topology = get_topogen()
    if topology.is_memleak_enabled():
        topology.report_memory_leaks()
    else:
        pytest.skip("Memory leak test/report is disabled")
594
595
if __name__ == "__main__":
    # Run this file through pytest, always passing "-s" ahead of any
    # command-line arguments, and exit with pytest's return code.
    pytest_args = ["-s"]
    pytest_args.extend(sys.argv[1:])
    sys.exit(pytest.main(pytest_args))