]> git.proxmox.com Git - ceph.git/blobdiff - ceph/qa/tasks/cephfs/mount.py
import ceph quincy 17.2.4
[ceph.git] / ceph / qa / tasks / cephfs / mount.py
index 7a833a1182539c2e6da2720f65879d27e1a8605c..d3e3e4587ace0495af7501c217ae832b35b46814 100644 (file)
-from contextlib import contextmanager
+import hashlib
 import json
 import logging
 import datetime
-import six
+import os
+import re
 import time
-from six import StringIO
+
+from io import StringIO
+from contextlib import contextmanager
 from textwrap import dedent
-import os
+from IPy import IP
+
+from teuthology.contextutil import safe_while
+from teuthology.misc import get_file, write_file
 from teuthology.orchestra import run
-from teuthology.orchestra.run import CommandFailedError, ConnectionLostError, Raw
+from teuthology.orchestra.run import Raw
+from teuthology.exceptions import CommandFailedError, ConnectionLostError
+
 from tasks.cephfs.filesystem import Filesystem
 
 log = logging.getLogger(__name__)
 
-
 class CephFSMount(object):
-    def __init__(self, ctx, test_dir, client_id, client_remote):
+    def __init__(self, ctx, test_dir, client_id, client_remote,
+                 client_keyring_path=None, hostfs_mntpt=None,
+                 cephfs_name=None, cephfs_mntpt=None, brxnet=None):
         """
         :param test_dir: Global teuthology test dir
         :param client_id: Client ID, the 'foo' in client.foo
-        :param client_remote: Remote instance for the host where client will run
+        :param client_keyring_path: path to keyring for given client_id
+        :param client_remote: Remote instance for the host where client will
+                              run
+        :param hostfs_mntpt: Path to directory on the FS on which Ceph FS will
+                             be mounted
+        :param cephfs_name: Name of Ceph FS to be mounted
+        :param cephfs_mntpt: Path to directory inside Ceph FS that will be
+                             mounted as root
         """
-
+        self.mounted = False
         self.ctx = ctx
         self.test_dir = test_dir
+
+        self._verify_attrs(client_id=client_id,
+                           client_keyring_path=client_keyring_path,
+                           hostfs_mntpt=hostfs_mntpt, cephfs_name=cephfs_name,
+                           cephfs_mntpt=cephfs_mntpt)
+
         self.client_id = client_id
+        self.client_keyring_path = client_keyring_path
         self.client_remote = client_remote
-        self.mountpoint_dir_name = 'mnt.{id}'.format(id=self.client_id)
-        self._mountpoint = None
+        if hostfs_mntpt:
+            self.hostfs_mntpt = hostfs_mntpt
+            self.hostfs_mntpt_dirname = os.path.basename(self.hostfs_mntpt)
+        else:
+            self.hostfs_mntpt = os.path.join(self.test_dir, f'mnt.{self.client_id}')
+        self.cephfs_name = cephfs_name
+        self.cephfs_mntpt = cephfs_mntpt
+
+        self.cluster_name = 'ceph' # TODO: use config['cluster']
+
         self.fs = None
 
+        self._netns_name = None
+        self.nsid = -1
+        if brxnet is None:
+            self.ceph_brx_net = '192.168.0.0/16'
+        else:
+            self.ceph_brx_net = brxnet
+
         self.test_files = ['a', 'b', 'c']
 
         self.background_procs = []
 
+    # This will cleanup the stale netnses, which are from the
+    # last failed test cases.
+    @staticmethod
+    def cleanup_stale_netnses_and_bridge(remote):
+        p = remote.run(args=['ip', 'netns', 'list'],
+                       stdout=StringIO(), timeout=(5*60))
+        p = p.stdout.getvalue().strip()
+
+        # Get the netns name list
+        netns_list = re.findall(r'ceph-ns-[^()\s][-.\w]+[^():\s]', p)
+
+        # Remove the stale netnses
+        for ns in netns_list:
+            ns_name = ns.split()[0]
+            args = ['sudo', 'ip', 'netns', 'delete', '{0}'.format(ns_name)]
+            try:
+                remote.run(args=args, timeout=(5*60), omit_sudo=False)
+            except Exception:
+                pass
+
+        # Remove the stale 'ceph-brx'
+        try:
+            args = ['sudo', 'ip', 'link', 'delete', 'ceph-brx']
+            remote.run(args=args, timeout=(5*60), omit_sudo=False)
+        except Exception:
+            pass
+
+    def _parse_netns_name(self):
+        self._netns_name = '-'.join(["ceph-ns",
+                                     re.sub(r'/+', "-", self.mountpoint)])
+
     @property
     def mountpoint(self):
-        if self._mountpoint == None:
-            self._mountpoint= os.path.join(
-                self.test_dir, '{dir_name}'.format(dir_name=self.mountpoint_dir_name))
-        return self._mountpoint
+        if self.hostfs_mntpt == None:
+            self.hostfs_mntpt = os.path.join(self.test_dir,
+                                             self.hostfs_mntpt_dirname)
+        return self.hostfs_mntpt
 
     @mountpoint.setter
     def mountpoint(self, path):
         if not isinstance(path, str):
             raise RuntimeError('path should be of str type.')
