]> git.proxmox.com Git - ceph.git/blobdiff - ceph/qa/tasks/vstart_runner.py
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / qa / tasks / vstart_runner.py
index 741d464c0e51f766dda8669b7fd29fb076358c8d..61ca076151add3169c30c07e1b0d88e84481bbf7 100644 (file)
@@ -30,7 +30,6 @@ Alternative usage:
 
 """
 
-from io import BytesIO
 from io import StringIO
 from collections import defaultdict
 import getpass
@@ -44,15 +43,20 @@ import os
 import time
 import sys
 import errno
-from unittest import suite, loader
+from IPy import IP
 import unittest
 import platform
-from teuthology import misc
+import logging
+
+from unittest import suite, loader
+
 from teuthology.orchestra.run import Raw, quote
 from teuthology.orchestra.daemon import DaemonGroup
+from teuthology.orchestra.remote import Remote
 from teuthology.config import config as teuth_config
-import six
-import logging
+from teuthology.contextutil import safe_while
+from teuthology.contextutil import MaxWhileTries
+from teuthology.orchestra.run import CommandFailedError
 try:
     import urllib3
     urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
@@ -130,13 +134,12 @@ if os.path.exists("./CMakeCache.txt") and os.path.exists("./bin"):
 
 
 try:
-    from teuthology.exceptions import CommandFailedError
     from tasks.ceph_manager import CephManager
     from tasks.cephfs.fuse_mount import FuseMount
     from tasks.cephfs.kernel_mount import KernelMount
     from tasks.cephfs.filesystem import Filesystem, MDSCluster, CephCluster
+    from tasks.cephfs.mount import CephFSMount
     from tasks.mgr.mgr_test_case import MgrCluster
-    from teuthology.contextutil import MaxWhileTries
     from teuthology.task import interactive
 except ImportError:
     sys.stderr.write("***\nError importing packages, have you activated your teuthology virtualenv "
@@ -165,8 +168,13 @@ class LocalRemoteProcess(object):
     def __init__(self, args, subproc, check_status, stdout, stderr):
         self.args = args
         self.subproc = subproc
-        self.stdout = stdout or BytesIO()
-        self.stderr = stderr or BytesIO()
+        self.stdout = stdout
+        self.stderr = stderr
+        # this variable is meant for instance of this class named fuse_daemon.
+        # child process of the command launched with sudo must be killed,
+        # since killing parent process alone has no impact on the child
+        # process.
+        self.fuse_pid = -1
 
         self.check_status = check_status
         self.exitstatus = self.returncode = None
@@ -184,18 +192,22 @@ class LocalRemoteProcess(object):
         out, err = rm_nonascii_chars(out), rm_nonascii_chars(err)
         if isinstance(self.stdout, StringIO):
             self.stdout.write(out.decode(errors='ignore'))
+        elif self.stdout is None:
+            pass
         else:
             self.stdout.write(out)
         if isinstance(self.stderr, StringIO):
             self.stderr.write(err.decode(errors='ignore'))
+        elif self.stderr is None:
+            pass
         else:
             self.stderr.write(err)
 
         self.exitstatus = self.returncode = self.subproc.returncode
 
         if self.exitstatus != 0:
-            sys.stderr.write(six.ensure_str(out))
-            sys.stderr.write(six.ensure_str(err))
+            sys.stderr.write(out.decode())
+            sys.stderr.write(err.decode())
 
         if self.check_status and self.exitstatus != 0:
             raise CommandFailedError(self.args, self.exitstatus)
@@ -207,8 +219,18 @@ class LocalRemoteProcess(object):
 
         if self.subproc.poll() is not None:
             out, err = self.subproc.communicate()
-            self.stdout.write(out)
-            self.stderr.write(err)
+            if isinstance(self.stdout, StringIO):
+                self.stdout.write(out.decode(errors='ignore'))
+            elif self.stdout is None:
+                pass
+            else:
+                self.stdout.write(out)
+            if isinstance(self.stderr, StringIO):
+                self.stderr.write(err.decode(errors='ignore'))
+            elif self.stderr is None:
+                pass
+            else:
+                self.stderr.write(err)
             self.exitstatus = self.returncode = self.subproc.returncode
             return True
         else:
@@ -219,7 +241,10 @@ class LocalRemoteProcess(object):
         if self.subproc.pid and not self.finished:
             log.debug("kill: killing pid {0} ({1})".format(
                 self.subproc.pid, self.args))
-            safe_kill(self.subproc.pid)
+            if self.fuse_pid != -1:
+                safe_kill(self.fuse_pid)
+            else:
+                safe_kill(self.subproc.pid)
         else:
             log.debug("kill: already terminated ({0})".format(self.args))
 
@@ -243,6 +268,9 @@ class LocalRemote(object):
     Run this inside your src/ dir!
     """
 
+    os = Remote.os
+    arch = Remote.arch
+
     def __init__(self):
         self.name = "local"
         self.hostname = "localhost"
@@ -257,18 +285,10 @@ class LocalRemote(object):
     # holding same path. For teuthology, same path still represents
     # different locations as they lie on different machines.
     def put_file(self, src, dst, sudo=False):
-        if sys.version_info.major < 3:
-            exception = shutil.Error
-        elif sys.version_info.major >= 3:
-            exception = shutil.SameFileError
-
         try:
             shutil.copy(src, dst)
-        except exception as e:
-            if sys.version_info.major < 3 and e.message.find('are the same '
-               'file') != -1:
-                return
-            raise e
+        except shutil.SameFileError:
+            pass
 
     # XXX: accepts only two arugments to maintain compatibility with
     # teuthology's mkdtemp.
@@ -292,10 +312,47 @@ class LocalRemote(object):
         from tempfile import mktemp
         return mktemp(suffix=suffix, dir=parentdir)
 
+    def write_file(self, path, data, sudo=False, mode=None, owner=None,
+                                     mkdir=False, append=False):
+        """
+        Write data to file
+
+        :param path:    file path on host
+        :param data:    str, binary or fileobj to be written
+        :param sudo:    use sudo to write file, defaults False
+        :param mode:    set file mode bits if provided
+        :param owner:   set file owner if provided
+        :param mkdir:   preliminary create the file directory, defaults False
+        :param append:  append data to the file, defaults False
+        """
+        dd = 'sudo dd' if sudo else 'dd'
+        args = dd + ' of=' + path
+        if append:
+            args += ' conv=notrunc oflag=append'
+        if mkdir:
+            mkdirp = 'sudo mkdir -p' if sudo else 'mkdir -p'
+            dirpath = os.path.dirname(path)
+            if dirpath:
+                args = mkdirp + ' ' + dirpath + '\n' + args
+        if mode:
+            chmod = 'sudo chmod' if sudo else 'chmod'
+            args += '\n' + chmod + ' ' + mode + ' ' + path
+        if owner:
+            chown = 'sudo chown' if sudo else 'chown'
+            args += '\n' + chown + ' ' + owner + ' ' + path
+        omit_sudo = False if sudo else True
+        self.run(args=args, stdin=data, omit_sudo=omit_sudo)
+
+    def sudo_write_file(self, path, data, **kwargs):
+        """
+        Write data to file with sudo, for more info see `write_file()`.
+        """
+        self.write_file(path, data, sudo=True, **kwargs)
+
     def _perform_checks_and_return_list_of_args(self, args, omit_sudo):
         # Since Python's shell simulation can only work when commands are
         # provided as a list of argumensts...
