]>
Commit | Line | Data |
---|---|---|
e3c4bd24 CH |
1 | # -*- coding: utf-8 eval: (blacken-mode 1) -*- |
2 | # SPDX-License-Identifier: ISC | |
3 | # | |
4 | # May 2 2023, Christian Hopps <chopps@labn.net> | |
5 | # | |
6 | # Copyright (c) 2023, LabN Consulting, L.L.C. | |
7 | # | |
8 | """ | |
9 | Test static route startup functionality | |
10 | """ | |
11 | ||
12 | import datetime | |
13 | import ipaddress | |
14 | import logging | |
15 | import math | |
16 | import os | |
17 | import re | |
18 | ||
19 | import pytest | |
20 | from lib.common_config import retry, step | |
21 | from lib.topogen import Topogen, TopoRouter | |
22 | from lib.topolog import logger | |
23 | from munet.base import Timeout | |
24 | ||
# Absolute directory of this test module (symlinks resolved); the generated
# per-router configuration is written beneath it.
CWD = os.path.dirname(os.path.realpath(__file__))

# pytestmark = [pytest.mark.staticd, pytest.mark.mgmtd]
pytestmark = [pytest.mark.staticd]
29 | ||
30 | ||
def get_ip_networks(super_prefix, count):
    """Return the first ``count`` equal-sized subnets of ``super_prefix``.

    The super-prefix is split using the smallest number of additional prefix
    bits that yields at least ``count`` subnets, and the first ``count`` of
    those subnets are returned in address order.

    Args:
        super_prefix: network to split, in any form accepted by
            ``ipaddress.ip_network`` (for example ``"10.0.0.0/8"``).
        count: number of subnets wanted; must be an integer >= 1.

    Returns:
        A tuple of network objects (the first ``count`` subnets).

    Raises:
        ValueError: if ``count`` is less than 1, if ``super_prefix`` is not a
            valid network, or if it cannot be split by the required number
            of bits.
    """
    from itertools import islice

    if count < 1:
        raise ValueError(f"count must be at least 1, got {count}")

    # ceil(log2(count)) in exact integer arithmetic; a float log can round a
    # power of two upward and would then select one prefix bit too many.
    prefixlen_diff = (count - 1).bit_length()
    network = ipaddress.ip_network(super_prefix)
    # Take only what is needed instead of materialising all 2**diff subnets.
    return tuple(islice(network.subnets(prefixlen_diff), count))
39 | ||
40 | ||
# Timer used for the elapsed-time log messages; the tgen fixture overwrites
# its started_on stamp just before the routers are started.
track = Timeout(0)

# Number of static routes written to the generated big.conf.
ROUTE_COUNT = 5000

# [first, last] generated prefix; filled in by the tgen fixture.
ROUTE_RANGE = [None, None]
44 | ||
45 | ||
def write_big_route_conf(rtr, super_prefix, count):
    """Generate ``big.conf`` for ``rtr`` with one static route per subnet.

    One ``ip route <prefix> lo`` line is written to
    ``<CWD>/<rtr.name>/big.conf`` for each of the first ``count`` subnets of
    ``super_prefix``.

    Returns:
        ``(first, last)`` prefix written, or ``(None, None)`` when no routes
        were generated.
    """
    conf_path = f"{CWD}/{rtr.name}/big.conf"
    with open(conf_path, "w+", encoding="ascii") as conf:
        networks = get_ip_networks(super_prefix, count)
        conf.writelines(f"ip route {net} lo\n" for net in networks)

    if not networks:
        return None, None
    return networks[0], networks[-1]
