]> git.proxmox.com Git - mirror_qemu.git/blob - tests/qemu-iotests/iotests.py
Merge remote-tracking branch 'remotes/vsementsov/tags/pull-jobs-2021-10-07-v2' into...
[mirror_qemu.git] / tests / qemu-iotests / iotests.py
1 # Common utilities and Python wrappers for qemu-iotests
2 #
3 # Copyright (C) 2012 IBM Corp.
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 #
18
19 import atexit
20 import bz2
21 from collections import OrderedDict
22 import faulthandler
23 import json
24 import logging
25 import os
26 import re
27 import shutil
28 import signal
29 import struct
30 import subprocess
31 import sys
32 import time
33 from typing import (Any, Callable, Dict, Iterable,
34 List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
35 import unittest
36
37 from contextlib import contextmanager
38
39 from qemu.machine import qtest
40 from qemu.qmp import QMPMessage
41
# Use this logger for logging messages directly from the iotests module
logger = logging.getLogger('qemu.iotests')
logger.addHandler(logging.NullHandler())

# Use this logger for messages that ought to be used for diff output.
test_logger = logging.getLogger('qemu.iotests.diff_io')


# Dump the Python traceback if the interpreter receives a fatal signal.
faulthandler.enable()

# Command lines of the external tools. Each one starts with the program
# named by the corresponding *_PROG environment variable (or a default)
# and is extended with the space-separated *_OPTIONS value, if any.
#
# This will not work if arguments contain spaces but is necessary if we
# want to support the override options that ./check supports.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
if os.environ.get('QEMU_IMG_OPTIONS'):
    qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS'):
    qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')

# Same program as qemu_io_args, but with the options taken from
# QEMU_IO_OPTIONS_NO_FMT instead of QEMU_IO_OPTIONS.
qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
    qemu_io_args_no_fmt += \
        os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
if os.environ.get('QEMU_NBD_OPTIONS'):
    qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
# NOTE: when QEMU_OPTIONS is unset or blank this evaluates to [''] (a list
# holding one empty string), not to an empty list.
qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')

# Wrapper command used to run QEMU under gdbserver; empty when GDB_OPTIONS
# is not set.
gdb_qemu_env = os.environ.get('GDB_OPTIONS')
qemu_gdb = []
if gdb_qemu_env:
    qemu_gdb = ['gdbserver'] + gdb_qemu_env.strip().split(' ')

# Truthy for any non-empty PRINT_QEMU value (the raw string is stored).
qemu_print = os.environ.get('PRINT_QEMU', False)

imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
output_dir = os.environ.get('OUTPUT_DIR', '.')

try:
    test_dir = os.environ['TEST_DIR']
    sock_dir = os.environ['SOCK_DIR']
    cachemode = os.environ['CACHEMODE']
    aiomode = os.environ['AIOMODE']
    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
except KeyError:
    # We are using these variables as proxies to indicate that we're
    # not being run via "check". There may be other things set up by
    # "check" that individual test cases rely on.
    sys.stderr.write('Please run this test via the "check" script\n')
    sys.exit(os.EX_USAGE)

# Wrapper command used to run QEMU under valgrind; empty unless
# VALGRIND_QEMU is "y" and NO_VALGRIND is not "y".
qemu_valgrind = []
if os.environ.get('VALGRIND_QEMU') == "y" and \
        os.environ.get('NO_VALGRIND') != "y":
    valgrind_logfile = "--log-file=" + test_dir
    # %p allows to put the valgrind process PID, since
    # we don't know it a priori (subprocess.Popen is
    # not yet invoked)
    valgrind_logfile += "/%p.valgrind"

    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']

socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')

# Defaults injected by qemu_img_create() and VM.add_drive() for luks
# images that do not name a key secret themselves.
luks_default_secret_object = 'secret,id=keysec0,data=' + \
    os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'

# Unlike the variables in the try block above, a missing SAMPLE_IMG_DIR
# surfaces as a plain KeyError at import time.
sample_img_dir = os.environ['SAMPLE_IMG_DIR']
117
118
def unarchive_sample_image(sample, fname):
    """
    Decompress the bz2-compressed sample image *sample* (looked up in
    sample_img_dir, with '.bz2' appended) into the file *fname*.
    """
    archive = os.path.join(sample_img_dir, sample + '.bz2')
    with bz2.open(archive) as compressed:
        with open(fname, 'wb') as target:
            shutil.copyfileobj(compressed, target)
123
124
def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True) -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code

    :param tool: Name of the tool; only used in the diagnostic written to
                 stderr when the process is terminated by a signal.
    :param args: Complete command line, program included.
    :param connect_stderr: When true, the tool's stderr is merged into the
                           captured output; otherwise it is not redirected.
    :return: Tuple ``(output, returncode)``. A negative return code means
             the process was killed by signal ``-returncode``.
    """
    stderr = subprocess.STDOUT if connect_stderr else None
    with subprocess.Popen(args, stdout=subprocess.PIPE,
                          stderr=stderr, universal_newlines=True) as subp:
        output = subp.communicate()[0]
        if subp.returncode < 0:
            cmd = ' '.join(args)
            # Two adjacent literals rather than a backslash continuation
            # inside one literal: the latter makes the indentation of the
            # continuation line part of the message.
            sys.stderr.write(f'{tool} received signal '
                             f'{-subp.returncode}: {cmd}\n')
        return (output, subp.returncode)
139
def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
    """
    Invoke qemu-img (qemu_img_args followed by *args*) and hand back the
    pair of captured output and exit code.
    """
    command = [*qemu_img_args, *args]
    return qemu_tool_pipe_and_status('qemu-img', command)
146
def qemu_img(*args: str) -> int:
    '''Invoke qemu-img with *args* and return only its exit code'''
    _output, exit_code = qemu_img_pipe_and_status(*args)
    return exit_code
150
def ordered_qmp(qmsg, conv_keys=True):
    """
    Return a copy of a QMP message in which every dict is an OrderedDict
    sorted by key, so that the message serializes deterministically.

    :param qmsg: A dict, a list, or any other (scalar) value. Scalars are
                 returned unchanged.
    :param conv_keys: When true, underscores in the keys of the outermost
                      dict (or of the dicts directly contained in an
                      outermost list) are replaced by dashes. Keys of more
                      deeply nested dicts are never converted.
    """
    # Dictionaries are not ordered prior to 3.6, therefore:
    if isinstance(qmsg, list):
        # Propagate conv_keys: falling back to the default here would
        # re-enable key conversion for dicts nested inside lists.
        return [ordered_qmp(atom, conv_keys) for atom in qmsg]
    if isinstance(qmsg, dict):
        od = OrderedDict()
        for k, v in sorted(qmsg.items()):
            if conv_keys:
                k = k.replace('_', '-')
            od[k] = ordered_qmp(v, conv_keys=False)
        return od
    return qmsg
163
def qemu_img_create(*args):
    """
    Run "qemu-img create" with *args* and return its exit code.

    For luks images (``-f luks``) that do not specify a key secret, the
    default key secret option and the matching ``--object`` definition are
    added to the command line.
    """
    args = list(args)

    # default luks support
    if '-f' in args and args[args.index('-f') + 1] == 'luks':
        if '-o' in args:
            i = args.index('-o')
            if 'key-secret' not in args[i + 1]:
                # The -o argument is a comma-separated option string, so
                # the default has to be joined on with a comma (a str has
                # no append()).
                args[i + 1] += ',' + luks_default_key_secret_opt
                args.insert(i + 2, '--object')
                args.insert(i + 3, luks_default_secret_object)
        else:
            args = ['-o', luks_default_key_secret_opt,
                    '--object', luks_default_secret_object] + args

    args.insert(0, 'create')

    return qemu_img(*args)
182
def qemu_img_measure(*args):
    """Run "qemu-img measure --output json" and return the decoded JSON."""
    output = qemu_img_pipe("measure", "--output", "json", *args)
    return json.loads(output)
185
def qemu_img_check(*args):
    """Run "qemu-img check --output json" and return the decoded JSON."""
    output = qemu_img_pipe("check", "--output", "json", *args)
    return json.loads(output)
188
def qemu_img_verbose(*args):
    '''Run qemu-img with its output left untouched; return the exit code.

    A note is written to stderr if the process was killed by a signal.'''
    command = qemu_img_args + list(args)
    status = subprocess.call(command)
    if status < 0:
        sys.stderr.write('qemu-img received signal %i: %s\n'
                         % (-status, ' '.join(command)))
    return status
196
def qemu_img_pipe(*args: str) -> str:
    '''Invoke qemu-img with *args* and return only its captured output'''
    output, _exit_code = qemu_img_pipe_and_status(*args)
    return output
200
def qemu_img_log(*args):
    """
    Run qemu-img, log its output through filter_testfiles, and return the
    unfiltered output.
    """
    output = qemu_img_pipe(*args)
    log(output, filters=[filter_testfiles])
    return output
205
def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
    """
    Log the filtered output of "qemu-img info" for *filename*.

    :param filter_path: Path replaced by TEST_IMG in the output; defaults
                        to *filename*.
    :param imgopts: Pass --image-opts instead of "-f <imgfmt>".
    :param extra_args: Additional arguments placed before the filename.
    """
    format_args = ['--image-opts'] if imgopts else ['-f', imgfmt]
    output = qemu_img_pipe('info', *format_args, *extra_args, filename)
    log(filter_img_info(output, filter_path or filename))
219
def qemu_io(*args):
    '''Invoke qemu-io (qemu_io_args followed by *args*); return its output'''
    command = qemu_io_args + list(args)
    output, _exit_code = qemu_tool_pipe_and_status('qemu-io', command)
    return output
224
def qemu_io_log(*args):
    """
    Run qemu-io, log its output through filter_testfiles and
    filter_qemu_io, and return the unfiltered output.
    """
    output = qemu_io(*args)
    log(output, filters=[filter_testfiles, filter_qemu_io])
    return output
229
def qemu_io_silent(*args):
    '''Run qemu-io with stdout discarded and return the exit code.

    The format-less base command line is used when the caller selects the
    format itself via -f or --image-opts.'''
    caller_picks_format = '-f' in args or '--image-opts' in args
    base = qemu_io_args_no_fmt if caller_picks_format else qemu_io_args

    command = base + list(args)
    proc = subprocess.run(command, stdout=subprocess.DEVNULL, check=False)
    if proc.returncode < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-proc.returncode, ' '.join(command)))
    return proc.returncode
243
def qemu_io_silent_check(*args):
    '''Run qemu-io with all output discarded; return True if it exited
    with status 0'''
    command = qemu_io_args + list(args)
    proc = subprocess.run(command, stdout=subprocess.DEVNULL,
                          stderr=subprocess.STDOUT, check=False)
    succeeded = proc.returncode == 0
    return succeeded
250
class QemuIoInteractive:
    """
    Drive an interactive qemu-io process through its stdin and stdout.

    The process is started in the constructor; cmd() sends one command and
    returns everything printed before the next 'qemu-io> ' prompt; close()
    sends the quit command.
    """

    def __init__(self, *args):
        """
        Start qemu-io (qemu_io_args_no_fmt followed by *args*) and wait for
        the first prompt.

        :raises ValueError: if the process does not start with the prompt;
                            the exception carries the process' output.
        """
        self.args = qemu_io_args_no_fmt + list(args)
        # We need to keep the Popen object around, and not
        # close it immediately. Therefore, disable the pylint check:
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        out = self._p.stdout.read(9)
        if out != 'qemu-io> ':
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        """Send the quit command and wait for the process to finish."""
        self._p.communicate('q\n')

    def _read_output(self):
        """
        Read from the process until the prompt appears and return the text
        that preceded it.
        """
        pattern = 'qemu-io> '
        n = len(pattern)
        # Compare the last n characters read against the prompt. A
        # position counter that is reset to 0 on every mismatch would miss
        # a prompt that directly follows the beginning of a prompt (e.g.
        # "qqemu-io> ").
        tail = ''
        s = []
        while tail != pattern:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            s.append(c)
            tail = (tail + c)[-n:]

        return ''.join(s[:-n])

    def cmd(self, cmd):
        """
        Execute a single qemu-io command and return its output.

        *cmd* must not contain a newline and must not be the quit command
        (use close() for that).
        """
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
297
298
def qemu_nbd(*args):
    '''Start qemu-nbd with --fork (daemon mode); return the exit code of
    the parent process'''
    command = qemu_nbd_args + ['--fork'] + list(args)
    return subprocess.call(command)
302
def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    '''Start qemu-nbd with --fork (daemon mode).

    Returns the parent's exit code together with its captured output; the
    output is replaced by an empty string when the exit code is 0.'''
    command = qemu_nbd_args + ['--fork'] + list(args)
    output, status = qemu_tool_pipe_and_status('qemu-nbd', command,
                                               connect_stderr=False)
    if not status:
        output = ''
    return status, output
310
def qemu_nbd_list_log(*args: str) -> str:
    '''Run "qemu-nbd -L" to list remote exports, log the filtered output
    and return the unfiltered output'''
    command = [qemu_nbd_prog, '-L', *args]
    output, _status = qemu_tool_pipe_and_status('qemu-nbd', command)
    log(output, filters=[filter_testfiles, filter_nbd_exports])
    return output
317
@contextmanager
def qemu_nbd_popen(*args):
    '''
    Context manager running qemu-nbd within the context

    qemu-nbd is started with --persistent and a pid file, followed by
    *args*. The context body runs once the pid file exists. On exit the
    pid file is removed and the process is killed and reaped.

    Raises RuntimeError if qemu-nbd exits before creating the pid file.
    '''
    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")

    # A leftover pid file would make the startup wait below return at once.
    assert not os.path.exists(pid_file)

    cmd = list(qemu_nbd_args)
    cmd.extend(('--persistent', '--pid-file', pid_file))
    cmd.extend(args)

    log('Start NBD server')
    with subprocess.Popen(cmd) as p:
        try:
            # Poll until the pid file shows up, bailing out if the
            # process terminates first.
            while not os.path.exists(pid_file):
                if p.poll() is not None:
                    raise RuntimeError(
                        "qemu-nbd terminated with exit code {}: {}"
                        .format(p.returncode, ' '.join(cmd)))

                time.sleep(0.01)
            yield
        finally:
            # Runs for normal exit, for exceptions raised in the context
            # body, and for the startup failure above.
            if os.path.exists(pid_file):
                os.remove(pid_file)
            log('Kill NBD server')
            p.kill()
            p.wait()
346
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Tell whether "qemu-img compare" reports the two images as identical
    (i.e. exits with status 0)'''
    status = qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    return status == 0
351
def create_image(name, size):
    '''Write a raw image file made of 512-byte sectors, each of which
    carries its big-endian 32-bit sector number in its first and last four
    bytes (zeroes in between)'''
    offset = 0
    with open(name, 'wb') as image:
        while offset < size:
            sector_no = offset // 512
            image.write(struct.pack('>l504xl', sector_no, sector_no))
            offset += 512
360
def image_size(img):
    '''Return the "virtual-size" field reported by "qemu-img info" for the
    image (opened with format imgfmt)'''
    info = json.loads(
        qemu_img_pipe('info', '--output=json', '-f', imgfmt, img))
    return info['virtual-size']
365
def is_str(val):
    """Return True if *val* is an instance of str."""
    return isinstance(val, str)
368
# test_dir is a literal path, not a pattern: escape it so that characters
# with a special meaning in regular expressions ('.', '+', ...) only match
# themselves.
test_dir_re = re.compile(re.escape(test_dir))
def filter_test_dir(msg):
    """Replace every occurrence of the test directory in *msg* with
    'TEST_DIR'."""
    return test_dir_re.sub("TEST_DIR", msg)
372
win32_re = re.compile(r"\r")
def filter_win32(msg):
    """Return *msg* with all carriage-return characters removed."""
    return re.sub(win32_re, "", msg)
376
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    """
    Strip carriage returns from *msg* and replace qemu-io's timing and
    throughput statistics with fixed placeholder text.
    """
    placeholder = "X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)"
    without_cr = filter_win32(msg)
    return qemu_io_re.sub(placeholder, without_cr)
384
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    """Replace numeric "chown <uid>:<gid>" text with "chown UID:GID"."""
    return re.sub(chown_re, "chown UID:GID", msg)
388
def filter_qmp_event(event):
    '''
    Filter a QMP event dict

    Returns a copy of *event* whose timestamp fields are replaced by the
    placeholders 'SECS' and 'USECS'. The event passed in is not modified.
    '''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy, so the nested timestamp dict
        # needs a copy of its own; assigning into it would change the
        # caller's event as well.
        event['timestamp'] = dict(event['timestamp'],
                                  seconds='SECS', microseconds='USECS')
    return event
396
def filter_qmp(qmsg, filter_fn):
    '''Apply filter_fn to every leaf value of a QMP object, in place.

    qmsg is a dict or a list; nested dicts and lists are processed
    recursively. filter_fn receives a (key, value) pair, where the key is
    the dict key or the list index, and returns the replacement value.
    The (modified) qmsg is returned.'''
    # Lists are walked by index, dicts by key.
    entries = enumerate(qmsg) if isinstance(qmsg, list) else qmsg.items()

    for key, value in entries:
        is_container = isinstance(value, (dict, list))
        qmsg[key] = (filter_qmp(value, filter_fn) if is_container
                     else filter_fn(key, value))
    return qmsg
412
def filter_testfiles(msg):
    """
    Replace "<test_dir>/<pid>-" with 'TEST_DIR/PID-' and
    "<sock_dir>/<pid>-" with 'SOCK_DIR/PID-' in *msg*, where <pid> is the
    ID of the current process.
    """
    pid_prefix = "%s-" % (os.getpid())
    msg = msg.replace(os.path.join(test_dir, pid_prefix), 'TEST_DIR/PID-')
    return msg.replace(os.path.join(sock_dir, pid_prefix), 'SOCK_DIR/PID-')
417
def filter_qmp_testfiles(qmsg):
    """Run filter_testfiles over every string value of a QMP object."""
    def _filter(_key, value):
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
424
def filter_virtio_scsi(output: str) -> str:
    """Drop the "-ccw" / "-pci" suffix from "virtio-scsi" device names."""
    pattern = re.compile(r'(virtio-scsi)-(ccw|pci)')
    return pattern.sub(r'\1', output)
427
def filter_qmp_virtio_scsi(qmsg):
    """Run filter_virtio_scsi over every string value of a QMP object."""
    def _filter(_key, value):
        return filter_virtio_scsi(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
434
def filter_generated_node_ids(msg):
    """Replace node names of the form "#block<digits>" with NODE_NAME."""
    pattern = re.compile("#block[0-9]+")
    return pattern.sub("NODE_NAME", msg)
437
def filter_img_info(output, filename):
    """
    Make "qemu-img info" output reproducible.

    Lines mentioning the disk size ('disk size' / 'actual-size') are
    dropped. In the remaining lines *filename* becomes TEST_IMG, test file
    prefixes are filtered, the image format becomes IMGFMT, and the iters,
    uuid and cid values are replaced by placeholders.
    """
    substitutions = (
        ('iters: [0-9]+', 'iters: XXX'),
        ('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'),
        ('cid: [0-9]+', 'cid: XXXXXXXXXX'),
    )
    filtered = []
    for line in output.split('\n'):
        if 'disk size' in line or 'actual-size' in line:
            continue
        line = filter_testfiles(line.replace(filename, 'TEST_IMG'))
        line = line.replace(imgfmt, 'IMGFMT')
        for pattern, replacement in substitutions:
            line = re.sub(pattern, replacement, line)
        filtered.append(line)
    return '\n'.join(filtered)
453
def filter_imgfmt(msg):
    """Replace every occurrence of the image format name with IMGFMT."""
    placeholder = 'IMGFMT'
    return msg.replace(imgfmt, placeholder)
456
def filter_qmp_imgfmt(qmsg):
    """Run filter_imgfmt over every string value of a QMP object."""
    def _filter(_key, value):
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
463
def filter_nbd_exports(output: str) -> str:
    """Replace the values of "min block", "opt block" and "max block"
    with XXX."""
    pattern = re.compile(r'((min|opt|max) block): [0-9]+')
    return pattern.sub(r'\1: XXX', output)
466
467
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Write a message to the diff-output logger.

    The message is first passed through each of *filters* in order. Dicts
    and lists are then serialized as JSON (pretty-printed if *indent* is
    given); anything else is logged as is.
    """
    for flt in filters:
        msg = flt(msg)
    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return
    # Don't sort if it's already sorted
    already_ordered = isinstance(msg, OrderedDict)
    serialized = json.dumps(msg, sort_keys=not already_ordered, indent=indent)
    test_logger.info(serialized)