-        self._mountpoint = path
+        self._mountpoint = self.hostfs_mntpt = path
+
+    @property
+    def netns_name(self):
+        if self._netns_name == None:
+            self._parse_netns_name()
+        return self._netns_name
+
+    @netns_name.setter
+    def netns_name(self, name):
+        self._netns_name = name
+
+    def assert_that_ceph_fs_exists(self):
+        output = self.ctx.managers[self.cluster_name].raw_cluster_cmd("fs", "ls")
+        if self.cephfs_name:
+            assert self.cephfs_name in output, \
+                'expected ceph fs is not present on the cluster'
+            log.info(f'Mounting Ceph FS {self.cephfs_name}; just confirmed its presence on cluster')
+        else:
+            assert 'No filesystems enabled' not in output, \
+                'ceph cluster has no ceph fs, not even the default ceph fs'
+            log.info('Mounting default Ceph FS; just confirmed its presence on cluster')
+
+    def assert_and_log_minimum_mount_details(self):
+        """
+        Make sure we have minimum details required for mounting. Ideally, this
+        method should be called at the beginning of the mount method.
+        """
+        if not self.client_id or not self.client_remote or \
+           not self.hostfs_mntpt:
+            errmsg = ('Mounting CephFS requires that at least following '
+                      'details to be provided -\n'
+                      '1. the client ID,\n2. the mountpoint and\n'
+                      '3. the remote machine where CephFS will be mounted.\n')
+            raise RuntimeError(errmsg)
+
+        self.assert_that_ceph_fs_exists()
+
+        log.info('Mounting Ceph FS. Following are details of mount; remember '
+                 '"None" represents Python type None -')
+        log.info(f'self.client_remote.hostname = {self.client_remote.hostname}')
+        log.info(f'self.client.name = client.{self.client_id}')
+        log.info(f'self.hostfs_mntpt = {self.hostfs_mntpt}')
+        log.info(f'self.cephfs_name = {self.cephfs_name}')
+        log.info(f'self.cephfs_mntpt = {self.cephfs_mntpt}')
+        log.info(f'self.client_keyring_path = {self.client_keyring_path}')
+        if self.client_keyring_path:
+            log.info('keyring content -\n' +
+                     get_file(self.client_remote, self.client_keyring_path,
+                              sudo=True).decode())
 
     def is_mounted(self):
-        raise NotImplementedError()
+        return self.mounted
 
     def setupfs(self, name=None):
         if name is None and self.fs is not None:
@@ -59,18 +177,282 @@ class CephFSMount(object):
         self.fs.wait_for_daemons()
         log.info('Ready to start {}...'.format(type(self).__name__))
 
