]> git.proxmox.com Git - ceph.git/blobdiff - ceph/qa/tasks/vstart_runner.py
import quincy beta 17.1.0
[ceph.git] / ceph / qa / tasks / vstart_runner.py
index db0cb41cf15c32177eeec5566a8fc90cf453dfd8..234ad1985dfb0e159695981ddf52893a29c60ad7 100644 (file)
@@ -50,20 +50,20 @@ import logging
 
 from unittest import suite, loader
 
-from teuthology.orchestra.run import Raw, quote
+from teuthology.orchestra.run import Raw, quote, PIPE
 from teuthology.orchestra.daemon import DaemonGroup
 from teuthology.orchestra.remote import Remote
 from teuthology.config import config as teuth_config
 from teuthology.contextutil import safe_while
 from teuthology.contextutil import MaxWhileTries
-from teuthology.orchestra.run import CommandFailedError
+from teuthology.exceptions import CommandFailedError
 try:
     import urllib3
     urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
 except:
     pass
 
-def init_log():
+def init_log(log_level=logging.INFO):
     global log
     if log is not None:
         del log
@@ -78,7 +78,7 @@ def init_log():
         datefmt='%Y-%m-%dT%H:%M:%S')
     handler.setFormatter(formatter)
     log.addHandler(handler)
-    log.setLevel(logging.INFO)
+    log.setLevel(log_level)
 
 log = None
 init_log()
@@ -158,6 +158,8 @@ else:
     BIN_PREFIX = "./"
     SRC_PREFIX = "./"
 
+CEPH_CMD = os.path.join(BIN_PREFIX, 'ceph')
+
 
 def rm_nonascii_chars(var):
     var = var.replace(b'\xe2\x80\x98', b'\'')
@@ -179,23 +181,15 @@ class LocalRemoteProcess(object):
         self.check_status = check_status
         self.exitstatus = self.returncode = None
 
-    def wait(self):
-        if self.finished:
-            # Avoid calling communicate() on a dead process because it'll
-            # give you stick about std* already being closed
-            if self.check_status and self.exitstatus != 0:
-                raise CommandFailedError(self.args, self.exitstatus)
-            else:
-                return
-
-        out, err = self.subproc.communicate()
-        out, err = rm_nonascii_chars(out), rm_nonascii_chars(err)
+    def _write_stdout(self, out):
         if isinstance(self.stdout, StringIO):
             self.stdout.write(out.decode(errors='ignore'))
         elif self.stdout is None:
             pass
         else:
             self.stdout.write(out)
+
+    def _write_stderr(self, err):
         if isinstance(self.stderr, StringIO):
             self.stderr.write(err.decode(errors='ignore'))
         elif self.stderr is None:
@@ -203,6 +197,20 @@ class LocalRemoteProcess(object):
         else:
             self.stderr.write(err)
 
+    def wait(self):
+        if self.finished:
+            # Avoid calling communicate() on a dead process because it'll
+            # give you stick about std* already being closed
+            if self.check_status and self.exitstatus != 0:
+                raise CommandFailedError(self.args, self.exitstatus)
+            else:
+                return
+
+        out, err = self.subproc.communicate()
+        out, err = rm_nonascii_chars(out), rm_nonascii_chars(err)
+        self._write_stdout(out)
+        self._write_stderr(err)
+
         self.exitstatus = self.returncode = self.subproc.returncode
 
         if self.exitstatus != 0:
@@ -219,19 +227,11 @@ class LocalRemoteProcess(object):
 
         if self.subproc.poll() is not None:
             out, err = self.subproc.communicate()
-            if isinstance(self.stdout, StringIO):
-                self.stdout.write(out.decode(errors='ignore'))
-            elif self.stdout is None:
-                pass
-            else:
-                self.stdout.write(out)
-            if isinstance(self.stderr, StringIO):
-                self.stderr.write(err.decode(errors='ignore'))
-            elif self.stderr is None:
-                pass
-            else:
-                self.stderr.write(err)
+            self._write_stdout(out)
+            self._write_stderr(err)
+
             self.exitstatus = self.returncode = self.subproc.returncode
