]> git.proxmox.com Git - mirror_qemu.git/blame - tests/qemu-iotests/iotests.py
job: Introduce qapi/job.json
[mirror_qemu.git] / tests / qemu-iotests / iotests.py
CommitLineData
f345cfd0
SH
1# Common utilities and Python wrappers for qemu-iotests
2#
3# Copyright (C) 2012 IBM Corp.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 2 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program. If not, see <http://www.gnu.org/licenses/>.
17#
18
c1c71e49 19import errno
f345cfd0
SH
20import os
21import re
22import subprocess
4f450568 23import string
f345cfd0 24import unittest
ed338bb0 25import sys
2499a096 26import struct
74f69050 27import json
2c93c5cb 28import signal
43851b5b 29import logging
ef6e9228 30import atexit
f345cfd0 31
02f3a911
VSO
32sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'scripts'))
33import qtest
34
f345cfd0 35
# Program names and their extra options come from the environment so that the
# override options ./check supports are honoured here too.  Option strings are
# split on whitespace, so a single argument cannot itself contain spaces.
# split() (rather than split(' ')) is used so that repeated blanks or an
# unset/empty variable never produce empty-string arguments.
qemu_img_args = [os.environ.get('QEMU_IMG_PROG', 'qemu-img')]
qemu_img_args += os.environ.get('QEMU_IMG_OPTIONS', '').split()

qemu_io_args = [os.environ.get('QEMU_IO_PROG', 'qemu-io')]
qemu_io_args += os.environ.get('QEMU_IO_OPTIONS', '').split()

qemu_nbd_args = [os.environ.get('QEMU_NBD_PROG', 'qemu-nbd')]
qemu_nbd_args += os.environ.get('QEMU_NBD_OPTIONS', '').split()

qemu_prog = os.environ.get('QEMU_PROG', 'qemu')
qemu_opts = os.environ.get('QEMU_OPTIONS', '').split()
f345cfd0
SH
52
# Test configuration read from the environment.  Variables without a fallback
# are None when unset; main() refuses to run when test_dir or
# qemu_default_machine is None.
imgfmt = os.getenv('IMGFMT', 'raw')
imgproto = os.getenv('IMGPROTO', 'file')
test_dir = os.getenv('TEST_DIR')
output_dir = os.getenv('OUTPUT_DIR', '.')
cachemode = os.getenv('CACHEMODE')
qemu_default_machine = os.getenv('QEMU_DEFAULT_MACHINE')

socket_scm_helper = os.getenv('SOCKET_SCM_HELPER', 'socket_scm_helper')
# Set to True by main() when '-d' is passed on the command line.
debug = False
30b005d9 62
85a353a0
VSO
# Default secret object and matching key-secret option used for LUKS images.
# IMGKEYSECRET is looked up with a fallback so that merely importing this
# module outside of ./check does not die with a KeyError before main() can
# print its "run this test via check" message.
luks_default_secret_object = 'secret,id=keysec0,data=' + \
                             os.environ.get('IMGKEYSECRET', '')
luks_default_key_secret_opt = 'key-secret=keysec0'
66
67
f345cfd0
SH
def qemu_img(*args):
    '''Run qemu-img and return the exit code

    stdin and stdout of the child are attached to /dev/null.  A negative
    exit code (child killed by a signal) is reported on stderr together
    with the full command line.
    '''
    cmd = qemu_img_args + list(args)
    # The context manager closes the /dev/null handle once the child is done;
    # previously one descriptor was leaked per call.
    with open('/dev/null', 'r+') as devnull:
        exitcode = subprocess.call(cmd, stdin=devnull, stdout=devnull)
    if exitcode < 0:
        sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(cmd)))
    return exitcode
f345cfd0 75
85a353a0
VSO
def qemu_img_create(*args):
    '''Run "qemu-img create" with the given arguments and return the exit code

    For "-f luks" images a default key secret is supplied unless the caller
    already passed a key-secret in the "-o" option string.
    '''
    args = list(args)

    # default luks support
    if '-f' in args and args[args.index('-f') + 1] == 'luks':
        if '-o' in args:
            i = args.index('-o')
            if 'key-secret' not in args[i + 1]:
                # The -o value is a single comma-separated string, so the
                # default option is appended to it (str has no .append()).
                args[i + 1] += ',' + luks_default_key_secret_opt
                args.insert(i + 2, '--object')
                args.insert(i + 3, luks_default_secret_object)
        else:
            args = ['-o', luks_default_key_secret_opt,
                    '--object', luks_default_secret_object] + args

    args.insert(0, 'create')

    return qemu_img(*args)