-    def mount(self, mount_path=None, mount_fs_name=None, mountpoint=None, mount_options=[]):
+    def _create_mntpt(self):
+        self.client_remote.run(args=f'mkdir -p -v {self.hostfs_mntpt}',
+                               timeout=60)
+        # Use 0000 mode to prevent undesired modifications to the mountpoint on
+        # the local file system.
+        self.client_remote.run(args=f'chmod 0000 {self.hostfs_mntpt}',
+                               timeout=60)
+
+    @property
+    def _nsenter_args(self):
+        return ['nsenter', f'--net=/var/run/netns/{self.netns_name}']
+
+    def _set_filemode_on_mntpt(self):
+        stderr = StringIO()
+        try:
+            self.client_remote.run(
+                args=['sudo', 'chmod', '1777', self.hostfs_mntpt],
+                stderr=stderr, timeout=(5*60))
+        except CommandFailedError:
+            # the client does not have write permissions in the caps it holds
+            # for the Ceph FS that was just mounted.
+            if 'permission denied' in stderr.getvalue().lower():
+                pass
+
+    def _setup_brx_and_nat(self):
+        # The ip for ceph-brx should be
+        ip = IP(self.ceph_brx_net)[-2]
+        mask = self.ceph_brx_net.split('/')[1]
+        brd = IP(self.ceph_brx_net).broadcast()
+
+        brx = self.client_remote.run(args=['ip', 'addr'], stderr=StringIO(),
+                                     stdout=StringIO(), timeout=(5*60))
+        brx = re.findall(r'inet .* ceph-brx', brx.stdout.getvalue())
+        if brx:
+            # If the 'ceph-brx' already exists, then check whether
+            # the new net is conflicting with it
+            _ip, _mask = brx[0].split()[1].split('/', 1)
+            if _ip != "{}".format(ip) or _mask != mask:
+                raise RuntimeError("Conflict with existing ceph-brx {0}, new {1}/{2}".format(brx[0].split()[1], ip, mask))
+
+        # Setup the ceph-brx and always use the last valid IP
+        if not brx:
+            log.info("Setuping the 'ceph-brx' with {0}/{1}".format(ip, mask))
+
+            self.run_shell_payload(f"""
+                set -e
+                sudo ip link add name ceph-brx type bridge
+                sudo ip addr flush dev ceph-brx
+                sudo ip link set ceph-brx up
+                sudo ip addr add {ip}/{mask} brd {brd} dev ceph-brx
+            """, timeout=(5*60), omit_sudo=False, cwd='/')
+        
+        args = "echo 1 | sudo tee /proc/sys/net/ipv4/ip_forward"
+        self.client_remote.run(args=args, timeout=(5*60), omit_sudo=False)
+        
+        # Setup the NAT
+        p = self.client_remote.run(args=['route'], stderr=StringIO(),
+                                   stdout=StringIO(), timeout=(5*60))
+        p = re.findall(r'default .*', p.stdout.getvalue())
+        if p == False:
+            raise RuntimeError("No default gw found")
+        gw = p[0].split()[7]
+
+        self.run_shell_payload(f"""
+            set -e
+            sudo iptables -A FORWARD -o {gw} -i ceph-brx -j ACCEPT
+            sudo iptables -A FORWARD -i {gw} -o ceph-brx -j ACCEPT
+            sudo iptables -t nat -A POSTROUTING -s {ip}/{mask} -o {gw} -j MASQUERADE
+        """, timeout=(5*60), omit_sudo=False, cwd='/')
+
+    def _setup_netns(self):
+        p = self.client_remote.run(args=['ip', 'netns', 'list'],
+                                   stderr=StringIO(), stdout=StringIO(),
+                                   timeout=(5*60)).stdout.getvalue().strip()
+
+        # Get the netns name list
+        netns_list = re.findall(r'[^()\s][-.\w]+[^():\s]', p)
+
+        out = re.search(r"{0}".format(self.netns_name), p)
+        if out is None:
+            # Get an uniq nsid for the new netns
+            nsid = 0
+            p = self.client_remote.run(args=['ip', 'netns', 'list-id'],
+                                       stderr=StringIO(), stdout=StringIO(),
+                                       timeout=(5*60)).stdout.getvalue()
+            while True:
+                out = re.search(r"nsid {} ".format(nsid), p)
+                if out is None:
+                    break
+
+                nsid += 1
+
+            # Add one new netns and set it id
+            self.run_shell_payload(f"""
+                set -e
+                sudo ip netns add {self.netns_name}
+                sudo ip netns set {self.netns_name} {nsid}
+            """, timeout=(5*60), omit_sudo=False, cwd='/')
+            self.nsid = nsid;
+        else:
+            # The netns already exists and maybe suspended by self.kill()
+            self.resume_netns();
+
+            nsid = int(re.search(r"{0} \(id: (\d+)\)".format(self.netns_name), p).group(1))
+            self.nsid = nsid;
+            return
+
+        # Get one ip address for netns
+        ips = IP(self.ceph_brx_net)
+        for ip in ips:
+            found = False
+            if ip == ips[0]:
+                continue
+            if ip == ips[-2]:
+                raise RuntimeError("we have ran out of the ip addresses")
+
+            for ns in netns_list:
+                ns_name = ns.split()[0]
+                args = ['sudo', 'ip', 'netns', 'exec', '{0}'.format(ns_name), 'ip', 'addr']
+                try:
+                    p = self.client_remote.run(args=args, stderr=StringIO(),
+                                               stdout=StringIO(), timeout=(5*60),
+                                               omit_sudo=False)
+                    q = re.search("{0}".format(ip), p.stdout.getvalue())
+                    if q is not None:
+                        found = True
+                        break
+                except CommandFailedError:
+                    if "No such file or directory" in p.stderr.getvalue():
+                        pass
+                    if "Invalid argument" in p.stderr.getvalue():
+                        pass
+
+            if found == False:
+                break
+
+        mask = self.ceph_brx_net.split('/')[1]
+        brd = IP(self.ceph_brx_net).broadcast()
+
+        log.info("Setuping the netns '{0}' with {1}/{2}".format(self.netns_name, ip, mask))
+
+        # Setup the veth interfaces
+        brxip = IP(self.ceph_brx_net)[-2]
+        self.run_shell_payload(f"""
+            set -e
+            sudo ip link add veth0 netns {self.netns_name} type veth peer name brx.{nsid}
+            sudo ip netns exec {self.netns_name} ip addr add {ip}/{mask} brd {brd} dev veth0
+            sudo ip netns exec {self.netns_name} ip link set veth0 up
+            sudo ip netns exec {self.netns_name} ip link set lo up
+            sudo ip netns exec {self.netns_name} ip route add default via {brxip}
+        """, timeout=(5*60), omit_sudo=False, cwd='/')
+
+        # Bring up the brx interface and join it to 'ceph-brx'
+        self.run_shell_payload(f"""
+            set -e
+            sudo ip link set brx.{nsid} up
+            sudo ip link set dev brx.{nsid} master ceph-brx
+        """, timeout=(5*60), omit_sudo=False, cwd='/')
+
+    def _cleanup_netns(self):
+        if self.nsid == -1:
+            return
+        log.info("Removing the netns '{0}'".format(self.netns_name))
+
+        # Delete the netns and the peer veth interface
+        self.run_shell_payload(f"""
+            set -e
+            sudo ip link set brx.{self.nsid} down
+            sudo ip link delete dev brx.{self.nsid}
+            sudo ip netns delete {self.netns_name}
+        """, timeout=(5*60), omit_sudo=False, cwd='/')
+
+        self.nsid = -1
+
+    def _cleanup_brx_and_nat(self):
+        brx = self.client_remote.run(args=['ip', 'addr'], stderr=StringIO(),
+                                     stdout=StringIO(), timeout=(5*60))
+        brx = re.findall(r'inet .* ceph-brx', brx.stdout.getvalue())
+        if not brx:
+            return
+
+        # If we are the last netns, will delete the ceph-brx
+        args = ['sudo', 'ip', 'link', 'show']
+        p = self.client_remote.run(args=args, stdout=StringIO(),
+                                   timeout=(5*60), omit_sudo=False)
+        _list = re.findall(r'brx\.', p.stdout.getvalue().strip())
+        if len(_list) != 0:
+            return
+
+        log.info("Removing the 'ceph-brx'")
+
+        self.run_shell_payload("""
+            set -e
+            sudo ip link set ceph-brx down
+            sudo ip link delete ceph-brx
+        """, timeout=(5*60), omit_sudo=False, cwd='/')
+
+        # Drop the iptables NAT rules
+        ip = IP(self.ceph_brx_net)[-2]
+        mask = self.ceph_brx_net.split('/')[1]
+
+        p = self.client_remote.run(args=['route'], stderr=StringIO(),
+                                   stdout=StringIO(), timeout=(5*60))
+        p = re.findall(r'default .*', p.stdout.getvalue())
+        if p == False:
+            raise RuntimeError("No default gw found")
+        gw = p[0].split()[7]
+        self.run_shell_payload(f"""
+            set -e
+            sudo iptables -D FORWARD -o {gw} -i ceph-brx -j ACCEPT
+            sudo iptables -D FORWARD -i {gw} -o ceph-brx -j ACCEPT
+            sudo iptables -t nat -D POSTROUTING -s {ip}/{mask} -o {gw} -j MASQUERADE
+        """, timeout=(5*60), omit_sudo=False, cwd='/')
+
+    def setup_netns(self):
+        """
+        Setup the netns for the mountpoint.
+        """
+        log.info("Setting the '{0}' netns for '{1}'".format(self._netns_name, self.mountpoint))
+        self._setup_brx_and_nat()
+        self._setup_netns()
+
+    def cleanup_netns(self):
+        """
+        Cleanup the netns for the mountpoint.
+        """
+        # We will defer cleaning the netnses and bridge until the last
+        # mountpoint is unmounted, this will be a temporary work around
+        # for issue#46282.
+
+        # log.info("Cleaning the '{0}' netns for '{1}'".format(self._netns_name, self.mountpoint))
+        # self._cleanup_netns()
+        # self._cleanup_brx_and_nat()
+
+    def suspend_netns(self):
+        """
+        Suspend the netns veth interface.
+        """
+        if self.nsid == -1:
+            return
+
+        log.info("Suspending the '{0}' netns for '{1}'".format(self._netns_name, self.mountpoint))
+
+        args = ['sudo', 'ip', 'link', 'set', 'brx.{0}'.format(self.nsid), 'down']
+        self.client_remote.run(args=args, timeout=(5*60), omit_sudo=False)
+
+    def resume_netns(self):
+        """
+        Resume the netns veth interface.
+        """
+        if self.nsid == -1:
+            return
+
+        log.info("Resuming the '{0}' netns for '{1}'".format(self._netns_name, self.mountpoint))
+
+        args = ['sudo', 'ip', 'link', 'set', 'brx.{0}'.format(self.nsid), 'up']
+        self.client_remote.run(args=args, timeout=(5*60), omit_sudo=False)
+
+    def mount(self, mntopts=[], check_status=True, **kwargs):
+        """
+        kwargs expects its members to be same as the arguments accepted by
+        self.update_attrs().
+        """
         raise NotImplementedError()
 
