]>
git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/lib/topolog.py
# Library of helper functions for NetDEF Topology Tests
#
# Copyright (c) 2017 by
# Network Device Education Foundation, Inc. ("NetDEF")
#
# Permission to use, copy, modify, and/or distribute this software
# for any purpose with or without fee is hereby granted, provided
# that the above copyright notice and this permission notice appear
# in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND NETDEF DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NETDEF BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
# OF THIS SOFTWARE.
#

"""
Logging utilities for topology tests.

This file defines our logging abstraction.
"""
34 if sys
.version_info
[0] > 2:
40 from xdist
import is_xdist_controller
43 def is_xdist_controller():
# Prefix used by get_logger() when naming per-component loggers.
# NOTE(review): the defining line was lost in extraction; "topolog" is
# presumed because it matches the root logger name below -- confirm.
BASENAME = "topolog"

# Helper dictionary to convert Topogen logging levels to Python's logging.
DEBUG_TOPO2LOGGING = {
    "debug": logging.DEBUG,
    # NOTE(review): one entry between "debug" and "output" was lost in
    # extraction; "info" is presumed -- confirm.
    "info": logging.INFO,
    "output": logging.INFO,
    "warning": logging.WARNING,
    "error": logging.ERROR,
    "critical": logging.CRITICAL,
}

# Record layout for file/stream handlers: timestamp with milliseconds,
# level name, logger name, then the message.
FORMAT = "%(asctime)s.%(msecs)03d %(levelname)s: %(name)s: %(message)s"

# Per-test log handlers installed by logstart() and removed by logfinish(),
# keyed by nodeid + worker name.
handlers = {}

# Top-level logger of the topotest framework.
logger = logging.getLogger("topolog")
def set_handler(l, target=None):
    """Create a handler for ``target``, attach it to logger ``l``, return it.

    Args:
        l: the ``logging.Logger`` that receives the new handler.
        target: ``None`` for a do-nothing ``NullHandler``, a ``str`` for a
            log file path (opened with mode "w", so an existing file is
            truncated), or any other object, which is handed to
            ``StreamHandler`` as its stream.

    Returns:
        The handler that was attached, so the caller can remove it later.
    """
    if target is None:
        h = logging.NullHandler()
    else:
        if isinstance(target, str):
            h = logging.FileHandler(filename=target, mode="w")
        else:
            h = logging.StreamHandler(stream=target)
        h.setFormatter(logging.Formatter(fmt=FORMAT))
    # Don't filter anything at the handler level
    h.setLevel(logging.DEBUG)
    l.addHandler(h)
    return h
def set_log_level(l, level):
    """Set the logging level.

    Args:
        l: the ``logging.Logger`` to configure.
        level: a Topogen level name found in ``DEBUG_TOPO2LOGGING``; any
            other value is passed to ``Logger.setLevel`` unchanged.
    """
    # Messages sent to this logger only are created if this level or above.
    log_level = DEBUG_TOPO2LOGGING.get(level, level)
    l.setLevel(log_level)
def get_logger(name, log_level=None, target=None):
    """Return the logger named ``"<BASENAME>.<name>"``, optionally configured.

    Args:
        name: suffix appended to ``BASENAME`` to form the logger name.
        log_level: when not ``None``, applied through ``set_log_level``.
        target: when not ``None``, a handler for it is attached through
            ``set_handler``.

    Returns:
        The ``logging.Logger`` instance.
    """
    l = logging.getLogger("{}.{}".format(BASENAME, name))

    if log_level is not None:
        set_log_level(l, log_level)

    if target is not None:
        set_handler(l, target)

    return l
# nodeid: all_protocol_startup/test_all_protocol_startup.py::test_router_running