94
def qemu_img_verbose(*args):
    '''Run qemu-img with its output left untouched and return the exit code

    A negative exit code (child killed by a signal) is additionally reported
    on stderr together with the full command line.
    '''
    command = qemu_img_args + list(args)
    status = subprocess.call(command)
    if status < 0:
        sys.stderr.write('qemu-img received signal %i: %s\n' % (-status, ' '.join(command)))
    return status
d2ef210c 101
3677e6f6
HR
def qemu_img_pipe(*args):
    '''Run qemu-img and return its output

    stderr is merged into stdout.  A negative exit code (child killed by a
    signal) is reported on stderr together with the full command line.
    '''
    cmd = qemu_img_args + list(args)
    subp = subprocess.Popen(cmd,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)
    # communicate() drains the pipe while waiting for the child.  Calling
    # wait() first can deadlock once the child fills the pipe buffer.
    output = subp.communicate()[0]
    if subp.returncode < 0:
        sys.stderr.write('qemu-img received signal %i: %s\n' % (-subp.returncode, ' '.join(cmd)))
    return output
3677e6f6 111
f345cfd0
SH
def qemu_io(*args):
    '''Run qemu-io and return the stdout data

    stderr is merged into stdout.  A negative exit code (child killed by a
    signal) is reported on stderr together with the full command line.
    '''
    args = qemu_io_args + list(args)
    subp = subprocess.Popen(args, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)
    # communicate() drains the pipe while waiting for the child.  Calling
    # wait() first can deadlock once the child fills the pipe buffer.
    output = subp.communicate()[0]
    if subp.returncode < 0:
        sys.stderr.write('qemu-io received signal %i: %s\n' % (-subp.returncode, ' '.join(args)))
    return output
f345cfd0 121
9fa90eec
VSO
122
class QemuIoInteractive:
    '''A qemu-io child process driven through its interactive prompt.

    The constructor starts qemu-io with the given arguments and consumes the
    first 'qemu-io> ' prompt; cmd() sends one command and returns the output
    printed before the next prompt; close() sends 'q' and waits for exit.
    '''

    def __init__(self, *args):
        self.args = qemu_io_args + list(args)
        self._p = subprocess.Popen(self.args, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT)
        assert self._p.stdout.read(9) == 'qemu-io> '

    def close(self):
        '''Send the quit command and wait for the process to terminate'''
        self._p.communicate('q\n')

    def _read_output(self):
        '''Read up to and including the next prompt; return the text before it'''
        pattern = 'qemu-io> '
        n = len(pattern)
        pos = 0
        s = []
        while pos != n:
            c = self._p.stdout.read(1)
            # check unexpected EOF
            assert c != ''
            s.append(c)
            if c == pattern[pos]:
                pos += 1
            else:
                # Restart the match.  The mismatching character may itself be
                # the first character of the prompt (e.g. output ending in
                # 'q' right before 'qemu-io> ').  Resuming at 0 or 1 is
                # sufficient because 'q' occurs only at the start of the
                # prompt string.
                pos = 1 if c == pattern[0] else 0

        return ''.join(s[:-n])

    def cmd(self, cmd):
        '''Run one qemu-io command and return its output'''
        # quit command is in close(), '\n' is added automatically
        assert '\n' not in cmd
        cmd = cmd.strip()
        assert cmd != 'q' and cmd != 'quit'
        self._p.stdin.write(cmd + '\n')
        # Make sure the command reaches the child even if the pipe is buffered
        self._p.stdin.flush()
        return self._read_output()
158
159
bec87774
HR
def qemu_nbd(*args):
    '''Start qemu-nbd with --fork and return the exit code of the parent process'''
    command = qemu_nbd_args + ['--fork']
    command.extend(args)
    return subprocess.call(command)