485
class Timeout:
    """
    Context manager that arms a SIGALRM-based timer for its body and
    raises Exception(errmsg) when the timer fires.

    Nothing is armed or disarmed when QEMU runs under gdb or valgrind.
    """

    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg

    def __enter__(self):
        if not (qemu_gdb or qemu_valgrind):
            signal.signal(signal.SIGALRM, self.timeout)
            signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, value, traceback):
        if not (qemu_gdb or qemu_valgrind):
            # Disarm the timer.
            signal.setitimer(signal.ITIMER_REAL, 0)
        return False

    def timeout(self, signum, frame):
        """Signal handler installed by __enter__()."""
        raise Exception(self.errmsg)
503
def file_pattern(name):
    """Return *name* prefixed with the current process ID and a dash."""
    return f"{os.getpid()}-{name}"
506
class FilePath:
    """
    Context manager that hands out per-process file names and deletes the
    corresponding files when the context is left.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    Names are placed in iotests.test_dir unless base_dir says otherwise,
    e.g. for sockets:

        with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    When exactly one name is given, the context yields that single path
    rather than a one-element list.
    """

    def __init__(self, *names, base_dir=test_dir):
        generated = []
        for name in names:
            generated.append(os.path.join(base_dir, file_pattern(name)))
        self.paths = generated

    def __enter__(self):
        return self.paths[0] if len(self.paths) == 1 else self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        for path in self.paths:
            try:
                os.remove(path)
            except OSError:
                # Best effort; the file may never have been created.
                continue
        return False
