]> git.proxmox.com Git - mirror_qemu.git/blob - tests/qemu-iotests/iotests.py
Merge remote-tracking branch 'remotes/vsementsov/tags/pull-nbd-2022-02-09-v2' into...
[mirror_qemu.git] / tests / qemu-iotests / iotests.py
1 # Common utilities and Python wrappers for qemu-iotests
2 #
3 # Copyright (C) 2012 IBM Corp.
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 #
18
19 import argparse
20 import atexit
21 import bz2
22 from collections import OrderedDict
23 import faulthandler
24 import json
25 import logging
26 import os
27 import re
28 import shutil
29 import signal
30 import struct
31 import subprocess
32 import sys
33 import time
34 from typing import (Any, Callable, Dict, Iterable, Iterator,
35 List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
36 import unittest
37
38 from contextlib import contextmanager
39
40 from qemu.machine import qtest
41 from qemu.qmp import QMPMessage
42
# Use this logger for logging messages directly from the iotests module
logger = logging.getLogger('qemu.iotests')
logger.addHandler(logging.NullHandler())

# Use this logger for messages that ought to be used for diff output.
test_logger = logging.getLogger('qemu.iotests.diff_io')


faulthandler.enable()


def _env_words(name: str) -> List[str]:
    """
    Return the whitespace-separated words of environment variable *name*.

    An unset, empty or blank variable yields an empty list, and runs of
    whitespace never produce empty-string arguments.
    """
    return os.environ.get(name, '').split()


# Option strings are split on whitespace, so this will not work if
# arguments contain spaces, but it is necessary if we want to support
# the override options that ./check supports.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
qemu_img_args += _env_words('QEMU_IMG_OPTIONS')

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
qemu_io_args += _env_words('QEMU_IO_OPTIONS')

qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
qemu_io_args_no_fmt += _env_words('QEMU_IO_OPTIONS_NO_FMT')

qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
qemu_nbd_args = [qemu_nbd_prog]
qemu_nbd_args += _env_words('QEMU_NBD_OPTIONS')

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
# An unset QEMU_OPTIONS gives [] rather than [''], so no empty-string
# argument ends up on the QEMU command line.
qemu_opts = _env_words('QEMU_OPTIONS')

qsd_prog = os.environ.get('QSD_PROG', 'qemu-storage-daemon')

gdb_qemu_env = os.environ.get('GDB_OPTIONS')
qemu_gdb = []
if gdb_qemu_env:
    qemu_gdb = ['gdbserver'] + gdb_qemu_env.split()

qemu_print = os.environ.get('PRINT_QEMU', False)

imgfmt = os.environ.get('IMGFMT', 'raw')
imgproto = os.environ.get('IMGPROTO', 'file')
output_dir = os.environ.get('OUTPUT_DIR', '.')

try:
    test_dir = os.environ['TEST_DIR']
    sock_dir = os.environ['SOCK_DIR']
    cachemode = os.environ['CACHEMODE']
    aiomode = os.environ['AIOMODE']
    qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
except KeyError:
    # We are using these variables as proxies to indicate that we're
    # not being run via "check". There may be other things set up by
    # "check" that individual test cases rely on.
    sys.stderr.write('Please run this test via the "check" script\n')
    sys.exit(os.EX_USAGE)

qemu_valgrind = []
if os.environ.get('VALGRIND_QEMU') == "y" and \
        os.environ.get('NO_VALGRIND') != "y":
    valgrind_logfile = "--log-file=" + test_dir
    # %p lets valgrind insert its own process PID, since
    # we don't know it a priori (subprocess.Popen is
    # not yet invoked)
    valgrind_logfile += "/%p.valgrind"

    qemu_valgrind = ['valgrind', valgrind_logfile, '--error-exitcode=99']

luks_default_secret_object = 'secret,id=keysec0,data=' + \
    os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'

# Directory holding the bzip2-compressed sample images; a missing
# SAMPLE_IMG_DIR raises KeyError at import time.
sample_img_dir = os.environ['SAMPLE_IMG_DIR']
118
119
@contextmanager
def change_log_level(
        logger_name: str, level: int = logging.CRITICAL) -> Iterator[None]:
    """
    Temporarily set the level of the logger called *logger_name*.

    The level the logger had on entry is put back when the ``with`` block
    ends, whether it ends normally or through an exception. Useful for
    silencing errors that are expected or uninteresting.
    """
    target = logging.getLogger(logger_name)
    saved_level = target.level
    target.setLevel(level)

    try:
        yield
    finally:
        target.setLevel(saved_level)
136
137
def unarchive_sample_image(sample, fname):
    """
    Decompress the sample image ``<sample>.bz2`` from sample_img_dir
    into the file *fname*.
    """
    archive = os.path.join(sample_img_dir, sample + '.bz2')
    with bz2.open(archive) as compressed:
        with open(fname, 'wb') as target:
            shutil.copyfileobj(compressed, target)
142
143
def qemu_tool_popen(args: Sequence[str],
                    connect_stderr: bool = True) -> 'subprocess.Popen[str]':
    """
    Start *args* as a subprocess with its stdout captured as text.

    :param args: Command line to run.
    :param connect_stderr: When true, stderr is merged into the captured
                           stdout; otherwise it is left untouched.
    :return: The running Popen object; the caller must wait for it.
    """
    stderr_target = None
    if connect_stderr:
        stderr_target = subprocess.STDOUT
    # The caller owns the process, so no "with" here.
    # pylint: disable=consider-using-with
    process = subprocess.Popen(args,
                               stdout=subprocess.PIPE,
                               stderr=stderr_target,
                               universal_newlines=True)
    return process
152
153
def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
                              connect_stderr: bool = True,
                              drop_successful_output: bool = False) \
        -> Tuple[str, int]:
    """
    Run a tool and return both its output and its exit code.

    :param tool: Name used in the diagnostic written when the process
                 is killed by a signal.
    :param args: Full command line to run.
    :param connect_stderr: Passed to qemu_tool_popen(); when true, stderr
                           is captured together with stdout.
    :param drop_successful_output: When true, the output is replaced by
                                   '' if the exit code is 0.
    :return: Tuple of (captured output, exit code).
    """
    with qemu_tool_popen(args, connect_stderr) as subp:
        output = subp.communicate()[0]
        if subp.returncode < 0:
            # A negative return code means death by signal. The message
            # is one literal, so no source indentation leaks into it.
            cmd = ' '.join(args)
            sys.stderr.write(
                f'{tool} received signal {-subp.returncode}: {cmd}\n')
        if drop_successful_output and subp.returncode == 0:
            output = ''
        return (output, subp.returncode)
170
def qemu_img_create_prepare_args(args: List[str]) -> List[str]:
    """
    Rewrite the arguments of a ``qemu-img create`` call with iotests
    defaults applied; any other subcommand is returned as a plain copy.

    The result is ``create``, then ``-f``, then any default ``--object``,
    then every ``-o``, then whatever the parser did not recognise.
    """
    if not args or args[0] != 'create':
        return list(args)

    parser = argparse.ArgumentParser(allow_abbrev=False)
    # -o option may be specified several times
    parser.add_argument('-o', action='append', default=[])
    parser.add_argument('-f')
    parsed, remaining = parser.parse_known_args(args[1:])

    fmt = parsed.f
    opts_list = parsed.o

    result = ['create']
    if fmt is not None:
        result.extend(('-f', fmt))

    # IMGOPTS most probably contains options specific to the selected
    # format, like extended_l2 or compression_type for qcow2. A test may
    # want to create additional images in other formats that don't support
    # these options. So, use IMGOPTS only for images created in imgfmt
    # format.
    imgopts = os.environ.get('IMGOPTS')
    if imgopts and fmt == imgfmt:
        opts_list.insert(0, imgopts)

    # default luks support
    if fmt == 'luks' and \
            not any('key-secret' in opts for opts in opts_list):
        result.extend(('--object', luks_default_secret_object))
        opts_list.append(luks_default_key_secret_opt)

    for opts in opts_list:
        result.extend(('-o', opts))

    return result + remaining