163
def compare_images(img1, img2, fmt1=imgfmt, fmt2=imgfmt):
    '''Return True if "qemu-img compare" exits with 0 for the two images

    fmt1 and fmt2 are passed as the -f and -F format options respectively.
    '''
    status = qemu_img('compare', '-f', fmt1, '-F', fmt2, img1, img2)
    return status == 0
3a3918c3 168
2499a096
SH
def create_image(name, size):
    '''Create a fully-allocated raw image with sector markers

    The file is written in 512-byte sectors.  Each sector holds its own
    index as a big-endian 32-bit integer in the first and the last four
    bytes, with 504 zero bytes in between.  Sectors are written while the
    offset is below size, so a size that is not a multiple of 512 is
    rounded up to the next full sector.
    '''
    # Binary mode: the payload is packed bytes, not text.  The context
    # manager closes the file even if packing or writing fails.
    with open(name, 'wb') as image:
        offset = 0
        while offset < size:
            # Floor division keeps the sector index an integer.
            index = offset // 512
            image.write(struct.pack('>l504xl', index, index))
            offset += 512
178
74f69050
FZ
def image_size(img):
    '''Return the 'virtual-size' field of "qemu-img info" JSON output for img'''
    info_json = qemu_img_pipe('info', '--output=json', '-f', imgfmt, img)
    info = json.loads(info_json)
    return info['virtual-size']
183
a2d1c8fd
DB
# test_dir is a literal path, not a pattern: escape it so that characters
# such as '.' or '+' in the path only match themselves.  The "%s" formatting
# keeps the module importable when TEST_DIR is unset (test_dir is None).
test_dir_re = re.compile(re.escape("%s" % test_dir))
def filter_test_dir(msg):
    '''Replace every occurrence of the test directory path with TEST_DIR'''
    return test_dir_re.sub("TEST_DIR", msg)
187
win32_re = re.compile(r"\r")
def filter_win32(msg):
    '''Return msg with every carriage return removed'''
    without_cr = win32_re.sub("", msg)
    return without_cr
191
qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* \([0-9\/.inf]* [EPTGMKiBbytes]*\/sec and [0-9\/.inf]* ops\/sec\)")
def filter_qemu_io(msg):
    '''Strip carriage returns and replace qemu-io statistics with placeholders'''
    cleaned = filter_win32(msg)
    placeholder = "X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)"
    return qemu_io_re.sub(placeholder, cleaned)
196
chown_re = re.compile(r"chown [0-9]+:[0-9]+")
def filter_chown(msg):
    '''Replace numeric "chown <uid>:<gid>" occurrences with "chown UID:GID"'''
    masked = chown_re.sub("chown UID:GID", msg)
    return masked
200
12314f2d
SH
def filter_qmp_event(event):
    '''Filter a QMP event dict

    Returns a copy of event whose timestamp fields are replaced with the
    placeholders 'SECS' and 'USECS'.  The event passed in is left untouched.
    '''
    event = dict(event)
    if 'timestamp' in event:
        # dict(event) above is only a shallow copy; copy the nested dict as
        # well so the caller's event keeps its real timestamp.
        timestamp = dict(event['timestamp'])
        timestamp['seconds'] = 'SECS'
        timestamp['microseconds'] = 'USECS'
        event['timestamp'] = timestamp
    return event
208
a2d1c8fd
DB
def log(msg, filters=()):
    '''Print msg after passing it through each filter in order

    filters is an iterable of callables; each one receives the current
    message and returns the replacement.
    '''
    for flt in filters:
        msg = flt(msg)
    # Single-argument call form: identical to the print statement on
    # Python 2 and also valid on Python 3.
    print(msg)
213
2c93c5cb
KW
class Timeout:
    '''Context manager that raises Exception(errmsg) when the body of the
    with statement runs longer than the given number of seconds.

    It is built on SIGALRM and the ITIMER_REAL interval timer.
    '''
    def __init__(self, seconds, errmsg = "Timeout"):
        self.seconds = seconds
        self.errmsg = errmsg
        self._old_handler = None
    def __enter__(self):
        # Remember the previous handler so __exit__ can put it back
        self._old_handler = signal.signal(signal.SIGALRM, self.timeout)
        signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self
    def __exit__(self, type, value, traceback):
        signal.setitimer(signal.ITIMER_REAL, 0)
        # signal.signal() returns None for handlers that were not installed
        # from Python; those cannot be restored.
        if self._old_handler is not None:
            signal.signal(signal.SIGALRM, self._old_handler)
        return False
    def timeout(self, signum, frame):
        raise Exception(self.errmsg)
