1 # Common utilities and Python wrappers for qemu-iotests
3 # Copyright (C) 2012 IBM Corp.
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
22 from collections
import OrderedDict
34 from typing
import (Any
, Callable
, Dict
, Iterable
, Iterator
,
35 List
, Optional
, Sequence
, TextIO
, Tuple
, Type
, TypeVar
)
38 from contextlib
import contextmanager
40 from qemu
.machine
import qtest
41 from qemu
.qmp
.legacy
import QMPMessage
, QEMUMonitorProtocol
42 from qemu
.utils
import VerboseProcessError
44 # Use this logger for logging messages directly from the iotests module
45 logger
= logging
.getLogger('qemu.iotests')
46 logger
.addHandler(logging
.NullHandler())
48 # Use this logger for messages that ought to be used for diff output.
49 test_logger
= logging
.getLogger('qemu.iotests.diff_io')
54 # This will not work if arguments contain spaces but is necessary if we
55 # want to support the override options that ./check supports.
56 qemu_img_args
= [os
.environ
.get('QEMU_IMG_PROG', 'qemu-img')]
57 if os
.environ
.get('QEMU_IMG_OPTIONS'):
58 qemu_img_args
+= os
.environ
['QEMU_IMG_OPTIONS'].strip().split(' ')
60 qemu_io_args
= [os
.environ
.get('QEMU_IO_PROG', 'qemu-io')]
61 if os
.environ
.get('QEMU_IO_OPTIONS'):
62 qemu_io_args
+= os
.environ
['QEMU_IO_OPTIONS'].strip().split(' ')
64 qemu_io_args_no_fmt
= [os
.environ
.get('QEMU_IO_PROG', 'qemu-io')]
65 if os
.environ
.get('QEMU_IO_OPTIONS_NO_FMT'):
66 qemu_io_args_no_fmt
+= \
67 os
.environ
['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
69 qemu_nbd_prog
= os
.environ
.get('QEMU_NBD_PROG', 'qemu-nbd')
70 qemu_nbd_args
= [qemu_nbd_prog
]
71 if os
.environ
.get('QEMU_NBD_OPTIONS'):
72 qemu_nbd_args
+= os
.environ
['QEMU_NBD_OPTIONS'].strip().split(' ')
74 qemu_prog
= os
.environ
.get('QEMU_PROG', 'qemu')
75 qemu_opts
= os
.environ
.get('QEMU_OPTIONS', '').strip().split(' ')
77 qsd_prog
= os
.environ
.get('QSD_PROG', 'qemu-storage-daemon')
79 gdb_qemu_env
= os
.environ
.get('GDB_OPTIONS')
82 qemu_gdb
= ['gdbserver'] + gdb_qemu_env
.strip().split(' ')
84 qemu_print
= os
.environ
.get('PRINT_QEMU', False)
86 imgfmt
= os
.environ
.get('IMGFMT', 'raw')
87 imgproto
= os
.environ
.get('IMGPROTO', 'file')
90 test_dir
= os
.environ
['TEST_DIR']
91 sock_dir
= os
.environ
['SOCK_DIR']
92 cachemode
= os
.environ
['CACHEMODE']
93 aiomode
= os
.environ
['AIOMODE']
94 qemu_default_machine
= os
.environ
['QEMU_DEFAULT_MACHINE']
96 # We are using these variables as proxies to indicate that we're
97 # not being run via "check". There may be other things set up by
98 # "check" that individual test cases rely on.
99 sys
.stderr
.write('Please run this test via the "check" script\n')
100 sys
.exit(os
.EX_USAGE
)
103 if os
.environ
.get('VALGRIND_QEMU') == "y" and \
104 os
.environ
.get('NO_VALGRIND') != "y":
105 valgrind_logfile
= "--log-file=" + test_dir
106 # %p allows to put the valgrind process PID, since
107 # we don't know it a priori (subprocess.Popen is
109 valgrind_logfile
+= "/%p.valgrind"
111 qemu_valgrind
= ['valgrind', valgrind_logfile
, '--error-exitcode=99']
113 luks_default_secret_object
= 'secret,id=keysec0,data=' + \
114 os
.environ
.get('IMGKEYSECRET', '')
115 luks_default_key_secret_opt
= 'key-secret=keysec0'
117 sample_img_dir
= os
.environ
['SAMPLE_IMG_DIR']
121 def change_log_level(
122 logger_name
: str, level
: int = logging
.CRITICAL
) -> Iterator
[None]:
124 Utility function for temporarily changing the log level of a logger.
126 This can be used to silence errors that are expected or uninteresting.
128 _logger
= logging
.getLogger(logger_name
)
129 current_level
= _logger
.level
130 _logger
.setLevel(level
)
135 _logger
.setLevel(current_level
)
138 def unarchive_sample_image(sample
, fname
):
139 sample_fname
= os
.path
.join(sample_img_dir
, sample
+ '.bz2')
140 with bz2
.open(sample_fname
) as f_in
, open(fname
, 'wb') as f_out
:
141 shutil
.copyfileobj(f_in
, f_out
)
144 def qemu_tool_popen(args
: Sequence
[str],
145 connect_stderr
: bool = True) -> 'subprocess.Popen[str]':
146 stderr
= subprocess
.STDOUT
if connect_stderr
else None
147 # pylint: disable=consider-using-with
148 return subprocess
.Popen(args
,
149 stdout
=subprocess
.PIPE
,
151 universal_newlines
=True)
154 def qemu_tool_pipe_and_status(tool
: str, args
: Sequence
[str],
155 connect_stderr
: bool = True,
156 drop_successful_output
: bool = False) \
159 Run a tool and return both its output and its exit code
161 with
qemu_tool_popen(args
, connect_stderr
) as subp
:
162 output
= subp
.communicate()[0]
163 if subp
.returncode
< 0:
165 sys
.stderr
.write(f
'{tool} received signal \
166 {-subp.returncode}: {cmd}\n')
167 if drop_successful_output
and subp
.returncode
== 0:
169 return (output
, subp
.returncode
)
171 def qemu_img_create_prepare_args(args
: List
[str]) -> List
[str]:
172 if not args
or args
[0] != 'create':
176 p
= argparse
.ArgumentParser(allow_abbrev
=False)
177 # -o option may be specified several times
178 p
.add_argument('-o', action
='append', default
=[])
180 parsed
, remaining
= p
.parse_known_args(args
)
185 if parsed
.f
is not None:
186 result
+= ['-f', parsed
.f
]
188 # IMGOPTS most probably contain options specific for the selected format,
189 # like extended_l2 or compression_type for qcow2. Test may want to create
190 # additional images in other formats that doesn't support these options.
191 # So, use IMGOPTS only for images created in imgfmt format.
192 imgopts
= os
.environ
.get('IMGOPTS')
193 if imgopts
and parsed
.f
== imgfmt
:
194 opts_list
.insert(0, imgopts
)
196 # default luks support
197 if parsed
.f
== 'luks' and \
198 all('key-secret' not in opts
for opts
in opts_list
):
199 result
+= ['--object', luks_default_secret_object
]
200 opts_list
.append(luks_default_key_secret_opt
)
202 for opts
in opts_list
:
203 result
+= ['-o', opts
]
210 def qemu_tool(*args
: str, check
: bool = True, combine_stdio
: bool = True
211 ) -> 'subprocess.CompletedProcess[str]':
213 Run a qemu tool and return its status code and console output.
215 :param args: full command line to run.
216 :param check: Enforce a return code of zero.
217 :param combine_stdio: set to False to keep stdout/stderr separated.
219 :raise VerboseProcessError:
220 When the return code is negative, or on any non-zero exit code
221 when 'check=True' was provided (the default). This exception has
222 'stdout', 'stderr', and 'returncode' properties that may be
223 inspected to show greater detail. If this exception is not
224 handled, the command-line, return code, and all console output
225 will be included at the bottom of the stack trace.
228 a CompletedProcess. This object has args, returncode, and stdout
229 properties. If streams are not combined, it will also have a
232 subp
= subprocess
.run(
234 stdout
=subprocess
.PIPE
,
235 stderr
=subprocess
.STDOUT
if combine_stdio
else subprocess
.PIPE
,
236 universal_newlines
=True,
240 if check
and subp
.returncode
or (subp
.returncode
< 0):
241 raise VerboseProcessError(
242 subp
.returncode
, args
,
250 def qemu_img(*args
: str, check
: bool = True, combine_stdio
: bool = True
251 ) -> 'subprocess.CompletedProcess[str]':
253 Run QEMU_IMG_PROG and return its status code and console output.
255 This function always prepends QEMU_IMG_OPTIONS and may further alter
256 the args for 'create' commands.
258 See `qemu_tool()` for greater detail.
260 full_args
= qemu_img_args
+ qemu_img_create_prepare_args(list(args
))
261 return qemu_tool(*full_args
, check
=check
, combine_stdio
=combine_stdio
)
264 def ordered_qmp(qmsg
, conv_keys
=True):
265 # Dictionaries are not ordered prior to 3.6, therefore:
266 if isinstance(qmsg
, list):
267 return [ordered_qmp(atom
) for atom
in qmsg
]
268 if isinstance(qmsg
, dict):
270 for k
, v
in sorted(qmsg
.items()):
272 k
= k
.replace('_', '-')
273 od
[k
] = ordered_qmp(v
, conv_keys
=False)
277 def qemu_img_create(*args
: str) -> 'subprocess.CompletedProcess[str]':
278 return qemu_img('create', *args
)
280 def qemu_img_json(*args
: str) -> Any
:
282 Run qemu-img and return its output as deserialized JSON.
284 :raise CalledProcessError:
285 When qemu-img crashes, or returns a non-zero exit code without
286 producing a valid JSON document to stdout.
287 :raise JSONDecoderError:
288 When qemu-img returns 0, but failed to produce a valid JSON document.
290 :return: A deserialized JSON object; probably a dict[str, Any].
293 res
= qemu_img(*args
, combine_stdio
=False)
294 except subprocess
.CalledProcessError
as exc
:
295 # Terminated due to signal. Don't bother.
296 if exc
.returncode
< 0:
299 # Commands like 'check' can return failure (exit codes 2 and 3)
300 # to indicate command completion, but with errors found. For
301 # multi-command flexibility, ignore the exact error codes and
302 # *try* to load JSON.
304 return json
.loads(exc
.stdout
)
305 except json
.JSONDecodeError
:
306 # Nope. This thing is toast. Raise the /process/ error.
310 return json
.loads(res
.stdout
)
312 def qemu_img_measure(*args
: str) -> Any
:
313 return qemu_img_json("measure", "--output", "json", *args
)
315 def qemu_img_check(*args
: str) -> Any
:
316 return qemu_img_json("check", "--output", "json", *args
)
318 def qemu_img_info(*args
: str) -> Any
:
319 return qemu_img_json('info', "--output", "json", *args
)
321 def qemu_img_map(*args
: str) -> Any
:
322 return qemu_img_json('map', "--output", "json", *args
)
324 def qemu_img_log(*args
: str, check
: bool = True
325 ) -> 'subprocess.CompletedProcess[str]':
326 result
= qemu_img(*args
, check
=check
)
327 log(result
.stdout
, filters
=[filter_testfiles
])
330 def img_info_log(filename
: str, filter_path
: Optional
[str] = None,
331 use_image_opts
: bool = False, extra_args
: Sequence
[str] = (),
332 check
: bool = True, drop_child_info
: bool = True,
336 args
.append('--image-opts')
338 args
+= ['-f', imgfmt
]
340 args
.append(filename
)
342 output
= qemu_img(*args
, check
=check
).stdout
344 filter_path
= filename
345 log(filter_img_info(output
, filter_path
, drop_child_info
))
347 def qemu_io_wrap_args(args
: Sequence
[str]) -> List
[str]:
348 if '-f' in args
or '--image-opts' in args
:
349 return qemu_io_args_no_fmt
+ list(args
)
351 return qemu_io_args
+ list(args
)
353 def qemu_io_popen(*args
):
354 return qemu_tool_popen(qemu_io_wrap_args(args
))
356 def qemu_io(*args
: str, check
: bool = True, combine_stdio
: bool = True
357 ) -> 'subprocess.CompletedProcess[str]':
359 Run QEMU_IO_PROG and return the status code and console output.
361 This function always prepends either QEMU_IO_OPTIONS or
362 QEMU_IO_OPTIONS_NO_FMT.
364 return qemu_tool(*qemu_io_wrap_args(args
),
365 check
=check
, combine_stdio
=combine_stdio
)
367 def qemu_io_log(*args
: str, check
: bool = True
368 ) -> 'subprocess.CompletedProcess[str]':
369 result
= qemu_io(*args
, check
=check
)
370 log(result
.stdout
, filters
=[filter_testfiles
, filter_qemu_io
])
373 class QemuIoInteractive
:
374 def __init__(self
, *args
):
375 self
.args
= qemu_io_wrap_args(args
)
376 # We need to keep the Popen objext around, and not
377 # close it immediately. Therefore, disable the pylint check:
378 # pylint: disable=consider-using-with
379 self
._p
= subprocess
.Popen(self
.args
, stdin
=subprocess
.PIPE
,
380 stdout
=subprocess
.PIPE
,
381 stderr
=subprocess
.STDOUT
,
382 universal_newlines
=True)
383 out
= self
._p
.stdout
.read(9)
384 if out
!= 'qemu-io> ':
385 # Most probably qemu-io just failed to start.
386 # Let's collect the whole output and exit.
387 out
+= self
._p
.stdout
.read()
388 self
._p
.wait(timeout
=1)
389 raise ValueError(out
)
392 self
._p
.communicate('q\n')
394 def _read_output(self
):
395 pattern
= 'qemu-io> '
400 c
= self
._p
.stdout
.read(1)
401 # check unexpected EOF
404 if c
== pattern
[pos
]:
409 return ''.join(s
[:-n
])
412 # quit command is in close(), '\n' is added automatically
413 assert '\n' not in cmd
415 assert cmd
not in ('q', 'quit')
416 self
._p
.stdin
.write(cmd
+ '\n')
417 self
._p
.stdin
.flush()
418 return self
._read
_output
()
421 class QemuStorageDaemon
:
422 _qmp
: Optional
[QEMUMonitorProtocol
] = None
423 _qmpsock
: Optional
[str] = None
424 # Python < 3.8 would complain if this type were not a string literal
425 # (importing `annotations` from `__future__` would work; but not on <= 3.6)
426 _p
: 'Optional[subprocess.Popen[bytes]]' = None
428 def __init__(self
, *args
: str, instance_id
: str = 'a', qmp
: bool = False):
429 assert '--pidfile' not in args
430 self
.pidfile
= os
.path
.join(test_dir
, f
'qsd-{instance_id}-pid')
431 all_args
= [qsd_prog
] + list(args
) + ['--pidfile', self
.pidfile
]
434 self
._qmpsock
= os
.path
.join(sock_dir
, f
'qsd-{instance_id}.sock')
435 all_args
+= ['--chardev',
436 f
'socket,id=qmp-sock,path={self._qmpsock}',
437 '--monitor', 'qmp-sock']
439 self
._qmp
= QEMUMonitorProtocol(self
._qmpsock
, server
=True)
441 # Cannot use with here, we want the subprocess to stay around
442 # pylint: disable=consider-using-with
443 self
._p
= subprocess
.Popen(all_args
)
444 if self
._qmp
is not None:
446 while not os
.path
.exists(self
.pidfile
):
447 if self
._p
.poll() is not None:
448 cmd
= ' '.join(all_args
)
450 'qemu-storage-daemon terminated with exit code ' +
451 f
'{self._p.returncode}: {cmd}')
455 with
open(self
.pidfile
, encoding
='utf-8') as f
:
456 self
._pid
= int(f
.read().strip())
458 assert self
._pid
== self
._p
.pid
460 def qmp(self
, cmd
: str, args
: Optional
[Dict
[str, object]] = None) \
462 assert self
._qmp
is not None
463 return self
._qmp
.cmd(cmd
, args
)
465 def get_qmp(self
) -> QEMUMonitorProtocol
:
466 assert self
._qmp
is not None
469 def stop(self
, kill_signal
=15):
470 self
._p
.send_signal(kill_signal
)
477 if self
._qmpsock
is not None:
479 os
.remove(self
._qmpsock
)
483 os
.remove(self
.pidfile
)
488 if self
._p
is not None:
489 self
.stop(kill_signal
=9)
493 '''Run qemu-nbd in daemon mode and return the parent's exit code'''
494 return subprocess
.call(qemu_nbd_args
+ ['--fork'] + list(args
))
496 def qemu_nbd_early_pipe(*args
: str) -> Tuple
[int, str]:
497 '''Run qemu-nbd in daemon mode and return both the parent's exit code
498 and its output in case of an error'''
499 full_args
= qemu_nbd_args
+ ['--fork'] + list(args
)
500 output
, returncode
= qemu_tool_pipe_and_status('qemu-nbd', full_args
,
501 connect_stderr
=False)
502 return returncode
, output
if returncode
else ''
504 def qemu_nbd_list_log(*args
: str) -> str:
505 '''Run qemu-nbd to list remote exports'''
506 full_args
= [qemu_nbd_prog
, '-L'] + list(args
)
507 output
, _
= qemu_tool_pipe_and_status('qemu-nbd', full_args
)
508 log(output
, filters
=[filter_testfiles
, filter_nbd_exports
])
512 def qemu_nbd_popen(*args
):
513 '''Context manager running qemu-nbd within the context'''
514 pid_file
= file_path("qemu_nbd_popen-nbd-pid-file")
516 assert not os
.path
.exists(pid_file
)
518 cmd
= list(qemu_nbd_args
)
519 cmd
.extend(('--persistent', '--pid-file', pid_file
))
522 log('Start NBD server')
523 with subprocess
.Popen(cmd
) as p
:
525 while not os
.path
.exists(pid_file
):
526 if p
.poll() is not None:
528 "qemu-nbd terminated with exit code {}: {}"
529 .format(p
.returncode
, ' '.join(cmd
)))
534 if os
.path
.exists(pid_file
):
536 log('Kill NBD server')
540 def compare_images(img1
: str, img2
: str,
541 fmt1
: str = imgfmt
, fmt2
: str = imgfmt
) -> bool:
543 Compare two images with QEMU_IMG; return True if they are identical.
545 :raise CalledProcessError:
546 when qemu-img crashes or returns a status code of anything other
547 than 0 (identical) or 1 (different).
550 qemu_img('compare', '-f', fmt1
, '-F', fmt2
, img1
, img2
)
552 except subprocess
.CalledProcessError
as exc
:
553 if exc
.returncode
== 1:
557 def create_image(name
, size
):
558 '''Create a fully-allocated raw image with sector markers'''
559 with
open(name
, 'wb') as file:
562 sector
= struct
.pack('>l504xl', i
// 512, i
// 512)
566 def image_size(img
: str) -> int:
567 """Return image's virtual size"""
568 value
= qemu_img_info('-f', imgfmt
, img
)['virtual-size']
569 if not isinstance(value
, int):
570 type_name
= type(value
).__name
__
571 raise TypeError("Expected 'int' for 'virtual-size', "
572 f
"got '{value}' of type '{type_name}'")
576 return isinstance(val
, str)
578 test_dir_re
= re
.compile(r
"%s" % test_dir
)
579 def filter_test_dir(msg
):
580 return test_dir_re
.sub("TEST_DIR", msg
)
582 win32_re
= re
.compile(r
"\r")
583 def filter_win32(msg
):
584 return win32_re
.sub("", msg
)
586 qemu_io_re
= re
.compile(r
"[0-9]* ops; [0-9\/:. sec]* "
587 r
"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
588 r
"and [0-9\/.inf]* ops\/sec\)")
589 def filter_qemu_io(msg
):
590 msg
= filter_win32(msg
)
591 return qemu_io_re
.sub("X ops; XX:XX:XX.X "
592 "(XXX YYY/sec and XXX ops/sec)", msg
)
594 chown_re
= re
.compile(r
"chown [0-9]+:[0-9]+")
595 def filter_chown(msg
):
596 return chown_re
.sub("chown UID:GID", msg
)
598 def filter_qmp_event(event
):
599 '''Filter a QMP event dict'''
601 if 'timestamp' in event
:
602 event
['timestamp']['seconds'] = 'SECS'
603 event
['timestamp']['microseconds'] = 'USECS'
606 def filter_qmp(qmsg
, filter_fn
):
607 '''Given a string filter, filter a QMP object's values.
608 filter_fn takes a (key, value) pair.'''
609 # Iterate through either lists or dicts;
610 if isinstance(qmsg
, list):
611 items
= enumerate(qmsg
)
612 elif isinstance(qmsg
, dict):
615 return filter_fn(None, qmsg
)
618 if isinstance(v
, (dict, list)):
619 qmsg
[k
] = filter_qmp(v
, filter_fn
)
621 qmsg
[k
] = filter_fn(k
, v
)
624 def filter_testfiles(msg
):
625 pref1
= os
.path
.join(test_dir
, "%s-" % (os
.getpid()))
626 pref2
= os
.path
.join(sock_dir
, "%s-" % (os
.getpid()))
627 return msg
.replace(pref1
, 'TEST_DIR/PID-').replace(pref2
, 'SOCK_DIR/PID-')
629 def filter_qmp_testfiles(qmsg
):
630 def _filter(_key
, value
):
632 return filter_testfiles(value
)
634 return filter_qmp(qmsg
, _filter
)
636 def filter_virtio_scsi(output
: str) -> str:
637 return re
.sub(r
'(virtio-scsi)-(ccw|pci)', r
'\1', output
)
639 def filter_qmp_virtio_scsi(qmsg
):
640 def _filter(_key
, value
):
642 return filter_virtio_scsi(value
)
644 return filter_qmp(qmsg
, _filter
)
646 def filter_generated_node_ids(msg
):
647 return re
.sub("#block[0-9]+", "NODE_NAME", msg
)
649 def filter_img_info(output
: str, filename
: str,
650 drop_child_info
: bool = True) -> str:
652 drop_indented
= False
653 for line
in output
.split('\n'):
654 if 'disk size' in line
or 'actual-size' in line
:
657 # Drop child node info
659 if line
.startswith(' '):
661 drop_indented
= False
662 if drop_child_info
and "Child node '/" in line
:
666 line
= line
.replace(filename
, 'TEST_IMG')
667 line
= filter_testfiles(line
)
668 line
= line
.replace(imgfmt
, 'IMGFMT')
669 line
= re
.sub('iters: [0-9]+', 'iters: XXX', line
)
670 line
= re
.sub('uuid: [-a-f0-9]+',
671 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
673 line
= re
.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line
)
674 line
= re
.sub('(compression type: )(zlib|zstd)', r
'\1COMPRESSION_TYPE',
677 return '\n'.join(lines
)
679 def filter_imgfmt(msg
):
680 return msg
.replace(imgfmt
, 'IMGFMT')
682 def filter_qmp_imgfmt(qmsg
):
683 def _filter(_key
, value
):
685 return filter_imgfmt(value
)
687 return filter_qmp(qmsg
, _filter
)
689 def filter_nbd_exports(output
: str) -> str:
690 return re
.sub(r
'((min|opt|max) block): [0-9]+', r
'\1: XXX', output
)
693 Msg
= TypeVar('Msg', Dict
[str, Any
], List
[Any
], str)
696 filters
: Iterable
[Callable
[[Msg
], Msg
]] = (),
697 indent
: Optional
[int] = None) -> None:
699 Logs either a string message or a JSON serializable message (like QMP).
700 If indent is provided, JSON serializable messages are pretty-printed.
704 if isinstance(msg
, (dict, list)):
705 # Don't sort if it's already sorted
706 do_sort
= not isinstance(msg
, OrderedDict
)
707 test_logger
.info(json
.dumps(msg
, sort_keys
=do_sort
, indent
=indent
))
709 test_logger
.info(msg
)
712 def __init__(self
, seconds
, errmsg
="Timeout"):
713 self
.seconds
= seconds
716 if qemu_gdb
or qemu_valgrind
:
718 signal
.signal(signal
.SIGALRM
, self
.timeout
)
719 signal
.setitimer(signal
.ITIMER_REAL
, self
.seconds
)
721 def __exit__(self
, exc_type
, value
, traceback
):
722 if qemu_gdb
or qemu_valgrind
:
724 signal
.setitimer(signal
.ITIMER_REAL
, 0)
726 def timeout(self
, signum
, frame
):
727 raise TimeoutError(self
.errmsg
)
729 def file_pattern(name
):
730 return "{0}-{1}".format(os
.getpid(), name
)
734 Context manager generating multiple file names. The generated files are
735 removed when exiting the context.
739 with FilePath('a.img', 'b.img') as (img_a, img_b):
740 # Use img_a and img_b here...
742 # a.img and b.img are automatically removed here.
744 By default images are created in iotests.test_dir. To create sockets use
747 with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:
749 For convenience, calling with one argument yields a single file instead of
750 a tuple with one item.
753 def __init__(self
, *names
, base_dir
=test_dir
):
754 self
.paths
= [os
.path
.join(base_dir
, file_pattern(name
))
758 if len(self
.paths
) == 1:
763 def __exit__(self
, exc_type
, exc_val
, exc_tb
):
764 for path
in self
.paths
:
778 def file_path_remover():
779 for path
in reversed(file_path_remover
.paths
):
783 def file_path(*names
, base_dir
=test_dir
):
784 ''' Another way to get auto-generated filename that cleans itself up.
788 img_a, img_b = file_path('a.img', 'b.img')
789 sock = file_path('socket')
792 if not hasattr(file_path_remover
, 'paths'):
793 file_path_remover
.paths
= []
794 atexit
.register(file_path_remover
)
798 filename
= file_pattern(name
)
799 path
= os
.path
.join(base_dir
, filename
)
800 file_path_remover
.paths
.append(path
)
803 return paths
[0] if len(paths
) == 1 else paths
805 def remote_filename(path
):
806 if imgproto
== 'file':
808 elif imgproto
== 'ssh':
809 return "ssh://%s@127.0.0.1:22%s" % (os
.environ
.get('USER'), path
)
811 raise ValueError("Protocol %s not supported" % (imgproto
))
813 class VM(qtest
.QEMUQtestMachine
):
816 def __init__(self
, path_suffix
=''):
817 name
= "qemu%s-%d" % (path_suffix
, os
.getpid())
818 timer
= 15.0 if not (qemu_gdb
or qemu_valgrind
) else None
819 if qemu_gdb
and qemu_valgrind
:
820 sys
.stderr
.write('gdb and valgrind are mutually exclusive\n')
822 wrapper
= qemu_gdb
if qemu_gdb
else qemu_valgrind
823 super().__init
__(qemu_prog
, qemu_opts
, wrapper
=wrapper
,
825 base_temp_dir
=test_dir
,
826 sock_dir
=sock_dir
, qmp_timer
=timer
)
829 def _post_shutdown(self
) -> None:
830 super()._post
_shutdown
()
831 if not qemu_valgrind
or not self
._popen
:
833 valgrind_filename
= f
"{test_dir}/{self._popen.pid}.valgrind"
834 if self
.exitcode() == 99:
835 with
open(valgrind_filename
, encoding
='utf-8') as f
:
838 os
.remove(valgrind_filename
)
840 def _pre_launch(self
) -> None:
841 super()._pre
_launch
()
843 # set QEMU binary output to stdout
844 self
._close
_qemu
_log
_file
()
846 def add_object(self
, opts
):
847 self
._args
.append('-object')
848 self
._args
.append(opts
)
851 def add_device(self
, opts
):
852 self
._args
.append('-device')
853 self
._args
.append(opts
)
856 def add_drive_raw(self
, opts
):
857 self
._args
.append('-drive')
858 self
._args
.append(opts
)
861 def add_drive(self
, path
, opts
='', interface
='virtio', img_format
=imgfmt
):
862 '''Add a virtio-blk drive to the VM'''
863 options
= ['if=%s' % interface
,
864 'id=drive%d' % self
._num
_drives
]
867 options
.append('file=%s' % path
)
868 options
.append('format=%s' % img_format
)
869 options
.append('cache=%s' % cachemode
)
870 options
.append('aio=%s' % aiomode
)
875 if img_format
== 'luks' and 'key-secret' not in opts
:
876 # default luks support
877 if luks_default_secret_object
not in self
._args
:
878 self
.add_object(luks_default_secret_object
)
880 options
.append(luks_default_key_secret_opt
)
882 self
._args
.append('-drive')
883 self
._args
.append(','.join(options
))
884 self
._num
_drives
+= 1
887 def add_blockdev(self
, opts
):
888 self
._args
.append('-blockdev')
889 if isinstance(opts
, str):
890 self
._args
.append(opts
)
892 self
._args
.append(','.join(opts
))
895 def add_incoming(self
, addr
):
896 self
._args
.append('-incoming')
897 self
._args
.append(addr
)
900 def hmp(self
, command_line
: str, use_log
: bool = False) -> QMPMessage
:
901 cmd
= 'human-monitor-command'
902 kwargs
: Dict
[str, Any
] = {'command-line': command_line
}
904 return self
.qmp_log(cmd
, **kwargs
)
906 return self
.qmp(cmd
, **kwargs
)
908 def pause_drive(self
, drive
: str, event
: Optional
[str] = None) -> None:
909 """Pause drive r/w operations"""
911 self
.pause_drive(drive
, "read_aio")
912 self
.pause_drive(drive
, "write_aio")
914 self
.hmp(f
'qemu-io {drive} "break {event} bp_{drive}"')
916 def resume_drive(self
, drive
: str) -> None:
917 """Resume drive r/w operations"""
918 self
.hmp(f
'qemu-io {drive} "remove_break bp_{drive}"')
920 def hmp_qemu_io(self
, drive
: str, cmd
: str,
921 use_log
: bool = False, qdev
: bool = False) -> QMPMessage
:
922 """Write to a given drive using an HMP command"""
923 d
= '-d ' if qdev
else ''
924 return self
.hmp(f
'qemu-io {d}{drive} "{cmd}"', use_log
=use_log
)
926 def flatten_qmp_object(self
, obj
, output
=None, basestr
=''):
929 if isinstance(obj
, list):
930 for i
, item
in enumerate(obj
):
931 self
.flatten_qmp_object(item
, output
, basestr
+ str(i
) + '.')
932 elif isinstance(obj
, dict):
934 self
.flatten_qmp_object(obj
[key
], output
, basestr
+ key
+ '.')
936 output
[basestr
[:-1]] = obj
# Strip trailing '.'
939 def qmp_to_opts(self
, obj
):
940 obj
= self
.flatten_qmp_object(obj
)
943 output_list
+= [key
+ '=' + obj
[key
]]
944 return ','.join(output_list
)
946 def get_qmp_events_filtered(self
, wait
=60.0):
948 for ev
in self
.get_qmp_events(wait
=wait
):
949 result
.append(filter_qmp_event(ev
))
952 def qmp_log(self
, cmd
, filters
=(), indent
=None, **kwargs
):
953 full_cmd
= OrderedDict((
955 ("arguments", ordered_qmp(kwargs
))
957 log(full_cmd
, filters
, indent
=indent
)
958 result
= self
.qmp(cmd
, **kwargs
)
959 log(result
, filters
, indent
=indent
)
962 # Returns None on success, and an error string on failure
963 def run_job(self
, job
: str, auto_finalize
: bool = True,
964 auto_dismiss
: bool = False,
965 pre_finalize
: Optional
[Callable
[[], None]] = None,
966 cancel
: bool = False, wait
: float = 60.0,
967 filters
: Iterable
[Callable
[[Any
], Any
]] = (),
970 run_job moves a job from creation through to dismissal.
972 :param job: String. ID of recently-launched job
973 :param auto_finalize: Bool. True if the job was launched with
974 auto_finalize. Defaults to True.
975 :param auto_dismiss: Bool. True if the job was launched with
976 auto_dismiss=True. Defaults to False.
977 :param pre_finalize: Callback. A callable that takes no arguments to be
978 invoked prior to issuing job-finalize, if any.
979 :param cancel: Bool. When true, cancels the job after the pre_finalize
981 :param wait: Float. Timeout value specifying how long to wait for any
982 event, in seconds. Defaults to 60.0.
984 match_device
= {'data': {'device': job
}}
985 match_id
= {'data': {'id': job
}}
987 ('BLOCK_JOB_COMPLETED', match_device
),
988 ('BLOCK_JOB_CANCELLED', match_device
),
989 ('BLOCK_JOB_ERROR', match_device
),
990 ('BLOCK_JOB_READY', match_device
),
991 ('BLOCK_JOB_PENDING', match_id
),
992 ('JOB_STATUS_CHANGE', match_id
)
996 ev
= filter_qmp_event(self
.events_wait(events
, timeout
=wait
))
997 if ev
['event'] != 'JOB_STATUS_CHANGE':
998 log(ev
, filters
=filters
)
1000 status
= ev
['data']['status']
1001 if status
== 'aborting':
1002 result
= self
.qmp('query-jobs')
1003 for j
in result
['return']:
1006 log('Job failed: %s' % (j
['error']), filters
=filters
)
1007 elif status
== 'ready':
1008 self
.qmp_log('job-complete', id=job
, filters
=filters
)
1009 elif status
== 'pending' and not auto_finalize
:
1013 self
.qmp_log('job-cancel', id=job
, filters
=filters
)
1015 self
.qmp_log('job-finalize', id=job
, filters
=filters
)
1016 elif status
== 'concluded' and not auto_dismiss
:
1017 self
.qmp_log('job-dismiss', id=job
, filters
=filters
)
1018 elif status
== 'null':
1021 # Returns None on success, and an error string on failure
1022 def blockdev_create(self
, options
, job_id
='job0', filters
=None):
1024 filters
= [filter_qmp_testfiles
]
1025 result
= self
.qmp_log('blockdev-create', filters
=filters
,
1026 job_id
=job_id
, options
=options
)
1028 if 'return' in result
:
1029 assert result
['return'] == {}
1030 job_result
= self
.run_job(job_id
, filters
=filters
)
1032 job_result
= result
['error']
1037 def enable_migration_events(self
, name
):
1038 log('Enabling migration QMP events on %s...' % name
)
1039 log(self
.qmp('migrate-set-capabilities', capabilities
=[
1041 'capability': 'events',
1046 def wait_migration(self
, expect_runstate
: Optional
[str]) -> bool:
1048 event
= self
.event_wait('MIGRATION')
1049 # We use the default timeout, and with a timeout, event_wait()
1050 # never returns None
1053 log(event
, filters
=[filter_qmp_event
])
1054 if event
['data']['status'] in ('completed', 'failed'):
1057 if event
['data']['status'] == 'completed':
1058 # The event may occur in finish-migrate, so wait for the expected
1059 # post-migration runstate
1061 while runstate
!= expect_runstate
:
1062 runstate
= self
.qmp('query-status')['return']['status']
1067 def node_info(self
, node_name
):
1068 nodes
= self
.qmp('query-named-block-nodes')
1069 for x
in nodes
['return']:
1070 if x
['node-name'] == node_name
:
1074 def query_bitmaps(self
):
1075 res
= self
.qmp("query-named-block-nodes")
1076 return {device
['node-name']: device
['dirty-bitmaps']
1077 for device
in res
['return'] if 'dirty-bitmaps' in device
}
1079 def get_bitmap(self
, node_name
, bitmap_name
, recording
=None, bitmaps
=None):
1081 get a specific bitmap from the object returned by query_bitmaps.
1082 :param recording: If specified, filter results by the specified value.
1083 :param bitmaps: If specified, use it instead of call query_bitmaps()
1086 bitmaps
= self
.query_bitmaps()
1088 for bitmap
in bitmaps
[node_name
]:
1089 if bitmap
.get('name', '') == bitmap_name
:
1090 if recording
is None or bitmap
.get('recording') == recording
:
1094 def check_bitmap_status(self
, node_name
, bitmap_name
, fields
):
1095 ret
= self
.get_bitmap(node_name
, bitmap_name
)
1097 return fields
.items() <= ret
.items()
1099 def assert_block_path(self
, root
, path
, expected_node
, graph
=None):
1101 Check whether the node under the given path in the block graph
1104 @root is the node name of the node where the @path is rooted.
1106 @path is a string that consists of child names separated by
1107 slashes. It must begin with a slash.
1109 Examples for @root + @path:
1110 - root="qcow2-node", path="/backing/file"
1111 - root="quorum-node", path="/children.2/file"
1113 Hypothetically, @path could be empty, in which case it would
1114 point to @root. However, in practice this case is not useful
1115 and hence not allowed.
1117 @expected_node may be None. (All elements of the path but the
1118 leaf must still exist.)
1120 @graph may be None or the result of an x-debug-query-block-graph
1121 call that has already been performed.
1124 graph
= self
.qmp('x-debug-query-block-graph')['return']
1126 iter_path
= iter(path
.split('/'))
1128 # Must start with a /
1129 assert next(iter_path
) == ''
1131 node
= next((node
for node
in graph
['nodes'] if node
['name'] == root
),
1134 # An empty @path is not allowed, so the root node must be present
1135 assert node
is not None, 'Root node %s not found' % root
1137 for child_name
in iter_path
:
1138 assert node
is not None, 'Cannot follow path %s%s' % (root
, path
)
1141 node_id
= next(edge
['child'] for edge
in graph
['edges']
1142 if (edge
['parent'] == node
['id'] and
1143 edge
['name'] == child_name
))
1145 node
= next(node
for node
in graph
['nodes']
1146 if node
['id'] == node_id
)
1148 except StopIteration:
1152 assert expected_node
is None, \
1153 'No node found under %s (but expected %s)' % \
1154 (path
, expected_node
)
1156 assert node
['name'] == expected_node
, \
1157 'Found node %s under %s (but expected %s)' % \
1158 (node
['name'], path
, expected_node
)
1160 index_re
= re
.compile(r
'([^\[]+)\[([^\]]+)\]')
1162 class QMPTestCase(unittest
.TestCase
):
1163 '''Abstract base class for QMP test cases'''
1165 def __init__(self
, *args
, **kwargs
):
1166 super().__init
__(*args
, **kwargs
)
1167 # Many users of this class set a VM property we rely on heavily
1168 # in the methods below.
1171 def dictpath(self
, d
, path
):
1172 '''Traverse a path in a nested dict'''
1173 for component
in path
.split('/'):
1174 m
= index_re
.match(component
)
1176 component
, idx
= m
.groups()
1179 if not isinstance(d
, dict) or component
not in d
:
1180 self
.fail(f
'failed path traversal for "{path}" in "{d}"')
1184 if not isinstance(d
, list):
1185 self
.fail(f
'path component "{component}" in "{path}" '
1186 f
'is not a list in "{d}"')
1190 self
.fail(f
'invalid index "{idx}" in path "{path}" '
1194 def assert_qmp_absent(self
, d
, path
):
1196 result
= self
.dictpath(d
, path
)
1197 except AssertionError:
1199 self
.fail('path "%s" has value "%s"' % (path
, str(result
)))
1201 def assert_qmp(self
, d
, path
, value
):
1202 '''Assert that the value for a specific path in a QMP dict
1203 matches. When given a list of values, assert that any of
1206 result
= self
.dictpath(d
, path
)
1208 # [] makes no sense as a list of valid values, so treat it as
1209 # an actual single value.
1210 if isinstance(value
, list) and value
!= []:
1214 self
.fail('no match for "%s" in %s' % (str(result
), str(value
)))
1216 self
.assertEqual(result
, value
,
1217 '"%s" is "%s", expected "%s"'
1218 % (path
, str(result
), str(value
)))
1220 def assert_no_active_block_jobs(self
):
1221 result
= self
.vm
.qmp('query-block-jobs')
1222 self
.assert_qmp(result
, 'return', [])
1224 def assert_has_block_node(self
, node_name
=None, file_name
=None):
1225 """Issue a query-named-block-nodes and assert node_name and/or
1226 file_name is present in the result"""
1227 def check_equal_or_none(a
, b
):
1228 return a
is None or b
is None or a
== b
1229 assert node_name
or file_name
1230 result
= self
.vm
.qmp('query-named-block-nodes')
1231 for x
in result
["return"]:
1232 if check_equal_or_none(x
.get("node-name"), node_name
) and \
1233 check_equal_or_none(x
.get("file"), file_name
):
1235 self
.fail("Cannot find %s %s in result:\n%s" %
1236 (node_name
, file_name
, result
))
1238 def assert_json_filename_equal(self
, json_filename
, reference
):
1239 '''Asserts that the given filename is a json: filename and that its
1240 content is equal to the given reference object'''
1241 self
.assertEqual(json_filename
[:5], 'json:')
1243 self
.vm
.flatten_qmp_object(json
.loads(json_filename
[5:])),
1244 self
.vm
.flatten_qmp_object(reference
)
1247 def cancel_and_wait(self
, drive
='drive0', force
=False,
1248 resume
=False, wait
=60.0):
1249 '''Cancel a block job and wait for it to finish, returning the event'''
1250 result
= self
.vm
.qmp('block-job-cancel', device
=drive
, force
=force
)
1251 self
.assert_qmp(result
, 'return', {})
1254 self
.vm
.resume_drive(drive
)
1258 while not cancelled
:
1259 for event
in self
.vm
.get_qmp_events(wait
=wait
):
1260 if event
['event'] == 'BLOCK_JOB_COMPLETED' or \
1261 event
['event'] == 'BLOCK_JOB_CANCELLED':
1262 self
.assert_qmp(event
, 'data/device', drive
)
1265 elif event
['event'] == 'JOB_STATUS_CHANGE':
1266 self
.assert_qmp(event
, 'data/id', drive
)
1269 self
.assert_no_active_block_jobs()
1272 def wait_until_completed(self
, drive
='drive0', check_offset
=True,
1273 wait
=60.0, error
=None):
1274 '''Wait for a block job to finish, returning the event'''
1276 for event
in self
.vm
.get_qmp_events(wait
=wait
):
1277 if event
['event'] == 'BLOCK_JOB_COMPLETED':
1278 self
.assert_qmp(event
, 'data/device', drive
)
1280 self
.assert_qmp_absent(event
, 'data/error')
1282 self
.assert_qmp(event
, 'data/offset',
1283 event
['data']['len'])
1285 self
.assert_qmp(event
, 'data/error', error
)
1286 self
.assert_no_active_block_jobs()
1288 if event
['event'] == 'JOB_STATUS_CHANGE':
1289 self
.assert_qmp(event
, 'data/id', drive
)
1291 def wait_ready(self
, drive
='drive0'):
1292 """Wait until a BLOCK_JOB_READY event, and return the event."""
1293 return self
.vm
.events_wait([
1295 {'data': {'type': 'mirror', 'device': drive
}}),
1297 {'data': {'type': 'commit', 'device': drive
}})
1300 def wait_ready_and_cancel(self
, drive
='drive0'):
1301 self
.wait_ready(drive
=drive
)
1302 event
= self
.cancel_and_wait(drive
=drive
)
1303 self
.assertEqual(event
['event'], 'BLOCK_JOB_COMPLETED')
1304 self
.assert_qmp(event
, 'data/type', 'mirror')
1305 self
.assert_qmp(event
, 'data/offset', event
['data']['len'])
1307 def complete_and_wait(self
, drive
='drive0', wait_ready
=True,
1308 completion_error
=None):
1309 '''Complete a block job and wait for it to finish'''
1311 self
.wait_ready(drive
=drive
)
1313 result
= self
.vm
.qmp('block-job-complete', device
=drive
)
1314 self
.assert_qmp(result
, 'return', {})
1316 event
= self
.wait_until_completed(drive
=drive
, error
=completion_error
)
1317 self
.assertTrue(event
['data']['type'] in ['mirror', 'commit'])
1319 def pause_wait(self
, job_id
='job0'):
1320 with
Timeout(3, "Timeout waiting for job to pause"):
1322 result
= self
.vm
.qmp('query-block-jobs')
1324 for job
in result
['return']:
1325 if job
['device'] == job_id
:
1327 if job
['paused'] and not job
['busy']:
1332 def pause_job(self
, job_id
='job0', wait
=True):
1333 result
= self
.vm
.qmp('block-job-pause', device
=job_id
)
1334 self
.assert_qmp(result
, 'return', {})
1336 return self
.pause_wait(job_id
)
1339 def case_skip(self
, reason
):
1340 '''Skip this test case'''
1342 self
.skipTest(reason
)
1346 '''Skip this test suite'''
1347 # Each test in qemu-iotests has a number ("seq")
1348 seq
= os
.path
.basename(sys
.argv
[0])
1350 with
open('%s/%s.notrun' % (test_dir
, seq
), 'w', encoding
='utf-8') \
1352 outfile
.write(reason
+ '\n')
1353 logger
.warning("%s not run: %s", seq
, reason
)
1356 def case_notrun(reason
):
1357 '''Mark this test case as not having been run (without actually
1358 skipping it, that is left to the caller). See
1359 QMPTestCase.case_skip() for a variant that actually skips the
1360 current test case.'''
1362 # Each test in qemu-iotests has a number ("seq")
1363 seq
= os
.path
.basename(sys
.argv
[0])
1365 with
open('%s/%s.casenotrun' % (test_dir
, seq
), 'a', encoding
='utf-8') \
1367 outfile
.write(' [case not run] ' + reason
+ '\n')
1369 def _verify_image_format(supported_fmts
: Sequence
[str] = (),
1370 unsupported_fmts
: Sequence
[str] = ()) -> None:
1371 if 'generic' in supported_fmts
and \
1372 os
.environ
.get('IMGFMT_GENERIC', 'true') == 'true':
1374 # _supported_fmt generic
1378 not_sup
= supported_fmts
and (imgfmt
not in supported_fmts
)
1379 if not_sup
or (imgfmt
in unsupported_fmts
):
1380 notrun('not suitable for this image format: %s' % imgfmt
)
1382 if imgfmt
== 'luks':
1383 verify_working_luks()
1385 def _verify_protocol(supported
: Sequence
[str] = (),
1386 unsupported
: Sequence
[str] = ()) -> None:
1387 assert not (supported
and unsupported
)
1389 if 'generic' in supported
:
1392 not_sup
= supported
and (imgproto
not in supported
)
1393 if not_sup
or (imgproto
in unsupported
):
1394 notrun('not suitable for this protocol: %s' % imgproto
)
1396 def _verify_platform(supported
: Sequence
[str] = (),
1397 unsupported
: Sequence
[str] = ()) -> None:
1398 if any((sys
.platform
.startswith(x
) for x
in unsupported
)):
1399 notrun('not suitable for this OS: %s' % sys
.platform
)
1402 if not any((sys
.platform
.startswith(x
) for x
in supported
)):
1403 notrun('not suitable for this OS: %s' % sys
.platform
)
1405 def _verify_cache_mode(supported_cache_modes
: Sequence
[str] = ()) -> None:
1406 if supported_cache_modes
and (cachemode
not in supported_cache_modes
):
1407 notrun('not suitable for this cache mode: %s' % cachemode
)
1409 def _verify_aio_mode(supported_aio_modes
: Sequence
[str] = ()) -> None:
1410 if supported_aio_modes
and (aiomode
not in supported_aio_modes
):
1411 notrun('not suitable for this aio mode: %s' % aiomode
)
1413 def _verify_formats(required_formats
: Sequence
[str] = ()) -> None:
1414 usf_list
= list(set(required_formats
) - set(supported_formats()))
1416 notrun(f
'formats {usf_list} are not whitelisted')
1419 def _verify_virtio_blk() -> None:
1420 out
= qemu_pipe('-M', 'none', '-device', 'help')
1421 if 'virtio-blk' not in out
:
1422 notrun('Missing virtio-blk in QEMU binary')
1424 def verify_virtio_scsi_pci_or_ccw() -> None:
1425 out
= qemu_pipe('-M', 'none', '-device', 'help')
1426 if 'virtio-scsi-pci' not in out
and 'virtio-scsi-ccw' not in out
:
1427 notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1430 def _verify_imgopts(unsupported
: Sequence
[str] = ()) -> None:
1431 imgopts
= os
.environ
.get('IMGOPTS')
1432 # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file"
1433 # but it supported only for bash tests. We don't have a concept of global
1434 # TEST_IMG in iotests.py, not saying about somehow parsing $variables.
1435 # So, for simplicity let's just not support any IMGOPTS with '$' inside.
1436 unsup
= list(unsupported
) + ['$']
1437 if imgopts
and any(x
in imgopts
for x
in unsup
):
1438 notrun(f
'not suitable for this imgopts: {imgopts}')
1441 def supports_quorum() -> bool:
1442 return 'quorum' in qemu_img('--help').stdout
1444 def verify_quorum():
1445 '''Skip test suite if quorum support is not available'''
1446 if not supports_quorum():
1447 notrun('quorum support missing')
1449 def has_working_luks() -> Tuple
[bool, str]:
1451 Check whether our LUKS driver can actually create images
1452 (this extends to LUKS encryption for qcow2).
1454 If not, return the reason why.
1457 img_file
= f
'{test_dir}/luks-test.luks'
1458 res
= qemu_img('create', '-f', 'luks',
1459 '--object', luks_default_secret_object
,
1460 '-o', luks_default_key_secret_opt
,
1461 '-o', 'iter-time=10',
1471 for line
in res
.stdout
.splitlines():
1472 if img_file
+ ':' in line
:
1473 reason
= line
.split(img_file
+ ':', 1)[1].strip()
1476 return (False, reason
)
1480 def verify_working_luks():
1482 Skip test suite if LUKS does not work
1484 (working
, reason
) = has_working_luks()
1488 def supports_qcow2_zstd_compression() -> bool:
1489 img_file
= f
'{test_dir}/qcow2-zstd-test.qcow2'
1490 res
= qemu_img('create', '-f', 'qcow2', '-o', 'compression_type=zstd',
1498 if res
.returncode
== 1 and \
1499 "'compression-type' does not accept value 'zstd'" in res
.stdout
:
1504 def verify_qcow2_zstd_compression():
1505 if not supports_qcow2_zstd_compression():
1506 notrun('zstd compression not supported')
1508 def qemu_pipe(*args
: str) -> str:
1510 Run qemu with an option to print something and exit (e.g. a help option).
1512 :return: QEMU's stdout output.
1514 full_args
= [qemu_prog
] + qemu_opts
+ list(args
)
1515 output
, _
= qemu_tool_pipe_and_status('qemu', full_args
)
1518 def supported_formats(read_only
=False):
1519 '''Set 'read_only' to True to check ro-whitelist
1520 Otherwise, rw-whitelist is checked'''
1522 if not hasattr(supported_formats
, "formats"):
1523 supported_formats
.formats
= {}
1525 if read_only
not in supported_formats
.formats
:
1526 format_message
= qemu_pipe("-drive", "format=help")
1527 line
= 1 if read_only
else 0
1528 supported_formats
.formats
[read_only
] = \
1529 format_message
.splitlines()[line
].split(":")[1].split()
1531 return supported_formats
.formats
[read_only
]
1533 def skip_if_unsupported(required_formats
=(), read_only
=False):
1534 '''Skip Test Decorator
1535 Runs the test if all the required formats are whitelisted'''
1536 def skip_test_decorator(func
):
1537 def func_wrapper(test_case
: QMPTestCase
, *args
: List
[Any
],
1538 **kwargs
: Dict
[str, Any
]) -> None:
1539 if callable(required_formats
):
1540 fmts
= required_formats(test_case
)
1542 fmts
= required_formats
1544 usf_list
= list(set(fmts
) - set(supported_formats(read_only
)))
1546 msg
= f
'{test_case}: formats {usf_list} are not whitelisted'
1547 test_case
.case_skip(msg
)
1549 func(test_case
, *args
, **kwargs
)
1551 return skip_test_decorator
1553 def skip_for_formats(formats
: Sequence
[str] = ()) \
1554 -> Callable
[[Callable
[[QMPTestCase
, List
[Any
], Dict
[str, Any
]], None]],
1555 Callable
[[QMPTestCase
, List
[Any
], Dict
[str, Any
]], None]]:
1556 '''Skip Test Decorator
1557 Skips the test for the given formats'''
1558 def skip_test_decorator(func
):
1559 def func_wrapper(test_case
: QMPTestCase
, *args
: List
[Any
],
1560 **kwargs
: Dict
[str, Any
]) -> None:
1561 if imgfmt
in formats
:
1562 msg
= f
'{test_case}: Skipped for format {imgfmt}'
1563 test_case
.case_skip(msg
)
1565 func(test_case
, *args
, **kwargs
)
1567 return skip_test_decorator
1569 def skip_if_user_is_root(func
):
1570 '''Skip Test Decorator
1571 Runs the test only without root permissions'''
1572 def func_wrapper(*args
, **kwargs
):
1573 if os
.getuid() == 0:
1574 case_notrun('{}: cannot be run as root'.format(args
[0]))
1577 return func(*args
, **kwargs
)
1580 # We need to filter out the time taken from the output so that
1581 # qemu-iotest can reliably diff the results against master output,
1582 # and hide skipped tests from the reference output.
1584 class ReproducibleTestResult(unittest
.TextTestResult
):
1585 def addSkip(self
, test
, reason
):
1586 # Same as TextTestResult, but print dot instead of "s"
1587 unittest
.TestResult
.addSkip(self
, test
, reason
)
1589 self
.stream
.writeln("skipped {0!r}".format(reason
))
1591 self
.stream
.write(".")
1594 class ReproducibleStreamWrapper
:
1595 def __init__(self
, stream
: TextIO
):
1596 self
.stream
= stream
1598 def __getattr__(self
, attr
):
1599 if attr
in ('stream', '__getstate__'):
1600 raise AttributeError(attr
)
1601 return getattr(self
.stream
, attr
)
1603 def write(self
, arg
=None):
1604 arg
= re
.sub(r
'Ran (\d+) tests? in [\d.]+s', r
'Ran \1 tests', arg
)
1605 arg
= re
.sub(r
' \(skipped=\d+\)', r
'', arg
)
1606 self
.stream
.write(arg
)
1608 class ReproducibleTestRunner(unittest
.TextTestRunner
):
1609 def __init__(self
, stream
: Optional
[TextIO
] = None,
1610 resultclass
: Type
[unittest
.TestResult
] =
1611 ReproducibleTestResult
,
1612 **kwargs
: Any
) -> None:
1613 rstream
= ReproducibleStreamWrapper(stream
or sys
.stdout
)
1614 super().__init
__(stream
=rstream
, # type: ignore
1616 resultclass
=resultclass
,
1619 def execute_unittest(argv
: List
[str], debug
: bool = False) -> None:
1620 """Executes unittests within the calling module."""
1622 # Some tests have warnings, especially ResourceWarnings for unclosed
1623 # files and sockets. Ignore them for now to ensure reproducibility of
1625 unittest
.main(argv
=argv
,
1626 testRunner
=ReproducibleTestRunner
,
1627 verbosity
=2 if debug
else 1,
1628 warnings
=None if sys
.warnoptions
else 'ignore')
1630 def execute_setup_common(supported_fmts
: Sequence
[str] = (),
1631 supported_platforms
: Sequence
[str] = (),
1632 supported_cache_modes
: Sequence
[str] = (),
1633 supported_aio_modes
: Sequence
[str] = (),
1634 unsupported_fmts
: Sequence
[str] = (),
1635 supported_protocols
: Sequence
[str] = (),
1636 unsupported_protocols
: Sequence
[str] = (),
1637 required_fmts
: Sequence
[str] = (),
1638 unsupported_imgopts
: Sequence
[str] = ()) -> bool:
1640 Perform necessary setup for either script-style or unittest-style tests.
1642 :return: Bool; Whether or not debug mode has been requested via the CLI.
1644 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
1646 debug
= '-d' in sys
.argv
1648 sys
.argv
.remove('-d')
1649 logging
.basicConfig(level
=(logging
.DEBUG
if debug
else logging
.WARN
))
1651 _verify_image_format(supported_fmts
, unsupported_fmts
)
1652 _verify_protocol(supported_protocols
, unsupported_protocols
)
1653 _verify_platform(supported
=supported_platforms
)
1654 _verify_cache_mode(supported_cache_modes
)
1655 _verify_aio_mode(supported_aio_modes
)
1656 _verify_formats(required_fmts
)
1657 _verify_virtio_blk()
1658 _verify_imgopts(unsupported_imgopts
)
1662 def execute_test(*args
, test_function
=None, **kwargs
):
1663 """Run either unittest or script-style tests."""
1665 debug
= execute_setup_common(*args
, **kwargs
)
1666 if not test_function
:
1667 execute_unittest(sys
.argv
, debug
)
1671 def activate_logging():
1672 """Activate iotests.log() output to stdout for script-style tests."""
1673 handler
= logging
.StreamHandler(stream
=sys
.stdout
)
1674 formatter
= logging
.Formatter('%(message)s')
1675 handler
.setFormatter(formatter
)
1676 test_logger
.addHandler(handler
)
1677 test_logger
.setLevel(logging
.INFO
)
1678 test_logger
.propagate
= False
1680 # This is called from script-style iotests without a single point of entry
1681 def script_initialize(*args
, **kwargs
):
1682 """Initialize script-style tests without running any tests."""
1684 execute_setup_common(*args
, **kwargs
)
1686 # This is called from script-style iotests with a single point of entry
1687 def script_main(test_function
, *args
, **kwargs
):
1688 """Run script-style tests outside of the unittest framework"""
1690 execute_test(*args
, test_function
=test_function
, **kwargs
)
1692 # This is called from unittest style iotests
1693 def main(*args
, **kwargs
):
1694 """Run tests using the unittest framework"""
1695 execute_test(*args
, **kwargs
)