"""
-from io import BytesIO
from io import StringIO
from collections import defaultdict
import getpass
import time
import sys
import errno
-from unittest import suite, loader
+from IPy import IP
import unittest
import platform
-from teuthology import misc
+import logging
+
+from unittest import suite, loader
+
from teuthology.orchestra.run import Raw, quote
from teuthology.orchestra.daemon import DaemonGroup
+from teuthology.orchestra.remote import Remote
from teuthology.config import config as teuth_config
-import six
-import logging
+from teuthology.contextutil import safe_while
+from teuthology.contextutil import MaxWhileTries
+from teuthology.orchestra.run import CommandFailedError
try:
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
try:
- from teuthology.exceptions import CommandFailedError
from tasks.ceph_manager import CephManager
from tasks.cephfs.fuse_mount import FuseMount
from tasks.cephfs.kernel_mount import KernelMount
from tasks.cephfs.filesystem import Filesystem, MDSCluster, CephCluster
+ from tasks.cephfs.mount import CephFSMount
from tasks.mgr.mgr_test_case import MgrCluster
- from teuthology.contextutil import MaxWhileTries
from teuthology.task import interactive
except ImportError:
sys.stderr.write("***\nError importing packages, have you activated your teuthology virtualenv "
def __init__(self, args, subproc, check_status, stdout, stderr):
self.args = args
self.subproc = subproc
- self.stdout = stdout or BytesIO()
- self.stderr = stderr or BytesIO()
+ self.stdout = stdout
+ self.stderr = stderr
+        # This variable is meant for instances of this class named fuse_daemon.
+        # The child process of a command launched with sudo must be killed
+        # explicitly, since killing the parent process alone has no impact on
+        # the child process.
+ self.fuse_pid = -1
self.check_status = check_status
self.exitstatus = self.returncode = None
out, err = rm_nonascii_chars(out), rm_nonascii_chars(err)
if isinstance(self.stdout, StringIO):
self.stdout.write(out.decode(errors='ignore'))
+ elif self.stdout is None:
+ pass
else:
self.stdout.write(out)
if isinstance(self.stderr, StringIO):
self.stderr.write(err.decode(errors='ignore'))
+ elif self.stderr is None:
+ pass
else:
self.stderr.write(err)
self.exitstatus = self.returncode = self.subproc.returncode
if self.exitstatus != 0:
- sys.stderr.write(six.ensure_str(out))
- sys.stderr.write(six.ensure_str(err))
+ sys.stderr.write(out.decode())
+ sys.stderr.write(err.decode())
if self.check_status and self.exitstatus != 0:
raise CommandFailedError(self.args, self.exitstatus)
if self.subproc.poll() is not None:
out, err = self.subproc.communicate()
- self.stdout.write(out)
- self.stderr.write(err)
+ if isinstance(self.stdout, StringIO):
+ self.stdout.write(out.decode(errors='ignore'))
+ elif self.stdout is None:
+ pass
+ else:
+ self.stdout.write(out)
+ if isinstance(self.stderr, StringIO):
+ self.stderr.write(err.decode(errors='ignore'))
+ elif self.stderr is None:
+ pass
+ else:
+ self.stderr.write(err)
self.exitstatus = self.returncode = self.subproc.returncode
return True
else:
if self.subproc.pid and not self.finished:
log.debug("kill: killing pid {0} ({1})".format(
self.subproc.pid, self.args))
- safe_kill(self.subproc.pid)
+ if self.fuse_pid != -1:
+ safe_kill(self.fuse_pid)
+ else:
+ safe_kill(self.subproc.pid)
else:
log.debug("kill: already terminated ({0})".format(self.args))
Run this inside your src/ dir!
"""
+ os = Remote.os
+ arch = Remote.arch
+
def __init__(self):
self.name = "local"
self.hostname = "localhost"
# holding same path. For teuthology, same path still represents
# different locations as they lie on different machines.
def put_file(self, src, dst, sudo=False):
- if sys.version_info.major < 3:
- exception = shutil.Error
- elif sys.version_info.major >= 3:
- exception = shutil.SameFileError
-
try:
shutil.copy(src, dst)
- except exception as e:
- if sys.version_info.major < 3 and e.message.find('are the same '
- 'file') != -1:
- return
- raise e
+ except shutil.SameFileError:
+ pass
# XXX: accepts only two arguments to maintain compatibility with
# teuthology's mkdtemp.
from tempfile import mktemp
return mktemp(suffix=suffix, dir=parentdir)
+    def write_file(self, path, data, sudo=False, mode=None, owner=None,
+                   mkdir=False, append=False):
+        """
+        Write data to a file on the local host.
+
+        :param path: file path on host
+        :param data: str, binary or fileobj to be written
+        :param sudo: use sudo to write file, defaults False
+        :param mode: set file mode bits if provided
+        :param owner: set file owner if provided
+        :param mkdir: create the file's directory first, defaults False
+        :param append: append data to the file, defaults False
+        """
+        # The data is piped into ``dd`` on stdin so the same command line
+        # works both with and without sudo.
+        dd = 'sudo dd' if sudo else 'dd'
+        args = dd + ' of=' + path
+        if append:
+            args += ' conv=notrunc oflag=append'
+        if mkdir:
+            mkdirp = 'sudo mkdir -p' if sudo else 'mkdir -p'
+            dirpath = os.path.dirname(path)
+            if dirpath:
+                args = mkdirp + ' ' + dirpath + '\n' + args
+        if mode:
+            chmod = 'sudo chmod' if sudo else 'chmod'
+            args += '\n' + chmod + ' ' + mode + ' ' + path
+        if owner:
+            chown = 'sudo chown' if sudo else 'chown'
+            args += '\n' + chown + ' ' + owner + ' ' + path
+        # Keep sudo in the command line when it was explicitly requested.
+        omit_sudo = False if sudo else True
+        self.run(args=args, stdin=data, omit_sudo=omit_sudo)
+
+    def sudo_write_file(self, path, data, **kwargs):
+        """
+        Write data to a file with sudo, for more info see `write_file()`.
+        """
+        self.write_file(path, data, sudo=True, **kwargs)
+
def _perform_checks_and_return_list_of_args(self, args, omit_sudo):
# Since Python's shell simulation can only work when commands are
        # provided as a list of arguments...