208
def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
    """
    Run qemu-img and return both its output and its exit code.

    For the ``create`` subcommand the arguments first go through
    qemu_img_create_prepare_args(), and the output of a successful run
    is dropped.
    """
    arg_list = list(args)
    creating = bool(arg_list) and arg_list[0] == 'create'
    command = qemu_img_args + qemu_img_create_prepare_args(arg_list)
    return qemu_tool_pipe_and_status('qemu-img', command,
                                     drop_successful_output=creating)
217
def qemu_img(*args: str) -> int:
    '''Run qemu-img, discard its output and return the exit code'''
    _output, status = qemu_img_pipe_and_status(*args)
    return status
221
def ordered_qmp(qmsg, conv_keys=True):
    """
    Return *qmsg* with every dict replaced by a key-sorted OrderedDict.

    :param qmsg: A list, a dict or any other value; other values are
                 returned unchanged.
    :param conv_keys: When true, '_' in the keys of *qmsg* itself is
                      replaced by '-'. Dicts nested directly inside a dict
                      keep their keys.
    """
    # Dictionaries are not ordered prior to 3.6, therefore:
    if isinstance(qmsg, list):
        # NOTE: list elements are processed with the default
        # conv_keys=True, so dicts inside lists do get their keys converted.
        return list(map(ordered_qmp, qmsg))
    if not isinstance(qmsg, dict):
        return qmsg

    ordered = OrderedDict()
    for key, value in sorted(qmsg.items()):
        name = key.replace('_', '-') if conv_keys else key
        ordered[name] = ordered_qmp(value, conv_keys=False)
    return ordered
234
def qemu_img_create(*args):
    '''Run "qemu-img create" with *args* and return the exit code'''
    create_args = ('create',) + tuple(args)
    return qemu_img(*create_args)
237
def qemu_img_measure(*args):
    '''Run "qemu-img measure --output json" and return the parsed JSON'''
    raw = qemu_img_pipe("measure", "--output", "json", *args)
    return json.loads(raw)
240
def qemu_img_check(*args):
    '''Run "qemu-img check --output json" and return the parsed JSON'''
    raw = qemu_img_pipe("check", "--output", "json", *args)
    return json.loads(raw)
243
def qemu_img_pipe(*args: str) -> str:
    '''Run qemu-img and return its output, ignoring the exit code'''
    output, _status = qemu_img_pipe_and_status(*args)
    return output
247
def qemu_img_log(*args):
    '''Run qemu-img, log its output through filter_testfiles and return
    the unfiltered output'''
    output = qemu_img_pipe(*args)
    log(output, filters=[filter_testfiles])
    return output
252
def img_info_log(filename, filter_path=None, use_image_opts=False,
                 extra_args=()):
    """
    Log the filtered output of ``qemu-img info`` for *filename*.

    :param filename: Image (or image options string) to inspect.
    :param filter_path: Path replaced by TEST_IMG in the output; defaults
                        to *filename*.
    :param use_image_opts: Pass --image-opts instead of "-f <imgfmt>".
    :param extra_args: Additional arguments placed before *filename*.
    """
    fmt_args = ['--image-opts'] if use_image_opts else ['-f', imgfmt]
    args = ['info', *fmt_args, *extra_args, filename]

    output = qemu_img_pipe(*args)
    log(filter_img_info(output, filter_path or filename))
267
def qemu_io_wrap_args(args: Sequence[str]) -> List[str]:
    """
    Prefix *args* with the qemu-io program and its default options.

    When the caller already selects the format (``-f``) or uses
    ``--image-opts``, the defaults without a format option are used.
    """
    explicit_format = '-f' in args or '--image-opts' in args
    base = qemu_io_args_no_fmt if explicit_format else qemu_io_args
    return base + list(args)
273
def qemu_io_popen(*args):
    '''Start qemu-io with *args* and return the Popen object'''
    command = qemu_io_wrap_args(args)
    return qemu_tool_popen(command)
276
def qemu_io(*args):
    '''Run qemu-io and return the captured output'''
    command = qemu_io_wrap_args(args)
    output, _status = qemu_tool_pipe_and_status('qemu-io', command)
    return output
280
def qemu_io_log(*args):
    '''Run qemu-io, log its output through filter_testfiles and
    filter_qemu_io, and return the unfiltered output'''
    output = qemu_io(*args)
    log(output, filters=[filter_testfiles, filter_qemu_io])
    return output
285
def qemu_io_silent(*args):
    '''Run qemu-io and return the exit code, suppressing stdout'''
    command = qemu_io_wrap_args(args)
    completed = subprocess.run(command, stdout=subprocess.DEVNULL,
                               check=False)
    status = completed.returncode
    if status < 0:
        # A negative return code means the process was killed by a signal
        sys.stderr.write('qemu-io received signal %i: %s\n' %
                         (-status, ' '.join(command)))
    return status
294
def qemu_io_silent_check(*args):
    '''Run qemu-io with all output discarded and return True if the
    subprocess exited with code 0'''
    command = qemu_io_wrap_args(args)
    completed = subprocess.run(command, stdout=subprocess.DEVNULL,
                               stderr=subprocess.STDOUT, check=False)
    succeeded = completed.returncode == 0
    return succeeded
301
class QemuIoInteractive:
    """
    Run qemu-io as an interactive subprocess and exchange commands with it.

    Commands are written to the process's stdin; its output is read up to
    the next prompt.
    """

    # Prompt printed by qemu-io when it is ready for a command
    _PROMPT = 'qemu-io> '

    def __init__(self, *args):
        """
        Start qemu-io with *args* and wait for its first prompt.

        :raise ValueError: if the first output is not the prompt; the
                           exception carries everything qemu-io printed.
        """
        self.args = qemu_io_wrap_args(args)
        # We need to keep the Popen object around, and not
        # close it immediately. Therefore, disable the pylint check:
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        out = self._p.stdout.read(len(self._PROMPT))
        if out != self._PROMPT:
            # Most probably qemu-io just failed to start.
            # Let's collect the whole output and exit.
            out += self._p.stdout.read()
            self._p.wait(timeout=1)
            raise ValueError(out)

    def close(self):
        """Send the quit command and wait for qemu-io to terminate."""
        self._p.communicate('q\n')

    def _read_output(self):
        """
        Read stdout until the prompt appears and return the text before it.

        The tail of everything read so far is compared against the prompt
        after each character, so a prompt preceded by a partial match
        (for instance output ending in 'q') is still recognised.
        """
        n = len(self._PROMPT)
        chars = []
        while ''.join(chars[-n:]) != self._PROMPT:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            chars.append(c)

        return ''.join(chars[:-n])

    def cmd(self, cmd):
        """
        Execute one qemu-io command and return its output.

        :param cmd: Command line without newline; must not be the quit
                    command, which is reserved for close().
        """
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd not in ('q', 'quit')
        self._p.stdin.write(cmd + '\n')
        self._p.stdin.flush()
        return self._read_output()
348
349
class QemuStorageDaemon:
    """
    Run a qemu-storage-daemon process for the lifetime of the object.
    """

    def __init__(self, *args: str, instance_id: str = 'a'):
        """
        Start the daemon and wait until it has written its pidfile.

        :param args: Daemon arguments; must not contain --pidfile.
        :param instance_id: Makes the pidfile name unique
                            ("qsd-<instance_id>-pid" in test_dir).
        :raise RuntimeError: if the daemon exits before the pidfile exists.
        """
        # Set first so that __del__() is safe even if start-up fails below
        self._p: Optional['subprocess.Popen[bytes]'] = None

        assert '--pidfile' not in args
        self.pidfile = os.path.join(test_dir, f'qsd-{instance_id}-pid')
        all_args = [qsd_prog] + list(args) + ['--pidfile', self.pidfile]

        # Cannot use with here, we want the subprocess to stay around
        # pylint: disable=consider-using-with
        self._p = subprocess.Popen(all_args)
        while not os.path.exists(self.pidfile):
            if self._p.poll() is not None:
                cmd = ' '.join(all_args)
                raise RuntimeError(
                    'qemu-storage-daemon terminated with exit code ' +
                    f'{self._p.returncode}: {cmd}')

            time.sleep(0.01)

        with open(self.pidfile, encoding='utf-8') as f:
            self._pid = int(f.read().strip())

        assert self._pid == self._p.pid

    def stop(self, kill_signal=15):
        """
        Signal the daemon, wait for it to exit and remove its pidfile.

        Calling stop() on an already stopped daemon does nothing.

        :param kill_signal: Signal number to send (default 15).
        """
        if self._p is None:
            return

        self._p.send_signal(kill_signal)
        self._p.wait()
        self._p = None

        try:
            os.remove(self.pidfile)
        except OSError:
            # Best effort: the pidfile may already be gone
            pass

    def __del__(self):
        # getattr() guards against an instance whose __init__ never ran
        if getattr(self, '_p', None) is not None:
            self.stop(kill_signal=9)