-        if isinstance(args, str) or isinstance(args, six.text_type):
+        if isinstance(args, str):
             args = args.split()
 
         # We'll let sudo be a part of command even omit flag says otherwise in
@@ -341,9 +398,12 @@ class LocalRemote(object):
     def run(self, **kwargs):
         return self._do_run(**kwargs)
 
+    # XXX: omit_sudo is set to True since using sudo can change the ownership
+    # of files which becomes problematic for following executions of
+    # vstart_runner.py.
     def _do_run(self, args, check_status=True, wait=True, stdout=None,
                 stderr=None, cwd=None, stdin=None, logger=None, label=None,
-                env=None, timeout=None, omit_sudo=False):
+                env=None, timeout=None, omit_sudo=True):
         args = self._perform_checks_and_return_list_of_args(args, omit_sudo)
 
         # We have to use shell=True if any run.Raw was present, e.g. &&
@@ -354,7 +414,7 @@ class LocalRemote(object):
                                              'ceph-coverage')]
 
         # Adjust binary path prefix if given a bare program name
-        if "/" not in args[0]:
+        if not isinstance(args[0], Raw) and "/" not in args[0]:
             # If they asked for a bare binary name, and it exists
             # in our built tree, use the one there.
             local_bin = os.path.join(BIN_PREFIX, args[0])
@@ -365,7 +425,8 @@ class LocalRemote(object):
                     args[0]
                 ))
 
-        log.debug("Running {0}".format(args))
+        log.debug('> ' +
+                 ' '.join([str(a.value) if isinstance(a, Raw) else a for a in args]))
 
         if shell:
             subproc = subprocess.Popen(quote(args),
@@ -373,11 +434,12 @@ class LocalRemote(object):
                                        stderr=subprocess.PIPE,
                                        stdin=subprocess.PIPE,
                                        cwd=cwd,
+                                       env=env,
                                        shell=True)
         else:
             # Sanity check that we've got a list of strings
             for arg in args:
-                if not isinstance(arg, six.string_types):
+                if not isinstance(arg, str):
                     raise RuntimeError("Oops, can't handle arg {0} type {1}".format(
                         arg, arg.__class__
                     ))
@@ -390,12 +452,12 @@ class LocalRemote(object):
                                        env=env)
 
         if stdin:
-            if not isinstance(stdin, str):
-                raise RuntimeError("Can't handle non-string stdins on a vstart cluster")
-
             # Hack: writing to stdin is not deadlock-safe, but it "always" works
             # as long as the input buffer is "small"
-            subproc.stdin.write(stdin.encode())
+            if isinstance(stdin, str):
+                subproc.stdin.write(stdin.encode())
+            else:
+                subproc.stdin.write(stdin)
 
         proc = LocalRemoteProcess(
             args, subproc, check_status,
@@ -452,9 +514,9 @@ class LocalDaemon(object):
         """
         Return PID as an integer or None if not found
         """
-        ps_txt = six.ensure_str(self.controller.run(
-            args=["ps", "ww", "-u"+str(os.getuid())]
-        ).stdout.getvalue()).strip()
+        ps_txt = self.controller.run(args=["ps", "ww", "-u"+str(os.getuid())],
+                                     stdout=StringIO()).\
+            stdout.getvalue().strip()
         lines = ps_txt.split("\n")[1:]
 
         for line in lines:
@@ -536,10 +598,26 @@ def safe_kill(pid):
         else:
             raise
 
+def mon_in_localhost(config_path="./ceph.conf"):
+    """
+    If the ceph cluster is using the localhost IP as mon host, will must disable ns unsharing
+    """
+    with open(config_path) as f:
+        for line in f:
+            local = re.match(r'^\s*mon host\s*=\s*\[((v1|v2):127\.0\.0\.1:\d+,?)+\]', line)
+            if local:
+                return True
+    return False
 
 class LocalKernelMount(KernelMount):
-    def __init__(self, ctx, test_dir, client_id):
-        super(LocalKernelMount, self).__init__(ctx, test_dir, client_id, LocalRemote(), None, None, None)
+    def __init__(self, ctx, test_dir, client_id=None,
+                 client_keyring_path=None, client_remote=None,
+                 hostfs_mntpt=None, cephfs_name=None, cephfs_mntpt=None,
+                 brxnet=None):
+        super(LocalKernelMount, self).__init__(ctx=ctx, test_dir=test_dir,
+            client_id=client_id, client_keyring_path=client_keyring_path,
+            client_remote=LocalRemote(), hostfs_mntpt=hostfs_mntpt,
+            cephfs_name=cephfs_name, cephfs_mntpt=cephfs_mntpt, brxnet=brxnet)
 
     @property
     def config_path(self):
@@ -556,67 +634,6 @@ class LocalKernelMount(KernelMount):
         else:
             return keyring_path
 
-    def run_shell(self, args, wait=True, stdin=None, check_status=True,
-                  omit_sudo=False):
-        # FIXME maybe should add a pwd arg to teuthology.orchestra so that
-        # the "cd foo && bar" shenanigans isn't needed to begin with and
-        # then we wouldn't have to special case this
-        return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
-                                      stdin=stdin, check_status=check_status,
-                                      omit_sudo=omit_sudo)
-
-    def run_as_user(self, args, user, wait=True, stdin=None, check_status=True):
-        # FIXME maybe should add a pwd arg to teuthology.orchestra so that
-        # the "cd foo && bar" shenanigans isn't needed to begin with and
-        # then we wouldn't have to special case this
-        if isinstance(args, str):
-            args = 'sudo -u %s -s /bin/bash -c %s' % (user, args)
-        elif isinstance(args, list):
-            cmdlist = args
-            cmd = ''
-            for i in cmdlist:
-                cmd = cmd + i + ' '
-            args = ['sudo', '-u', user, '-s', '/bin/bash', '-c']
-            args.append(cmd)
-
-        return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
-                                      check_status=check_status, stdin=stdin,
-                                      omit_sudo=False)
-
-    def run_as_root(self, args, wait=True, stdin=None, check_status=True):
-        # FIXME maybe should add a pwd arg to teuthology.orchestra so that
-        # the "cd foo && bar" shenanigans isn't needed to begin with and
-        # then we wouldn't have to special case this
-        if isinstance(args, str):
-            args = 'sudo ' + args
-        if isinstance(args, list):
-            args.insert(0, 'sudo')
-
-        return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
-                                      check_status=check_status,
-                                      omit_sudo=False)
-
-    def testcmd(self, args, wait=True, stdin=None, omit_sudo=False):
-        # FIXME maybe should add a pwd arg to teuthology.orchestra so that
-        # the "cd foo && bar" shenanigans isn't needed to begin with and
-        # then we wouldn't have to special case this
-        return self.run_shell(args, wait=wait, stdin=stdin, check_status=False,
-                              omit_sudo=omit_sudo)
-
-    def testcmd_as_user(self, args, user, wait=True, stdin=None):
-        # FIXME maybe should add a pwd arg to teuthology.orchestra so that
-        # the "cd foo && bar" shenanigans isn't needed to begin with and
-        # then we wouldn't have to special case this
-        return self.run_as_user(args, user=user, wait=wait, stdin=stdin,
-                                check_status=False)
-
-    def testcmd_as_root(self, args, wait=True, stdin=None):
-        # FIXME maybe should add a pwd arg to teuthology.orchestra so that
-        # the "cd foo && bar" shenanigans isn't needed to begin with and
-        # then we wouldn't have to special case this
-        return self.run_as_root(args, wait=wait, stdin=stdin,
-                                check_status=False)
-
     def setupfs(self, name=None):
         if name is None and self.fs is not None:
             # Previous mount existed, reuse the old name
