]> git.proxmox.com Git - mirror_qemu.git/blob - tests/qemu-iotests/iotests.py
iotests: add qmp recursive sorting function
[mirror_qemu.git] / tests / qemu-iotests / iotests.py
1 from __future__ import print_function
2 # Common utilities and Python wrappers for qemu-iotests
3 #
4 # Copyright (C) 2012 IBM Corp.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 #
19
20 import errno
21 import os
22 import re
23 import subprocess
24 import string
25 import unittest
26 import sys
27 import struct
28 import json
29 import signal
30 import logging
31 import atexit
32 import io
33 from collections import OrderedDict
34
35 sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'scripts'))
36 import qtest
37
38
39 # This will not work if arguments contain spaces but is necessary if we
40 # want to support the override options that ./check supports.
41 qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
42 if os.environ.get('QEMU_IMG_OPTIONS'):
43 qemu_img_args += os.environ['QEMU_IMG_OPTIONS'].strip().split(' ')
44
45 qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
46 if os.environ.get('QEMU_IO_OPTIONS'):
47 qemu_io_args += os.environ['QEMU_IO_OPTIONS'].strip().split(' ')
48
49 qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
50 if os.environ.get('QEMU_NBD_OPTIONS'):
51 qemu_nbd_args += os.environ['QEMU_NBD_OPTIONS'].strip().split(' ')
52
53 qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
54 qemu_opts = os.environ.get('QEMU_OPTIONS', '').strip().split(' ')
55
56 imgfmt = os.environ.get('IMGFMT', 'raw')
57 imgproto = os.environ.get('IMGPROTO', 'file')
58 test_dir = os.environ.get('TEST_DIR')
59 output_dir = os.environ.get('OUTPUT_DIR', '.')
60 cachemode = os.environ.get('CACHEMODE')
61 qemu_default_machine = os.environ.get('QEMU_DEFAULT_MACHINE')
62
63 socket_scm_helper = os.environ.get('SOCKET_SCM_HELPER', 'socket_scm_helper')
64 debug = False
65
66 luks_default_secret_object = 'secret,id=keysec0,data=' + \
67 os.environ.get('IMGKEYSECRET', '')
68 luks_default_key_secret_opt = 'key-secret=keysec0'
69
70
71 def qemu_img(*args):
72 '''Run qemu-img and return the exit code'''
73 devnull = open('/dev/null', 'r+')
74 exitcode = subprocess.call(qemu_img_args + list(args), stdin=devnull, stdout=devnull)
75 if exitcode < 0:
76 sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
77 return exitcode
78
79 def ordered_kwargs(kwargs):
80 # kwargs prior to 3.6 are not ordered, so:
81 od = OrderedDict()
82 for k, v in sorted(kwargs.items()):
83 if isinstance(v, dict):
84 od[k] = ordered_kwargs(v)
85 else:
86 od[k] = v
87 return od
88
89 def qemu_img_create(*args):
90 args = list(args)
91
92 # default luks support
93 if '-f' in args and args[args.index('-f') + 1] == 'luks':
94 if '-o' in args:
95 i = args.index('-o')
96 if 'key-secret' not in args[i + 1]:
97 args[i + 1].append(luks_default_key_secret_opt)
98 args.insert(i + 2, '--object')
99 args.insert(i + 3, luks_default_secret_object)
100 else:
101 args = ['-o', luks_default_key_secret_opt,
102 '--object', luks_default_secret_object] + args
103
104 args.insert(0, 'create')
105
106 return qemu_img(*args)
107
108 def qemu_img_verbose(*args):
109 '''Run qemu-img without suppressing its output and return the exit code'''
110 exitcode = subprocess.call(qemu_img_args + list(args))
111 if exitcode < 0:
112 sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
113 return exitcode
114
115 def qemu_img_pipe(*args):
116 '''Run qemu-img and return its output'''
117 subp = subprocess.Popen(qemu_img_args + list(args),
118 stdout=subprocess.PIPE,
119 stderr=subprocess.STDOUT,
120 universal_newlines=True)
121 exitcode = subp.wait()
122 if exitcode < 0:
123 sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args))))
124 return subp.communicate()[0]
125
126 def img_info_log(filename, filter_path=None, imgopts=False, extra_args=[]):
127 args = [ 'info' ]
128 if imgopts:
129 args.append('--image-opts')
130 else:
131 args += [ '-f', imgfmt ]
132 args += extra_args
133 args.append(filename)
134
135 output = qemu_img_pipe(*args)
136 if not filter_path:
137 filter_path = filename
138 log(filter_img_info(output, filter_path))
139
140 def qemu_io(*args):
141 '''Run qemu-io and return the stdout data'''
142 args = qemu_io_args + list(args)
143 subp = subprocess.Popen(args, stdout=subprocess.PIPE,
144 stderr=subprocess.STDOUT,
145 universal_newlines=True)
146 exitcode = subp.wait()
147 if exitcode < 0:
148 sys.stderr.write('qemu-io received signal %i: %s\n' % (-exitcode, ' '.join(args)))
149 return subp.communicate()[0]
150
151 def qemu_io_silent(*args):
152 '''Run qemu-io and return the exit code, suppressing stdout'''
153 args = qemu_io_args + list(args)
154 exitcode = subprocess.call(args, stdout=open('/dev/null', 'w'))
155 if exitcode < 0:
156 sys.stderr.write('qemu-io received signal %i: %s\n' %
157 (-exitcode, ' '.join(args)))
158 return exitcode
159
160
161 class QemuIoInteractive:
162 def __init__(self, *args):
163 self.args = qemu_io_args + list(args)
164 self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
165 stdout=subprocess.PIPE,
166 stderr=subprocess.STDOUT,
167 universal_newlines=True)
168 assert self._p.stdout.read(9) == 'qemu-io> '
169
170 def close(self):
171 self._p.communicate('q\n')
172
173 def _read_output(self):
174 pattern = 'qemu-io> '
175 n = len(pattern)
176 pos = 0
177 s = []
178 while pos != n:
179 c = self._p.stdout.read(1)
180 # check unexpected EOF
181 assert c != ''
182 s.append(c)
183 if c == pattern[pos]:
184 pos += 1
185 else:
186 pos = 0
187
188 return ''.join(s[:-n])
189
190 def cmd(self, cmd):
191 # quit command is in close(), '\n' is added automatically
192 assert '\n' not in cmd
193 cmd = cmd.strip()
194 assert cmd != 'q' and cmd != 'quit'
195 self._p.stdin.write(cmd + '\n')
196 self._p.stdin.flush()
197 return self._read_output()
198
199
200 def qemu_nbd(*args):
201 '''Run qemu-nbd in daemon mode and return the parent's exit code'''
202 return subprocess.call(qemu_nbd_args + ['--fork'] + list(args))
203
204 def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
205 '''Return True if two image files are identical'''
206 return qemu_img('compare', '-f', fmt1,
207 '-F', fmt2, img1, img2) == 0
208
209 def create_image(name, size):
210 '''Create a fully-allocated raw image with sector markers'''
211 file = open(name, 'wb')
212 i = 0
213 while i < size:
214 sector = struct.pack('>l504xl', i // 512, i // 512)
215 file.write(sector)
216 i = i + 512
217 file.close()
218
219 def image_size(img):
220 '''Return image's virtual size'''
221 r = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
222 return json.loads(r)['virtual-size']
223
224 test_dir_re = re.compile(r"%s" % test_dir)
225 def filter_test_dir(msg):
226 return test_dir_re.sub("TEST_DIR", msg)
227
228 win32_re = re.compile(r"\r")
229 def filter_win32(msg):
230 return win32_re.sub("", msg)
231
232 qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* \([0-9\/.inf]* [EPTGMKiBbytes]*\/sec and [0-9\/.inf]* ops\/sec\)")
233 def filter_qemu_io(msg):
234 msg = filter_win32(msg)
235 return qemu_io_re.sub("X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)", msg)
236
237 chown_re = re.compile(r"chown [0-9]+:[0-9]+")
238 def filter_chown(msg):
239 return chown_re.sub("chown UID:GID", msg)
240
241 def filter_qmp_event(event):
242 '''Filter a QMP event dict'''
243 event = dict(event)
244 if 'timestamp' in event:
245 event['timestamp']['seconds'] = 'SECS'
246 event['timestamp']['microseconds'] = 'USECS'
247 return event
248
249 def filter_testfiles(msg):
250 prefix = os.path.join(test_dir, "%s-" % (os.getpid()))
251 return msg.replace(prefix, 'TEST_DIR/PID-')
252
253 def filter_generated_node_ids(msg):
254 return re.sub("#block[0-9]+", "NODE_NAME", msg)
255
256 def filter_img_info(output, filename):
257 lines = []
258 for line in output.split('\n'):
259 if 'disk size' in line or 'actual-size' in line:
260 continue
261 line = line.replace(filename, 'TEST_IMG') \
262 .replace(imgfmt, 'IMGFMT')
263 line = re.sub('iters: [0-9]+', 'iters: XXX', line)
264 line = re.sub('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', line)
265 lines.append(line)
266 return '\n'.join(lines)
267
268 def log(msg, filters=[]):
269 for flt in filters:
270 msg = flt(msg)
271 if isinstance(msg, dict) or isinstance(msg, list):
272 # Don't sort if it's already sorted
273 do_sort = not isinstance(msg, OrderedDict)
274 print(json.dumps(msg, sort_keys=do_sort))
275 else:
276 print(msg)
277
278 class Timeout:
279 def __init__(self, seconds, errmsg = "Timeout"):
280 self.seconds = seconds
281 self.errmsg = errmsg
282 def __enter__(self):
283 signal.signal(signal.SIGALRM, self.timeout)
284 signal.setitimer(signal.ITIMER_REAL, self.seconds)
285 return self
286 def __exit__(self, type, value, traceback):
287 signal.setitimer(signal.ITIMER_REAL, 0)
288 return False
289 def timeout(self, signum, frame):
290 raise Exception(self.errmsg)
291
292
293 class FilePath(object):
294 '''An auto-generated filename that cleans itself up.
295
296 Use this context manager to generate filenames and ensure that the file
297 gets deleted::
298
299 with TestFilePath('test.img') as img_path:
300 qemu_img('create', img_path, '1G')
301 # migration_sock_path is automatically deleted
302 '''
303 def __init__(self, name):
304 filename = '{0}-{1}'.format(os.getpid(), name)
305 self.path = os.path.join(test_dir, filename)
306
307 def __enter__(self):
308 return self.path
309
310 def __exit__(self, exc_type, exc_val, exc_tb):
311 try:
312 os.remove(self.path)
313 except OSError:
314 pass
315 return False
316
317
318 def file_path_remover():
319 for path in reversed(file_path_remover.paths):
320 try:
321 os.remove(path)
322 except OSError:
323 pass
324
325
326 def file_path(*names):
327 ''' Another way to get auto-generated filename that cleans itself up.
328
329 Use is as simple as:
330
331 img_a, img_b = file_path('a.img', 'b.img')
332 sock = file_path('socket')
333 '''
334
335 if not hasattr(file_path_remover, 'paths'):
336 file_path_remover.paths = []
337 atexit.register(file_path_remover)
338
339 paths = []
340 for name in names:
341 filename = '{0}-{1}'.format(os.getpid(), name)
342 path = os.path.join(test_dir, filename)
343 file_path_remover.paths.append(path)
344 paths.append(path)
345
346 return paths[0] if len(paths) == 1 else paths
347
348 def remote_filename(path):
349 if imgproto == 'file':
350 return path
351 elif imgproto == 'ssh':
352 return "ssh://127.0.0.1%s" % (path)
353 else:
354 raise Exception("Protocol %s not supported" % (imgproto))
355
356 class VM(qtest.QEMUQtestMachine):
357 '''A QEMU VM'''
358
359 def __init__(self, path_suffix=''):
360 name = "qemu%s-%d" % (path_suffix, os.getpid())
361 super(VM, self).__init__(qemu_prog, qemu_opts, name=name,
362 test_dir=test_dir,
363 socket_scm_helper=socket_scm_helper)
364 self._num_drives = 0
365
366 def add_object(self, opts):
367 self._args.append('-object')
368 self._args.append(opts)
369 return self
370
371 def add_device(self, opts):
372 self._args.append('-device')
373 self._args.append(opts)
374 return self
375
376 def add_drive_raw(self, opts):
377 self._args.append('-drive')
378 self._args.append(opts)
379 return self
380
381 def add_drive(self, path, opts='', interface='virtio', format=imgfmt):
382 '''Add a virtio-blk drive to the VM'''
383 options = ['if=%s' % interface,
384 'id=drive%d' % self._num_drives]
385
386 if path is not None:
387 options.append('file=%s' % path)
388 options.append('format=%s' % format)
389 options.append('cache=%s' % cachemode)
390
391 if opts:
392 options.append(opts)
393
394 if format == 'luks' and 'key-secret' not in opts:
395 # default luks support
396 if luks_default_secret_object not in self._args:
397 self.add_object(luks_default_secret_object)
398
399 options.append(luks_default_key_secret_opt)
400
401 self._args.append('-drive')
402 self._args.append(','.join(options))
403 self._num_drives += 1
404 return self
405
406 def add_blockdev(self, opts):
407 self._args.append('-blockdev')
408 if isinstance(opts, str):
409 self._args.append(opts)
410 else:
411 self._args.append(','.join(opts))
412 return self
413
414 def add_incoming(self, addr):
415 self._args.append('-incoming')
416 self._args.append(addr)
417 return self
418
419 def pause_drive(self, drive, event=None):
420 '''Pause drive r/w operations'''
421 if not event:
422 self.pause_drive(drive, "read_aio")
423 self.pause_drive(drive, "write_aio")
424 return
425 self.qmp('human-monitor-command',
426 command_line='qemu-io %s "break %s bp_%s"' % (drive, event, drive))
427
428 def resume_drive(self, drive):
429 self.qmp('human-monitor-command',
430 command_line='qemu-io %s "remove_break bp_%s"' % (drive, drive))
431
432 def hmp_qemu_io(self, drive, cmd):
433 '''Write to a given drive using an HMP command'''
434 return self.qmp('human-monitor-command',
435 command_line='qemu-io %s "%s"' % (drive, cmd))
436
437 def flatten_qmp_object(self, obj, output=None, basestr=''):
438 if output is None:
439 output = dict()
440 if isinstance(obj, list):
441 for i in range(len(obj)):
442 self.flatten_qmp_object(obj[i], output, basestr + str(i) + '.')
443 elif isinstance(obj, dict):
444 for key in obj:
445 self.flatten_qmp_object(obj[key], output, basestr + key + '.')
446 else:
447 output[basestr[:-1]] = obj # Strip trailing '.'
448 return output
449
450 def qmp_to_opts(self, obj):
451 obj = self.flatten_qmp_object(obj)
452 output_list = list()
453 for key in obj:
454 output_list += [key + '=' + obj[key]]
455 return ','.join(output_list)
456
457 def get_qmp_events_filtered(self, wait=True):
458 result = []
459 for ev in self.get_qmp_events(wait=wait):
460 result.append(filter_qmp_event(ev))
461 return result
462
463 def qmp_log(self, cmd, filters=[filter_testfiles], **kwargs):
464 full_cmd = OrderedDict((
465 ("execute", cmd),
466 ("arguments", ordered_kwargs(kwargs))
467 ))
468 logmsg = json.dumps(full_cmd)
469 log(logmsg, filters)
470 result = self.qmp(cmd, **kwargs)
471 log(json.dumps(result, sort_keys=True), filters)
472 return result
473
474 def run_job(self, job, auto_finalize=True, auto_dismiss=False):
475 while True:
476 for ev in self.get_qmp_events_filtered(wait=True):
477 if ev['event'] == 'JOB_STATUS_CHANGE':
478 status = ev['data']['status']
479 if status == 'aborting':
480 result = self.qmp('query-jobs')
481 for j in result['return']:
482 if j['id'] == job:
483 log('Job failed: %s' % (j['error']))
484 elif status == 'pending' and not auto_finalize:
485 self.qmp_log('job-finalize', id=job)
486 elif status == 'concluded' and not auto_dismiss:
487 self.qmp_log('job-dismiss', id=job)
488 elif status == 'null':
489 return
490 else:
491 iotests.log(ev)
492
493
494 index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')
495
496 class QMPTestCase(unittest.TestCase):
497 '''Abstract base class for QMP test cases'''
498
499 def dictpath(self, d, path):
500 '''Traverse a path in a nested dict'''
501 for component in path.split('/'):
502 m = index_re.match(component)
503 if m:
504 component, idx = m.groups()
505 idx = int(idx)
506
507 if not isinstance(d, dict) or component not in d:
508 self.fail('failed path traversal for "%s" in "%s"' % (path, str(d)))
509 d = d[component]
510
511 if m:
512 if not isinstance(d, list):
513 self.fail('path component "%s" in "%s" is not a list in "%s"' % (component, path, str(d)))
514 try:
515 d = d[idx]
516 except IndexError:
517 self.fail('invalid index "%s" in path "%s" in "%s"' % (idx, path, str(d)))
518 return d
519
520 def assert_qmp_absent(self, d, path):
521 try:
522 result = self.dictpath(d, path)
523 except AssertionError:
524 return
525 self.fail('path "%s" has value "%s"' % (path, str(result)))
526
527 def assert_qmp(self, d, path, value):
528 '''Assert that the value for a specific path in a QMP dict matches'''
529 result = self.dictpath(d, path)
530 self.assertEqual(result, value, 'values not equal "%s" and "%s"' % (str(result), str(value)))
531
532 def assert_no_active_block_jobs(self):
533 result = self.vm.qmp('query-block-jobs')
534 self.assert_qmp(result, 'return', [])
535
536 def assert_has_block_node(self, node_name=None, file_name=None):
537 """Issue a query-named-block-nodes and assert node_name and/or
538 file_name is present in the result"""
539 def check_equal_or_none(a, b):
540 return a == None or b == None or a == b
541 assert node_name or file_name
542 result = self.vm.qmp('query-named-block-nodes')
543 for x in result["return"]:
544 if check_equal_or_none(x.get("node-name"), node_name) and \
545 check_equal_or_none(x.get("file"), file_name):
546 return
547 self.assertTrue(False, "Cannot find %s %s in result:\n%s" % \
548 (node_name, file_name, result))
549
550 def assert_json_filename_equal(self, json_filename, reference):
551 '''Asserts that the given filename is a json: filename and that its
552 content is equal to the given reference object'''
553 self.assertEqual(json_filename[:5], 'json:')
554 self.assertEqual(self.vm.flatten_qmp_object(json.loads(json_filename[5:])),
555 self.vm.flatten_qmp_object(reference))
556
557 def cancel_and_wait(self, drive='drive0', force=False, resume=False):
558 '''Cancel a block job and wait for it to finish, returning the event'''
559 result = self.vm.qmp('block-job-cancel', device=drive, force=force)
560 self.assert_qmp(result, 'return', {})
561
562 if resume:
563 self.vm.resume_drive(drive)
564
565 cancelled = False
566 result = None
567 while not cancelled:
568 for event in self.vm.get_qmp_events(wait=True):
569 if event['event'] == 'BLOCK_JOB_COMPLETED' or \
570 event['event'] == 'BLOCK_JOB_CANCELLED':
571 self.assert_qmp(event, 'data/device', drive)
572 result = event
573 cancelled = True
574 elif event['event'] == 'JOB_STATUS_CHANGE':
575 self.assert_qmp(event, 'data/id', drive)
576
577
578 self.assert_no_active_block_jobs()
579 return result
580
581 def wait_until_completed(self, drive='drive0', check_offset=True):
582 '''Wait for a block job to finish, returning the event'''
583 while True:
584 for event in self.vm.get_qmp_events(wait=True):
585 if event['event'] == 'BLOCK_JOB_COMPLETED':
586 self.assert_qmp(event, 'data/device', drive)
587 self.assert_qmp_absent(event, 'data/error')
588 if check_offset:
589 self.assert_qmp(event, 'data/offset', event['data']['len'])
590 self.assert_no_active_block_jobs()
591 return event
592 elif event['event'] == 'JOB_STATUS_CHANGE':
593 self.assert_qmp(event, 'data/id', drive)
594
595 def wait_ready(self, drive='drive0'):
596 '''Wait until a block job BLOCK_JOB_READY event'''
597 f = {'data': {'type': 'mirror', 'device': drive } }
598 event = self.vm.event_wait(name='BLOCK_JOB_READY', match=f)
599
600 def wait_ready_and_cancel(self, drive='drive0'):
601 self.wait_ready(drive=drive)
602 event = self.cancel_and_wait(drive=drive)
603 self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
604 self.assert_qmp(event, 'data/type', 'mirror')
605 self.assert_qmp(event, 'data/offset', event['data']['len'])
606
607 def complete_and_wait(self, drive='drive0', wait_ready=True):
608 '''Complete a block job and wait for it to finish'''
609 if wait_ready:
610 self.wait_ready(drive=drive)
611
612 result = self.vm.qmp('block-job-complete', device=drive)
613 self.assert_qmp(result, 'return', {})
614
615 event = self.wait_until_completed(drive=drive)
616 self.assert_qmp(event, 'data/type', 'mirror')
617
618 def pause_wait(self, job_id='job0'):
619 with Timeout(1, "Timeout waiting for job to pause"):
620 while True:
621 result = self.vm.qmp('query-block-jobs')
622 found = False
623 for job in result['return']:
624 if job['device'] == job_id:
625 found = True
626 if job['paused'] == True and job['busy'] == False:
627 return job
628 break
629 assert found
630
631 def pause_job(self, job_id='job0', wait=True):
632 result = self.vm.qmp('block-job-pause', device=job_id)
633 self.assert_qmp(result, 'return', {})
634 if wait:
635 return self.pause_wait(job_id)
636 return result
637
638
639 def notrun(reason):
640 '''Skip this test suite'''
641 # Each test in qemu-iotests has a number ("seq")
642 seq = os.path.basename(sys.argv[0])
643
644 open('%s/%s.notrun' % (output_dir, seq), 'wb').write(reason + '\n')
645 print('%s not run: %s' % (seq, reason))
646 sys.exit(0)
647
648 def verify_image_format(supported_fmts=[], unsupported_fmts=[]):
649 assert not (supported_fmts and unsupported_fmts)
650
651 if 'generic' in supported_fmts and \
652 os.environ.get('IMGFMT_GENERIC', 'true') == 'true':
653 # similar to
654 # _supported_fmt generic
655 # for bash tests
656 return
657
658 not_sup = supported_fmts and (imgfmt not in supported_fmts)
659 if not_sup or (imgfmt in unsupported_fmts):
660 notrun('not suitable for this image format: %s' % imgfmt)
661
662 def verify_protocol(supported=[], unsupported=[]):
663 assert not (supported and unsupported)
664
665 if 'generic' in supported:
666 return
667
668 not_sup = supported and (imgproto not in supported)
669 if not_sup or (imgproto in unsupported):
670 notrun('not suitable for this protocol: %s' % imgproto)
671
672 def verify_platform(supported_oses=['linux']):
673 if True not in [sys.platform.startswith(x) for x in supported_oses]:
674 notrun('not suitable for this OS: %s' % sys.platform)
675
676 def verify_cache_mode(supported_cache_modes=[]):
677 if supported_cache_modes and (cachemode not in supported_cache_modes):
678 notrun('not suitable for this cache mode: %s' % cachemode)
679
680 def supports_quorum():
681 return 'quorum' in qemu_img_pipe('--help')
682
683 def verify_quorum():
684 '''Skip test suite if quorum support is not available'''
685 if not supports_quorum():
686 notrun('quorum support missing')
687
688 def main(supported_fmts=[], supported_oses=['linux'], supported_cache_modes=[],
689 unsupported_fmts=[]):
690 '''Run tests'''
691
692 global debug
693
694 # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to
695 # indicate that we're not being run via "check". There may be
696 # other things set up by "check" that individual test cases rely
697 # on.
698 if test_dir is None or qemu_default_machine is None:
699 sys.stderr.write('Please run this test via the "check" script\n')
700 sys.exit(os.EX_USAGE)
701
702 debug = '-d' in sys.argv
703 verbosity = 1
704 verify_image_format(supported_fmts, unsupported_fmts)
705 verify_platform(supported_oses)
706 verify_cache_mode(supported_cache_modes)
707
708 if debug:
709 output = sys.stdout
710 verbosity = 2
711 sys.argv.remove('-d')
712 else:
713 # We need to filter out the time taken from the output so that
714 # qemu-iotest can reliably diff the results against master output.
715 if sys.version_info.major >= 3:
716 output = io.StringIO()
717 else:
718 # io.StringIO is for unicode strings, which is not what
719 # 2.x's test runner emits.
720 output = io.BytesIO()
721
722 logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN))
723
724 class MyTestRunner(unittest.TextTestRunner):
725 def __init__(self, stream=output, descriptions=True, verbosity=verbosity):
726 unittest.TextTestRunner.__init__(self, stream, descriptions, verbosity)
727
728 # unittest.main() will use sys.exit() so expect a SystemExit exception
729 try:
730 unittest.main(testRunner=MyTestRunner)
731 finally:
732 if not debug:
733 sys.stderr.write(re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', output.getvalue()))