# Copyright (c) 2021, LabN Consulting, L.L.C.
#
import argparse
-import glob
+import atexit
import logging
import os
import re
import subprocess
import sys
+import tempfile
from collections import OrderedDict
import xmltodict
+def get_range_list(rangestr):
+ result = []
+ for e in rangestr.split(","):
+ e = e.strip()
+ if not e:
+ continue
+ if e.find("-") == -1:
+ result.append(int(e))
+ else:
+ start, end = e.split("-")
+ result.extend(list(range(int(start), int(end) + 1)))
+ return result
+
+
+def dict_range_(dct, rangestr, dokeys):
+ keys = list(dct.keys())
+ if not rangestr or rangestr == "all":
+ for key in keys:
+ if dokeys:
+ yield key
+ else:
+ yield dct[key]
+ return
+
+ dlen = len(keys)
+ for index in get_range_list(rangestr):
+ if index >= dlen:
+ break
+ key = keys[index]
+ if dokeys:
+ yield key
+ else:
+ yield dct[key]
+
+
+def dict_range_keys(dct, rangestr):
+ return dict_range_(dct, rangestr, True)
+
+
+def dict_range_values(dct, rangestr):
+ return dict_range_(dct, rangestr, False)
+
+
def get_summary(results):
ntest = int(results["@tests"])
nfail = int(results["@failures"])
else:
if not fname:
fname = cname.replace(".", "/") + ".py"
- if args.files_only or "@name" not in testcase:
+ if "@name" not in testcase:
tcname = fname
else:
tcname = fname + "::" + testcase["@name"]
return found_files
-def dump_testcase(testcase):
- expand_keys = ("failure", "error", "skipped")
+def search_testcase(testcase, regexp):
+ for key, val in testcase.items():
+ if regexp.search(str(val)):
+ return True
+ return False
+
+def dump_testcase(testcase):
s = ""
for key, val in testcase.items():
if isinstance(val, str) or isinstance(val, float) or isinstance(val, int):
def main():
parser = argparse.ArgumentParser()
+ parser.add_argument(
+ "-a",
+ "--save-xml",
+ action="store_true",
+ help=(
+ "Move [container:]/tmp/topotests/topotests.xml "
+ "to --results value if --results does not exist yet"
+ ),
+ )
parser.add_argument(
"-A",
"--save",
action="store_true",
- help="Save /tmp/topotests{,.xml} in --rundir if --rundir does not yet exist",
+ help=(
+ "Move [container:]/tmp/topotests{,.xml} "
+ "to --results value if --results does not exist yet"
+ ),
)
parser.add_argument(
- "-F",
- "--files-only",
+ "-C",
+ "--container",
+ help="specify docker/podman container of the run",
+ )
+ parser.add_argument(
+ "--use-podman",
action="store_true",
- help="print test file names rather than individual full testcase names",
+ help="Use `podman` instead of `docker` for saving container data",
)
parser.add_argument(
"-S",
"--select",
- default="fe",
- help="select results combination of letters: 'e'rrored 'f'ailed 'p'assed 's'kipped.",
+ help=(
+ "select results combination of letters: "
+ "'e'rrored 'f'ailed 'p'assed 's'kipped. "
+ "Default is 'fe', unless --search or --time which default to 'efps'"
+ ),
+ )
+ parser.add_argument(
+ "-R",
+ "--search",
+ help=(
+ "filter results to those which match a regex. "
+ "All test text is search unless restricted by --errmsg or --errtext"
+ ),
)
parser.add_argument(
"-r",
action="store_true",
help="enumerate each item (results scoped)",
)
- parser.add_argument("-T", "--test", help="print testcase at enumeration")
+ parser.add_argument(
+ "-T", "--test", help="select testcase at given ordinal from the enumerated list"
+ )
parser.add_argument(
"--errmsg", action="store_true", help="print testcase error message"
)
parser.add_argument(
"--errtext", action="store_true", help="print testcase error text"
)
+ parser.add_argument(
+ "--full", action="store_true", help="print all logging for selected testcases"
+ )
parser.add_argument("--time", action="store_true", help="print testcase run times")
parser.add_argument("-s", "--summary", action="store_true", help="print summary")
parser.add_argument("-v", "--verbose", action="store_true", help="be verbose")
args = parser.parse_args()
- if args.save and args.results and not os.path.exists(args.results):
- if not os.path.exists("/tmp/topotests"):
- logging.critical('No "/tmp/topotests" directory to save')
+ if args.save and args.save_xml:
+ logging.critical("Only one of --save or --save-xml allowed")
+ sys.exit(1)
+
+ scount = bool(args.save) + bool(args.save_xml)
+
+ #
+ # Saving/Archiving results
+ #
+
+ docker_bin = "podman" if args.use_podman else "docker"
+ contid = ""
+ if args.container:
+ # check for container existence
+ contid = args.container
+ try:
+ # p =
+ subprocess.run(
+ f"{docker_bin} inspect {contid}",
+ check=True,
+ shell=True,
+ errors="ignore",
+ capture_output=True,
+ )
+ except subprocess.CalledProcessError:
+ print(f"{docker_bin} container '{contid}' does not exist")
sys.exit(1)
- subprocess.run(["mv", "/tmp/topotests", args.results])
+ # If you need container info someday...
+ # cont_info = json.loads(p.stdout)
+
+ cppath = "/tmp/topotests"
+ if args.save_xml or scount == 0:
+ cppath += "/topotests.xml"
+ if contid:
+ cppath = contid + ":" + cppath
+
+ tresfile = None
+
+ if scount and args.results and not os.path.exists(args.results):
+ if not contid:
+ if not os.path.exists(cppath):
+ print(f"'{cppath}' doesn't exist to save")
+ sys.exit(1)
+ if args.save_xml:
+ subprocess.run(["cp", cppath, args.results])
+ else:
+ subprocess.run(["mv", cppath, args.results])
+ else:
+ try:
+ subprocess.run(
+ f"{docker_bin} cp {cppath} {args.results}",
+ check=True,
+ shell=True,
+ errors="ignore",
+ capture_output=True,
+ )
+ except subprocess.CalledProcessError as error:
+ print(f"Can't {docker_bin} cp '{cppath}': %s", str(error))
+ sys.exit(1)
+
if "SUDO_USER" in os.environ:
subprocess.run(["chown", "-R", os.environ["SUDO_USER"], args.results])
- # # Old location for results
- # if os.path.exists("/tmp/topotests.xml", args.results):
- # subprocess.run(["mv", "/tmp/topotests.xml", args.results])
+ elif not args.results:
+ # User doesn't want to save results just use them inplace
+ if not contid:
+ if not os.path.exists(cppath):
+ print(f"'{cppath}' doesn't exist")
+ sys.exit(1)
+ args.results = cppath
+ else:
+ tresfile, tresname = tempfile.mkstemp(
+ suffix=".xml", prefix="topotests-", text=True
+ )
+ atexit.register(lambda: os.unlink(tresname))
+ os.close(tresfile)
+ try:
+ subprocess.run(
+ f"{docker_bin} cp {cppath} {tresname}",
+ check=True,
+ shell=True,
+ errors="ignore",
+ capture_output=True,
+ )
+ except subprocess.CalledProcessError as error:
+ print(f"Can't {docker_bin} cp '{cppath}': %s", str(error))
+ sys.exit(1)
+ args.results = tresname
- assert (
- args.test is None or not args.files_only
- ), "Can't have both --files and --test"
+ #
+ # Result option validation
+ #
+
+ count = 0
+ if args.errmsg:
+ count += 1
+ if args.errtext:
+ count += 1
+ if args.full:
+ count += 1
+ if count > 1:
+ logging.critical("Only one of --full, --errmsg or --errtext allowed")
+ sys.exit(1)
+
+ if args.time and count:
+ logging.critical("Can't use --full, --errmsg or --errtext with --time")
+ sys.exit(1)
+
+ if args.enumerate and (count or args.time or args.test):
+ logging.critical(
+ "Can't use --enumerate with --errmsg, --errtext, --full, --test or --time"
+ )
+ sys.exit(1)
results = {}
ttfiles = []
- if args.rundir:
- basedir = os.path.realpath(args.rundir)
- os.chdir(basedir)
-
- newfiles = glob.glob("tt-group-*/topotests.xml")
- if newfiles:
- ttfiles.extend(newfiles)
- if os.path.exists("topotests.xml"):
- ttfiles.append("topotests.xml")
- else:
- if args.results:
- if os.path.exists(os.path.join(args.results, "topotests.xml")):
- args.results = os.path.join(args.results, "topotests.xml")
- if not os.path.exists(args.results):
- logging.critical("%s doesn't exist", args.results)
- sys.exit(1)
- ttfiles = [args.results]
- elif os.path.exists("/tmp/topotests/topotests.xml"):
- ttfiles.append("/tmp/topotests/topotests.xml")
- if not ttfiles:
- if os.path.exists("/tmp/topotests.xml"):
- ttfiles.append("/tmp/topotests.xml")
+ if os.path.exists(os.path.join(args.results, "topotests.xml")):
+ args.results = os.path.join(args.results, "topotests.xml")
+ if not os.path.exists(args.results):
+ logging.critical("%s doesn't exist", args.results)
+ sys.exit(1)
+
+ ttfiles = [args.results]
for f in ttfiles:
m = re.match(r"tt-group-(\d+)/topotests.xml", f)
with open(f) as xml_file:
results[group] = xmltodict.parse(xml_file.read())["testsuites"]["testsuite"]
+ search_re = re.compile(args.search) if args.search else None
+
+ if args.select is None:
+ if search_re or args.time:
+ args.select = "efsp"
+ else:
+ args.select = "fe"
+
filters = []
if "e" in args.select:
filters.append("error")
filters.append(None)
found_files = get_filtered(filters, results, args)
- if found_files:
- if args.test is not None:
- if args.test == "all":
- keys = found_files.keys()
- else:
- keys = [list(found_files.keys())[int(args.test)]]
- for key in keys:
- testcase = found_files[key]
- if args.errtext:
- if "error" in testcase:
- errmsg = testcase["error"]["#text"]
- elif "failure" in testcase:
- errmsg = testcase["failure"]["#text"]
- else:
- errmsg = "none found"
- s = "{}: {}".format(key, errmsg)
- elif args.time:
- text = testcase["@time"]
- s = "{}: {}".format(text, key)
- elif args.errmsg:
- if "error" in testcase:
- errmsg = testcase["error"]["@message"]
- elif "failure" in testcase:
- errmsg = testcase["failure"]["@message"]
- else:
- errmsg = "none found"
- s = "{}: {}".format(key, errmsg)
+
+ if search_re:
+ found_files = {
+ k: v for k, v in found_files.items() if search_testcase(v, search_re)
+ }
+
+ if args.enumerate:
+ # print the selected test names with ordinal
+ print("\n".join(["{} {}".format(i, x) for i, x in enumerate(found_files)]))
+ elif args.test is None and count == 0 and not args.time:
+ # print the selected test names
+ print("\n".join([str(x) for x in found_files]))
+ else:
+ rangestr = args.test if args.test else "all"
+ for key in dict_range_keys(found_files, rangestr):
+ testcase = found_files[key]
+ if args.time:
+ text = testcase["@time"]
+ s = "{}: {}".format(text, key)
+ elif args.errtext:
+ if "error" in testcase:
+ errmsg = testcase["error"]["#text"]
+ elif "failure" in testcase:
+ errmsg = testcase["failure"]["#text"]
else:
- s = dump_testcase(testcase)
- print(s)
- elif filters:
- if args.enumerate:
- print(
- "\n".join(["{} {}".format(i, x) for i, x in enumerate(found_files)])
- )
+ errmsg = "none found"
+ s = "{}: {}".format(key, errmsg)
+ elif args.errmsg:
+ if "error" in testcase:
+ errmsg = testcase["error"]["@message"]
+ elif "failure" in testcase:
+ errmsg = testcase["failure"]["@message"]
+ else:
+ errmsg = "none found"
+ s = "{}: {}".format(key, errmsg)
else:
- print("\n".join(found_files))
+ s = dump_testcase(testcase)
+ print(s)
if args.summary:
print_summary(results, args)