227
f4844ac0
SH
228
class FilePath(object):
    '''A generated file name that removes its file when the context ends.

    The name is built from the current process id and the given name and is
    placed in test_dir.  Use it as a context manager::

        with FilePath('test.img') as img_path:
            qemu_img('create', img_path, '1G')
        # img_path has been deleted here
    '''
    def __init__(self, name):
        basename = '{0}-{1}'.format(os.getpid(), name)
        self.path = os.path.join(test_dir, basename)

    def __enter__(self):
        return self.path

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Removal is best effort: the file may never have been created.
        try:
            os.remove(self.path)
        except OSError:
            pass
        # Never suppress an exception raised inside the with block
        return False
252
253
ef6e9228
VSO
def file_path_remover():
    '''Delete every path recorded in file_path_remover.paths, newest first

    Paths that cannot be removed are skipped silently.
    '''
    for recorded in file_path_remover.paths[::-1]:
        try:
            os.remove(recorded)
        except OSError:
            continue
260
261
def file_path(*names):
    ''' Generate file names in test_dir that are removed at interpreter exit.

    Each name is prefixed with the current process id.  A single name
    returns a single path, anything else returns a list:

        img_a, img_b = file_path('a.img', 'b.img')
        sock = file_path('socket')
    '''

    # The first call sets up the shared list and registers the cleanup hook
    if not hasattr(file_path_remover, 'paths'):
        file_path_remover.paths = []
        atexit.register(file_path_remover)

    pid = os.getpid()
    generated = []
    for name in names:
        full_path = os.path.join(test_dir, '{0}-{1}'.format(pid, name))
        file_path_remover.paths.append(full_path)
        generated.append(full_path)

    if len(generated) == 1:
        return generated[0]
    return generated
283
284
class VM(qtest.QEMUQtestMachine):
    '''A QEMU VM

    The add_* methods append options to the command line and return self.
    '''

    def __init__(self, path_suffix=''):
        vm_name = "qemu%s-%d" % (path_suffix, os.getpid())
        super(VM, self).__init__(qemu_prog, qemu_opts, name=vm_name,
                                 test_dir=test_dir,
                                 socket_scm_helper=socket_scm_helper)
        # Number of drives added through add_drive(); used for drive ids
        self._num_drives = 0

    def add_object(self, opts):
        '''Append "-object <opts>" to the command line'''
        self._args.extend(['-object', opts])
        return self

    def add_device(self, opts):
        '''Append "-device <opts>" to the command line'''
        self._args.extend(['-device', opts])
        return self

    def add_drive_raw(self, opts):
        '''Append "-drive <opts>" to the command line without any processing'''
        self._args.extend(['-drive', opts])
        return self

    def add_drive(self, path, opts='', interface='virtio', format=imgfmt):
        '''Add a drive (virtio by default) with id "drive<N>" to the VM

        file, format and cache options are only emitted when path is not
        None.  For luks images without a key-secret in opts, the default
        secret object and key-secret option are added.
        '''
        drive_opts = ['if=%s' % interface,
                      'id=drive%d' % self._num_drives]

        if path is not None:
            drive_opts += ['file=%s' % path,
                           'format=%s' % format,
                           'cache=%s' % cachemode]

        if opts:
            drive_opts.append(opts)

        if format == 'luks' and 'key-secret' not in opts:
            # default luks support
            if luks_default_secret_object not in self._args:
                self.add_object(luks_default_secret_object)

            drive_opts.append(luks_default_key_secret_opt)

        self._args.append('-drive')
        self._args.append(','.join(drive_opts))
        self._num_drives += 1
        return self

    def add_blockdev(self, opts):
        '''Append "-blockdev"; opts is a string or an iterable of strings
        that is joined with commas'''
        self._args.append('-blockdev')
        self._args.append(opts if isinstance(opts, str) else ','.join(opts))
        return self

    def add_incoming(self, addr):
        '''Append "-incoming <addr>" to the command line'''
        self._args.extend(['-incoming', addr])
        return self

    def pause_drive(self, drive, event=None):
        '''Pause drive r/w operations

        Without an event, breakpoints are set for both "read_aio" and
        "write_aio".
        '''
        if not event:
            for default_event in ("read_aio", "write_aio"):
                self.pause_drive(drive, default_event)
            return
        break_cmd = 'qemu-io %s "break %s bp_%s"' % (drive, event, drive)
        self.qmp('human-monitor-command', command_line=break_cmd)

    def resume_drive(self, drive):
        '''Remove the breakpoint set by pause_drive()'''
        resume_cmd = 'qemu-io %s "remove_break bp_%s"' % (drive, drive)
        self.qmp('human-monitor-command', command_line=resume_cmd)

    def hmp_qemu_io(self, drive, cmd):
        '''Run a qemu-io command on the drive through an HMP command and
        return the QMP result'''
        io_cmd = 'qemu-io %s "%s"' % (drive, cmd)
        return self.qmp('human-monitor-command', command_line=io_cmd)
