]>
git.proxmox.com Git - mirror_qemu.git/blob - tests/qemu-iotests/testrunner.py
a56b6da3968eec3432b4304dc16f9dec5fa15f92
1 # Class for actually running tests.
3 # Copyright (c) 2020-2021 Virtuozzo International GmbH
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 from pathlib
import Path
29 from contextlib
import contextmanager
30 from typing
import List
, Optional
, Iterator
, Any
, Sequence
, Dict
, \
33 from testenv
import TestEnv
def silent_unlink(path: Path) -> None:
    """Remove the file at path, ignoring any OSError (e.g. file missing)."""
    try:
        path.unlink()
    except OSError:
        # Best effort only: a stale file that is absent or cannot be
        # removed must not abort the caller.
        pass
def file_diff(file1: str, file2: str) -> List[str]:
    """Return a unified diff of two text files, ignoring trailing whitespace.

    Both files are read as UTF-8.  Every input line is right-stripped before
    the comparison and every line of the produced diff is right-stripped as
    well, so the returned strings carry no line terminators.  An empty list
    means the files are equal once trailing whitespace is ignored.
    """
    with open(file1, encoding="utf-8") as f1, \
         open(file2, encoding="utf-8") as f2:
        # We want to ignore spaces at line ends: the tests and their
        # reference outputs are not consistent about them.
        # TODO: fix all tests to not produce extra spaces, fix all .out files
        # and use strict diff here!
        seq1 = [line.rstrip() for line in f1]
        seq2 = [line.rstrip() for line in f2]
        return [line.rstrip()
                for line in difflib.unified_diff(seq1, seq2, file1, file2)]
# We want to save current tty settings during test run,
# since an aborting qemu call may leave things screwed up.
@contextmanager
def savetty() -> Iterator[None]:
    """Context manager restoring stdin's terminal attributes on exit.

    The attributes are captured with termios.tcgetattr() on entry and put
    back with termios.tcsetattr(..., TCSADRAIN, ...) on exit, also when the
    body raises.  When stdin is not a terminal nothing is saved or restored.
    """
    isterm = sys.stdin.isatty()
    if isterm:
        fd = sys.stdin.fileno()
        attr = termios.tcgetattr(fd)

    try:
        yield
    finally:
        # fd and attr are only bound when stdin is a terminal.
        if isterm:
            termios.tcsetattr(fd, termios.TCSADRAIN, attr)
class LastElapsedTime(ContextManager['LastElapsedTime']):
    """ Cache for elapsed time for tests, to show it during new test run

    It is safe to use get() at any time.  To use update(), you must either
    use it inside with-block or use save() after update().

    The cache is a nested mapping
    test -> env.imgproto -> env.imgfmt -> elapsed time,
    kept as JSON in cache_file.
    """
    def __init__(self, cache_file: str, env: 'TestEnv') -> None:
        self.env = env
        self.cache_file = cache_file
        self.cache: Dict[str, Dict[str, Dict[str, float]]]

        try:
            with open(cache_file, encoding="utf-8") as f:
                self.cache = json.load(f)
        except (OSError, ValueError):
            # No cache file yet, or it is unreadable / not valid JSON:
            # start with an empty cache.
            self.cache = {}

    def get(self, test: str,
            default: Optional[float] = None) -> Optional[float]:
        """Return the cached time of test for the current protocol and
        format, or default when there is no such entry."""
        if test not in self.cache:
            return default

        if self.env.imgproto not in self.cache[test]:
            return default

        return self.cache[test][self.env.imgproto].get(self.env.imgfmt,
                                                       default)

    def update(self, test: str, elapsed: float) -> None:
        """Record elapsed for test in memory; save() writes it out."""
        d = self.cache.setdefault(test, {})
        d.setdefault(self.env.imgproto, {})[self.env.imgfmt] = elapsed

    def save(self) -> None:
        """Write the whole cache to cache_file as JSON."""
        with open(self.cache_file, 'w', encoding="utf-8") as f:
            json.dump(self.cache, f)

    def __enter__(self) -> 'LastElapsedTime':
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        # Persist updates made inside the with-block.
        self.save()