+
             return True
         else:
             return False
@@ -303,14 +303,25 @@ class LocalRemote(object):
         # None
         return mkdtemp(suffix=suffix, prefix='', dir=parentdir)
 
-    def mktemp(self, suffix=None, parentdir=None):
+    def mktemp(self, suffix='', parentdir='', path=None, data=None,
+               owner=None, mode=None):
         """
         Make a remote temporary file
 
         Returns: the path of the temp file created.
         """
         from tempfile import mktemp
-        return mktemp(suffix=suffix, dir=parentdir)
+        if not path:
+            path = mktemp(suffix=suffix, dir=parentdir)
+        if not parentdir:
+            path = os.path.join('/tmp', path)
+
+        if data:
+            # sudo is set to False since root user can't write files in /tmp
+            # owned by other users.
+            self.write_file(path=path, data=data, sudo=False)
+
+        return path
 
     def write_file(self, path, data, sudo=False, mode=None, owner=None,
                                      mkdir=False, append=False):
@@ -403,11 +414,12 @@ class LocalRemote(object):
     # vstart_runner.py.
     def _do_run(self, args, check_status=True, wait=True, stdout=None,
                 stderr=None, cwd=None, stdin=None, logger=None, label=None,
-                env=None, timeout=None, omit_sudo=True):
+                env=None, timeout=None, omit_sudo=True, shell=True):
         args = self._perform_checks_and_return_list_of_args(args, omit_sudo)
 
         # We have to use shell=True if any run.Raw was present, e.g. &&
