Alternative usage:
# Alternatively, if you use different paths, specify them as follows:
- LD_LIBRARY_PATH=`pwd`/lib PYTHONPATH=~/git/teuthology:~/git/ceph/qa:`pwd`/../src/pybind:`pwd`/lib/cython_modules/lib.2 python ~/git/ceph/qa/tasks/vstart_runner.py
+ LD_LIBRARY_PATH=`pwd`/lib PYTHONPATH=~/git/teuthology:~/git/ceph/qa:`pwd`/../src/pybind:`pwd`/lib/cython_modules/lib.3 python ~/git/ceph/qa/tasks/vstart_runner.py
# If you wish to drop to a python shell on failures, use --interactive:
python ~/git/ceph/qa/tasks/vstart_runner.py --interactive
# If you wish to run a named test case, pass it as an argument:
python ~/git/ceph/qa/tasks/vstart_runner.py tasks.cephfs.test_data_scan
+ # Also, you can create the cluster once and then run named test cases against it:
+ python ~/git/ceph/qa/tasks/vstart_runner.py --create-cluster-only
+ python ~/git/ceph/qa/tasks/vstart_runner.py tasks.mgr.dashboard.test_health
+ python ~/git/ceph/qa/tasks/vstart_runner.py tasks.mgr.dashboard.test_rgw
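+ # Other flags handled by exec_test() below include:
+ #   --teardown                 tear the cluster down once the run finishes
+ #   --kclient                  use the kernel client (LocalKernelMount) instead of ceph-fuse
+ #   --ignore-missing-binaries  proceed even if some Ceph binaries are missing
+ #   --log-ps-output            log the full "ps" output when a daemon PID is not found
+ #   --clear-old-log            truncate ./vstart_runner.log before the run starts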
+
"""
-from StringIO import StringIO
+from six import StringIO
+from io import BytesIO
from collections import defaultdict
import getpass
import signal
import re
import os
import time
-import json
import sys
import errno
from unittest import suite, loader
import unittest
import platform
+from teuthology import misc
from teuthology.orchestra.run import Raw, quote
from teuthology.orchestra.daemon import DaemonGroup
from teuthology.config import config as teuth_config
-
+import six
import logging
-log = logging.getLogger(__name__)
+def init_log():
+ global log
+ if log is not None:
+ del log
+ log = logging.getLogger(__name__)
+
+ global logpath
+ logpath = './vstart_runner.log'
-handler = logging.FileHandler("./vstart_runner.log")
-formatter = logging.Formatter(
- fmt=u'%(asctime)s.%(msecs)03d %(levelname)s:%(name)s:%(message)s',
- datefmt='%Y-%m-%dT%H:%M:%S')
-handler.setFormatter(formatter)
-log.addHandler(handler)
-log.setLevel(logging.INFO)
+ handler = logging.FileHandler(logpath)
+ formatter = logging.Formatter(
+ fmt=u'%(asctime)s.%(msecs)03d %(levelname)s:%(name)s:%(message)s',
+ datefmt='%Y-%m-%dT%H:%M:%S')
+ handler.setFormatter(formatter)
+ log.addHandler(handler)
+ log.setLevel(logging.INFO)
+
+log = None
+init_log()
def respawn_in_path(lib_path, python_paths):
else:
lib_path_var = "LD_LIBRARY_PATH"
- py_binary = os.environ.get("PYTHON", "python")
+ py_binary = os.environ.get("PYTHON", sys.executable)
if lib_path_var in os.environ:
if lib_path not in os.environ[lib_path_var]:
# A list of candidate paths for each package we need
guesses = [
["~/git/teuthology", "~/scm/teuthology", "~/teuthology"],
- ["lib/cython_modules/lib.2"],
+ ["lib/cython_modules/lib.3"],
["../src/pybind"],
]
python_paths.append(g_exp)
ld_path = os.path.join(os.getcwd(), "lib/")
- print "Using guessed paths {0} {1}".format(ld_path, python_paths)
+ print("Using guessed paths {0} {1}".format(ld_path, python_paths))
respawn_in_path(ld_path, python_paths)
from teuthology.exceptions import CommandFailedError
from tasks.ceph_manager import CephManager
from tasks.cephfs.fuse_mount import FuseMount
+ from tasks.cephfs.kernel_mount import KernelMount
from tasks.cephfs.filesystem import Filesystem, MDSCluster, CephCluster
- from mgr.mgr_test_case import MgrCluster
+ from tasks.mgr.mgr_test_case import MgrCluster
from teuthology.contextutil import MaxWhileTries
from teuthology.task import interactive
except ImportError:
SRC_PREFIX = "./"
+def rm_nonascii_chars(var):
+ var = var.replace('\xe2\x80\x98', '\'')
+ var = var.replace('\xe2\x80\x99', '\'')
+ return var
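+ # For example, rm_nonascii_chars('can\xe2\x80\x99t stat') returns "can't stat",
+ # replacing the UTF-8 curly quote bytes with a plain ASCII quote.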
+
class LocalRemoteProcess(object):
def __init__(self, args, subproc, check_status, stdout, stderr):
self.args = args
self.subproc = subproc
- if stdout is None:
- self.stdout = StringIO()
- else:
- self.stdout = stdout
-
- if stderr is None:
- self.stderr = StringIO()
- else:
- self.stderr = stderr
+ self.stdout = stdout or BytesIO()
+ self.stderr = stderr or BytesIO()
self.check_status = check_status
self.exitstatus = self.returncode = None
if self.finished:
# Avoid calling communicate() on a dead process because it'll
# give you stick about std* already being closed
- if self.exitstatus != 0:
+ if self.check_status and self.exitstatus != 0:
raise CommandFailedError(self.args, self.exitstatus)
else:
return
out, err = self.subproc.communicate()
+ out, err = rm_nonascii_chars(out), rm_nonascii_chars(err)
self.stdout.write(out)
self.stderr.write(err)
self.exitstatus = self.returncode = self.subproc.returncode
if self.exitstatus != 0:
- sys.stderr.write(out)
- sys.stderr.write(err)
+ sys.stderr.write(six.ensure_str(out))
+ sys.stderr.write(six.ensure_str(err))
if self.check_status and self.exitstatus != 0:
raise CommandFailedError(self.args, self.exitstatus)
shutil.copy(path, tmpfile)
return tmpfile
+ # XXX: This method ignores the error raised when src and dst hold the
+ # same path. For teuthology, the same path still represents different
+ # locations because they lie on different machines.
def put_file(self, src, dst, sudo=False):
- shutil.copy(src, dst)
+ if sys.version_info.major < 3:
+ exception = shutil.Error
+ elif sys.version_info.major >= 3:
+ exception = shutil.SameFileError
+
+ try:
+ shutil.copy(src, dst)
+ except exception as e:
+ if sys.version_info.major < 3 and e.message.find('are the same '
+ 'file') != -1:
+ return
+ raise e
+
+ # XXX: accepts only two arguments to maintain compatibility with
+ # teuthology's mkdtemp.
+ def mkdtemp(self, suffix='', parentdir=None):
+ from tempfile import mkdtemp
+
+ # XXX: prefix had to be set; without it, this method failed against
+ # Python 2.7 -
+ # > /usr/lib64/python2.7/tempfile.py(337)mkdtemp()
+ # -> file = _os.path.join(dir, prefix + name + suffix)
+ # (Pdb) p prefix
+ # None
+ return mkdtemp(suffix=suffix, prefix='', dir=parentdir)
+
+ def mktemp(self, suffix=None, parentdir=None):
+ """
+ Make a remote temporary file
- def run(self, args, check_status=True, wait=True,
- stdout=None, stderr=None, cwd=None, stdin=None,
- logger=None, label=None, env=None):
- log.info("run args={0}".format(args))
+ Returns: the path of the temp file created.
+ """
+ from tempfile import mktemp
+ return mktemp(suffix=suffix, dir=parentdir)
+
+ def _perform_checks_and_return_list_of_args(self, args, omit_sudo):
+ # Since Python's shell simulation can only work when commands are
+ # provided as a list of arguments...
+ if isinstance(args, str) or isinstance(args, six.text_type):
+ args = args.split()
+
+ # We'll leave sudo in the command even if the omit flag says otherwise,
+ # for commands that can normally be run only by root.
+ try:
+ if args[args.index('sudo') + 1] in ['-u', 'passwd', 'chown']:
+ omit_sudo = False
+ except ValueError:
+ pass
- # We don't need no stinkin' sudo
- args = [a for a in args if a != "sudo"]
+ # Quotes wrapping a command argument don't work in Python's shell
+ # simulation if the argument also contains spaces. E.g. '"ls"' is OK
+ # but '"ls /"' isn't.
+ errmsg = "Don't surround command arguments with quotes if they " + \
+ "contain spaces.\nargs - %s" % (args)
+ for arg in args:
+ if isinstance(arg, Raw):
+ continue
+
+ if arg and (arg[0] in ['"', "'"] or arg[-1] in ['"', "'"]) and \
+ (arg.find(' ') != -1 and 0 < arg.find(' ') < len(arg) - 1):
+ raise RuntimeError(errmsg)
+
+ # ['sudo', '-u', 'user', '-s', 'path-to-shell', '-c', 'ls', 'a']
+ # and ['sudo', '-u', user, '-s', path_to_shell, '-c', 'ls a'] are
+ # treated differently by Python's shell simulation. Only the latter
+ # has the desired effect.
+ errmsg = 'The entire command to be executed as another user should be ' +\
+ 'a single argument.\nargs - %s' % (args)
+ if 'sudo' in args and '-u' in args and '-c' in args and \
+ args.count('-c') == 1:
+ if args.index('-c') != len(args) - 2 and \
+ args[args.index('-c') + 2].find('-') == -1:
+ raise RuntimeError(errmsg)
+
+ if omit_sudo:
+ args = [a for a in args if a != "sudo"]
+
+ return args
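+ # For illustration (assumed behaviour of the checks above, not an exhaustive
+ # spec): args like ['sudo', '-u', 'admin', '-s', '/bin/bash', '-c', 'ls /mnt']
+ # pass through unchanged and keep their sudo, whereas an argument such as
+ # '"ls /"' (quoted and containing a space) raises RuntimeError.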
+
+ # Wrapper to keep the interface exactly same as that of
+ # teuthology.remote.run.
+ def run(self, **kwargs):
+ return self._do_run(**kwargs)
+
+ def _do_run(self, args, check_status=True, wait=True, stdout=None,
+ stderr=None, cwd=None, stdin=None, logger=None, label=None,
+ env=None, timeout=None, omit_sudo=False):
+ args = self._perform_checks_and_return_list_of_args(args, omit_sudo)
# We have to use shell=True if any run.Raw was present, e.g. &&
shell = any([a for a in args if isinstance(a, Raw)])
- if shell:
- filtered = []
- i = 0
- while i < len(args):
- if args[i] == 'adjust-ulimits':
- i += 1
- elif args[i] == 'ceph-coverage':
- i += 2
- elif args[i] == 'timeout':
- i += 2
- else:
- filtered.append(args[i])
- i += 1
-
- args = quote(filtered)
- log.info("Running {0}".format(args))
+ # Filter out helper tools that don't exist in a vstart environment
+ args = [a for a in args if a not in ('adjust-ulimits',
+ 'ceph-coverage')]
+
+ # Adjust binary path prefix if given a bare program name
+ if "/" not in args[0]:
+ # If they asked for a bare binary name, and it exists
+ # in our built tree, use the one there.
+ local_bin = os.path.join(BIN_PREFIX, args[0])
+ if os.path.exists(local_bin):
+ args = [local_bin] + args[1:]
+ else:
+ log.debug("'{0}' is not a binary in the Ceph build dir".format(
+ args[0]
+ ))
- subproc = subprocess.Popen(args,
+ log.info("Running {0}".format(args))
+
+ if shell:
+ subproc = subprocess.Popen(quote(args),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.PIPE,
cwd=cwd,
shell=True)
else:
- log.info("Running {0}".format(args))
-
+ # Sanity check that we've got a list of strings
for arg in args:
- if not isinstance(arg, basestring):
+ if not isinstance(arg, six.string_types):
raise RuntimeError("Oops, can't handle arg {0} type {1}".format(
arg, arg.__class__
))
env=env)
if stdin:
- if not isinstance(stdin, basestring):
+ if not isinstance(stdin, six.string_types):
raise RuntimeError("Can't handle non-string stdins on a vstart cluster")
# Hack: writing to stdin is not deadlock-safe, but it "always" works
return proc
+ # XXX: for compatibility, keep this method the same as teuthology.orchestra.remote.sh
+ def sh(self, script, **kwargs):
+ """
+ Shortcut for run method.
+
+ Usage:
+ my_name = remote.sh('whoami')
+ remote_date = remote.sh('date')
+ """
+ if 'stdout' not in kwargs:
+ kwargs['stdout'] = StringIO()
+ if 'args' not in kwargs:
+ kwargs['args'] = script
+ proc = self.run(**kwargs)
+ return proc.stdout.getvalue()
+
class LocalDaemon(object):
def __init__(self, daemon_type, daemon_id):
def running(self):
return self._get_pid() is not None
+ def check_status(self):
+ if self.proc:
+ return self.proc.poll()
+
def _get_pid(self):
"""
Return PID as an integer or None if not found
"""
- ps_txt = self.controller.run(
+ ps_txt = six.ensure_str(self.controller.run(
args=["ps", "ww", "-u"+str(os.getuid())]
- ).stdout.getvalue().strip()
+ ).stdout.getvalue()).strip()
lines = ps_txt.split("\n")[1:]
for line in lines:
if line.find("ceph-{0} -i {1}".format(self.daemon_type, self.daemon_id)) != -1:
log.info("Found ps line for daemon: {0}".format(line))
return int(line.split()[0])
- log.info("No match for {0} {1}: {2}".format(
- self.daemon_type, self.daemon_id, ps_txt
- ))
- return None
+ if opt_log_ps_output:
+ log.info("No match for {0} {1}: {2}".format(
+ self.daemon_type, self.daemon_id, ps_txt))
+ else:
+ log.info("No match for {0} {1}".format(self.daemon_type,
+ self.daemon_id))
+ return None
def wait(self, timeout):
waited = 0
pid = self._get_pid()
log.info("Killing PID {0} for {1}.{2}".format(pid, self.daemon_type, self.daemon_id))
- os.kill(pid, signal.SIGKILL)
+ os.kill(pid, signal.SIGTERM)
waited = 0
while pid is not None:
if new_pid is not None and new_pid != pid:
log.info("Killing new PID {0}".format(new_pid))
pid = new_pid
- os.kill(pid, signal.SIGKILL)
+ os.kill(pid, signal.SIGTERM)
if new_pid is None:
break
if self._get_pid() is not None:
self.stop()
- self.proc = self.controller.run([os.path.join(BIN_PREFIX, "./ceph-{0}".format(self.daemon_type)), "-i", self.daemon_id])
+ self.proc = self.controller.run(args=[
+ os.path.join(BIN_PREFIX, "./ceph-{0}".format(self.daemon_type)),
+ "-i", self.daemon_id])
+
+ def signal(self, sig, silent=False):
+ if not self.running():
+ raise RuntimeError("Can't send signal to non-running daemon")
+
+ os.kill(self._get_pid(), sig)
+ if not silent:
+ log.info("Sent signal {0} to {1}.{2}".format(sig, self.daemon_type, self.daemon_id))
def safe_kill(pid):
raise
-class LocalFuseMount(FuseMount):
- def __init__(self, test_dir, client_id):
- super(LocalFuseMount, self).__init__(None, test_dir, client_id, LocalRemote())
+class LocalKernelMount(KernelMount):
+ def __init__(self, ctx, test_dir, client_id):
+ super(LocalKernelMount, self).__init__(ctx, test_dir, client_id, LocalRemote(), None, None, None)
@property
def config_path(self):
def get_keyring_path(self):
# This is going to end up in a config file, so use an absolute path
# to avoid assumptions about daemons' pwd
- return os.path.abspath("./client.{0}.keyring".format(self.client_id))
+ keyring_path = "./client.{0}.keyring".format(self.client_id)
+ try:
+ os.stat(keyring_path)
+ except OSError:
+ return os.path.join(os.getcwd(), 'keyring')
+ else:
+ return keyring_path
- def run_shell(self, args, wait=True):
+ def run_shell(self, args, wait=True, stdin=None, check_status=True,
+ omit_sudo=False):
# FIXME maybe should add a pwd arg to teuthology.orchestra so that
# the "cd foo && bar" shenanigans isn't needed to begin with and
# then we wouldn't have to special case this
- return self.client_remote.run(
- args, wait=wait, cwd=self.mountpoint
- )
+ return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
+ stdin=stdin, check_status=check_status,
+ omit_sudo=omit_sudo)
+
+ def run_as_user(self, args, user, wait=True, stdin=None, check_status=True):
+ # FIXME maybe should add a pwd arg to teuthology.orchestra so that
+ # the "cd foo && bar" shenanigans isn't needed to begin with and
+ # then we wouldn't have to special case this
+ if isinstance(args, str):
+ args = 'sudo -u %s -s /bin/bash -c %s' % (user, args)
+ elif isinstance(args, list):
+ cmdlist = args
+ cmd = ''
+ for i in cmdlist:
+ cmd = cmd + i + ' '
+ args = ['sudo', '-u', user, '-s', '/bin/bash', '-c']
+ args.append(cmd)
+
+ return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
+ check_status=check_status, stdin=stdin,
+ omit_sudo=False)
+
+ def run_as_root(self, args, wait=True, stdin=None, check_status=True):
+ # FIXME maybe should add a pwd arg to teuthology.orchestra so that
+ # the "cd foo && bar" shenanigans isn't needed to begin with and
+ # then we wouldn't have to special case this
+ if isinstance(args, str):
+ args = 'sudo ' + args
+ if isinstance(args, list):
+ args.insert(0, 'sudo')
+
+ return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
+ check_status=check_status,
+ omit_sudo=False)
+
+ def testcmd(self, args, wait=True, stdin=None, omit_sudo=False):
+ # FIXME maybe should add a pwd arg to teuthology.orchestra so that
+ # the "cd foo && bar" shenanigans isn't needed to begin with and
+ # then we wouldn't have to special case this
+ return self.run_shell(args, wait=wait, stdin=stdin, check_status=False,
+ omit_sudo=omit_sudo)
+
+ def testcmd_as_user(self, args, user, wait=True, stdin=None):
+ # FIXME maybe should add a pwd arg to teuthology.orchestra so that
+ # the "cd foo && bar" shenanigans isn't needed to begin with and
+ # then we wouldn't have to special case this
+ return self.run_as_user(args, user=user, wait=wait, stdin=stdin,
+ check_status=False)
+
+ def testcmd_as_root(self, args, wait=True, stdin=None):
+ # FIXME maybe should add a pwd arg to teuthology.orchestra so that
+ # the "cd foo && bar" shenanigans isn't needed to begin with and
+ # then we wouldn't have to special case this
+ return self.run_as_root(args, wait=wait, stdin=stdin,
+ check_status=False)
+
+ def setupfs(self, name=None):
+ if name is None and self.fs is not None:
+ # Previous mount existed, reuse the old name
+ name = self.fs.name
+ self.fs = LocalFilesystem(self.ctx, name=name)
+ log.info('Wait for MDS to reach steady state...')
+ self.fs.wait_for_daemons()
+ log.info('Ready to start {}...'.format(type(self).__name__))
@property
def _prefix(self):
# the PID of the launching process, not the long running ceph-fuse process. Therefore
# we need to give an exact path here as the logic for checking /proc/ for which
# asok is alive does not work.
- path = "./out/client.{0}.{1}.asok".format(self.client_id, self.fuse_daemon.subproc.pid)
+
+ # Load the asok path from ceph.conf as vstart.sh now puts admin sockets
+ # in a tmpdir. All of the paths are the same, so no need to select
+ # based off of the service type.
+ d = "./out"
+ with open(self.config_path) as f:
+ for line in f:
+ asok_conf = re.search("^\s*admin\s+socket\s*=\s*(.*?)[^/]+$", line)
+ if asok_conf:
+ d = asok_conf.groups(1)[0]
+ break
+ path = "{0}/client.{1}.{2}.asok".format(d, self.client_id, self.fuse_daemon.subproc.pid)
log.info("I think my launching pid was {0}".format(self.fuse_daemon.subproc.pid))
return path
- def umount(self):
- if self.is_mounted():
- super(LocalFuseMount, self).umount()
+ def mount(self, mount_path=None, mount_fs_name=None, mount_options=[], **kwargs):
+ self.setupfs(name=mount_fs_name)
+
+ log.info('Mounting kclient client.{id} at {remote} {mnt}...'.format(
+ id=self.client_id, remote=self.client_remote, mnt=self.mountpoint))
- def mount(self, mount_path=None, mount_fs_name=None):
self.client_remote.run(
args=[
'mkdir',
'--',
self.mountpoint,
],
+ timeout=(5*60),
+ )
+
+ if mount_path is None:
+ mount_path = "/"
+
+ opts = 'name={id},norequire_active_mds,conf={conf}'.format(id=self.client_id,
+ conf=self.config_path)
+
+ if mount_fs_name is not None:
+ opts += ",mds_namespace={0}".format(mount_fs_name)
+
+ for mount_opt in mount_options:
+ opts += ",{0}".format(mount_opt)
+
+ self.client_remote.run(
+ args=[
+ 'sudo',
+ './bin/mount.ceph',
+ ':{mount_path}'.format(mount_path=mount_path),
+ self.mountpoint,
+ '-v',
+ '-o',
+ opts
+ ],
+ timeout=(30*60),
+ omit_sudo=False,
)
+ self.client_remote.run(
+ args=['sudo', 'chmod', '1777', self.mountpoint], timeout=(5*60))
+
+ self.mounted = True
+
+ def _run_python(self, pyscript, py_version='python'):
+ """
+ Override this to remove the daemon-helper prefix that is used otherwise
+ to make the process killable.
+ """
+ return self.client_remote.run(args=[py_version, '-c', pyscript],
+ wait=False)
+
+class LocalFuseMount(FuseMount):
+ def __init__(self, ctx, test_dir, client_id):
+ super(LocalFuseMount, self).__init__(ctx, None, test_dir, client_id, LocalRemote())
+
+ @property
+ def config_path(self):
+ return "./ceph.conf"
+
+ def get_keyring_path(self):
+ # This is going to end up in a config file, so use an absolute path
+ # to avoid assumptions about daemons' pwd
+ return os.path.abspath("./client.{0}.keyring".format(self.client_id))
+
+ def run_shell(self, args, wait=True, stdin=None, check_status=True, omit_sudo=True):
+ # FIXME maybe should add a pwd arg to teuthology.orchestra so that
+ # the "cd foo && bar" shenanigans isn't needed to begin with and
+ # then we wouldn't have to special case this
+ return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
+ stdin=stdin, check_status=check_status,
+ omit_sudo=omit_sudo)
+
+ def run_as_user(self, args, user, wait=True, stdin=None, check_status=True):
+ # FIXME maybe should add a pwd arg to teuthology.orchestra so that
+ # the "cd foo && bar" shenanigans isn't needed to begin with and
+ # then we wouldn't have to special case this
+ if isinstance(args, str):
+ args = 'sudo -u %s -s /bin/bash -c %s' % (user, args)
+ elif isinstance(args, list):
+ cmdlist = args
+ cmd = ''
+ for i in cmdlist:
+ cmd = cmd + i + ' '
+ args = ['sudo', '-u', user, '-s', '/bin/bash', '-c']
+ args.append(cmd)
+
+ return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
+ check_status=check_status, stdin=stdin,
+ omit_sudo=False)
+
+ def run_as_root(self, args, wait=True, stdin=None, check_status=True):
+ # FIXME maybe should add a pwd arg to teuthology.orchestra so that
+ # the "cd foo && bar" shenanigans isn't needed to begin with and
+ # then we wouldn't have to special case this
+ if isinstance(args, str):
+ args = 'sudo ' + args
+ if isinstance(args, list):
+ args.insert(0, 'sudo')
+
+ return self.client_remote.run(args=args, wait=wait, cwd=self.mountpoint,
+ check_status=check_status,
+ omit_sudo=False)
+
+ def testcmd(self, args, wait=True, stdin=None, omit_sudo=True):
+ # FIXME maybe should add a pwd arg to teuthology.orchestra so that
+ # the "cd foo && bar" shenanigans isn't needed to begin with and
+ # then we wouldn't have to special case this
+ return self.run_shell(args, wait=wait, stdin=stdin, check_status=False,
+ omit_sudo=omit_sudo)
+
+ def testcmd_as_user(self, args, user, wait=True, stdin=None):
+ # FIXME maybe should add a pwd arg to teuthology.orchestra so that
+ # the "cd foo && bar" shenanigans isn't needed to begin with and
+ # then we wouldn't have to special case this
+ return self.run_as_user(args, user=user, wait=wait, stdin=stdin,
+ check_status=False)
+
+ def testcmd_as_root(self, args, wait=True, stdin=None):
+ # FIXME maybe should add a pwd arg to teuthology.orchestra so that
+ # the "cd foo && bar" shenanigans isn't needed to begin with and
+ # then we wouldn't have to special case this
+ return self.run_as_root(args, wait=wait, stdin=stdin,
+ check_status=False)
+
+ def setupfs(self, name=None):
+ if name is None and self.fs is not None:
+ # Previous mount existed, reuse the old name
+ name = self.fs.name
+ self.fs = LocalFilesystem(self.ctx, name=name)
+ log.info('Wait for MDS to reach steady state...')
+ self.fs.wait_for_daemons()
+ log.info('Ready to start {}...'.format(type(self).__name__))
+
+ @property
+ def _prefix(self):
+ return BIN_PREFIX
+
+ def _asok_path(self):
+ # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
+ # run foreground. When running it daemonized however, the asok is named after
+ # the PID of the launching process, not the long running ceph-fuse process. Therefore
+ # we need to give an exact path here as the logic for checking /proc/ for which
+ # asok is alive does not work.
+
+ # Load the asok path from ceph.conf as vstart.sh now puts admin sockets
+ # in a tmpdir. All of the paths are the same, so no need to select
+ # based off of the service type.
+ d = "./out"
+ with open(self.config_path) as f:
+ for line in f:
+ asok_conf = re.search("^\s*admin\s+socket\s*=\s*(.*?)[^/]+$", line)
+ if asok_conf:
+ d = asok_conf.groups(1)[0]
+ break
+ path = "{0}/client.{1}.{2}.asok".format(d, self.client_id, self.fuse_daemon.subproc.pid)
+ log.info("I think my launching pid was {0}".format(self.fuse_daemon.subproc.pid))
+ return path
+
+ def mount(self, mount_path=None, mount_fs_name=None, mountpoint=None, mount_options=[]):
+ if mountpoint is not None:
+ self.mountpoint = mountpoint
+ self.setupfs(name=mount_fs_name)
+
+ self.client_remote.run(args=['mkdir', '-p', self.mountpoint])
+
def list_connections():
self.client_remote.run(
args=["mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
check_status=False
)
if p.exitstatus != 0:
- log.warn("ls conns failed with {0}, assuming none".format(p.exitstatus))
+ log.warning("ls conns failed with {0}, assuming none".format(p.exitstatus))
return []
- ls_str = p.stdout.getvalue().strip()
+ ls_str = six.ensure_str(p.stdout.getvalue().strip())
if ls_str:
return [int(n) for n in ls_str.split("\n")]
else:
prefix = [os.path.join(BIN_PREFIX, "ceph-fuse")]
if os.getuid() != 0:
- prefix += ["--client-die-on-failed-remount=false"]
+ prefix += ["--client_die_on_failed_dentry_invalidate=false"]
if mount_path is not None:
prefix += ["--client_mountpoint={0}".format(mount_path)]
if mount_fs_name is not None:
- prefix += ["--client_mds_namespace={0}".format(mount_fs_name)]
+ prefix += ["--client_fs={0}".format(mount_fs_name)]
+
+ prefix += mount_options
self.fuse_daemon = self.client_remote.run(args=
prefix + [
else:
self._fuse_conn = new_conns[0]
- def _run_python(self, pyscript):
+ self.gather_mount_info()
+
+ self.mounted = True
+
+ def _run_python(self, pyscript, py_version='python'):
"""
Override this to remove the daemon-helper prefix that is used otherwise
to make the process killable.
"""
- return self.client_remote.run(args=[
- 'python', '-c', pyscript
- ], wait=False)
-
+ return self.client_remote.run(args=[py_version, '-c', pyscript],
+ wait=False)
+# XXX: this class has nothing to do with the Ceph daemon (ceph-mgr) of
+# the same name.
class LocalCephManager(CephManager):
def __init__(self):
# Deliberately skip parent init, only inheriting from it to get
self.log = lambda x: log.info(x)
+ # Don't bother constructing a map of pools: it should be empty
+ # at test cluster start, and in any case it would be out of date
+ # in no time. The attribute needs to exist for some of the CephManager
+ # methods to work though.
+ self.pools = {}
+
def find_remote(self, daemon_type, daemon_id):
"""
daemon_type like 'mds', 'osd'
"""
return LocalRemote()
- def run_ceph_w(self):
- proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph"), "-w"], wait=False, stdout=StringIO())
+ def run_ceph_w(self, watch_channel=None):
+ """
+ :param watch_channel: Specifies the channel to be watched.
+ This can be 'cluster', 'audit', ...
+ :type watch_channel: str
+ """
+ args = [os.path.join(BIN_PREFIX, "ceph"), "-w"]
+ if watch_channel is not None:
+ args.append("--watch-channel")
+ args.append(watch_channel)
+ proc = self.controller.run(args=args, wait=False, stdout=StringIO())
return proc
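+ # For example, run_ceph_w(watch_channel='audit') runs
+ # "<BIN_PREFIX>/ceph -w --watch-channel audit" and returns the process
+ # handle without waiting for it to finish.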
- def raw_cluster_cmd(self, *args):
+ def raw_cluster_cmd(self, *args, **kwargs):
"""
args like ["osd", "dump"}
return stdout string
"""
- proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args))
- return proc.stdout.getvalue()
+ proc = self.controller.run(args=[os.path.join(BIN_PREFIX, "ceph")] + \
+ list(args), **kwargs)
+ return six.ensure_str(proc.stdout.getvalue())
- def raw_cluster_cmd_result(self, *args):
+ def raw_cluster_cmd_result(self, *args, **kwargs):
"""
like raw_cluster_cmd but don't check status, just return rc
"""
- proc = self.controller.run([os.path.join(BIN_PREFIX, "ceph")] + list(args), check_status=False)
+ kwargs['check_status'] = False
+ proc = self.controller.run(args=[os.path.join(BIN_PREFIX, "ceph")] + \
+ list(args), **kwargs)
return proc.exitstatus
- def admin_socket(self, daemon_type, daemon_id, command, check_status=True):
+ def admin_socket(self, daemon_type, daemon_id, command, check_status=True, timeout=None):
return self.controller.run(
- args=[os.path.join(BIN_PREFIX, "ceph"), "daemon", "{0}.{1}".format(daemon_type, daemon_id)] + command, check_status=check_status
+ args=[os.path.join(BIN_PREFIX, "ceph"), "daemon", "{0}.{1}".format(daemon_type, daemon_id)] + command,
+ check_status=check_status,
+ timeout=timeout
)
- # FIXME: copypasta
- def get_mds_status(self, mds):
+ def get_mon_socks(self):
"""
- Run cluster commands for the mds in order to get mds information
+ Get monitor sockets.
+
+ :return socks: tuple of strings; strings are individual sockets.
"""
- out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
- j = json.loads(' '.join(out.splitlines()[1:]))
- # collate; for dup ids, larger gid wins.
- for info in j['info'].itervalues():
- if info['name'] == mds:
- return info
- return None
-
- # FIXME: copypasta
- def get_mds_status_by_rank(self, rank):
+ from json import loads
+
+ output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
+ socks = []
+ for mon in output['mons']:
+ for addrvec_mem in mon['public_addrs']['addrvec']:
+ socks.append(addrvec_mem['addr'])
+ return tuple(socks)
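+ # Against a default vstart cluster this is expected to return both the v2
+ # and v1 address of each monitor, e.g. something like
+ # ('127.0.0.1:40575', '127.0.0.1:40576') -- the actual ports vary per run.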
+
+ def get_msgrv1_mon_socks(self):
"""
- Run cluster commands for the mds in order to get mds information
- check rank.
+ Get monitor sockets that use msgrv1 to operate.
+
+ :return socks: tuple of strings; strings are individual sockets.
"""
- j = self.get_mds_status_all()
- # collate; for dup ids, larger gid wins.
- for info in j['info'].itervalues():
- if info['rank'] == rank:
- return info
- return None
-
- def get_mds_status_all(self):
+ from json import loads
+
+ output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
+ socks = []
+ for mon in output['mons']:
+ for addrvec_mem in mon['public_addrs']['addrvec']:
+ if addrvec_mem['type'] == 'v1':
+ socks.append(addrvec_mem['addr'])
+ return tuple(socks)
+
+ def get_msgrv2_mon_socks(self):
"""
- Run cluster command to extract all the mds status.
+ Get monitor sockets that use msgrv2 to operate.
+
+ :return socks: tuple of strings; strings are individual sockets.
"""
- out = self.raw_cluster_cmd('mds', 'dump', '--format=json')
- j = json.loads(' '.join(out.splitlines()[1:]))
- return j
+ from json import loads
+
+ output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
+ socks = []
+ for mon in output['mons']:
+ for addrvec_mem in mon['public_addrs']['addrvec']:
+ if addrvec_mem['type'] == 'v2':
+ socks.append(addrvec_mem['addr'])
+ return tuple(socks)
class LocalCephCluster(CephCluster):
def __init__(self, ctx):
super(LocalMDSCluster, self).__init__(ctx)
- self.mds_ids = ctx.daemons.daemons['mds'].keys()
+ self.mds_ids = ctx.daemons.daemons['ceph.mds'].keys()
self.mds_daemons = dict([(id_, LocalDaemon("mds", id_)) for id_ in self.mds_ids])
def clear_firewall(self):
# FIXME: unimplemented
pass
- def newfs(self, name):
- return LocalFilesystem(self._ctx, create=name)
+ def newfs(self, name='cephfs', create=True):
+ return LocalFilesystem(self._ctx, name=name, create=create)
class LocalMgrCluster(LocalCephCluster, MgrCluster):
def __init__(self, ctx):
super(LocalMgrCluster, self).__init__(ctx)
- self.mgr_ids = ctx.daemons.daemons['mgr'].keys()
+ self.mgr_ids = ctx.daemons.daemons['ceph.mgr'].keys()
self.mgr_daemons = dict([(id_, LocalDaemon("mgr", id_)) for id_ in self.mgr_ids])
class LocalFilesystem(Filesystem, LocalMDSCluster):
- def __init__(self, ctx, fscid=None, create=None):
+ def __init__(self, ctx, fscid=None, name='cephfs', create=False, ec_profile=None):
# Deliberately skip calling parent constructor
self._ctx = ctx
self.id = None
- self.name = None
+ self.name = name
+ self.ec_profile = ec_profile
self.metadata_pool_name = None
+ self.metadata_overlay = False
+ self.data_pool_name = None
self.data_pools = None
# Hack: cheeky inspection of ceph.conf to see what MDSs exist
self._conf = defaultdict(dict)
- if create is not None:
+ if name is not None:
if fscid is not None:
raise RuntimeError("cannot specify fscid when creating fs")
- if create is True:
- self.name = 'cephfs'
- else:
- self.name = create
- self.create()
- elif fscid is not None:
- self.id = fscid
- self.getinfo(refresh=True)
+ if create and not self.legacy_configured():
+ self.create()
+ else:
+ if fscid is not None:
+ self.id = fscid
+ self.getinfo(refresh=True)
# Stash a reference to the first created filesystem on ctx, so
# that if someone drops to the interactive shell they can easily
def set_clients_block(self, blocked, mds_id=None):
raise NotImplementedError()
- def get_pgs_per_fs_pool(self):
- # FIXME: assuming there are 3 OSDs
- return 3 * int(self.get_config('mon_pg_warn_min_per_osd'))
-
class InteractiveFailureResult(unittest.TextTestResult):
"""
max_required_mds = 0
max_required_clients = 0
max_required_mgr = 0
+ require_memstore = False
- for suite, case in enumerate_methods(overall_suite):
+ for suite_, case in enumerate_methods(overall_suite):
max_required_mds = max(max_required_mds,
getattr(case, "MDSS_REQUIRED", 0))
max_required_clients = max(max_required_clients,
getattr(case, "CLIENTS_REQUIRED", 0))
max_required_mgr = max(max_required_mgr,
getattr(case, "MGRS_REQUIRED", 0))
+ require_memstore = getattr(case, "REQUIRE_MEMSTORE", False) \
+ or require_memstore
- return max_required_mds, max_required_clients, max_required_mgr
+ return max_required_mds, max_required_clients, \
+ max_required_mgr, require_memstore
class LocalCluster(object):
# Inspect ceph.conf to see what roles exist
for conf_line in open("ceph.conf").readlines():
for svc_type in ["mon", "osd", "mds", "mgr"]:
- if svc_type not in self.daemons.daemons:
- self.daemons.daemons[svc_type] = {}
+ prefixed_type = "ceph." + svc_type
+ if prefixed_type not in self.daemons.daemons:
+ self.daemons.daemons[prefixed_type] = {}
match = re.match("^\[{0}\.(.+)\]$".format(svc_type), conf_line)
if match:
svc_id = match.group(1)
- self.daemons.daemons[svc_type][svc_id] = LocalDaemon(svc_type, svc_id)
+ self.daemons.daemons[prefixed_type][svc_id] = LocalDaemon(svc_type, svc_id)
def __del__(self):
shutil.rmtree(self.teuthology_config['test_path'])
+def teardown_cluster():
+ log.info('\ntearing down the cluster...')
+ remote.run(args=[os.path.join(SRC_PREFIX, "stop.sh")], timeout=60)
+ remote.run(args=['rm', '-rf', './dev', './out'])
+
+def clear_old_log():
+ from os import stat
+
+ try:
+ stat(logpath)
+ # This would need an update when making this py3 compatible; use
+ # FileNotFoundError instead.
+ except OSError:
+ return
+ else:
+ os.remove(logpath)
+ with open(logpath, 'w') as logfile:
+ logfile.write('')
+ init_log()
+ log.info('logging to a fresh file now...')
def exec_test():
# Parse arguments
- interactive_on_error = False
- create_cluster = False
+ opt_interactive_on_error = False
+ opt_create_cluster = False
+ opt_create_cluster_only = False
+ opt_ignore_missing_binaries = False
+ opt_teardown_cluster = False
+ global opt_log_ps_output
+ opt_log_ps_output = False
+ use_kernel_client = False
args = sys.argv[1:]
flags = [a for a in args if a.startswith("-")]
modules = [a for a in args if not a.startswith("-")]
for f in flags:
if f == "--interactive":
- interactive_on_error = True
+ opt_interactive_on_error = True
elif f == "--create":
- create_cluster = True
+ opt_create_cluster = True
+ elif f == "--create-cluster-only":
+ opt_create_cluster_only = True
+ elif f == "--ignore-missing-binaries":
+ opt_ignore_missing_binaries = True
+ elif f == '--teardown':
+ opt_teardown_cluster = True
+ elif f == '--log-ps-output':
+ opt_log_ps_output = True
+ elif f == '--clear-old-log':
+ clear_old_log()
+ elif f == "--kclient":
+ use_kernel_client = True
else:
log.error("Unknown option '{0}'".format(f))
sys.exit(-1)
require_binaries = ["ceph-dencoder", "cephfs-journal-tool", "cephfs-data-scan",
"cephfs-table-tool", "ceph-fuse", "rados"]
missing_binaries = [b for b in require_binaries if not os.path.exists(os.path.join(BIN_PREFIX, b))]
- if missing_binaries:
+ if missing_binaries and not opt_ignore_missing_binaries:
log.error("Some ceph binaries missing, please build them: {0}".format(" ".join(missing_binaries)))
sys.exit(-1)
- max_required_mds, max_required_clients, max_required_mgr = scan_tests(modules)
+ max_required_mds, max_required_clients, \
+ max_required_mgr, require_memstore = scan_tests(modules)
+ global remote
remote = LocalRemote()
# Tolerate no MDSs or clients running at start
- ps_txt = remote.run(
+ ps_txt = six.ensure_str(remote.run(
args=["ps", "-u"+str(os.getuid())]
- ).stdout.getvalue().strip()
+ ).stdout.getvalue().strip())
lines = ps_txt.split("\n")[1:]
for line in lines:
if 'ceph-fuse' in line or 'ceph-mds' in line:
pid = int(line.split()[0])
- log.warn("Killing stray process {0}".format(line))
+ log.warning("Killing stray process {0}".format(line))
os.kill(pid, signal.SIGKILL)
# Fire up the Ceph cluster if the user requested it
- if create_cluster:
+ if opt_create_cluster or opt_create_cluster_only:
log.info("Creating cluster with {0} MDS daemons".format(
max_required_mds))
- remote.run([os.path.join(SRC_PREFIX, "stop.sh")], check_status=False)
- remote.run(["rm", "-rf", "./out"])
- remote.run(["rm", "-rf", "./dev"])
+ teardown_cluster()
vstart_env = os.environ.copy()
vstart_env["FS"] = "0"
vstart_env["MDS"] = max_required_mds.__str__()
- vstart_env["OSD"] = "1"
+ vstart_env["OSD"] = "4"
vstart_env["MGR"] = max(max_required_mgr, 1).__str__()
- remote.run([os.path.join(SRC_PREFIX, "vstart.sh"), "-n", "-d", "--nolockdep"],
- env=vstart_env)
+ args = [os.path.join(SRC_PREFIX, "vstart.sh"), "-n", "-d",
+ "--nolockdep"]
+ if require_memstore:
+ args.append("--memstore")
+
+ # vstart.sh usually completes in less than 100 seconds.
+ remote.run(args=args, env=vstart_env, timeout=(3 * 60))
# Wait for OSD to come up so that subsequent injectargs etc will
# definitely succeed
LocalCephCluster(LocalContext()).mon_manager.wait_for_all_osds_up(timeout=30)
+ if opt_create_cluster_only:
+ return
+
# List of client mounts, sufficient to run the selected tests
clients = [i.__str__() for i in range(0, max_required_clients)]
test_dir = tempfile.mkdtemp()
teuth_config['test_path'] = test_dir
+ ctx = LocalContext()
+ ceph_cluster = LocalCephCluster(ctx)
+ mds_cluster = LocalMDSCluster(ctx)
+ mgr_cluster = LocalMgrCluster(ctx)
+
# Construct Mount classes
mounts = []
for client_id in clients:
open("./keyring", "a").write(p.stdout.getvalue())
- mount = LocalFuseMount(test_dir, client_id)
- mounts.append(mount)
- if mount.is_mounted():
- log.warn("unmounting {0}".format(mount.mountpoint))
- mount.umount_wait()
+ if use_kernel_client:
+ mount = LocalKernelMount(ctx, test_dir, client_id)
else:
- if os.path.exists(mount.mountpoint):
- os.rmdir(mount.mountpoint)
+ mount = LocalFuseMount(ctx, test_dir, client_id)
- ctx = LocalContext()
- ceph_cluster = LocalCephCluster(ctx)
- mds_cluster = LocalMDSCluster(ctx)
- mgr_cluster = LocalMgrCluster(ctx)
+ mounts.append(mount)
+ if os.path.exists(mount.mountpoint):
+ if mount.is_mounted():
+ log.warning("unmounting {0}".format(mount.mountpoint))
+ mount.umount_wait()
+ else:
+ os.rmdir(mount.mountpoint)
from tasks.cephfs_test_runner import DecoratingLoader
# For the benefit of polling tests like test_full -- in teuthology land we set this
# in a .yaml, here it's just a hardcoded thing for the developer's pleasure.
- remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "tell", "osd.*", "injectargs", "--osd-mon-report-interval-max", "5"])
- ceph_cluster.set_ceph_conf("osd", "osd_mon_report_interval_max", "5")
+ remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "tell", "osd.*", "injectargs", "--osd-mon-report-interval", "5"])
+ ceph_cluster.set_ceph_conf("osd", "osd_mon_report_interval", "5")
# Vstart defaults to two segments, which very easily gets a "behind on trimming" health warning
# from normal IO latency. Increase it for running tests.
if hasattr(fn, 'is_for_teuthology') and getattr(fn, 'is_for_teuthology') is True:
drop_test = True
- log.warn("Dropping test because long running: ".format(method.id()))
+ log.warning("Dropping test because long running: {method_id}".format(method_id=method.id()))
if getattr(fn, "needs_trimming", False) is True:
drop_test = (os.getuid() != 0)
- log.warn("Dropping test because client trim unavailable: ".format(method.id()))
+ log.warning("Dropping test because client trim unavailable: {method_id}".format(method_id=method.id()))
if drop_test:
# Don't drop the test if it was explicitly requested in arguments
for s, method in victims:
s._tests.remove(method)
- if interactive_on_error:
+ if opt_interactive_on_error:
result_class = InteractiveFailureResult
else:
result_class = unittest.TextTestResult
verbosity=2,
failfast=True).run(overall_suite)
+ if opt_teardown_cluster:
+ teardown_cluster()
+
if not result.wasSuccessful():
result.printErrors() # duplicate output at end for convenience