386
387
def qemu_nbd(*args):
    '''Run qemu-nbd in daemon mode and return the parent's exit code'''
    command = [*qemu_nbd_args, '--fork', *args]
    return subprocess.call(command)
391
def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
    '''Run qemu-nbd in daemon mode and return both the parent's exit code
    and its output in case of an error (the output is '' on success)'''
    command = [*qemu_nbd_args, '--fork', *args]
    output, returncode = qemu_tool_pipe_and_status('qemu-nbd', command,
                                                   connect_stderr=False)
    if not returncode:
        output = ''
    return returncode, output
399
def qemu_nbd_list_log(*args: str) -> str:
    '''Run "qemu-nbd -L" to list remote exports, log the filtered output
    and return the unfiltered output'''
    command = [qemu_nbd_prog, '-L', *args]
    listing = qemu_tool_pipe_and_status('qemu-nbd', command)[0]
    log(listing, filters=[filter_testfiles, filter_nbd_exports])
    return listing
406
@contextmanager
def qemu_nbd_popen(*args):
    '''Context manager running qemu-nbd within the context

    The body runs once the server has created its pid file. On exit the
    pid file is removed and the server is killed.'''
    pid_file = file_path("qemu_nbd_popen-nbd-pid-file")

    assert not os.path.exists(pid_file)

    cmd = [*qemu_nbd_args, '--persistent', '--pid-file', pid_file, *args]

    log('Start NBD server')
    with subprocess.Popen(cmd) as server:
        try:
            # The pid file appearing is the signal that the server is up
            while not os.path.exists(pid_file):
                if server.poll() is not None:
                    raise RuntimeError(
                        "qemu-nbd terminated with exit code {}: {}"
                        .format(server.returncode, ' '.join(cmd)))

                time.sleep(0.01)
            yield
        finally:
            if os.path.exists(pid_file):
                os.remove(pid_file)
            log('Kill NBD server')
            server.kill()
            server.wait()
435
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Return True if "qemu-img compare" exits with 0 for the two images'''
    status = qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    return status == 0
440
def create_image(name, size):
    '''Create a fully-allocated raw image with sector markers

    Each 512-byte sector holds its own index as a big-endian 32-bit
    integer in its first and last four bytes, with zeroes in between.'''
    sector_size = 512
    with open(name, 'wb') as image:
        offset = 0
        while offset < size:
            index = offset // sector_size
            image.write(struct.pack('>l504xl', index, index))
            offset += sector_size
449
def image_size(img):
    '''Return image's virtual size, as reported by "qemu-img info"'''
    info = json.loads(
        qemu_img_pipe('info', '--output=json', '-f', imgfmt, img))
    return info['virtual-size']
454
def is_str(val):
    '''Return True if *val* is a str instance'''
    result = isinstance(val, str)
    return result
457
# test_dir is a literal path, not a pattern: escape it so that characters
# such as '+', '(' or '[' in the directory name match themselves.
test_dir_re = re.compile(re.escape(test_dir))
def filter_test_dir(msg):
    '''Replace every occurrence of the test directory path in *msg*
    with "TEST_DIR"'''
    return test_dir_re.sub("TEST_DIR", msg)
461
win32_re = re.compile(r"\r")
def filter_win32(msg):
    '''Remove all carriage return characters from *msg*'''
    without_cr = win32_re.sub("", msg)
    return without_cr
465
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
                        r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
                        r"and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    '''Strip carriage returns and replace qemu-io's operation count,
    timing and throughput figures with fixed placeholders'''
    cleaned = filter_win32(msg)
    placeholder = ("X ops; XX:XX:XX.X "
                   "(XXX YYY/sec and XXX ops/sec)")
    return qemu_io_re.sub(placeholder, cleaned)
473
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    '''Replace the numeric owner in "chown <uid>:<gid>" with UID:GID'''
    filtered = chown_re.sub("chown UID:GID", msg)
    return filtered
477
def filter_qmp_event(event):
    '''Filter a QMP event dict

    Returns a copy of *event* whose timestamp fields are replaced by the
    placeholders 'SECS' and 'USECS'. The nested timestamp dict is copied
    too, so the caller's event is left unmodified.'''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) is only a shallow copy; copy the nested dict before
        # writing to it
        timestamp = dict(event['timestamp'])
        timestamp['seconds'] = 'SECS'
        timestamp['microseconds'] = 'USECS'
        event['timestamp'] = timestamp
    return event
485
def filter_qmp(qmsg, filter_fn):
    '''Apply *filter_fn* to every leaf value of a QMP object, in place.

    qmsg is a list or a dict; nested lists and dicts are processed
    recursively. filter_fn takes a (key, value) pair, where key is the
    dict key or list index, and returns the replacement value.
    The modified qmsg is returned.'''
    # Iterate through either lists or dicts;
    items = enumerate(qmsg) if isinstance(qmsg, list) else qmsg.items()

    for key, value in items:
        if isinstance(value, (dict, list)):
            replacement = filter_qmp(value, filter_fn)
        else:
            replacement = filter_fn(key, value)
        qmsg[key] = replacement
    return qmsg
501
def filter_testfiles(msg):
    '''Replace "<test_dir>/<pid>-" and "<sock_dir>/<pid>-" prefixes, where
    <pid> is this process's PID, with TEST_DIR/PID- and SOCK_DIR/PID-'''
    pid_prefix = "%s-" % (os.getpid())
    test_prefix = os.path.join(test_dir, pid_prefix)
    sock_prefix = os.path.join(sock_dir, pid_prefix)
    msg = msg.replace(test_prefix, 'TEST_DIR/PID-')
    return msg.replace(sock_prefix, 'SOCK_DIR/PID-')
