]> git.proxmox.com Git - ceph.git/blame - ceph/src/boost/tools/build/test/TestCmd.py
import quincy beta 17.1.0
[ceph.git] / ceph / src / boost / tools / build / test / TestCmd.py
CommitLineData
7c673cae
FG
1"""
2TestCmd.py: a testing framework for commands and scripts.
3
4The TestCmd module provides a framework for portable automated testing of
5executable commands and scripts (in any language, not just Python), especially
6commands and scripts that require file system interaction.
7
8In addition to running tests and evaluating conditions, the TestCmd module
9manages and cleans up one or more temporary workspace directories, and provides
10methods for creating files and directories in those workspace directories from
11in-line data (here-documents), allowing tests to be completely self-contained.
12
13A TestCmd environment object is created via the usual invocation:
14
15 test = TestCmd()
16
17The TestCmd module provides pass_test(), fail_test(), and no_result() unbound
18methods that report test results for use with the Aegis change management
19system. These methods terminate the test immediately, reporting PASSED, FAILED
20or NO RESULT respectively and exiting with status 0 (success), 1 or 2
21respectively. This allows for a distinction between an actual failed test and a
22test that could not be properly evaluated because of an external condition (such
23as a full file system or incorrect permissions).
24
25"""
26
27# Copyright 2000 Steven Knight
28# This module is free software, and you may redistribute it and/or modify
29# it under the same terms as Python itself, so long as this copyright message
30# and disclaimer are retained in their original form.
31#
32# IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
33# SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
34# THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
35# DAMAGE.
36#
37# THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
38# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
39# PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
40# AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
41# SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
42
43# Copyright 2002-2003 Vladimir Prus.
44# Copyright 2002-2003 Dave Abrahams.
45# Copyright 2006 Rene Rivera.
46# Distributed under the Boost Software License, Version 1.0.
47# (See accompanying file LICENSE_1_0.txt or copy at
48# http://www.boost.org/LICENSE_1_0.txt)
49
92f5a8d4 50from __future__ import print_function
7c673cae
FG
51
52__author__ = "Steven Knight <knight@baldmt.com>"
53__revision__ = "TestCmd.py 0.D002 2001/08/31 14:56:12 software"
54__version__ = "0.02"
55
from types import *

import atexit
import os
import os.path
import re
import shutil
import stat
import subprocess
import sys
import tempfile
import traceback
67
68
# Name prefix requested for temporary names generated through tempfile.
tempfile.template = 'testcmd.'

# TestCmd instances that still own temporary directories; each instance adds
# itself in workdir_set() and removes itself again in cleanup().
_Cleanup = []


def _clean():
    """Clean up every registered test environment, newest first.

    The registry is emptied before any cleanup() call is made, so a
    cleanup() that unregisters its own instance does not disturb the
    iteration.

    """
    global _Cleanup
    pending = _Cleanup[:]
    _Cleanup = []
    for test in reversed(pending):
        test.cleanup()


# sys.exitfunc is never invoked on Python 3; atexit works on both Python 2
# and Python 3.
atexit.register(_clean)
82
83
def caller(tblist, skip):
    """Format the part of a traceback that lies outside TestCmd.py.

    tblist -- sequence of (file, line, function name, text) entries,
        outermost frame first, e.g. from traceback.extract_stack().
    skip -- number of innermost reported frames to leave out.

    Entries are consumed up to (not including) the first one whose file
    name ends in "TestCmd.py". The remaining frames are reported innermost
    first, one per line: the first line starts with "at", later ones with
    a tab and "from".

    """
    frames = []
    for filename, lineno, funcname, _text in tblist:
        if filename.endswith("TestCmd.py"):
            break
        frames.append((filename, lineno, funcname))
    frames.reverse()

    pieces = []
    prefix = "at"
    for filename, lineno, funcname in frames[skip:]:
        # "?" is the placeholder name for code outside any function.
        suffix = "" if funcname == "?" else " (" + funcname + ")"
        pieces.append("%s line %d of %s%s\n" %
            (prefix, lineno, filename, suffix))
        prefix = "\tfrom"
    return "".join(pieces)
