]> git.proxmox.com Git - mirror_qemu.git/blob - tests/qemu-iotests/iotests.py
Merge remote-tracking branch 'remotes/nvme/tags/nvme-next-pull-request' into staging
[mirror_qemu.git] / tests / qemu-iotests / iotests.py
1 # Common utilities and Python wrappers for qemu-iotests
2 #
3 # Copyright (C) 2012 IBM Corp.
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 #
18
19 import atexit
20 import bz2
21 from collections import OrderedDict
22 import faulthandler
23 import json
24 import logging
25 import os
26 import re
27 import shutil
28 import signal
29 import struct
30 import subprocess
31 import sys
32 import time
33 from typing import (Any, Callable, Dict, Iterable,
34 List, Optional, Sequence, TextIO, Tuple, Type, TypeVar)
35 import unittest
36
37 from contextlib import contextmanager
38
39 # pylint: disable=import-error, wrong-import-position
40 sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
41 from qemu import qtest
42 from qemu.qmp import QMPMessage
43
44 # Use this logger for logging messages directly from the iotests module
45 logger = logging.getLogger('qemu.iotests')
46 logger.addHandler(logging.NullHandler())
47
48 # Use this logger for messages that ought to be used for diff output.
49 test_logger = logging.getLogger('qemu.iotests.diff_io')
50
51
52 faulthandler.enable()
53
54 # This will not work if arguments contain spaces but is necessary if we
55 # want to support the override options that ./check supports.
56 qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
57 if os.environ.get('QEMU_IMG_OPTIONS'):
58 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
59
60 qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
61 if os.environ.get('QEMU_IO_OPTIONS'):
62 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
63
64 qemu_io_args_no_fmt = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
65 if os.environ.get('QEMU_IO_OPTIONS_NO_FMT'):
66 qemu_io_args_no_fmt += \
67 os.environ['QEMU_IO_OPTIONS_NO_FMT'].strip().split(' ')
68
69 qemu_nbd_prog = os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')
70 qemu_nbd_args = [qemu_nbd_prog]
71 if os.environ.get('QEMU_NBD_OPTIONS'):
72 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
73
74 qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
75 qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
76
77 imgfmt = os.environ.get('IMGFMT', 'raw')
78 imgproto = os.environ.get('IMGPROTO', 'file')
79 output_dir = os.environ.get('OUTPUT_DIR', '.')
80
81 try:
82 test_dir = os.environ['TEST_DIR']
83 sock_dir = os.environ['SOCK_DIR']
84 cachemode = os.environ['CACHEMODE']
85 aiomode = os.environ['AIOMODE']
86 qemu_default_machine = os.environ['QEMU_DEFAULT_MACHINE']
87 except KeyError:
88 # We are using these variables as proxies to indicate that we're
89 # not being run via "check". There may be other things set up by
90 # "check" that individual test cases rely on.
91 sys.stderr.write('Please run this test via the "check" script\n')
92 sys.exit(os.EX_USAGE)
93
94 socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')
95
96 luks_default_secret_object = 'secret,id=keysec0,data=' + \
97 os.environ.get('IMGKEYSECRET', '')
98 luks_default_key_secret_opt = 'key-secret=keysec0'
99
100 sample_img_dir = os.environ['SAMPLE_IMG_DIR']
101
102
103 def unarchive_sample_image(sample, fname):
104 sample_fname = os.path.join(sample_img_dir, sample + '.bz2')
105 with bz2.open(sample_fname) as f_in, open(fname, 'wb') as f_out:
106 shutil.copyfileobj(f_in, f_out)
107
108
109 def qemu_tool_pipe_and_status(tool: str, args: Sequence[str],
110 connect_stderr: bool = True) -> Tuple[str, int]:
111 """
112 Run a tool and return both its output and its exit code
113 """
114 stderr = subprocess.STDOUT if connect_stderr else None
115 with subprocess.Popen(args, stdout=subprocess.PIPE,
116 stderr=stderr, universal_newlines=True) as subp:
117 output = subp.communicate()[0]
118 if subp.returncode < 0:
119 cmd = ' '.join(args)
120 sys.stderr.write(f'{tool} received signal \
121 {-subp.returncode}: {cmd}\n')
122 return (output, subp.returncode)
123
124 def qemu_img_pipe_and_status(*args: str) -> Tuple[str, int]:
125 """
126 Run qemu-img and return both its output and its exit code
127 """
128 full_args = qemu_img_args + list(args)
129 return qemu_tool_pipe_and_status('qemu-img', full_args)
130
131 def qemu_img(*args: str) -> int:
132 '''Run qemu-img and return the exit code'''
133 return qemu_img_pipe_and_status(*args)[1]
134
135 def ordered_qmp(qmsg, conv_keys=True):
136 # Dictionaries are not ordered prior to 3.6, therefore:
137 if isinstance(qmsg, list):
138 return [ordered_qmp(atom) for atom in qmsg]
139 if isinstance(qmsg, dict):
140 od = OrderedDict()
141 for k, v in sorted(qmsg.items()):
142 if conv_keys:
143 k = k.replace('_', '-')
144 od[k] = ordered_qmp(v, conv_keys=False)
145 return od
146 return qmsg
147
148 def qemu_img_create(*args):
149 args = list(args)
150
151 # default luks support
152 if '-f' in args and args[args.index('-f') + 1] == 'luks':
153 if '-o' in args:
154 i = args.index('-o')
155 if 'key-secret' not in args[i + 1]:
156 args[i + 1].append(luks_default_key_secret_opt)
157 args.insert(i + 2, '--object')
158 args.insert(i + 3, luks_default_secret_object)
159 else:
160 args = ['-o', luks_default_key_secret_opt,
161 '--object', luks_default_secret_object] + args
162
163 args.insert(0, 'create')
164
165 return qemu_img(*args)
166
167 def qemu_img_measure(*args):
168 return json.loads(qemu_img_pipe("measure", "--output", "json", *args))
169
170 def qemu_img_check(*args):
171 return json.loads(qemu_img_pipe("check", "--output", "json", *args))
172
173 def qemu_img_verbose(*args):
174 '''Run qemu-img without suppressing its output and return the exit code'''
175 exitcode = subprocess.call(qemu_img_args + list(args))
176 if exitcode < 0:
177 sys.stderr.write('qemu-img received signal %i: %s\n'
178 % (-exitcode, ' '.join(qemu_img_args + list(args))))
179 return exitcode
180
181 def qemu_img_pipe(*args: str) -> str:
182 '''Run qemu-img and return its output'''
183 return qemu_img_pipe_and_status(*args)[0]
184
185 def qemu_img_log(*args):
186 result = qemu_img_pipe(*args)
187 log(result, filters=[filter_testfiles])
188 return result
189
190 def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()):
191 args = ['info']
192 if imgopts:
193 args.append('--image-opts')
194 else:
195 args += ['-f', imgfmt]
196 args += extra_args
197 args.append(filename)
198
199 output = qemu_img_pipe(*args)
200 if not filter_path:
201 filter_path = filename
202 log(filter_img_info(output, filter_path))
203
204 def qemu_io(*args):
205 '''Run qemu-io and return the stdout data'''
206 args = qemu_io_args + list(args)
207 return qemu_tool_pipe_and_status('qemu-io', args)[0]
208
209 def qemu_io_log(*args):
210 result = qemu_io(*args)
211 log(result, filters=[filter_testfiles, filter_qemu_io])
212 return result
213
214 def qemu_io_silent(*args):
215 '''Run qemu-io and return the exit code, suppressing stdout'''
216 if '-f' in args or '--image-opts' in args:
217 default_args = qemu_io_args_no_fmt
218 else:
219 default_args = qemu_io_args
220
221 args = default_args + list(args)
222 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'))
223 if exitcode < 0:
224 sys.stderr.write('qemu-io received signal %i: %s\n' %
225 (-exitcode, ' '.join(args)))
226 return exitcode
227
228 def qemu_io_silent_check(*args):
229 '''Run qemu-io and return the true if subprocess returned 0'''
230 args = qemu_io_args + list(args)
231 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'),
232 stderr=subprocess.STDOUT)
233 return exitcode == 0
234
235 class QemuIoInteractive:
236 def __init__(self, *args):
237 self.args = qemu_io_args_no_fmt + list(args)
238 # We need to keep the Popen objext around, and not
239 # close it immediately. Therefore, disable the pylint check:
240 # pylint: disable=consider-using-with
241 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
242 stdout=subprocess.PIPE,
243 stderr=subprocess.STDOUT,
244 universal_newlines=True)
245 out = self._p.stdout.read(9)
246 if out != 'qemu-io> ':
247 # Most probably qemu-io just failed to start.
248 # Let's collect the whole output and exit.
249 out += self._p.stdout.read()
250 self._p.wait(timeout=1)
251 raise ValueError(out)
252
253 def close(self):
254 self._p.communicate('q\n')
255
256 def _read_output(self):
257 pattern = 'qemu-io> '
258 n = len(pattern)
259 pos = 0
260 s = []
261 while pos != n:
262 c = self._p.stdout.read(1)
263 # check unexpected EOF
264 assert c != ''
265 s.append(c)
266 if c == pattern[pos]:
267 pos += 1
268 else:
269 pos = 0
270
271 return ''.join(s[:-n])
272
273 def cmd(self, cmd):
274 # quit command is in close(), '\n' is added automatically
275 assert '\n' not in cmd
276 cmd = cmd.strip()
277 assert cmd not in ('q', 'quit')
278 self._p.stdin.write(cmd + '\n')
279 self._p.stdin.flush()
280 return self._read_output()
281
282
283 def qemu_nbd(*args):
284 '''Run qemu-nbd in daemon mode and return the parent's exit code'''
285 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
286
287 def qemu_nbd_early_pipe(*args: str) -> Tuple[int, str]:
288 '''Run qemu-nbd in daemon mode and return both the parent's exit code
289 and its output in case of an error'''
290 full_args = qemu_nbd_args + ['--fork'] + list(args)
291 output, returncode = qemu_tool_pipe_and_status('qemu-nbd', full_args,
292 connect_stderr=False)
293 return returncode, output if returncode else ''
294
295 def qemu_nbd_list_log(*args: str) -> str:
296 '''Run qemu-nbd to list remote exports'''
297 full_args = [qemu_nbd_prog, '-L'] + list(args)
298 output, _ = qemu_tool_pipe_and_status('qemu-nbd', full_args)
299 log(output, filters=[filter_testfiles, filter_nbd_exports])
300 return output
301
302 @contextmanager
303 def qemu_nbd_popen(*args):
304 '''Context manager running qemu-nbd within the context'''
305 pid_file = file_path("qemu_nbd_popen-nbd-pid-file")
306
307 assert not os.path.exists(pid_file)
308
309 cmd = list(qemu_nbd_args)
310 cmd.extend(('--persistent', '--pid-file', pid_file))
311 cmd.extend(args)
312
313 log('Start NBD server')
314 with subprocess.Popen(cmd) as p:
315 try:
316 while not os.path.exists(pid_file):
317 if p.poll() is not None:
318 raise RuntimeError(
319 "qemu-nbd terminated with exit code {}: {}"
320 .format(p.returncode, ' '.join(cmd)))
321
322 time.sleep(0.01)
323 yield
324 finally:
325 if os.path.exists(pid_file):
326 os.remove(pid_file)
327 log('Kill NBD server')
328 p.kill()
329 p.wait()
330
331 def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
332 '''Return True if two image files are identical'''
333 return qemu_img('compare', '-f', fmt1,
334 '-F', fmt2, img1, img2) == 0
335
336 def create_image(name, size):
337 '''Create a fully-allocated raw image with sector markers'''
338 with open(name, 'wb') as file:
339 i = 0
340 while i < size:
341 sector = struct.pack('>l504xl', i // 512, i // 512)
342 file.write(sector)
343 i = i + 512
344
345 def image_size(img):
346 '''Return image's virtual size'''
347 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
348 return json.loads(r)['virtual-size']
349
350 def is_str(val):
351 return isinstance(val, str)
352
353 test_dir_re = re.compile(r"%s" % test_dir)
354 def filter_test_dir(msg):
355 return test_dir_re.sub("TEST_DIR", msg)
356
357 win32_re = re.compile(r"\r")
358 def filter_win32(msg):
359 return win32_re.sub("", msg)
360
361 qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* "
362 r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec "
363 r"and [0-9\/.inf]* ops\/sec\)")
364 def filter_qemu_io(msg):
365 msg = filter_win32(msg)
366 return qemu_io_re.sub("X ops; XX:XX:XX.X "
367 "(XXX YYY/sec and XXX ops/sec)", msg)
368
369 chown_re = re.compile(r"chown [0-9]+:[0-9]+")
370 def filter_chown(msg):
371 return chown_re.sub("chown UID:GID", msg)
372
373 def filter_qmp_event(event):
374 '''Filter a QMP event dict'''
375 event = dict(event)
376 if 'timestamp' in event:
377 event['timestamp']['seconds'] = 'SECS'
378 event['timestamp']['microseconds'] = 'USECS'
379 return event
380
381 def filter_qmp(qmsg, filter_fn):
382 '''Given a string filter, filter a QMP object's values.
383 filter_fn takes a (key, value) pair.'''
384 # Iterate through either lists or dicts;
385 if isinstance(qmsg, list):
386 items = enumerate(qmsg)
387 else:
388 items = qmsg.items()
389
390 for k, v in items:
391 if isinstance(v, (dict, list)):
392 qmsg[k] = filter_qmp(v, filter_fn)
393 else:
394 qmsg[k] = filter_fn(k, v)
395 return qmsg
396
397 def filter_testfiles(msg):
398 pref1 = os.path.join(test_dir, "%s-" % (os.getpid()))
399 pref2 = os.path.join(sock_dir, "%s-" % (os.getpid()))
400 return msg.replace(pref1, 'TEST_DIR/PID-').replace(pref2, 'SOCK_DIR/PID-')
401
402 def filter_qmp_testfiles(qmsg):
403 def _filter(_key, value):
404 if is_str(value):
405 return filter_testfiles(value)
406 return value
407 return filter_qmp(qmsg, _filter)
408
409 def filter_virtio_scsi(output: str) -> str:
410 return re.sub(r'(virtio-scsi)-(ccw|pci)', r'\1', output)
411
412 def filter_qmp_virtio_scsi(qmsg):
413 def _filter(_key, value):
414 if is_str(value):
415 return filter_virtio_scsi(value)
416 return value
417 return filter_qmp(qmsg, _filter)
418
419 def filter_generated_node_ids(msg):
420 return re.sub("#block[0-9]+", "NODE_NAME", msg)
421
422 def filter_img_info(output, filename):
423 lines = []
424 for line in output.split('\n'):
425 if 'disk size' in line or 'actual-size' in line:
426 continue
427 line = line.replace(filename, 'TEST_IMG')
428 line = filter_testfiles(line)
429 line = line.replace(imgfmt, 'IMGFMT')
430 line = re.sub('iters: [0-9]+', 'iters: XXX', line)
431 line = re.sub('uuid: [-a-f0-9]+',
432 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',
433 line)
434 line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line)
435 lines.append(line)
436 return '\n'.join(lines)
437
438 def filter_imgfmt(msg):
439 return msg.replace(imgfmt, 'IMGFMT')
440
441 def filter_qmp_imgfmt(qmsg):
442 def _filter(_key, value):
443 if is_str(value):
444 return filter_imgfmt(value)
445 return value
446 return filter_qmp(qmsg, _filter)
447
448 def filter_nbd_exports(output: str) -> str:
449 return re.sub(r'((min|opt|max) block): [0-9]+', r'\1: XXX', output)
450
451
452 Msg = TypeVar('Msg', Dict[str, Any], List[Any], str)
453
454 def log(msg: Msg,
455 filters: Iterable[Callable[[Msg], Msg]] = (),
456 indent: Optional[int] = None) -> None:
457 """
458 Logs either a string message or a JSON serializable message (like QMP).
459 If indent is provided, JSON serializable messages are pretty-printed.
460 """
461 for flt in filters:
462 msg = flt(msg)
463 if isinstance(msg, (dict, list)):
464 # Don't sort if it's already sorted
465 do_sort = not isinstance(msg, OrderedDict)
466 test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent))
467 else:
468 test_logger.info(msg)
469
470 class Timeout:
471 def __init__(self, seconds, errmsg="Timeout"):
472 self.seconds = seconds
473 self.errmsg = errmsg
474 def __enter__(self):
475 signal.signal(signal.SIGALRM, self.timeout)
476 signal.setitimer(signal.ITIMER_REAL, self.seconds)
477 return self
478 def __exit__(self, exc_type, value, traceback):
479 signal.setitimer(signal.ITIMER_REAL, 0)
480 return False
481 def timeout(self, signum, frame):
482 raise Exception(self.errmsg)
483
484 def file_pattern(name):
485 return "{0}-{1}".format(os.getpid(), name)
486
487 class FilePath:
488 """
489 Context manager generating multiple file names. The generated files are
490 removed when exiting the context.
491
492 Example usage:
493
494 with FilePath('a.img', 'b.img') as (img_a, img_b):
495 # Use img_a and img_b here...
496
497 # a.img and b.img are automatically removed here.
498
499 By default images are created in iotests.test_dir. To create sockets use
500 iotests.sock_dir:
501
502 with FilePath('a.sock', base_dir=iotests.sock_dir) as sock:
503
504 For convenience, calling with one argument yields a single file instead of
505 a tuple with one item.
506
507 """
508 def __init__(self, *names, base_dir=test_dir):
509 self.paths = [os.path.join(base_dir, file_pattern(name))
510 for name in names]
511
512 def __enter__(self):
513 if len(self.paths) == 1:
514 return self.paths[0]
515 else:
516 return self.paths
517
518 def __exit__(self, exc_type, exc_val, exc_tb):
519 for path in self.paths:
520 try:
521 os.remove(path)
522 except OSError:
523 pass
524 return False
525
526
527 def try_remove(img):
528 try:
529 os.remove(img)
530 except OSError:
531 pass
532
533 def file_path_remover():
534 for path in reversed(file_path_remover.paths):
535 try_remove(path)
536
537
538 def file_path(*names, base_dir=test_dir):
539 ''' Another way to get auto-generated filename that cleans itself up.
540
541 Use is as simple as:
542
543 img_a, img_b = file_path('a.img', 'b.img')
544 sock = file_path('socket')
545 '''
546
547 if not hasattr(file_path_remover, 'paths'):
548 file_path_remover.paths = []
549 atexit.register(file_path_remover)
550
551 paths = []
552 for name in names:
553 filename = file_pattern(name)
554 path = os.path.join(base_dir, filename)
555 file_path_remover.paths.append(path)
556 paths.append(path)
557
558 return paths[0] if len(paths) == 1 else paths
559
560 def remote_filename(path):
561 if imgproto == 'file':
562 return path
563 elif imgproto == 'ssh':
564 return "ssh://%s@127.0.0.1:22%s" % (os.environ.get('USER'), path)
565 else:
566 raise Exception("Protocol %s not supported" % (imgproto))
567
568 class VM(qtest.QEMUQtestMachine):
569 '''A QEMU VM'''
570
571 def __init__(self, path_suffix=''):
572 name = "qemu%s-%d" % (path_suffix, os.getpid())
573 super().__init__(qemu_prog, qemu_opts, name=name,
574 test_dir=test_dir,
575 socket_scm_helper=socket_scm_helper,
576 sock_dir=sock_dir)
577 self._num_drives = 0
578
579 def add_object(self, opts):
580 self._args.append('-object')
581 self._args.append(opts)
582 return self
583
584 def add_device(self, opts):
585 self._args.append('-device')
586 self._args.append(opts)
587 return self
588
589 def add_drive_raw(self, opts):
590 self._args.append('-drive')
591 self._args.append(opts)
592 return self
593
594 def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt):
595 '''Add a virtio-blk drive to the VM'''
596 options = ['if=%s' % interface,
597 'id=drive%d' % self._num_drives]
598
599 if path is not None:
600 options.append('file=%s' % path)
601 options.append('format=%s' % img_format)
602 options.append('cache=%s' % cachemode)
603 options.append('aio=%s' % aiomode)
604
605 if opts:
606 options.append(opts)
607
608 if img_format == 'luks' and 'key-secret' not in opts:
609 # default luks support
610 if luks_default_secret_object not in self._args:
611 self.add_object(luks_default_secret_object)
612
613 options.append(luks_default_key_secret_opt)
614
615 self._args.append('-drive')
616 self._args.append(','.join(options))
617 self._num_drives += 1
618 return self
619
620 def add_blockdev(self, opts):
621 self._args.append('-blockdev')
622 if isinstance(opts, str):
623 self._args.append(opts)
624 else:
625 self._args.append(','.join(opts))
626 return self
627
628 def add_incoming(self, addr):
629 self._args.append('-incoming')
630 self._args.append(addr)
631 return self
632
633 def hmp(self, command_line: str, use_log: bool = False) -> QMPMessage:
634 cmd = 'human-monitor-command'
635 kwargs: Dict[str, Any] = {'command-line': command_line}
636 if use_log:
637 return self.qmp_log(cmd, **kwargs)
638 else:
639 return self.qmp(cmd, **kwargs)
640
641 def pause_drive(self, drive: str, event: Optional[str] = None) -> None:
642 """Pause drive r/w operations"""
643 if not event:
644 self.pause_drive(drive, "read_aio")
645 self.pause_drive(drive, "write_aio")
646 return
647 self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"')
648
649 def resume_drive(self, drive: str) -> None:
650 """Resume drive r/w operations"""
651 self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"')
652
653 def hmp_qemu_io(self, drive: str, cmd: str,
654 use_log: bool = False) -> QMPMessage:
655 """Write to a given drive using an HMP command"""
656 return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log)
657
658 def flatten_qmp_object(self, obj, output=None, basestr=''):
659 if output is None:
660 output = dict()
661 if isinstance(obj, list):
662 for i, item in enumerate(obj):
663 self.flatten_qmp_object(item, output, basestr + str(i) + '.')
664 elif isinstance(obj, dict):
665 for key in obj:
666 self.flatten_qmp_object(obj[key], output, basestr + key + '.')
667 else:
668 output[basestr[:-1]] = obj # Strip trailing '.'
669 return output
670
671 def qmp_to_opts(self, obj):
672 obj = self.flatten_qmp_object(obj)
673 output_list = list()
674 for key in obj:
675 output_list += [key + '=' + obj[key]]
676 return ','.join(output_list)
677
678 def get_qmp_events_filtered(self, wait=60.0):
679 result = []
680 for ev in self.get_qmp_events(wait=wait):
681 result.append(filter_qmp_event(ev))
682 return result
683
684 def qmp_log(self, cmd, filters=(), indent=None, **kwargs):
685 full_cmd = OrderedDict((
686 ("execute", cmd),
687 ("arguments", ordered_qmp(kwargs))
688 ))
689 log(full_cmd, filters, indent=indent)
690 result = self.qmp(cmd, **kwargs)
691 log(result, filters, indent=indent)
692 return result
693
694 # Returns None on success, and an error string on failure
695 def run_job(self, job, auto_finalize=True, auto_dismiss=False,
696 pre_finalize=None, cancel=False, wait=60.0):
697 """
698 run_job moves a job from creation through to dismissal.
699
700 :param job: String. ID of recently-launched job
701 :param auto_finalize: Bool. True if the job was launched with
702 auto_finalize. Defaults to True.
703 :param auto_dismiss: Bool. True if the job was launched with
704 auto_dismiss=True. Defaults to False.
705 :param pre_finalize: Callback. A callable that takes no arguments to be
706 invoked prior to issuing job-finalize, if any.
707 :param cancel: Bool. When true, cancels the job after the pre_finalize
708 callback.
709 :param wait: Float. Timeout value specifying how long to wait for any
710 event, in seconds. Defaults to 60.0.
711 """
712 match_device = {'data': {'device': job}}
713 match_id = {'data': {'id': job}}
714 events = [
715 ('BLOCK_JOB_COMPLETED', match_device),
716 ('BLOCK_JOB_CANCELLED', match_device),
717 ('BLOCK_JOB_ERROR', match_device),
718 ('BLOCK_JOB_READY', match_device),
719 ('BLOCK_JOB_PENDING', match_id),
720 ('JOB_STATUS_CHANGE', match_id)
721 ]
722 error = None
723 while True:
724 ev = filter_qmp_event(self.events_wait(events, timeout=wait))
725 if ev['event'] != 'JOB_STATUS_CHANGE':
726 log(ev)
727 continue
728 status = ev['data']['status']
729 if status == 'aborting':
730 result = self.qmp('query-jobs')
731 for j in result['return']:
732 if j['id'] == job:
733 error = j['error']
734 log('Job failed: %s' % (j['error']))
735 elif status == 'ready':
736 self.qmp_log('job-complete', id=job)
737 elif status == 'pending' and not auto_finalize:
738 if pre_finalize:
739 pre_finalize()
740 if cancel:
741 self.qmp_log('job-cancel', id=job)
742 else:
743 self.qmp_log('job-finalize', id=job)
744 elif status == 'concluded' and not auto_dismiss:
745 self.qmp_log('job-dismiss', id=job)
746 elif status == 'null':
747 return error
748
749 # Returns None on success, and an error string on failure
750 def blockdev_create(self, options, job_id='job0', filters=None):
751 if filters is None:
752 filters = [filter_qmp_testfiles]
753 result = self.qmp_log('blockdev-create', filters=filters,
754 job_id=job_id, options=options)
755
756 if 'return' in result:
757 assert result['return'] == {}
758 job_result = self.run_job(job_id)
759 else:
760 job_result = result['error']
761
762 log("")
763 return job_result
764
765 def enable_migration_events(self, name):
766 log('Enabling migration QMP events on %s...' % name)
767 log(self.qmp('migrate-set-capabilities', capabilities=[
768 {
769 'capability': 'events',
770 'state': True
771 }
772 ]))
773
774 def wait_migration(self, expect_runstate: Optional[str]) -> bool:
775 while True:
776 event = self.event_wait('MIGRATION')
777 # We use the default timeout, and with a timeout, event_wait()
778 # never returns None
779 assert event
780
781 log(event, filters=[filter_qmp_event])
782 if event['data']['status'] in ('completed', 'failed'):
783 break
784
785 if event['data']['status'] == 'completed':
786 # The event may occur in finish-migrate, so wait for the expected
787 # post-migration runstate
788 runstate = None
789 while runstate != expect_runstate:
790 runstate = self.qmp('query-status')['return']['status']
791 return True
792 else:
793 return False
794
795 def node_info(self, node_name):
796 nodes = self.qmp('query-named-block-nodes')
797 for x in nodes['return']:
798 if x['node-name'] == node_name:
799 return x
800 return None
801
802 def query_bitmaps(self):
803 res = self.qmp("query-named-block-nodes")
804 return {device['node-name']: device['dirty-bitmaps']
805 for device in res['return'] if 'dirty-bitmaps' in device}
806
807 def get_bitmap(self, node_name, bitmap_name, recording=None, bitmaps=None):
808 """
809 get a specific bitmap from the object returned by query_bitmaps.
810 :param recording: If specified, filter results by the specified value.
811 :param bitmaps: If specified, use it instead of call query_bitmaps()
812 """
813 if bitmaps is None:
814 bitmaps = self.query_bitmaps()
815
816 for bitmap in bitmaps[node_name]:
817 if bitmap.get('name', '') == bitmap_name:
818 if recording is None or bitmap.get('recording') == recording:
819 return bitmap
820 return None
821
822 def check_bitmap_status(self, node_name, bitmap_name, fields):
823 ret = self.get_bitmap(node_name, bitmap_name)
824
825 return fields.items() <= ret.items()
826
827 def assert_block_path(self, root, path, expected_node, graph=None):
828 """
829 Check whether the node under the given path in the block graph
830 is @expected_node.
831
832 @root is the node name of the node where the @path is rooted.
833
834 @path is a string that consists of child names separated by
835 slashes. It must begin with a slash.
836
837 Examples for @root + @path:
838 - root="qcow2-node", path="/backing/file"
839 - root="quorum-node", path="/children.2/file"
840
841 Hypothetically, @path could be empty, in which case it would
842 point to @root. However, in practice this case is not useful
843 and hence not allowed.
844
845 @expected_node may be None. (All elements of the path but the
846 leaf must still exist.)
847
848 @graph may be None or the result of an x-debug-query-block-graph
849 call that has already been performed.
850 """
851 if graph is None:
852 graph = self.qmp('x-debug-query-block-graph')['return']
853
854 iter_path = iter(path.split('/'))
855
856 # Must start with a /
857 assert next(iter_path) == ''
858
859 node = next((node for node in graph['nodes'] if node['name'] == root),
860 None)
861
862 # An empty @path is not allowed, so the root node must be present
863 assert node is not None, 'Root node %s not found' % root
864
865 for child_name in iter_path:
866 assert node is not None, 'Cannot follow path %s%s' % (root, path)
867
868 try:
869 node_id = next(edge['child'] for edge in graph['edges']
870 if (edge['parent'] == node['id'] and
871 edge['name'] == child_name))
872
873 node = next(node for node in graph['nodes']
874 if node['id'] == node_id)
875
876 except StopIteration:
877 node = None
878
879 if node is None:
880 assert expected_node is None, \
881 'No node found under %s (but expected %s)' % \
882 (path, expected_node)
883 else:
884 assert node['name'] == expected_node, \
885 'Found node %s under %s (but expected %s)' % \
886 (node['name'], path, expected_node)
887
888 index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
889
890 class QMPTestCase(unittest.TestCase):
891 '''Abstract base class for QMP test cases'''
892
893 def __init__(self, *args, **kwargs):
894 super().__init__(*args, **kwargs)
895 # Many users of this class set a VM property we rely on heavily
896 # in the methods below.
897 self.vm = None
898
899 def dictpath(self, d, path):
900 '''Traverse a path in a nested dict'''
901 for component in path.split('/'):
902 m = index_re.match(component)
903 if m:
904 component, idx = m.groups()
905 idx = int(idx)
906
907 if not isinstance(d, dict) or component not in d:
908 self.fail(f'failed path traversal for "{path}" in "{d}"')
909 d = d[component]
910
911 if m:
912 if not isinstance(d, list):
913 self.fail(f'path component "{component}" in "{path}" '
914 f'is not a list in "{d}"')
915 try:
916 d = d[idx]
917 except IndexError:
918 self.fail(f'invalid index "{idx}" in path "{path}" '
919 f'in "{d}"')
920 return d
921
922 def assert_qmp_absent(self, d, path):
923 try:
924 result = self.dictpath(d, path)
925 except AssertionError:
926 return
927 self.fail('path "%s" has value "%s"' % (path, str(result)))
928
929 def assert_qmp(self, d, path, value):
930 '''Assert that the value for a specific path in a QMP dict
931 matches. When given a list of values, assert that any of
932 them matches.'''
933
934 result = self.dictpath(d, path)
935
936 # [] makes no sense as a list of valid values, so treat it as
937 # an actual single value.
938 if isinstance(value, list) and value != []:
939 for v in value:
940 if result == v:
941 return
942 self.fail('no match for "%s" in %s' % (str(result), str(value)))
943 else:
944 self.assertEqual(result, value,
945 '"%s" is "%s", expected "%s"'
946 % (path, str(result), str(value)))
947
948 def assert_no_active_block_jobs(self):
949 result = self.vm.qmp('query-block-jobs')
950 self.assert_qmp(result, 'return', [])
951
952 def assert_has_block_node(self, node_name=None, file_name=None):
953 """Issue a query-named-block-nodes and assert node_name and/or
954 file_name is present in the result"""
955 def check_equal_or_none(a, b):
956 return a is None or b is None or a == b
957 assert node_name or file_name
958 result = self.vm.qmp('query-named-block-nodes')
959 for x in result["return"]:
960 if check_equal_or_none(x.get("node-name"), node_name) and \
961 check_equal_or_none(x.get("file"), file_name):
962 return
963 self.fail("Cannot find %s %s in result:\n%s" %
964 (node_name, file_name, result))
965
966 def assert_json_filename_equal(self, json_filename, reference):
967 '''Asserts that the given filename is a json: filename and that its
968 content is equal to the given reference object'''
969 self.assertEqual(json_filename[:5], 'json:')
970 self.assertEqual(
971 self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
972 self.vm.flatten_qmp_object(reference)
973 )
974
975 def cancel_and_wait(self, drive='drive0', force=False,
976 resume=False, wait=60.0):
977 '''Cancel a block job and wait for it to finish, returning the event'''
978 result = self.vm.qmp('block-job-cancel', device=drive, force=force)
979 self.assert_qmp(result, 'return', {})
980
981 if resume:
982 self.vm.resume_drive(drive)
983
984 cancelled = False
985 result = None
986 while not cancelled:
987 for event in self.vm.get_qmp_events(wait=wait):
988 if event['event'] == 'BLOCK_JOB_COMPLETED' or \
989 event['event'] == 'BLOCK_JOB_CANCELLED':
990 self.assert_qmp(event, 'data/device', drive)
991 result = event
992 cancelled = True
993 elif event['event'] == 'JOB_STATUS_CHANGE':
994 self.assert_qmp(event, 'data/id', drive)
995
996
997 self.assert_no_active_block_jobs()
998 return result
999
1000 def wait_until_completed(self, drive='drive0', check_offset=True,
1001 wait=60.0, error=None):
1002 '''Wait for a block job to finish, returning the event'''
1003 while True:
1004 for event in self.vm.get_qmp_events(wait=wait):
1005 if event['event'] == 'BLOCK_JOB_COMPLETED':
1006 self.assert_qmp(event, 'data/device', drive)
1007 if error is None:
1008 self.assert_qmp_absent(event, 'data/error')
1009 if check_offset:
1010 self.assert_qmp(event, 'data/offset',
1011 event['data']['len'])
1012 else:
1013 self.assert_qmp(event, 'data/error', error)
1014 self.assert_no_active_block_jobs()
1015 return event
1016 if event['event'] == 'JOB_STATUS_CHANGE':
1017 self.assert_qmp(event, 'data/id', drive)
1018
1019 def wait_ready(self, drive='drive0'):
1020 """Wait until a BLOCK_JOB_READY event, and return the event."""
1021 return self.vm.events_wait([
1022 ('BLOCK_JOB_READY',
1023 {'data': {'type': 'mirror', 'device': drive}}),
1024 ('BLOCK_JOB_READY',
1025 {'data': {'type': 'commit', 'device': drive}})
1026 ])
1027
1028 def wait_ready_and_cancel(self, drive='drive0'):
1029 self.wait_ready(drive=drive)
1030 event = self.cancel_and_wait(drive=drive)
1031 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
1032 self.assert_qmp(event, 'data/type', 'mirror')
1033 self.assert_qmp(event, 'data/offset', event['data']['len'])
1034
1035 def complete_and_wait(self, drive='drive0', wait_ready=True,
1036 completion_error=None):
1037 '''Complete a block job and wait for it to finish'''
1038 if wait_ready:
1039 self.wait_ready(drive=drive)
1040
1041 result = self.vm.qmp('block-job-complete', device=drive)
1042 self.assert_qmp(result, 'return', {})
1043
1044 event = self.wait_until_completed(drive=drive, error=completion_error)
1045 self.assertTrue(event['data']['type'] in ['mirror', 'commit'])
1046
1047 def pause_wait(self, job_id='job0'):
1048 with Timeout(3, "Timeout waiting for job to pause"):
1049 while True:
1050 result = self.vm.qmp('query-block-jobs')
1051 found = False
1052 for job in result['return']:
1053 if job['device'] == job_id:
1054 found = True
1055 if job['paused'] and not job['busy']:
1056 return job
1057 break
1058 assert found
1059
1060 def pause_job(self, job_id='job0', wait=True):
1061 result = self.vm.qmp('block-job-pause', device=job_id)
1062 self.assert_qmp(result, 'return', {})
1063 if wait:
1064 return self.pause_wait(job_id)
1065 return result
1066
1067 def case_skip(self, reason):
1068 '''Skip this test case'''
1069 case_notrun(reason)
1070 self.skipTest(reason)
1071
1072
1073 def notrun(reason):
1074 '''Skip this test suite'''
1075 # Each test in qemu-iotests has a number ("seq")
1076 seq = os.path.basename(sys.argv[0])
1077
1078 open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n')
1079 logger.warning("%s not run: %s", seq, reason)
1080 sys.exit(0)
1081
1082 def case_notrun(reason):
1083 '''Mark this test case as not having been run (without actually
1084 skipping it, that is left to the caller). See
1085 QMPTestCase.case_skip() for a variant that actually skips the
1086 current test case.'''
1087
1088 # Each test in qemu-iotests has a number ("seq")
1089 seq = os.path.basename(sys.argv[0])
1090
1091 open('%s/%s.casenotrun' % (output_dir, seq), 'a').write(
1092 ' [case not run] ' + reason + '\n')
1093
1094 def _verify_image_format(supported_fmts: Sequence[str] = (),
1095 unsupported_fmts: Sequence[str] = ()) -> None:
1096 if 'generic' in supported_fmts and \
1097 os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
1098 # similar to
1099 # _supported_fmt generic
1100 # for bash tests
1101 supported_fmts = ()
1102
1103 not_sup = supported_fmts and (imgfmt not in supported_fmts)
1104 if not_sup or (imgfmt in unsupported_fmts):
1105 notrun('not suitable for this image format: %s' % imgfmt)
1106
1107 if imgfmt == 'luks':
1108 verify_working_luks()
1109
1110 def _verify_protocol(supported: Sequence[str] = (),
1111 unsupported: Sequence[str] = ()) -> None:
1112 assert not (supported and unsupported)
1113
1114 if 'generic' in supported:
1115 return
1116
1117 not_sup = supported and (imgproto not in supported)
1118 if not_sup or (imgproto in unsupported):
1119 notrun('not suitable for this protocol: %s' % imgproto)
1120
1121 def _verify_platform(supported: Sequence[str] = (),
1122 unsupported: Sequence[str] = ()) -> None:
1123 if any((sys.platform.startswith(x) for x in unsupported)):
1124 notrun('not suitable for this OS: %s' % sys.platform)
1125
1126 if supported:
1127 if not any((sys.platform.startswith(x) for x in supported)):
1128 notrun('not suitable for this OS: %s' % sys.platform)
1129
1130 def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None:
1131 if supported_cache_modes and (cachemode not in supported_cache_modes):
1132 notrun('not suitable for this cache mode: %s' % cachemode)
1133
1134 def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()) -> None:
1135 if supported_aio_modes and (aiomode not in supported_aio_modes):
1136 notrun('not suitable for this aio mode: %s' % aiomode)
1137
1138 def _verify_formats(required_formats: Sequence[str] = ()) -> None:
1139 usf_list = list(set(required_formats) - set(supported_formats()))
1140 if usf_list:
1141 notrun(f'formats {usf_list} are not whitelisted')
1142
1143
1144 def _verify_virtio_blk() -> None:
1145 out = qemu_pipe('-M', 'none', '-device', 'help')
1146 if 'virtio-blk' not in out:
1147 notrun('Missing virtio-blk in QEMU binary')
1148
1149 def _verify_virtio_scsi_pci_or_ccw() -> None:
1150 out = qemu_pipe('-M', 'none', '-device', 'help')
1151 if 'virtio-scsi-pci' not in out and 'virtio-scsi-ccw' not in out:
1152 notrun('Missing virtio-scsi-pci or virtio-scsi-ccw in QEMU binary')
1153
1154
1155 def supports_quorum():
1156 return 'quorum' in qemu_img_pipe('--help')
1157
1158 def verify_quorum():
1159 '''Skip test suite if quorum support is not available'''
1160 if not supports_quorum():
1161 notrun('quorum support missing')
1162
1163 def has_working_luks() -> Tuple[bool, str]:
1164 """
1165 Check whether our LUKS driver can actually create images
1166 (this extends to LUKS encryption for qcow2).
1167
1168 If not, return the reason why.
1169 """
1170
1171 img_file = f'{test_dir}/luks-test.luks'
1172 (output, status) = \
1173 qemu_img_pipe_and_status('create', '-f', 'luks',
1174 '--object', luks_default_secret_object,
1175 '-o', luks_default_key_secret_opt,
1176 '-o', 'iter-time=10',
1177 img_file, '1G')
1178 try:
1179 os.remove(img_file)
1180 except OSError:
1181 pass
1182
1183 if status != 0:
1184 reason = output
1185 for line in output.splitlines():
1186 if img_file + ':' in line:
1187 reason = line.split(img_file + ':', 1)[1].strip()
1188 break
1189
1190 return (False, reason)
1191 else:
1192 return (True, '')
1193
1194 def verify_working_luks():
1195 """
1196 Skip test suite if LUKS does not work
1197 """
1198 (working, reason) = has_working_luks()
1199 if not working:
1200 notrun(reason)
1201
1202 def qemu_pipe(*args: str) -> str:
1203 """
1204 Run qemu with an option to print something and exit (e.g. a help option).
1205
1206 :return: QEMU's stdout output.
1207 """
1208 full_args = [qemu_prog] + qemu_opts + list(args)
1209 output, _ = qemu_tool_pipe_and_status('qemu', full_args)
1210 return output
1211
1212 def supported_formats(read_only=False):
1213 '''Set 'read_only' to True to check ro-whitelist
1214 Otherwise, rw-whitelist is checked'''
1215
1216 if not hasattr(supported_formats, "formats"):
1217 supported_formats.formats = {}
1218
1219 if read_only not in supported_formats.formats:
1220 format_message = qemu_pipe("-drive", "format=help")
1221 line = 1 if read_only else 0
1222 supported_formats.formats[read_only] = \
1223 format_message.splitlines()[line].split(":")[1].split()
1224
1225 return supported_formats.formats[read_only]
1226
1227 def skip_if_unsupported(required_formats=(), read_only=False):
1228 '''Skip Test Decorator
1229 Runs the test if all the required formats are whitelisted'''
1230 def skip_test_decorator(func):
1231 def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1232 **kwargs: Dict[str, Any]) -> None:
1233 if callable(required_formats):
1234 fmts = required_formats(test_case)
1235 else:
1236 fmts = required_formats
1237
1238 usf_list = list(set(fmts) - set(supported_formats(read_only)))
1239 if usf_list:
1240 msg = f'{test_case}: formats {usf_list} are not whitelisted'
1241 test_case.case_skip(msg)
1242 else:
1243 func(test_case, *args, **kwargs)
1244 return func_wrapper
1245 return skip_test_decorator
1246
1247 def skip_for_formats(formats: Sequence[str] = ()) \
1248 -> Callable[[Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]],
1249 Callable[[QMPTestCase, List[Any], Dict[str, Any]], None]]:
1250 '''Skip Test Decorator
1251 Skips the test for the given formats'''
1252 def skip_test_decorator(func):
1253 def func_wrapper(test_case: QMPTestCase, *args: List[Any],
1254 **kwargs: Dict[str, Any]) -> None:
1255 if imgfmt in formats:
1256 msg = f'{test_case}: Skipped for format {imgfmt}'
1257 test_case.case_skip(msg)
1258 else:
1259 func(test_case, *args, **kwargs)
1260 return func_wrapper
1261 return skip_test_decorator
1262
1263 def skip_if_user_is_root(func):
1264 '''Skip Test Decorator
1265 Runs the test only without root permissions'''
1266 def func_wrapper(*args, **kwargs):
1267 if os.getuid() == 0:
1268 case_notrun('{}: cannot be run as root'.format(args[0]))
1269 return None
1270 else:
1271 return func(*args, **kwargs)
1272 return func_wrapper
1273
1274 # We need to filter out the time taken from the output so that
1275 # qemu-iotest can reliably diff the results against master output,
1276 # and hide skipped tests from the reference output.
1277
1278 class ReproducibleTestResult(unittest.TextTestResult):
1279 def addSkip(self, test, reason):
1280 # Same as TextTestResult, but print dot instead of "s"
1281 unittest.TestResult.addSkip(self, test, reason)
1282 if self.showAll:
1283 self.stream.writeln("skipped {0!r}".format(reason))
1284 elif self.dots:
1285 self.stream.write(".")
1286 self.stream.flush()
1287
1288 class ReproducibleStreamWrapper:
1289 def __init__(self, stream: TextIO):
1290 self.stream = stream
1291
1292 def __getattr__(self, attr):
1293 if attr in ('stream', '__getstate__'):
1294 raise AttributeError(attr)
1295 return getattr(self.stream, attr)
1296
1297 def write(self, arg=None):
1298 arg = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', arg)
1299 arg = re.sub(r' \(skipped=\d+\)', r'', arg)
1300 self.stream.write(arg)
1301
1302 class ReproducibleTestRunner(unittest.TextTestRunner):
1303 def __init__(self, stream: Optional[TextIO] = None,
1304 resultclass: Type[unittest.TestResult] = ReproducibleTestResult,
1305 **kwargs: Any) -> None:
1306 rstream = ReproducibleStreamWrapper(stream or sys.stdout)
1307 super().__init__(stream=rstream, # type: ignore
1308 descriptions=True,
1309 resultclass=resultclass,
1310 **kwargs)
1311
1312 def execute_unittest(argv: List[str], debug: bool = False) -> None:
1313 """Executes unittests within the calling module."""
1314
1315 # Some tests have warnings, especially ResourceWarnings for unclosed
1316 # files and sockets. Ignore them for now to ensure reproducibility of
1317 # the test output.
1318 unittest.main(argv=argv,
1319 testRunner=ReproducibleTestRunner,
1320 verbosity=2 if debug else 1,
1321 warnings=None if sys.warnoptions else 'ignore')
1322
1323 def execute_setup_common(supported_fmts: Sequence[str] = (),
1324 supported_platforms: Sequence[str] = (),
1325 supported_cache_modes: Sequence[str] = (),
1326 supported_aio_modes: Sequence[str] = (),
1327 unsupported_fmts: Sequence[str] = (),
1328 supported_protocols: Sequence[str] = (),
1329 unsupported_protocols: Sequence[str] = (),
1330 required_fmts: Sequence[str] = ()) -> bool:
1331 """
1332 Perform necessary setup for either script-style or unittest-style tests.
1333
1334 :return: Bool; Whether or not debug mode has been requested via the CLI.
1335 """
1336 # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'.
1337
1338 debug = '-d' in sys.argv
1339 if debug:
1340 sys.argv.remove('-d')
1341 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
1342
1343 _verify_image_format(supported_fmts, unsupported_fmts)
1344 _verify_protocol(supported_protocols, unsupported_protocols)
1345 _verify_platform(supported=supported_platforms)
1346 _verify_cache_mode(supported_cache_modes)
1347 _verify_aio_mode(supported_aio_modes)
1348 _verify_formats(required_fmts)
1349 _verify_virtio_blk()
1350
1351 return debug
1352
1353 def execute_test(*args, test_function=None, **kwargs):
1354 """Run either unittest or script-style tests."""
1355
1356 debug = execute_setup_common(*args, **kwargs)
1357 if not test_function:
1358 execute_unittest(sys.argv, debug)
1359 else:
1360 test_function()
1361
1362 def activate_logging():
1363 """Activate iotests.log() output to stdout for script-style tests."""
1364 handler = logging.StreamHandler(stream=sys.stdout)
1365 formatter = logging.Formatter('%(message)s')
1366 handler.setFormatter(formatter)
1367 test_logger.addHandler(handler)
1368 test_logger.setLevel(logging.INFO)
1369 test_logger.propagate = False
1370
1371 # This is called from script-style iotests without a single point of entry
1372 def script_initialize(*args, **kwargs):
1373 """Initialize script-style tests without running any tests."""
1374 activate_logging()
1375 execute_setup_common(*args, **kwargs)
1376
1377 # This is called from script-style iotests with a single point of entry
1378 def script_main(test_function, *args, **kwargs):
1379 """Run script-style tests outside of the unittest framework"""
1380 activate_logging()
1381 execute_test(*args, test_function=test_function, **kwargs)
1382
1383 # This is called from unittest style iotests
1384 def main(*args, **kwargs):
1385 """Run tests using the unittest framework"""
1386 execute_test(*args, **kwargs)