-    def mount_wait(self, mount_path=None, mount_fs_name=None, mountpoint=None, mount_options=[]):
-        self.mount(mount_path=mount_path, mount_fs_name=mount_fs_name, mountpoint=mountpoint,
-                   mount_options=mount_options)
+    def mount_wait(self, **kwargs):
+        """
+        Accepts arguments same as self.mount().
+        """
+        self.mount(**kwargs)
         self.wait_until_mounted()
 
     def umount(self):
         raise NotImplementedError()
 
-    def umount_wait(self, force=False, require_clean=False):
+    def umount_wait(self, force=False, require_clean=False, timeout=None):
         """
 
         :param force: Expect that the mount will not shutdown cleanly: kill
@@ -78,25 +460,125 @@ class CephFSMount(object):
         :param require_clean: Wait for the Ceph client associated with the
                               mount (e.g. ceph-fuse) to terminate, and
                               raise if it doesn't do so cleanly.
+        :param timeout: amount of time to be waited for umount command to finish
         :return:
         """
         raise NotImplementedError()
 
-    def kill_cleanup(self):
-        raise NotImplementedError()
+    def _verify_attrs(self, **kwargs):
+        """
+        Verify that client_id, client_keyring_path, client_remote, hostfs_mntpt,
+        cephfs_name, cephfs_mntpt are either type str or None.
+        """
+        for k, v in kwargs.items():
+            if v is not None and not isinstance(v, str):
+                raise RuntimeError('value of attributes should be either str '
+                                   f'or None. {k} - {v}')
+
+    def update_attrs(self, client_id=None, client_keyring_path=None,
+                     client_remote=None, hostfs_mntpt=None, cephfs_name=None,
+                     cephfs_mntpt=None):
+        if not (client_id or client_keyring_path or client_remote or
+                cephfs_name or cephfs_mntpt or hostfs_mntpt):
+            return
+
+        self._verify_attrs(client_id=client_id,
+                           client_keyring_path=client_keyring_path,
+                           hostfs_mntpt=hostfs_mntpt, cephfs_name=cephfs_name,
+                           cephfs_mntpt=cephfs_mntpt)
+
+        if client_id:
+            self.client_id = client_id
+        if client_keyring_path:
+            self.client_keyring_path = client_keyring_path
+        if client_remote:
+            self.client_remote = client_remote
+        if hostfs_mntpt:
+            self.hostfs_mntpt = hostfs_mntpt
+        if cephfs_name:
+            self.cephfs_name = cephfs_name
+        if cephfs_mntpt:
+            self.cephfs_mntpt = cephfs_mntpt
+
+    def remount(self, **kwargs):
+        """
+        Update mount object's attributes and attempt remount with these
+        new values for these attrbiutes.
+
+        1. Run umount_wait().
+        2. Run update_attrs().
+        3. Run mount().
+
+        Accepts arguments of self.mount() and self.update_attrs() with 1
+        exception: wait accepted too which can be True or False.
+        """
+        self.umount_wait()
+        assert not self.mounted
+
+        mntopts = kwargs.pop('mntopts', [])
+        check_status = kwargs.pop('check_status', True)
+        wait = kwargs.pop('wait', True)
+
+        self.update_attrs(**kwargs)
+
+        retval = self.mount(mntopts=mntopts, check_status=check_status)
+        # avoid this scenario (again): mount command might've failed and
+        # check_status might have silenced the exception, yet we attempt to
+        # wait which might lead to an error.
+        if retval is None and wait:
+            self.wait_until_mounted()
+
+        return retval
 
     def kill(self):
-        raise NotImplementedError()
+        """
+        Suspend the netns veth interface to make the client disconnected
+        from the ceph cluster
+        """
+        log.info('Killing connection on {0}...'.format(self.client_remote.name))
+        self.suspend_netns()
+
+    def kill_cleanup(self):
+        """
+        Follow up ``kill`` to get to a clean unmounted state.
+        """
+        log.info('Cleaning up killed connection on {0}'.format(self.client_remote.name))
+        self.umount_wait(force=True)
 
     def cleanup(self):