57 | ||
58 | ||
@pytest.fixture(scope="module")
def tgen(request):
    """Set up and tear down the test environment, yielding ``tgen`` to tests.

    Starts the topology described by ``topodef``, generates r1's ``big.conf``
    with ROUTE_COUNT static routes under 10.0.0.0/8, stores the first and
    last generated prefix in ROUTE_RANGE, and stamps ``track.started_on``
    immediately before the routers are started. The topology is stopped once
    the module's tests are done.
    """
    topodef = {
        "s1": ("r1",),
    }

    tgen = Topogen(topodef, request.module.__name__)
    tgen.start_topology()

    r1 = tgen.gears["r1"]

    first, last = write_big_route_conf(r1.net, "10.0.0.0/8", ROUTE_COUNT)
    ROUTE_RANGE[:] = (first, last)

    # zebra gets its regular config file; the generated static routes are
    # loaded as the mgmtd config.
    r1.load_config(TopoRouter.RD_ZEBRA, "zebra.conf")
    r1.load_config(TopoRouter.RD_MGMTD, "big.conf")

    # Reference point for the elapsed-time messages logged by the test.
    track.started_on = datetime.datetime.now()

    tgen.start_router()
    yield tgen
    tgen.stop_topology()
84 | ||
85 | ||
@retry(retry_timeout=3, initial_wait=0.1)
def check_kernel(r1, prefix, expected=True):
    """Check whether the kernel routing table holds a static route for ``prefix``.

    Runs ``ip -4 route show`` or ``ip -6 route show`` on ``r1`` (chosen by
    the address family of ``prefix``) and searches the output for the prefix
    followed, on the same line, by ``proto static`` or ``proto 196``.

    Args:
        r1: router whose ``net.cmd_nostatus`` is used to run the command.
        prefix: network to look for, in any form accepted by
            ``ipaddress.ip_network``.
        expected: True if the route must be present, False if it must be
            absent.

    Returns:
        None when the table matches the expectation, otherwise a string
        describing the mismatch. NOTE(review): the ``retry`` decorator
        presumably treats a returned string as "try again" and consumes
        keyword arguments such as ``retry_timeout`` -- confirm in
        lib.common_config.
    """
    net = ipaddress.ip_network(prefix)
    family = "-6" if net.version == 6 else "-4"
    kernel = r1.net.cmd_nostatus(f"ip {family} route show", warn=not expected)

    logger.debug("checking kernel routing table:\n%s", kernel)
    # Escape the prefix so "." is literal, and fence it with look-arounds so
    # it cannot match inside a longer address or a longer prefix length.
    route = (
        f"(?<![0-9A-Fa-f:.]){re.escape(str(net))}(?![0-9])"
        "(?: nhid [0-9]+)?.*proto (static|196)"
    )
    m = re.search(route, kernel)
    if expected and not m:
        return f"Failed to find \n'{route}'\n in \n'{kernel}'"
    if not expected and m:
        return f"Failed found \n'{route}'\n in \n'{kernel}'"
    return None
102 | ||
103 | ||
def test_staticd_late_start(tgen):
    """Verify the generated startup static routes are installed in the kernel.

    Waits for vtysh on r1 to answer, then checks that the first and the last
    prefix recorded in ROUTE_RANGE appear in the kernel routing table,
    logging the time elapsed on ``track`` at each milestone.
    """
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    r1 = tgen.routers()["r1"]

    step(f"Verifying {ROUTE_COUNT} startup routes are present")

    vtysh_timeout = 30
    for _ in Timeout(vtysh_timeout):
        rc, o, e = r1.net.cmd_status("vtysh -c 'show version'")
        if not rc:
            break
        logger.debug("vtysh not ready: rc=%s stdout=%s stderr=%s", rc, o, e)
    else:
        # Reached only when the loop ran out without a successful vtysh call,
        # so a success just before the deadline is never misreported.
        pytest.fail(f"r1: vtysh did not respond within {vtysh_timeout}s")
    logger.info("r1: vtysh connected after %ss", track.elapsed())

    result = check_kernel(r1, ROUTE_RANGE[0], retry_timeout=20)
    assert result is None, result
    logger.info("r1: first route installed after %ss", track.elapsed())

    result = check_kernel(r1, ROUTE_RANGE[1], retry_timeout=20)
    assert result is None, result
    logger.info("r1: last route installed after %ss", track.elapsed())
124 | ||
125 | result = check_kernel(r1, ROUTE_RANGE[1], retry_timeout=20) | |
126 | assert result is None | |
127 | logging.info("r1: last route installed after %ss", track.elapsed()) |