506
def filter_qmp_testfiles(qmsg):
    '''Apply filter_testfiles() to every string value of a QMP object'''
    def _filter(_key, value):
        return filter_testfiles(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
513
def filter_virtio_scsi(output: str) -> str:
    '''Drop the -ccw / -pci suffix from virtio-scsi device names'''
    pattern = r'(virtio-scsi)-(ccw|pci)'
    return re.sub(pattern, r'\1', output)
516
def filter_qmp_virtio_scsi(qmsg):
    '''Apply filter_virtio_scsi() to every string value of a QMP object'''
    def _filter(_key, value):
        return filter_virtio_scsi(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
523
def filter_generated_node_ids(msg):
    '''Replace "#block<number>" node names with NODE_NAME'''
    pattern = "#block[0-9]+"
    return re.sub(pattern, "NODE_NAME", msg)
526
def filter_img_info(output, filename):
    '''Filter "qemu-img info" output.

    Lines mentioning the disk size are dropped. In the remaining lines
    *filename*, test file prefixes, the image format name and several
    variable values are replaced by placeholders.'''
    # (pattern, replacement) pairs, applied to every line in this order
    substitutions = (
        ('iters: [0-9]+', 'iters: XXX'),
        ('uuid: [-a-f0-9]+',
         'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'),
        ('cid: [0-9]+', 'cid: XXXXXXXXXX'),
        ('(compression type: )(zlib|zstd)', r'\1COMPRESSION_TYPE'),
    )

    filtered = []
    for line in output.split('\n'):
        if 'disk size' in line or 'actual-size' in line:
            continue
        line = line.replace(filename, 'TEST_IMG')
        line = filter_testfiles(line)
        line = line.replace(imgfmt, 'IMGFMT')
        for pattern, replacement in substitutions:
            line = re.sub(pattern, replacement, line)
        filtered.append(line)
    return '\n'.join(filtered)
544
def filter_imgfmt(msg):
    '''Replace every occurrence of the image format name with IMGFMT'''
    filtered = msg.replace(imgfmt, 'IMGFMT')
    return filtered
547
def filter_qmp_imgfmt(qmsg):
    '''Apply filter_imgfmt() to every string value of a QMP object'''
    def _filter(_key, value):
        return filter_imgfmt(value) if is_str(value) else value
    return filter_qmp(qmsg, _filter)
554
def filter_nbd_exports(output: str) -> str:
    '''Replace the values of "min block", "opt block" and "max block"
    with XXX'''
    pattern = r'((min|opt|max) block): [0-9]+'
    return re.sub(pattern, r'\1: XXX', output)
557
558
Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)

def log(msg: Msg,
        filters: Iterable[Callable[[Msg], Msg]] = (),
        indent: Optional[int] = None) -> None:
    """
    Write a message to the test logger after running it through *filters*.

    Strings are logged as they are. Dicts and lists (such as QMP messages)
    are serialised to JSON, pretty-printed when *indent* is given.
    """
    for flt in filters:
        msg = flt(msg)

    if not isinstance(msg, (dict, list)):
        test_logger.info(msg)
        return

    # Don't sort if it's already sorted
    needs_sort = not isinstance(msg, OrderedDict)
    test_logger.info(json.dumps(msg, sort_keys=needs_sort, indent=indent))
576
class Timeout:
    """
    Context manager raising an exception if its body runs too long.

    A SIGALRM timer is armed on entry and disarmed on exit. When QEMU is
    run under gdb or valgrind, no timer is set at all.
    """

    def __init__(self, seconds, errmsg="Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg

    def __enter__(self):
        if not (qemu_gdb or qemu_valgrind):
            signal.signal(signal.SIGALRM, self.timeout)
            signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, value, traceback):
        if not (qemu_gdb or qemu_valgrind):
            # Disarm the timer
            signal.setitimer(signal.ITIMER_REAL, 0)
        return False

    def timeout(self, signum, frame):
        """Signal handler: raise an Exception carrying errmsg."""
        raise Exception(self.errmsg)
594
def file_pattern(name):
    '''Return *name* prefixed with this process's PID and a dash'''
    pid = os.getpid()
    return "{0}-{1}".format(pid, name)
597
class FilePath:
    """
    Context manager that hands out per-process file names and deletes the
    corresponding files when the context ends.

    Example usage:

        with FilePath('a.img', 'b.img') as (img_a, img_b):
            # Use img_a and img_b here...

        # a.img and b.img are automatically removed here.

    Names are placed in iotests.test_dir unless another base_dir is given;
    for sockets use iotests.sock_dir:

        with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:

    As a convenience, a single name yields the path itself rather than a
    one-element list.
    """

    def __init__(self, *names, base_dir=test_dir):
        self.paths = []
        for name in names:
            self.paths.append(os.path.join(base_dir, file_pattern(name)))

    def __enter__(self):
        return self.paths[0] if len(self.paths) == 1 else self.paths

    def __exit__(self, exc_type, exc_val, exc_tb):
        for path in self.paths:
            try:
                os.remove(path)
            except OSError:
                # The file may never have been created
                pass
        return False
636
637
def try_remove(img):
    '''Remove the file *img*, ignoring any OSError'''
    try:
        os.remove(img)
    except OSError:
        return
643
def file_path_remover():
    '''Remove the files recorded by file_path(), most recent first'''
    registered = file_path_remover.paths
    for path in reversed(registered):
        try_remove(path)
647
648
def file_path(*names, base_dir=test_dir):
    ''' Another way to get auto-generated filename that cleans itself up.

    Use is as simple as:

    img_a, img_b = file_path('a.img', 'b.img')
    sock = file_path('socket')

    The files are removed at interpreter exit. A single name returns the
    path itself, several names return a list of paths.
    '''

    # The first call creates the registry and hooks the remover into atexit
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    paths = [os.path.join(base_dir, file_pattern(name)) for name in names]
    file_path_remover.paths.extend(paths)

    return paths[0] if len(paths) == 1 else paths
670
def remote_filename(path):
    '''Return the name under which *path* is reached via imgproto.

    "file" returns the path unchanged, "ssh" an ssh:// URL for 127.0.0.1
    port 22 with the user taken from $USER; anything else raises.'''
    if imgproto == 'file':
        return path
    if imgproto == 'ssh':
        user = os.environ.get('USER')
        return "ssh://%s@127.0.0.1:22%s" % (user, path)
    raise Exception("Protocol %s not supported" % (imgproto))
678
class VM(qtest.QEMUQtestMachine):
    '''A QEMU VM configured from the iotests environment'''

    def __init__(self, path_suffix=''):
        """
        :param path_suffix: Inserted into the machine name, which has the
                            form "qemu<path_suffix>-<pid>".
        """
        name = "qemu%s-%d" % (path_suffix, os.getpid())
        # No QMP timeout under gdb or valgrind
        timer = 15.0 if not (qemu_gdb or qemu_valgrind) else None
        if qemu_gdb and qemu_valgrind:
            sys.stderr.write('gdb and valgrind are mutually exclusive\n')
            sys.exit(1)
        wrapper = qemu_gdb if qemu_gdb else qemu_valgrind
        super().__init__(qemu_prog, qemu_opts, wrapper=wrapper,
                         name=name,
                         base_temp_dir=test_dir,
                         sock_dir=sock_dir, qmp_timer=timer)
        # Number of drives added via add_drive(); used for "id=drive<N>"
        self._num_drives = 0

    def _post_shutdown(self) -> None:
        """
        After the parent's clean-up, handle the valgrind log file: print it
        if QEMU exited with code 99 (the --error-exitcode configured for
        valgrind), remove it otherwise.
        """
        super()._post_shutdown()
        if not qemu_valgrind or not self._popen:
            return
        valgrind_filename = f"{test_dir}/{self._popen.pid}.valgrind"
        if self.exitcode() == 99:
            with open(valgrind_filename, encoding='utf-8') as f:
                print(f.read())
        else:
            os.remove(valgrind_filename)

    def _pre_launch(self) -> None:
        super()._pre_launch()
        if qemu_print:
            # set QEMU binary output to stdout
            self._close_qemu_log_file()

    def add_object(self, opts):
        """Append "-object <opts>" to the command line; returns self."""
        self._args.append('-object')
        self._args.append(opts)
        return self

    def add_device(self, opts):
        """Append "-device <opts>" to the command line; returns self."""
        self._args.append('-device')
        self._args.append(opts)
        return self

    def add_drive_raw(self, opts):
        """Append "-drive <opts>" verbatim; returns self."""
        self._args.append('-drive')
        self._args.append(opts)
        return self

    def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
        '''
        Add a drive ("-drive") to the VM; returns self.

        :param path: Image file, or None to add a drive without file,
                     format, cache and aio options.
        :param opts: Extra comma-separated -drive options.
        :param interface: Value for "if=" (default: virtio).
        :param img_format: Image format; for "luks" without a key-secret
                           in *opts*, the default secret is added.
        '''
        options = ['if=%s' % interface,
                   'id=drive%d' % self._num_drives]

        if path is not None:
            options.append('file=%s' % path)
            options.append('format=%s' % img_format)
            options.append('cache=%s' % cachemode)
            options.append('aio=%s' % aiomode)

        if opts:
            options.append(opts)

        if img_format == 'luks' and 'key-secret' not in opts:
            # default luks support
            if luks_default_secret_object not in self._args:
                self.add_object(luks_default_secret_object)

            options.append(luks_default_key_secret_opt)

        self._args.append('-drive')
        self._args.append(','.join(options))
        self._num_drives += 1
        return self

    def add_blockdev(self, opts):
        """
        Append "-blockdev <opts>"; *opts* is a string or an iterable of
        strings that is joined with commas. Returns self.
        """
        self._args.append('-blockdev')
        if isinstance(opts, str):
            self._args.append(opts)
        else:
            self._args.append(','.join(opts))
        return self

    def add_incoming(self, addr):
        """Append "-incoming <addr>" to the command line; returns self."""
        self._args.append('-incoming')
        self._args.append(addr)
        return self

    def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
        """
        Run an HMP command through QMP's human-monitor-command.

        :param use_log: Issue the command via qmp_log() instead of qmp().
        """
        cmd = 'human-monitor-command'
        kwargs: Dict[str, Any] = {'command-line': command_line}
        if use_log:
            return self.qmp_log(cmd, **kwargs)
        else:
            return self.qmp(cmd, **kwargs)

    def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
        """
        Pause drive r/w operations by setting a qemu-io breakpoint named
        "bp_<drive>" on *event*; without an event, on both "read_aio" and
        "write_aio".
        """
        if not event:
            self.pause_drive(drive, "read_aio")
            self.pause_drive(drive, "write_aio")
            return
        self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')

    def resume_drive(self, drive: str) -> None:
        """Resume drive r/w operations"""
        self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')

    def hmp_qemu_io(self, drive: str, cmd: str,
                    use_log: bool = False, qdev: bool = False) -> QMPMessage:
        """
        Run a qemu-io command on a drive via HMP.

        :param qdev: Pass "-d" to the HMP qemu-io command.
        """
        d = '-d ' if qdev else ''
        return self.hmp(f'qemu-io {d}{drive} "{cmd}"', use_log=use_log)

    def flatten_qmp_object(self, obj, output=None, basestr=''):
        """
        Flatten nested lists/dicts into one dict with dotted keys,
        e.g. {'a': [{'b': 1}]} becomes {'a.0.b': 1}.
        """
        if output is None:
            output = {}
        if isinstance(obj, list):
            for i, item in enumerate(obj):
                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
        elif isinstance(obj, dict):
            for key in obj:
                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
        else:
            output[basestr[:-1]] = obj  # Strip trailing '.'
        return output

    def qmp_to_opts(self, obj):
        """
        Convert a QMP object into a "key=value,..." option string using
        flattened dotted keys. Leaf values are concatenated as they are,
        so they must be strings.
        """
        obj = self.flatten_qmp_object(obj)
        output_list = []
        for key in obj:
            output_list += [key + '=' + obj[key]]
        return ','.join(output_list)

    def get_qmp_events_filtered(self, wait=60.0):
        """Return the pending QMP events, each run through
        filter_qmp_event()."""
        result = []
        for ev in self.get_qmp_events(wait=wait):
            result.append(filter_qmp_event(ev))
        return result

    def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
        """
        Log a QMP command, execute it, log the response and return it.

        :param filters: Filters applied to both logged messages.
        :param indent: JSON indentation passed on to log().
        """
        full_cmd = OrderedDict((
            ("execute", cmd),
            ("arguments", ordered_qmp(kwargs))
        ))
        log(full_cmd, filters, indent=indent)
        result = self.qmp(cmd, **kwargs)
        log(result, filters, indent=indent)
        return result

    # Returns None on success, and an error string on failure
    def run_job(self, job, auto_finalize=True, auto_dismiss=False,
                pre_finalize=None, cancel=False, wait=60.0):
        """
        run_job moves a job from creation through to dismissal.

        :param job: String. ID of recently-launched job
        :param auto_finalize: Bool. True if the job was launched with
                              auto_finalize. Defaults to True.
        :param auto_dismiss: Bool. True if the job was launched with
                             auto_dismiss=True. Defaults to False.
        :param pre_finalize: Callback. A callable that takes no arguments to be
                             invoked prior to issuing job-finalize, if any.
        :param cancel: Bool. When true, cancels the job after the pre_finalize
                       callback.
        :param wait: Float. Timeout value specifying how long to wait for any
                     event, in seconds. Defaults to 60.0.
        :return: None on success, the job's error string on failure.
        """
        match_device = {'data': {'device': job}}
        match_id = {'data': {'id': job}}
        events = [
            ('BLOCK_JOB_COMPLETED', match_device),
            ('BLOCK_JOB_CANCELLED', match_device),
            ('BLOCK_JOB_ERROR', match_device),
            ('BLOCK_JOB_READY', match_device),
            ('BLOCK_JOB_PENDING', match_id),
            ('JOB_STATUS_CHANGE', match_id)
        ]
        error = None
        while True:
            ev = filter_qmp_event(self.events_wait(events, timeout=wait))
            if ev['event'] != 'JOB_STATUS_CHANGE':
                # Other events are only logged
                log(ev)
                continue
            status = ev['data']['status']
            if status == 'aborting':
                result = self.qmp('query-jobs')
                for j in result['return']:
                    if j['id'] == job:
                        error = j['error']
                        log('Job failed: %s' % (j['error']))
            elif status == 'ready':
                self.qmp_log('job-complete', id=job)
            elif status == 'pending' and not auto_finalize:
                if pre_finalize:
                    pre_finalize()
                if cancel:
                    self.qmp_log('job-cancel', id=job)
                else:
                    self.qmp_log('job-finalize', id=job)
            elif status == 'concluded' and not auto_dismiss:
                self.qmp_log('job-dismiss', id=job)
            elif status == 'null':
                return error

    # Returns None on success, and an error string on failure
    def blockdev_create(self, options, job_id='job0', filters=None):
        """
        Issue blockdev-create and run the resulting job to completion.

        :param filters: Log filters; defaults to [filter_qmp_testfiles].
        :return: None on success; otherwise the job's error string, or the
                 QMP error object if the command itself failed.
        """
        if filters is None:
            filters = [filter_qmp_testfiles]
        result = self.qmp_log('blockdev-create', filters=filters,
                              job_id=job_id, options=options)

        if 'return' in result:
            assert result['return'] == {}
            job_result = self.run_job(job_id)
        else:
            job_result = result['error']

        log("")
        return job_result

    def enable_migration_events(self, name):
        """Turn on the "events" migration capability and log the result;
        *name* only appears in the log message."""
        log('Enabling migration QMP events on %s...' % name)
        log(self.qmp('migrate-set-capabilities', capabilities=[
            {
                'capability': 'events',
                'state': True
            }
        ]))

    def wait_migration(self, expect_runstate: Optional[str]) -> bool:
        """
        Wait for MIGRATION events until the status is completed or failed.

        :param expect_runstate: On completion, poll query-status until this
                                runstate is reported.
        :return: True if the migration completed, False if it failed.
        """
        while True:
            event = self.event_wait('MIGRATION')
            # We use the default timeout, and with a timeout, event_wait()
            # never returns None
            assert event

            log(event, filters=[filter_qmp_event])
            if event['data']['status'] in ('completed', 'failed'):
                break

        if event['data']['status'] == 'completed':
            # The event may occur in finish-migrate, so wait for the expected
            # post-migration runstate
            runstate = None
            while runstate != expect_runstate:
                runstate = self.qmp('query-status')['return']['status']
            return True
        else:
            return False

    def node_info(self, node_name):
        """Return the query-named-block-nodes entry for *node_name*, or
        None if there is no such node."""
        nodes = self.qmp('query-named-block-nodes')
        for x in nodes['return']:
            if x['node-name'] == node_name:
                return x
        return None

    def query_bitmaps(self):
        """Return a dict mapping node names to their dirty-bitmaps lists;
        nodes without a 'dirty-bitmaps' entry are left out."""
        res = self.qmp("query-named-block-nodes")
        return {device['node-name']: device['dirty-bitmaps']
                for device in res['return'] if 'dirty-bitmaps' in device}

    def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
        """
        get a specific bitmap from the object returned by query_bitmaps.
        :param recording: If specified, filter results by the specified value.
        :param bitmaps: If specified, use it instead of call query_bitmaps()
        :return: The bitmap dict, or None if no matching bitmap exists
                 (including when the node has no bitmaps at all).
        """
        if bitmaps is None:
            bitmaps = self.query_bitmaps()

        # query_bitmaps() omits nodes without bitmaps, so a missing node
        # simply means "no such bitmap"
        for bitmap in bitmaps.get(node_name, []):
            if bitmap.get('name', '') == bitmap_name:
                if recording is None or bitmap.get('recording') == recording:
                    return bitmap
        return None

    def check_bitmap_status(self, node_name, bitmap_name, fields):
        """
        Return True if the bitmap exists and every key/value pair of
        *fields* is present in it; False otherwise.
        """
        ret = self.get_bitmap(node_name, bitmap_name)
        if ret is None:
            return False

        return fields.items() <= ret.items()

    def assert_block_path(self, root, path, expected_node, graph=None):
        """
        Check whether the node under the given path in the block graph
        is @expected_node.

        @root is the node name of the node where the @path is rooted.

        @path is a string that consists of child names separated by
        slashes.  It must begin with a slash.

        Examples for @root + @path:
          - root="qcow2-node", path="/backing/file"
          - root="quorum-node", path="/children.2/file"

        Hypothetically, @path could be empty, in which case it would
        point to @root.  However, in practice this case is not useful
        and hence not allowed.

        @expected_node may be None.  (All elements of the path but the
        leaf must still exist.)

        @graph may be None or the result of an x-debug-query-block-graph
        call that has already been performed.
        """
        if graph is None:
            graph = self.qmp('x-debug-query-block-graph')['return']

        iter_path = iter(path.split('/'))

        # Must start with a /
        assert next(iter_path) == ''

        node = next((node for node in graph['nodes'] if node['name'] == root),
                    None)

        # An empty @path is not allowed, so the root node must be present
        assert node is not None, 'Root node %s not found' % root

        for child_name in iter_path:
            assert node is not None, 'Cannot follow path %s%s' % (root, path)

            try:
                node_id = next(edge['child'] for edge in graph['edges']
                               if (edge['parent'] == node['id'] and
                                   edge['name'] == child_name))

                node = next(node for node in graph['nodes']
                            if node['id'] == node_id)

            except StopIteration:
                # No such edge or node: the path ends here
                node = None

        if node is None:
            assert expected_node is None, \
                   'No node found under %s (but expected %s)' % \
                   (path, expected_node)
        else:
            assert node['name'] == expected_node, \
                   'Found node %s under %s (but expected %s)' % \
                   (node['name'], path, expected_node)
1021
# Matches a path component carrying a list index, e.g. "name[3]"
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')


class QMPTestCase(unittest.TestCase):
    '''Abstract base class for QMP test cases'''

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Test cases usually store their VM here; most helpers below
        # go through it.
        self.vm = None

    def dictpath(self, d, path):
        '''Follow the slash-separated @path through the nested dict @d
        and return the value found there.

        A component of the form "name[N]" selects element N of the
        list stored under "name". Any failed lookup fails the test.'''
        for part in path.split('/'):
            indexed = index_re.match(part)
            idx = None
            if indexed:
                part = indexed.group(1)
                idx = int(indexed.group(2))

            if not (isinstance(d, dict) and part in d):
                self.fail(f'failed path traversal for "{path}" in "{d}"')
            d = d[part]

            if not indexed:
                continue

            if not isinstance(d, list):
                self.fail(f'path component "{part}" in "{path}" '
                          f'is not a list in "{d}"')
            try:
                d = d[idx]
            except IndexError:
                self.fail(f'invalid index "{idx}" in path "{path}" '
                          f'in "{d}"')
        return d

    def assert_qmp_absent(self, d, path):
        '''Fail the test if @path can be resolved in @d'''
        try:
            found = self.dictpath(d, path)
        except AssertionError:
            # dictpath() failed, so the path does not exist
            return
        else:
            self.fail('path "%s" has value "%s"' % (path, str(found)))

    def assert_qmp(self, d, path, value):
        '''Assert that @d holds @value at @path. If @value is a
        non-empty list, it is enough for any one of its elements to
        match.'''

        actual = self.dictpath(d, path)

        # [] makes no sense as a list of valid values, so treat it as
        # an actual single value.
        if not isinstance(value, list) or value == []:
            self.assertEqual(actual, value,
                             '"%s" is "%s", expected "%s"'
                             % (path, str(actual), str(value)))
            return

        if any(actual == candidate for candidate in value):
            return
        self.fail('no match for "%s" in %s' % (str(actual), str(value)))

    def assert_no_active_block_jobs(self):
        '''Assert that query-block-jobs returns an empty list'''
        self.assert_qmp(self.vm.qmp('query-block-jobs'), 'return', [])

    def assert_has_block_node(self, node_name=None, file_name=None):
        """Run query-named-block-nodes and assert that some entry
        matches node_name and/or file_name"""
        def compatible(actual, wanted):
            # A None on either side counts as a match
            if actual is None or wanted is None:
                return True
            return actual == wanted

        assert node_name or file_name
        reply = self.vm.qmp('query-named-block-nodes')
        if any(compatible(entry.get("node-name"), node_name) and
               compatible(entry.get("file"), file_name)
               for entry in reply["return"]):
            return
        self.fail("Cannot find %s %s in result:\n%s" %
                  (node_name, file_name, reply))

    def assert_json_filename_equal(self, json_filename, reference):
        '''Assert that @json_filename starts with "json:" and that the
        JSON object following the prefix equals @reference (both are
        compared after flattening)'''
        prefix, payload = json_filename[:5], json_filename[5:]
        self.assertEqual(prefix, 'json:')

        flatten = self.vm.flatten_qmp_object
        self.assertEqual(flatten(json.loads(payload)), flatten(reference))

    def cancel_and_wait(self, drive='drive0', force=False,
                        resume=False, wait=60.0):
        '''Cancel a block job, wait until it is gone and return the
        final event'''
        reply = self.vm.qmp('block-job-cancel', device=drive, force=force)
        self.assert_qmp(reply, 'return', {})

        if resume:
            self.vm.resume_drive(drive)

        final_event = None
        while final_event is None:
            # Always drain the whole batch; the last matching event wins
            for event in self.vm.get_qmp_events(wait=wait):
                kind = event['event']
                if kind in ('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED'):
                    self.assert_qmp(event, 'data/device', drive)
                    final_event = event
                elif kind == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)

        self.assert_no_active_block_jobs()
        return final_event

    def wait_until_completed(self, drive='drive0', check_offset=True,
                             wait=60.0, error=None):
        '''Wait for BLOCK_JOB_COMPLETED, check it and return the event'''
        while True:
            for event in self.vm.get_qmp_events(wait=wait):
                kind = event['event']
                if kind == 'JOB_STATUS_CHANGE':
                    self.assert_qmp(event, 'data/id', drive)
                    continue
                if kind != 'BLOCK_JOB_COMPLETED':
                    continue

                self.assert_qmp(event, 'data/device', drive)
                if error is not None:
                    self.assert_qmp(event, 'data/error', error)
                else:
                    self.assert_qmp_absent(event, 'data/error')
                    if check_offset:
                        self.assert_qmp(event, 'data/offset',
                                        event['data']['len'])
                self.assert_no_active_block_jobs()
                return event

    def wait_ready(self, drive='drive0'):
        """Wait for a mirror or commit BLOCK_JOB_READY event on @drive
        and return it."""
        return self.vm.events_wait([
            ('BLOCK_JOB_READY',
             {'data': {'type': job_type, 'device': drive}})
            for job_type in ('mirror', 'commit')
        ])

    def wait_ready_and_cancel(self, drive='drive0'):
        '''Wait for the job to become ready, cancel it and check that
        it completed as a fully-copied mirror job'''
        self.wait_ready(drive=drive)
        final = self.cancel_and_wait(drive=drive)
        self.assertEqual(final['event'], 'BLOCK_JOB_COMPLETED')
        self.assert_qmp(final, 'data/type', 'mirror')
        self.assert_qmp(final, 'data/offset', final['data']['len'])

    def complete_and_wait(self, drive='drive0', wait_ready=True,
                          completion_error=None):
        '''Issue block-job-complete and wait for the job to finish'''
        if wait_ready:
            self.wait_ready(drive=drive)

        reply = self.vm.qmp('block-job-complete', device=drive)
        self.assert_qmp(reply, 'return', {})

        final = self.wait_until_completed(drive=drive, error=completion_error)
        job_type = final['data']['type']
        self.assertTrue(job_type in ['mirror', 'commit'])

    def pause_wait(self, job_id='job0'):
        '''Poll query-block-jobs until @job_id is paused and not busy,
        then return its entry'''
        with Timeout(3, "Timeout waiting for job to pause"):
            while True:
                jobs = self.vm.qmp('query-block-jobs')['return']
                job = next((j for j in jobs if j['device'] == job_id), None)
                # The job must show up in every poll
                assert job is not None
                if job['paused'] and not job['busy']:
                    return job

    def pause_job(self, job_id='job0', wait=True):
        '''Pause a block job; with @wait, return the paused job's
        entry, otherwise the QMP reply'''
        reply = self.vm.qmp('block-job-pause', device=job_id)
        self.assert_qmp(reply, 'return', {})
        return self.pause_wait(job_id) if wait else reply

    def case_skip(self, reason):
        '''Record this test case as not run and skip it'''
        case_notrun(reason)
        self.skipTest(reason)