-        raise NotImplementedError()
+        """
+        Remove the mount point.
+
+        Prerequisite: the client is not mounted.
+        """
+        log.info('Cleaning up mount {0}'.format(self.client_remote.name))
+        stderr = StringIO()
+        try:
+            self.client_remote.run(args=['rmdir', '--', self.mountpoint],
+                                   cwd=self.test_dir, stderr=stderr,
+                                   timeout=(60*5), check_status=False)
+        except CommandFailedError:
+            if "no such file or directory" not in stderr.getvalue().lower():
+                raise
+
+        self.cleanup_netns()
 
     def wait_until_mounted(self):
         raise NotImplementedError()
 
     def get_keyring_path(self):
+        # N.B.: default keyring is /etc/ceph/ceph.keyring; see ceph.py and generate_caps
         return '/etc/ceph/ceph.client.{id}.keyring'.format(id=self.client_id)
 
+    def get_key_from_keyfile(self):
+        # XXX: don't call run_shell(), since CephFS might be unmounted.
+        keyring = self.client_remote.run(
+            args=['sudo', 'cat', self.client_keyring_path], stdout=StringIO(),
+            omit_sudo=False).stdout.getvalue()
+        for line in keyring.split('\n'):
+            if line.find('key') != -1:
+                return line[line.find('=') + 1 : ].strip()
+
     @property
     def config_path(self):
         """
@@ -106,7 +588,7 @@ class CephFSMount(object):
         return "/etc/ceph/ceph.conf"
 
     @contextmanager
-    def mounted(self):
+    def mounted_wait(self):
         """
         A context manager, from an initially unmounted state, to mount
         this, yield, and then unmount and clean up.
@@ -118,14 +600,6 @@ class CephFSMount(object):
         finally:
             self.umount_wait()
 
-    def is_blacklisted(self):
-        addr = self.get_global_addr()
-        blacklist = json.loads(self.fs.mon_manager.raw_cluster_cmd("osd", "blacklist", "ls", "--format=json"))
-        for b in blacklist:
-            if addr == b["addr"]:
-                return True
-        return False
-
     def create_file(self, filename='testfile', dirname=None, user=None,
                     check_status=True):
         assert(self.is_mounted())
@@ -135,9 +609,9 @@ class CephFSMount(object):
                 if os.path.isabs(dirname):
                     path = os.path.join(dirname, filename)
                 else:
-                    path = os.path.join(self.mountpoint, dirname, filename)
+                    path = os.path.join(self.hostfs_mntpt, dirname, filename)
             else:
-                path = os.path.join(self.mountpoint, filename)
+                path = os.path.join(self.hostfs_mntpt, filename)
         else:
             path = filename
 
@@ -154,7 +628,7 @@ class CephFSMount(object):
         for suffix in self.test_files:
             log.info("Creating file {0}".format(suffix))
             self.client_remote.run(args=[
-                'sudo', 'touch', os.path.join(self.mountpoint, suffix)
+                'touch', os.path.join(self.hostfs_mntpt, suffix)
             ])
 
     def test_create_file(self, filename='testfile', dirname=None, user=None,
@@ -168,48 +642,145 @@ class CephFSMount(object):
         for suffix in self.test_files:
             log.info("Checking file {0}".format(suffix))
             r = self.client_remote.run(args=[
-                'sudo', 'ls', os.path.join(self.mountpoint, suffix)
+                'ls', os.path.join(self.hostfs_mntpt, suffix)
             ], check_status=False)
             if r.exitstatus != 0:
                 raise RuntimeError("Expected file {0} not found".format(suffix))
 
+    def write_file(self, path, data, perms=None):
+        """
+        Write the given data at the given path and set the given perms to the
+        file on the path.
+        """
+        if path.find(self.hostfs_mntpt) == -1:
+            path = os.path.join(self.hostfs_mntpt, path)
+
+        write_file(self.client_remote, path, data)
+
+        if perms:
+            self.run_shell(args=f'chmod {perms} {path}')
+
+    def read_file(self, path):
+        """
+        Return the data from the file on given path.
+        """
+        if path.find(self.hostfs_mntpt) == -1:
+            path = os.path.join(self.hostfs_mntpt, path)
+
+        return self.run_shell(args=['cat', path]).\
+            stdout.getvalue().strip()
+
     def create_destroy(self):
         assert(self.is_mounted())
 
         filename = "{0} {1}".format(datetime.datetime.now(), self.client_id)
         log.debug("Creating test file {0}".format(filename))
         self.client_remote.run(args=[
-            'sudo', 'touch', os.path.join(self.mountpoint, filename)
+            'touch', os.path.join(self.hostfs_mntpt, filename)
         ])
         log.debug("Deleting test file {0}".format(filename))
         self.client_remote.run(args=[
-            'sudo', 'rm', '-f', os.path.join(self.mountpoint, filename)
+            'rm', '-f', os.path.join(self.hostfs_mntpt, filename)
         ])
 
-    def _run_python(self, pyscript, py_version='python3'):
-        return self.client_remote.run(
-               args=['sudo', 'adjust-ulimits', 'daemon-helper', 'kill',
-                     py_version, '-c', pyscript], wait=False, stdin=run.PIPE,
-               stdout=StringIO())
+    def _run_python(self, pyscript, py_version='python3', sudo=False):
+        args = []
+        if sudo:
+            args.append('sudo')
+        args += ['adjust-ulimits', 'daemon-helper', 'kill', py_version, '-c', pyscript]
+        return self.client_remote.run(args=args, wait=False, stdin=run.PIPE, stdout=StringIO())
 
-    def run_python(self, pyscript, py_version='python3'):
-        p = self._run_python(pyscript, py_version)
+    def run_python(self, pyscript, py_version='python3', sudo=False):
+        p = self._run_python(pyscript, py_version, sudo=sudo)
         p.wait()
-        return six.ensure_str(p.stdout.getvalue().strip())
+        return p.stdout.getvalue().strip()
+
+    def run_shell(self, args, timeout=900, **kwargs):
+        args = args.split() if isinstance(args, str) else args
+        omit_sudo = kwargs.pop('omit_sudo', False)
+        sudo = kwargs.pop('sudo', False)
+        cwd = kwargs.pop('cwd', self.mountpoint)
+        stdout = kwargs.pop('stdout', StringIO())
+        stderr = kwargs.pop('stderr', StringIO())
+
+        if sudo:
+            args.insert(0, 'sudo')
+
+        return self.client_remote.run(args=args, cwd=cwd, timeout=timeout,
+                                      stdout=stdout, stderr=stderr,
+                                      omit_sudo=omit_sudo, **kwargs)
 
     def run_shell_payload(self, payload, **kwargs):
         return self.run_shell(["bash", "-c", Raw(f"'{payload}'")], **kwargs)
 
-    def run_shell(self, args, wait=True, stdin=None, check_status=True,
-                  omit_sudo=True):
+    def run_as_user(self, **kwargs):
+        """
+        Besides the arguments defined for run_shell() this method also
+        accepts argument 'user'.
+        """
+        args = kwargs.pop('args')
+        user = kwargs.pop('user')
         if isinstance(args, str):
-            args = args.split()
+            args = ['sudo', '-u', user, '-s', '/bin/bash', '-c', args]
+        elif isinstance(args, list):
+            cmdlist = args
+            cmd = ''
+            for i in cmdlist:
+                cmd = cmd + i + ' '
+            # get rid of extra space at the end.
+            cmd = cmd[:-1]
+
+            args = ['sudo', '-u', user, '-s', '/bin/bash', '-c', cmd]
 
-        args = ["cd", self.mountpoint, run.Raw('&&'), "sudo"] + args
-        return self.client_remote.run(args=args, stdout=StringIO(),
-                                      stderr=StringIO(), wait=wait,
-                                      stdin=stdin, check_status=check_status,
-                                      omit_sudo=omit_sudo)
+        kwargs['args'] = args
+        return self.run_shell(**kwargs)
+
+    def run_as_root(self, **kwargs):
+        """
+        Accepts same arguments as run_shell().
+        """
+        kwargs['user'] = 'root'
+        return self.run_as_user(**kwargs)
+
+    def _verify(self, proc, retval=None, errmsg=None):
+        if retval:
+            msg = ('expected return value: {}\nreceived return value: '
+                   '{}\n'.format(retval, proc.returncode))
+            assert proc.returncode == retval, msg
+
+        if errmsg:
+            stderr = proc.stderr.getvalue().lower()
+            msg = ('didn\'t find given string in stderr -\nexpected string: '
+                   '{}\nreceived error message: {}\nnote: received error '
+                   'message is converted to lowercase'.format(errmsg, stderr))
+            assert errmsg in stderr, msg
+
+    def negtestcmd(self, args, retval=None, errmsg=None, stdin=None,
+                   cwd=None, wait=True):
+        """
+        Conduct a negative test for the given command.
+
+        retval and errmsg are parameters to confirm the cause of command
+        failure.
+        """
+        proc = self.run_shell(args=args, wait=wait, stdin=stdin, cwd=cwd,
+                              check_status=False)
+        self._verify(proc, retval, errmsg)
+        return proc
+
+    def negtestcmd_as_user(self, args, user, retval=None, errmsg=None,
+                           stdin=None, cwd=None, wait=True):
+        proc = self.run_as_user(args=args, user=user, wait=wait, stdin=stdin,
+                                cwd=cwd, check_status=False)
+        self._verify(proc, retval, errmsg)
+        return proc
+
+    def negtestcmd_as_root(self, args, retval=None, errmsg=None, stdin=None,
+                           cwd=None, wait=True):
+        proc = self.run_as_root(args=args, wait=wait, stdin=stdin, cwd=cwd,
+                                check_status=False)
+        self._verify(proc, retval, errmsg)
+        return proc
 
     def open_no_data(self, basename):
         """
@@ -217,7 +788,7 @@ class CephFSMount(object):
         """
         assert(self.is_mounted())
 
-        path = os.path.join(self.mountpoint, basename)
+        path = os.path.join(self.hostfs_mntpt, basename)
 
         p = self._run_python(dedent(
             """
@@ -236,7 +807,7 @@ class CephFSMount(object):
         """
         assert(self.is_mounted())
 
-        path = os.path.join(self.mountpoint, basename)
+        path = os.path.join(self.hostfs_mntpt, basename)
 
         if write:
             pyscript = dedent("""
@@ -268,27 +839,45 @@ class CephFSMount(object):
 
         return rproc
 
-    def wait_for_dir_empty(self, dirname, timeout=30):
-        i = 0
-        dirpath = os.path.join(self.mountpoint, dirname)
-        while i < timeout:
-            nr_entries = int(self.getfattr(dirpath, "ceph.dir.entries"))
-            if nr_entries == 0:
-                log.debug("Directory {0} seen empty from {1} after {2}s ".format(
-                    dirname, self.client_id, i))
-                return
-            else:
+    def open_dir_background(self, basename):
+        """
+        Create and hold a capability to a directory.
+        """
+        assert(self.is_mounted())
+
+        path = os.path.join(self.hostfs_mntpt, basename)
+
+        pyscript = dedent("""
+            import time
+            import os
+
+            os.mkdir("{path}")
+            fd = os.open("{path}", os.O_RDONLY)
+            while True:
                 time.sleep(1)
-                i += 1
+            """).format(path=path)
+
+        rproc = self._run_python(pyscript)
+        self.background_procs.append(rproc)
+
+        self.wait_for_visible(basename)
+
+        return rproc
 
-        raise RuntimeError("Timed out after {0}s waiting for {1} to become empty from {2}".format(
-            i, dirname, self.client_id))
+    def wait_for_dir_empty(self, dirname, timeout=30):
+        dirpath = os.path.join(self.hostfs_mntpt, dirname)
+        with safe_while(sleep=5, tries=(timeout//5)) as proceed:
+            while proceed():
+                p = self.run_shell_payload(f"stat -c %h {dirpath}")
+                nr_links = int(p.stdout.getvalue().strip())
+                if nr_links == 2:
+                    return
 
     def wait_for_visible(self, basename="background_file", timeout=30):
         i = 0
         while i < timeout:
             r = self.client_remote.run(args=[
-                'sudo', 'ls', os.path.join(self.mountpoint, basename)
+                'stat', os.path.join(self.hostfs_mntpt, basename)
             ], check_status=False)
             if r.exitstatus == 0:
                 log.debug("File {0} became visible from {1} after {2}s".format(
@@ -307,7 +896,7 @@ class CephFSMount(object):
         """
         assert(self.is_mounted())
 
-        path = os.path.join(self.mountpoint, basename)
+        path = os.path.join(self.hostfs_mntpt, basename)
 
         script_builder = """
             import time
@@ -335,7 +924,7 @@ class CephFSMount(object):
     def lock_and_release(self, basename="background_file"):
         assert(self.is_mounted())
 
-        path = os.path.join(self.mountpoint, basename)
+        path = os.path.join(self.hostfs_mntpt, basename)
 
         script = """
             import time
@@ -355,7 +944,7 @@ class CephFSMount(object):
     def check_filelock(self, basename="background_file", do_flock=True):
         assert(self.is_mounted())
 
-        path = os.path.join(self.mountpoint, basename)
+        path = os.path.join(self.hostfs_mntpt, basename)
 
         script_builder = """
             import fcntl
@@ -386,7 +975,7 @@ class CephFSMount(object):
 
         log.info("check lock on file {0}".format(basename))
         self.client_remote.run(args=[
-            'sudo', 'python3', '-c', pyscript
+            'python3', '-c', pyscript
         ])
 
     def write_background(self, basename="background_file", loop=False):
@@ -397,7 +986,7 @@ class CephFSMount(object):
         """
         assert(self.is_mounted())
 
-        path = os.path.join(self.mountpoint, basename)
+        path = os.path.join(self.hostfs_mntpt, basename)
 
         pyscript = dedent("""
             import os
@@ -441,12 +1030,13 @@ class CephFSMount(object):
                     val = zlib.crc32(str(i).encode('utf-8')) & 7
                     f.write(chr(val))
         """.format(
-            path=os.path.join(self.mountpoint, filename),
+            path=os.path.join(self.hostfs_mntpt, filename),
             size=size
         )))
 
     def validate_test_pattern(self, filename, size):
         log.info("Validating {0} bytes from {1}".format(size, filename))
+        # Use sudo because cephfs-data-scan may recreate the file with owner==root
         return self.run_python(dedent("""
             import zlib
             path = "{path}"
@@ -461,9 +1051,9 @@ class CephFSMount(object):
                 if b != chr(val):
                     raise RuntimeError("Bad data at offset {{0}}".format(i))
         """.format(
-            path=os.path.join(self.mountpoint, filename),
+            path=os.path.join(self.hostfs_mntpt, filename),
             size=size
-        )))
+        )), sudo=True)
 
     def open_n_background(self, fs_path, count):
         """
@@ -474,7 +1064,7 @@ class CephFSMount(object):
         """
         assert(self.is_mounted())
 
-        abs_path = os.path.join(self.mountpoint, fs_path)
+        abs_path = os.path.join(self.hostfs_mntpt, fs_path)
 
         pyscript = dedent("""
             import sys
@@ -501,30 +1091,49 @@ class CephFSMount(object):
         self.background_procs.append(rproc)
         return rproc
 
-    def create_n_files(self, fs_path, count, sync=False):
+    def create_n_files(self, fs_path, count, sync=False, dirsync=False, unlink=False, finaldirsync=False):
+        """
+        Create n files.
+
+        :param sync: sync the file after writing
+        :param dirsync: sync the containing directory after closing the file
+        :param unlink: unlink the file after closing
+        :param finaldirsync: sync the containing directory after closing the last file
+        """
+
         assert(self.is_mounted())
 
-        abs_path = os.path.join(self.mountpoint, fs_path)
+        abs_path = os.path.join(self.hostfs_mntpt, fs_path)
 
-        pyscript = dedent("""
-            import sys
-            import time
+        pyscript = dedent(f"""
             import os
 
             n = {count}
-            abs_path = "{abs_path}"
+            path = "{abs_path}"
 
-            if not os.path.exists(os.path.dirname(abs_path)):
-                os.makedirs(os.path.dirname(abs_path))
+            dpath = os.path.dirname(path)
+            fnameprefix = os.path.basename(path)
+            os.makedirs(dpath, exist_ok=True)
 
-            for i in range(0, n):
-                fname = "{{0}}_{{1}}".format(abs_path, i)
-                with open(fname, 'w') as f:
-                    f.write('content')
-                    if {sync}:
-                        f.flush()
-                        os.fsync(f.fileno())
-            """).format(abs_path=abs_path, count=count, sync=str(sync))
+            try:
+                dirfd = os.open(dpath, os.O_DIRECTORY)
+
+                for i in range(n):
+                    fpath = os.path.join(dpath, f"{{fnameprefix}}_{{i}}")
+                    with open(fpath, 'w') as f:
+                        f.write(f"{{i}}")
+                        if {sync}:
+                            f.flush()
+                            os.fsync(f.fileno())
+                    if {unlink}:
+                        os.unlink(fpath)
+                    if {dirsync}:
+                        os.fsync(dirfd)
+                if {finaldirsync}:
+                    os.fsync(dirfd)
+            finally:
+                os.close(dirfd)
+            """)
 
         self.run_python(pyscript)
 
@@ -571,10 +1180,27 @@ class CephFSMount(object):
     def get_osd_epoch(self):
         raise NotImplementedError()
 
+    def get_op_read_count(self):
+        raise NotImplementedError()
+
+    def readlink(self, fs_path):
+        abs_path = os.path.join(self.hostfs_mntpt, fs_path)
+
+        pyscript = dedent("""
+            import os
+
+            print(os.readlink("{path}"))
+            """).format(path=abs_path)
+
+        proc = self._run_python(pyscript)
+        proc.wait()
+        return str(proc.stdout.getvalue().strip())
+
+
     def lstat(self, fs_path, follow_symlinks=False, wait=True):
         return self.stat(fs_path, follow_symlinks=False, wait=True)
 
-    def stat(self, fs_path, follow_symlinks=True, wait=True):
+    def stat(self, fs_path, follow_symlinks=True, wait=True, **kwargs):
         """
         stat a file, and return the result as a dictionary like this:
         {
@@ -592,7 +1218,7 @@ class CephFSMount(object):
 
         Raises exception on absent file.
         """
-        abs_path = os.path.join(self.mountpoint, fs_path)
+        abs_path = os.path.join(self.hostfs_mntpt, fs_path)
         if follow_symlinks:
             stat_call = "os.stat('" + abs_path + "')"
         else:
@@ -614,7 +1240,7 @@ class CephFSMount(object):
                 dict([(a, getattr(s, a)) for a in attrs]),
                 indent=2))
             """).format(stat_call=stat_call)
-        proc = self._run_python(pyscript)
+        proc = self._run_python(pyscript, **kwargs)
         if wait:
             proc.wait()
             return json.loads(proc.stdout.getvalue().strip())
@@ -630,7 +1256,7 @@ class CephFSMount(object):
         :param fs_path:
         :return:
         """
-        abs_path = os.path.join(self.mountpoint, fs_path)
+        abs_path = os.path.join(self.hostfs_mntpt, fs_path)
         pyscript = dedent("""
             import sys
             import errno
@@ -645,7 +1271,7 @@ class CephFSMount(object):
         proc.wait()
 
     def path_to_ino(self, fs_path, follow_symlinks=True):
-        abs_path = os.path.join(self.mountpoint, fs_path)
+        abs_path = os.path.join(self.hostfs_mntpt, fs_path)
 
         if follow_symlinks:
             pyscript = dedent("""
@@ -667,7 +1293,7 @@ class CephFSMount(object):
         return int(proc.stdout.getvalue().strip())
 
     def path_to_nlink(self, fs_path):
-        abs_path = os.path.join(self.mountpoint, fs_path)
+        abs_path = os.path.join(self.hostfs_mntpt, fs_path)
 
         pyscript = dedent("""
             import os
@@ -680,7 +1306,7 @@ class CephFSMount(object):
         proc.wait()
         return int(proc.stdout.getvalue().strip())
 
-    def ls(self, path=None):
+    def ls(self, path=None, **kwargs):
         """
         Wrap ls: return a list of strings
         """
@@ -688,7 +1314,7 @@ class CephFSMount(object):
         if path:
             cmd.append(path)
 
-        ls_text = self.run_shell(cmd).stdout.getvalue().strip()
+        ls_text = self.run_shell(cmd, **kwargs).stdout.getvalue().strip()
 
         if ls_text:
             return ls_text.split("\n")
@@ -697,7 +1323,7 @@ class CephFSMount(object):
             # gives you [''] instead of []
             return []
 
-    def setfattr(self, path, key, val):
+    def setfattr(self, path, key, val, **kwargs):
         """
         Wrap setfattr.
 
@@ -706,16 +1332,16 @@ class CephFSMount(object):
         :param val: xattr value
         :return: None
         """
-        self.run_shell(["setfattr", "-n", key, "-v", val, path])
+        self.run_shell(["setfattr", "-n", key, "-v", val, path], **kwargs)
 
-    def getfattr(self, path, attr):
+    def getfattr(self, path, attr, **kwargs):
         """
         Wrap getfattr: return the values of a named xattr on one file, or
         None if the attribute is not found.
 
         :return: a string
         """
-        p = self.run_shell(["getfattr", "--only-values", "-n", attr, path], wait=False)
+        p = self.run_shell(["getfattr", "--only-values", "-n", attr, path], wait=False, **kwargs)
         try:
             p.wait()
         except CommandFailedError as e:
@@ -741,3 +1367,14 @@ class CephFSMount(object):
             "used": int(used),
             "available": int(avail)
         }
+
+    def dir_checksum(self, path=None, follow_symlinks=False):
+        cmd = ["find"]
+        if follow_symlinks:
+            cmd.append("-L")
+        if path:
+            cmd.append(path)
+        cmd.extend(["-type", "f", "-exec", "md5sum", "{}", "+"])
+        checksum_text = self.run_shell(cmd).stdout.getvalue().strip()
+        checksum_sorted = sorted(checksum_text.split('\n'), key=lambda v: v.split()[1])
+        return hashlib.md5(('\n'.join(checksum_sorted)).encode('utf-8')).hexdigest()