]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/ospf_topo1/test_ospf_topo1.py
Merge pull request #9861 from rgirada/ospf6_coverity
[mirror_frr.git] / tests / topotests / ospf_topo1 / test_ospf_topo1.py
1 #!/usr/bin/env python
2
3 #
4 # test_ospf_topo1.py
5 # Part of NetDEF Topology Tests
6 #
7 # Copyright (c) 2017 by
8 # Network Device Education Foundation, Inc. ("NetDEF")
9 #
10 # Permission to use, copy, modify, and/or distribute this software
11 # for any purpose with or without fee is hereby granted, provided
12 # that the above copyright notice and this permission notice appear
13 # in all copies.
14 #
15 # THE SOFTWARE IS PROVIDED "AS IS" AND NETDEF DISCLAIMS ALL WARRANTIES
16 # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
17 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NETDEF BE LIABLE FOR
18 # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
19 # DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
20 # WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
21 # ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
22 # OF THIS SOFTWARE.
23 #
24
25 """
26 test_ospf_topo1.py: Test the FRR OSPF routing daemon.
27 """
28
29 import os
30 import re
31 import sys
32 from functools import partial
33 from time import sleep
34 import pytest
35
# Save the Current Working Directory to find configuration files.
CWD = os.path.dirname(os.path.realpath(__file__))
# Extend the import path with this test's parent directory; presumably that
# is where the "lib" package imported below lives, so this must run first.
sys.path.append(os.path.join(CWD, "../"))

# pylint: disable=C0413
# Import topogen and topotest helpers
from lib import topotest
from lib.topogen import Topogen, TopoRouter, get_topogen
from lib.topolog import logger

# Apply the "ospfd" pytest marker to every test in this module.
pytestmark = [pytest.mark.ospfd]
49
50
def build_topo(tgen):
    """Populate `tgen` with the four-router / six-switch OSPF topology.

    Routers r1..r4 are created first. Switches s1..s6 are then created in
    order and each one is linked to its routers in the listed order.
    """
    for index in range(1, 5):
        tgen.add_router("r{}".format(index))

    # (switch, attached routers). Keep this sequence stable: presumably the
    # per-router interface numbering follows the order links are added.
    layout = (
        ("s1", ("r1",)),  # stub network for router 1
        ("s2", ("r2",)),  # stub network for router 2
        ("s3", ("r1", "r2", "r3")),  # interconnects routers 1, 2 and 3
        ("s4", ("r3",)),  # stub network for router 3
        ("s5", ("r3", "r4")),  # interconnects routers 3 and 4
        ("s6", ("r4",)),  # stub network for router 4
    )
    for switch_name, members in layout:
        switch = tgen.add_switch(switch_name)
        for member in members:
            switch.add_link(tgen.gears[member])
84
85
def setup_module(mod):
    """Pytest module setup: build the topology, load configs, start routers.

    For every router the zebra, ospfd and ospf6d configuration files are
    loaded from the router's own sub-directory next to this test file.
    """
    tgen = Topogen(build_topo, mod.__name__)
    tgen.start_topology()

    # Daemon -> configuration file name, loaded in this order per router.
    daemon_configs = (
        (TopoRouter.RD_ZEBRA, "zebra.conf"),
        (TopoRouter.RD_OSPF, "ospfd.conf"),
        (TopoRouter.RD_OSPF6, "ospf6d.conf"),
    )

    for rname, router in tgen.routers().items():
        for daemon, conf_name in daemon_configs:
            conf_path = os.path.join(CWD, "{}/{}".format(rname, conf_name))
            router.load_config(daemon, conf_path)

    # Initialize all routers.
    tgen.start_router()
107
108
def teardown_module(mod):
    """Pytest module teardown: stop the running topology.

    `mod` is supplied by pytest and is not used.
    """
    get_topogen().stop_topology()
