from teuthology.contextutil import safe_while
from teuthology.misc import get_file, write_file
from teuthology.orchestra import run
-from teuthology.orchestra.run import CommandFailedError, ConnectionLostError, Raw
+from teuthology.orchestra.run import Raw
+from teuthology.exceptions import CommandFailedError, ConnectionLostError
from tasks.cephfs.filesystem import Filesystem
self.cephfs_name = cephfs_name
self.cephfs_mntpt = cephfs_mntpt
+ self.cluster_name = 'ceph' # TODO: use config['cluster']
+
self.fs = None
self._netns_name = None
def netns_name(self, name):
self._netns_name = name
+ def assert_that_ceph_fs_exists(self):
+ output = self.ctx.managers[self.cluster_name].raw_cluster_cmd("fs", "ls")
+ if self.cephfs_name:
+ assert self.cephfs_name in output, \
+ 'expected ceph fs is not present on the cluster'
+ log.info(f'Mounting Ceph FS {self.cephfs_name}; just confirmed its presence on cluster')
+ else:
+ assert 'No filesystems enabled' not in output, \
+ 'ceph cluster has no ceph fs, not even the default ceph fs'
+ log.info('Mounting default Ceph FS; just confirmed its presence on cluster')
+
def assert_and_log_minimum_mount_details(self):
"""
Make sure we have minimum details required for mounting. Ideally, this
'3. the remote machine where CephFS will be mounted.\n')
raise RuntimeError(errmsg)
+ self.assert_that_ceph_fs_exists()
+
log.info('Mounting Ceph FS. Following are details of mount; remember '
'"None" represents Python type None -')
log.info(f'self.client_remote.hostname = {self.client_remote.hostname}')
self.fs.wait_for_daemons()
log.info('Ready to start {}...'.format(type(self).__name__))
+ def _create_mntpt(self):
+ self.client_remote.run(args=f'mkdir -p -v {self.hostfs_mntpt}',
+ timeout=60)
+ # Use 0000 mode to prevent undesired modifications to the mountpoint on
+ # the local file system.
+ self.client_remote.run(args=f'chmod 0000 {self.hostfs_mntpt}',
+ timeout=60)
+
+ @property
+ def _nsenter_args(self):
+ return ['nsenter', f'--net=/var/run/netns/{self.netns_name}']
+
+ def _set_filemode_on_mntpt(self):
+ stderr = StringIO()
+ try:
+ self.client_remote.run(
+ args=['sudo', 'chmod', '1777', self.hostfs_mntpt],
+ stderr=stderr, timeout=(5*60))
+ except CommandFailedError:
+ # the client does not have write permissions in the caps it holds
+ # for the Ceph FS that was just mounted.
+ if 'permission denied' in stderr.getvalue().lower():
+ pass
+
def _setup_brx_and_nat(self):
# The ip for ceph-brx should be
ip = IP(self.ceph_brx_net)[-2]
args = ['sudo', 'ip', 'link', 'set', 'brx.{0}'.format(self.nsid), 'up']
self.client_remote.run(args=args, timeout=(5*60), omit_sudo=False)
- def mount(self, mntopts=[], createfs=True, check_status=True, **kwargs):
+ def mount(self, mntopts=[], check_status=True, **kwargs):
"""
kwargs expects its members to be same as the arguments accepted by
self.update_attrs().
2. Run update_attrs().
3. Run mount().
- Accepts arguments of self.mount() and self.update_attrs() with 2 exceptions -
- 1. Accepts wait too which can be True or False.
- 2. The default value of createfs is False.
+ Accepts arguments of self.mount() and self.update_attrs() with 1
+ exception: wait accepted too which can be True or False.
"""
self.umount_wait()
assert not self.mounted
mntopts = kwargs.pop('mntopts', [])
- createfs = kwargs.pop('createfs', False)
check_status = kwargs.pop('check_status', True)
wait = kwargs.pop('wait', True)
self.update_attrs(**kwargs)
- retval = self.mount(mntopts=mntopts, createfs=createfs,
- check_status=check_status)
+ retval = self.mount(mntopts=mntopts, check_status=check_status)
# avoid this scenario (again): mount command might've failed and
# check_status might have silenced the exception, yet we attempt to
# wait which might lead to an error.
raise NotImplementedError()
def get_keyring_path(self):
+ # N.B.: default keyring is /etc/ceph/ceph.keyring; see ceph.py and generate_caps
return '/etc/ceph/ceph.client.{id}.keyring'.format(id=self.client_id)
def get_key_from_keyfile(self):
def run_shell(self, args, timeout=900, **kwargs):
args = args.split() if isinstance(args, str) else args
- kwargs.pop('omit_sudo', False)
+ omit_sudo = kwargs.pop('omit_sudo', False)
sudo = kwargs.pop('sudo', False)
cwd = kwargs.pop('cwd', self.mountpoint)
stdout = kwargs.pop('stdout', StringIO())
if sudo:
args.insert(0, 'sudo')
- return self.client_remote.run(args=args, cwd=cwd, timeout=timeout, stdout=stdout, stderr=stderr, **kwargs)
+ return self.client_remote.run(args=args, cwd=cwd, timeout=timeout,
+ stdout=stdout, stderr=stderr,
+ omit_sudo=omit_sudo, **kwargs)
def run_shell_payload(self, payload, **kwargs):
return self.run_shell(["bash", "-c", Raw(f"'{payload}'")], **kwargs)
return rproc
+ def open_dir_background(self, basename):
+ """
+ Create and hold a capability to a directory.
+ """
+ assert(self.is_mounted())
+
+ path = os.path.join(self.hostfs_mntpt, basename)
+
+ pyscript = dedent("""
+ import time
+ import os
+
+ os.mkdir("{path}")
+ fd = os.open("{path}", os.O_RDONLY)
+ while True:
+ time.sleep(1)
+ """).format(path=path)
+
+ rproc = self._run_python(pyscript)
+ self.background_procs.append(rproc)
+
+ self.wait_for_visible(basename)
+
+ return rproc
+
def wait_for_dir_empty(self, dirname, timeout=30):
dirpath = os.path.join(self.hostfs_mntpt, dirname)
with safe_while(sleep=5, tries=(timeout//5)) as proceed:
self.background_procs.append(rproc)
return rproc
- def create_n_files(self, fs_path, count, sync=False):
+ def create_n_files(self, fs_path, count, sync=False, dirsync=False, unlink=False, finaldirsync=False):
+ """
+ Create n files.
+
+ :param sync: sync the file after writing
+ :param dirsync: sync the containing directory after closing the file
+ :param unlink: unlink the file after closing
+ :param finaldirsync: sync the containing directory after closing the last file
+ """
+
assert(self.is_mounted())
abs_path = os.path.join(self.hostfs_mntpt, fs_path)
- pyscript = dedent("""
- import sys
- import time
+ pyscript = dedent(f"""
import os
n = {count}
- abs_path = "{abs_path}"
+ path = "{abs_path}"
- if not os.path.exists(os.path.dirname(abs_path)):
- os.makedirs(os.path.dirname(abs_path))
+ dpath = os.path.dirname(path)
+ fnameprefix = os.path.basename(path)
+ os.makedirs(dpath, exist_ok=True)
- for i in range(0, n):
- fname = "{{0}}_{{1}}".format(abs_path, i)
- with open(fname, 'w') as f:
- f.write('content')
- if {sync}:
- f.flush()
- os.fsync(f.fileno())
- """).format(abs_path=abs_path, count=count, sync=str(sync))
+ try:
+ dirfd = os.open(dpath, os.O_DIRECTORY)
+
+ for i in range(n):
+ fpath = os.path.join(dpath, f"{{fnameprefix}}_{{i}}")
+ with open(fpath, 'w') as f:
+ f.write(f"{{i}}")
+ if {sync}:
+ f.flush()
+ os.fsync(f.fileno())
+ if {unlink}:
+ os.unlink(fpath)
+ if {dirsync}:
+ os.fsync(dirfd)
+ if {finaldirsync}:
+ os.fsync(dirfd)
+ finally:
+ os.close(dirfd)
+ """)
self.run_python(pyscript)
def get_op_read_count(self):
raise NotImplementedError()
+ def readlink(self, fs_path):
+ abs_path = os.path.join(self.hostfs_mntpt, fs_path)
+
+ pyscript = dedent("""
+ import os
+
+ print(os.readlink("{path}"))
+ """).format(path=abs_path)
+
+ proc = self._run_python(pyscript)
+ proc.wait()
+ return str(proc.stdout.getvalue().strip())
+
+
def lstat(self, fs_path, follow_symlinks=False, wait=True):
return self.stat(fs_path, follow_symlinks=False, wait=True)