365
7898f74e 366
f345cfd0
SH
# Matches a path component with a list index, e.g. "return[0]"
index_re = re.compile(r'([^\[]+)\[([^\]]+)\]')

class QMPTestCase(unittest.TestCase):
    '''Abstract base class for QMP test cases

    Methods that talk to QEMU use self.vm, which this class does not set.
    '''

    def dictpath(self, d, path):
        '''Traverse a path in a nested dict

        Components are separated by '/'; a component of the form "name[N]"
        additionally selects element N of the list stored under "name".
        The test fails if the path cannot be followed.
        '''
        for component in path.split('/'):
            m = index_re.match(component)
            if m:
                component, idx = m.groups()
                idx = int(idx)

            if not isinstance(d, dict) or component not in d:
                self.fail('failed path traversal for "%s" in "%s"' % (path, str(d)))
            d = d[component]

            if m:
                if not isinstance(d, list):
                    self.fail('path component "%s" in "%s" is not a list in "%s"' % (component, path, str(d)))
                try:
                    d = d[idx]
                except IndexError:
                    self.fail('invalid index "%s" in path "%s" in "%s"' % (idx, path, str(d)))
        return d

    def flatten_qmp_object(self, obj, output=None, basestr=''):
        '''Flatten nested dicts/lists into one dict with dotted keys

        List elements use their index as key component, e.g.
        {'a': [{'b': 1}]} becomes {'a.0.b': 1}.
        '''
        if output is None:
            output = dict()
        if isinstance(obj, list):
            for i, item in enumerate(obj):
                self.flatten_qmp_object(item, output, basestr + str(i) + '.')
        elif isinstance(obj, dict):
            for key in obj:
                self.flatten_qmp_object(obj[key], output, basestr + key + '.')
        else:
            output[basestr[:-1]] = obj # Strip trailing '.'
        return output

    def qmp_to_opts(self, obj):
        '''Convert a QMP object into a "key=value,key=value" option string'''
        flat = self.flatten_qmp_object(obj)
        output_list = []
        for key in flat:
            value = flat[key]
            # Leaf values need not be strings; plain concatenation raised
            # TypeError for them.  Booleans are spelled on/off
            # (NOTE(review): assumes the consumer parses on/off -- confirm).
            if isinstance(value, bool):
                value = 'on' if value else 'off'
            output_list.append('%s=%s' % (key, value))
        return ','.join(output_list)

    def assert_qmp_absent(self, d, path):
        '''Assert that the path cannot be traversed in the QMP dict'''
        try:
            result = self.dictpath(d, path)
        except AssertionError:
            return
        self.fail('path "%s" has value "%s"' % (path, str(result)))

    def assert_qmp(self, d, path, value):
        '''Assert that the value for a specific path in a QMP dict matches'''
        result = self.dictpath(d, path)
        self.assertEqual(result, value, 'values not equal "%s" and "%s"' % (str(result), str(value)))

    def assert_no_active_block_jobs(self):
        '''Assert that query-block-jobs returns an empty list'''
        result = self.vm.qmp('query-block-jobs')
        self.assert_qmp(result, 'return', [])

    def assert_has_block_node(self, node_name=None, file_name=None):
        """Issue a query-named-block-nodes and assert node_name and/or
        file_name is present in the result"""
        def check_equal_or_none(a, b):
            return a is None or b is None or a == b
        assert node_name or file_name
        result = self.vm.qmp('query-named-block-nodes')
        for x in result["return"]:
            if check_equal_or_none(x.get("node-name"), node_name) and \
                    check_equal_or_none(x.get("file"), file_name):
                return
        self.fail("Cannot find %s %s in result:\n%s" % \
                (node_name, file_name, result))

    def assert_json_filename_equal(self, json_filename, reference):
        '''Asserts that the given filename is a json: filename and that its
        content is equal to the given reference object'''
        self.assertEqual(json_filename[:5], 'json:')
        self.assertEqual(self.flatten_qmp_object(json.loads(json_filename[5:])),
                         self.flatten_qmp_object(reference))

    def cancel_and_wait(self, drive='drive0', force=False, resume=False):
        '''Cancel a block job and wait for it to finish, returning the event'''
        result = self.vm.qmp('block-job-cancel', device=drive, force=force)
        self.assert_qmp(result, 'return', {})

        if resume:
            self.vm.resume_drive(drive)

        cancelled = False
        result = None
        while not cancelled:
            # Every event of a batch is inspected; the last match is returned
            for event in self.vm.get_qmp_events(wait=True):
                if event['event'] == 'BLOCK_JOB_COMPLETED' or \
                   event['event'] == 'BLOCK_JOB_CANCELLED':
                    self.assert_qmp(event, 'data/device', drive)
                    result = event
                    cancelled = True

        self.assert_no_active_block_jobs()
        return result

    def wait_until_completed(self, drive='drive0', check_offset=True):
        '''Wait for a block job to finish, returning the event'''
        while True:
            for event in self.vm.get_qmp_events(wait=True):
                if event['event'] == 'BLOCK_JOB_COMPLETED':
                    self.assert_qmp(event, 'data/device', drive)
                    self.assert_qmp_absent(event, 'data/error')
                    if check_offset:
                        self.assert_qmp(event, 'data/offset', event['data']['len'])
                    self.assert_no_active_block_jobs()
                    return event

    def wait_ready(self, drive='drive0'):
        '''Wait until a block job BLOCK_JOB_READY event'''
        f = {'data': {'type': 'mirror', 'device': drive } }
        self.vm.event_wait(name='BLOCK_JOB_READY', match=f)

    def wait_ready_and_cancel(self, drive='drive0'):
        '''Wait for BLOCK_JOB_READY, cancel the job and check that it
        completed as a mirror job with offset equal to len'''
        self.wait_ready(drive=drive)
        event = self.cancel_and_wait(drive=drive)
        self.assertEqual(event['event'], 'BLOCK_JOB_COMPLETED')
        self.assert_qmp(event, 'data/type', 'mirror')
        self.assert_qmp(event, 'data/offset', event['data']['len'])

    def complete_and_wait(self, drive='drive0', wait_ready=True):
        '''Complete a block job and wait for it to finish'''
        if wait_ready:
            self.wait_ready(drive=drive)

        result = self.vm.qmp('block-job-complete', device=drive)
        self.assert_qmp(result, 'return', {})

        event = self.wait_until_completed(drive=drive)
        self.assert_qmp(event, 'data/type', 'mirror')

    def pause_wait(self, job_id='job0'):
        '''Poll query-block-jobs until the job is paused and not busy,
        returning its entry; gives up after one second'''
        with Timeout(1, "Timeout waiting for job to pause"):
            while True:
                result = self.vm.qmp('query-block-jobs')
                for job in result['return']:
                    if job['device'] == job_id and job['paused'] and not job['busy']:
                        return job

    def pause_job(self, job_id='job0', wait=True):
        '''Pause a block job; with wait=True return the paused job entry,
        otherwise the result of block-job-pause'''
        result = self.vm.qmp('block-job-pause', device=job_id)
        self.assert_qmp(result, 'return', {})
        if wait:
            return self.pause_wait(job_id)
        return result