113
114
def test_wait_protocol_convergence():
    """Wait for every expected OSPFv2/OSPFv3 adjacency to reach Full.

    Each (router, neighbor) pair is polled once per second, up to 130 times,
    first for OSPFv2 and then for OSPFv3. The test fails on the first pair
    that does not converge in time.
    """
    tgen = get_topogen()
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    logger.info("waiting for protocols to converge")

    def expect_ospfv2_neighbor_full(router, neighbor):
        "Wait until `router` reports OSPFv2 `neighbor` as converged (Full)."
        logger.info("waiting OSPFv2 router '{}'".format(router))

        def run_command_and_expect():
            """
            Run 'show ip ospf neighbor json' and compare it against the
            expected state.

            Returns None when `neighbor` is listed with "converged": "Full",
            otherwise the json_cmp mismatch result.
            """
            result = tgen.gears[router].vtysh_cmd(
                "show ip ospf neighbor json", isjson=True
            )
            # A single comparison is enough: the previous code repeated this
            # exact check three times with identical arguments.
            return topotest.json_cmp(
                result, {"neighbors": {neighbor: [{"converged": "Full"}]}}
            )

        _, result = topotest.run_and_expect(
            run_command_and_expect, None, count=130, wait=1
        )
        assertmsg = '"{}" convergence failure'.format(router)
        assert result is None, assertmsg

    def expect_ospfv3_neighbor_full(router, neighbor):
        "Wait until `router` reports OSPFv3 `neighbor` in state Full."
        logger.info("waiting OSPFv3 router '{}'".format(router))
        test_func = partial(
            topotest.router_json_cmp,
            tgen.gears[router],
            "show ipv6 ospf6 neighbor json",
            {"neighbors": [{"neighborId": neighbor, "state": "Full"}]},
        )
        _, result = topotest.run_and_expect(test_func, None, count=130, wait=1)
        assertmsg = '"{}" convergence failure'.format(router)
        assert result is None, assertmsg

    # (router, neighbor router-id) adjacencies expected for both protocols.
    adjacencies = (
        ("r1", "10.0.255.2"),
        ("r1", "10.0.255.3"),
        ("r2", "10.0.255.1"),
        ("r2", "10.0.255.3"),
        ("r3", "10.0.255.1"),
        ("r3", "10.0.255.2"),
        ("r3", "10.0.255.4"),
        ("r4", "10.0.255.3"),
    )

    # Wait for OSPFv2 convergence
    for router, neighbor in adjacencies:
        expect_ospfv2_neighbor_full(router, neighbor)

    # Wait for OSPFv3 convergence
    for router, neighbor in adjacencies:
        expect_ospfv3_neighbor_full(router, neighbor)
195
196
def compare_show_ipv6_ospf6(rname, expected):
    """
    Run 'show ipv6 ospf6 route' on router `rname` and diff it against
    `expected`.

    Link-local addresses and h:mm:ss timers are masked in both texts before
    comparing. The return value is whatever `topotest.difflines` produces
    for the normalized texts.
    """

    def _mask_volatile(text):
        # Replace link-local addresses first, then strip the timers.
        text = re.sub(r"fe80::[^ ]+", "fe80::xxxx:xxxx:xxxx:xxxx", text)
        return re.sub(r"\d+:\d{2}:\d{2}", "", text)

    router = get_topogen().gears[rname]
    live_output = _mask_volatile(router.vtysh_cmd("show ipv6 ospf6 route"))
    reference = _mask_volatile(expected)

    return topotest.difflines(
        topotest.normalize_text(live_output),
        topotest.normalize_text(reference),
        title1="Current output",
        title2="Expected output",
    )
219
220
def test_ospf_convergence():
    """Check that 'show ip ospf route' matches the reference on every router.

    The reference output is read from `<router>/ospfroute.txt` next to this
    test file.
    """
    tgen = get_topogen()
    if tgen.routers_have_failure():
        pytest.skip("skipped because of router(s) failure")

    for router, rnode in tgen.routers().items():
        logger.info('Waiting for router "%s" convergence', router)

        # Load expected results from the command; the context manager makes
        # sure the reference file is closed again.
        reffile = os.path.join(CWD, "{}/ospfroute.txt".format(router))
        with open(reffile) as ref:
            expected = ref.read()

        # Run test function until we get a result. Wait at most 80 seconds
        # (160 polls, 0.5 s apart).
        test_func = partial(
            topotest.router_output_cmp, rnode, "show ip ospf route", expected
        )
        result, diff = topotest.run_and_expect(test_func, "", count=160, wait=0.5)
        assert result, "OSPF did not converge on {}:\n{}".format(router, diff)