-        shell = any([a for a in args if isinstance(a, Raw)])
+        if not shell:
+            shell = any([a for a in args if isinstance(a, Raw)])
 
         # Filter out helper tools that don't exist in a vstart environment
         args = [a for a in args if a not in ('adjust-ulimits',
@@ -420,10 +432,6 @@ class LocalRemote(object):
             local_bin = os.path.join(BIN_PREFIX, args[0])
             if os.path.exists(local_bin):
                 args = [local_bin] + args[1:]
-            else:
-                log.debug("'{0}' is not a binary in the Ceph build dir".format(
-                    args[0]
-                ))
 
         log.debug('> ' +
                  ' '.join([str(a.value) if isinstance(a, Raw) else a for a in args]))
@@ -456,6 +464,10 @@ class LocalRemote(object):
             # as long as the input buffer is "small"
             if isinstance(stdin, str):
                 subproc.stdin.write(stdin.encode())
+            elif stdin == subprocess.PIPE or stdin == PIPE:
+                pass
+            elif isinstance(stdin, StringIO):
+                subproc.stdin.write(bytes(stdin.getvalue(),encoding='utf8'))
             else:
                 subproc.stdin.write(stdin)
 
@@ -523,13 +535,11 @@ class LocalDaemon(object):
             if line.find("ceph-{0} -i {1}".format(self.daemon_type, self.daemon_id)) != -1:
                 log.debug("Found ps line for daemon: {0}".format(line))
                 return int(line.split()[0])
-        if opt_log_ps_output:
-            log.debug("No match for {0} {1}: {2}".format(
-                self.daemon_type, self.daemon_id, ps_txt))
-        else:
-            log.debug("No match for {0} {1}".format(self.daemon_type,
-                     self.daemon_id))
-            return None
+        if not opt_log_ps_output:
+            ps_txt = '(omitted)'
+        log.debug("No match for {0} {1}: {2}".format(
+            self.daemon_type, self.daemon_id, ps_txt))
+        return None
 
     def wait(self, timeout):
         waited = 0
@@ -545,6 +555,8 @@ class LocalDaemon(object):
             return
 
         pid = self._get_pid()
+        if pid is None:
+            return
         log.debug("Killing PID {0} for {1}.{2}".format(pid, self.daemon_type, self.daemon_id))
         os.kill(pid, signal.SIGTERM)
 
@@ -609,16 +621,7 @@ def mon_in_localhost(config_path="./ceph.conf"):
                 return True
     return False
 
-class LocalKernelMount(KernelMount):
-    def __init__(self, ctx, test_dir, client_id=None,
-                 client_keyring_path=None, client_remote=None,
-                 hostfs_mntpt=None, cephfs_name=None, cephfs_mntpt=None,
-                 brxnet=None):
-        super(LocalKernelMount, self).__init__(ctx=ctx, test_dir=test_dir,
-            client_id=client_id, client_keyring_path=client_keyring_path,
-            client_remote=LocalRemote(), hostfs_mntpt=hostfs_mntpt,
-            cephfs_name=cephfs_name, cephfs_mntpt=cephfs_mntpt, brxnet=brxnet)
-
+class LocalCephFSMount():
     @property
     def config_path(self):
         return "./ceph.conf"
@@ -634,25 +637,17 @@ class LocalKernelMount(KernelMount):
         else:
             return keyring_path
 
-    def setupfs(self, name=None):
-        if name is None and self.fs is not None:
-            # Previous mount existed, reuse the old name
-            name = self.fs.name
-        self.fs = LocalFilesystem(self.ctx, name=name)
-        log.debug('Wait for MDS to reach steady state...')
-        self.fs.wait_for_daemons()
-        log.debug('Ready to start {}...'.format(type(self).__name__))
-
     @property
     def _prefix(self):
         return BIN_PREFIX
 
     def _asok_path(self):
-        # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
-        # run foreground.  When running it daemonized however, the asok is named after
-        # the PID of the launching process, not the long running ceph-fuse process.  Therefore
-        # we need to give an exact path here as the logic for checking /proc/ for which
-        # asok is alive does not work.
+        # In teuthology, the asok is named after the PID of the ceph-fuse
+        # process, because it's run foreground.  When running it daemonized
+        # however, the asok is named after the PID of the launching process,
+        # not the long running ceph-fuse process.  Therefore we need to give
+        # an exact path here as the logic for checking /proc/ for which asok
+        # is alive does not work.
 
         # Load the asok path from ceph.conf as vstart.sh now puts admin sockets
         # in a tmpdir. All of the paths are the same, so no need to select
@@ -667,250 +662,92 @@ class LocalKernelMount(KernelMount):
         path = "{0}/client.{1}.*.asok".format(d, self.client_id)
         return path
 
-    def mount(self, mntopts=[], createfs=True, check_status=True, **kwargs):
-        self.update_attrs(**kwargs)
-        self.assert_and_log_minimum_mount_details()
-
-        if opt_use_ns:
-            self.using_namespace = True
-            self.setup_netns()
-        else:
-            self.using_namespace = False
-
-        if not self.cephfs_mntpt:
-            self.cephfs_mntpt = "/"
-        # TODO: don't call setupfs() from within mount()
-        if createfs:
-            self.setupfs(name=self.cephfs_name)
-
-        opts = 'norequire_active_mds'
-        if self.client_id:
-            opts += ',name=' + self.client_id
-        if self.client_keyring_path and self.client_id:
-            opts += ",secret=" + self.get_key_from_keyfile()
-        if self.config_path:
-            opts += ',conf=' + self.config_path
-        if self.cephfs_name:
-            opts += ",mds_namespace={0}".format(self.cephfs_name)
-        if mntopts:
-            opts += ',' + ','.join(mntopts)
-
-        stderr = StringIO()
-        try:
-            self.client_remote.run(args=['mkdir', '--', self.hostfs_mntpt],
-                                   timeout=(5*60), stderr=stderr)
-        except CommandFailedError:
-            if 'file exists' not in stderr.getvalue().lower():
-                raise
-
-        if self.cephfs_mntpt is None:
-            self.cephfs_mntpt = "/"
-        cmdargs = ['sudo']
-        if self.using_namespace:
-           cmdargs += ['nsenter',
-                       '--net=/var/run/netns/{0}'.format(self.netns_name)]
-        cmdargs += ['./bin/mount.ceph', ':' + self.cephfs_mntpt,
-                    self.hostfs_mntpt, '-v', '-o', opts]
-
-        mountcmd_stdout, mountcmd_stderr = StringIO(), StringIO()
-        try:
-            self.client_remote.run(args=cmdargs, timeout=(30*60),
-                omit_sudo=False, stdout=mountcmd_stdout,
-                stderr=mountcmd_stderr)
-        except CommandFailedError as e:
-            if check_status:
-                raise
-            else:
-                return (e, mountcmd_stdout.getvalue(),
-                        mountcmd_stderr.getvalue())
-
-        stderr = StringIO()
-        try:
-            self.client_remote.run(args=['sudo', 'chmod', '1777',
-                                   self.hostfs_mntpt], stderr=stderr,
-                                   timeout=(5*60))
-        except CommandFailedError:
-            # the client does not have write permissions in cap it holds for
-            # the Ceph FS that was just mounted.
-            if 'permission denied' in stderr.getvalue().lower():
-                pass
-
-        self.mounted = True
-
-    def cleanup_netns(self):
-        if self.using_namespace:
-            super(type(self), self).cleanup_netns()
-
-    def _run_python(self, pyscript, py_version='python'):
+    def _run_python(self, pyscript, py_version='python', sudo=False):
         """
         Override this to remove the daemon-helper prefix that is used otherwise
         to make the process killable.
         """
-        return self.client_remote.run(args=[py_version, '-c', pyscript],
+        args = []
+        if sudo:
+            args.append('sudo')
+        args += [py_version, '-c', pyscript]
+        return self.client_remote.run(args=args,
                                       wait=False, stdout=StringIO())
 
-class LocalFuseMount(FuseMount):
-    def __init__(self, ctx, test_dir, client_id, client_keyring_path=None,
-                 client_remote=None, hostfs_mntpt=None, cephfs_name=None,
-                 cephfs_mntpt=None, brxnet=None):
-        super(LocalFuseMount, self).__init__(ctx=ctx, client_config=None,
-            test_dir=test_dir, client_id=client_id,
-            client_keyring_path=client_keyring_path,
-            client_remote=LocalRemote(), hostfs_mntpt=hostfs_mntpt,
-            cephfs_name=cephfs_name, cephfs_mntpt=cephfs_mntpt, brxnet=brxnet)
+    def setup_netns(self):
+        if opt_use_ns:
+            super(type(self), self).setup_netns()
 
     @property
-    def config_path(self):
-        return "./ceph.conf"
-
-    def get_keyring_path(self):
-        # This is going to end up in a config file, so use an absolute path
-        # to avoid assumptions about daemons' pwd
-        return os.path.abspath("./client.{0}.keyring".format(self.client_id))
+    def _nsenter_args(self):
+            if opt_use_ns:
+                return super(type(self), self)._nsenter_args
+            else:
+                return []
 
     def setupfs(self, name=None):
         if name is None and self.fs is not None:
             # Previous mount existed, reuse the old name
             name = self.fs.name
         self.fs = LocalFilesystem(self.ctx, name=name)
-        log.debug('Wait for MDS to reach steady state...')
+        log.info('Wait for MDS to reach steady state...')
         self.fs.wait_for_daemons()
-        log.debug('Ready to start {}...'.format(type(self).__name__))
-
-    @property
-    def _prefix(self):
-        return BIN_PREFIX
-
-    def _asok_path(self):
-        # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
-        # run foreground.  When running it daemonized however, the asok is named after
-        # the PID of the launching process, not the long running ceph-fuse process.  Therefore
-        # we need to give an exact path here as the logic for checking /proc/ for which
-        # asok is alive does not work.
-
-        # Load the asok path from ceph.conf as vstart.sh now puts admin sockets
-        # in a tmpdir. All of the paths are the same, so no need to select
-        # based off of the service type.
-        d = "./out"
-        with open(self.config_path) as f:
-            for line in f:
-                asok_conf = re.search("^\s*admin\s+socket\s*=\s*(.*?)[^/]+$", line)
-                if asok_conf:
-                    d = asok_conf.groups(1)[0]
-                    break
-        path = "{0}/client.{1}.*.asok".format(d, self.client_id)
-        return path
+        log.info('Ready to start {}...'.format(type(self).__name__))
 
-    def mount(self, mntopts=[], createfs=True, check_status=True, **kwargs):
-        self.update_attrs(**kwargs)
-        self.assert_and_log_minimum_mount_details()
 
-        if opt_use_ns:
-            self.using_namespace = True
-            self.setup_netns()
-        else:
-            self.using_namespace = False
-
-        # TODO: don't call setupfs() from within mount()
-        if createfs:
-            self.setupfs(name=self.cephfs_name)
+class LocalKernelMount(LocalCephFSMount, KernelMount):
+    def __init__(self, ctx, test_dir, client_id=None,
+                 client_keyring_path=None, client_remote=None,
+                 hostfs_mntpt=None, cephfs_name=None, cephfs_mntpt=None,
+                 brxnet=None):
+        super(LocalKernelMount, self).__init__(ctx=ctx, test_dir=test_dir,
+            client_id=client_id, client_keyring_path=client_keyring_path,
+            client_remote=LocalRemote(), hostfs_mntpt=hostfs_mntpt,
+            cephfs_name=cephfs_name, cephfs_mntpt=cephfs_mntpt, brxnet=brxnet)
 
-        stderr = StringIO()
-        try:
-            self.client_remote.run(args=['mkdir', '-p', self.hostfs_mntpt],
-                                   stderr=stderr)
-        except CommandFailedError:
-            if 'file exists' not in stderr.getvalue().lower():
-                raise
+        # Make vstart_runner compatible with teuth and qa/tasks/cephfs.
+        self._mount_bin = [os.path.join(BIN_PREFIX , 'mount.ceph')]
 
-        def list_connections():
-            self.client_remote.run(
-                args=["mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
-                check_status=False
-            )
 
-            p = self.client_remote.run(args=["ls", "/sys/fs/fuse/connections"],
-                                       check_status=False, stdout=StringIO())
-            if p.exitstatus != 0:
-                log.warning("ls conns failed with {0}, assuming none".format(p.exitstatus))
-                return []
+class LocalFuseMount(LocalCephFSMount, FuseMount):
+    def __init__(self, ctx, test_dir, client_id, client_keyring_path=None,
+                 client_remote=None, hostfs_mntpt=None, cephfs_name=None,
+                 cephfs_mntpt=None, brxnet=None):
+        super(LocalFuseMount, self).__init__(ctx=ctx, test_dir=test_dir,
+            client_id=client_id, client_keyring_path=client_keyring_path,
+            client_remote=LocalRemote(), hostfs_mntpt=hostfs_mntpt,
+            cephfs_name=cephfs_name, cephfs_mntpt=cephfs_mntpt, brxnet=brxnet)
 
-            ls_str = p.stdout.getvalue().strip()
-            if ls_str:
-                return [int(n) for n in ls_str.split("\n")]
-            else:
-                return []
+        # Following block makes tests meant for teuthology compatible with
+        # vstart_runner.
+        self._mount_bin = [os.path.join(BIN_PREFIX, 'ceph-fuse')]
+        self._mount_cmd_cwd, self._mount_cmd_logger, \
+            self._mount_cmd_stdin = None, None, None
 
-        # Before starting ceph-fuse process, note the contents of
-        # /sys/fs/fuse/connections
-        pre_mount_conns = list_connections()
-        log.debug("Pre-mount connections: {0}".format(pre_mount_conns))
-
-        cmdargs = []
-        if self.using_namespace:
-            cmdargs = ['sudo', 'nsenter',
-                       '--net=/var/run/netns/{0}'.format(self.netns_name),
-                       '--setuid', str(os.getuid())]
-        cmdargs += [os.path.join(BIN_PREFIX, 'ceph-fuse'), self.hostfs_mntpt,
-                    '-f']
-        if self.client_id is not None:
-            cmdargs += ["--id", self.client_id]
-        if self.client_keyring_path and self.client_id is not None:
-            cmdargs.extend(['-k', self.client_keyring_path])
-        if self.cephfs_name:
-            cmdargs += ["--client_fs=" + self.cephfs_name]
-        if self.cephfs_mntpt:
-            cmdargs += ["--client_mountpoint=" + self.cephfs_mntpt]
-        if os.getuid() != 0:
-            cmdargs += ["--client_die_on_failed_dentry_invalidate=false"]
-        if mntopts:
-            cmdargs += mntopts
+    # XXX: CephFSMount._create_mntpt() sets mountpoint's permission mode to
+    # 0000 which doesn't work for vstart_runner since superuser privileges are
+    # not used for mounting Ceph FS with FUSE.
+    def _create_mntpt(self):
+        self.client_remote.run(args=f'mkdir -p -v {self.hostfs_mntpt}')
 
-        mountcmd_stdout, mountcmd_stderr = StringIO(), StringIO()
-        self.fuse_daemon = self.client_remote.run(args=cmdargs, wait=False,
-            omit_sudo=False, stdout=mountcmd_stdout, stderr=mountcmd_stderr)
+    def _run_mount_cmd(self, mntopts, check_status):
+        super(type(self), self)._run_mount_cmd(mntopts, check_status)
         self._set_fuse_daemon_pid(check_status)
-        log.debug("Mounting client.{0} with pid "
-                 "{1}".format(self.client_id, self.fuse_daemon.subproc.pid))
 
-        # Wait for the connection reference to appear in /sys
-        waited = 0
-        post_mount_conns = list_connections()
-        while len(post_mount_conns) <= len(pre_mount_conns):
-            if self.fuse_daemon.finished:
-                # Did mount fail?  Raise the CommandFailedError instead of
-                # hitting the "failed to populate /sys/" timeout
-                try:
-                    self.fuse_daemon.wait()
-                except CommandFailedError as e:
-                    if check_status:
-                        raise
-                    else:
-                        return (e, mountcmd_stdout.getvalue(),
-                                mountcmd_stderr.getvalue())
-            time.sleep(1)
-            waited += 1
-            if waited > 30:
-                raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
-                    waited
-                ))
-            post_mount_conns = list_connections()
+    def _get_mount_cmd(self, mntopts):
+        mount_cmd = super(type(self), self)._get_mount_cmd(mntopts)
 
-        log.debug("Post-mount connections: {0}".format(post_mount_conns))
+        if os.getuid() != 0:
+            mount_cmd += ['--client_die_on_failed_dentry_invalidate=false']
 
-        # Record our fuse connection number so that we can use it when
-        # forcing an unmount
-        new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
-        if len(new_conns) == 0:
-            raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
-        elif len(new_conns) > 1:
-            raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
-        else:
-            self._fuse_conn = new_conns[0]
+        return mount_cmd
 
-        self.gather_mount_info()
+    @property
+    def _fuse_conn_check_timeout(self):
+        return 30
 
-        self.mounted = True
+    def _add_valgrind_args(self, mount_cmd):
+        return []
 
     def _set_fuse_daemon_pid(self, check_status):
         # NOTE: When a command <args> is launched with sudo, two processes are
@@ -933,18 +770,6 @@ class LocalFuseMount(FuseMount):
             else:
                 pass
 
-    def cleanup_netns(self):
-        if self.using_namespace:
-            super(type(self), self).cleanup_netns()
-
-    def _run_python(self, pyscript, py_version='python'):
-        """
-        Override this to remove the daemon-helper prefix that is used otherwise
-        to make the process killable.
-        """
-        return self.client_remote.run(args=[py_version, '-c', pyscript],
-                                      wait=False, stdout=StringIO())
-
 # XXX: this class has nothing to do with the Ceph daemon (ceph-mgr) of
 # the same name.
 class LocalCephManager(CephManager):
@@ -965,6 +790,24 @@ class LocalCephManager(CephManager):
         # methods to work though.
         self.pools = {}
 
+        # NOTE: These variables are being overriden here so that parent class
+        # can pick it up.
+        self.cephadm = False
+        self.rook = False
+        self.testdir = None
+        self.run_cluster_cmd_prefix = [CEPH_CMD]
+        # XXX: Ceph API test CI job crashes because "ceph -w" process launched
+        # by run_ceph_w() crashes when shell is set to True.
+        # See https://tracker.ceph.com/issues/49644.
+        #
+        # The 2 possible workaround this are either setting "shell" to "False"
+        # when command "ceph -w" is executed or to prepend "exec sudo" to
+        # command arguments. We are going with latter since former would make
+        # it necessary to pass "shell" parameter to run() method. This leads
+        # to incompatibility with the method teuthology.orchestra.run's run()
+        # since it doesn't accept "shell" as parameter.
+        self.run_ceph_w_prefix = ['exec', 'sudo', CEPH_CMD]
+
     def find_remote(self, daemon_type, daemon_id):
         """
         daemon_type like 'mds', 'osd'
@@ -972,102 +815,14 @@ class LocalCephManager(CephManager):
         """
         return LocalRemote()
 
-    def run_ceph_w(self, watch_channel=None):
-        """
-        :param watch_channel: Specifies the channel to be watched.
-                              This can be 'cluster', 'audit', ...
-        :type watch_channel: str
-        """
-        args = [os.path.join(BIN_PREFIX, "ceph"), "-w"]
-        if watch_channel is not None:
-            args.append("--watch-channel")
-            args.append(watch_channel)
-        proc = self.controller.run(args=args, wait=False, stdout=StringIO())
-        return proc
-
-    def run_cluster_cmd(self, **kwargs):
-        """
-        Run a Ceph command and the object representing the process for the
-        command.
-
-        Accepts arguments same as teuthology.orchestra.remote.run().
-        """
-        kwargs['args'] = [os.path.join(BIN_PREFIX,'ceph')]+list(kwargs['args'])
-        return self.controller.run(**kwargs)
-
-    def raw_cluster_cmd(self, *args, **kwargs) -> str:
-        """
-        args like ["osd", "dump"}
-        return stdout string
-        """
-        kwargs['args'] = args
-        if kwargs.get('stdout') is None:
-            kwargs['stdout'] = StringIO()
-        return self.run_cluster_cmd(**kwargs).stdout.getvalue()
-
-    def raw_cluster_cmd_result(self, *args, **kwargs):
-        """
-        like raw_cluster_cmd but don't check status, just return rc
-        """
-        kwargs['args'], kwargs['check_status'] = args, False
-        return self.run_cluster_cmd(**kwargs).exitstatus
-
     def admin_socket(self, daemon_type, daemon_id, command, check_status=True,
                      timeout=None, stdout=None):
         if stdout is None:
             stdout = StringIO()
 
-        return self.controller.run(
-            args=[os.path.join(BIN_PREFIX, "ceph"), "daemon",
-                  "{0}.{1}".format(daemon_type, daemon_id)] + command,
-            check_status=check_status, timeout=timeout, stdout=stdout)
-
-    def get_mon_socks(self):
-        """
-        Get monitor sockets.
-
-        :return socks: tuple of strings; strings are individual sockets.
-        """
-        from json import loads
-
-        output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
-        socks = []
-        for mon in output['mons']:
-            for addrvec_mem in mon['public_addrs']['addrvec']:
-                socks.append(addrvec_mem['addr'])
-        return tuple(socks)
-
-    def get_msgrv1_mon_socks(self):
-        """
-        Get monitor sockets that use msgrv2 to operate.
-
-        :return socks: tuple of strings; strings are individual sockets.
-        """
-        from json import loads
-
-        output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
-        socks = []
-        for mon in output['mons']:
-            for addrvec_mem in mon['public_addrs']['addrvec']:
-                if addrvec_mem['type'] == 'v1':
-                    socks.append(addrvec_mem['addr'])
-        return tuple(socks)
-
-    def get_msgrv2_mon_socks(self):
-        """
-        Get monitor sockets that use msgrv2 to operate.
-
-        :return socks: tuple of strings; strings are individual sockets.
-        """
-        from json import loads
-
-        output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
-        socks = []
-        for mon in output['mons']:
-            for addrvec_mem in mon['public_addrs']['addrvec']:
-                if addrvec_mem['type'] == 'v2':
-                    socks.append(addrvec_mem['addr'])
-        return tuple(socks)
+        args=[CEPH_CMD, "daemon", f"{daemon_type}.{daemon_id}"] + command
+        return self.controller.run(args=args, check_status=check_status,
+                                   timeout=timeout, stdout=stdout)
 
 
 class LocalCephCluster(CephCluster):
@@ -1242,10 +997,13 @@ class LocalCluster(object):
 
 class LocalContext(object):
     def __init__(self):
-        self.config = {}
+        self.config = {'cluster': 'ceph'}
         self.teuthology_config = teuth_config
         self.cluster = LocalCluster()
         self.daemons = DaemonGroup()
+        if not hasattr(self, 'managers'):
+            self.managers = {}
+        self.managers[self.config['cluster']] = LocalCephManager()
 
         # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
         # tests that want to look these up via ctx can do so.
@@ -1302,7 +1060,6 @@ def load_tests(modules, loader):
 
 def scan_tests(modules):
     overall_suite = load_tests(modules, loader.TestLoader())
-
     max_required_mds = 0
     max_required_clients = 0
     max_required_mgr = 0
@@ -1335,23 +1092,20 @@ class LogRotate():
 def teardown_cluster():
     log.info('\ntearing down the cluster...')
     remote.run(args=[os.path.join(SRC_PREFIX, "stop.sh")], timeout=60)
+    log.info('\nceph cluster torn down')
     remote.run(args=['rm', '-rf', './dev', './out'])
 
 
 def clear_old_log():
-    from os import stat
-
     try:
-        stat(logpath)
-    # would need an update when making this py3 compatible. Use FileNotFound
-    # instead.
-    except OSError:
+        os.stat(logpath)
+    except FileNotFoundError:
         return
     else:
         os.remove(logpath)
         with open(logpath, 'w') as logfile:
             logfile.write('')
-        init_log()
+        init_log(log.level)
         log.debug('logging in a fresh file now...')
 
 
@@ -1576,6 +1330,8 @@ def exec_test():
     # tools that the tests might want to use (add more here if needed)
     require_binaries = ["ceph-dencoder", "cephfs-journal-tool", "cephfs-data-scan",
                         "cephfs-table-tool", "ceph-fuse", "rados", "cephfs-meta-injection"]
+    # What binaries may be required is task specific
+    require_binaries = ["ceph-dencoder",  "rados"]
     missing_binaries = [b for b in require_binaries if not os.path.exists(os.path.join(BIN_PREFIX, b))]
     if missing_binaries and not opt_ignore_missing_binaries:
         log.error("Some ceph binaries missing, please build them: {0}".format(" ".join(missing_binaries)))
@@ -1621,9 +1377,11 @@ def exec_test():
         if opt_verbose:
             args.append("-d")
 
+        log.info('\nrunning vstart.sh now...')
         # usually, i get vstart.sh running completely in less than 100
         # seconds.
         remote.run(args=args, env=vstart_env, timeout=(3 * 60))
+        log.info('\nvstart.sh finished running')
 
         # Wait for OSD to come up so that subsequent injectargs etc will
         # definitely succeed
@@ -1654,7 +1412,7 @@ def exec_test():
         client_name = "client.{0}".format(client_id)
 
         if client_name not in open("./keyring").read():
-            p = remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "auth", "get-or-create", client_name,
+            p = remote.run(args=[CEPH_CMD, "auth", "get-or-create", client_name,
                                  "osd", "allow rw",
                                  "mds", "allow",
                                  "mon", "allow r"], stdout=StringIO())
@@ -1688,7 +1446,7 @@ def exec_test():
 
     # For the benefit of polling tests like test_full -- in teuthology land we set this
     # in a .yaml, here it's just a hardcoded thing for the developer's pleasure.
-    remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "tell", "osd.*", "injectargs", "--osd-mon-report-interval", "5"])
+    remote.run(args=[CEPH_CMD, "tell", "osd.*", "injectargs", "--osd-mon-report-interval", "5"])
     ceph_cluster.set_ceph_conf("osd", "osd_mon_report_interval", "5")
 
     # Vstart defaults to two segments, which very easily gets a "behind on trimming" health warning