520
2c93c5cb 521
f345cfd0
SH
def notrun(reason):
    '''Skip this test suite

    Writes the reason to "<output_dir>/<seq>.notrun", prints it and exits
    the process with status 0.
    '''
    # Each test in qemu-iotests has a number ("seq")
    seq = os.path.basename(sys.argv[0])

    # Close the file explicitly instead of relying on garbage collection
    with open('%s/%s.notrun' % (output_dir, seq), 'wb') as notrun_file:
        notrun_file.write(reason + '\n')
    print('%s not run: %s' % (seq, reason))
    sys.exit(0)
530
def verify_image_format(supported_fmts=[], unsupported_fmts=[]):
    '''Skip the test suite when the current image format is not suitable

    At most one of the two lists may be non-empty.  'generic' in
    supported_fmts accepts any format as long as IMGFMT_GENERIC is unset
    or 'true'.
    '''
    assert not (supported_fmts and unsupported_fmts)

    # similar to
    #   _supported_fmt generic
    # for bash tests
    generic_allowed = os.environ.get('IMGFMT_GENERIC', 'true') == 'true'
    if 'generic' in supported_fmts and generic_allowed:
        return

    blacklisted = imgfmt in unsupported_fmts
    not_whitelisted = supported_fmts and (imgfmt not in supported_fmts)
    if not_whitelisted or blacklisted:
        notrun('not suitable for this image format: %s' % imgfmt)