545
546
def try_remove(img):
    """Delete the file *img*; an OSError from the removal is ignored."""
    try:
        os.remove(img)
    except OSError:
        return
552
def file_path_remover():
    """Remove the files registered by file_path(), most recent first."""
    for path in file_path_remover.paths[::-1]:
        try_remove(path)
556
557
def file_path(*names, base_dir=test_dir):
    ''' Generate per-process file names that are cleaned up at exit.

    Usage:

        img_a, img_b = file_path('a.img', 'b.img')
        sock = file_path('socket')

    A single name yields a single path, several names yield a list.
    '''

    # The first call creates the registry and installs the exit hook.
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    paths = [os.path.join(base_dir, file_pattern(name)) for name in names]
    file_path_remover.paths.extend(paths)

    if len(paths) == 1:
        return paths[0]
    return paths
579
def remote_filename(path):
    """
    Return *path* in the form used by the configured image protocol:
    unchanged for 'file', an ssh:// URL on 127.0.0.1 port 22 for 'ssh'.
    Any other protocol raises an Exception.
    """
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
    raise Exception("Protocol %s not supported" % (imgproto))
587
class VM(qtest.QEMUQtestMachine):
    '''
    A QEMU VM

    The add_*() methods append options to the QEMU command line and return
    self, so calls can be chained. The remaining methods are helpers for
    QMP/HMP interaction used by the test cases.
    '''

    def __init__(self, path_suffix=''):
        '''
        :param path_suffix: Text inserted into the machine name, which is
                            "qemu<path_suffix>-<pid>".
        '''
        name = "qemu%s-%d" % (path_suffix, os.getpid())
        # No QMP timer when running under gdb or valgrind.
        timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None
        if qemu_gdb and qemu_valgrind:
            sys.stderr.write('gdb and valgrind are mutually exclusive\n')
            sys.exit(1)
        wrapper = qemu_gdb if qemu_gdb else qemu_valgrind
        super().__init__(qemu_prog, qemu_opts, wrapper=wrapper,
                         name=name,
                         base_temp_dir=test_dir,
                         socket_scm_helper=socket_scm_helper,
                         sock_dir=sock_dir, qmp_timer=timer)
        self._num_drives = 0

    def _post_shutdown(self) -> None:
        '''
        After the regular shutdown handling, deal with the valgrind log:
        print it if QEMU exited with valgrind's error exit code (99),
        delete it otherwise.
        '''
        super()._post_shutdown()
        if not qemu_valgrind or not self._popen:
            return
        valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind"
        if self.exitcode() == 99:
            with open(valgrind_filename, encoding='utf-8') as f:
                print(f.read())
        else:
            os.remove(valgrind_filename)

    def _pre_launch(self) -> None:
        super()._pre_launch()
        if qemu_print:
            # set QEMU binary output to stdout
            self._close_qemu_log_file()

    def add_object(self, opts):
        '''Append "-object <opts>" to the command line'''
        self._args.append('-object')
        self._args.append(opts)
        return self

    def add_device(self, opts):
        '''Append "-device <opts>" to the command line'''
        self._args.append('-device')
        self._args.append(opts)
        return self

    def add_drive_raw(self, opts):
        '''Append "-drive <opts>" to the command line, with opts unchanged'''
        self._args.append('-drive')
        self._args.append(opts)
        return self

    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
        '''
        Add a virtio-blk drive to the VM

        :param path: Image file; None adds a drive without file, format,
                     cache and aio options.
        :param opts: Additional -drive options, appended verbatim.
        :param interface: Value of the "if" option.
        :param img_format: Image format. For luks without a key-secret in
                           opts, the default secret object and key-secret
                           option are added.
        '''
        options = ['if=%s' % interface,
                   'id=drive%d' % self._num_drives]

        if path is not None:
            options.append('file=%s' % path)
            options.append('format=%s' % img_format)
            options.append('cache=%s' % cachemode)
            options.append('aio=%s' % aiomode)

        if opts:
            options.append(opts)

        if img_format == 'luks' and 'key-secret' not in opts:
            # default luks support
            if luks_default_secret_object not in self._args:
                self.add_object(luks_default_secret_object)

            options.append(luks_default_key_secret_opt)

        self._args.append('-drive')
        self._args.append(','.join(options))
        self._num_drives += 1
        return self

    def add_blockdev(self, opts):
        '''
        Append "-blockdev <opts>" to the command line.

        opts is either the option string itself or an iterable of option
        strings, which is joined with commas.
        '''
        self._args.append('-blockdev')
        if isinstance(opts, str):
            self._args.append(opts)
        else:
            self._args.append(','.join(opts))
        return self

    def add_incoming(self, addr):
        '''Append "-incoming <addr>" to the command line'''
        self._args.append('-incoming')
        self._args.append(addr)
        return self

    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
        '''
        Run an HMP command through QMP's human-monitor-command.

        :param use_log: Issue the command with qmp_log() instead of qmp().
        '''
        cmd = 'human-monitor-command'
        kwargs: Dict[str, Any] = {'command-line': command_line}
        if use_log:
            return self.qmp_log(cmd, **kwargs)
        else:
            return self.qmp(cmd, **kwargs)

    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
        """
        Pause drive r/w operations

        Sets a qemu-io breakpoint named bp_<drive> on *event*; without an
        event, breakpoints are set on both read_aio and write_aio.
        """
        if not event:
            self.pause_drive(drive, "read_aio")
            self.pause_drive(drive, "write_aio")
            return
        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')

    def resume_drive(self, drive: str) -> None:
        """Resume drive r/w operations (removes breakpoint bp_<drive>)"""
        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')

    def hmp_qemu_io(self, drive: str, cmd: str,
                    use_log: bool = False, qdev: bool = False) -> QMPMessage:
        """
        Write to a given drive using an HMP command

        :param qdev: Pass "-d" to the HMP qemu-io command.
        """
        d = '-d ' if qdev else ''
        return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log)

    def flatten_qmp_object(self, obj, output=None, basestr=''):
        '''
        Flatten a nested structure of dicts and lists into a single dict
        whose keys are the dot-separated paths of the leaf values (list
        elements contribute their index).

        :param output: Dict to add the entries to; a new one is created
                       if omitted.
        :param basestr: Path prefix, used by the recursion.
        '''
        if output is None:
            output = {}
        if isinstance(obj, list):
            for i, item in enumerate(obj):
                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
        elif isinstance(obj, dict):
            for key in obj:
                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
        else:
            output[basestr[:-1]] = obj  # Strip trailing '.'
        return output

    def qmp_to_opts(self, obj):
        '''
        Convert a QMP object into a comma-separated "key=value" option
        string. All leaf values must be strings, because they are joined
        by string concatenation.
        '''
        flat = self.flatten_qmp_object(obj)
        return ','.join(key + '=' + value for key, value in flat.items())

    def get_qmp_events_filtered(self, wait=60.0):
        '''Return the pending QMP events, each run through
        filter_qmp_event()'''
        return [filter_qmp_event(ev) for ev in self.get_qmp_events(wait=wait)]

    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
        '''
        Execute a QMP command, logging both the command and its result
        (each passed through *filters*). Returns the result.
        '''
        full_cmd = OrderedDict((
            ("execute", cmd),
            ("arguments", ordered_qmp(kwargs))
        ))
        log(full_cmd, filters, indent=indent)
        result = self.qmp(cmd, **kwargs)
        log(result, filters, indent=indent)
        return result

    # Returns None on success, and an error string on failure
    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
                pre_finalize=None, cancel=False, wait=60.0):
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        """
        match_device = {'data': {'device': job}}
        match_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', match_device),
            ('BLOCK_JOB_CANCELLED', match_device),
            ('BLOCK_JOB_ERROR', match_device),
            ('BLOCK_JOB_READY', match_device),
            ('BLOCK_JOB_PENDING', match_id),
            ('JOB_STATUS_CHANGE', match_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            if ev['event'] != 'JOB_STATUS_CHANGE':
                # Block job events are only logged; the state machine
                # below is driven by JOB_STATUS_CHANGE alone.
                log(ev)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                result = self.qmp('query-jobs')
                for j in result['return']:
                    if j['id'] == job:
                        error = j['error']
                        log('Job failed: %s' % (j['error']))
            elif status == 'ready':
                self.qmp_log('job-complete', id=job)
            elif status == 'pending' and not auto_finalize:
                if pre_finalize:
                    pre_finalize()
                if cancel:
                    self.qmp_log('job-cancel', id=job)
                else:
                    self.qmp_log('job-finalize', id=job)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job)
            elif status == 'null':
                return error

    # Returns None on success, and an error string on failure
    def blockdev_create(self, options, job_id='job0', filters=None):
        '''
        Issue blockdev-create and run the resulting job to completion.

        :param filters: Log filters for the command; defaults to
                        [filter_qmp_testfiles].
        :return: The result of run_job() if the command was accepted,
                 otherwise the error member of the QMP response.
        '''
        if filters is None:
            filters = [filter_qmp_testfiles]
        result = self.qmp_log('blockdev-create', filters=filters,
                              job_id=job_id, options=options)

        if 'return' in result:
            assert result['return'] == {}
            job_result = self.run_job(job_id)
        else:
            job_result = result['error']

        log("")
        return job_result

    def enable_migration_events(self, name):
        '''Enable the "events" migration capability and log the result;
        *name* is only used in the log message'''
        log('Enabling migration QMP events on %s...' % name)
        log(self.qmp('migrate-set-capabilities', capabilities=[
            {
                'capability': 'events',
                'state': True
            }
        ]))

    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
        '''
        Log MIGRATION events until one reports 'completed' or 'failed'.

        On completion, additionally poll query-status until the VM is in
        *expect_runstate*, then return True. Return False on failure.
        '''
        while True:
            event = self.event_wait('MIGRATION')
            # We use the default timeout, and with a timeout, event_wait()
            # never returns None
            assert event

            log(event, filters=[filter_qmp_event])
            if event['data']['status'] in ('completed', 'failed'):
                break

        if event['data']['status'] == 'completed':
            # The event may occur in finish-migrate, so wait for the expected
            # post-migration runstate
            runstate = None
            while runstate != expect_runstate:
                runstate = self.qmp('query-status')['return']['status']
            return True
        else:
            return False

    def node_info(self, node_name):
        '''Return the query-named-block-nodes entry of the given node, or
        None if there is no such node'''
        nodes = self.qmp('query-named-block-nodes')
        for x in nodes['return']:
            if x['node-name'] == node_name:
                return x
        return None

    def query_bitmaps(self):
        '''Return a dict mapping node names to their dirty bitmap lists;
        nodes without a 'dirty-bitmaps' entry are left out'''
        res = self.qmp("query-named-block-nodes")
        return {device['node-name']: device['dirty-bitmaps']
                for device in res['return'] if 'dirty-bitmaps' in device}

    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
        """
        get a specific bitmap from the object returned by query_bitmaps.
        :param recording: If specified, filter results by the specified value.
        :param bitmaps: If specified, use it instead of call query_bitmaps()
        :return: The bitmap's dict, or None if there is no matching bitmap.
        """
        if bitmaps is None:
            bitmaps = self.query_bitmaps()

        # query_bitmaps() omits nodes that have no bitmaps, so a missing
        # node simply means "not found".
        for bitmap in bitmaps.get(node_name, []):
            if bitmap.get('name', '') == bitmap_name:
                if recording is None or bitmap.get('recording') == recording:
                    return bitmap
        return None

    def check_bitmap_status(self, node_name, bitmap_name, fields):
        '''Return True if the bitmap exists and contains every key/value
        pair of *fields*'''
        ret = self.get_bitmap(node_name, bitmap_name)
        if ret is None:
            return False

        return fields.items() <= ret.items()

    def assert_block_path(self, root, path, expected_node, graph=None):
        """
        Check whether the node under the given path in the block graph
        is @expected_node.

        @root is the node name of the node where the @path is rooted.

        @path is a string that consists of child names separated by
        slashes.  It must begin with a slash.

        Examples for @root + @path:
          - root="qcow2-node", path="/backing/file"
          - root="quorum-node", path="/children.2/file"

        Hypothetically, @path could be empty, in which case it would
        point to @root.  However, in practice this case is not useful
        and hence not allowed.

        @expected_node may be None.  (All elements of the path but the
        leaf must still exist.)

        @graph may be None or the result of an x-debug-query-block-graph
        call that has already been performed.
        """
        if graph is None:
            graph = self.qmp('x-debug-query-block-graph')['return']

        iter_path = iter(path.split('/'))

        # Must start with a /
        assert next(iter_path) == ''

        node = next((node for node in graph['nodes'] if node['name'] == root),
                    None)

        # An empty @path is not allowed, so the root node must be present
        assert node is not None, 'Root node %s not found' % root

        for child_name in iter_path:
            assert node is not None, 'Cannot follow path %s%s' % (root, path)

            try:
                node_id = next(edge['child'] for edge in graph['edges']
                               if (edge['parent'] == node['id'] and
                                   edge['name'] == child_name))

                node = next(node for node in graph['nodes']
                            if node['id'] == node_id)

            except StopIteration:
                # No such edge or node: the path ends here.
                node = None

        if node is None:
            assert expected_node is None, \
                   'No node found under %s (but expected %s)' % \
                   (path, expected_node)
        else:
            assert node['name'] == expected_node, \
                   'Found node %s under %s (but expected %s)' % \
                   (node['name'], path, expected_node)
931
# Matches a path component of the form "name[index]": group 1 is the text
# before the opening bracket, group 2 the text between the brackets.
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
933
934 class QMPTestCase(unittest.TestCase):
935 '''Abstract base class for QMP test cases'''
936
def __init__(self, *args, **kwargs):
    """Initialize the test case; all arguments are forwarded unchanged to
    the parent class. The vm attribute starts out as None."""
    super().__init__(*args, **kwargs)
    # Many users of this class set a VM property we rely on heavily
    # in the methods below.
    self.vm = None
942
def dictpath(self, d, path):
    '''Follow a slash-separated path through nested dicts and return the
    value found there.

    A component written as "key[N]" first looks up "key" and then takes
    element N of the list stored under it. Any lookup that cannot be
    followed fails the test.'''
    for component in path.split('/'):
        indexed = index_re.match(component)
        idx = None
        if indexed:
            component, raw_idx = indexed.groups()
            idx = int(raw_idx)

        if not (isinstance(d, dict) and component in d):
            self.fail(f'failed path traversal for "{path}" in "{d}"')
        d = d[component]

        if not indexed:
            continue

        if not isinstance(d, list):
            self.fail(f'path component "{component}" in "{path}" '
                      f'is not a list in "{d}"')
        try:
            d = d[idx]
        except IndexError:
            self.fail(f'invalid index "{idx}" in path "{path}" '
                      f'in "{d}"')
    return d
965
def assert_qmp_absent(self, d, path):
    """Fail the test if *path* can be resolved in the QMP dict *d*."""
    try:
        result = self.dictpath(d, path)
    except AssertionError:
        # The path does not exist, which is what we want.
        pass
    else:
        self.fail('path "%s" has value "%s"' % (path, str(result)))
972
def assert_qmp(self, d, path, value):
    '''Check the value found at *path* in the QMP dict *d*.

    If *value* is a non-empty list, the value found must equal one of its
    elements; otherwise it must equal *value* itself.'''

    result = self.dictpath(d, path)

    # [] makes no sense as a list of valid values, so treat it as
    # an actual single value.
    is_choice_list = isinstance(value, list) and value != []
    if not is_choice_list:
        self.assertEqual(result, value,
                         '"%s" is "%s", expected "%s"'
                         % (path, str(result), str(value)))
    elif not any(result == candidate for candidate in value):
        self.fail('no match for "%s" in %s' % (str(result), str(value)))
991
def assert_no_active_block_jobs(self):
    """Assert that query-block-jobs returns an empty list."""
    self.assert_qmp(self.vm.qmp('query-block-jobs'), 'return', [])
995
def assert_has_block_node(self, node_name=None, file_name=None):
    """Query the named block nodes and fail the test unless one of them
    matches node_name and/or file_name.

    A criterion that is None, or a node that lacks the corresponding
    field, counts as a match for that criterion."""
    def matches(actual, wanted):
        return actual is None or wanted is None or actual == wanted

    assert node_name or file_name
    result = self.vm.qmp('query-named-block-nodes')
    found = any(matches(node.get("node-name"), node_name) and
                matches(node.get("file"), file_name)
                for node in result["return"])
    if not found:
        self.fail("Cannot find %s %s in result:\n%s" %
                  (node_name, file_name, result))
1009
def assert_json_filename_equal(self, json_filename, reference):
    '''Assert that json_filename starts with "json:" and that the JSON
    document following the prefix equals the reference object once
    both have been passed through self.vm.flatten_qmp_object().'''
    prefix, payload = json_filename[:5], json_filename[5:]
    self.assertEqual(prefix, 'json:')

    flattened = self.vm.flatten_qmp_object(json.loads(payload))
    expected = self.vm.flatten_qmp_object(reference)
    self.assertEqual(flattened, expected)
1018
def cancel_and_wait(self, drive='drive0', force=False,
                    resume=False, wait=60.0):
    '''Cancel a block job and wait for it to finish.

    drive:  device name of the job, passed to block-job-cancel
    force:  forwarded to block-job-cancel
    resume: if true, call self.vm.resume_drive(drive) after cancelling
    wait:   forwarded to self.vm.get_qmp_events()

    Returns the BLOCK_JOB_COMPLETED or BLOCK_JOB_CANCELLED event.'''
    reply = self.vm.qmp('block-job-cancel', device=drive, force=force)
    self.assert_qmp(reply, 'return', {})

    if resume:
        self.vm.resume_drive(drive)

    final_event = None
    finished = False
    while not finished:
        # Every event of a batch is inspected, even after the final
        # one has been seen, so that each is checked against 'drive'.
        for event in self.vm.get_qmp_events(wait=wait):
            name = event['event']
            if name in ('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED'):
                self.assert_qmp(event, 'data/device', drive)
                final_event = event
                finished = True
            elif name == 'JOB_STATUS_CHANGE':
                self.assert_qmp(event, 'data/id', drive)

    self.assert_no_active_block_jobs()
    return final_event
1043
def wait_until_completed(self, drive='drive0', check_offset=True,
                         wait=60.0, error=None):
    '''Wait for a block job to finish and return its
    BLOCK_JOB_COMPLETED event.

    drive:        device name the events must refer to
    check_offset: on success, also require data/offset == data/len
    wait:         forwarded to self.vm.get_qmp_events()
    error:        expected data/error value; None means the event
                  must not carry an error at all'''
    while True:
        for event in self.vm.get_qmp_events(wait=wait):
            kind = event['event']
            if kind == 'JOB_STATUS_CHANGE':
                self.assert_qmp(event, 'data/id', drive)
                continue
            if kind != 'BLOCK_JOB_COMPLETED':
                continue

            self.assert_qmp(event, 'data/device', drive)
            if error is not None:
                self.assert_qmp(event, 'data/error', error)
            else:
                self.assert_qmp_absent(event, 'data/error')
                if check_offset:
                    self.assert_qmp(event, 'data/offset',
                                    event['data']['len'])
            self.assert_no_active_block_jobs()
            return event
1062
def wait_ready(self, drive='drive0'):
    """Wait for a BLOCK_JOB_READY event of a mirror or commit job on
    the given device, and return what self.vm.events_wait() yields."""
    ready_filters = [
        ('BLOCK_JOB_READY', {'data': {'type': job_type, 'device': drive}})
        for job_type in ('mirror', 'commit')
    ]
    return self.vm.events_wait(ready_filters)
1071
def wait_ready_and_cancel(self, drive='drive0'):
    '''Wait until the job on the given device is ready, cancel it, and
    assert that it ended as a completed mirror job whose offset equals
    its length.'''
    self.wait_ready(drive=drive)
    final = self.cancel_and_wait(drive=drive)

    self.assertEqual(final['event'], 'BLOCK_JOB_COMPLETED')
    self.assert_qmp(final, 'data/type', 'mirror')
    total_len = final['data']['len']
    self.assert_qmp(final, 'data/offset', total_len)
1078
def complete_and_wait(self, drive='drive0', wait_ready=True,
                      completion_error=None):
    '''Complete a block job and wait for it to finish.

    drive:            device name of the job
    wait_ready:       wait for BLOCK_JOB_READY before completing
    completion_error: expected data/error of the completion event;
                      None means the job must finish without error

    The finished job must be of type mirror or commit.'''
    if wait_ready:
        self.wait_ready(drive=drive)

    result = self.vm.qmp('block-job-complete', device=drive)
    self.assert_qmp(result, 'return', {})

    event = self.wait_until_completed(drive=drive, error=completion_error)
    # assertIn names the unexpected job type in the failure message,
    # where assertTrue would only say "False is not true".
    self.assertIn(event['data']['type'], ['mirror', 'commit'])
1090
def pause_wait(self, job_id='job0'):
    '''Poll query-block-jobs until the job whose device equals job_id
    is paused and no longer busy, then return its job dict.

    The polling runs inside a Timeout(3, ...) context (presumably a
    three second limit; Timeout is defined elsewhere in this file).
    The job must be present in every query result.'''
    with Timeout(3, "Timeout waiting for job to pause"):
        while True:
            result = self.vm.qmp('query-block-jobs')
            # Only the first job with a matching device is considered.
            job = next((candidate for candidate in result['return']
                        if candidate['device'] == job_id), None)
            if job is not None and job['paused'] and not job['busy']:
                return job
            assert job is not None
1103
def pause_job(self, job_id='job0', wait=True):
    '''Issue block-job-pause for the given job.

    With wait set, return what pause_wait() returns for the job;
    otherwise return the QMP reply to the pause command.'''
    reply = self.vm.qmp('block-job-pause', device=job_id)
    self.assert_qmp(reply, 'return', {})
    return self.pause_wait(job_id) if wait else reply
1110
def case_skip(self, reason):
    '''Record the current test case as not run and skip it.'''
    # Write the reason to the .casenotrun file first; skipTest() does
    # the actual skipping.
    case_notrun(reason)
    self.skipTest(reason)
1115
1116
def notrun(reason):
    '''Skip this test suite.

    Writes the reason to "<output_dir>/<seq>.notrun", logs a warning
    and exits the process with status 0.'''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])
    marker_path = f'{output_dir}/{seq}.notrun'

    with open(marker_path, 'w', encoding='utf-8') as outfile:
        outfile.write(reason + '\n')
    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
1127
def case_notrun(reason):
    '''Mark this test case as not having been run.

    This only appends the reason to "<output_dir>/<seq>.casenotrun";
    actually skipping the case is left to the caller.  See
    QMPTestCase.case_skip() for a variant that also skips the
    current test case.'''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])
    marker_path = f'{output_dir}/{seq}.casenotrun'

    with open(marker_path, 'a', encoding='utf-8') as outfile:
        outfile.write(' [case not run] ' + reason + '\n')
1140
def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    '''Skip the test suite unless the current image format is suitable.

    An empty supported_fmts places no restriction on the format.  For
    luks, a working LUKS driver is required as well.'''
    generic_enabled = os.environ.get('IMGFMT_GENERIC', 'true') == 'true'
    if generic_enabled and 'generic' in supported_fmts:
        # similar to
        #   _supported_fmt generic
        # for bash tests
        supported_fmts = ()

    outside_whitelist = bool(supported_fmts) and \
        imgfmt not in supported_fmts
    if outside_whitelist or imgfmt in unsupported_fmts:
        notrun('not suitable for this image format: %s' % imgfmt)

    if imgfmt == 'luks':
        verify_working_luks()
1156
1157 def _verify_protocol(supported: Sequence[str] = (),
1158 unsupported: Sequence[str] = ()) -> None:
1159 assert not (supported and unsupported)
1160
1161 if 'generic' in supported:
1162 return
1163
1164 not_sup = supported and (imgproto not in supported)
1165 if not_sup or (imgproto in unsupported):
1166 notrun('not suitable for this protocol: %s' % imgproto)
1167
1168 def _verify_platform(supported: Sequence[str] = (),
1169 unsupported: Sequence[str] = ()) -> None:
1170 if any((sys.platform.startswith(x) for x in unsupported)):
1171 notrun('not suitable for this OS: %s' % sys.platform)
1172
1173 if supported:
1174 if not any((sys.platform.startswith(x) for x in supported)):
1175 notrun('not suitable for this OS: %s' % sys.platform)
1176
1177 def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1178 if supported_cache_modes and (cachemode not in supported_cache_modes):
1179 notrun('not suitable for this cache mode: %s' % cachemode)
1180
1181 def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1182 if supported_aio_modes and (aiomode not in supported_aio_modes):
1183 notrun('not suitable for this aio mode: %s' % aiomode)
1184
def _verify_formats(required_formats: Sequence[str] = ()) -> None:
    '''Skip the test suite if any required format is missing from the
    list returned by supported_formats().'''
    missing = list(set(required_formats) - set(supported_formats()))
    if not missing:
        return
    notrun(f'formats {missing} are not whitelisted')
1189
1190
def _verify_virtio_blk() -> None:
    '''Skip the test suite if the QEMU device help output does not
    mention virtio-blk.'''
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    if 'virtio-blk' in device_help:
        return
    notrun('Missing virtio-blk in QEMU binary')
1195
def _verify_virtio_scsi_pci_or_ccw() -> None:
    '''Skip the test suite if the QEMU device help output mentions
    neither virtio-scsi-pci nor virtio-scsi-ccw.'''
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    candidates = ('virtio-scsi-pci', 'virtio-scsi-ccw')
    if not any(device in device_help for device in candidates):
        notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1200
1201
def supports_quorum():
    '''Return whether the qemu-img help output mentions quorum.'''
    help_text = qemu_img_pipe('--help')
    return 'quorum' in help_text
1204
def verify_quorum():
    '''Skip test suite if quorum support is not available'''
    if supports_quorum():
        return
    notrun('quorum support missing')
1209
def has_working_luks() -> Tuple[bool, str]:
    """
    Check whether our LUKS driver can actually create images
    (this extends to LUKS encryption for qcow2).

    :return: (True, '') if a test image could be created, otherwise
             (False, reason).  The reason is taken from the first
             output line naming the image file, or is the whole
             output if there is no such line.
    """
    img_file = f'{test_dir}/luks-test.luks'
    output, status = qemu_img_pipe_and_status(
        'create', '-f', 'luks',
        '--object', luks_default_secret_object,
        '-o', luks_default_key_secret_opt,
        '-o', 'iter-time=10',
        img_file, '1G')

    # Best-effort cleanup: the image may not exist if creation failed.
    try:
        os.remove(img_file)
    except OSError:
        pass

    if status == 0:
        return (True, '')

    marker = img_file + ':'
    for line in output.splitlines():
        if marker in line:
            return (False, line.split(marker, 1)[1].strip())
    return (False, output)
1240
def verify_working_luks():
    """
    Skip test suite if LUKS does not work, giving the reason reported
    by has_working_luks().
    """
    working, reason = has_working_luks()
    if working:
        return
    notrun(reason)
1248
def qemu_pipe(*args: str) -> str:
    """
    Run qemu with an option to print something and exit (e.g. a help option).

    The given arguments are appended to qemu_prog and qemu_opts.

    :return: QEMU's stdout output.
    """
    command = [qemu_prog] + qemu_opts
    command.extend(args)
    output, _status = qemu_tool_pipe_and_status('qemu', command)
    return output
1258
def supported_formats(read_only=False):
    '''Return the list of whitelisted block driver formats.

    Set 'read_only' to True to check ro-whitelist
    Otherwise, rw-whitelist is checked

    The result of each variant is cached on the function object, so
    qemu is queried at most once per whitelist.'''
    if not hasattr(supported_formats, "formats"):
        supported_formats.formats = {}
    cache = supported_formats.formats

    if read_only not in cache:
        help_text = qemu_pipe("-drive", "format=help")
        # Line 0 is read for the rw-whitelist, line 1 for the
        # ro-whitelist; the format names follow the first colon.
        row = help_text.splitlines()[1 if read_only else 0]
        cache[read_only] = row.split(":")[1].split()

    return cache[read_only]
1273
def skip_if_unsupported(required_formats=(), read_only=False):
    '''Skip Test Decorator
       Runs the test if all the required formats are whitelisted

       required_formats may also be a callable; it is then invoked
       with the test case and must return the formats to check.
       read_only selects the whitelist, as in supported_formats().'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            fmts = (required_formats(test_case)
                    if callable(required_formats) else required_formats)

            usf_list = list(set(fmts) - set(supported_formats(read_only)))
            if not usf_list:
                func(test_case, *args, **kwargs)
                return
            test_case.case_skip(
                f'{test_case}: formats {usf_list} are not whitelisted')
        return func_wrapper
    return skip_test_decorator
1293
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    '''Skip Test Decorator
       Skips the test for the given formats

       The decorated test runs only when the current image format is
       not in 'formats'.'''
    def skip_test_decorator(func):
        def func_wrapper(test_case: QMPTestCase, *args: List[Any],
                         **kwargs: Dict[str, Any]) -> None:
            if imgfmt not in formats:
                func(test_case, *args, **kwargs)
                return
            test_case.case_skip(f'{test_case}: Skipped for format {imgfmt}')
        return func_wrapper
    return skip_test_decorator
1309
def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only without root permissions

       As root, the case is recorded through case_notrun() and the
       wrapper returns None without calling the test.'''
    def func_wrapper(*args, **kwargs):
        if os.getuid() != 0:
            return func(*args, **kwargs)
        case_notrun('{}: cannot be run as root'.format(args[0]))
        return None
    return func_wrapper
1320
1321 # We need to filter out the time taken from the output so that
1322 # qemu-iotest can reliably diff the results against master output,
1323 # and hide skipped tests from the reference output.
1324
class ReproducibleTestResult(unittest.TextTestResult):
    '''Text test result whose output does not reveal skipped tests
    when running in dots mode.'''

    def addSkip(self, test, reason):
        '''Record the skip; print a dot instead of "s" in dots mode.'''
        # Same as TextTestResult, but print dot instead of "s"
        unittest.TestResult.addSkip(self, test, reason)
        if self.showAll:
            self.stream.writeln(f"skipped {reason!r}")
            return
        if self.dots:
            self.stream.write(".")
            self.stream.flush()
1334
class ReproducibleStreamWrapper:
    '''Text stream proxy that removes the run time and the skipped
    test count from everything written through it.

    Attributes not defined here are looked up on the wrapped stream.'''

    def __init__(self, stream: TextIO):
        self.stream = stream

    def __getattr__(self, attr):
        # Only called when normal lookup failed.  If 'stream' itself
        # is missing, delegating to self.stream would end up here
        # again, so refuse it; '__getstate__' is never forwarded
        # either.
        if attr == 'stream' or attr == '__getstate__':
            raise AttributeError(attr)
        return getattr(self.stream, attr)

    def write(self, arg=None):
        '''Write arg to the wrapped stream after filtering it.'''
        substitutions = (
            (r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests'),
            (r' \(skipped=\d+\)', r''),
        )
        for pattern, replacement in substitutions:
            arg = re.sub(pattern, replacement, arg)
        self.stream.write(arg)
1348
class ReproducibleTestRunner(unittest.TextTestRunner):
    '''Text test runner that writes through a
    ReproducibleStreamWrapper and uses ReproducibleTestResult as its
    default result class.'''

    def __init__(self, stream: Optional[TextIO] = None,
                 resultclass: Type[unittest.TestResult] =
                 ReproducibleTestResult,
                 **kwargs: Any) -> None:
        # Fall back to stdout when no stream is given.
        target = stream or sys.stdout
        rstream = ReproducibleStreamWrapper(target)
        super().__init__(stream=rstream,  # type: ignore
                         descriptions=True,
                         resultclass=resultclass,
                         **kwargs)
1358
def execute_unittest(argv: List[str], debug: bool = False) -> None:
    """Executes unittests within the calling module.

    :param argv: Argument vector handed to unittest.main().
    :param debug: Run with verbosity 2 instead of 1.
    """
    verbosity = 2 if debug else 1

    # Some tests have warnings, especially ResourceWarnings for unclosed
    # files and sockets.  Ignore them for now to ensure reproducibility of
    # the test output.
    warning_filter = None if sys.warnoptions else 'ignore'

    unittest.main(argv=argv,
                  testRunner=ReproducibleTestRunner,
                  verbosity=verbosity,
                  warnings=warning_filter)
1369
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = ()) -> bool:
    """
    Perform necessary setup for either script-style or unittest-style tests.

    Strips a '-d' flag from sys.argv, configures logging to match, and
    runs the _verify_* checks against the given restrictions.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    try:
        sys.argv.remove('-d')
    except ValueError:
        debug = False
    else:
        debug = True
    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)
    _verify_virtio_blk()

    return debug