class TestResult:
    """Plain record describing the outcome of one test run.

    Attributes mirror the constructor arguments: status, description,
    elapsed, diff, casenotrun and interrupted.
    """
    def __init__(self, status: str, description: str = '',
                 elapsed: Optional[float] = None, diff: Sequence[str] = (),
                 casenotrun: str = '', interrupted: bool = False) -> None:
        self.status = status
        self.description = description
        self.elapsed = elapsed
        self.diff = diff
        self.casenotrun = casenotrun
        self.interrupted = interrupted
class TestRunner(ContextManager['TestRunner']):
    """Run tests one after another, printing a line per test and a summary.

    Use it as a context manager: entering it enters the test environment,
    the elapsed-time cache and the savetty() guard; leaving it closes them.
    """
    def __init__(self, env: TestEnv, makecheck: bool = False,
                 color: str = 'auto') -> None:
        self.env = env
        self.makecheck = makecheck
        self.last_elapsed = LastElapsedTime('.last-elapsed-cache', env)

        assert color in ('auto', 'on', 'off')
        self.color = (color == 'on') or (color == 'auto' and
                                         sys.stdout.isatty())

        # Created in __enter__().
        self._stack: contextlib.ExitStack

    def __enter__(self) -> 'TestRunner':
        self._stack = contextlib.ExitStack()
        self._stack.enter_context(self.env)
        self._stack.enter_context(self.last_elapsed)
        self._stack.enter_context(savetty())
        return self

    def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
        self._stack.close()

    def test_print_one_line(self, test: str, starttime: str,
                            endtime: Optional[str] = None, status: str = '...',
                            lasttime: Optional[float] = None,
                            thistime: Optional[float] = None,
                            description: str = '',
                            test_field_width: Optional[int] = None,
                            end: str = '\n') -> None:
        """ Print short test info before/after test run """
        test = os.path.basename(test)

        if test_field_width is None:
            test_field_width = 8

        # In makecheck mode finished tests get a terse one-line report.
        if self.makecheck and status != '...':
            if status and status != 'pass':
                status = f' [{status}]'
            else:
                status = ''

            print(f' TEST iotest-{self.env.imgfmt}: {test}{status}')
            return

        if lasttime:
            lasttime_s = f' (last: {lasttime:.1f}s)'
        else:
            lasttime_s = ''
        if thistime:
            thistime_s = f'{thistime:.1f}s'
        else:
            thistime_s = '...'

        if endtime:
            endtime = f'[{endtime}]'
        else:
            endtime = ''

        if self.color:
            if status == 'pass':
                col = '\033[32m'
            elif status == 'fail':
                col = '\033[1m\033[31m'
            elif status == 'not run':
                col = '\033[33m'
            else:
                col = ''

            col_end = '\033[0m'
        else:
            col = ''
            col_end = ''

        print(f'{test:{test_field_width}} {col}{status:10}{col_end} '
              f'[{starttime}] {endtime:13}{thistime_s:5} {lasttime_s:14} '
              f'{description}', end=end)

    def find_reference(self, test: str) -> str:
        """Return the reference output file name to compare test against.

        The first existing candidate wins, checked in this order:
        '{test}.out.nocache' (only when env.cachemode is 'none'),
        '{test}.out.{imgfmt}', '{test}.{qemu_default_machine}.out'.
        Otherwise '{test}.out' is returned without checking that it exists.
        """
        if self.env.cachemode == 'none':
            ref = f'{test}.out.nocache'
            if os.path.isfile(ref):
                return ref

        ref = f'{test}.out.{self.env.imgfmt}'
        if os.path.isfile(ref):
            return ref

        ref = f'{test}.{self.env.qemu_default_machine}.out'
        if os.path.isfile(ref):
            return ref

        return f'{test}.out'

    def do_run_test(self, test: str) -> TestResult:
        """Execute one test file and classify the outcome.

        The test's combined stdout/stderr goes to '<name>.out.bad' in the
        current directory and is compared with the reference output using
        file_diff().  Exits the process via sys.exit() when the test file
        is not executable.
        """
        f_test = Path(test)
        f_bad = Path(f_test.name + '.out.bad')
        f_notrun = Path(f_test.name + '.notrun')
        f_casenotrun = Path(f_test.name + '.casenotrun')
        f_reference = Path(self.find_reference(test))

        if not f_test.exists():
            return TestResult(status='fail',
                              description=f'No such test file: {f_test}')

        if not os.access(str(f_test), os.X_OK):
            sys.exit(f'Not executable: {f_test}')

        if not f_reference.exists():
            return TestResult(status='not run',
                              description='No qualified output '
                                          f'(expected {f_reference})')

        # Drop leftovers of a previous run of this test.
        for p in (f_bad, f_notrun, f_casenotrun):
            silent_unlink(p)

        args = [str(f_test.resolve())]
        env = self.env.prepare_subprocess(args)

        t0 = time.time()
        with f_bad.open('w', encoding="utf-8") as f:
            with subprocess.Popen(args, cwd=str(f_test.parent), env=env,
                                  stdout=f, stderr=subprocess.STDOUT) as proc:
                try:
                    proc.wait()
                except KeyboardInterrupt:
                    proc.terminate()
                    proc.wait()
                    return TestResult(status='not run',
                                      description='Interrupted by user',
                                      interrupted=True)
                ret = proc.returncode

        elapsed = round(time.time() - t0, 1)

        if ret != 0:
            return TestResult(status='fail', elapsed=elapsed,
                              description=f'failed, exit status {ret}',
                              diff=file_diff(str(f_reference), str(f_bad)))

        if f_notrun.exists():
            return TestResult(
                status='not run',
                description=f_notrun.read_text(encoding='utf-8').strip())

        casenotrun = ''
        if f_casenotrun.exists():
            casenotrun = f_casenotrun.read_text(encoding='utf-8')

        diff = file_diff(str(f_reference), str(f_bad))
        if diff:
            return TestResult(status='fail', elapsed=elapsed,
                              description=f'output mismatch (see {f_bad})',
                              diff=diff, casenotrun=casenotrun)
        else:
            f_bad.unlink()
            # Only successful runs feed the elapsed-time cache.
            self.last_elapsed.update(test, elapsed)
            return TestResult(status='pass', elapsed=elapsed,
                              casenotrun=casenotrun)

    def run_test(self, test: str,
                 test_field_width: Optional[int] = None) -> TestResult:
        """Run one test, printing its status line, and return the result."""
        last_el = self.last_elapsed.get(test)
        start = datetime.datetime.now().strftime('%H:%M:%S')

        if not self.makecheck:
            # Provisional line; end='\r' lets the final line overwrite it.
            self.test_print_one_line(test=test, starttime=start,
                                     lasttime=last_el, end='\r',
                                     test_field_width=test_field_width)

        res = self.do_run_test(test)

        end = datetime.datetime.now().strftime('%H:%M:%S')
        self.test_print_one_line(test=test, status=res.status,
                                 starttime=start, endtime=end,
                                 lasttime=last_el, thistime=res.elapsed,
                                 description=res.description,
                                 test_field_width=test_field_width)

        if res.casenotrun:
            print(res.casenotrun)

        return res

    def run_tests(self, tests: List[str]) -> bool:
        """Run tests in order and print a summary.

        Stops early when a test reports that it was interrupted.
        Returns True when no test failed, False otherwise.
        """
        n_run = 0
        failed = []
        notrun = []
        casenotrun = []

        if not self.makecheck:
            self.env.print_env()
            print()

        # default=0 keeps an empty test list from raising ValueError.
        test_field_width = max((len(os.path.basename(t)) for t in tests),
                               default=0) + 2

        for t in tests:
            name = os.path.basename(t)
            res = self.run_test(t, test_field_width=test_field_width)

            assert res.status in ('pass', 'fail', 'not run')

            if res.casenotrun:
                casenotrun.append(name)

            if res.status != 'not run':
                n_run += 1

            if res.status == 'fail':
                failed.append(name)
                if self.makecheck:
                    self.env.print_env()
                if res.diff:
                    print('\n'.join(res.diff))
            elif res.status == 'not run':
                notrun.append(name)

            if res.interrupted:
                break

        if notrun:
            print('Not run:', ' '.join(notrun))

        if casenotrun:
            print('Some cases not run in:', ' '.join(casenotrun))

        if failed:
            print('Failures:', ' '.join(failed))
            print(f'Failed {len(failed)} of {n_run} iotests')
            return False
        else:
            print(f'Passed all {n_run} iotests')
            return True