]> git.proxmox.com Git - ceph.git/blobdiff - ceph/qa/tasks/cephfs/filesystem.py
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / qa / tasks / cephfs / filesystem.py
index 7f01b0ff48000d60c21a2343f7fbb5a896ec32e1..34ee42f13ec6d0cb12c1744292698113129d4a24 100644 (file)
@@ -10,13 +10,14 @@ import errno
 import random
 import traceback
 
-from io import BytesIO
-from six import StringIO
+from io import BytesIO, StringIO
+from errno import EBUSY
 
 from teuthology.exceptions import CommandFailedError
 from teuthology import misc
 from teuthology.nuke import clear_firewall
 from teuthology.parallel import parallel
+from teuthology import contextutil
 from tasks.ceph_manager import write_conf
 from tasks import ceph_manager
 
@@ -59,6 +60,13 @@ class ObjectNotFound(Exception):
     def __str__(self):
         return "Object not found: '{0}'".format(self._object_name)
 
+class FSMissing(Exception):
+    def __init__(self, ident):
+        self.ident = ident
+
+    def __str__(self):
+        return f"File system {self.ident} does not exist in the map"
+
 class FSStatus(object):
     """
     Operations on a snapshot of the FSMap.
@@ -108,7 +116,7 @@ class FSStatus(object):
         for fs in self.map['filesystems']:
             if fscid is None or fs['id'] == fscid:
                 return fs
-        raise RuntimeError("FSCID {0} not in map".format(fscid))
+        raise FSMissing(fscid)
 
     def get_fsmap_byname(self, name):
         """
@@ -117,7 +125,7 @@ class FSStatus(object):
         for fs in self.map['filesystems']:
             if name is None or fs['mdsmap']['fs_name'] == name:
                 return fs
-        raise RuntimeError("FS {0} not in map".format(name))
+        raise FSMissing(name)
 
     def get_replays(self, fscid):
         """
@@ -166,6 +174,17 @@ class FSStatus(object):
             log.warning(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
             raise RuntimeError("MDS id '{0}' not found in map".format(name))
 
+    def get_mds_addrs(self, name):
+        """
+        Return the instance addr as a string, like "[10.214.133.138:6807 10.214.133.138:6808]"
+        """
+        info = self.get_mds(name)
+        if info:
+            return [e['addr'] for e in info['addrs']['addrvec']]
+        else:
+            log.warn(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
+            raise RuntimeError("MDS id '{0}' not found in map".format(name))
+
     def get_mds_gid(self, gid):
         """
         Get the info for the given MDS gid.
@@ -195,7 +214,7 @@ class CephCluster(object):
         (result,) = self._ctx.cluster.only(first_mon).remotes.keys()
         return result
 
-    def __init__(self, ctx):
+    def __init__(self, ctx) -> None:
         self._ctx = ctx
         self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
 
@@ -235,6 +254,20 @@ class CephCluster(object):
             log.debug("_json_asok output empty")
             return None
 
+    def is_addr_blocklisted(self, addr=None):
+        if addr is None:
+            log.warn("Couldn't get the client address, so the blocklisted "
+                     "status undetermined")
+            return False
+
+        blocklist = json.loads(self.mon_manager.run_cluster_cmd(
+            args=["osd", "blocklist", "ls", "--format=json"],
+            stdout=StringIO()).stdout.getvalue())
+        for b in blocklist:
+            if addr == b["addr"]:
+                return True
+        return False
+
 
 class MDSCluster(CephCluster):
     """
@@ -245,17 +278,18 @@ class MDSCluster(CephCluster):
     a parent of Filesystem.  The correct way to use MDSCluster going forward is
     as a separate instance outside of your (multiple) Filesystem instances.
     """
+
     def __init__(self, ctx):
         super(MDSCluster, self).__init__(ctx)
 