100
101
def fail_test(self=None, condition=True, function=None, skip=0):
    """Report a failed test and terminate with exit status 1.

    Nothing happens when 'condition' is false. Otherwise 'function', if
    given, is called first; then a "FAILED test" message naming the tested
    program and description (taken from 'self' when available), the calling
    location and the current directory is written to stderr.

    'skip' is the number of innermost caller frames to omit from the
    reported location.

    """
    if not condition:
        return
    if function is not None:
        function()

    program_part = ""
    description_part = ""
    separator = " "
    if self is not None:
        if self.program:
            program_part = " of " + " ".join(self.program)
            separator = "\n\t"
        if self.description:
            description_part = " [" + self.description + "]"
            separator = "\n\t"

    location = caller(traceback.extract_stack(), skip)

    message = ("FAILED test" + program_part + description_part + separator +
        location + "\nin directory: " + os.getcwd())
    sys.stderr.write(message)
    sys.exit(1)
130
131
def no_result(self=None, condition=True, function=None, skip=0):
    """Causes a test to exit with no valid result.

    By default, the no_result() method reports NO RESULT for the test and
    exits with a status of 2. If a condition argument is supplied, this
    only happens if the condition is true.

    self -- optional test environment; its 'program' (a list of command
        line words) and 'description' attributes are included in the
        report when they are set.
    function -- optional callable invoked before reporting.
    skip -- number of innermost caller frames to omit from the reported
        location.

    """
    if not condition:
        return
    if function is not None:
        function()
    of = ""
    desc = ""
    sep = " "
    if self is not None:
        if self.program:
            # The program is a list of command line words, exactly as in
            # fail_test(); concatenating the list itself to a string would
            # raise TypeError instead of producing the report.
            of = " of " + " ".join(self.program)
            sep = "\n\t"
        if self.description:
            desc = " [" + self.description + "]"
            sep = "\n\t"

    at = caller(traceback.extract_stack(), skip)
    sys.stderr.write("NO RESULT for test" + of + desc + sep + at)
    sys.exit(2)
158
159
def pass_test(self=None, condition=True, function=None):
    """Report a passed test and terminate with exit status 0.

    Nothing happens when 'condition' is false. Otherwise 'function', if
    given, is called, "PASSED" is written to stderr and the process exits.
    The 'self' argument is accepted but not used.

    """
    if condition:
        if function is not None:
            function()
        sys.stderr.write("PASSED\n")
        sys.exit(0)
174
b32b8144
FG
class MatchError(object):
    """Always-false result object describing why a comparison failed.

    The explanation is available through the 'message' attribute. Because
    instances evaluate as false, a match function may return one wherever a
    plain false value is expected.

    """

    def __init__(self, message):
        self.message = message

    def __bool__(self):
        return False

    # Python 2 spells the truth-value hook __nonzero__.
    __nonzero__ = __bool__
7c673cae
FG
182
def match_exact(lines=None, matches=None):
    """
    Returns whether the given lists or strings containing lines separated
    using newline characters contain exactly the same data.

    lines -- the actual data, as a list of lines or a newline separated
        string.
    matches -- the expected data, in the same form.

    Returns 1 when both contain the same lines. Otherwise returns a false
    MatchError whose message describes the first differing line, or the
    missing or extra lines when one side is a prefix of the other.

    """
    if not isinstance(lines, list):
        lines = lines.split("\n")
    if not isinstance(matches, list):
        matches = matches.split("\n")
    # Compare the common prefix first. Returning early on a length mismatch
    # would make the "missing"/"extra" reports below unreachable.
    for i, (actual, expected) in enumerate(zip(lines, matches)):
        if actual != expected:
            return MatchError("Mismatch at line %d\n- %s\n+ %s\n" %
                (i+1, expected, actual))
    if len(lines) < len(matches):
        return MatchError("Missing lines at line %d\n- %s" %
            (len(lines), "\n- ".join(matches[len(lines):])))
    if len(lines) > len(matches):
        return MatchError("Extra lines at line %d\n+ %s" %
            (len(matches), "\n+ ".join(lines[len(matches):])))
    return 1
206
207
def match_re(lines=None, res=None):
    """
    Match lines one by one against regular expressions.

    Both arguments may be lists of lines or newline separated strings. Each
    entry of 'res' is anchored with "^" and "$" and searched for in the line
    at the same position of 'lines'.

    Returns 1 when every line matches and both sides have the same number of
    lines; otherwise returns a false MatchError describing the first
    mismatch or the missing or extra lines.

    """
    if type(lines) is not list:
        lines = lines.split("\n")
    if type(res) is not list:
        res = res.split("\n")

    for number, (line, pattern) in enumerate(zip(lines, res), 1):
        if re.search("^" + pattern + "$", line) is None:
            return MatchError("Mismatch at line %d\n- %s\n+ %s\n" %
                (number, pattern, line))

    actual_count = len(lines)
    expected_count = len(res)
    if actual_count < expected_count:
        return MatchError("Missing lines at line %d\n- %s" %
            (actual_count, "\n- ".join(res[actual_count:])))
    if actual_count > expected_count:
        return MatchError("Extra lines at line %d\n+ %s" %
            (expected_count, "\n+ ".join(lines[expected_count:])))
    return 1
230
231
class TestCmd:
    """
    Test environment for running a program inside temporary work directories.

    An instance keeps the program under test, an optional description, the
    function used to compare actual and expected output, the list of
    temporary directories it created, and the standard output, error output
    and exit status of the runs performed through run().

    """

    def __init__(self, description=None, program=None, workdir=None,
            subdir=None, verbose=False, match=None, inpath=None):
        """
        description -- text describing the functionality being tested.
        program -- list of command line words; see program_set().
        workdir -- work directory to set up; see workdir_set().
        subdir -- subdirectory to create under the work directory.
        verbose -- when true, run() echoes commands and their output.
        match -- comparison function used by match(); match_re by default.
        inpath -- when true, a relative program name is left untouched.

        """
        # Set first so that cleanup(), which __del__ calls, is safe even if
        # the rest of the initialization fails.
        self._dirlist = []
        self._cwd = os.getcwd()
        self.description_set(description)
        self.program_set(program, inpath)
        self.verbose_set(verbose)
        if match is None:
            self.match_func = match_re
        else:
            self.match_func = match
        self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0}
        env = os.environ.get('PRESERVE')
        if env:
            for condition in ('pass_test', 'fail_test', 'no_result'):
                self._preserve[condition] = env
        else:
            # Each outcome has its own environment variable; the value must
            # be stored under the outcome's key, which cleanup() looks up.
            for condition, variable in (
                    ('pass_test', 'PRESERVE_PASS'),
                    ('fail_test', 'PRESERVE_FAIL'),
                    ('no_result', 'PRESERVE_NO_RESULT')):
                env = os.environ.get(variable)
                if env is not None:
                    self._preserve[condition] = env
        self._stdout = []
        self._stderr = []
        self.status = None
        self.condition = 'no_result'
        self.workdir_set(workdir)
        self.subdir(subdir)

    def __del__(self):
        self.cleanup()

    def __repr__(self):
        return "%x" % id(self)

    def cleanup(self, condition=None):
        """
        Removes any temporary working directories for the specified TestCmd
        environment. If the environment variable PRESERVE was set when the
        TestCmd environment was created, temporary working directories are not
        removed. If any of the environment variables PRESERVE_PASS,
        PRESERVE_FAIL or PRESERVE_NO_RESULT were set when the TestCmd
        environment was created, then temporary working directories are not
        removed if the test passed, failed or had no result, respectively.
        Temporary working directories are also preserved for conditions
        specified via the preserve method.

        Typically, this method is not called directly, but is used when the
        script exits to clean up temporary working directories as appropriate
        for the exit status.

        """
        # getattr: __del__ may run on an instance whose __init__ never got
        # as far as creating the directory list.
        dirs = getattr(self, '_dirlist', None)
        if not dirs:
            return
        if condition is None:
            condition = self.condition
        if self._preserve[condition]:
            for directory in dirs:
                print("Preserved directory %s" % directory)
        else:
            # Remove the most recently created directories first.
            for directory in reversed(dirs):
                self.writable(directory, 1)
                shutil.rmtree(directory, ignore_errors=1)

        self._dirlist = []
        self.workdir = None
        os.chdir(self._cwd)
        try:
            _Cleanup.remove(self)
        except (AttributeError, ValueError):
            pass

    def description_set(self, description):
        """Set the description of the functionality being tested."""
        self.description = description

    def fail_test(self, condition=True, function=None, skip=0):
        """Cause the test to fail."""
        if not condition:
            return
        self.condition = 'fail_test'
        fail_test(self = self,
                  condition = condition,
                  function = function,
                  skip = skip)

    def match(self, lines, matches):
        """Compare actual and expected file contents."""
        return self.match_func(lines, matches)

    def match_exact(self, lines, matches):
        """Compare actual and expected file content exactly."""
        return match_exact(lines, matches)

    def match_re(self, lines, res):
        """Compare file content with a regular expression."""
        return match_re(lines, res)

    def no_result(self, condition=True, function=None, skip=0):
        """Report that the test could not be run."""
        if not condition:
            return
        self.condition = 'no_result'
        no_result(self = self,
                  condition = condition,
                  function = function,
                  skip = skip)

    def pass_test(self, condition=True, function=None):
        """Cause the test to pass."""
        if not condition:
            return
        self.condition = 'pass_test'
        pass_test(self, condition, function)

    def preserve(self, *conditions):
        """
        Arrange for the temporary working directories for the specified
        TestCmd environment to be preserved for one or more conditions. If no
        conditions are specified, arranges for the temporary working
        directories to be preserved for all conditions.

        """
        if not conditions:
            conditions = ('pass_test', 'fail_test', 'no_result')
        for cond in conditions:
            self._preserve[cond] = 1

    def program_set(self, program, inpath):
        """
        Set the executable program or script to be tested.

        'program' is a list of command line words. Unless 'inpath' is true,
        a relative first word is made absolute relative to the directory
        that was current when the environment was created. The caller's
        list is left unmodified.

        """
        if not inpath and program and not os.path.isabs(program[0]):
            program = ([os.path.join(self._cwd, program[0])] +
                list(program[1:]))
        self.program = program

    def read(self, file, mode='rb'):
        """
        Reads and returns the contents of the specified file name. The file
        name may be a list, in which case the elements are concatenated with
        the os.path.join() method. The file is assumed to be under the
        temporary working directory unless it is an absolute path name. The I/O
        mode for the file may be specified and must begin with an 'r'. The
        default is 'rb' (binary read).

        Raises ValueError if the mode does not begin with an 'r'.

        """
        if type(file) is list:
            file = os.path.join(*file)
        if not os.path.isabs(file):
            file = os.path.join(self.workdir, file)
        if mode[0] != 'r':
            raise ValueError("mode must begin with 'r'")
        with open(file, mode) as stream:
            return stream.read()

    def run(self, program=None, arguments=None, chdir=None, stdin=None,
            universal_newlines=True):
        """
        Runs a test of the program or script for the test environment.
        Standard output and error output are saved for future retrieval via the
        stdout() and stderr() methods.

        program -- list of command line words to run instead of the
            environment's program.
        arguments -- additional arguments, either a string that is split on
            single spaces or a list of words.
        chdir -- directory to run in; a relative name is taken relative to
            the work directory. Defaults to the work directory.
        stdin -- text, or a list of text pieces, fed to the child process.

        'universal_newlines' parameter controls how the child process
        input/output streams are opened as defined for the same named Python
        subprocess.POpen constructor parameter.

        """
        if chdir:
            if not os.path.isabs(chdir):
                chdir = self.workpath(chdir)
            if self.verbose:
                sys.stderr.write("chdir(" + chdir + ")\n")
        else:
            chdir = self.workdir

        if program and program[0]:
            # Work on a copy so the caller's list is never modified.
            cmd = list(program)
            is_default = bool(self.program) and cmd[0] == self.program[0]
            if not is_default and not os.path.isabs(cmd[0]):
                cmd[0] = os.path.join(self._cwd, cmd[0])
        else:
            cmd = list(self.program)
        if arguments:
            if isinstance(arguments, (list, tuple)):
                cmd += list(arguments)
            else:
                cmd += arguments.split(" ")
        if self.verbose:
            sys.stderr.write("run(" + " ".join(cmd) + ")\n")
        p = subprocess.Popen(cmd, stdin=subprocess.PIPE,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=chdir,
            universal_newlines=universal_newlines)

        if stdin and isinstance(stdin, list):
            stdin = "".join(stdin)
        out, err = p.communicate(stdin)
        # Without universal newlines the streams yield bytes on Python 3.
        if not isinstance(out, str):
            out = out.decode()
        if not isinstance(err, str):
            err = err.decode()
        self._stdout.append(out)
        self._stderr.append(err)
        self.status = p.returncode

        if self.verbose:
            sys.stdout.write(self._stdout[-1])
            sys.stderr.write(self._stderr[-1])

    def _output_of_run(self, outputs, run):
        """Select one entry of 'outputs' using the stdout()/stderr() rules."""
        if not run:
            run = len(outputs)
        elif run < 0:
            run = len(outputs) + run
        # Run numbers are 1-based.
        run -= 1
        if run < 0:
            return ''
        return outputs[run]

    def stderr(self, run=None):
        """
        Returns the error output from the specified run number. If there is
        no specified run number, then returns the error output of the last run.
        If the run number is less than zero, then returns the error output from
        that many runs back from the current run.

        """
        return self._output_of_run(self._stderr, run)

    def stdout(self, run=None):
        """
        Returns the standard output from the specified run number. If there
        is no specified run number, then returns the standard output of the
        last run. If the run number is less than zero, then returns the
        standard output from that many runs back from the current run.

        """
        return self._output_of_run(self._stdout, run)

    def subdir(self, *subdirs):
        """
        Create new subdirectories under the temporary working directory, one
        for each argument. An argument may be a list, in which case the list
        elements are concatenated using the os.path.join() method.
        Subdirectories multiple levels deep must be created using a separate
        argument for each level:

            test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory'])

        Returns the number of subdirectories actually created.

        """
        count = 0
        for sub in subdirs:
            if sub is None:
                continue
            if type(sub) is list:
                sub = os.path.join(*sub)
            new = os.path.join(self.workdir, sub)
            try:
                os.mkdir(new)
            except OSError:
                # Not created (for example because it already exists), so
                # it is simply not counted.
                pass
            else:
                count += 1
        return count

    def unlink(self, file):
        """
        Unlinks the specified file name. The file name may be a list, in
        which case the elements are concatenated using the os.path.join()
        method. The file is assumed to be under the temporary working directory
        unless it is an absolute path name.

        """
        if type(file) is list:
            file = os.path.join(*file)
        if not os.path.isabs(file):
            file = os.path.join(self.workdir, file)
        os.unlink(file)

    def verbose_set(self, verbose):
        """Set the verbose level."""
        self.verbose = verbose

    def workdir_set(self, path):
        """
        Creates a temporary working directory with the specified path name.
        If the path is a null string (''), a unique directory name is created.
        If the path is None, no working directory is set. An absolute path
        is recorded as the working directory without being created.

        """
        if path is None:
            self.workdir = None
            return
        if os.path.isabs(path):
            self.workdir = path
            return
        if path == '':
            # mkdtemp() picks the name and creates the directory in one
            # step, avoiding the race between mktemp() and mkdir().
            path = tempfile.mkdtemp(prefix='testcmd.')
        else:
            os.mkdir(path)
        self._dirlist.append(path)
        if self not in _Cleanup:
            _Cleanup.append(self)
        # We would like to set self.workdir like this:
        #     self.workdir = path
        # But symlinks in the path will report things differently from
        # os.getcwd(), so chdir there and back to fetch the canonical
        # path.
        cwd = os.getcwd()
        os.chdir(path)
        try:
            self.workdir = os.getcwd()
        finally:
            os.chdir(cwd)

    def workpath(self, *args):
        """
        Returns the absolute path name to a subdirectory or file within the
        current temporary working directory. Concatenates the temporary working
        directory name with the specified arguments using os.path.join().

        """
        return os.path.join(self.workdir, *args)

    def writable(self, top, write):
        """
        Make the specified directory tree writable (write == 1) or not
        (write == None).

        Only the owner write permission bit is changed. Problems changing
        modes are ignored and end the traversal.

        """
        if write:
            new_mode = lambda mode: stat.S_IMODE(mode | 0o200)
        else:
            new_mode = lambda mode: stat.S_IMODE(mode & ~0o200)

        try:
            for root, _, files in os.walk(top):
                targets = [root] + [os.path.join(root, n) for n in files]
                for target in targets:
                    st = os.stat(target)
                    os.chmod(target, new_mode(st[stat.ST_MODE]))
        except OSError:
            pass  # Ignore any problems changing modes.

    def write(self, file, content, mode='wb'):
        """
        Writes the specified content text (second argument) to the specified
        file name (first argument). The file name may be a list, in which case
        the elements are concatenated using the os.path.join() method. The file
        is created under the temporary working directory. Any subdirectories in
        the path must already exist. The I/O mode for the file may be specified
        and must begin with a 'w'. The default is 'wb' (binary write).

        Text content written in a binary mode is encoded first. Raises
        ValueError if the mode does not begin with a 'w'.

        """
        if type(file) is list:
            file = os.path.join(*file)
        if not os.path.isabs(file):
            file = os.path.join(self.workdir, file)
        if mode[0] != 'w':
            raise ValueError("mode must begin with 'w'")
        # On Python 3 a binary file only accepts bytes; on Python 2 str is
        # already bytes, so this condition is never true there.
        if ('b' in mode and isinstance(content, str)
                and not isinstance(content, bytes)):
            content = content.encode()
        with open(file, mode) as stream:
            stream.write(content)