@@ -647,67 +664,100 @@ class LocalKernelMount(KernelMount):
                 if asok_conf:
                     d = asok_conf.groups(1)[0]
                     break
-        path = "{0}/client.{1}.{2}.asok".format(d, self.client_id, self.fuse_daemon.subproc.pid)
-        log.info("I think my launching pid was {0}".format(self.fuse_daemon.subproc.pid))
+        path = "{0}/client.{1}.*.asok".format(d, self.client_id)
         return path
 
-    def mount(self, mount_path=None, mount_fs_name=None, mount_options=[], **kwargs):
-        self.setupfs(name=mount_fs_name)
+    def mount(self, mntopts=[], createfs=True, check_status=True, **kwargs):
+        self.update_attrs(**kwargs)
+        self.assert_and_log_minimum_mount_details()
 
-        log.info('Mounting kclient client.{id} at {remote} {mnt}...'.format(
-            id=self.client_id, remote=self.client_remote, mnt=self.mountpoint))
-
-        self.client_remote.run(
-            args=[
-                'mkdir',
-                '--',
-                self.mountpoint,
-            ],
-            timeout=(5*60),
-        )
-
-        if mount_path is None:
-            mount_path = "/"
-
-        opts = 'name={id},norequire_active_mds,conf={conf}'.format(id=self.client_id,
-                                                        conf=self.config_path)
-
-        if mount_fs_name is not None:
-            opts += ",mds_namespace={0}".format(mount_fs_name)
-
-        for mount_opt in mount_options:
-            opts += ",{0}".format(mount_opt)
-
-        self.client_remote.run(
-            args=[
-                'sudo',
-                './bin/mount.ceph',
-                ':{mount_path}'.format(mount_path=mount_path),
-                self.mountpoint,
-                '-v',
-                '-o',
-                opts
-            ],
-            timeout=(30*60),
-            omit_sudo=False,
-        )
+        if opt_use_ns:
+            self.using_namespace = True
+            self.setup_netns()
+        else:
+            self.using_namespace = False
+
+        if not self.cephfs_mntpt:
+            self.cephfs_mntpt = "/"
+        # TODO: don't call setupfs() from within mount()
+        if createfs:
+            self.setupfs(name=self.cephfs_name)
+
+        opts = 'norequire_active_mds'
+        if self.client_id:
+            opts += ',name=' + self.client_id
+        if self.client_keyring_path and self.client_id:
+            opts += ",secret=" + self.get_key_from_keyfile()
+        if self.config_path:
+            opts += ',conf=' + self.config_path
+        if self.cephfs_name:
+            opts += ",mds_namespace={0}".format(self.cephfs_name)
+        if mntopts:
+            opts += ',' + ','.join(mntopts)
+
+        stderr = StringIO()
+        try:
+            self.client_remote.run(args=['mkdir', '--', self.hostfs_mntpt],
+                                   timeout=(5*60), stderr=stderr)
+        except CommandFailedError:
+            if 'file exists' not in stderr.getvalue().lower():
+                raise
+
+        if self.cephfs_mntpt is None:
+            self.cephfs_mntpt = "/"
+        cmdargs = ['sudo']
+        if self.using_namespace:
+           cmdargs += ['nsenter',
+                       '--net=/var/run/netns/{0}'.format(self.netns_name)]
+        cmdargs += ['./bin/mount.ceph', ':' + self.cephfs_mntpt,
+                    self.hostfs_mntpt, '-v', '-o', opts]
+
+        mountcmd_stdout, mountcmd_stderr = StringIO(), StringIO()
+        try:
+            self.client_remote.run(args=cmdargs, timeout=(30*60),
+                omit_sudo=False, stdout=mountcmd_stdout,
+                stderr=mountcmd_stderr)
+        except CommandFailedError as e:
+            if check_status:
+                raise
+            else:
+                return (e, mountcmd_stdout.getvalue(),
+                        mountcmd_stderr.getvalue())
 
-        self.client_remote.run(
-            args=['sudo', 'chmod', '1777', self.mountpoint], timeout=(5*60))
+        stderr = StringIO()
+        try:
+            self.client_remote.run(args=['sudo', 'chmod', '1777',
+                                   self.hostfs_mntpt], stderr=stderr,
+                                   timeout=(5*60))
+        except CommandFailedError:
+            # the client does not have write permissions in cap it holds for
+            # the Ceph FS that was just mounted.
+            if 'permission denied' in stderr.getvalue().lower():
+                pass
 
         self.mounted = True
 
+    def cleanup_netns(self):
+        if self.using_namespace:
+            super(type(self), self).cleanup_netns()
+
     def _run_python(self, pyscript, py_version='python'):
         """
         Override this to remove the daemon-helper prefix that is used otherwise
         to make the process killable.
         """
         return self.client_remote.run(args=[py_version, '-c', pyscript],
-                                      wait=False)
+                                      wait=False, stdout=StringIO())
 
 class LocalFuseMount(FuseMount):