1205
1206
def notrun(reason):
    '''Skip this test suite: record @reason in the .notrun file, log
    a warning and exit with status 0'''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])
    notrun_path = '%s/%s.notrun' % (output_dir, seq)

    with open(notrun_path, 'w', encoding='utf-8') as notrun_file:
        notrun_file.write(reason + '\n')

    logger.warning("%s not run: %s", seq, reason)
    sys.exit(0)
1217
def case_notrun(reason):
    '''Record a single test case as not run by appending @reason to
    the .casenotrun file. This does not skip anything by itself; that
    is up to the caller. QMPTestCase.case_skip() is the variant that
    also skips the current test case.'''

    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])
    casenotrun_path = '%s/%s.casenotrun' % (output_dir, seq)

    with open(casenotrun_path, 'a', encoding='utf-8') as casenotrun_file:
        casenotrun_file.write('    [case not run] ' + reason + '\n')
1230
def _verify_image_format(supported_fmts: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = ()) -> None:
    """
    Skip the test suite (via notrun()) if the current image format is
    not acceptable; for luks, additionally require a working LUKS
    driver.
    """
    generic_enabled = os.environ.get('IMGFMT_GENERIC', 'true') == 'true'
    if generic_enabled and 'generic' in supported_fmts:
        # similar to
        #   _supported_fmt generic
        # for bash tests
        supported_fmts = ()

    # An empty supported list places no restriction on the format
    not_listed = supported_fmts and (imgfmt not in supported_fmts)
    if not_listed or (imgfmt in unsupported_fmts):
        notrun('not suitable for this image format: %s' % imgfmt)

    if imgfmt == 'luks':
        verify_working_luks()