f345cfd0 544
def verify_platform(supported_oses=['linux']):
    '''Skip the test suite unless sys.platform starts with one of the
    given OS name prefixes'''
    if not any(sys.platform.startswith(prefix) for prefix in supported_oses):
        notrun('not suitable for this OS: %s' % sys.platform)
548
ac8bd439
VSO
def verify_cache_mode(supported_cache_modes=[]):
    '''Skip the test suite when a non-empty list of cache modes is given
    and the current cache mode is not in it'''
    if not supported_cache_modes:
        return
    if cachemode not in supported_cache_modes:
        notrun('not suitable for this cache mode: %s' % cachemode)
552
b0f90495
AG
def supports_quorum():
    '''Return True if 'quorum' appears in the output of "qemu-img --help"'''
    help_text = qemu_img_pipe('--help')
    return 'quorum' in help_text
555
3f647b51
SS
def verify_quorum():
    '''Skip the test suite when supports_quorum() reports no quorum support'''
    if supports_quorum():
        return
    notrun('quorum support missing')
560
febc8c86
VSO
def main(supported_fmts=[], supported_oses=['linux'], supported_cache_modes=[],
         unsupported_fmts=[]):
    '''Check the environment, then run the unittest test cases

    Passing '-d' on the command line enables debug mode: verbose output
    goes straight to stdout and logging is set to DEBUG.  Otherwise the
    runner output is buffered and written to stderr with the elapsed time
    removed.
    '''

    global debug

    # TEST_DIR and QEMU_DEFAULT_MACHINE act as proxies for "started by the
    # check script".  check may set up further things that individual test
    # cases rely on.
    if None in (test_dir, qemu_default_machine):
        sys.stderr.write('Please run this test via the "check" script\n')
        sys.exit(os.EX_USAGE)

    debug = '-d' in sys.argv
    verbosity = 1
    verify_image_format(supported_fmts, unsupported_fmts)
    verify_platform(supported_oses)
    verify_cache_mode(supported_cache_modes)

    # The time taken has to be filtered out of the output so that
    # qemu-iotests can reliably diff the results against master output.
    import StringIO
    if not debug:
        output = StringIO.StringIO()
    else:
        output = sys.stdout
        verbosity = 2
        sys.argv.remove('-d')

    log_level = logging.DEBUG if debug else logging.WARN
    logging.basicConfig(level=log_level)

    # The defaults bind the stream and verbosity chosen above.
    class MyTestRunner(unittest.TextTestRunner):
        def __init__(self, stream=output, descriptions=True, verbosity=verbosity):
            unittest.TextTestRunner.__init__(self, stream, descriptions, verbosity)

    # unittest.main() will use sys.exit() so expect a SystemExit exception
    try:
        unittest.main(testRunner=MyTestRunner)
    finally:
        if not debug:
            captured = output.getvalue()
            sys.stderr.write(re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', captured))