-        self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))
-
-        if len(self.mds_ids) == 0:
-            raise RuntimeError("This task requires at least one MDS")
+    @property
+    def mds_ids(self):
+        # do this dynamically because the list of ids may change periodically with cephadm
+        return list(misc.all_roles_of_type(self._ctx.cluster, 'mds'))
 
-        if hasattr(self._ctx, "daemons"):
-            # Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
-            self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
+    @property
+    def mds_daemons(self):
+        return dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
 
     def _one_or_all(self, mds_id, cb, in_parallel=True):
         """
@@ -270,6 +304,7 @@ class MDSCluster(CephCluster):
         :param cb: Callback taking single argument of MDS daemon name
         :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
         """
+
         if mds_id is None:
             if in_parallel:
                 with parallel() as p:
@@ -336,42 +371,6 @@ class MDSCluster(CephCluster):
     def status(self):
         return FSStatus(self.mon_manager)
 
-    def delete_all_filesystems(self):
-        """
-        Remove all filesystems that exist, and any pools in use by them.
-        """
-        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
-        pool_id_name = {}
-        for pool in pools:
-            pool_id_name[pool['pool']] = pool['pool_name']
-
-        # mark cluster down for each fs to prevent churn during deletion
-        status = self.status()
-        for fs in status.get_filesystems():
-            self.mon_manager.raw_cluster_cmd("fs", "fail", str(fs['mdsmap']['fs_name']))
-
-        # get a new copy as actives may have since changed
-        status = self.status()
-        for fs in status.get_filesystems():
-            mdsmap = fs['mdsmap']
-            metadata_pool = pool_id_name[mdsmap['metadata_pool']]
-
-            self.mon_manager.raw_cluster_cmd('fs', 'rm', mdsmap['fs_name'], '--yes-i-really-mean-it')
-            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
-                                             metadata_pool, metadata_pool,
-                                             '--yes-i-really-really-mean-it')
-            for data_pool in mdsmap['data_pools']:
-                data_pool = pool_id_name[data_pool]
-                try:
-                    self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
-                                                     data_pool, data_pool,
-                                                     '--yes-i-really-really-mean-it')
-                except CommandFailedError as e:
-                    if e.exitstatus == 16: # EBUSY, this data pool is used
-                        pass               # by two metadata pools, let the 2nd
-                    else:                  # pass delete it
-                        raise
-
     def get_standby_daemons(self):
         return set([s['name'] for s in self.status().get_standbys()])
 
@@ -410,6 +409,45 @@ class MDSCluster(CephCluster):
 
         self._one_or_all(mds_id, set_block, in_parallel=False)
 
+    def set_inter_mds_block(self, blocked, mds_rank_1, mds_rank_2):
+        """
+        Block (using iptables) communications from a provided MDS to other MDSs.
+        Block all ports that an MDS uses for communication.
+
+        :param blocked: True to block the MDS, False otherwise
+        :param mds_rank_1: MDS rank
+        :param mds_rank_2: MDS rank
+        :return:
+        """
+        da_flag = "-A" if blocked else "-D"
+
+        def set_block(mds_ids):
+            status = self.status()
+
+            mds = mds_ids[0]
+            remote = self.mon_manager.find_remote('mds', mds)
+            addrs = status.get_mds_addrs(mds)
+            for addr in addrs:
+                ip_str, port_str = re.match("(.+):(.+)", addr).groups()
+                remote.run(
+                    args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
+                          "comment", "--comment", "teuthology"])
+
+
+            mds = mds_ids[1]
+            remote = self.mon_manager.find_remote('mds', mds)
+            addrs = status.get_mds_addrs(mds)
+            for addr in addrs:
+                ip_str, port_str = re.match("(.+):(.+)", addr).groups()
+                remote.run(
+                    args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
+                          "comment", "--comment", "teuthology"])
+                remote.run(
+                    args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
+                          "comment", "--comment", "teuthology"])
+
+        self._one_or_all((mds_rank_1, mds_rank_2), set_block, in_parallel=False)
+
     def clear_firewall(self):
         clear_firewall(self._ctx)
 