240
241
def test_ospf_kernel_route():
    """Check that every router has all OSPF IPv4 prefixes in its kernel table."""
    tgen = get_topogen()
    if tgen.routers_have_failure():
        pytest.skip("skipped because of router(s) failure")

    prefixes = (
        "10.0.1.0/24",
        "10.0.2.0/24",
        "10.0.3.0/24",
        "10.0.10.0/24",
        "172.16.0.0/24",
        "172.16.1.0/24",
    )

    for router in tgen.routers().values():
        logger.info('Checking OSPF IPv4 kernel routes in "%s"', router.name)

        routes = topotest.ip4_route(router)
        # Only the presence of each prefix is checked, hence the empty dicts.
        expected = {prefix: {} for prefix in prefixes}

        mismatch = topotest.json_cmp(routes, expected)
        assertmsg = 'OSPF IPv4 route mismatch in router "{}"'.format(router.name)
        assert mismatch is None, assertmsg
263
264
def test_ospf6_convergence():
    """Check that 'show ipv6 ospf6 route' matches the reference on r1..r4.

    The ECMP reference (`ospf6route_ecmp.txt`) is tried first. If r1 does not
    match it, the pre-ECMP reference (`ospf6route.txt`) is tried once. When
    that matches, it is also used for the remaining routers. Otherwise the
    ECMP reference is restored and compared once more, so the failure diff
    is reported against the current format.
    """
    tgen = get_topogen()
    if tgen.routers_have_failure():
        pytest.skip("skipped because of router(s) failure")

    def wait_for_routes(router, template, count):
        "Poll `router` up to `count` times, 3 s apart; return (result, diff)."
        # Load expected results from the command; the context manager makes
        # sure the reference file is closed again.
        reffile = os.path.join(CWD, template.format(router))
        with open(reffile) as ref:
            expected = ref.read()

        test_func = partial(compare_show_ipv6_ospf6, router, expected)
        return topotest.run_and_expect(test_func, "", count=count, wait=3)

    ospf6route_file = "{}/ospf6route_ecmp.txt"
    for rnum in range(1, 5):
        router = "r{}".format(rnum)

        logger.info('Waiting for router "%s" IPv6 OSPF convergence', router)

        # Run test function until we get a result. Wait at most 75 seconds
        # (25 polls, 3 s apart).
        result, diff = wait_for_routes(router, ospf6route_file, 25)
        if (not result) and (rnum == 1):
            # Didn't match the new ECMP version - try the old pre-ECMP format
            ospf6route_file = "{}/ospf6route.txt"
            result, diff = wait_for_routes(router, ospf6route_file, 1)
            if not result:
                # Didn't match the old version - switch back to new ECMP version
                # and fail
                ospf6route_file = "{}/ospf6route_ecmp.txt"
                result, diff = wait_for_routes(router, ospf6route_file, 1)

        assert result, "OSPF6 did not converge on {}:\n{}".format(router, diff)
307
308
def test_ospf6_kernel_route():
    """Check that every router has all OSPF IPv6 prefixes in its kernel table."""
    tgen = get_topogen()
    if tgen.routers_have_failure():
        pytest.skip("skipped because of router(s) failure")

    prefixes = (
        "2001:db8:1::/64",
        "2001:db8:2::/64",
        "2001:db8:3::/64",
        "2001:db8:100::/64",
        "2001:db8:200::/64",
        "2001:db8:300::/64",
    )

    for router in tgen.routers().values():
        logger.info('Checking OSPF IPv6 kernel routes in "%s"', router.name)

        routes = topotest.ip6_route(router)
        # Only the presence of each prefix is checked, hence the empty dicts.
        expected = {prefix: {} for prefix in prefixes}

        mismatch = topotest.json_cmp(routes, expected)
        assertmsg = 'OSPF IPv6 route mismatch in router "{}"'.format(router.name)
        assert mismatch is None, assertmsg
