-from contextlib import contextmanager
+import hashlib
import json
import logging
import datetime
-import six
+import os
+import re
import time
-from six import StringIO
+
+from io import StringIO
+from contextlib import contextmanager
from textwrap import dedent
-import os
+from IPy import IP
+
+from teuthology.contextutil import safe_while
+from teuthology.misc import get_file, write_file
from teuthology.orchestra import run
-from teuthology.orchestra.run import CommandFailedError, ConnectionLostError, Raw
+from teuthology.orchestra.run import Raw
+from teuthology.exceptions import CommandFailedError, ConnectionLostError
+
from tasks.cephfs.filesystem import Filesystem
log = logging.getLogger(__name__)
-
class CephFSMount(object):
- def __init__(self, ctx, test_dir, client_id, client_remote):
+ def __init__(self, ctx, test_dir, client_id, client_remote,
+ client_keyring_path=None, hostfs_mntpt=None,
+ cephfs_name=None, cephfs_mntpt=None, brxnet=None):
"""
:param test_dir: Global teuthology test dir
:param client_id: Client ID, the 'foo' in client.foo
- :param client_remote: Remote instance for the host where client will run
+ :param client_keyring_path: path to keyring for given client_id
+ :param client_remote: Remote instance for the host where client will
+ run
+ :param hostfs_mntpt: Path to directory on the FS on which Ceph FS will
+ be mounted
+ :param cephfs_name: Name of Ceph FS to be mounted
+ :param cephfs_mntpt: Path to directory inside Ceph FS that will be
+ mounted as root
"""
-
+ self.mounted = False
self.ctx = ctx
self.test_dir = test_dir
+
+ self._verify_attrs(client_id=client_id,
+ client_keyring_path=client_keyring_path,
+ hostfs_mntpt=hostfs_mntpt, cephfs_name=cephfs_name,
+ cephfs_mntpt=cephfs_mntpt)
+
self.client_id = client_id
+ self.client_keyring_path = client_keyring_path
self.client_remote = client_remote
- self.mountpoint_dir_name = 'mnt.{id}'.format(id=self.client_id)
- self._mountpoint = None
+ if hostfs_mntpt:
+ self.hostfs_mntpt = hostfs_mntpt
+ self.hostfs_mntpt_dirname = os.path.basename(self.hostfs_mntpt)
+ else:
+ self.hostfs_mntpt = os.path.join(self.test_dir, f'mnt.{self.client_id}')
+ self.cephfs_name = cephfs_name
+ self.cephfs_mntpt = cephfs_mntpt
+
+ self.cluster_name = 'ceph' # TODO: use config['cluster']
+
self.fs = None
+ self._netns_name = None
+ self.nsid = -1
+ if brxnet is None:
+ self.ceph_brx_net = '192.168.0.0/16'
+ else:
+ self.ceph_brx_net = brxnet
+
self.test_files = ['a', 'b', 'c']
self.background_procs = []
+ # This will cleanup the stale netnses, which are from the
+ # last failed test cases.
+ @staticmethod
+ def cleanup_stale_netnses_and_bridge(remote):
+ p = remote.run(args=['ip', 'netns', 'list'],
+ stdout=StringIO(), timeout=(5*60))
+ p = p.stdout.getvalue().strip()
+
+ # Get the netns name list
+ netns_list = re.findall(r'ceph-ns-[^()\s][-.\w]+[^():\s]', p)
+
+ # Remove the stale netnses
+ for ns in netns_list:
+ ns_name = ns.split()[0]
+ args = ['sudo', 'ip', 'netns', 'delete', '{0}'.format(ns_name)]
+ try:
+ remote.run(args=args, timeout=(5*60), omit_sudo=False)
+ except Exception:
+ pass
+
+ # Remove the stale 'ceph-brx'
+ try:
+ args = ['sudo', 'ip', 'link', 'delete', 'ceph-brx']
+ remote.run(args=args, timeout=(5*60), omit_sudo=False)
+ except Exception:
+ pass
+
+ def _parse_netns_name(self):
+ self._netns_name = '-'.join(["ceph-ns",
+ re.sub(r'/+', "-", self.mountpoint)])
+
@property
def mountpoint(self):
- if self._mountpoint == None:
- self._mountpoint= os.path.join(
- self.test_dir, '{dir_name}'.format(dir_name=self.mountpoint_dir_name))
- return self._mountpoint
+ if self.hostfs_mntpt == None:
+ self.hostfs_mntpt = os.path.join(self.test_dir,
+ self.hostfs_mntpt_dirname)
+ return self.hostfs_mntpt
@mountpoint.setter
def mountpoint(self, path):
if not isinstance(path, str):
raise RuntimeError('path should be of str type.')
- self._mountpoint = path
+ self._mountpoint = self.hostfs_mntpt = path
+
+ @property
+ def netns_name(self):
+ if self._netns_name == None:
+ self._parse_netns_name()
+ return self._netns_name
+
+ @netns_name.setter
+ def netns_name(self, name):
+ self._netns_name = name
+
+ def assert_that_ceph_fs_exists(self):
+ output = self.ctx.managers[self.cluster_name].raw_cluster_cmd("fs", "ls")
+ if self.cephfs_name:
+ assert self.cephfs_name in output, \
+ 'expected ceph fs is not present on the cluster'
+ log.info(f'Mounting Ceph FS {self.cephfs_name}; just confirmed its presence on cluster')
+ else:
+ assert 'No filesystems enabled' not in output, \
+ 'ceph cluster has no ceph fs, not even the default ceph fs'
+ log.info('Mounting default Ceph FS; just confirmed its presence on cluster')
+
+ def assert_and_log_minimum_mount_details(self):
+ """
+ Make sure we have minimum details required for mounting. Ideally, this
+ method should be called at the beginning of the mount method.
+ """
+ if not self.client_id or not self.client_remote or \
+ not self.hostfs_mntpt:
+ errmsg = ('Mounting CephFS requires that at least following '
+ 'details to be provided -\n'
+ '1. the client ID,\n2. the mountpoint and\n'
+ '3. the remote machine where CephFS will be mounted.\n')
+ raise RuntimeError(errmsg)
+
+ self.assert_that_ceph_fs_exists()
+
+ log.info('Mounting Ceph FS. Following are details of mount; remember '
+ '"None" represents Python type None -')
+ log.info(f'self.client_remote.hostname = {self.client_remote.hostname}')
+ log.info(f'self.client.name = client.{self.client_id}')
+ log.info(f'self.hostfs_mntpt = {self.hostfs_mntpt}')
+ log.info(f'self.cephfs_name = {self.cephfs_name}')
+ log.info(f'self.cephfs_mntpt = {self.cephfs_mntpt}')
+ log.info(f'self.client_keyring_path = {self.client_keyring_path}')
+ if self.client_keyring_path:
+ log.info('keyring content -\n' +
+ get_file(self.client_remote, self.client_keyring_path,
+ sudo=True).decode())
def is_mounted(self):
- raise NotImplementedError()
+ return self.mounted
def setupfs(self, name=None):
if name is None and self.fs is not None:
self.fs.wait_for_daemons()
log.info('Ready to start {}...'.format(type(self).__name__))
- def mount(self, mount_path=None, mount_fs_name=None, mountpoint=None, mount_options=[]):
+ def _create_mntpt(self):
+ self.client_remote.run(args=f'mkdir -p -v {self.hostfs_mntpt}',
+ timeout=60)
+ # Use 0000 mode to prevent undesired modifications to the mountpoint on
+ # the local file system.
+ self.client_remote.run(args=f'chmod 0000 {self.hostfs_mntpt}',
+ timeout=60)
+
+ @property
+ def _nsenter_args(self):
+ return ['nsenter', f'--net=/var/run/netns/{self.netns_name}']
+
+ def _set_filemode_on_mntpt(self):
+ stderr = StringIO()
+ try:
+ self.client_remote.run(
+ args=['sudo', 'chmod', '1777', self.hostfs_mntpt],
+ stderr=stderr, timeout=(5*60))
+ except CommandFailedError:
+ # the client does not have write permissions in the caps it holds
+ # for the Ceph FS that was just mounted.
+ if 'permission denied' in stderr.getvalue().lower():
+ pass
+
+ def _setup_brx_and_nat(self):
+ # The ip for ceph-brx should be
+ ip = IP(self.ceph_brx_net)[-2]
+ mask = self.ceph_brx_net.split('/')[1]
+ brd = IP(self.ceph_brx_net).broadcast()
+
+ brx = self.client_remote.run(args=['ip', 'addr'], stderr=StringIO(),
+ stdout=StringIO(), timeout=(5*60))
+ brx = re.findall(r'inet .* ceph-brx', brx.stdout.getvalue())
+ if brx:
+ # If the 'ceph-brx' already exists, then check whether
+ # the new net is conflicting with it
+ _ip, _mask = brx[0].split()[1].split('/', 1)
+ if _ip != "{}".format(ip) or _mask != mask:
+ raise RuntimeError("Conflict with existing ceph-brx {0}, new {1}/{2}".format(brx[0].split()[1], ip, mask))
+
+ # Setup the ceph-brx and always use the last valid IP
+ if not brx:
+ log.info("Setuping the 'ceph-brx' with {0}/{1}".format(ip, mask))
+
+ self.run_shell_payload(f"""
+ set -e
+ sudo ip link add name ceph-brx type bridge
+ sudo ip addr flush dev ceph-brx
+ sudo ip link set ceph-brx up
+ sudo ip addr add {ip}/{mask} brd {brd} dev ceph-brx
+ """, timeout=(5*60), omit_sudo=False, cwd='/')
+
+ args = "echo 1 | sudo tee /proc/sys/net/ipv4/ip_forward"
+ self.client_remote.run(args=args, timeout=(5*60), omit_sudo=False)
+
+ # Setup the NAT
+ p = self.client_remote.run(args=['route'], stderr=StringIO(),
+ stdout=StringIO(), timeout=(5*60))
+ p = re.findall(r'default .*', p.stdout.getvalue())
+ if p == False:
+ raise RuntimeError("No default gw found")
+ gw = p[0].split()[7]
+
+ self.run_shell_payload(f"""
+ set -e
+ sudo iptables -A FORWARD -o {gw} -i ceph-brx -j ACCEPT
+ sudo iptables -A FORWARD -i {gw} -o ceph-brx -j ACCEPT
+ sudo iptables -t nat -A POSTROUTING -s {ip}/{mask} -o {gw} -j MASQUERADE
+ """, timeout=(5*60), omit_sudo=False, cwd='/')
+
+ def _setup_netns(self):
+ p = self.client_remote.run(args=['ip', 'netns', 'list'],
+ stderr=StringIO(), stdout=StringIO(),
+ timeout=(5*60)).stdout.getvalue().strip()
+
+ # Get the netns name list
+ netns_list = re.findall(r'[^()\s][-.\w]+[^():\s]', p)
+
+ out = re.search(r"{0}".format(self.netns_name), p)
+ if out is None:
+ # Get an uniq nsid for the new netns
+ nsid = 0
+ p = self.client_remote.run(args=['ip', 'netns', 'list-id'],
+ stderr=StringIO(), stdout=StringIO(),
+ timeout=(5*60)).stdout.getvalue()
+ while True:
+ out = re.search(r"nsid {} ".format(nsid), p)
+ if out is None:
+ break
+
+ nsid += 1
+
+ # Add one new netns and set it id
+ self.run_shell_payload(f"""
+ set -e
+ sudo ip netns add {self.netns_name}
+ sudo ip netns set {self.netns_name} {nsid}
+ """, timeout=(5*60), omit_sudo=False, cwd='/')
+ self.nsid = nsid;
+ else:
+ # The netns already exists and maybe suspended by self.kill()
+ self.resume_netns();
+
+ nsid = int(re.search(r"{0} \(id: (\d+)\)".format(self.netns_name), p).group(1))
+ self.nsid = nsid;
+ return
+
+ # Get one ip address for netns
+ ips = IP(self.ceph_brx_net)
+ for ip in ips:
+ found = False
+ if ip == ips[0]:
+ continue
+ if ip == ips[-2]:
+ raise RuntimeError("we have ran out of the ip addresses")
+
+ for ns in netns_list:
+ ns_name = ns.split()[0]
+ args = ['sudo', 'ip', 'netns', 'exec', '{0}'.format(ns_name), 'ip', 'addr']
+ try:
+ p = self.client_remote.run(args=args, stderr=StringIO(),
+ stdout=StringIO(), timeout=(5*60),
+ omit_sudo=False)
+ q = re.search("{0}".format(ip), p.stdout.getvalue())
+ if q is not None:
+ found = True
+ break
+ except CommandFailedError:
+ if "No such file or directory" in p.stderr.getvalue():
+ pass
+ if "Invalid argument" in p.stderr.getvalue():
+ pass
+
+ if found == False:
+ break
+
+ mask = self.ceph_brx_net.split('/')[1]
+ brd = IP(self.ceph_brx_net).broadcast()
+
+ log.info("Setuping the netns '{0}' with {1}/{2}".format(self.netns_name, ip, mask))
+
+ # Setup the veth interfaces
+ brxip = IP(self.ceph_brx_net)[-2]
+ self.run_shell_payload(f"""
+ set -e
+ sudo ip link add veth0 netns {self.netns_name} type veth peer name brx.{nsid}
+ sudo ip netns exec {self.netns_name} ip addr add {ip}/{mask} brd {brd} dev veth0
+ sudo ip netns exec {self.netns_name} ip link set veth0 up
+ sudo ip netns exec {self.netns_name} ip link set lo up
+ sudo ip netns exec {self.netns_name} ip route add default via {brxip}
+ """, timeout=(5*60), omit_sudo=False, cwd='/')
+
+ # Bring up the brx interface and join it to 'ceph-brx'
+ self.run_shell_payload(f"""
+ set -e
+ sudo ip link set brx.{nsid} up
+ sudo ip link set dev brx.{nsid} master ceph-brx
+ """, timeout=(5*60), omit_sudo=False, cwd='/')
+
+ def _cleanup_netns(self):
+ if self.nsid == -1:
+ return
+ log.info("Removing the netns '{0}'".format(self.netns_name))
+
+ # Delete the netns and the peer veth interface
+ self.run_shell_payload(f"""
+ set -e
+ sudo ip link set brx.{self.nsid} down
+ sudo ip link delete dev brx.{self.nsid}
+ sudo ip netns delete {self.netns_name}
+ """, timeout=(5*60), omit_sudo=False, cwd='/')
+
+ self.nsid = -1
+
+ def _cleanup_brx_and_nat(self):
+ brx = self.client_remote.run(args=['ip', 'addr'], stderr=StringIO(),
+ stdout=StringIO(), timeout=(5*60))
+ brx = re.findall(r'inet .* ceph-brx', brx.stdout.getvalue())
+ if not brx:
+ return
+
+ # If we are the last netns, will delete the ceph-brx
+ args = ['sudo', 'ip', 'link', 'show']
+ p = self.client_remote.run(args=args, stdout=StringIO(),
+ timeout=(5*60), omit_sudo=False)
+ _list = re.findall(r'brx\.', p.stdout.getvalue().strip())
+ if len(_list) != 0:
+ return
+
+ log.info("Removing the 'ceph-brx'")
+
+ self.run_shell_payload("""
+ set -e
+ sudo ip link set ceph-brx down
+ sudo ip link delete ceph-brx
+ """, timeout=(5*60), omit_sudo=False, cwd='/')
+
+ # Drop the iptables NAT rules
+ ip = IP(self.ceph_brx_net)[-2]
+ mask = self.ceph_brx_net.split('/')[1]
+
+ p = self.client_remote.run(args=['route'], stderr=StringIO(),
+ stdout=StringIO(), timeout=(5*60))
+ p = re.findall(r'default .*', p.stdout.getvalue())
+ if p == False:
+ raise RuntimeError("No default gw found")
+ gw = p[0].split()[7]
+ self.run_shell_payload(f"""
+ set -e
+ sudo iptables -D FORWARD -o {gw} -i ceph-brx -j ACCEPT
+ sudo iptables -D FORWARD -i {gw} -o ceph-brx -j ACCEPT
+ sudo iptables -t nat -D POSTROUTING -s {ip}/{mask} -o {gw} -j MASQUERADE
+ """, timeout=(5*60), omit_sudo=False, cwd='/')
+
+ def setup_netns(self):
+ """
+ Setup the netns for the mountpoint.
+ """
+ log.info("Setting the '{0}' netns for '{1}'".format(self._netns_name, self.mountpoint))
+ self._setup_brx_and_nat()
+ self._setup_netns()
+
+ def cleanup_netns(self):
+ """
+ Cleanup the netns for the mountpoint.
+ """
+ # We will defer cleaning the netnses and bridge until the last
+ # mountpoint is unmounted, this will be a temporary work around
+ # for issue#46282.
+
+ # log.info("Cleaning the '{0}' netns for '{1}'".format(self._netns_name, self.mountpoint))
+ # self._cleanup_netns()
+ # self._cleanup_brx_and_nat()
+
+ def suspend_netns(self):
+ """
+ Suspend the netns veth interface.
+ """
+ if self.nsid == -1:
+ return
+
+ log.info("Suspending the '{0}' netns for '{1}'".format(self._netns_name, self.mountpoint))
+
+ args = ['sudo', 'ip', 'link', 'set', 'brx.{0}'.format(self.nsid), 'down']
+ self.client_remote.run(args=args, timeout=(5*60), omit_sudo=False)
+
+ def resume_netns(self):
+ """
+ Resume the netns veth interface.
+ """
+ if self.nsid == -1:
+ return
+
+ log.info("Resuming the '{0}' netns for '{1}'".format(self._netns_name, self.mountpoint))
+
+ args = ['sudo', 'ip', 'link', 'set', 'brx.{0}'.format(self.nsid), 'up']
+ self.client_remote.run(args=args, timeout=(5*60), omit_sudo=False)
+
+ def mount(self, mntopts=[], check_status=True, **kwargs):
+ """
+ kwargs expects its members to be same as the arguments accepted by
+ self.update_attrs().
+ """
raise NotImplementedError()
- def mount_wait(self, mount_path=None, mount_fs_name=None, mountpoint=None, mount_options=[]):
- self.mount(mount_path=mount_path, mount_fs_name=mount_fs_name, mountpoint=mountpoint,
- mount_options=mount_options)
+ def mount_wait(self, **kwargs):
+ """
+ Accepts arguments same as self.mount().
+ """
+ self.mount(**kwargs)
self.wait_until_mounted()
def umount(self):
raise NotImplementedError()
- def umount_wait(self, force=False, require_clean=False):
+ def umount_wait(self, force=False, require_clean=False, timeout=None):
"""
:param force: Expect that the mount will not shutdown cleanly: kill
:param require_clean: Wait for the Ceph client associated with the
mount (e.g. ceph-fuse) to terminate, and
raise if it doesn't do so cleanly.
+ :param timeout: amount of time to be waited for umount command to finish
:return:
"""
raise NotImplementedError()
- def kill_cleanup(self):
- raise NotImplementedError()
+ def _verify_attrs(self, **kwargs):
+ """
+ Verify that client_id, client_keyring_path, client_remote, hostfs_mntpt,
+ cephfs_name, cephfs_mntpt are either type str or None.
+ """
+ for k, v in kwargs.items():
+ if v is not None and not isinstance(v, str):
+ raise RuntimeError('value of attributes should be either str '
+ f'or None. {k} - {v}')
+
+ def update_attrs(self, client_id=None, client_keyring_path=None,
+ client_remote=None, hostfs_mntpt=None, cephfs_name=None,
+ cephfs_mntpt=None):
+ if not (client_id or client_keyring_path or client_remote or
+ cephfs_name or cephfs_mntpt or hostfs_mntpt):
+ return
+
+ self._verify_attrs(client_id=client_id,
+ client_keyring_path=client_keyring_path,
+ hostfs_mntpt=hostfs_mntpt, cephfs_name=cephfs_name,
+ cephfs_mntpt=cephfs_mntpt)
+
+ if client_id:
+ self.client_id = client_id
+ if client_keyring_path:
+ self.client_keyring_path = client_keyring_path
+ if client_remote:
+ self.client_remote = client_remote
+ if hostfs_mntpt:
+ self.hostfs_mntpt = hostfs_mntpt
+ if cephfs_name:
+ self.cephfs_name = cephfs_name
+ if cephfs_mntpt:
+ self.cephfs_mntpt = cephfs_mntpt
+
+ def remount(self, **kwargs):
+ """
+ Update mount object's attributes and attempt remount with these
+ new values for these attrbiutes.
+
+ 1. Run umount_wait().
+ 2. Run update_attrs().
+ 3. Run mount().
+
+ Accepts arguments of self.mount() and self.update_attrs() with 1
+ exception: wait accepted too which can be True or False.
+ """
+ self.umount_wait()
+ assert not self.mounted
+
+ mntopts = kwargs.pop('mntopts', [])
+ check_status = kwargs.pop('check_status', True)
+ wait = kwargs.pop('wait', True)
+
+ self.update_attrs(**kwargs)
+
+ retval = self.mount(mntopts=mntopts, check_status=check_status)
+ # avoid this scenario (again): mount command might've failed and
+ # check_status might have silenced the exception, yet we attempt to
+ # wait which might lead to an error.
+ if retval is None and wait:
+ self.wait_until_mounted()
+
+ return retval
def kill(self):
- raise NotImplementedError()
+ """
+ Suspend the netns veth interface to make the client disconnected
+ from the ceph cluster
+ """
+ log.info('Killing connection on {0}...'.format(self.client_remote.name))
+ self.suspend_netns()
+
+ def kill_cleanup(self):
+ """
+ Follow up ``kill`` to get to a clean unmounted state.
+ """
+ log.info('Cleaning up killed connection on {0}'.format(self.client_remote.name))
+ self.umount_wait(force=True)
def cleanup(self):
- raise NotImplementedError()
+ """
+ Remove the mount point.
+
+ Prerequisite: the client is not mounted.
+ """
+ log.info('Cleaning up mount {0}'.format(self.client_remote.name))
+ stderr = StringIO()
+ try:
+ self.client_remote.run(args=['rmdir', '--', self.mountpoint],
+ cwd=self.test_dir, stderr=stderr,
+ timeout=(60*5), check_status=False)
+ except CommandFailedError:
+ if "no such file or directory" not in stderr.getvalue().lower():
+ raise
+
+ self.cleanup_netns()
def wait_until_mounted(self):
raise NotImplementedError()
def get_keyring_path(self):
+ # N.B.: default keyring is /etc/ceph/ceph.keyring; see ceph.py and generate_caps
return '/etc/ceph/ceph.client.{id}.keyring'.format(id=self.client_id)
+ def get_key_from_keyfile(self):
+ # XXX: don't call run_shell(), since CephFS might be unmounted.
+ keyring = self.client_remote.run(
+ args=['sudo', 'cat', self.client_keyring_path], stdout=StringIO(),
+ omit_sudo=False).stdout.getvalue()
+ for line in keyring.split('\n'):
+ if line.find('key') != -1:
+ return line[line.find('=') + 1 : ].strip()
+
@property
def config_path(self):
"""
return "/etc/ceph/ceph.conf"
@contextmanager
- def mounted(self):
+ def mounted_wait(self):
"""
A context manager, from an initially unmounted state, to mount
this, yield, and then unmount and clean up.
finally:
self.umount_wait()
- def is_blacklisted(self):
- addr = self.get_global_addr()
- blacklist = json.loads(self.fs.mon_manager.raw_cluster_cmd("osd", "blacklist", "ls", "--format=json"))
- for b in blacklist:
- if addr == b["addr"]:
- return True
- return False
-
def create_file(self, filename='testfile', dirname=None, user=None,
check_status=True):
assert(self.is_mounted())
if os.path.isabs(dirname):
path = os.path.join(dirname, filename)
else:
- path = os.path.join(self.mountpoint, dirname, filename)
+ path = os.path.join(self.hostfs_mntpt, dirname, filename)
else:
- path = os.path.join(self.mountpoint, filename)
+ path = os.path.join(self.hostfs_mntpt, filename)
else:
path = filename
for suffix in self.test_files:
log.info("Creating file {0}".format(suffix))
self.client_remote.run(args=[
- 'sudo', 'touch', os.path.join(self.mountpoint, suffix)
+ 'touch', os.path.join(self.hostfs_mntpt, suffix)
])
def test_create_file(self, filename='testfile', dirname=None, user=None,
for suffix in self.test_files:
log.info("Checking file {0}".format(suffix))
r = self.client_remote.run(args=[
- 'sudo', 'ls', os.path.join(self.mountpoint, suffix)
+ 'ls', os.path.join(self.hostfs_mntpt, suffix)
], check_status=False)
if r.exitstatus != 0:
raise RuntimeError("Expected file {0} not found".format(suffix))
+ def write_file(self, path, data, perms=None):
+ """
+ Write the given data at the given path and set the given perms to the
+ file on the path.
+ """
+ if path.find(self.hostfs_mntpt) == -1:
+ path = os.path.join(self.hostfs_mntpt, path)
+
+ write_file(self.client_remote, path, data)
+
+ if perms:
+ self.run_shell(args=f'chmod {perms} {path}')
+
+ def read_file(self, path):
+ """
+ Return the data from the file on given path.
+ """
+ if path.find(self.hostfs_mntpt) == -1:
+ path = os.path.join(self.hostfs_mntpt, path)
+
+ return self.run_shell(args=['cat', path]).\
+ stdout.getvalue().strip()
+
def create_destroy(self):
assert(self.is_mounted())
filename = "{0} {1}".format(datetime.datetime.now(), self.client_id)
log.debug("Creating test file {0}".format(filename))
self.client_remote.run(args=[
- 'sudo', 'touch', os.path.join(self.mountpoint, filename)
+ 'touch', os.path.join(self.hostfs_mntpt, filename)
])
log.debug("Deleting test file {0}".format(filename))
self.client_remote.run(args=[
- 'sudo', 'rm', '-f', os.path.join(self.mountpoint, filename)
+ 'rm', '-f', os.path.join(self.hostfs_mntpt, filename)
])
- def _run_python(self, pyscript, py_version='python3'):
- return self.client_remote.run(
- args=['sudo', 'adjust-ulimits', 'daemon-helper', 'kill',
- py_version, '-c', pyscript], wait=False, stdin=run.PIPE,
- stdout=StringIO())
+ def _run_python(self, pyscript, py_version='python3', sudo=False):
+ args = []
+ if sudo:
+ args.append('sudo')
+ args += ['adjust-ulimits', 'daemon-helper', 'kill', py_version, '-c', pyscript]
+ return self.client_remote.run(args=args, wait=False, stdin=run.PIPE, stdout=StringIO())
- def run_python(self, pyscript, py_version='python3'):
- p = self._run_python(pyscript, py_version)
+ def run_python(self, pyscript, py_version='python3', sudo=False):
+ p = self._run_python(pyscript, py_version, sudo=sudo)
p.wait()
- return six.ensure_str(p.stdout.getvalue().strip())
+ return p.stdout.getvalue().strip()
+
+ def run_shell(self, args, timeout=900, **kwargs):
+ args = args.split() if isinstance(args, str) else args
+ omit_sudo = kwargs.pop('omit_sudo', False)
+ sudo = kwargs.pop('sudo', False)
+ cwd = kwargs.pop('cwd', self.mountpoint)
+ stdout = kwargs.pop('stdout', StringIO())
+ stderr = kwargs.pop('stderr', StringIO())
+
+ if sudo:
+ args.insert(0, 'sudo')
+
+ return self.client_remote.run(args=args, cwd=cwd, timeout=timeout,
+ stdout=stdout, stderr=stderr,
+ omit_sudo=omit_sudo, **kwargs)
def run_shell_payload(self, payload, **kwargs):
return self.run_shell(["bash", "-c", Raw(f"'{payload}'")], **kwargs)
- def run_shell(self, args, wait=True, stdin=None, check_status=True,
- omit_sudo=True):
+ def run_as_user(self, **kwargs):
+ """
+ Besides the arguments defined for run_shell() this method also
+ accepts argument 'user'.
+ """
+ args = kwargs.pop('args')
+ user = kwargs.pop('user')
if isinstance(args, str):
- args = args.split()
+ args = ['sudo', '-u', user, '-s', '/bin/bash', '-c', args]
+ elif isinstance(args, list):
+ cmdlist = args
+ cmd = ''
+ for i in cmdlist:
+ cmd = cmd + i + ' '
+ # get rid of extra space at the end.
+ cmd = cmd[:-1]
+
+ args = ['sudo', '-u', user, '-s', '/bin/bash', '-c', cmd]
- args = ["cd", self.mountpoint, run.Raw('&&'), "sudo"] + args
- return self.client_remote.run(args=args, stdout=StringIO(),
- stderr=StringIO(), wait=wait,
- stdin=stdin, check_status=check_status,
- omit_sudo=omit_sudo)
+ kwargs['args'] = args
+ return self.run_shell(**kwargs)
+
+ def run_as_root(self, **kwargs):
+ """
+ Accepts same arguments as run_shell().
+ """
+ kwargs['user'] = 'root'
+ return self.run_as_user(**kwargs)
+
+ def _verify(self, proc, retval=None, errmsg=None):
+ if retval:
+ msg = ('expected return value: {}\nreceived return value: '
+ '{}\n'.format(retval, proc.returncode))
+ assert proc.returncode == retval, msg
+
+ if errmsg:
+ stderr = proc.stderr.getvalue().lower()
+ msg = ('didn\'t find given string in stderr -\nexpected string: '
+ '{}\nreceived error message: {}\nnote: received error '
+ 'message is converted to lowercase'.format(errmsg, stderr))
+ assert errmsg in stderr, msg
+
+ def negtestcmd(self, args, retval=None, errmsg=None, stdin=None,
+ cwd=None, wait=True):
+ """
+ Conduct a negative test for the given command.
+
+ retval and errmsg are parameters to confirm the cause of command
+ failure.
+ """
+ proc = self.run_shell(args=args, wait=wait, stdin=stdin, cwd=cwd,
+ check_status=False)
+ self._verify(proc, retval, errmsg)
+ return proc
+
+ def negtestcmd_as_user(self, args, user, retval=None, errmsg=None,
+ stdin=None, cwd=None, wait=True):
+ proc = self.run_as_user(args=args, user=user, wait=wait, stdin=stdin,
+ cwd=cwd, check_status=False)
+ self._verify(proc, retval, errmsg)
+ return proc
+
+ def negtestcmd_as_root(self, args, retval=None, errmsg=None, stdin=None,
+ cwd=None, wait=True):
+ proc = self.run_as_root(args=args, wait=wait, stdin=stdin, cwd=cwd,
+ check_status=False)
+ self._verify(proc, retval, errmsg)
+ return proc
def open_no_data(self, basename):
"""
"""
assert(self.is_mounted())
- path = os.path.join(self.mountpoint, basename)
+ path = os.path.join(self.hostfs_mntpt, basename)
p = self._run_python(dedent(
"""
"""
assert(self.is_mounted())
- path = os.path.join(self.mountpoint, basename)
+ path = os.path.join(self.hostfs_mntpt, basename)
if write:
pyscript = dedent("""
return rproc
- def wait_for_dir_empty(self, dirname, timeout=30):
- i = 0
- dirpath = os.path.join(self.mountpoint, dirname)
- while i < timeout:
- nr_entries = int(self.getfattr(dirpath, "ceph.dir.entries"))
- if nr_entries == 0:
- log.debug("Directory {0} seen empty from {1} after {2}s ".format(
- dirname, self.client_id, i))
- return
- else:
+ def open_dir_background(self, basename):
+ """
+ Create and hold a capability to a directory.
+ """
+ assert(self.is_mounted())
+
+ path = os.path.join(self.hostfs_mntpt, basename)
+
+ pyscript = dedent("""
+ import time
+ import os
+
+ os.mkdir("{path}")
+ fd = os.open("{path}", os.O_RDONLY)
+ while True:
time.sleep(1)
- i += 1
+ """).format(path=path)
+
+ rproc = self._run_python(pyscript)
+ self.background_procs.append(rproc)
+
+ self.wait_for_visible(basename)
+
+ return rproc
- raise RuntimeError("Timed out after {0}s waiting for {1} to become empty from {2}".format(
- i, dirname, self.client_id))
+ def wait_for_dir_empty(self, dirname, timeout=30):
+ dirpath = os.path.join(self.hostfs_mntpt, dirname)
+ with safe_while(sleep=5, tries=(timeout//5)) as proceed:
+ while proceed():
+ p = self.run_shell_payload(f"stat -c %h {dirpath}")
+ nr_links = int(p.stdout.getvalue().strip())
+ if nr_links == 2:
+ return
def wait_for_visible(self, basename="background_file", timeout=30):
i = 0
while i < timeout:
r = self.client_remote.run(args=[
- 'sudo', 'ls', os.path.join(self.mountpoint, basename)
+ 'stat', os.path.join(self.hostfs_mntpt, basename)
], check_status=False)
if r.exitstatus == 0:
log.debug("File {0} became visible from {1} after {2}s".format(
"""
assert(self.is_mounted())
- path = os.path.join(self.mountpoint, basename)
+ path = os.path.join(self.hostfs_mntpt, basename)
script_builder = """
import time
def lock_and_release(self, basename="background_file"):
assert(self.is_mounted())
- path = os.path.join(self.mountpoint, basename)
+ path = os.path.join(self.hostfs_mntpt, basename)
script = """
import time
def check_filelock(self, basename="background_file", do_flock=True):
assert(self.is_mounted())
- path = os.path.join(self.mountpoint, basename)
+ path = os.path.join(self.hostfs_mntpt, basename)
script_builder = """
import fcntl
log.info("check lock on file {0}".format(basename))
self.client_remote.run(args=[
- 'sudo', 'python3', '-c', pyscript
+ 'python3', '-c', pyscript
])
def write_background(self, basename="background_file", loop=False):
"""
assert(self.is_mounted())
- path = os.path.join(self.mountpoint, basename)
+ path = os.path.join(self.hostfs_mntpt, basename)
pyscript = dedent("""
import os
val = zlib.crc32(str(i).encode('utf-8')) & 7
f.write(chr(val))
""".format(
- path=os.path.join(self.mountpoint, filename),
+ path=os.path.join(self.hostfs_mntpt, filename),
size=size
)))
def validate_test_pattern(self, filename, size):
log.info("Validating {0} bytes from {1}".format(size, filename))
+ # Use sudo because cephfs-data-scan may recreate the file with owner==root
return self.run_python(dedent("""
import zlib
path = "{path}"
if b != chr(val):
raise RuntimeError("Bad data at offset {{0}}".format(i))
""".format(
- path=os.path.join(self.mountpoint, filename),
+ path=os.path.join(self.hostfs_mntpt, filename),
size=size
- )))
+ )), sudo=True)
def open_n_background(self, fs_path, count):
"""
"""
assert(self.is_mounted())
- abs_path = os.path.join(self.mountpoint, fs_path)
+ abs_path = os.path.join(self.hostfs_mntpt, fs_path)
pyscript = dedent("""
import sys
n = {count}
abs_path = "{abs_path}"
- if not os.path.exists(os.path.dirname(abs_path)):
- os.makedirs(os.path.dirname(abs_path))
+ if not os.path.exists(abs_path):
+ os.makedirs(abs_path)
handles = []
for i in range(0, n):
- fname = "{{0}}_{{1}}".format(abs_path, i)
- handles.append(open(fname, 'w'))
+ fname = "file_"+str(i)
+ path = os.path.join(abs_path, fname)
+ handles.append(open(path, 'w'))
while True:
time.sleep(1)
self.background_procs.append(rproc)
return rproc
- def create_n_files(self, fs_path, count, sync=False):
+ def create_n_files(self, fs_path, count, sync=False, dirsync=False, unlink=False, finaldirsync=False):
+ """
+ Create n files.
+
+ :param sync: sync the file after writing
+ :param dirsync: sync the containing directory after closing the file
+ :param unlink: unlink the file after closing
+ :param finaldirsync: sync the containing directory after closing the last file
+ """
+
assert(self.is_mounted())
- abs_path = os.path.join(self.mountpoint, fs_path)
+ abs_path = os.path.join(self.hostfs_mntpt, fs_path)
- pyscript = dedent("""
- import sys
- import time
+ pyscript = dedent(f"""
import os
n = {count}
- abs_path = "{abs_path}"
+ path = "{abs_path}"
- if not os.path.exists(os.path.dirname(abs_path)):
- os.makedirs(os.path.dirname(abs_path))
+ dpath = os.path.dirname(path)
+ fnameprefix = os.path.basename(path)
+ os.makedirs(dpath, exist_ok=True)
- for i in range(0, n):
- fname = "{{0}}_{{1}}".format(abs_path, i)
- with open(fname, 'w') as f:
- f.write('content')
- if {sync}:
- f.flush()
- os.fsync(f.fileno())
- """).format(abs_path=abs_path, count=count, sync=str(sync))
+ try:
+ dirfd = os.open(dpath, os.O_DIRECTORY)
+
+ for i in range(n):
+ fpath = os.path.join(dpath, f"{{fnameprefix}}_{{i}}")
+ with open(fpath, 'w') as f:
+ f.write(f"{{i}}")
+ if {sync}:
+ f.flush()
+ os.fsync(f.fileno())
+ if {unlink}:
+ os.unlink(fpath)
+ if {dirsync}:
+ os.fsync(dirfd)
+ if {finaldirsync}:
+ os.fsync(dirfd)
+ finally:
+ os.close(dirfd)
+ """)
self.run_python(pyscript)
def get_osd_epoch(self):
raise NotImplementedError()
+ def get_op_read_count(self):
+ raise NotImplementedError()
+
+ def readlink(self, fs_path):
+ abs_path = os.path.join(self.hostfs_mntpt, fs_path)
+
+ pyscript = dedent("""
+ import os
+
+ print(os.readlink("{path}"))
+ """).format(path=abs_path)
+
+ proc = self._run_python(pyscript)
+ proc.wait()
+ return str(proc.stdout.getvalue().strip())
+
+
def lstat(self, fs_path, follow_symlinks=False, wait=True):
return self.stat(fs_path, follow_symlinks=False, wait=True)
- def stat(self, fs_path, follow_symlinks=True, wait=True):
+ def stat(self, fs_path, follow_symlinks=True, wait=True, **kwargs):
"""
stat a file, and return the result as a dictionary like this:
{
Raises exception on absent file.
"""
- abs_path = os.path.join(self.mountpoint, fs_path)
+ abs_path = os.path.join(self.hostfs_mntpt, fs_path)
if follow_symlinks:
stat_call = "os.stat('" + abs_path + "')"
else:
dict([(a, getattr(s, a)) for a in attrs]),
indent=2))
""").format(stat_call=stat_call)
- proc = self._run_python(pyscript)
+ proc = self._run_python(pyscript, **kwargs)
if wait:
proc.wait()
return json.loads(proc.stdout.getvalue().strip())
:param fs_path:
:return:
"""
- abs_path = os.path.join(self.mountpoint, fs_path)
+ abs_path = os.path.join(self.hostfs_mntpt, fs_path)
pyscript = dedent("""
import sys
import errno
proc.wait()
def path_to_ino(self, fs_path, follow_symlinks=True):
- abs_path = os.path.join(self.mountpoint, fs_path)
+ abs_path = os.path.join(self.hostfs_mntpt, fs_path)
if follow_symlinks:
pyscript = dedent("""
return int(proc.stdout.getvalue().strip())
def path_to_nlink(self, fs_path):
- abs_path = os.path.join(self.mountpoint, fs_path)
+ abs_path = os.path.join(self.hostfs_mntpt, fs_path)
pyscript = dedent("""
import os
proc.wait()
return int(proc.stdout.getvalue().strip())
- def ls(self, path=None):
+ def ls(self, path=None, **kwargs):
"""
Wrap ls: return a list of strings
"""
if path:
cmd.append(path)
- ls_text = self.run_shell(cmd).stdout.getvalue().strip()
+ ls_text = self.run_shell(cmd, **kwargs).stdout.getvalue().strip()
if ls_text:
return ls_text.split("\n")
# gives you [''] instead of []
return []
- def setfattr(self, path, key, val):
+ def setfattr(self, path, key, val, **kwargs):
"""
Wrap setfattr.
:param val: xattr value
:return: None
"""
- self.run_shell(["setfattr", "-n", key, "-v", val, path])
+ self.run_shell(["setfattr", "-n", key, "-v", val, path], **kwargs)
- def getfattr(self, path, attr):
+ def getfattr(self, path, attr, **kwargs):
"""
Wrap getfattr: return the values of a named xattr on one file, or
None if the attribute is not found.
:return: a string
"""
- p = self.run_shell(["getfattr", "--only-values", "-n", attr, path], wait=False)
+ p = self.run_shell(["getfattr", "--only-values", "-n", attr, path], wait=False, **kwargs)
try:
p.wait()
except CommandFailedError as e:
"used": int(used),
"available": int(avail)
}
+
+ def dir_checksum(self, path=None, follow_symlinks=False):
+ cmd = ["find"]
+ if follow_symlinks:
+ cmd.append("-L")
+ if path:
+ cmd.append(path)
+ cmd.extend(["-type", "f", "-exec", "md5sum", "{}", "+"])
+ checksum_text = self.run_shell(cmd).stdout.getvalue().strip()
+ checksum_sorted = sorted(checksum_text.split('\n'), key=lambda v: v.split()[1])
+ return hashlib.md5(('\n'.join(checksum_sorted)).encode('utf-8')).hexdigest()