]>
git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/dpdk/app/test/autotest_runner.py
1 # SPDX-License-Identifier: BSD-3-Clause
2 # Copyright(c) 2010-2014 Intel Corporation
4 # The main logic behind running autotests in parallel
6 from __future__
import print_function
9 from multiprocessing
import Pool
, Queue
19 def wait_prompt(child
):
22 result
= child
.expect(["RTE>>", pexpect
.TIMEOUT
, pexpect
.EOF
],
32 # get all valid NUMA nodes
36 re
.match(r
"node(\d+)", os
.path
.basename(node
))
39 for node
in glob
.glob("/sys/devices/system/node/node*")
# find first (or any, really) CPU on a particular node, will be used to spread
# processes around NUMA nodes to avoid exhausting memory on particular node
def first_cpu_on_node(node_nr):
    """Return the number of one CPU belonging to NUMA node *node_nr*.

    Reads sysfs (/sys/devices/system/node/nodeN/cpu*) and picks the first
    entry glob returns; since glob order is arbitrary, "first" really means
    "any" — which is all the caller needs for taskset affinity spreading.
    Raises IndexError if the node has no CPUs (or does not exist).
    """
    pattern = "/sys/devices/system/node/node%d/cpu*" % node_nr
    first_path = glob.glob(pattern)[0]
    # entry names look like "cpu12" — extract the numeric suffix
    match = re.match(r"cpu(\d+)", os.path.basename(first_path))
    return int(match.group(1))
52 pool_child
= None # per-process child
55 # we initialize each worker with a queue because we need per-pool unique
56 # command-line arguments, but we cannot do different arguments in an initializer
57 # because the API doesn't allow per-worker initializer arguments. so, instead,
58 # we will initialize with a shared queue, and dequeue command-line arguments
60 def pool_init(queue
, result_queue
):
63 cmdline
, prefix
= queue
.get()
64 start_time
= time
.time()
65 name
= ("Start %s" % prefix
) if prefix
!= "" else "Start"
67 # use default prefix if no prefix was specified
68 prefix_cmdline
= "--file-prefix=%s" % prefix
if prefix
!= "" else ""
70 # append prefix to cmdline
71 cmdline
= "%s %s" % (cmdline
, prefix_cmdline
)
73 # prepare logging of init
74 startuplog
= StringIO
.StringIO()
79 print("\n%s %s\n" % ("=" * 20, prefix
), file=startuplog
)
80 print("\ncmdline=%s" % cmdline
, file=startuplog
)
82 pool_child
= pexpect
.spawn(cmdline
, logfile
=startuplog
)
84 # wait for target to boot
85 if not wait_prompt(pool_child
):
91 time
.time() - start_time
,
92 startuplog
.getvalue(),
99 time
.time() - start_time
,
100 startuplog
.getvalue(),
106 time
.time() - start_time
,
107 startuplog
.getvalue(),
111 result_queue
.put(result
)
115 # each result tuple in results list consists of:
116 # result value (0 or -1)
119 # total test run time (double)
121 # test report (if not available, should be None)
123 # this function needs to be outside AutotestRunner class because otherwise Pool
124 # won't work (or rather it will require quite a bit of effort to make it work).
125 def run_test(target
, test
):
128 if pool_child
is None:
129 return -1, "Fail [No test process]", test
["Name"], 0, "", None
131 # create log buffer for each test
132 # in multiprocessing environment, the logging would be
133 # interleaved and will create a mess, hence the buffering
134 logfile
= StringIO
.StringIO()
135 pool_child
.logfile
= logfile
137 # make a note when the test started
138 start_time
= time
.time()
141 # print test name to log buffer
142 print("\n%s %s\n" % ("-" * 20, test
["Name"]), file=logfile
)
144 # run test function associated with the test
145 result
= test
["Func"](pool_child
, test
["Command"])
147 # make a note when the test was finished
148 end_time
= time
.time()
150 log
= logfile
.getvalue()
152 # append test data to the result tuple
153 result
+= (test
["Name"], end_time
- start_time
, log
)
155 # call report function, if any defined, and supply it with
156 # target and complete log for test run
158 report
= test
["Report"](target
, log
)
160 # append report to results tuple
166 # make a note when the test crashed
167 end_time
= time
.time()
169 # mark test as failed
170 result
= (-1, "Fail [Crash]", test
["Name"],
171 end_time
- start_time
, logfile
.getvalue(), None)
173 # return test results
177 # class representing an instance of autotests run
178 class AutotestRunner
:
180 parallel_test_groups
= []
181 non_parallel_test_groups
= []
192 def __init__(self
, cmdline
, target
, blacklist
, whitelist
, n_processes
):
193 self
.cmdline
= cmdline
195 self
.blacklist
= blacklist
196 self
.whitelist
= whitelist
198 self
.parallel_tests
= []
199 self
.non_parallel_tests
= []
200 self
.n_processes
= n_processes
201 self
.active_processes
= 0
203 # parse the binary for available test commands
204 binary
= cmdline
.split()[0]
205 stripped
= 'not stripped' not in \
206 subprocess
.check_output(['file', binary
])
208 symbols
= subprocess
.check_output(['nm', binary
]).decode('utf-8')
209 self
.avail_cmds
= re
.findall('test_register_(\w+)', symbols
)
211 self
.avail_cmds
= None
214 logfile
= "%s.log" % target
215 csvfile
= "%s.csv" % target
217 self
.logfile
= open(logfile
, "w")
218 csvfile
= open(csvfile
, "w")
219 self
.csvwriter
= csv
.writer(csvfile
)
221 # prepare results table
222 self
.csvwriter
.writerow(["test_name", "test_result", "result_str"])
224 # set up cmdline string
225 def __get_cmdline(self
, cpu_nr
):
226 cmdline
= ("taskset -c %i " % cpu_nr
) + self
.cmdline
230 def __process_result(self
, result
):
232 # unpack result tuple
233 test_result
, result_str
, test_name
, \
234 test_time
, log
, report
= result
237 cur_time
= time
.time()
238 total_time
= int(cur_time
- self
.start
)
240 # print results, test run time and total time since start
241 result
= ("%s:" % test_name
).ljust(30)
242 result
+= result_str
.ljust(29)
243 result
+= "[%02dm %02ds]" % (test_time
/ 60, test_time
% 60)
245 # don't print out total time every line, it's the same anyway
246 print(result
+ "[%02dm %02ds]" % (total_time
/ 60, total_time
% 60))
248 # if test failed and it wasn't a "start" test
253 self
.log_buffers
.append(log
)
255 # create report if it exists
258 f
= open("%s_%s_report.rst" %
259 (self
.target
, test_name
), "w")
261 print("Report for %s could not be created!" % test_name
)
266 # write test result to CSV file
267 self
.csvwriter
.writerow([test_name
, test_result
, result_str
])
269 # this function checks individual test and decides if this test should be in
270 # the group by comparing it against whitelist/blacklist. it also checks if
271 # the test is compiled into the binary, and marks it as skipped if necessary
272 def __filter_test(self
, test
):
273 test_cmd
= test
["Command"]
276 # dump tests are specified in full e.g. "Dump_mempool"
277 if "_autotest" in test_id
:
278 test_id
= test_id
[:-len("_autotest")]
280 # filter out blacklisted/whitelisted tests
281 if self
.blacklist
and test_id
in self
.blacklist
:
283 if self
.whitelist
and test_id
not in self
.whitelist
:
286 # if test wasn't compiled in, remove it as well
287 if self
.avail_cmds
and test_cmd
not in self
.avail_cmds
:
288 result
= 0, "Skipped [Not compiled]", test_id
, 0, "", None
289 self
.skipped
.append(tuple(result
))
294 def __run_test_group(self
, test_group
, worker_cmdlines
):
295 group_queue
= Queue()
296 init_result_queue
= Queue()
297 for proc
, cmdline
in enumerate(worker_cmdlines
):
298 prefix
= "test%i" % proc
if len(worker_cmdlines
) > 1 else ""
299 group_queue
.put(tuple((cmdline
, prefix
)))
301 # create a pool of worker threads
302 # we will initialize child in the initializer, and we don't need to
303 # close the child because when the pool worker gets destroyed, child
305 pool
= Pool(processes
=len(worker_cmdlines
),
306 initializer
=pool_init
,
307 initargs
=(group_queue
, init_result_queue
))
311 # process all initialization results
312 for _
in range(len(worker_cmdlines
)):
313 self
.__process
_result
(init_result_queue
.get())
315 # run all tests asynchronously
316 for test
in test_group
:
317 result
= pool
.apply_async(run_test
, (self
.target
, test
))
318 results
.append(result
)
320 # tell the pool to stop all processes once done
323 # iterate while we have group execution results to get
324 while len(results
) > 0:
325 # iterate over a copy to be able to safely delete results
326 # this iterates over a list of group results
327 for async_result
in results
[:]:
328 # if the thread hasn't finished yet, continue
329 if not async_result
.ready():
332 res
= async_result
.get()
334 self
.__process
_result
(res
)
336 # remove result from results list once we're done with it
337 results
.remove(async_result
)
339 # iterate over test groups and run tests associated with them
340 def run_all_tests(self
):
342 self
.parallel_tests
= list(
343 filter(self
.__filter
_test
,
346 self
.non_parallel_tests
= list(
347 filter(self
.__filter
_test
,
348 self
.non_parallel_tests
)
351 parallel_cmdlines
= []
352 # FreeBSD doesn't have NUMA support
353 numa_nodes
= get_numa_nodes()
354 if len(numa_nodes
) > 0:
355 for proc
in range(self
.n_processes
):
356 # spread cpu affinity between NUMA nodes to have less chance of
357 # running out of memory while running multiple test apps in
358 # parallel. to do that, alternate between NUMA nodes in a round
359 # robin fashion, and pick an arbitrary CPU from that node to
360 # taskset our execution to
361 numa_node
= numa_nodes
[self
.active_processes
% len(numa_nodes
)]
362 cpu_nr
= first_cpu_on_node(numa_node
)
363 parallel_cmdlines
+= [self
.__get
_cmdline
(cpu_nr
)]
364 # increase number of active processes so that the next cmdline
365 # gets a different NUMA node
366 self
.active_processes
+= 1
368 parallel_cmdlines
= [self
.cmdline
] * self
.n_processes
370 print("Running tests with %d workers" % self
.n_processes
)
372 # create table header
374 print("Test name".ljust(30) + "Test result".ljust(29) +
375 "Test".center(9) + "Total".center(9))
378 if len(self
.skipped
):
379 print("Skipped autotests:")
381 # print out any skipped tests
382 for result
in self
.skipped
:
383 # unpack result tuple
384 test_result
, result_str
, test_name
, _
, _
, _
= result
385 self
.csvwriter
.writerow([test_name
, test_result
, result_str
])
387 t
= ("%s:" % test_name
).ljust(30)
388 t
+= result_str
.ljust(29)
393 # make a note of tests start time
394 self
.start
= time
.time()
396 # whatever happens, try to save as much logs as possible
398 if len(self
.parallel_tests
) > 0:
399 print("Parallel autotests:")
400 self
.__run
_test
_group
(self
.parallel_tests
, parallel_cmdlines
)
402 if len(self
.non_parallel_tests
) > 0:
403 print("Non-parallel autotests:")
404 self
.__run
_test
_group
(self
.non_parallel_tests
, [self
.cmdline
])
407 cur_time
= time
.time()
408 total_time
= int(cur_time
- self
.start
)
412 print("Total run time: %02dm %02ds" % (total_time
/ 60,
415 print("Number of failed tests: %s" % str(self
.fails
))
417 # write summary to logfile
418 self
.logfile
.write("Summary\n")
419 self
.logfile
.write("Target: ".ljust(15) + "%s\n" % self
.target
)
420 self
.logfile
.write("Tests: ".ljust(15) + "%i\n" % self
.n_tests
)
421 self
.logfile
.write("Failed tests: ".ljust(
422 15) + "%i\n" % self
.fails
)
424 print("Exception occurred")
425 print(sys
.exc_info())
428 # drop logs from all executions to a logfile
429 for buf
in self
.log_buffers
:
430 self
.logfile
.write(buf
.replace("\r", ""))