330
331
def test_ospf_json():
    """Compare 'show ip ospf json' on r1..r4 with the expected values.

    r1, r2 and r3 are expected to report area 0.0.0.0; r3 and r4 are expected
    to report area 0.0.0.1.
    """
    tgen = get_topogen()
    if tgen.routers_have_failure():
        pytest.skip("skipped because of router(s) failure")

    for rnum in range(1, 5):
        router = tgen.gears["r{}".format(rnum)]
        name = router.name
        logger.info('Comparing router "%s" "show ip ospf json" output', name)

        # Area specific expectations
        areas = {}
        if name in ("r1", "r2", "r3"):
            areas["0.0.0.0"] = {
                "areaIfActiveCounter": 2,
                "areaIfTotalCounter": 2,
                "authentication": "authenticationNone",
                "backbone": True,
                "lsaAsbrNumber": 1,
                "lsaNetworkNumber": 1,
                "lsaNssaNumber": 0,
                "lsaNumber": 7,
                "lsaOpaqueAreaNumber": 0,
                "lsaOpaqueLinkNumber": 0,
                "lsaRouterNumber": 3,
                "lsaSummaryNumber": 2,
                "nbrFullAdjacentCounter": 2,
            }
        if name in ("r3", "r4"):
            # r4 has more interfaces for area 0.0.0.1
            area1_interfaces = 2 if name == "r4" else 1
            areas["0.0.0.1"] = {
                "areaIfActiveCounter": area1_interfaces,
                "areaIfTotalCounter": area1_interfaces,
                "authentication": "authenticationNone",
                "lsaAsbrNumber": 2,
                "lsaNetworkNumber": 1,
                "lsaNssaNumber": 0,
                "lsaNumber": 9,
                "lsaOpaqueAreaNumber": 0,
                "lsaOpaqueLinkNumber": 0,
                "lsaRouterNumber": 2,
                "lsaSummaryNumber": 4,
                "nbrFullAdjacentCounter": 1,
            }

        expected = {
            "routerId": "10.0.255.{}".format(rnum),
            "tosRoutesOnly": True,
            "rfc2328Conform": True,
            "spfScheduleDelayMsecs": 0,
            "holdtimeMinMsecs": 50,
            "holdtimeMaxMsecs": 5000,
            "lsaMinIntervalMsecs": 5000,
            "lsaMinArrivalMsecs": 1000,
            "writeMultiplier": 20,
            "refreshTimerMsecs": 10000,
            "asbrRouter": "injectingExternalRoutingInformation",
            # router 3 has an additional area
            "attachedAreaCounter": 2 if name == "r3" else 1,
            "areas": areas,
        }

        output = router.vtysh_cmd("show ip ospf json", isjson=True)
        mismatch = topotest.json_cmp(output, expected)
        assert (
            mismatch is None
        ), '"{}" JSON output mismatches the expected result'.format(router.name)
406
407
def test_ospf_link_down():
    """Disable r3's r3-eth0 link and wait for OSPFv2 to reconverge.

    After the link is disabled, 'show ip ospf route' on every router must
    match the reference stored in `<router>/ospfroute_down.txt`.
    """
    tgen = get_topogen()
    if tgen.routers_have_failure():
        pytest.skip("skipped because of router(s) failure")

    # Simulate a network down event on router3 switch3 interface.
    router3 = tgen.gears["r3"]
    router3.peer_link_enable("r3-eth0", False)

    # Expect convergence on all routers
    for router, rnode in tgen.routers().items():
        logger.info('Waiting for router "%s" convergence after link failure', router)

        # Load expected results from the command; the context manager makes
        # sure the reference file is closed again.
        reffile = os.path.join(CWD, "{}/ospfroute_down.txt".format(router))
        with open(reffile) as ref:
            expected = ref.read()

        # Run test function until we get a result. Wait at most 70 seconds
        # (140 polls, 0.5 s apart).
        test_func = partial(
            topotest.router_output_cmp, rnode, "show ip ospf route", expected
        )
        result, diff = topotest.run_and_expect(test_func, "", count=140, wait=0.5)
        assert result, "OSPF did not converge on {}:\n{}".format(router, diff)
431
432
def test_ospf_link_down_kernel_route():
    """Check the IPv4 kernel routes of every router after the link went down.

    Each router is compared up to 10 times, sleeping one second and
    re-reading the kernel table after every mismatch.
    """
    tgen = get_topogen()
    if tgen.routers_have_failure():
        pytest.skip("skipped because of router(s) failure")

    all_prefixes = (
        "10.0.1.0/24",
        "10.0.2.0/24",
        "10.0.3.0/24",
        "10.0.10.0/24",
        "172.16.0.0/24",
        "172.16.1.0/24",
    )
    # Prefixes each router is expected to have lost. They are given the value
    # None instead of {} (presumably json_cmp reads None as "must be absent").
    far_side = ("10.0.10.0/24", "172.16.0.0/24", "172.16.1.0/24")
    near_side = ("10.0.1.0/24", "10.0.2.0/24")
    withdrawn_by_router = {
        "r1": far_side,
        "r2": far_side,
        "r3": near_side,
        # Route '10.0.3.0' is no longer available for r4 since it is down.
        "r4": near_side + ("10.0.3.0/24",),
    }

    for router in tgen.routers().values():
        logger.info(
            'Checking OSPF IPv4 kernel routes in "%s" after link down', router.name
        )

        routes = topotest.ip4_route(router)
        withdrawn = withdrawn_by_router.get(router.name, ())
        expected = {
            prefix: (None if prefix in withdrawn else {}) for prefix in all_prefixes
        }
        assertmsg = 'OSPF IPv4 route mismatch in router "{}" after link down'.format(
            router.name
        )

        matched = False
        for _attempt in range(10):
            if not topotest.json_cmp(routes, expected):
                matched = True
                break
            # Mismatch: give the kernel a moment, then read the table again.
            sleep(1)
            routes = topotest.ip4_route(router)
        assert matched, assertmsg