def get_test_logdir(nodeid=None):
    """Get log directory relative pathname.

    Args:
        nodeid: pytest node id of the form ``"dir/test_file.py::test_name"``.
            When empty, it is read from the ``PYTEST_CURRENT_TEST``
            environment variable (text up to the first space).

    Returns:
        A relative path built from the dotted module path and, depending on
        the xdist mode in ``PYTEST_XDIST_MODE``, the test name and the
        worker name.

    Raises:
        KeyError: ``nodeid`` is empty and ``PYTEST_CURRENT_TEST`` is unset.
        ValueError: the node id does not split into exactly two "::" parts.
        AssertionError: the xdist mode is not one of the handled values.
    """
    xdist_worker = os.getenv("PYTEST_XDIST_WORKER", "")
    mode = os.getenv("PYTEST_XDIST_MODE", "no")

    if not nodeid:
        nodeid = os.environ["PYTEST_CURRENT_TEST"].split(" ")[0]

    # Brackets from parametrized ids are replaced before building the path.
    cur_test = nodeid.replace("[", "_").replace("]", "_")
    path, testname = cur_test.split("::")
    # Drop the trailing 3 characters (".py" in the example above) and turn
    # the directory separators into dots.
    path = path[:-3].replace("/", ".")

    # We use different logdir paths based on how xdist is running.
    # NOTE(review): the branch conditions and the final return were lost in
    # extraction; "each"/"load" are presumed because they are the xdist modes
    # not accepted by the assertion below -- confirm against upstream.
    if mode == "each":
        return os.path.join(path, testname, xdist_worker)
    elif mode == "load":
        return os.path.join(path, testname)
    else:
        assert (
            mode == "no" or mode == "loadfile" or mode == "loadscope"
        ), "Unknown dist mode {}".format(mode)

        # NOTE(review): presumed per-module directory -- confirm.
        return path
def logstart(nodeid, location, rundir):
    """Called from pytest before module setup.

    Creates the per-test log directory under ``rundir`` and attaches a file
    handler writing ``exec.log`` there to the module-level ``logger``.

    Args:
        nodeid: pytest node id of the test about to run.
        location: not used by this function.
        rundir: base directory under which the log directory is created.

    Raises:
        AssertionError: a handler for this nodeid/worker is already installed.
        subprocess.CalledProcessError: creating or chmod-ing the directory
            failed.
    """
    mode = os.getenv("PYTEST_XDIST_MODE", "no")
    worker = os.getenv("PYTEST_TOPOTEST_WORKER", "")

    # We only per-test log in the workers (or non-dist)
    if not worker and mode != "no":
        return

    handler_id = nodeid + worker
    assert handler_id not in handlers

    rel_log_dir = get_test_logdir(nodeid)
    exec_log_dir = os.path.join(rundir, rel_log_dir)
    # Argument lists instead of a formatted shell string: the path is passed
    # verbatim, so spaces or shell metacharacters in it cannot break the call.
    subprocess.check_call(["mkdir", "-p", exec_log_dir])
    subprocess.check_call(["chmod", "1777", exec_log_dir])
    exec_log_path = os.path.join(exec_log_dir, "exec.log")

    # Add test based exec log handler
    h = set_handler(logger, exec_log_path)
    handlers[handler_id] = h

    if worker:
        logger.info(
            "Logging on worker %s for %s into %s", worker, handler_id, exec_log_path
        )
    else:
        logger.info("Logging for %s into %s", handler_id, exec_log_path)
def logfinish(nodeid, location):
    """Called from pytest after module teardown.

    Detaches the handler registered for this nodeid/worker from the
    module-level ``logger`` and forgets it. Does nothing when no handler is
    registered under that id.

    Args:
        nodeid: pytest node id of the test that just finished.
        location: not used by this function.
    """
    # This function may not be called if pytest is interrupted.

    worker = os.getenv("PYTEST_TOPOTEST_WORKER", "")
    handler_id = nodeid + worker

    if handler_id in handlers:
        # Remove test based exec log handler
        # NOTE(review): the guard on this message was lost in extraction;
        # "if worker" is presumed by analogy with logstart() -- confirm.
        if worker:
            logger.info("Closing logs for %s", handler_id)

        h = handlers[handler_id]
        logger.removeHandler(h)
        # NOTE(review): two statements here were lost in extraction; flushing
        # and closing the handler is presumed so the log file opened by the
        # handler is released -- confirm.
        h.flush()
        h.close()
        del handlers[handler_id]
# Module-level initialisation: with target None, set_handler() is asked for
# its no-target handler on the topotest logger, and the logger itself is set
# to the "debug" level.
console_handler = set_handler(logger, None)
set_log_level(logger, "debug")