@@ -424,23 +462,30 @@ class MDSCluster(CephCluster):
 
         raise RuntimeError("Pool not found '{0}'".format(pool_name))
 
+    def delete_all_filesystems(self):
+        """
+        Remove all filesystems that exist, and any pools in use by them.
+        """
+        for fs in self.status().get_filesystems():
+            Filesystem(ctx=self._ctx, fscid=fs['id']).destroy()
+
+
 class Filesystem(MDSCluster):
     """
     This object is for driving a CephFS filesystem.  The MDS daemons driven by
     MDSCluster may be shared with other Filesystems.
     """
-    def __init__(self, ctx, fs_config=None, fscid=None, name=None, create=False,
-                 ec_profile=None):
+    def __init__(self, ctx, fs_config={}, fscid=None, name=None, create=False):
         super(Filesystem, self).__init__(ctx)
 
         self.name = name
-        self.ec_profile = ec_profile
         self.id = None
         self.metadata_pool_name = None
         self.metadata_overlay = False
         self.data_pool_name = None
         self.data_pools = None
         self.fs_config = fs_config
+        self.ec_profile = fs_config.get('ec_profile')
 
         client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
         self.client_id = client_list[0]
@@ -462,6 +507,12 @@ class Filesystem(MDSCluster):
         if not hasattr(self._ctx, "filesystem"):
             self._ctx.filesystem = self
 
+    def dead(self):
+        try:
+            return not bool(self.get_mds_map())
+        except FSMissing:
+            return True
+
     def get_task_status(self, status_key):
         return self.mon_manager.get_service_task_status("mds", status_key)
 
@@ -525,6 +576,9 @@ class Filesystem(MDSCluster):
         assert(mds_map['max_mds'] == max_mds)
         assert(mds_map['in'] == list(range(0, max_mds)))
 
+    def reset(self):
+        self.mon_manager.raw_cluster_cmd("fs", "reset", str(self.name), '--yes-i-really-mean-it')
+
     def fail(self):
         self.mon_manager.raw_cluster_cmd("fs", "fail", str(self.name))
 
@@ -557,6 +611,10 @@ class Filesystem(MDSCluster):
     def set_allow_new_snaps(self, yes):
         self.set_var("allow_new_snaps", yes, '--yes-i-really-mean-it')
 
+    def required_client_features(self, *args, **kwargs):
+        c = ["fs", "required_client_features", self.name, *args]
+        return self.mon_manager.run_cluster_cmd(args=c, **kwargs)
+
     # In Octopus+, the PG count can be omitted to use the default. We keep the
     # hard-coded value for deployments of Mimic/Nautilus.
     pgs_per_fs_pool = 8
@@ -575,34 +633,41 @@ class Filesystem(MDSCluster):
 
         self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                          self.metadata_pool_name, self.pgs_per_fs_pool.__str__())
+
+        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
+                                         data_pool_name, self.pgs_per_fs_pool.__str__())
+
         if self.metadata_overlay:
             self.mon_manager.raw_cluster_cmd('fs', 'new',
                                              self.name, self.metadata_pool_name, data_pool_name,
                                              '--allow-dangerous-metadata-overlay')
         else:
+            self.mon_manager.raw_cluster_cmd('fs', 'new',
+                                             self.name,
+                                             self.metadata_pool_name,
+                                             data_pool_name)
+
             if self.ec_profile and 'disabled' not in self.ec_profile:
+                ec_data_pool_name = data_pool_name + "_ec"
                 log.debug("EC profile is %s", self.ec_profile)
-                cmd = ['osd', 'erasure-code-profile', 'set', data_pool_name]
+                cmd = ['osd', 'erasure-code-profile', 'set', ec_data_pool_name]
                 cmd.extend(self.ec_profile)
                 self.mon_manager.raw_cluster_cmd(*cmd)
                 self.mon_manager.raw_cluster_cmd(
                     'osd', 'pool', 'create',
-                    data_pool_name, self.pgs_per_fs_pool.__str__(), 'erasure',
-                    data_pool_name)
+                    ec_data_pool_name, self.pgs_per_fs_pool.__str__(), 'erasure',
+                    ec_data_pool_name)
                 self.mon_manager.raw_cluster_cmd(
                     'osd', 'pool', 'set',
-                    data_pool_name, 'allow_ec_overwrites', 'true')
-            else:
-                self.mon_manager.raw_cluster_cmd(
-                    'osd', 'pool', 'create',
-                    data_pool_name, self.pgs_per_fs_pool.__str__())
-            self.mon_manager.raw_cluster_cmd('fs', 'new',
-                                             self.name,
-                                             self.metadata_pool_name,
-                                             data_pool_name,
-                                             "--force")
+                    ec_data_pool_name, 'allow_ec_overwrites', 'true')
+                self.add_data_pool(ec_data_pool_name, create=False)
+                self.check_pool_application(ec_data_pool_name)
+
+                self.run_client_payload(f"setfattr -n ceph.dir.layout.pool -v {ec_data_pool_name} . && getfattr -n ceph.dir.layout .")
+
         self.check_pool_application(self.metadata_pool_name)
         self.check_pool_application(data_pool_name)
+
         # Turn off spurious standby count warnings from modifying max_mds in tests.
         try:
             self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
@@ -618,6 +683,9 @@ class Filesystem(MDSCluster):
             if max_mds > 1:
                 self.set_max_mds(max_mds)
 
+            standby_replay = self.fs_config.get('standby_replay', False)
+            self.set_allow_standby_replay(standby_replay)
+
             # If absent will use the default value (60 seconds)
             session_timeout = self.fs_config.get('session_timeout', 60)
             if session_timeout != 60:
@@ -625,7 +693,64 @@ class Filesystem(MDSCluster):
 
         self.getinfo(refresh = True)
 
-        
+    def run_client_payload(self, cmd):
+        # avoid circular dep by importing here:
+        from tasks.cephfs.fuse_mount import FuseMount
+        d = misc.get_testdir(self._ctx)
+        m = FuseMount(self._ctx, {}, d, "admin", self.client_remote, cephfs_name=self.name)
+        m.mount()
+        m.run_shell_payload(cmd)
+        m.umount_wait(require_clean=True)
+
+    def _remove_pool(self, name, **kwargs):
+        c = f'osd pool rm {name} {name} --yes-i-really-really-mean-it'
+        return self.mon_manager.ceph(c, **kwargs)
+
+    def rm(self, **kwargs):
+        c = f'fs rm {self.name} --yes-i-really-mean-it'
+        return self.mon_manager.ceph(c, **kwargs)
+
+    def remove_pools(self, data_pools):
+        self._remove_pool(self.get_metadata_pool_name())
+        for poolname in data_pools:
+            try:
+                self._remove_pool(poolname)
+            except CommandFailedError as e:
+                # EBUSY, this data pool is used by two metadata pools, let the
+                # 2nd pass delete it
+                if e.exitstatus == EBUSY:
+                    pass
+                else:
+                    raise
+
+    def destroy(self, reset_obj_attrs=True):
+        log.info(f'Destroying file system {self.name} and related pools')
+
+        if self.dead():
+            log.debug('already dead...')
+            return
+
+        data_pools = self.get_data_pool_names(refresh=True)
+
+        # make sure no MDSs are attached to given FS.
+        self.fail()
+        self.rm()
+
+        self.remove_pools(data_pools)
+
+        if reset_obj_attrs:
+            self.id = None
+            self.name = None
+            self.metadata_pool_name = None
+            self.data_pool_name = None
+            self.data_pools = None
+
+    def recreate(self):
+        self.destroy()
+
+        self.create()
+        self.getinfo(refresh=True)
+
     def check_pool_application(self, pool_name):
         osd_map = self.mon_manager.get_osd_dump_json()
         for pool in osd_map['pools']:
@@ -634,7 +759,6 @@ class Filesystem(MDSCluster):
                     if not "cephfs" in pool['application_metadata']:
                         raise RuntimeError("Pool {pool_name} does not name cephfs as application!".\
                                            format(pool_name=pool_name))
-        
 
     def __del__(self):
         if getattr(self._ctx, "filesystem", None) == self:
@@ -671,6 +795,7 @@ class Filesystem(MDSCluster):
     def _df(self):
         return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))
 
+    # may raise FSMissing
     def get_mds_map(self, status=None):
         if status is None:
             status = self.status()
@@ -772,15 +897,7 @@ class Filesystem(MDSCluster):
             mds.check_status()
 
         active_count = 0
-        try:
-            mds_map = self.get_mds_map(status=status)
-        except CommandFailedError as cfe:
-            # Old version, fall back to non-multi-fs commands
-            if cfe.exitstatus == errno.EINVAL:
-                mds_map = json.loads(
-                        self.mon_manager.raw_cluster_cmd('mds', 'dump', '--format=json'))
-            else:
-                raise
+        mds_map = self.get_mds_map(status=status)
 
         log.debug("are_daemons_healthy: mds map: {0}".format(mds_map))
 
@@ -804,7 +921,7 @@ class Filesystem(MDSCluster):
                 for mds_id, mds_status in mds_map['info'].items():
                     if mds_status['state'] == 'up:active':
                         try:
-                            daemon_status = self.mds_asok(["status"], mds_id=mds_status['name'])
+                            daemon_status = self.mds_tell(["status"], mds_id=mds_status['name'])
                         except CommandFailedError as cfe:
                             if cfe.exitstatus == errno.EINVAL:
                                 # Old version, can't do this check
@@ -937,70 +1054,46 @@ class Filesystem(MDSCluster):
 
             status = self.status()
 
-    def get_lone_mds_id(self):
+    def dencoder(self, obj_type, obj_blob):
+        args = [os.path.join(self._prefix, "ceph-dencoder"), 'type', obj_type, 'import', '-', 'decode', 'dump_json']
+        p = self.mon_manager.controller.run(args=args, stdin=BytesIO(obj_blob), stdout=BytesIO())
+        return p.stdout.getvalue()
+
+    def rados(self, *args, **kwargs):
         """
-        Get a single MDS ID: the only one if there is only one
-        configured, else the only one currently holding a rank,
-        else raise an error.
+        Callout to rados CLI.
         """
-        if len(self.mds_ids) != 1:
-            alive = self.get_rank_names()
-            if len(alive) == 1:
-                return alive[0]
-            else:
-                raise ValueError("Explicit MDS argument required when multiple MDSs in use")
-        else:
-            return self.mds_ids[0]
 
-    def recreate(self):
-        log.info("Creating new filesystem")
-        self.delete_all_filesystems()
-        self.id = None
-        self.create()
+        return self.mon_manager.do_rados(*args, **kwargs)
 
-    def put_metadata_object_raw(self, object_id, infile):
+    def radosm(self, *args, **kwargs):
         """
-        Save an object to the metadata pool
+        Interact with the metadata pool via rados CLI.
         """
-        temp_bin_path = infile
-        self.client_remote.run(args=[
-            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'put', object_id, temp_bin_path
-        ])
 
-    def get_metadata_object_raw(self, object_id):
+        return self.rados(*args, **kwargs, pool=self.get_metadata_pool_name())
+
+    def radosmo(self, *args, stdout=BytesIO(), **kwargs):
         """
-        Retrieve an object from the metadata pool and store it in a file.
+        Interact with the metadata pool via rados CLI. Get the stdout.
         """
-        temp_bin_path = '/tmp/' + object_id + '.bin'
 