- if isinstance(args, str) or isinstance(args, six.text_type):
+ if isinstance(args, str):
args = args.split()
# We'll let sudo be a part of command even omit flag says otherwise in
def run(self, **kwargs):
return self._do_run(**kwargs)
+ # XXX: omit_sudo is set to True since using sudo can change the ownership
+ # of files which becomes problematic for following executions of
+ # vstart_runner.py.
def _do_run(self, args, check_status=True, wait=True, stdout=None,
stderr=None, cwd=None, stdin=None, logger=None, label=None,
- env=None, timeout=None, omit_sudo=False):
+ env=None, timeout=None, omit_sudo=True):
args = self._perform_checks_and_return_list_of_args(args, omit_sudo)
# We have to use shell=True if any run.Raw was present, e.g. &&
'ceph-coverage')]
# Adjust binary path prefix if given a bare program name
- if "/" not in args[0]:
+ if not isinstance(args[0], Raw) and "/" not in args[0]:
# If they asked for a bare binary name, and it exists
# in our built tree, use the one there.
local_bin = os.path.join(BIN_PREFIX, args[0])
args[0]
))
- log.debug("Running {0}".format(args))
+ log.debug('> ' +
+ ' '.join([str(a.value) if isinstance(a, Raw) else a for a in args]))
if shell:
subproc = subprocess.Popen(quote(args),
stderr=subprocess.PIPE,
stdin=subprocess.PIPE,
cwd=cwd,
+ env=env,
shell=True)
else:
# Sanity check that we've got a list of strings
for arg in args:
- if not isinstance(arg, six.string_types):
+ if not isinstance(arg, str):
raise RuntimeError("Oops, can't handle arg {0} type {1}".format(
arg, arg.__class__
))
env=env)
if stdin:
- if not isinstance(stdin, str):
- raise RuntimeError("Can't handle non-string stdins on a vstart cluster")
-
# Hack: writing to stdin is not deadlock-safe, but it "always" works
# as long as the input buffer is "small"
- subproc.stdin.write(stdin.encode())
+ if isinstance(stdin, str):
+ subproc.stdin.write(stdin.encode())
+ else:
+ subproc.stdin.write(stdin)
proc = LocalRemoteProcess(
args, subproc, check_status,
"""
Return PID as an integer or None if not found
"""
- ps_txt = six.ensure_str(self.controller.run(
- args=["ps", "ww", "-u"+str(os.getuid())]
- ).stdout.getvalue()).strip()
+ ps_txt = self.controller.run(args=["ps", "ww", "-u"+str(os.getuid())],
+ stdout=StringIO()).\
+ stdout.getvalue().strip()
lines = ps_txt.split("\n")[1:]
for line in lines:
else:
raise
+def mon_in_localhost(config_path="./ceph.conf"):
+    """
+    Return True if the Ceph cluster uses a localhost IP (127.0.0.1) as its
+    mon host; in that case network-namespace unsharing must be disabled.
+    """
+    with open(config_path) as f:
+        for line in f:
+            # Matches e.g. "mon host = [v2:127.0.0.1:40918,v1:127.0.0.1:40919]"
+            local = re.match(r'^\s*mon host\s*=\s*\[((v1|v2):127\.0\.0\.1:\d+,?)+\]', line)
+            if local:
+                return True
+    return False
class LocalKernelMount(KernelMount):
- def __init__(self, ctx, test_dir, client_id):
- super(LocalKernelMount, self).__init__(ctx, test_dir, client_id, LocalRemote(), None, None, None)
+ def __init__(self, ctx, test_dir, client_id=None,
+ client_keyring_path=None, client_remote=None,
+ hostfs_mntpt=None, cephfs_name=None, cephfs_mntpt=None,
+ brxnet=None):
+ super(LocalKernelMount, self).__init__(ctx=ctx, test_dir=test_dir,
+ client_id=client_id, client_keyring_path=client_keyring_path,
+ client_remote=LocalRemote(), hostfs_mntpt=hostfs_mntpt,
+ cephfs_name=cephfs_name, cephfs_mntpt=cephfs_mntpt, brxnet=brxnet)
@property
def config_path(self):
else:
return keyring_path
- def run_shell(self, args, wait=True, stdin=None, check_status=True,
- omit_sudo=False):
- # FIXME maybe should add a pwd arg to teuthology.orchestra so that
- # the "cd foo && bar" shenanigans isn't needed to begin with and
- # then we wouldn't have to special case this
- return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
- stdin=stdin, check_status=check_status,
- omit_sudo=omit_sudo)
-
- def run_as_user(self, args, user, wait=True, stdin=None, check_status=True):
- # FIXME maybe should add a pwd arg to teuthology.orchestra so that
- # the "cd foo && bar" shenanigans isn't needed to begin with and
- # then we wouldn't have to special case this
- if isinstance(args, str):
- args = 'sudo -u %s -s /bin/bash -c %s' % (user, args)
- elif isinstance(args, list):
- cmdlist = args
- cmd = ''
- for i in cmdlist:
- cmd = cmd + i + ' '
- args = ['sudo', '-u', user, '-s', '/bin/bash', '-c']
- args.append(cmd)
-
- return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
- check_status=check_status, stdin=stdin,
- omit_sudo=False)
-
- def run_as_root(self, args, wait=True, stdin=None, check_status=True):
- # FIXME maybe should add a pwd arg to teuthology.orchestra so that
- # the "cd foo && bar" shenanigans isn't needed to begin with and
- # then we wouldn't have to special case this
- if isinstance(args, str):
- args = 'sudo ' + args
- if isinstance(args, list):
- args.insert(0, 'sudo')
-
- return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
- check_status=check_status,
- omit_sudo=False)
-
- def testcmd(self, args, wait=True, stdin=None, omit_sudo=False):
- # FIXME maybe should add a pwd arg to teuthology.orchestra so that
- # the "cd foo && bar" shenanigans isn't needed to begin with and
- # then we wouldn't have to special case this
- return self.run_shell(args, wait=wait, stdin=stdin, check_status=False,
- omit_sudo=omit_sudo)
-
- def testcmd_as_user(self, args, user, wait=True, stdin=None):
- # FIXME maybe should add a pwd arg to teuthology.orchestra so that
- # the "cd foo && bar" shenanigans isn't needed to begin with and
- # then we wouldn't have to special case this
- return self.run_as_user(args, user=user, wait=wait, stdin=stdin,
- check_status=False)
-
- def testcmd_as_root(self, args, wait=True, stdin=None):
- # FIXME maybe should add a pwd arg to teuthology.orchestra so that
- # the "cd foo && bar" shenanigans isn't needed to begin with and
- # then we wouldn't have to special case this
- return self.run_as_root(args, wait=wait, stdin=stdin,
- check_status=False)
-
def setupfs(self, name=None):
if name is None and self.fs is not None:
# Previous mount existed, reuse the old name
if asok_conf:
d = asok_conf.groups(1)[0]
break
- path = "{0}/client.{1}.{2}.asok".format(d, self.client_id, self.fuse_daemon.subproc.pid)
- log.info("I think my launching pid was {0}".format(self.fuse_daemon.subproc.pid))
+ path = "{0}/client.{1}.*.asok".format(d, self.client_id)
return path
- def mount(self, mount_path=None, mount_fs_name=None, mount_options=[], **kwargs):
- self.setupfs(name=mount_fs_name)
+ def mount(self, mntopts=[], createfs=True, check_status=True, **kwargs):
+ self.update_attrs(**kwargs)
+ self.assert_and_log_minimum_mount_details()
- log.info('Mounting kclient client.{id} at {remote} {mnt}...'.format(
- id=self.client_id, remote=self.client_remote, mnt=self.mountpoint))
-
- self.client_remote.run(
- args=[
- 'mkdir',
- '--',
- self.mountpoint,
- ],
- timeout=(5*60),
- )
-
- if mount_path is None:
- mount_path = "/"
-
- opts = 'name={id},norequire_active_mds,conf={conf}'.format(id=self.client_id,
- conf=self.config_path)
-
- if mount_fs_name is not None:
- opts += ",mds_namespace={0}".format(mount_fs_name)
-
- for mount_opt in mount_options:
- opts += ",{0}".format(mount_opt)
-
- self.client_remote.run(
- args=[
- 'sudo',
- './bin/mount.ceph',
- ':{mount_path}'.format(mount_path=mount_path),
- self.mountpoint,
- '-v',
- '-o',
- opts
- ],
- timeout=(30*60),
- omit_sudo=False,
- )
+ if opt_use_ns:
+ self.using_namespace = True
+ self.setup_netns()
+ else:
+ self.using_namespace = False
+
+ if not self.cephfs_mntpt:
+ self.cephfs_mntpt = "/"
+ # TODO: don't call setupfs() from within mount()
+ if createfs:
+ self.setupfs(name=self.cephfs_name)
+
+ opts = 'norequire_active_mds'
+ if self.client_id:
+ opts += ',name=' + self.client_id
+ if self.client_keyring_path and self.client_id:
+ opts += ",secret=" + self.get_key_from_keyfile()
+ if self.config_path:
+ opts += ',conf=' + self.config_path
+ if self.cephfs_name:
+ opts += ",mds_namespace={0}".format(self.cephfs_name)
+ if mntopts:
+ opts += ',' + ','.join(mntopts)
+
+ stderr = StringIO()
+ try:
+ self.client_remote.run(args=['mkdir', '--', self.hostfs_mntpt],
+ timeout=(5*60), stderr=stderr)
+ except CommandFailedError:
+ if 'file exists' not in stderr.getvalue().lower():
+ raise
+
+ if self.cephfs_mntpt is None:
+ self.cephfs_mntpt = "/"
+ cmdargs = ['sudo']
+ if self.using_namespace:
+ cmdargs += ['nsenter',
+ '--net=/var/run/netns/{0}'.format(self.netns_name)]
+ cmdargs += ['./bin/mount.ceph', ':' + self.cephfs_mntpt,
+ self.hostfs_mntpt, '-v', '-o', opts]
+
+ mountcmd_stdout, mountcmd_stderr = StringIO(), StringIO()
+ try:
+ self.client_remote.run(args=cmdargs, timeout=(30*60),
+ omit_sudo=False, stdout=mountcmd_stdout,
+ stderr=mountcmd_stderr)
+ except CommandFailedError as e:
+ if check_status:
+ raise
+ else:
+ return (e, mountcmd_stdout.getvalue(),
+ mountcmd_stderr.getvalue())
- self.client_remote.run(
- args=['sudo', 'chmod', '1777', self.mountpoint], timeout=(5*60))
+ stderr = StringIO()
+ try:
+ self.client_remote.run(args=['sudo', 'chmod', '1777',
+ self.hostfs_mntpt], stderr=stderr,
+ timeout=(5*60))
+ except CommandFailedError:
+        # the client does not have write permissions in the cap it holds for
+        # the Ceph FS that was just mounted.
+ if 'permission denied' in stderr.getvalue().lower():
+ pass
self.mounted = True
+ def cleanup_netns(self):
+ if self.using_namespace:
+ super(type(self), self).cleanup_netns()
+
def _run_python(self, pyscript, py_version='python'):
"""
Override this to remove the daemon-helper prefix that is used otherwise
to make the process killable.
"""
return self.client_remote.run(args=[py_version, '-c', pyscript],
- wait=False)
+ wait=False, stdout=StringIO())
class LocalFuseMount(FuseMount):
- def __init__(self, ctx, test_dir, client_id):
- super(LocalFuseMount, self).__init__(ctx, None, test_dir, client_id, LocalRemote())
+ def __init__(self, ctx, test_dir, client_id, client_keyring_path=None,
+ client_remote=None, hostfs_mntpt=None, cephfs_name=None,
+ cephfs_mntpt=None, brxnet=None):
+ super(LocalFuseMount, self).__init__(ctx=ctx, client_config=None,
+ test_dir=test_dir, client_id=client_id,
+ client_keyring_path=client_keyring_path,
+ client_remote=LocalRemote(), hostfs_mntpt=hostfs_mntpt,
+ cephfs_name=cephfs_name, cephfs_mntpt=cephfs_mntpt, brxnet=brxnet)
@property
def config_path(self):
# to avoid assumptions about daemons' pwd
return os.path.abspath("./client.{0}.keyring".format(self.client_id))
- def run_shell(self, args, wait=True, stdin=None, check_status=True, omit_sudo=True):
- # FIXME maybe should add a pwd arg to teuthology.orchestra so that
- # the "cd foo && bar" shenanigans isn't needed to begin with and
- # then we wouldn't have to special case this
- return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
- stdin=stdin, check_status=check_status,
- omit_sudo=omit_sudo)
-
- def run_as_user(self, args, user, wait=True, stdin=None, check_status=True):
- # FIXME maybe should add a pwd arg to teuthology.orchestra so that
- # the "cd foo && bar" shenanigans isn't needed to begin with and
- # then we wouldn't have to special case this
- if isinstance(args, str):
- args = 'sudo -u %s -s /bin/bash -c %s' % (user, args)
- elif isinstance(args, list):
- cmdlist = args
- cmd = ''
- for i in cmdlist:
- cmd = cmd + i + ' '
- args = ['sudo', '-u', user, '-s', '/bin/bash', '-c']
- args.append(cmd)
-
- return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
- check_status=check_status, stdin=stdin,
- omit_sudo=False)
-
- def run_as_root(self, args, wait=True, stdin=None, check_status=True):
- # FIXME maybe should add a pwd arg to teuthology.orchestra so that
- # the "cd foo && bar" shenanigans isn't needed to begin with and
- # then we wouldn't have to special case this
- if isinstance(args, str):
- args = 'sudo ' + args
- if isinstance(args, list):
- args.insert(0, 'sudo')
-
- return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
- check_status=check_status,
- omit_sudo=False)
-
- def testcmd(self, args, wait=True, stdin=None, omit_sudo=True):
- # FIXME maybe should add a pwd arg to teuthology.orchestra so that
- # the "cd foo && bar" shenanigans isn't needed to begin with and
- # then we wouldn't have to special case this
- return self.run_shell(args, wait=wait, stdin=stdin, check_status=False,
- omit_sudo=omit_sudo)
-
- def testcmd_as_user(self, args, user, wait=True, stdin=None):
- # FIXME maybe should add a pwd arg to teuthology.orchestra so that
- # the "cd foo && bar" shenanigans isn't needed to begin with and
- # then we wouldn't have to special case this
- return self.run_as_user(args, user=user, wait=wait, stdin=stdin,
- check_status=False)
-
- def testcmd_as_root(self, args, wait=True, stdin=None):
- # FIXME maybe should add a pwd arg to teuthology.orchestra so that
- # the "cd foo && bar" shenanigans isn't needed to begin with and
- # then we wouldn't have to special case this
- return self.run_as_root(args, wait=wait, stdin=stdin,
- check_status=False)
-
def setupfs(self, name=None):
if name is None and self.fs is not None:
# Previous mount existed, reuse the old name
if asok_conf:
d = asok_conf.groups(1)[0]
break
- path = "{0}/client.{1}.{2}.asok".format(d, self.client_id, self.fuse_daemon.subproc.pid)
- log.info("I think my launching pid was {0}".format(self.fuse_daemon.subproc.pid))
+ path = "{0}/client.{1}.*.asok".format(d, self.client_id)
return path
- def mount(self, mount_path=None, mount_fs_name=None, mountpoint=None, mount_options=[]):
- if mountpoint is not None:
- self.mountpoint = mountpoint
- self.setupfs(name=mount_fs_name)
+ def mount(self, mntopts=[], createfs=True, check_status=True, **kwargs):
+ self.update_attrs(**kwargs)
+ self.assert_and_log_minimum_mount_details()
+
+ if opt_use_ns:
+ self.using_namespace = True
+ self.setup_netns()
+ else:
+ self.using_namespace = False
+
+ # TODO: don't call setupfs() from within mount()
+ if createfs:
+ self.setupfs(name=self.cephfs_name)
- self.client_remote.run(args=['mkdir', '-p', self.mountpoint])
+ stderr = StringIO()
+ try:
+ self.client_remote.run(args=['mkdir', '-p', self.hostfs_mntpt],
+ stderr=stderr)
+ except CommandFailedError:
+ if 'file exists' not in stderr.getvalue().lower():
+ raise
def list_connections():
self.client_remote.run(
args=["mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
check_status=False
)
- p = self.client_remote.run(
- args=["ls", "/sys/fs/fuse/connections"],
- check_status=False
- )
+
+ p = self.client_remote.run(args=["ls", "/sys/fs/fuse/connections"],
+ check_status=False, stdout=StringIO())
if p.exitstatus != 0:
log.warning("ls conns failed with {0}, assuming none".format(p.exitstatus))
return []
- ls_str = six.ensure_str(p.stdout.getvalue().strip())
+ ls_str = p.stdout.getvalue().strip()
if ls_str:
return [int(n) for n in ls_str.split("\n")]
else:
pre_mount_conns = list_connections()
log.debug("Pre-mount connections: {0}".format(pre_mount_conns))
- prefix = [os.path.join(BIN_PREFIX, "ceph-fuse")]
+ cmdargs = []
+ if self.using_namespace:
+ cmdargs = ['sudo', 'nsenter',
+ '--net=/var/run/netns/{0}'.format(self.netns_name),
+ '--setuid', str(os.getuid())]
+ cmdargs += [os.path.join(BIN_PREFIX, 'ceph-fuse'), self.hostfs_mntpt,
+ '-f']
+ if self.client_id is not None:
+ cmdargs += ["--id", self.client_id]
+ if self.client_keyring_path and self.client_id is not None:
+ cmdargs.extend(['-k', self.client_keyring_path])
+ if self.cephfs_name:
+ cmdargs += ["--client_fs=" + self.cephfs_name]
+ if self.cephfs_mntpt:
+ cmdargs += ["--client_mountpoint=" + self.cephfs_mntpt]
if os.getuid() != 0:
- prefix += ["--client_die_on_failed_dentry_invalidate=false"]
-
- if mount_path is not None:
- prefix += ["--client_mountpoint={0}".format(mount_path)]
-
- if mount_fs_name is not None:
- prefix += ["--client_fs={0}".format(mount_fs_name)]
-
- prefix += mount_options;
+ cmdargs += ["--client_die_on_failed_dentry_invalidate=false"]
+ if mntopts:
+ cmdargs += mntopts
- self.fuse_daemon = self.client_remote.run(args=
- prefix + [
- "-f",
- "--name",
- "client.{0}".format(self.client_id),
- self.mountpoint
- ], wait=False)
-
- log.debug("Mounting client.{0} with pid {1}".format(self.client_id, self.fuse_daemon.subproc.pid))
+ mountcmd_stdout, mountcmd_stderr = StringIO(), StringIO()
+ self.fuse_daemon = self.client_remote.run(args=cmdargs, wait=False,
+ omit_sudo=False, stdout=mountcmd_stdout, stderr=mountcmd_stderr)
+ self._set_fuse_daemon_pid(check_status)
+ log.debug("Mounting client.{0} with pid "
+ "{1}".format(self.client_id, self.fuse_daemon.subproc.pid))
# Wait for the connection reference to appear in /sys
waited = 0
if self.fuse_daemon.finished:
# Did mount fail? Raise the CommandFailedError instead of
# hitting the "failed to populate /sys/" timeout
- self.fuse_daemon.wait()
+ try:
+ self.fuse_daemon.wait()
+ except CommandFailedError as e:
+ if check_status:
+ raise
+ else:
+ return (e, mountcmd_stdout.getvalue(),
+ mountcmd_stderr.getvalue())
time.sleep(1)
waited += 1
if waited > 30:
self.mounted = True
+    def _set_fuse_daemon_pid(self, check_status):
+        """
+        Discover the PID of the ceph-fuse process from the name of its admin
+        socket and store it in ``self.fuse_daemon.fuse_pid``.
+
+        :param check_status: when True, re-raise if the PID cannot be found
+                             within the retry window.
+        """
+        # NOTE: When a command <args> is launched with sudo, two processes
+        # are launched: one with sudo in <args> and one without. Make sure
+        # we get the PID of the latter one.
+        try:
+            with safe_while(sleep=1, tries=15) as proceed:
+                while proceed():
+                    try:
+                        sock = self.find_admin_socket()
+                    except (RuntimeError, CommandFailedError):
+                        # Socket may not have appeared yet; keep retrying.
+                        continue
+
+                    # The admin socket is named "client.<id>.<pid>.asok";
+                    # extract the trailing PID component.
+                    self.fuse_daemon.fuse_pid = int(re.match(".*\.(\d+)\.asok$",
+                                                             sock).group(1))
+                    break
+        except MaxWhileTries:
+            # Timed out waiting for the socket; only propagate when the
+            # caller asked for status checking.
+            if check_status:
+                raise
+            else:
+                pass
+
+ def cleanup_netns(self):
+ if self.using_namespace:
+ super(type(self), self).cleanup_netns()
+
def _run_python(self, pyscript, py_version='python'):
"""
Override this to remove the daemon-helper prefix that is used otherwise
to make the process killable.
"""
return self.client_remote.run(args=[py_version, '-c', pyscript],
- wait=False)
+ wait=False, stdout=StringIO())
# XXX: this class has nothing to do with the Ceph daemon (ceph-mgr) of
# the same name.
proc = self.controller.run(args=args, wait=False, stdout=StringIO())
return proc
- def raw_cluster_cmd(self, *args, **kwargs):
+ def run_cluster_cmd(self, **kwargs):
+ """
+ Run a Ceph command and the object representing the process for the
+ command.
+
+ Accepts arguments same as teuthology.orchestra.remote.run().
+ """
+ kwargs['args'] = [os.path.join(BIN_PREFIX,'ceph')]+list(kwargs['args'])
+ return self.controller.run(**kwargs)
+
+ def raw_cluster_cmd(self, *args, **kwargs) -> str:
"""
args like ["osd", "dump"}
return stdout string
"""
- proc = self.controller.run(args=[os.path.join(BIN_PREFIX, "ceph")] + \
- list(args), **kwargs)
- return six.ensure_str(proc.stdout.getvalue())
+ kwargs['args'] = args
+ if kwargs.get('stdout') is None:
+ kwargs['stdout'] = StringIO()
+ return self.run_cluster_cmd(**kwargs).stdout.getvalue()
def raw_cluster_cmd_result(self, *args, **kwargs):
"""
like raw_cluster_cmd but don't check status, just return rc
"""
- kwargs['check_status'] = False
- proc = self.controller.run(args=[os.path.join(BIN_PREFIX, "ceph")] + \
- list(args), **kwargs)
- return proc.exitstatus
+ kwargs['args'], kwargs['check_status'] = args, False
+ return self.run_cluster_cmd(**kwargs).exitstatus
+
+ def admin_socket(self, daemon_type, daemon_id, command, check_status=True,
+ timeout=None, stdout=None):
+ if stdout is None:
+ stdout = StringIO()
- def admin_socket(self, daemon_type, daemon_id, command, check_status=True, timeout=None):
return self.controller.run(
- args=[os.path.join(BIN_PREFIX, "ceph"), "daemon", "{0}.{1}".format(daemon_type, daemon_id)] + command,
- check_status=check_status,
- timeout=timeout
- )
+ args=[os.path.join(BIN_PREFIX, "ceph"), "daemon",
+ "{0}.{1}".format(daemon_type, daemon_id)] + command,
+ check_status=check_status, timeout=timeout, stdout=stdout)
def get_mon_socks(self):
"""
class LocalCephCluster(CephCluster):
def __init__(self, ctx):
- # Deliberately skip calling parent constructor
+ # Deliberately skip calling CephCluster constructor
self._ctx = ctx
self.mon_manager = LocalCephManager()
self._conf = defaultdict(dict)
class LocalMDSCluster(LocalCephCluster, MDSCluster):
def __init__(self, ctx):
- super(LocalMDSCluster, self).__init__(ctx)
+ LocalCephCluster.__init__(self, ctx)
+ # Deliberately skip calling MDSCluster constructor
+ self._mds_ids = ctx.daemons.daemons['ceph.mds'].keys()
+ log.debug("Discovered MDS IDs: {0}".format(self._mds_ids))
+ self._mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
+
+ @property
+ def mds_ids(self):
+ return self._mds_ids
- self.mds_ids = ctx.daemons.daemons['ceph.mds'].keys()
- self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
+ @property
+ def mds_daemons(self):
+ return self._mds_daemons
def clear_firewall(self):
# FIXME: unimplemented
def newfs(self, name='cephfs', create=True):
return LocalFilesystem(self._ctx, name=name, create=create)
+ def delete_all_filesystems(self):
+ """
+ Remove all filesystems that exist, and any pools in use by them.
+ """
+ for fs in self.status().get_filesystems():
+ LocalFilesystem(ctx=self._ctx, fscid=fs['id']).destroy()
+
class LocalMgrCluster(LocalCephCluster, MgrCluster):
def __init__(self, ctx):
self.mgr_daemons = dict([(id_, LocalDaemon("mgr", id_)) for id_ in self.mgr_ids])
-class LocalFilesystem(Filesystem, LocalMDSCluster):
- def __init__(self, ctx, fscid=None, name='cephfs', create=False, ec_profile=None):
- # Deliberately skip calling parent constructor
- self._ctx = ctx
+class LocalFilesystem(LocalMDSCluster, Filesystem):
+ def __init__(self, ctx, fs_config={}, fscid=None, name=None, create=False):
+ # Deliberately skip calling Filesystem constructor
+ LocalMDSCluster.__init__(self, ctx)
self.id = None
self.name = name
- self.ec_profile = ec_profile
self.metadata_pool_name = None
self.metadata_overlay = False
self.data_pool_name = None
self.data_pools = None
- self.fs_config = None
-
- # Hack: cheeky inspection of ceph.conf to see what MDSs exist
- self.mds_ids = set()
- for line in open("ceph.conf").readlines():
- match = re.match("^\[mds\.(.+)\]$", line)
- if match:
- self.mds_ids.add(match.group(1))
-
- if not self.mds_ids:
- raise RuntimeError("No MDSs found in ceph.conf!")
-
- self.mds_ids = list(self.mds_ids)
-
- log.debug("Discovered MDS IDs: {0}".format(self.mds_ids))
+ self.fs_config = fs_config
+ self.ec_profile = fs_config.get('ec_profile')
self.mon_manager = LocalCephManager()
- self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
-
self.client_remote = LocalRemote()
self._conf = defaultdict(dict)
raise NotImplementedError()
-class InteractiveFailureResult(unittest.TextTestResult):
- """
- Specialization that implements interactive-on-error style
- behavior.
- """
- def addFailure(self, test, err):
- super(InteractiveFailureResult, self).addFailure(test, err)
- log.error(self._exc_info_to_string(err, test))
- log.error("Failure in test '{0}', going interactive".format(
- self.getDescription(test)
- ))
- interactive.task(ctx=None, config=None)
+class LocalCluster(object):
+    """
+    Minimal cluster abstraction: a single LocalRemote carrying the
+    requested role name.
+    """
+    def __init__(self, rolename="placeholder"):
+        self.remotes = {
+            LocalRemote(): [rolename]
+        }
-    def addError(self, test, err):
-        super(InteractiveFailureResult, self).addError(test, err)
-        log.error(self._exc_info_to_string(err, test))
-        log.error("Error in test '{0}', going interactive".format(
-            self.getDescription(test)
-        ))
-        interactive.task(ctx=None, config=None)
+    def only(self, requested):
+        # Mirror teuthology's Cluster.only(): return a cluster restricted to
+        # the requested role (here always the same single local remote).
+        return self.__class__(rolename=requested)
+
+
+class LocalContext(object):
+    """
+    Stand-in for the teuthology context (ctx) object, wired up to the local
+    vstart cluster instead of remote test machines.
+    """
+    def __init__(self):
+        self.config = {}
+        self.teuthology_config = teuth_config
+        self.cluster = LocalCluster()
+        self.daemons = DaemonGroup()
+
+        # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
+        # tests that want to look these up via ctx can do so.
+        # Inspect ceph.conf to see what roles exist
+        for conf_line in open("ceph.conf").readlines():
+            for svc_type in ["mon", "osd", "mds", "mgr"]:
+                prefixed_type = "ceph." + svc_type
+                if prefixed_type not in self.daemons.daemons:
+                    self.daemons.daemons[prefixed_type] = {}
+                match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line)
+                if match:
+                    svc_id = match.group(1)
+                    self.daemons.daemons[prefixed_type][svc_id] = LocalDaemon(svc_type, svc_id)
+
+    def __del__(self):
+        # Clean up the per-run test directory on interpreter teardown.
+        test_path = self.teuthology_config['test_path']
+        # opt_create_cluster_only does not create the test path
+        if test_path:
+            shutil.rmtree(test_path)
+
+
+#########################################
+#
+# stuff necessary for launching tests...
+#
+#########################################
def enumerate_methods(s):
max_required_mgr, require_memstore
-class LocalCluster(object):
- def __init__(self, rolename="placeholder"):
- self.remotes = {
- LocalRemote(): [rolename]
- }
-
- def only(self, requested):
- return self.__class__(rolename=requested)
-
-
-class LocalContext(object):
+class LogRotate():
def __init__(self):
- self.config = {}
- self.teuthology_config = teuth_config
- self.cluster = LocalCluster()
- self.daemons = DaemonGroup()
+ self.conf_file_path = os.path.join(os.getcwd(), 'logrotate.conf')
+ self.state_file_path = os.path.join(os.getcwd(), 'logrotate.state')
- # Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
- # tests that want to look these up via ctx can do so.
- # Inspect ceph.conf to see what roles exist
- for conf_line in open("ceph.conf").readlines():
- for svc_type in ["mon", "osd", "mds", "mgr"]:
- prefixed_type = "ceph." + svc_type
- if prefixed_type not in self.daemons.daemons:
- self.daemons.daemons[prefixed_type] = {}
- match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line)
- if match:
- svc_id = match.group(1)
- self.daemons.daemons[prefixed_type][svc_id] = LocalDaemon(svc_type, svc_id)
+ def run_logrotate(self):
+ remote.run(args=['logrotate', '-f', self.conf_file_path, '-s',
+ self.state_file_path, '--verbose'])
- def __del__(self):
- test_path = self.teuthology_config['test_path']
- # opt_create_cluster_only does not create the test path
- if test_path:
- shutil.rmtree(test_path)
def teardown_cluster():
log.info('\ntearing down the cluster...')
remote.run(args=[os.path.join(SRC_PREFIX, "stop.sh")], timeout=60)
remote.run(args=['rm', '-rf', './dev', './out'])
+
def clear_old_log():
from os import stat
init_log()
log.debug('logging in a fresh file now...')
+
class LogStream(object):
    """
    File-like sink handed to unittest.TextTestRunner; everything written
    to it is buffered and forwarded to the module logger in batches.
    """
    def __init__(self):
        self.buffer = ""
        # When True, unittest's own summary lines are stripped from the
        # output (see _del_result_lines).
        self.omit_result_lines = False

    def _del_result_lines(self):
        """
        Don't let unittest.TextTestRunner print "Ran X tests in Ys",
        vstart_runner.py will do it for itself since it runs tests in a
        testsuite one by one.
        """
        if not self.omit_result_lines:
            return
        buf = self.buffer
        buf = re.sub('-'*70+'\nran [0-9]* test in [0-9.]*s\n*', '', buf,
                     flags=re.I)
        buf = re.sub('failed \(failures=[0-9]*\)\n', '', buf, flags=re.I)
        self.buffer = buf.replace('OK\n', '')

    def write(self, data):
        self.buffer += data
        # Flush in batches rather than on every single write() call.
        if self.buffer.count("\n") > 5:
            self._write()

    def _write(self):
        if opt_rotate_logs:
            self._del_result_lines()
        if self.buffer == '':
            return

        for line in self.buffer.split("\n"):
            # sys.stderr.write(line + "\n")
            log.info(line)
        self.buffer = ''

    def flush(self):
        # File-like API requirement; _write() handles the real flushing.
        pass

    def __del__(self):
        # Push out whatever is still buffered before the stream goes away.
        self._write()
+
+
class InteractiveFailureResult(unittest.TextTestResult):
    """
    Specialization that implements interactive-on-error style
    behavior.
    """
    def _go_interactive(self, what, test, err):
        # Log the traceback, then drop into an interactive shell so the
        # developer can inspect the live cluster state.
        log.error(self._exc_info_to_string(err, test))
        log.error("{0} in test '{1}', going interactive".format(
            what, self.getDescription(test)))
        interactive.task(ctx=None, config=None)

    def addFailure(self, test, err):
        super(InteractiveFailureResult, self).addFailure(test, err)
        self._go_interactive("Failure", test, err)

    def addError(self, test, err):
        super(InteractiveFailureResult, self).addError(test, err)
        self._go_interactive("Error", test, err)
+
+
+# XXX: the result class we actually use inherits from this one and from
+# either InteractiveFailureResult or unittest.TextTestResult.
+class LoggingResultTemplate(object):
+    # Class-level switch: when True, skipped tests are recorded as failures
+    # instead of skips (set on the class, not per instance).
+    fail_on_skip = False
+
+    def startTest(self, test):
+        log.info("Starting test: {0}".format(self.getDescription(test)))
+        # Stash the start time on the test so stopTest() can report duration.
+        test.started_at = datetime.datetime.utcnow()
+        return super(LoggingResultTemplate, self).startTest(test)
+
+    def stopTest(self, test):
+        # NOTE(review): does not chain to super().stopTest() — confirm that
+        # suppressing the base class's per-test teardown output is intended.
+        log.info("Stopped test: {0} in {1}s".format(
+            self.getDescription(test),
+            (datetime.datetime.utcnow() - test.started_at).total_seconds()
+        ))
+
+    def addSkip(self, test, reason):
+        if LoggingResultTemplate.fail_on_skip:
+            # Don't just call addFailure because that requires a traceback
+            self.failures.append((test, reason))
+        else:
+            super(LoggingResultTemplate, self).addSkip(test, reason)
+
+
def launch_tests(overall_suite):
    """
    Dispatch to the appropriate runner: one test at a time when logs are
    rotated between tests or when the run must continue past failures,
    otherwise hand the whole suite to unittest in a single run.
    """
    run_whole_suite = opt_exit_on_test_failure and not opt_rotate_logs
    if run_whole_suite:
        return launch_entire_suite(overall_suite)
    return launch_individually(overall_suite)
+
+
def get_logging_result_class():
    """
    Build the result class for this run: the logging mixin combined with
    either the interactive-on-error result or the stock text result.
    """
    if opt_interactive_on_error:
        base = InteractiveFailureResult
    else:
        base = unittest.TextTestResult
    # Anonymous class; LoggingResultTemplate comes first so its overrides win.
    return type('', (LoggingResultTemplate, base), {})
+
+
def launch_individually(overall_suite):
    """
    Run every test case in its own TextTestRunner invocation.

    Running one test at a time allows log rotation between tests and,
    with --run-all-tests, lets the run continue past failures.  Returns
    the result of the last test executed, or None if the suite was empty.
    """
    # Fix: this was initialized twice (a stray 'no_of_tests_execed = 0'
    # immediately before the tuple assignment); once is enough.
    no_of_tests_failed, no_of_tests_execed = 0, 0
    LoggingResult = get_logging_result_class()
    stream = LogStream()
    # Each per-test runner would print its own "Ran 1 test ..." summary;
    # suppress those and emit a single combined summary below instead.
    stream.omit_result_lines = True
    if opt_rotate_logs:
        logrotate = LogRotate()

    # Fix: guard against an empty suite — 'result' was previously unbound
    # after the loop, raising NameError instead of returning cleanly.
    result = None
    started_at = datetime.datetime.utcnow()
    for suite_, case in enumerate_methods(overall_suite):
        # don't run logrotate beforehand since some ceph daemons might be
        # down and pre/post-rotate scripts in logrotate.conf might fail.
        if opt_rotate_logs:
            logrotate.run_logrotate()

        result = unittest.TextTestRunner(stream=stream,
                                         resultclass=LoggingResult,
                                         verbosity=2, failfast=True).run(case)

        if not result.wasSuccessful():
            if opt_exit_on_test_failure:
                break
            else:
                no_of_tests_failed += 1

        no_of_tests_execed += 1
    time_elapsed = (datetime.datetime.utcnow() - started_at).total_seconds()

    if result is not None and result.wasSuccessful():
        log.info('')
        log.info('-'*70)
        log.info(f'Ran {no_of_tests_execed} tests in {time_elapsed}s')
        if no_of_tests_failed > 0:
            log.info(f'{no_of_tests_failed} tests failed')
        log.info('')
        log.info('OK')

    return result
+
+
def launch_entire_suite(overall_suite):
    """Run the whole suite in a single failfast TextTestRunner invocation."""
    runner = unittest.TextTestRunner(stream=LogStream(),
                                     resultclass=get_logging_result_class(),
                                     verbosity=2, failfast=True)
    return runner.run(overall_suite)
+
+
def exec_test():
# Parse arguments
+ global opt_interactive_on_error
opt_interactive_on_error = False
opt_create_cluster = False
opt_create_cluster_only = False
global opt_log_ps_output
opt_log_ps_output = False
use_kernel_client = False
+ global opt_use_ns
+ opt_use_ns = False
+ opt_brxnet= None
opt_verbose = True
+ global opt_rotate_logs
+ opt_rotate_logs = False
+ global opt_exit_on_test_failure
+ opt_exit_on_test_failure = True
args = sys.argv[1:]
flags = [a for a in args if a.startswith("-")]
clear_old_log()
elif f == "--kclient":
use_kernel_client = True
+ elif f == '--usens':
+ opt_use_ns = True
+ elif '--brxnet' in f:
+ if re.search(r'=[0-9./]+', f) is None:
+ log.error("--brxnet=<ip/mask> option needs one argument: '{0}'".format(f))
+ sys.exit(-1)
+ opt_brxnet=f.split('=')[1]
+ try:
+ IP(opt_brxnet)
+ if IP(opt_brxnet).iptype() == 'PUBLIC':
+ raise RuntimeError('is public')
+ except Exception as e:
+ log.error("Invalid ip '{0}' {1}".format(opt_brxnet, e))
+ sys.exit(-1)
elif '--no-verbose' == f:
opt_verbose = False
+ elif f == '--rotate-logs':
+ opt_rotate_logs = True
+ elif f == '--run-all-tests':
+ opt_exit_on_test_failure = False
+ elif f == '--debug':
+ log.setLevel(logging.DEBUG)
else:
log.error("Unknown option '{0}'".format(f))
sys.exit(-1)
# Help developers by stopping up-front if their tree isn't built enough for all the
# tools that the tests might want to use (add more here if needed)
require_binaries = ["ceph-dencoder", "cephfs-journal-tool", "cephfs-data-scan",
- "cephfs-table-tool", "ceph-fuse", "rados"]
+ "cephfs-table-tool", "ceph-fuse", "rados", "cephfs-meta-injection"]
missing_binaries = [b for b in require_binaries if not os.path.exists(os.path.join(BIN_PREFIX, b))]
if missing_binaries and not opt_ignore_missing_binaries:
log.error("Some ceph binaries missing, please build them: {0}".format(" ".join(missing_binaries)))
global remote
remote = LocalRemote()
+ CephFSMount.cleanup_stale_netnses_and_bridge(remote)
+
# Tolerate no MDSs or clients running at start
- ps_txt = six.ensure_str(remote.run(
- args=["ps", "-u"+str(os.getuid())],
- stdout=StringIO()
- ).stdout.getvalue().strip())
+ ps_txt = remote.run(args=["ps", "-u"+str(os.getuid())],
+ stdout=StringIO()).stdout.getvalue().strip()
lines = ps_txt.split("\n")[1:]
for line in lines:
if 'ceph-fuse' in line or 'ceph-mds' in line:
if opt_create_cluster_only:
return
+ if opt_use_ns and mon_in_localhost() and not opt_create_cluster:
+ raise RuntimeError("cluster is on localhost; '--usens' option is incompatible. Or you can pass an extra '--create' option to create a new cluster without localhost!")
+
# List of client mounts, sufficient to run the selected tests
clients = [i.__str__() for i in range(0, max_required_clients)]
p = remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "auth", "get-or-create", client_name,
"osd", "allow rw",
"mds", "allow",
- "mon", "allow r"])
+ "mon", "allow r"], stdout=StringIO())
- open("./keyring", "ab").write(p.stdout.getvalue())
+ open("./keyring", "at").write(p.stdout.getvalue())
if use_kernel_client:
- mount = LocalKernelMount(ctx, test_dir, client_id)
+ mount = LocalKernelMount(ctx=ctx, test_dir=test_dir,
+ client_id=client_id, brxnet=opt_brxnet)
else:
- mount = LocalFuseMount(ctx, test_dir, client_id)
+ mount = LocalFuseMount(ctx=ctx, test_dir=test_dir,
+ client_id=client_id, brxnet=opt_brxnet)
mounts.append(mount)
- if os.path.exists(mount.mountpoint):
+ if os.path.exists(mount.hostfs_mntpt):
if mount.is_mounted():
- log.warning("unmounting {0}".format(mount.mountpoint))
+ log.warning("unmounting {0}".format(mount.hostfs_mntpt))
mount.umount_wait()
else:
- os.rmdir(mount.mountpoint)
+ os.rmdir(mount.hostfs_mntpt)
from tasks.cephfs_test_runner import DecoratingLoader
- class LogStream(object):
- def __init__(self):
- self.buffer = ""
-
- def write(self, data):
- self.buffer += data
- if "\n" in self.buffer:
- lines = self.buffer.split("\n")
- for line in lines[:-1]:
- pass
- # sys.stderr.write(line + "\n")
- log.info(line)
- self.buffer = lines[-1]
-
- def flush(self):
- pass
-
decorating_loader = DecoratingLoader({
"ctx": ctx,
"mounts": mounts,
for s, method in victims:
s._tests.remove(method)
- if opt_interactive_on_error:
- result_class = InteractiveFailureResult
- else:
- result_class = unittest.TextTestResult
- fail_on_skip = False
-
- class LoggingResult(result_class):
- def startTest(self, test):
- log.info("Starting test: {0}".format(self.getDescription(test)))
- test.started_at = datetime.datetime.utcnow()
- return super(LoggingResult, self).startTest(test)
-
- def stopTest(self, test):
- log.info("Stopped test: {0} in {1}s".format(
- self.getDescription(test),
- (datetime.datetime.utcnow() - test.started_at).total_seconds()
- ))
-
- def addSkip(self, test, reason):
- if fail_on_skip:
- # Don't just call addFailure because that requires a traceback
- self.failures.append((test, reason))
- else:
- super(LoggingResult, self).addSkip(test, reason)
-
- # Execute!
- result = unittest.TextTestRunner(
- stream=LogStream(),
- resultclass=LoggingResult,
- verbosity=2,
- failfast=True).run(overall_suite)
+ overall_suite = load_tests(modules, loader.TestLoader())
+ result = launch_tests(overall_suite)
+ CephFSMount.cleanup_stale_netnses_and_bridge(remote)
if opt_teardown_cluster:
teardown_cluster()
if not result.wasSuccessful():
- result.printErrors() # duplicate output at end for convenience
+ # no point in duplicating if we can have multiple failures in same
+ # run.
+ if opt_exit_on_test_failure:
+ result.printErrors() # duplicate output at end for convenience
bad_tests = []
for test, error in result.errors: