]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | """ |
2 | TestCmd.py: a testing framework for commands and scripts. | |
3 | ||
4 | The TestCmd module provides a framework for portable automated testing of | |
5 | executable commands and scripts (in any language, not just Python), especially | |
6 | commands and scripts that require file system interaction. | |
7 | ||
8 | In addition to running tests and evaluating conditions, the TestCmd module | |
9 | manages and cleans up one or more temporary workspace directories, and provides | |
10 | methods for creating files and directories in those workspace directories from | |
11 | in-line data, here-documents), allowing tests to be completely self-contained. | |
12 | ||
13 | A TestCmd environment object is created via the usual invocation: | |
14 | ||
15 | test = TestCmd() | |
16 | ||
17 | The TestCmd module provides pass_test(), fail_test(), and no_result() unbound | |
18 | methods that report test results for use with the Aegis change management | |
19 | system. These methods terminate the test immediately, reporting PASSED, FAILED | |
20 | or NO RESULT respectively and exiting with status 0 (success), 1 or 2 | |
21 | respectively. This allows for a distinction between an actual failed test and a | |
22 | test that could not be properly evaluated because of an external condition (such | |
23 | as a full file system or incorrect permissions). | |
24 | ||
25 | """ | |
26 | ||
27 | # Copyright 2000 Steven Knight | |
28 | # This module is free software, and you may redistribute it and/or modify | |
29 | # it under the same terms as Python itself, so long as this copyright message | |
30 | # and disclaimer are retained in their original form. | |
31 | # | |
32 | # IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, | |
33 | # SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF | |
34 | # THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH | |
35 | # DAMAGE. | |
36 | # | |
37 | # THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT | |
38 | # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A | |
39 | # PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, | |
40 | # AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE, | |
41 | # SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. | |
42 | ||
43 | # Copyright 2002-2003 Vladimir Prus. | |
44 | # Copyright 2002-2003 Dave Abrahams. | |
45 | # Copyright 2006 Rene Rivera. | |
46 | # Distributed under the Boost Software License, Version 1.0. | |
47 | # (See accompanying file LICENSE_1_0.txt or copy at | |
48 | # http://www.boost.org/LICENSE_1_0.txt) | |
49 | ||
50 | ||
51 | from string import join, split | |
52 | ||
53 | __author__ = "Steven Knight <knight@baldmt.com>" | |
54 | __revision__ = "TestCmd.py 0.D002 2001/08/31 14:56:12 software" | |
55 | __version__ = "0.02" | |
56 | ||
57 | from types import * | |
58 | ||
59 | import os | |
60 | import os.path | |
61 | import re | |
62 | import shutil | |
63 | import stat | |
64 | import subprocess | |
65 | import sys | |
66 | import tempfile | |
67 | import traceback | |
68 | ||
69 | ||
70 | tempfile.template = 'testcmd.' | |
71 | ||
72 | _Cleanup = [] | |
73 | ||
74 | def _clean(): | |
75 | global _Cleanup | |
76 | list = _Cleanup[:] | |
77 | _Cleanup = [] | |
78 | list.reverse() | |
79 | for test in list: | |
80 | test.cleanup() | |
81 | ||
82 | sys.exitfunc = _clean | |
83 | ||
84 | ||
85 | def caller(tblist, skip): | |
86 | string = "" | |
87 | arr = [] | |
88 | for file, line, name, text in tblist: | |
89 | if file[-10:] == "TestCmd.py": | |
90 | break | |
91 | arr = [(file, line, name, text)] + arr | |
92 | atfrom = "at" | |
93 | for file, line, name, text in arr[skip:]: | |
94 | if name == "?": | |
95 | name = "" | |
96 | else: | |
97 | name = " (" + name + ")" | |
98 | string = string + ("%s line %d of %s%s\n" % (atfrom, line, file, name)) | |
99 | atfrom = "\tfrom" | |
100 | return string | |
101 | ||
102 | ||
103 | def fail_test(self=None, condition=True, function=None, skip=0): | |
104 | """Cause the test to fail. | |
105 | ||
106 | By default, the fail_test() method reports that the test FAILED and exits | |
107 | with a status of 1. If a condition argument is supplied, the test fails | |
108 | only if the condition is true. | |
109 | ||
110 | """ | |
111 | if not condition: | |
112 | return | |
113 | if not function is None: | |
114 | function() | |
115 | of = "" | |
116 | desc = "" | |
117 | sep = " " | |
118 | if not self is None: | |
119 | if self.program: | |
120 | of = " of " + join(self.program, " ") | |
121 | sep = "\n\t" | |
122 | if self.description: | |
123 | desc = " [" + self.description + "]" | |
124 | sep = "\n\t" | |
125 | ||
126 | at = caller(traceback.extract_stack(), skip) | |
127 | ||
128 | sys.stderr.write("FAILED test" + of + desc + sep + at + """ | |
129 | in directory: """ + os.getcwd() ) | |
130 | sys.exit(1) | |
131 | ||
132 | ||
133 | def no_result(self=None, condition=True, function=None, skip=0): | |
134 | """Causes a test to exit with no valid result. | |
135 | ||
136 | By default, the no_result() method reports NO RESULT for the test and | |
137 | exits with a status of 2. If a condition argument is supplied, the test | |
138 | fails only if the condition is true. | |
139 | ||
140 | """ | |
141 | if not condition: | |
142 | return | |
143 | if not function is None: | |
144 | function() | |
145 | of = "" | |
146 | desc = "" | |
147 | sep = " " | |
148 | if not self is None: | |
149 | if self.program: | |
150 | of = " of " + self.program | |
151 | sep = "\n\t" | |
152 | if self.description: | |
153 | desc = " [" + self.description + "]" | |
154 | sep = "\n\t" | |
155 | ||
156 | at = caller(traceback.extract_stack(), skip) | |
157 | sys.stderr.write("NO RESULT for test" + of + desc + sep + at) | |
158 | sys.exit(2) | |
159 | ||
160 | ||
161 | def pass_test(self=None, condition=True, function=None): | |
162 | """Causes a test to pass. | |
163 | ||
164 | By default, the pass_test() method reports PASSED for the test and exits | |
165 | with a status of 0. If a condition argument is supplied, the test passes | |
166 | only if the condition is true. | |
167 | ||
168 | """ | |
169 | if not condition: | |
170 | return | |
171 | if not function is None: | |
172 | function() | |
173 | sys.stderr.write("PASSED\n") | |
174 | sys.exit(0) | |
175 | ||
b32b8144 FG |
176 | class MatchError(object): |
177 | def __init__(self, message): | |
178 | self.message = message | |
179 | def __nonzero__(self): | |
180 | return False | |
181 | def __bool__(self): | |
182 | return False | |
7c673cae FG |
183 | |
184 | def match_exact(lines=None, matches=None): | |
185 | """ | |
186 | Returns whether the given lists or strings containing lines separated | |
187 | using newline characters contain exactly the same data. | |
188 | ||
189 | """ | |
190 | if not type(lines) is ListType: | |
191 | lines = split(lines, "\n") | |
192 | if not type(matches) is ListType: | |
193 | matches = split(matches, "\n") | |
194 | if len(lines) != len(matches): | |
195 | return | |
196 | for i in range(len(lines)): | |
197 | if lines[i] != matches[i]: | |
b32b8144 FG |
198 | return MatchError("Mismatch at line %d\n- %s\n+ %s\n" % |
199 | (i+1, matches[i], lines[i])) | |
200 | if len(lines) < len(matches): | |
201 | return MatchError("Missing lines at line %d\n- %s" % | |
202 | (len(lines), "\n- ".join(matches[len(lines):]))) | |
203 | if len(lines) > len(matches): | |
204 | return MatchError("Extra lines at line %d\n+ %s" % | |
205 | (len(matches), "\n+ ".join(lines[len(matches):]))) | |
7c673cae FG |
206 | return 1 |
207 | ||
208 | ||
209 | def match_re(lines=None, res=None): | |
210 | """ | |
211 | Given lists or strings contain lines separated using newline characters. | |
212 | This function matches those lines one by one, interpreting the lines in the | |
213 | res parameter as regular expressions. | |
214 | ||
215 | """ | |
216 | if not type(lines) is ListType: | |
217 | lines = split(lines, "\n") | |
218 | if not type(res) is ListType: | |
219 | res = split(res, "\n") | |
b32b8144 | 220 | for i in range(min(len(lines), len(res))): |
7c673cae | 221 | if not re.compile("^" + res[i] + "$").search(lines[i]): |
b32b8144 FG |
222 | return MatchError("Mismatch at line %d\n- %s\n+ %s\n" % |
223 | (i+1, res[i], lines[i])) | |
224 | if len(lines) < len(res): | |
225 | return MatchError("Missing lines at line %d\n- %s" % | |
226 | (len(lines), "\n- ".join(res[len(lines):]))) | |
227 | if len(lines) > len(res): | |
228 | return MatchError("Extra lines at line %d\n+ %s" % | |
229 | (len(res), "\n+ ".join(lines[len(res):]))) | |
7c673cae FG |
230 | return 1 |
231 | ||
232 | ||
233 | class TestCmd: | |
234 | def __init__(self, description=None, program=None, workdir=None, | |
235 | subdir=None, verbose=False, match=None, inpath=None): | |
236 | ||
237 | self._cwd = os.getcwd() | |
238 | self.description_set(description) | |
239 | self.program_set(program, inpath) | |
240 | self.verbose_set(verbose) | |
241 | if match is None: | |
242 | self.match_func = match_re | |
243 | else: | |
244 | self.match_func = match | |
245 | self._dirlist = [] | |
246 | self._preserve = {'pass_test': 0, 'fail_test': 0, 'no_result': 0} | |
247 | env = os.environ.get('PRESERVE') | |
248 | if env: | |
249 | self._preserve['pass_test'] = env | |
250 | self._preserve['fail_test'] = env | |
251 | self._preserve['no_result'] = env | |
252 | else: | |
253 | env = os.environ.get('PRESERVE_PASS') | |
254 | if env is not None: | |
255 | self._preserve['pass_test'] = env | |
256 | env = os.environ.get('PRESERVE_FAIL') | |
257 | if env is not None: | |
258 | self._preserve['fail_test'] = env | |
259 | env = os.environ.get('PRESERVE_PASS') | |
260 | if env is not None: | |
261 | self._preserve['PRESERVE_NO_RESULT'] = env | |
262 | self._stdout = [] | |
263 | self._stderr = [] | |
264 | self.status = None | |
265 | self.condition = 'no_result' | |
266 | self.workdir_set(workdir) | |
267 | self.subdir(subdir) | |
268 | ||
269 | def __del__(self): | |
270 | self.cleanup() | |
271 | ||
272 | def __repr__(self): | |
273 | return "%x" % id(self) | |
274 | ||
275 | def cleanup(self, condition=None): | |
276 | """ | |
277 | Removes any temporary working directories for the specified TestCmd | |
278 | environment. If the environment variable PRESERVE was set when the | |
279 | TestCmd environment was created, temporary working directories are not | |
280 | removed. If any of the environment variables PRESERVE_PASS, | |
281 | PRESERVE_FAIL or PRESERVE_NO_RESULT were set when the TestCmd | |
282 | environment was created, then temporary working directories are not | |
283 | removed if the test passed, failed or had no result, respectively. | |
284 | Temporary working directories are also preserved for conditions | |
285 | specified via the preserve method. | |
286 | ||
287 | Typically, this method is not called directly, but is used when the | |
288 | script exits to clean up temporary working directories as appropriate | |
289 | for the exit status. | |
290 | ||
291 | """ | |
292 | if not self._dirlist: | |
293 | return | |
294 | if condition is None: | |
295 | condition = self.condition | |
296 | if self._preserve[condition]: | |
297 | for dir in self._dirlist: | |
298 | print("Preserved directory %s" % dir) | |
299 | else: | |
300 | list = self._dirlist[:] | |
301 | list.reverse() | |
302 | for dir in list: | |
303 | self.writable(dir, 1) | |
304 | shutil.rmtree(dir, ignore_errors=1) | |
305 | ||
306 | self._dirlist = [] | |
307 | self.workdir = None | |
308 | os.chdir(self._cwd) | |
309 | try: | |
310 | global _Cleanup | |
311 | _Cleanup.remove(self) | |
312 | except (AttributeError, ValueError): | |
313 | pass | |
314 | ||
315 | def description_set(self, description): | |
316 | """Set the description of the functionality being tested.""" | |
317 | self.description = description | |
318 | ||
319 | def fail_test(self, condition=True, function=None, skip=0): | |
320 | """Cause the test to fail.""" | |
321 | if not condition: | |
322 | return | |
323 | self.condition = 'fail_test' | |
324 | fail_test(self = self, | |
325 | condition = condition, | |
326 | function = function, | |
327 | skip = skip) | |
328 | ||
329 | def match(self, lines, matches): | |
330 | """Compare actual and expected file contents.""" | |
331 | return self.match_func(lines, matches) | |
332 | ||
333 | def match_exact(self, lines, matches): | |
334 | """Compare actual and expected file content exactly.""" | |
335 | return match_exact(lines, matches) | |
336 | ||
337 | def match_re(self, lines, res): | |
338 | """Compare file content with a regular expression.""" | |
339 | return match_re(lines, res) | |
340 | ||
341 | def no_result(self, condition=True, function=None, skip=0): | |
342 | """Report that the test could not be run.""" | |
343 | if not condition: | |
344 | return | |
345 | self.condition = 'no_result' | |
346 | no_result(self = self, | |
347 | condition = condition, | |
348 | function = function, | |
349 | skip = skip) | |
350 | ||
351 | def pass_test(self, condition=True, function=None): | |
352 | """Cause the test to pass.""" | |
353 | if not condition: | |
354 | return | |
355 | self.condition = 'pass_test' | |
356 | pass_test(self, condition, function) | |
357 | ||
358 | def preserve(self, *conditions): | |
359 | """ | |
360 | Arrange for the temporary working directories for the specified | |
361 | TestCmd environment to be preserved for one or more conditions. If no | |
362 | conditions are specified, arranges for the temporary working | |
363 | directories to be preserved for all conditions. | |
364 | ||
365 | """ | |
366 | if conditions is (): | |
367 | conditions = ('pass_test', 'fail_test', 'no_result') | |
368 | for cond in conditions: | |
369 | self._preserve[cond] = 1 | |
370 | ||
371 | def program_set(self, program, inpath): | |
372 | """Set the executable program or script to be tested.""" | |
373 | if not inpath and program and not os.path.isabs(program[0]): | |
374 | program[0] = os.path.join(self._cwd, program[0]) | |
375 | self.program = program | |
376 | ||
377 | def read(self, file, mode='rb'): | |
378 | """ | |
379 | Reads and returns the contents of the specified file name. The file | |
380 | name may be a list, in which case the elements are concatenated with | |
381 | the os.path.join() method. The file is assumed to be under the | |
382 | temporary working directory unless it is an absolute path name. The I/O | |
383 | mode for the file may be specified and must begin with an 'r'. The | |
384 | default is 'rb' (binary read). | |
385 | ||
386 | """ | |
387 | if type(file) is ListType: | |
388 | file = apply(os.path.join, tuple(file)) | |
389 | if not os.path.isabs(file): | |
390 | file = os.path.join(self.workdir, file) | |
391 | if mode[0] != 'r': | |
392 | raise ValueError, "mode must begin with 'r'" | |
393 | return open(file, mode).read() | |
394 | ||
395 | def run(self, program=None, arguments=None, chdir=None, stdin=None, | |
396 | universal_newlines=True): | |
397 | """ | |
398 | Runs a test of the program or script for the test environment. | |
399 | Standard output and error output are saved for future retrieval via the | |
400 | stdout() and stderr() methods. | |
401 | ||
402 | 'universal_newlines' parameter controls how the child process | |
403 | input/output streams are opened as defined for the same named Python | |
404 | subprocess.POpen constructor parameter. | |
405 | ||
406 | """ | |
407 | if chdir: | |
408 | if not os.path.isabs(chdir): | |
409 | chdir = os.path.join(self.workpath(chdir)) | |
410 | if self.verbose: | |
411 | sys.stderr.write("chdir(" + chdir + ")\n") | |
412 | else: | |
413 | chdir = self.workdir | |
414 | ||
415 | cmd = [] | |
416 | if program and program[0]: | |
417 | if program[0] != self.program[0] and not os.path.isabs(program[0]): | |
418 | program[0] = os.path.join(self._cwd, program[0]) | |
419 | cmd += program | |
420 | else: | |
421 | cmd += self.program | |
422 | if arguments: | |
423 | cmd += arguments.split(" ") | |
424 | if self.verbose: | |
425 | sys.stderr.write(join(cmd, " ") + "\n") | |
426 | p = subprocess.Popen(cmd, stdin=subprocess.PIPE, | |
427 | stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=chdir, | |
428 | universal_newlines=universal_newlines) | |
429 | ||
430 | if stdin: | |
431 | if type(stdin) is ListType: | |
b32b8144 FG |
432 | stdin = "".join(stdin) |
433 | out, err = p.communicate(stdin) | |
7c673cae FG |
434 | self._stdout.append(out) |
435 | self._stderr.append(err) | |
436 | self.status = p.returncode | |
437 | ||
438 | if self.verbose: | |
439 | sys.stdout.write(self._stdout[-1]) | |
440 | sys.stderr.write(self._stderr[-1]) | |
441 | ||
442 | def stderr(self, run=None): | |
443 | """ | |
444 | Returns the error output from the specified run number. If there is | |
445 | no specified run number, then returns the error output of the last run. | |
446 | If the run number is less than zero, then returns the error output from | |
447 | that many runs back from the current run. | |
448 | ||
449 | """ | |
450 | if not run: | |
451 | run = len(self._stderr) | |
452 | elif run < 0: | |
453 | run = len(self._stderr) + run | |
454 | run -= 1 | |
455 | if run < 0: | |
456 | return '' | |
457 | return self._stderr[run] | |
458 | ||
459 | def stdout(self, run=None): | |
460 | """ | |
461 | Returns the standard output from the specified run number. If there | |
462 | is no specified run number, then returns the standard output of the | |
463 | last run. If the run number is less than zero, then returns the | |
464 | standard output from that many runs back from the current run. | |
465 | ||
466 | """ | |
467 | if not run: | |
468 | run = len(self._stdout) | |
469 | elif run < 0: | |
470 | run = len(self._stdout) + run | |
471 | run -= 1 | |
472 | if run < 0: | |
473 | return '' | |
474 | return self._stdout[run] | |
475 | ||
476 | def subdir(self, *subdirs): | |
477 | """ | |
478 | Create new subdirectories under the temporary working directory, one | |
479 | for each argument. An argument may be a list, in which case the list | |
480 | elements are concatenated using the os.path.join() method. | |
481 | Subdirectories multiple levels deep must be created using a separate | |
482 | argument for each level: | |
483 | ||
484 | test.subdir('sub', ['sub', 'dir'], ['sub', 'dir', 'ectory']) | |
485 | ||
486 | Returns the number of subdirectories actually created. | |
487 | ||
488 | """ | |
489 | count = 0 | |
490 | for sub in subdirs: | |
491 | if sub is None: | |
492 | continue | |
493 | if type(sub) is ListType: | |
494 | sub = apply(os.path.join, tuple(sub)) | |
495 | new = os.path.join(self.workdir, sub) | |
496 | try: | |
497 | os.mkdir(new) | |
498 | except: | |
499 | pass | |
500 | else: | |
501 | count += 1 | |
502 | return count | |
503 | ||
504 | def unlink(self, file): | |
505 | """ | |
506 | Unlinks the specified file name. The file name may be a list, in | |
507 | which case the elements are concatenated using the os.path.join() | |
508 | method. The file is assumed to be under the temporary working directory | |
509 | unless it is an absolute path name. | |
510 | ||
511 | """ | |
512 | if type(file) is ListType: | |
513 | file = apply(os.path.join, tuple(file)) | |
514 | if not os.path.isabs(file): | |
515 | file = os.path.join(self.workdir, file) | |
516 | os.unlink(file) | |
517 | ||
518 | def verbose_set(self, verbose): | |
519 | """Set the verbose level.""" | |
520 | self.verbose = verbose | |
521 | ||
522 | def workdir_set(self, path): | |
523 | """ | |
524 | Creates a temporary working directory with the specified path name. | |
525 | If the path is a null string (''), a unique directory name is created. | |
526 | ||
527 | """ | |
528 | if os.path.isabs(path): | |
529 | self.workdir = path | |
530 | else: | |
531 | if path != None: | |
532 | if path == '': | |
533 | path = tempfile.mktemp() | |
534 | if path != None: | |
535 | os.mkdir(path) | |
536 | self._dirlist.append(path) | |
537 | global _Cleanup | |
538 | try: | |
539 | _Cleanup.index(self) | |
540 | except ValueError: | |
541 | _Cleanup.append(self) | |
542 | # We would like to set self.workdir like this: | |
543 | # self.workdir = path | |
544 | # But symlinks in the path will report things differently from | |
545 | # os.getcwd(), so chdir there and back to fetch the canonical | |
546 | # path. | |
547 | cwd = os.getcwd() | |
548 | os.chdir(path) | |
549 | self.workdir = os.getcwd() | |
550 | os.chdir(cwd) | |
551 | else: | |
552 | self.workdir = None | |
553 | ||
554 | def workpath(self, *args): | |
555 | """ | |
556 | Returns the absolute path name to a subdirectory or file within the | |
557 | current temporary working directory. Concatenates the temporary working | |
558 | directory name with the specified arguments using os.path.join(). | |
559 | ||
560 | """ | |
561 | return apply(os.path.join, (self.workdir,) + tuple(args)) | |
562 | ||
563 | def writable(self, top, write): | |
564 | """ | |
565 | Make the specified directory tree writable (write == 1) or not | |
566 | (write == None). | |
567 | ||
568 | """ | |
569 | def _walk_chmod(arg, dirname, names): | |
570 | st = os.stat(dirname) | |
571 | os.chmod(dirname, arg(st[stat.ST_MODE])) | |
572 | for name in names: | |
573 | fullname = os.path.join(dirname, name) | |
574 | st = os.stat(fullname) | |
575 | os.chmod(fullname, arg(st[stat.ST_MODE])) | |
576 | ||
577 | _mode_writable = lambda mode: stat.S_IMODE(mode|0200) | |
578 | _mode_non_writable = lambda mode: stat.S_IMODE(mode&~0200) | |
579 | ||
580 | if write: | |
581 | f = _mode_writable | |
582 | else: | |
583 | f = _mode_non_writable | |
584 | try: | |
585 | os.path.walk(top, _walk_chmod, f) | |
586 | except: | |
587 | pass # Ignore any problems changing modes. | |
588 | ||
589 | def write(self, file, content, mode='wb'): | |
590 | """ | |
591 | Writes the specified content text (second argument) to the specified | |
592 | file name (first argument). The file name may be a list, in which case | |
593 | the elements are concatenated using the os.path.join() method. The file | |
594 | is created under the temporary working directory. Any subdirectories in | |
595 | the path must already exist. The I/O mode for the file may be specified | |
596 | and must begin with a 'w'. The default is 'wb' (binary write). | |
597 | ||
598 | """ | |
599 | if type(file) is ListType: | |
600 | file = apply(os.path.join, tuple(file)) | |
601 | if not os.path.isabs(file): | |
602 | file = os.path.join(self.workdir, file) | |
603 | if mode[0] != 'w': | |
604 | raise ValueError, "mode must begin with 'w'" | |
605 | open(file, mode).write(content) |