-    def __init__(self, ctx, test_dir, client_id):
-        super(LocalFuseMount, self).__init__(ctx, None, test_dir, client_id, LocalRemote())
+    def __init__(self, ctx, test_dir, client_id, client_keyring_path=None,
+                 client_remote=None, hostfs_mntpt=None, cephfs_name=None,
+                 cephfs_mntpt=None, brxnet=None):
+        super(LocalFuseMount, self).__init__(ctx=ctx, client_config=None,
+            test_dir=test_dir, client_id=client_id,
+            client_keyring_path=client_keyring_path,
+            client_remote=LocalRemote(), hostfs_mntpt=hostfs_mntpt,
+            cephfs_name=cephfs_name, cephfs_mntpt=cephfs_mntpt, brxnet=brxnet)
 
     @property
     def config_path(self):
@@ -718,66 +768,6 @@ class LocalFuseMount(FuseMount):
         # to avoid assumptions about daemons' pwd
         return os.path.abspath("./client.{0}.keyring".format(self.client_id))
 
-    def run_shell(self, args, wait=True, stdin=None, check_status=True, omit_sudo=True):
-        # FIXME maybe should add a pwd arg to teuthology.orchestra so that
-        # the "cd foo && bar" shenanigans isn't needed to begin with and
-        # then we wouldn't have to special case this
-        return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
-                                      stdin=stdin, check_status=check_status,
-                                      omit_sudo=omit_sudo)
-
-    def run_as_user(self, args, user, wait=True, stdin=None, check_status=True):
-        # FIXME maybe should add a pwd arg to teuthology.orchestra so that
-        # the "cd foo && bar" shenanigans isn't needed to begin with and
-        # then we wouldn't have to special case this
-        if isinstance(args, str):
-            args = 'sudo -u %s -s /bin/bash -c %s' % (user, args)
-        elif isinstance(args, list):
-            cmdlist = args
-            cmd = ''
-            for i in cmdlist:
-                cmd = cmd + i + ' '
-            args = ['sudo', '-u', user, '-s', '/bin/bash', '-c']
-            args.append(cmd)
-
-        return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
-                                      check_status=check_status, stdin=stdin,
-                                      omit_sudo=False)
-
-    def run_as_root(self, args, wait=True, stdin=None, check_status=True):
-        # FIXME maybe should add a pwd arg to teuthology.orchestra so that
-        # the "cd foo && bar" shenanigans isn't needed to begin with and
-        # then we wouldn't have to special case this
-        if isinstance(args, str):
-            args = 'sudo ' + args
-        if isinstance(args, list):
-            args.insert(0, 'sudo')
-
-        return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
-                                      check_status=check_status,
-                                      omit_sudo=False)
-
-    def testcmd(self, args, wait=True, stdin=None, omit_sudo=True):
-        # FIXME maybe should add a pwd arg to teuthology.orchestra so that
-        # the "cd foo && bar" shenanigans isn't needed to begin with and
-        # then we wouldn't have to special case this
-        return self.run_shell(args, wait=wait, stdin=stdin, check_status=False,
-                              omit_sudo=omit_sudo)
-
-    def testcmd_as_user(self, args, user, wait=True, stdin=None):
-        # FIXME maybe should add a pwd arg to teuthology.orchestra so that
-        # the "cd foo && bar" shenanigans isn't needed to begin with and
-        # then we wouldn't have to special case this
-        return self.run_as_user(args, user=user, wait=wait, stdin=stdin,
-                                check_status=False)
-
-    def testcmd_as_root(self, args, wait=True, stdin=None):
-        # FIXME maybe should add a pwd arg to teuthology.orchestra so that
-        # the "cd foo && bar" shenanigans isn't needed to begin with and
-        # then we wouldn't have to special case this
-        return self.run_as_root(args, wait=wait, stdin=stdin,
-                                check_status=False)
-
     def setupfs(self, name=None):
         if name is None and self.fs is not None:
             # Previous mount existed, reuse the old name
@@ -808,31 +798,44 @@ class LocalFuseMount(FuseMount):
                 if asok_conf:
                     d = asok_conf.groups(1)[0]
                     break
-        path = "{0}/client.{1}.{2}.asok".format(d, self.client_id, self.fuse_daemon.subproc.pid)
-        log.info("I think my launching pid was {0}".format(self.fuse_daemon.subproc.pid))
+        path = "{0}/client.{1}.*.asok".format(d, self.client_id)
         return path
 
-    def mount(self, mount_path=None, mount_fs_name=None, mountpoint=None, mount_options=[]):
-        if mountpoint is not None:
-            self.mountpoint = mountpoint
-        self.setupfs(name=mount_fs_name)
+    def mount(self, mntopts=[], createfs=True, check_status=True, **kwargs):
+        self.update_attrs(**kwargs)
+        self.assert_and_log_minimum_mount_details()
+
+        if opt_use_ns:
+            self.using_namespace = True
+            self.setup_netns()
+        else:
+            self.using_namespace = False
+
+        # TODO: don't call setupfs() from within mount()
+        if createfs:
+            self.setupfs(name=self.cephfs_name)
 
-        self.client_remote.run(args=['mkdir', '-p', self.mountpoint])
+        stderr = StringIO()
+        try:
+            self.client_remote.run(args=['mkdir', '-p', self.hostfs_mntpt],
+                                   stderr=stderr)
+        except CommandFailedError:
+            if 'file exists' not in stderr.getvalue().lower():
+                raise
 
         def list_connections():
             self.client_remote.run(
                 args=["mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
                 check_status=False
             )
-            p = self.client_remote.run(
-                args=["ls", "/sys/fs/fuse/connections"],
-                check_status=False
-            )
+
+            p = self.client_remote.run(args=["ls", "/sys/fs/fuse/connections"],
+                                       check_status=False, stdout=StringIO())
             if p.exitstatus != 0:
                 log.warning("ls conns failed with {0}, assuming none".format(p.exitstatus))
                 return []
 
-            ls_str = six.ensure_str(p.stdout.getvalue().strip())
+            ls_str = p.stdout.getvalue().strip()
             if ls_str:
                 return [int(n) for n in ls_str.split("\n")]
             else:
@@ -843,27 +846,32 @@ class LocalFuseMount(FuseMount):
         pre_mount_conns = list_connections()
         log.debug("Pre-mount connections: {0}".format(pre_mount_conns))
 
-        prefix = [os.path.join(BIN_PREFIX, "ceph-fuse")]
+        cmdargs = []
+        if self.using_namespace:
+            cmdargs = ['sudo', 'nsenter',
+                       '--net=/var/run/netns/{0}'.format(self.netns_name),
+                       '--setuid', str(os.getuid())]
+        cmdargs += [os.path.join(BIN_PREFIX, 'ceph-fuse'), self.hostfs_mntpt,
+                    '-f']
+        if self.client_id is not None:
+            cmdargs += ["--id", self.client_id]
+        if self.client_keyring_path and self.client_id is not None:
+            cmdargs.extend(['-k', self.client_keyring_path])
+        if self.cephfs_name:
+            cmdargs += ["--client_fs=" + self.cephfs_name]
+        if self.cephfs_mntpt:
+            cmdargs += ["--client_mountpoint=" + self.cephfs_mntpt]
         if os.getuid() != 0:
-            prefix += ["--client_die_on_failed_dentry_invalidate=false"]
-
-        if mount_path is not None:
-            prefix += ["--client_mountpoint={0}".format(mount_path)]
-
-        if mount_fs_name is not None:
-            prefix += ["--client_fs={0}".format(mount_fs_name)]
-
-        prefix += mount_options;
+            cmdargs += ["--client_die_on_failed_dentry_invalidate=false"]
+        if mntopts:
+            cmdargs += mntopts
 
-        self.fuse_daemon = self.client_remote.run(args=
-                                            prefix + [
-                                                "-f",
-                                                "--name",
-                                                "client.{0}".format(self.client_id),
-                                                self.mountpoint
-                                            ], wait=False)
-
-        log.debug("Mounting client.{0} with pid {1}".format(self.client_id, self.fuse_daemon.subproc.pid))
+        mountcmd_stdout, mountcmd_stderr = StringIO(), StringIO()
+        self.fuse_daemon = self.client_remote.run(args=cmdargs, wait=False,
+            omit_sudo=False, stdout=mountcmd_stdout, stderr=mountcmd_stderr)
+        self._set_fuse_daemon_pid(check_status)
+        log.debug("Mounting client.{0} with pid "
+                 "{1}".format(self.client_id, self.fuse_daemon.subproc.pid))
 
         # Wait for the connection reference to appear in /sys
         waited = 0
@@ -872,7 +880,14 @@ class LocalFuseMount(FuseMount):
             if self.fuse_daemon.finished:
                 # Did mount fail?  Raise the CommandFailedError instead of
                 # hitting the "failed to populate /sys/" timeout
-                self.fuse_daemon.wait()
+                try:
+                    self.fuse_daemon.wait()
+                except CommandFailedError as e:
+                    if check_status:
+                        raise
+                    else:
+                        return (e, mountcmd_stdout.getvalue(),
+                                mountcmd_stderr.getvalue())
             time.sleep(1)
             waited += 1
             if waited > 30:
@@ -897,13 +912,38 @@ class LocalFuseMount(FuseMount):
 
         self.mounted = True
 
+    def _set_fuse_daemon_pid(self, check_status):
+        # NOTE: When a command <args> is launched with sudo, two processes are
+        # launched, one with sudo in <args> and other without. Make sure we
+        # get the PID of latter one.
+        try:
+            with safe_while(sleep=1, tries=15) as proceed:
+                while proceed():
+                    try:
+                        sock = self.find_admin_socket()
+                    except (RuntimeError, CommandFailedError):
+                        continue
+
+                    self.fuse_daemon.fuse_pid = int(re.match(".*\.(\d+)\.asok$",
+                                                             sock).group(1))
+                    break
+        except MaxWhileTries:
+            if check_status:
+                raise
+            else:
+                pass
+
+    def cleanup_netns(self):
+        if self.using_namespace:
+            super(type(self), self).cleanup_netns()
+
     def _run_python(self, pyscript, py_version='python'):
         """
         Override this to remove the daemon-helper prefix that is used otherwise
         to make the process killable.
         """
         return self.client_remote.run(args=[py_version, '-c', pyscript],
-                                      wait=False)
+                                      wait=False, stdout=StringIO())
 
 # XXX: this class has nothing to do with the Ceph daemon (ceph-mgr) of
 # the same name.
@@ -945,30 +985,42 @@ class LocalCephManager(CephManager):
         proc = self.controller.run(args=args, wait=False, stdout=StringIO())
         return proc
 
-    def raw_cluster_cmd(self, *args, **kwargs):
+    def run_cluster_cmd(self, **kwargs):
+        """
+        Run a Ceph command and the object representing the process for the
+        command.
+
+        Accepts arguments same as teuthology.orchestra.remote.run().
+        """
+        kwargs['args'] = [os.path.join(BIN_PREFIX,'ceph')]+list(kwargs['args'])
+        return self.controller.run(**kwargs)
+
+    def raw_cluster_cmd(self, *args, **kwargs) -> str:
         """
         args like ["osd", "dump"}
         return stdout string
         """
-        proc = self.controller.run(args=[os.path.join(BIN_PREFIX, "ceph")] + \
-                                        list(args), **kwargs)
-        return six.ensure_str(proc.stdout.getvalue())
+        kwargs['args'] = args
+        if kwargs.get('stdout') is None:
+            kwargs['stdout'] = StringIO()
+        return self.run_cluster_cmd(**kwargs).stdout.getvalue()
 
     def raw_cluster_cmd_result(self, *args, **kwargs):
         """
         like raw_cluster_cmd but don't check status, just return rc
         """
-        kwargs['check_status'] = False
-        proc = self.controller.run(args=[os.path.join(BIN_PREFIX, "ceph")] + \
-                                        list(args), **kwargs)
-        return proc.exitstatus
+        kwargs['args'], kwargs['check_status'] = args, False
+        return self.run_cluster_cmd(**kwargs).exitstatus
+
+    def admin_socket(self, daemon_type, daemon_id, command, check_status=True,
+                     timeout=None, stdout=None):
+        if stdout is None:
+            stdout = StringIO()
 