1399
def execute_test(*args, test_function=None, **kwargs):
    """Run either unittest or script-style tests.

    All arguments other than test_function are passed on to
    execute_setup_common().  If test_function is given it is called
    after setup; otherwise the unittest runner is started.
    """
    debug = execute_setup_common(*args, **kwargs)
    if test_function:
        test_function()
    else:
        execute_unittest(sys.argv, debug)
1408
def activate_logging():
    """Activate iotests.log() output to stdout for script-style tests."""
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    # Emit the bare message only, without level or logger name.
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))

    test_logger.addHandler(stdout_handler)
    test_logger.setLevel(logging.INFO)
    test_logger.propagate = False
1417
1418 # This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """Initialize script-style tests without running any tests.

    Activates test logging, then forwards all arguments to
    execute_setup_common().
    """
    activate_logging()
    execute_setup_common(*args, **kwargs)
1423
1424 # This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """Run script-style tests outside of the unittest framework

    Activates test logging, then hands test_function and all other
    arguments to execute_test().
    """
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)
1429
1430 # This is called from unittest style iotests
def main(*args, **kwargs):
    """Run tests using the unittest framework

    All arguments are forwarded to execute_test() without a
    test_function, so the unittest runner is used.
    """
    execute_test(*args, **kwargs)