1246
1247 def _verify_protocol(supported: Sequence[str] = (),
1248 unsupported: Sequence[str] = ()) -> None:
1249 assert not (supported and unsupported)
1250
1251 if 'generic' in supported:
1252 return
1253
1254 not_sup = supported and (imgproto not in supported)
1255 if not_sup or (imgproto in unsupported):
1256 notrun('not suitable for this protocol: %s' % imgproto)
1257
1258 def _verify_platform(supported: Sequence[str] = (),
1259 unsupported: Sequence[str] = ()) -> None:
1260 if any((sys.platform.startswith(x) for x in unsupported)):
1261 notrun('not suitable for this OS: %s' % sys.platform)
1262
1263 if supported:
1264 if not any((sys.platform.startswith(x) for x in supported)):
1265 notrun('not suitable for this OS: %s' % sys.platform)
1266
1267 def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1268 if supported_cache_modes and (cachemode not in supported_cache_modes):
1269 notrun('not suitable for this cache mode: %s' % cachemode)
1270
1271 def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1272 if supported_aio_modes and (aiomode not in supported_aio_modes):
1273 notrun('not suitable for this aio mode: %s' % aiomode)
1274
def _verify_formats(required_formats: Sequence[str] = ()) -> None:
    """
    Skip the test suite (via notrun()) if any of @required_formats is
    missing from supported_formats().
    """
    missing = list(set(required_formats) - set(supported_formats()))
    if not missing:
        return
    notrun(f'formats {missing} are not whitelisted')
1279
1280
def _verify_virtio_blk() -> None:
    """
    Skip the test suite (via notrun()) if QEMU's device help output
    does not mention virtio-blk.
    """
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    if 'virtio-blk' in device_help:
        return
    notrun('Missing virtio-blk in QEMU binary')
1285
def _verify_virtio_scsi_pci_or_ccw() -> None:
    """
    Skip the test suite (via notrun()) if QEMU's device help output
    mentions neither virtio-scsi-pci nor virtio-scsi-ccw.
    """
    device_help = qemu_pipe('-M', 'none', '-device', 'help')
    candidates = ('virtio-scsi-pci', 'virtio-scsi-ccw')
    if not any(device in device_help for device in candidates):
        notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1290
1291
1292 def _verify_imgopts(unsupported: Sequence[str] = ()) -> None:
1293 imgopts = os.environ.get('IMGOPTS')
1294 # One of usage examples for IMGOPTS is "data_file=$TEST_IMG.ext_data_file"
1295 # but it supported only for bash tests. We don't have a concept of global
1296 # TEST_IMG in iotests.py, not saying about somehow parsing $variables.
1297 # So, for simplicity let's just not support any IMGOPTS with '$' inside.
1298 unsup = list(unsupported) + ['$']
1299 if imgopts and any(x in imgopts for x in unsup):
1300 notrun(f'not suitable for this imgopts: {imgopts}')
1301
1302
def supports_quorum():
    '''Return whether "quorum" appears in the qemu-img --help output'''
    help_text = qemu_img_pipe('--help')
    return 'quorum' in help_text
1305
def verify_quorum():
    '''Skip the test suite unless quorum support is available'''
    if supports_quorum():
        return
    notrun('quorum support missing')
1310
def has_working_luks() -> Tuple[bool, str]:
    """
    Try to create a LUKS image to find out whether our LUKS driver
    actually works (this extends to LUKS encryption for qcow2).

    :return: (True, '') on success, otherwise (False, reason).
    """

    img_file = f'{test_dir}/luks-test.luks'
    output, status = qemu_img_pipe_and_status(
        'create', '-f', 'luks',
        '--object', luks_default_secret_object,
        '-o', luks_default_key_secret_opt,
        '-o', 'iter-time=10',
        img_file, '1G')

    # Best-effort cleanup; the file may not exist if creation failed
    try:
        os.remove(img_file)
    except OSError:
        pass

    if status == 0:
        return (True, '')

    # Prefer the text after "<img_file>:" on the first line that has
    # it; fall back to the complete output.
    marker = img_file + ':'
    for line in output.splitlines():
        if marker in line:
            return (False, line.split(marker, 1)[1].strip())
    return (False, output)
1341
def verify_working_luks():
    """
    Skip the test suite, giving the reported reason, if LUKS does not
    work
    """
    luks_ok, why_not = has_working_luks()
    if luks_ok:
        return
    notrun(why_not)
1349
def qemu_pipe(*args: str) -> str:
    """
    Run qemu with the given extra arguments, which should make it
    print something and exit (e.g. a help option).

    :return: QEMU's stdout output.
    """
    command = [qemu_prog] + qemu_opts + list(args)
    output, _status = qemu_tool_pipe_and_status('qemu', command)
    return output
1359
def supported_formats(read_only=False):
    '''Return the list of whitelisted formats.

    With 'read_only' set to True the ro-whitelist is returned,
    otherwise the rw-whitelist. Results are cached per mode in the
    function attribute "formats".'''

    try:
        cache = supported_formats.formats
    except AttributeError:
        cache = supported_formats.formats = {}

    if read_only not in cache:
        help_lines = qemu_pipe("-drive", "format=help").splitlines()
        # Output line 0 is used for read-write, line 1 for read-only
        whitelist_line = help_lines[1 if read_only else 0]
        cache[read_only] = whitelist_line.split(":")[1].split()

    return cache[read_only]
1374
def skip_if_unsupported(required_formats=(), read_only=False):
    '''Skip Test Decorator
       Runs the test if all the required formats are whitelisted

       @required_formats is either a sequence of format names or a
       callable that takes the test case and returns such a sequence.
       @read_only selects the whitelist, see supported_formats().'''
    # Local import: only the decorators in this file need functools
    import functools

    def skip_test_decorator(func):
        # wraps() keeps the test method's name and docstring intact
        @functools.wraps(func)
        def func_wrapper(test_case: QMPTestCase, *args: Any,
                         **kwargs: Any) -> None:
            if callable(required_formats):
                fmts = required_formats(test_case)
            else:
                fmts = required_formats

            usf_list = list(set(fmts) - set(supported_formats(read_only)))
            if usf_list:
                msg = f'{test_case}: formats {usf_list} are not whitelisted'
                test_case.case_skip(msg)
            else:
                func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
1394
def skip_for_formats(formats: Sequence[str] = ()) \
    -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
                Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
    '''Skip Test Decorator
       Skips the test for the given formats

       The decorated test is skipped via case_skip() when the current
       image format is one of @formats, and run normally otherwise.'''
    # Local import: only the decorators in this file need functools
    import functools

    def skip_test_decorator(func):
        # wraps() keeps the test method's name and docstring intact
        @functools.wraps(func)
        def func_wrapper(test_case: QMPTestCase, *args: Any,
                         **kwargs: Any) -> None:
            if imgfmt in formats:
                msg = f'{test_case}: Skipped for format {imgfmt}'
                test_case.case_skip(msg)
            else:
                func(test_case, *args, **kwargs)
        return func_wrapper
    return skip_test_decorator
1410
def skip_if_user_is_root(func):
    '''Skip Test Decorator
       Runs the test only without root permissions

       When running as root, the case is recorded via case_notrun()
       and the wrapper returns None without calling the test.'''
    # Local import: only the decorators in this file need functools
    import functools

    # wraps() keeps the test method's name and docstring intact
    @functools.wraps(func)
    def func_wrapper(*args, **kwargs):
        if os.getuid() == 0:
            case_notrun('{}: cannot be run as root'.format(args[0]))
            return None
        else:
            return func(*args, **kwargs)
    return func_wrapper
1421
1422 # We need to filter out the time taken from the output so that
1423 # qemu-iotest can reliably diff the results against master output,
1424 # and hide skipped tests from the reference output.
1425
class ReproducibleTestResult(unittest.TextTestResult):
    '''Text test result that reports a skipped test as a dot'''

    def addSkip(self, test, reason):
        # Same as TextTestResult, but print dot instead of "s".
        # Call the TestResult base directly so that TextTestResult's
        # own skip output is bypassed.
        unittest.TestResult.addSkip(self, test, reason)

        if self.showAll:
            self.stream.writeln("skipped {0!r}".format(reason))
            return

        if self.dots:
            self.stream.write(".")
            self.stream.flush()
1435
class ReproducibleStreamWrapper:
    '''Stream proxy whose write() removes the test run time and the
    "(skipped=N)" note; other attributes come from the wrapped stream'''

    def __init__(self, stream: TextIO):
        self.stream = stream

    def __getattr__(self, attr):
        # Only reached when normal lookup fails; never forward the
        # lookup of these two names to the wrapped stream.
        if attr not in ('stream', '__getstate__'):
            return getattr(self.stream, attr)
        raise AttributeError(attr)

    def write(self, arg=None):
        without_time = re.sub(r'Ran (\d+) tests? in [\d.]+s',
                              r'Ran \1 tests', arg)
        without_skips = re.sub(r' \(skipped=\d+\)', r'', without_time)
        self.stream.write(without_skips)
1449
class ReproducibleTestRunner(unittest.TextTestRunner):
    '''Text test runner that writes through a ReproducibleStreamWrapper
    and uses ReproducibleTestResult by default'''

    def __init__(self, stream: Optional[TextIO] = None,
                 resultclass: Type[unittest.TestResult] =
                 ReproducibleTestResult,
                 **kwargs: Any) -> None:
        # Fall back to stdout when no (truthy) stream is given
        wrapped_stream = ReproducibleStreamWrapper(stream or sys.stdout)
        super().__init__(
            stream=wrapped_stream,  # type: ignore
            descriptions=True,
            resultclass=resultclass,
            **kwargs,
        )
1460
def execute_unittest(argv: List[str], debug: bool = False) -> None:
    """Run the calling module's unittests through unittest.main()."""

    # Some tests have warnings, especially ResourceWarnings for unclosed
    # files and sockets. Ignore them for now to ensure reproducibility of
    # the test output.
    warning_mode = None if sys.warnoptions else 'ignore'
    verbosity = 2 if debug else 1

    unittest.main(argv=argv,
                  testRunner=ReproducibleTestRunner,
                  verbosity=verbosity,
                  warnings=warning_mode)
1471
def execute_setup_common(supported_fmts: Sequence[str] = (),
                         supported_platforms: Sequence[str] = (),
                         supported_cache_modes: Sequence[str] = (),
                         supported_aio_modes: Sequence[str] = (),
                         unsupported_fmts: Sequence[str] = (),
                         supported_protocols: Sequence[str] = (),
                         unsupported_protocols: Sequence[str] = (),
                         required_fmts: Sequence[str] = (),
                         unsupported_imgopts: Sequence[str] = ()) -> bool:
    """
    Common setup for both script-style and unittest-style tests:
    consume the '-d' CLI flag, configure logging and run all
    environment checks.

    :return: Bool; Whether or not debug mode has been requested via the CLI.
    """
    # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.

    # '-d' requests debug mode; strip its first occurrence from argv
    try:
        sys.argv.remove('-d')
    except ValueError:
        debug = False
    else:
        debug = True

    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    _verify_image_format(supported_fmts, unsupported_fmts)
    _verify_protocol(supported_protocols, unsupported_protocols)
    _verify_platform(supported=supported_platforms)
    _verify_cache_mode(supported_cache_modes)
    _verify_aio_mode(supported_aio_modes)
    _verify_formats(required_fmts)
    _verify_virtio_blk()
    _verify_imgopts(unsupported_imgopts)

    return debug
1503
def execute_test(*args, test_function=None, **kwargs):
    """Do the common setup, then run @test_function if one is given,
    or the module's unittests otherwise."""

    debug = execute_setup_common(*args, **kwargs)
    if test_function:
        test_function()
    else:
        execute_unittest(sys.argv, debug)
1512
def activate_logging():
    """Send test_logger output (used by iotests.log()) to stdout as
    bare messages, for script-style tests."""
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    stdout_handler.setFormatter(logging.Formatter('%(message)s'))

    test_logger.addHandler(stdout_handler)
    test_logger.setLevel(logging.INFO)
    # Keep these messages away from the parent loggers' handlers
    test_logger.propagate = False
1521
# This is called from script-style iotests without a single point of entry
def script_initialize(*args, **kwargs):
    """Set up a script-style test (logging to stdout plus the common
    setup checks) without running anything."""
    activate_logging()
    execute_setup_common(*args, **kwargs)
1527
# This is called from script-style iotests with a single point of entry
def script_main(test_function, *args, **kwargs):
    """Activate stdout logging, then run @test_function as a
    script-style test outside of the unittest framework"""
    activate_logging()
    execute_test(*args, test_function=test_function, **kwargs)
1533
# This is called from unittest style iotests
def main(*args, **kwargs):
    """Entry point for unittest-style tests; all arguments are handed
    on to execute_test()"""
    execute_test(*args, **kwargs)