-    def admin_socket(self, daemon_type, daemon_id, command, check_status=True, timeout=None):
         return self.controller.run(
-            args=[os.path.join(BIN_PREFIX, "ceph"), "daemon", "{0}.{1}".format(daemon_type, daemon_id)] + command,
-            check_status=check_status,
-            timeout=timeout
-        )
+            args=[os.path.join(BIN_PREFIX, "ceph"), "daemon",
+                  "{0}.{1}".format(daemon_type, daemon_id)] + command,
+            check_status=check_status, timeout=timeout, stdout=stdout)
 
     def get_mon_socks(self):
         """
@@ -1020,7 +1072,7 @@ class LocalCephManager(CephManager):
 
 class LocalCephCluster(CephCluster):
     def __init__(self, ctx):
-        # Deliberately skip calling parent constructor
+        # Deliberately skip calling CephCluster constructor
         self._ctx = ctx
         self.mon_manager = LocalCephManager()
         self._conf = defaultdict(dict)
@@ -1091,10 +1143,19 @@ class LocalCephCluster(CephCluster):
 
 class LocalMDSCluster(LocalCephCluster, MDSCluster):
     def __init__(self, ctx):
-        super(LocalMDSCluster, self).__init__(ctx)
+        LocalCephCluster.__init__(self, ctx)
+        # Deliberately skip calling MDSCluster constructor
+        self._mds_ids = ctx.daemons.daemons['ceph.mds'].keys()
+        log.debug("Discovered MDS IDs: {0}".format(self._mds_ids))
+        self._mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
+
+    @property
+    def mds_ids(self):
+        return self._mds_ids
 
-        self.mds_ids = ctx.daemons.daemons['ceph.mds'].keys()
-        self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
+    @property
+    def mds_daemons(self):
+        return self._mds_daemons
 
     def clear_firewall(self):
         # FIXME: unimplemented
@@ -1103,6 +1164,13 @@ class LocalMDSCluster(LocalCephCluster, MDSCluster):
     def newfs(self, name='cephfs', create=True):
         return LocalFilesystem(self._ctx, name=name, create=create)
 
+    def delete_all_filesystems(self):
+        """
+        Remove all filesystems that exist, and any pools in use by them.
+        """
+        for fs in self.status().get_filesystems():
+            LocalFilesystem(ctx=self._ctx, fscid=fs['id']).destroy()
+
 
 class LocalMgrCluster(LocalCephCluster, MgrCluster):
     def __init__(self, ctx):
@@ -1112,38 +1180,22 @@ class LocalMgrCluster(LocalCephCluster, MgrCluster):
         self.mgr_daemons = dict([(id_, LocalDaemon("mgr", id_)) for id_ in self.mgr_ids])
 
 
-class LocalFilesystem(Filesystem, LocalMDSCluster):
-    def __init__(self, ctx, fscid=None, name='cephfs', create=False, ec_profile=None):
-        # Deliberately skip calling parent constructor
-        self._ctx = ctx
+class LocalFilesystem(LocalMDSCluster, Filesystem):
+    def __init__(self, ctx, fs_config={}, fscid=None, name=None, create=False):
+        # Deliberately skip calling Filesystem constructor
+        LocalMDSCluster.__init__(self, ctx)
 
         self.id = None
         self.name = name
-        self.ec_profile = ec_profile
         self.metadata_pool_name = None
         self.metadata_overlay = False
         self.data_pool_name = None
         self.data_pools = None
-        self.fs_config = None
-
-        # Hack: cheeky inspection of ceph.conf to see what MDSs exist
-        self.mds_ids = set()
-        for line in open("ceph.conf").readlines():
-            match = re.match("^\[mds\.(.+)\]$", line)
-            if match:
-                self.mds_ids.add(match.group(1))
-
-        if not self.mds_ids:
-            raise RuntimeError("No MDSs found in ceph.conf!")
-
-        self.mds_ids = list(self.mds_ids)
-
-        log.debug("Discovered MDS IDs: {0}".format(self.mds_ids))
+        self.fs_config = fs_config
+        self.ec_profile = fs_config.get('ec_profile')
 
         self.mon_manager = LocalCephManager()
 
-        self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
-
         self.client_remote = LocalRemote()
 
         self._conf = defaultdict(dict)
@@ -1172,26 +1224,48 @@ class LocalFilesystem(Filesystem, LocalMDSCluster):
         raise NotImplementedError()
 
 
-class InteractiveFailureResult(unittest.TextTestResult):
-    """
-    Specialization that implements interactive-on-error style
-    behavior.
-    """
-    def addFailure(self, test, err):
-        super(InteractiveFailureResult, self).addFailure(test, err)
-        log.error(self._exc_info_to_string(err, test))
-        log.error("Failure in test '{0}', going interactive".format(
-            self.getDescription(test)
-        ))
-        interactive.task(ctx=None, config=None)
+class LocalCluster(object):
+    def __init__(self, rolename="placeholder"):
+        self.remotes = {
+            LocalRemote(): [rolename]
+        }
 
-    def addError(self, test, err):
-        super(InteractiveFailureResult, self).addError(test, err)
-        log.error(self._exc_info_to_string(err, test))
-        log.error("Error in test '{0}', going interactive".format(
-            self.getDescription(test)
-        ))
-        interactive.task(ctx=None, config=None)
+    def only(self, requested):
+        return self.__class__(rolename=requested)
+
+
+class LocalContext(object):
+    def __init__(self):
+        self.config = {}
+        self.teuthology_config = teuth_config
+        self.cluster = LocalCluster()
+        self.daemons = DaemonGroup()
+
+        # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
+        # tests that want to look these up via ctx can do so.
+        # Inspect ceph.conf to see what roles exist
+        for conf_line in open("ceph.conf").readlines():
+            for svc_type in ["mon", "osd", "mds", "mgr"]:
+                prefixed_type = "ceph." + svc_type
+                if prefixed_type not in self.daemons.daemons:
+                    self.daemons.daemons[prefixed_type] = {}
+                match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line)
+                if match:
+                    svc_id = match.group(1)
+                    self.daemons.daemons[prefixed_type][svc_id] = LocalDaemon(svc_type, svc_id)
+
+    def __del__(self):
+        test_path = self.teuthology_config['test_path']
+        # opt_create_cluster_only does not create the test path
+        if test_path:
+            shutil.rmtree(test_path)
+
+
+#########################################
+#
+# stuff necessary for launching tests...
+#
+#########################################
 
 
 def enumerate_methods(s):
@@ -1242,47 +1316,22 @@ def scan_tests(modules):
             max_required_mgr, require_memstore
 
 
-class LocalCluster(object):
-    def __init__(self, rolename="placeholder"):
-        self.remotes = {
-            LocalRemote(): [rolename]
-        }
-
-    def only(self, requested):
-        return self.__class__(rolename=requested)
-
-
-class LocalContext(object):
+class LogRotate():
     def __init__(self):
-        self.config = {}
-        self.teuthology_config = teuth_config
-        self.cluster = LocalCluster()
-        self.daemons = DaemonGroup()
+        self.conf_file_path = os.path.join(os.getcwd(), 'logrotate.conf')
+        self.state_file_path = os.path.join(os.getcwd(), 'logrotate.state')
 
-        # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
-        # tests that want to look these up via ctx can do so.
-        # Inspect ceph.conf to see what roles exist
-        for conf_line in open("ceph.conf").readlines():
-            for svc_type in ["mon", "osd", "mds", "mgr"]:
-                prefixed_type = "ceph." + svc_type
-                if prefixed_type not in self.daemons.daemons:
-                    self.daemons.daemons[prefixed_type] = {}
-                match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line)
-                if match:
-                    svc_id = match.group(1)
-                    self.daemons.daemons[prefixed_type][svc_id] = LocalDaemon(svc_type, svc_id)
+    def run_logrotate(self):
+        remote.run(args=['logrotate', '-f', self.conf_file_path, '-s',
+                         self.state_file_path, '--verbose'])
 
-    def __del__(self):
-        test_path = self.teuthology_config['test_path']
-        # opt_create_cluster_only does not create the test path
-        if test_path:
-            shutil.rmtree(test_path)
 
 def teardown_cluster():
     log.info('\ntearing down the cluster...')
     remote.run(args=[os.path.join(SRC_PREFIX, "stop.sh")], timeout=60)
     remote.run(args=['rm', '-rf', './dev', './out'])
 
+
 def clear_old_log():
     from os import stat
 
@@ -1299,8 +1348,161 @@ def clear_old_log():
         init_log()
         log.debug('logging in a fresh file now...')
 
+
+class LogStream(object):
+    def __init__(self):
+        self.buffer = ""
+        self.omit_result_lines = False
+
+    def _del_result_lines(self):
+        """
+        Don't let unittest.TextTestRunner print "Ran X tests in Ys",
+        vstart_runner.py will do it for itself since it runs tests in a
+        testsuite one by one.
+        """
+        if self.omit_result_lines:
+            self.buffer = re.sub('-'*70+'\nran [0-9]* test in [0-9.]*s\n*',
+                                 '', self.buffer, flags=re.I)
+        self.buffer = re.sub('failed \(failures=[0-9]*\)\n', '', self.buffer,
+                             flags=re.I)
+        self.buffer = self.buffer.replace('OK\n', '')
+
+    def write(self, data):
+        self.buffer += data
+        if self.buffer.count("\n") > 5:
+            self._write()
+
+    def _write(self):
+        if opt_rotate_logs:
+            self._del_result_lines()
+        if self.buffer == '':
+            return
+
+        lines = self.buffer.split("\n")
+        for line in lines:
+            # sys.stderr.write(line + "\n")
+            log.info(line)
+        self.buffer = ''
+
+    def flush(self):
+        pass
+
+    def __del__(self):
+        self._write()
+
+
+class InteractiveFailureResult(unittest.TextTestResult):
+    """
+    Specialization that implements interactive-on-error style
+    behavior.
+    """
+    def addFailure(self, test, err):
+        super(InteractiveFailureResult, self).addFailure(test, err)
+        log.error(self._exc_info_to_string(err, test))
+        log.error("Failure in test '{0}', going interactive".format(
+            self.getDescription(test)
+        ))
+        interactive.task(ctx=None, config=None)
+
+    def addError(self, test, err):
+        super(InteractiveFailureResult, self).addError(test, err)
+        log.error(self._exc_info_to_string(err, test))
+        log.error("Error in test '{0}', going interactive".format(
+            self.getDescription(test)
+        ))
+        interactive.task(ctx=None, config=None)
+
+
+# XXX: class we require would be inherited from this one and one of
+# InteractiveFailureResult and unittestunittest.TextTestResult.
+class LoggingResultTemplate(object):
+    fail_on_skip = False
+
+    def startTest(self, test):
+        log.info("Starting test: {0}".format(self.getDescription(test)))
+        test.started_at = datetime.datetime.utcnow()
+        return super(LoggingResultTemplate, self).startTest(test)
+
+    def stopTest(self, test):
+        log.info("Stopped test: {0} in {1}s".format(
+            self.getDescription(test),
+            (datetime.datetime.utcnow() - test.started_at).total_seconds()
+        ))
+
+    def addSkip(self, test, reason):
+        if LoggingResultTemplate.fail_on_skip:
+            # Don't just call addFailure because that requires a traceback
+            self.failures.append((test, reason))
+        else:
+            super(LoggingResultTemplate, self).addSkip(test, reason)
+
+
+def launch_tests(overall_suite):
+    if opt_rotate_logs or not opt_exit_on_test_failure:
+        return launch_individually(overall_suite)
+    else:
+        return launch_entire_suite(overall_suite)
+
+
+def get_logging_result_class():
+    result_class = InteractiveFailureResult if opt_interactive_on_error else \
+        unittest.TextTestResult
+    return type('', (LoggingResultTemplate, result_class), {})
+
+
+def launch_individually(overall_suite):
+    no_of_tests_execed = 0
+    no_of_tests_failed, no_of_tests_execed = 0, 0
+    LoggingResult = get_logging_result_class()
+    stream = LogStream()
+    stream.omit_result_lines = True
+    if opt_rotate_logs:
+        logrotate = LogRotate()
+
+    started_at = datetime.datetime.utcnow()
+    for suite_, case in enumerate_methods(overall_suite):
+        # don't run logrotate beforehand since some ceph daemons might be
+        # down and pre/post-rotate scripts in logrotate.conf might fail.
+        if opt_rotate_logs:
+            logrotate.run_logrotate()
+
+        result = unittest.TextTestRunner(stream=stream,
+                                         resultclass=LoggingResult,
+                                         verbosity=2, failfast=True).run(case)
+
+        if not result.wasSuccessful():
+            if opt_exit_on_test_failure:
+                break
+            else:
+                no_of_tests_failed += 1
+
+        no_of_tests_execed += 1
+    time_elapsed = (datetime.datetime.utcnow() - started_at).total_seconds()
+
+    if result.wasSuccessful():
+        log.info('')
+        log.info('-'*70)
+        log.info(f'Ran {no_of_tests_execed} tests in {time_elapsed}s')
+        if no_of_tests_failed > 0:
+            log.info(f'{no_of_tests_failed} tests failed')
+        log.info('')
+        log.info('OK')
+
+    return result
+
+
+def launch_entire_suite(overall_suite):
+    LoggingResult = get_logging_result_class()
+
+    testrunner = unittest.TextTestRunner(stream=LogStream(),
+                                         resultclass=LoggingResult,
+                                         verbosity=2, failfast=True)
+    return testrunner.run(overall_suite)
+
+
 def exec_test():
     # Parse arguments
+    global opt_interactive_on_error
     opt_interactive_on_error = False
     opt_create_cluster = False
     opt_create_cluster_only = False
@@ -1309,7 +1511,14 @@ def exec_test():
     global opt_log_ps_output
     opt_log_ps_output = False
     use_kernel_client = False
+    global opt_use_ns
+    opt_use_ns = False
+    opt_brxnet= None
     opt_verbose = True
+    global opt_rotate_logs
+    opt_rotate_logs = False
+    global opt_exit_on_test_failure
+    opt_exit_on_test_failure = True
 
     args = sys.argv[1:]
     flags = [a for a in args if a.startswith("-")]
@@ -1331,8 +1540,28 @@ def exec_test():
             clear_old_log()
         elif f == "--kclient":
             use_kernel_client = True
+        elif f == '--usens':
+            opt_use_ns = True
+        elif '--brxnet' in f:
+            if re.search(r'=[0-9./]+', f) is None:
+                log.error("--brxnet=<ip/mask> option needs one argument: '{0}'".format(f))
+                sys.exit(-1)
+            opt_brxnet=f.split('=')[1]
+            try:
+                IP(opt_brxnet)
+                if IP(opt_brxnet).iptype() == 'PUBLIC':
+                    raise RuntimeError('is public')
+            except Exception as e:
+                log.error("Invalid ip '{0}' {1}".format(opt_brxnet, e))
+                sys.exit(-1)
         elif '--no-verbose' == f:
             opt_verbose = False
+        elif f == '--rotate-logs':
+            opt_rotate_logs = True
+        elif f == '--run-all-tests':
+            opt_exit_on_test_failure = False
+        elif f == '--debug':
+            log.setLevel(logging.DEBUG)
         else:
             log.error("Unknown option '{0}'".format(f))
             sys.exit(-1)
@@ -1340,7 +1569,7 @@ def exec_test():
     # Help developers by stopping up-front if their tree isn't built enough for all the
     # tools that the tests might want to use (add more here if needed)
     require_binaries = ["ceph-dencoder", "cephfs-journal-tool", "cephfs-data-scan",
-                        "cephfs-table-tool", "ceph-fuse", "rados"]
+                        "cephfs-table-tool", "ceph-fuse", "rados", "cephfs-meta-injection"]
     missing_binaries = [b for b in require_binaries if not os.path.exists(os.path.join(BIN_PREFIX, b))]
     if missing_binaries and not opt_ignore_missing_binaries:
         log.error("Some ceph binaries missing, please build them: {0}".format(" ".join(missing_binaries)))
@@ -1352,11 +1581,11 @@ def exec_test():
     global remote
     remote = LocalRemote()
 
+    CephFSMount.cleanup_stale_netnses_and_bridge(remote)
+
     # Tolerate no MDSs or clients running at start
-    ps_txt = six.ensure_str(remote.run(
-        args=["ps", "-u"+str(os.getuid())],
-        stdout=StringIO()
-    ).stdout.getvalue().strip())
+    ps_txt = remote.run(args=["ps", "-u"+str(os.getuid())],
+                        stdout=StringIO()).stdout.getvalue().strip()
     lines = ps_txt.split("\n")[1:]
     for line in lines:
         if 'ceph-fuse' in line or 'ceph-mds' in line:
@@ -1397,6 +1626,9 @@ def exec_test():
     if opt_create_cluster_only:
         return
 
+    if opt_use_ns and mon_in_localhost() and not opt_create_cluster:
+        raise RuntimeError("cluster is on localhost; '--usens' option is incompatible. Or you can pass an extra '--create' option to create a new cluster without localhost!")
+
     # List of client mounts, sufficient to run the selected tests
     clients = [i.__str__() for i in range(0, max_required_clients)]
 
@@ -1419,42 +1651,27 @@ def exec_test():
             p = remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "auth", "get-or-create", client_name,
                                  "osd", "allow rw",
                                  "mds", "allow",
-                                 "mon", "allow r"])
+                                 "mon", "allow r"], stdout=StringIO())
 
-            open("./keyring", "ab").write(p.stdout.getvalue())
+            open("./keyring", "at").write(p.stdout.getvalue())
 
         if use_kernel_client:
-            mount = LocalKernelMount(ctx, test_dir, client_id)
+            mount = LocalKernelMount(ctx=ctx, test_dir=test_dir,
+                                     client_id=client_id, brxnet=opt_brxnet)
         else:
-            mount = LocalFuseMount(ctx, test_dir, client_id)
+            mount = LocalFuseMount(ctx=ctx, test_dir=test_dir,
+                                   client_id=client_id, brxnet=opt_brxnet)
 
         mounts.append(mount)
-        if os.path.exists(mount.mountpoint):
+        if os.path.exists(mount.hostfs_mntpt):
             if mount.is_mounted():
-                log.warning("unmounting {0}".format(mount.mountpoint))
+                log.warning("unmounting {0}".format(mount.hostfs_mntpt))
                 mount.umount_wait()
             else:
-                os.rmdir(mount.mountpoint)
+                os.rmdir(mount.hostfs_mntpt)
 
     from tasks.cephfs_test_runner import DecoratingLoader
 
-    class LogStream(object):
-        def __init__(self):
-            self.buffer = ""
-
-        def write(self, data):
-            self.buffer += data
-            if "\n" in self.buffer:
-                lines = self.buffer.split("\n")
-                for line in lines[:-1]:
-                    pass
-                    # sys.stderr.write(line + "\n")
-                    log.info(line)
-                self.buffer = lines[-1]
-
-        def flush(self):
-            pass
-
     decorating_loader = DecoratingLoader({
         "ctx": ctx,
         "mounts": mounts,
@@ -1518,43 +1735,18 @@ def exec_test():
     for s, method in victims:
         s._tests.remove(method)
 
-    if opt_interactive_on_error:
-        result_class = InteractiveFailureResult
-    else:
-        result_class = unittest.TextTestResult
-    fail_on_skip = False
-
-    class LoggingResult(result_class):
-        def startTest(self, test):
-            log.info("Starting test: {0}".format(self.getDescription(test)))
-            test.started_at = datetime.datetime.utcnow()
-            return super(LoggingResult, self).startTest(test)
-
-        def stopTest(self, test):
-            log.info("Stopped test: {0} in {1}s".format(
-                self.getDescription(test),
-                (datetime.datetime.utcnow() - test.started_at).total_seconds()
-            ))
-
-        def addSkip(self, test, reason):
-            if fail_on_skip:
-                # Don't just call addFailure because that requires a traceback
-                self.failures.append((test, reason))
-            else:
-                super(LoggingResult, self).addSkip(test, reason)
-
-    # Execute!
-    result = unittest.TextTestRunner(
-        stream=LogStream(),
-        resultclass=LoggingResult,
-        verbosity=2,
-        failfast=True).run(overall_suite)
+    overall_suite = load_tests(modules, loader.TestLoader())
+    result = launch_tests(overall_suite)
 
+    CephFSMount.cleanup_stale_netnses_and_bridge(remote)
     if opt_teardown_cluster:
         teardown_cluster()
 
     if not result.wasSuccessful():
-        result.printErrors()  # duplicate output at end for convenience
+        # no point in duplicating if we can have multiple failures in same
+        # run.
+        if opt_exit_on_test_failure:
+            result.printErrors()  # duplicate output at end for convenience
 
         bad_tests = []
         for test, error in result.errors: