-
-from StringIO import StringIO
+from io import BytesIO
+from io import StringIO
import json
import time
import logging
+
+import six
+
from textwrap import dedent
from teuthology import misc
from teuthology.contextutil import MaxWhileTries
from teuthology.orchestra import run
from teuthology.orchestra.run import CommandFailedError
-from .mount import CephFSMount
+from tasks.cephfs.mount import CephFSMount
log = logging.getLogger(__name__)
class FuseMount(CephFSMount):
- def __init__(self, client_config, test_dir, client_id, client_remote):
- super(FuseMount, self).__init__(test_dir, client_id, client_remote)
+ def __init__(self, ctx, client_config, test_dir, client_id, client_remote):
+ super(FuseMount, self).__init__(ctx, test_dir, client_id, client_remote)
self.client_config = client_config if client_config else {}
self.fuse_daemon = None
self._fuse_conn = None
+ self.id = None
+ self.inst = None
+ self.addr = None
+
+ def mount(self, mount_path=None, mount_fs_name=None, mountpoint=None, mount_options=[]):
+ if mountpoint is not None:
+ self.mountpoint = mountpoint
+ self.setupfs(name=mount_fs_name)
- def mount(self, mount_path=None, mount_fs_name=None):
try:
- return self._mount(mount_path, mount_fs_name)
+ return self._mount(mount_path, mount_fs_name, mount_options)
except RuntimeError:
# Catch exceptions by the mount() logic (i.e. not remote command
# failures) and ensure the mount is not left half-up.
# Otherwise we might leave a zombie mount point that causes
# anyone traversing cephtest/ to get hung up on.
- log.warn("Trying to clean up after failed mount")
+ log.warning("Trying to clean up after failed mount")
self.umount_wait(force=True)
raise
- def _mount(self, mount_path, mount_fs_name):
+ def _mount(self, mount_path, mount_fs_name, mount_options):
log.info("Client client.%s config is %s" % (self.client_id, self.client_config))
daemon_signal = 'kill'
log.info('Mounting ceph-fuse client.{id} at {remote} {mnt}...'.format(
id=self.client_id, remote=self.client_remote, mnt=self.mountpoint))
- self.client_remote.run(
- args=[
- 'mkdir',
- '--',
- self.mountpoint,
- ],
- )
+ self.client_remote.run(args=['mkdir', '-p', self.mountpoint],
+ timeout=(15*60), cwd=self.test_dir)
run_cmd = [
'sudo',
fuse_cmd += ["--client_mountpoint={0}".format(mount_path)]
if mount_fs_name is not None:
- fuse_cmd += ["--client_mds_namespace={0}".format(mount_fs_name)]
+ fuse_cmd += ["--client_fs={0}".format(mount_fs_name)]
+
+ fuse_cmd += mount_options
fuse_cmd += [
'--name', 'client.{id}'.format(id=self.client_id),
self.mountpoint,
]
+ cwd = self.test_dir
if self.client_config.get('valgrind') is not None:
run_cmd = misc.get_valgrind_args(
self.test_dir,
run_cmd,
self.client_config.get('valgrind'),
)
+ cwd = None # misc.get_valgrind_args chdir for us
run_cmd.extend(fuse_cmd)
def list_connections():
+ from teuthology.misc import get_system_type
+
+ conn_dir = "/sys/fs/fuse/connections"
+
+ self.client_remote.run(args=['sudo', 'modprobe', 'fuse'],
+ check_status=False)
self.client_remote.run(
- args=["sudo", "mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
- check_status=False
- )
- p = self.client_remote.run(
- args=["ls", "/sys/fs/fuse/connections"],
- stdout=StringIO(),
- check_status=False
- )
- if p.exitstatus != 0:
+ args=["sudo", "mount", "-t", "fusectl", conn_dir, conn_dir],
+ check_status=False, timeout=(30))
+
+ try:
+ ls_str = self.client_remote.sh("ls " + conn_dir,
+ stdout=StringIO(),
+ timeout=(15*60)).strip()
+ except CommandFailedError:
return []
- ls_str = p.stdout.getvalue().strip()
if ls_str:
return [int(n) for n in ls_str.split("\n")]
else:
proc = self.client_remote.run(
args=run_cmd,
+ cwd=cwd,
logger=log.getChild('ceph-fuse.{id}'.format(id=self.client_id)),
stdin=run.PIPE,
wait=False,
else:
self._fuse_conn = new_conns[0]
+ self.gather_mount_info()
+
+ def gather_mount_info(self):
+ status = self.admin_socket(['status'])
+ self.id = status['id']
+ self.client_pid = status['metadata']['pid']
+ try:
+ self.inst = status['inst_str']
+ self.addr = status['addr_str']
+ except KeyError:
+ sessions = self.fs.rank_asok(['session', 'ls'])
+ for s in sessions:
+ if s['id'] == self.id:
+ self.inst = s['inst']
+ self.addr = self.inst.split()[1]
+ if self.inst is None:
+ raise RuntimeError("cannot find client session")
+
def is_mounted(self):
proc = self.client_remote.run(
args=[
'--',
self.mountpoint,
],
- stdout=StringIO(),
- stderr=StringIO(),
- wait=False
+ cwd=self.test_dir,
+ stdout=BytesIO(),
+ stderr=BytesIO(),
+ wait=False,
+ timeout=(15*60)
)
try:
proc.wait()
except CommandFailedError:
- if ("endpoint is not connected" in proc.stderr.getvalue()
- or "Software caused connection abort" in proc.stderr.getvalue()):
+ error = six.ensure_str(proc.stderr.getvalue())
+ if ("endpoint is not connected" in error
+ or "Software caused connection abort" in error):
# This happens is fuse is killed without unmount
- log.warn("Found stale moutn point at {0}".format(self.mountpoint))
+            log.warning("Found stale mount point at {0}".format(self.mountpoint))
return True
else:
# This happens if the mount directory doesn't exist
log.info('mount point does not exist: %s', self.mountpoint)
return False
- fstype = proc.stdout.getvalue().rstrip('\n')
+ fstype = six.ensure_str(proc.stdout.getvalue()).rstrip('\n')
if fstype == 'fuseblk':
log.info('ceph-fuse is mounted on %s', self.mountpoint)
return True
# Now that we're mounted, set permissions so that the rest of the test will have
# unrestricted access to the filesystem mount.
- self.client_remote.run(
- args=['sudo', 'chmod', '1777', self.mountpoint])
+ try:
+ stderr = BytesIO()
+ self.client_remote.run(args=['sudo', 'chmod', '1777', self.mountpoint], timeout=(15*60), cwd=self.test_dir, stderr=stderr)
+ except run.CommandFailedError:
+ stderr = stderr.getvalue()
+ if b"Read-only file system".lower() in stderr.lower():
+ pass
+ else:
+ raise
def _mountpoint_exists(self):
- return self.client_remote.run(args=["ls", "-d", self.mountpoint], check_status=False).exitstatus == 0
+ return self.client_remote.run(args=["ls", "-d", self.mountpoint], check_status=False, cwd=self.test_dir, timeout=(15*60)).exitstatus == 0
def umount(self):
+ if not self.is_mounted():
+ return
+
try:
log.info('Running fusermount -u on {name}...'.format(name=self.client_remote.name))
self.client_remote.run(
'-u',
self.mountpoint,
],
+ cwd=self.test_dir,
+ timeout=(30*60),
)
except run.CommandFailedError:
log.info('Failed to unmount ceph-fuse on {name}, aborting...'.format(name=self.client_remote.name))
+ self.client_remote.run(args=[
+ 'sudo',
+ run.Raw('PATH=/usr/sbin:$PATH'),
+ 'lsof',
+ run.Raw(';'),
+ 'ps',
+ 'auxf',
+ ], timeout=(60*15))
+
# abort the fuse mount, killing all hung processes
if self._fuse_conn:
self.run_python(dedent("""
""").format(self._fuse_conn))
self._fuse_conn = None
- stderr = StringIO()
+ stderr = BytesIO()
try:
# make sure its unmounted
self.client_remote.run(
'-f',
self.mountpoint,
],
- stderr=stderr
+ stderr=stderr,
+ timeout=(60*15)
)
except CommandFailedError:
if self.is_mounted():
assert not self.is_mounted()
self._fuse_conn = None
+ self.id = None
+ self.inst = None
+ self.addr = None
- def umount_wait(self, force=False, require_clean=False):
+ def umount_wait(self, force=False, require_clean=False, timeout=900):
"""
:param force: Complete cleanly even if the MDS is offline
"""
+ if not (self.is_mounted() and self.fuse_daemon):
+ log.debug('ceph-fuse client.{id} is not mounted at {remote} {mnt}'.format(id=self.client_id,
+ remote=self.client_remote,
+ mnt=self.mountpoint))
+ return
+
if force:
assert not require_clean # mutually exclusive
self.umount()
try:
- if self.fuse_daemon:
- # Permit a timeout, so that we do not block forever
- run.wait([self.fuse_daemon], 900)
+ # Permit a timeout, so that we do not block forever
+ run.wait([self.fuse_daemon], timeout)
except MaxWhileTries:
- log.error("process failed to terminate after unmount. This probably"
- "indicates a bug within ceph-fuse.")
+ log.error("process failed to terminate after unmount. This probably"
+ " indicates a bug within ceph-fuse.")
raise
except CommandFailedError:
if require_clean:
Prerequisite: the client is not mounted.
"""
- stderr = StringIO()
+ stderr = BytesIO()
try:
self.client_remote.run(
args=[
'--',
self.mountpoint,
],
- stderr=stderr
+ cwd=self.test_dir,
+ stderr=stderr,
+ timeout=(60*5),
+ check_status=False,
)
except CommandFailedError:
- if "No such file or directory" in stderr.getvalue():
+ if b"No such file or directory" in stderr.getvalue():
pass
else:
raise
"""
Terminate the client without removing the mount point.
"""
+ log.info('Killing ceph-fuse connection on {name}...'.format(name=self.client_remote.name))
self.fuse_daemon.stdin.close()
try:
self.fuse_daemon.wait()
"""
Follow up ``kill`` to get to a clean unmounted state.
"""
+ log.info('Cleaning up killed ceph-fuse connection')
self.umount()
self.cleanup()
'-rf',
self.mountpoint,
],
+ cwd=self.test_dir,
+ timeout=(60*5)
)
def _asok_path(self):
return f
raise RuntimeError("Client socket {{0}} not found".format(client_name))
-print find_socket("{client_name}")
+print(find_socket("{client_name}"))
""".format(
asok_path=self._asok_path(),
client_name="client.{0}".format(self.client_id))
# Find the admin socket
- p = self.client_remote.run(args=[
- 'python', '-c', pyscript
- ], stdout=StringIO())
- asok_path = p.stdout.getvalue().strip()
+ asok_path = self.client_remote.sh(
+ ['sudo', 'python3', '-c', pyscript],
+ stdout=StringIO(),
+ timeout=(15*60)).strip()
log.info("Found client admin socket at {0}".format(asok_path))
# Query client ID from admin socket
- p = self.client_remote.run(
- args=['sudo', self._prefix + 'ceph', '--admin-daemon', asok_path] + args,
- stdout=StringIO())
- return json.loads(p.stdout.getvalue())
+ json_data = self.client_remote.sh(
+ ['sudo', self._prefix + 'ceph', '--admin-daemon', asok_path] + args,
+ stdout=StringIO(),
+ timeout=(15*60))
+ return json.loads(json_data)
def get_global_id(self):
"""
Look up the CephFS client ID for this mount
"""
-
return self.admin_socket(['mds_sessions'])['id']
+ def get_global_inst(self):
+ """
+ Look up the CephFS client instance for this mount
+ """
+ return self.inst
+
+ def get_global_addr(self):
+ """
+ Look up the CephFS client addr for this mount
+ """
+ return self.addr
+
+ def get_client_pid(self):
+ """
+ return pid of ceph-fuse process
+ """
+ status = self.admin_socket(['status'])
+ return status['metadata']['pid']
+
def get_osd_epoch(self):
"""
Return 2-tuple of osd_epoch, osd_epoch_barrier