-        self.client_remote.run(args=[
-            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
-        ])
-
-        return temp_bin_path
+        return self.radosm(*args, **kwargs, stdout=stdout).stdout.getvalue()
 
     def get_metadata_object(self, object_type, object_id):
         """
         Retrieve an object from the metadata pool, pass it through
         ceph-dencoder to dump it to JSON, and return the decoded object.
         """
-        temp_bin_path = '/tmp/out.bin'
-
-        self.client_remote.run(args=[
-            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
-        ])
 
-        dump_json = self.client_remote.sh([
-            'sudo', os.path.join(self._prefix, 'ceph-dencoder'), 'type', object_type, 'import', temp_bin_path, 'decode', 'dump_json'
-        ]).strip()
+        o = self.radosmo(['get', object_id, '-'])
+        j = self.dencoder(object_type, o)
         try:
-            dump = json.loads(dump_json)
+            return json.loads(j)
         except (TypeError, ValueError):
-            log.error("Failed to decode JSON: '{0}'".format(dump_json))
+            log.error("Failed to decode JSON: '{0}'".format(j))
             raise
 
-        return dump
-
     def get_journal_version(self):
         """
         Read the JournalPointer and Journal::Header objects to learn the version of
@@ -1020,17 +1113,22 @@ class Filesystem(MDSCluster):
 
     def mds_asok(self, command, mds_id=None, timeout=None):
         if mds_id is None:
-            mds_id = self.get_lone_mds_id()
+            return self.rank_asok(command, timeout=timeout)
 
         return self.json_asok(command, 'mds', mds_id, timeout=timeout)
 
+    def mds_tell(self, command, mds_id=None):
+        if mds_id is None:
+            return self.rank_tell(command)
+
+        return json.loads(self.mon_manager.raw_cluster_cmd("tell", f"mds.{mds_id}", *command))
+
     def rank_asok(self, command, rank=0, status=None, timeout=None):
         info = self.get_rank(rank=rank, status=status)
         return self.json_asok(command, 'mds', info['name'], timeout=timeout)
 
     def rank_tell(self, command, rank=0, status=None):
-        info = self.get_rank(rank=rank, status=status)
-        return json.loads(self.mon_manager.raw_cluster_cmd("tell", 'mds.{0}'.format(info['name']), *command))
+        return json.loads(self.mon_manager.raw_cluster_cmd("tell", f"mds.{self.id}:{rank}", *command))
 
     def ranks_tell(self, command, status=None):
         if status is None:
@@ -1118,33 +1216,21 @@ class Filesystem(MDSCluster):
             else:
                 time.sleep(1)
 
-    def _read_data_xattr(self, ino_no, xattr_name, type, pool):
-        mds_id = self.mds_ids[0]
-        remote = self.mds_daemons[mds_id].remote
+    def _read_data_xattr(self, ino_no, xattr_name, obj_type, pool):
         if pool is None:
             pool = self.get_data_pool_name()
 
         obj_name = "{0:x}.00000000".format(ino_no)
 
-        args = [
-            os.path.join(self._prefix, "rados"), "-p", pool, "getxattr", obj_name, xattr_name
-        ]
+        args = ["getxattr", obj_name, xattr_name]
         try:
-            proc = remote.run(args=args, stdout=BytesIO())
+            proc = self.rados(args, pool=pool, stdout=BytesIO())
         except CommandFailedError as e:
             log.error(e.__str__())
             raise ObjectNotFound(obj_name)
 
-        data = proc.stdout.getvalue()
-        dump = remote.sh(
-            [os.path.join(self._prefix, "ceph-dencoder"),
-                                            "type", type,
-                                            "import", "-",
-                                            "decode", "dump_json"],
-            stdin=data
-        )
-
-        return json.loads(dump.strip())
+        obj_blob = proc.stdout.getvalue()
+        return json.loads(self.dencoder(obj_type, obj_blob).strip())
 
     def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
         """
@@ -1157,16 +1243,12 @@ class Filesystem(MDSCluster):
         :param pool: name of data pool or None to use primary data pool
         :return: None
         """
-        remote = self.mds_daemons[self.mds_ids[0]].remote
         if pool is None:
             pool = self.get_data_pool_name()
 
         obj_name = "{0:x}.00000000".format(ino_no)
-        args = [
-            os.path.join(self._prefix, "rados"), "-p", pool, "setxattr",
-            obj_name, xattr_name, data
-        ]
-        remote.sh(args)
+        args = ["setxattr", obj_name, xattr_name, data]
+        self.rados(args, pool=pool)
 
     def read_backtrace(self, ino_no, pool=None):
         """
@@ -1224,7 +1306,7 @@ class Filesystem(MDSCluster):
             for n in range(0, ((size - 1) // stripe_size) + 1)
         ]
 
-        exist_objects = self.rados(["ls"], pool=self.get_data_pool_name()).split("\n")
+        exist_objects = self.rados(["ls"], pool=self.get_data_pool_name(), stdout=StringIO()).stdout.getvalue().split("\n")
 
         return want_objects, exist_objects
 
@@ -1260,43 +1342,12 @@ class Filesystem(MDSCluster):
 
     def dirfrag_exists(self, ino, frag):
         try:
-            self.rados(["stat", "{0:x}.{1:08x}".format(ino, frag)])
+            self.radosm(["stat", "{0:x}.{1:08x}".format(ino, frag)])
         except CommandFailedError:
             return False
         else:
             return True
 
-    def rados(self, args, pool=None, namespace=None, stdin_data=None,
-              stdin_file=None,
-              stdout_data=None):
-        """
-        Call into the `rados` CLI from an MDS
-        """
-
-        if pool is None:
-            pool = self.get_metadata_pool_name()
-
-        # Doesn't matter which MDS we use to run rados commands, they all
-        # have access to the pools
-        mds_id = self.mds_ids[0]
-        remote = self.mds_daemons[mds_id].remote
-
-        # NB we could alternatively use librados pybindings for this, but it's a one-liner
-        # using the `rados` CLI
-        args = ([os.path.join(self._prefix, "rados"), "-p", pool] +
-                (["--namespace", namespace] if namespace else []) +
-                args)
-
-        if stdin_file is not None:
-            args = ["bash", "-c", "cat " + stdin_file + " | " + " ".join(args)]
-        if stdout_data is None:
-            stdout_data = StringIO()
-
-        p = remote.run(args=args,
-                       stdin=stdin_data,
-                       stdout=stdout_data)
-        return p.stdout.getvalue().strip()
-
     def list_dirfrag(self, dir_ino):
         """
         Read the named object and return the list of omap keys
@@ -1307,12 +1358,25 @@ class Filesystem(MDSCluster):
         dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
 
         try:
-            key_list_str = self.rados(["listomapkeys", dirfrag_obj_name])
+            key_list_str = self.radosmo(["listomapkeys", dirfrag_obj_name], stdout=StringIO())
         except CommandFailedError as e:
             log.error(e.__str__())
             raise ObjectNotFound(dirfrag_obj_name)
 
-        return key_list_str.split("\n") if key_list_str else []
+        return key_list_str.strip().split("\n") if key_list_str else []
+
+    def get_meta_of_fs_file(self, dir_ino, obj_name, out):
+        """
+        get metadata from parent to verify the correctness of the data format encoded by the tool, cephfs-meta-injection.
+        warning : The splitting of directory is not considered here.
+        """
+
+        dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
+        try:
+            self.radosm(["getomapval", dirfrag_obj_name, obj_name+"_head", out])
+        except CommandFailedError as e:
+            log.error(e.__str__())
+            raise ObjectNotFound(dir_ino)
 
     def erase_metadata_objects(self, prefix):
         """
@@ -1322,10 +1386,10 @@ class Filesystem(MDSCluster):
         This O(N) with the number of objects in the pool, so only suitable
         for use on toy test filesystems.
         """
-        all_objects = self.rados(["ls"]).split("\n")
+        all_objects = self.radosmo(["ls"], stdout=StringIO()).strip().split("\n")
         matching_objects = [o for o in all_objects if o.startswith(prefix)]
         for o in matching_objects:
-            self.rados(["rm", o])
+            self.radosm(["rm", o])
 
     def erase_mds_objects(self, rank):
         """
@@ -1386,8 +1450,7 @@ class Filesystem(MDSCluster):
         it'll definitely have keys with perms to access cephfs metadata pool.  This is public
         so that tests can use this remote to go get locally written output files from the tools.
         """
-        mds_id = self.mds_ids[0]
-        return self.mds_daemons[mds_id].remote
+        return self.mon_manager.controller
 
     def journal_tool(self, args, rank, quiet=False):
         """
@@ -1396,6 +1459,13 @@ class Filesystem(MDSCluster):
         fs_rank = self._make_rank(rank)
         return self._run_tool("cephfs-journal-tool", args, fs_rank, quiet)
 
+    def meta_tool(self, args, rank, quiet=False):
+        """
+        Invoke cephfs-meta-injection with the passed arguments for a rank, and return its stdout
+        """
+        fs_rank = self._make_rank(rank)
+        return self._run_tool("cephfs-meta-injection", args, fs_rank, quiet)
+
     def table_tool(self, args, quiet=False):
         """
         Invoke cephfs-table-tool with the passed arguments, and return its stdout
@@ -1434,3 +1504,63 @@ class Filesystem(MDSCluster):
 
     def is_full(self):
         return self.is_pool_full(self.get_data_pool_name())
+
+    def authorize(self, client_id, caps=('/', 'rw')):
+        """
+        Run "ceph fs authorize" and run "ceph auth get" to get and returnt the
+        keyring.
+
+        client_id: client id that will be authorized
+        caps: tuple containing the path and permission (can be r or rw)
+              respectively.
+        """
+        client_name = 'client.' + client_id
+        return self.mon_manager.raw_cluster_cmd('fs', 'authorize', self.name,
+                                                client_name, *caps)
+
+    def grow(self, new_max_mds, status=None):
+        oldmax = self.get_var('max_mds', status=status)
+        assert(new_max_mds > oldmax)
+        self.set_max_mds(new_max_mds)
+        return self.wait_for_daemons()
+
+    def shrink(self, new_max_mds, status=None):
+        oldmax = self.get_var('max_mds', status=status)
+        assert(new_max_mds < oldmax)
+        self.set_max_mds(new_max_mds)
+        return self.wait_for_daemons()
+
+    def run_scrub(self, cmd, rank=0):
+        return self.rank_tell(["scrub"] + cmd, rank)
+
+    def get_scrub_status(self, rank=0):
+        return self.run_scrub(["status"], rank)
+
+    def wait_until_scrub_complete(self, result=None, tag=None, rank=0, sleep=30,
+                                  timeout=300, reverse=False):
+        # time out after "timeout" seconds and assume as done
+        if result is None:
+            result = "no active scrubs running"
+        with contextutil.safe_while(sleep=sleep, tries=timeout//sleep) as proceed:
+            while proceed():
+                out_json = self.rank_tell(["scrub", "status"], rank=rank)
+                assert out_json is not None
+                if not reverse:
+                    if result in out_json['status']:
+                        log.info("all active scrubs completed")
+                        return True
+                else:
+                    if result not in out_json['status']:
+                        log.info("all active scrubs completed")
+                        return True
+
+                if tag is not None:
+                    status = out_json['scrubs'][tag]
+                    if status is not None:
+                        log.info(f"scrub status for tag:{tag} - {status}")
+                    else:
+                        log.info(f"scrub has completed for tag:{tag}")
+                        return True
+
+        # timed out waiting for scrub to complete
+        return False