491
492
def test_ospf6_link_down():
    """Check OSPFv3 routes on r1..r4 after the link went down.

    'show ipv6 ospf6 route' on each router must match the reference stored in
    `<router>/ospf6route_down.txt`.
    """
    tgen = get_topogen()
    if tgen.routers_have_failure():
        pytest.skip("skipped because of router(s) failure")

    for rnum in range(1, 5):
        router = "r{}".format(rnum)

        logger.info(
            'Waiting for router "%s" IPv6 OSPF convergence after link down', router
        )

        # Load expected results from the command; the context manager makes
        # sure the reference file is closed again.
        reffile = os.path.join(CWD, "{}/ospf6route_down.txt".format(router))
        with open(reffile) as ref:
            expected = ref.read()

        # Run test function until we get a result. Wait at most 75 seconds
        # (25 polls, 3 s apart).
        test_func = partial(compare_show_ipv6_ospf6, router, expected)
        result, diff = topotest.run_and_expect(test_func, "", count=25, wait=3)
        assert result, "OSPF6 did not converge on {}:\n{}".format(router, diff)
514
515
def test_ospf6_link_down_kernel_route():
    """Check the IPv6 kernel routes of every router after the link went down.

    Each router is compared up to 10 times, sleeping one second and
    re-reading the kernel table after every mismatch.
    """
    tgen = get_topogen()
    if tgen.routers_have_failure():
        pytest.skip("skipped because of router(s) failure")

    all_prefixes = (
        "2001:db8:1::/64",
        "2001:db8:2::/64",
        "2001:db8:3::/64",
        "2001:db8:100::/64",
        "2001:db8:200::/64",
        "2001:db8:300::/64",
    )
    # Prefixes each router is expected to have lost. They are given the value
    # None instead of {} (presumably json_cmp reads None as "must be absent").
    far_side = ("2001:db8:100::/64", "2001:db8:200::/64", "2001:db8:300::/64")
    near_side = ("2001:db8:1::/64", "2001:db8:2::/64")
    withdrawn_by_router = {
        "r1": far_side,
        "r2": far_side,
        "r3": near_side,
        # Route '2001:db8:3::/64' is no longer available for r4 since it is down.
        "r4": near_side + ("2001:db8:3::/64",),
    }

    for router in tgen.routers().values():
        logger.info(
            'Checking OSPF IPv6 kernel routes in "%s" after link down', router.name
        )

        routes = topotest.ip6_route(router)
        withdrawn = withdrawn_by_router.get(router.name, ())
        expected = {
            prefix: (None if prefix in withdrawn else {}) for prefix in all_prefixes
        }
        assertmsg = 'OSPF IPv6 route mismatch in router "{}" after link down'.format(
            router.name
        )

        matched = False
        for _attempt in range(10):
            if not topotest.json_cmp(routes, expected):
                matched = True
                break
            # Mismatch: give the kernel a moment, then read the table again.
            sleep(1)
            routes = topotest.ip6_route(router)

        assert matched, assertmsg
575
576
def test_memory_leak():
    """Report memory leaks, or skip when leak reporting is disabled."""
    tgen = get_topogen()
    if tgen.is_memleak_enabled():
        tgen.report_memory_leaks()
        return

    pytest.skip("Memory leak test/report is disabled")
584
585
if __name__ == "__main__":
    # Run this file through pytest with "-s" placed ahead of any
    # command-line arguments, and exit with pytest's return code.
    pytest_args = ["-s"]
    pytest_args.extend(sys.argv[1:])
    sys.exit(pytest.main(pytest_args))