from unittest import suite, loader
-from teuthology.orchestra.run import Raw, quote
+from teuthology.orchestra.run import Raw, quote, PIPE
from teuthology.orchestra.daemon import DaemonGroup
from teuthology.orchestra.remote import Remote
from teuthology.config import config as teuth_config
from teuthology.contextutil import safe_while
from teuthology.contextutil import MaxWhileTries
-from teuthology.orchestra.run import CommandFailedError
+from teuthology.exceptions import CommandFailedError
try:
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
except:
pass
-def init_log():
+def init_log(log_level=logging.INFO):
global log
if log is not None:
del log
datefmt='%Y-%m-%dT%H:%M:%S')
handler.setFormatter(formatter)
log.addHandler(handler)
- log.setLevel(logging.INFO)
+ log.setLevel(log_level)
log = None
init_log()
BIN_PREFIX = "./"
SRC_PREFIX = "./"
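+# Convenience path to the locally built "ceph" CLI; call sites below use
+# this instead of assuming an installed binary.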
+CEPH_CMD = os.path.join(BIN_PREFIX, 'ceph')
+
def rm_nonascii_chars(var):
var = var.replace(b'\xe2\x80\x98', b'\'')
self.check_status = check_status
self.exitstatus = self.returncode = None
- def wait(self):
- if self.finished:
- # Avoid calling communicate() on a dead process because it'll
- # give you stick about std* already being closed
- if self.check_status and self.exitstatus != 0:
- raise CommandFailedError(self.args, self.exitstatus)
- else:
- return
-
- out, err = self.subproc.communicate()
- out, err = rm_nonascii_chars(out), rm_nonascii_chars(err)
+ def _write_stdout(self, out):
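+        # Forward captured stdout to the caller-supplied sink, decoding
+        # bytes for StringIO sinks; a sink of None discards the output.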
if isinstance(self.stdout, StringIO):
self.stdout.write(out.decode(errors='ignore'))
elif self.stdout is None:
pass
else:
self.stdout.write(out)
+
+ def _write_stderr(self, err):
if isinstance(self.stderr, StringIO):
self.stderr.write(err.decode(errors='ignore'))
        elif self.stderr is None:
            pass
else:
self.stderr.write(err)
+ def wait(self):
+ if self.finished:
+            # Avoid calling communicate() on a dead process because it'll
+            # complain about std* already being closed
+ if self.check_status and self.exitstatus != 0:
+ raise CommandFailedError(self.args, self.exitstatus)
+ else:
+ return
+
+ out, err = self.subproc.communicate()
+ out, err = rm_nonascii_chars(out), rm_nonascii_chars(err)
+ self._write_stdout(out)
+ self._write_stderr(err)
+
self.exitstatus = self.returncode = self.subproc.returncode
if self.exitstatus != 0:
if self.subproc.poll() is not None:
out, err = self.subproc.communicate()
- if isinstance(self.stdout, StringIO):
- self.stdout.write(out.decode(errors='ignore'))
- elif self.stdout is None:
- pass
- else:
- self.stdout.write(out)
- if isinstance(self.stderr, StringIO):
- self.stderr.write(err.decode(errors='ignore'))
- elif self.stderr is None:
- pass
- else:
- self.stderr.write(err)
+ self._write_stdout(out)
+ self._write_stderr(err)
+
self.exitstatus = self.returncode = self.subproc.returncode
+
return True
else:
return False
# None
return mkdtemp(suffix=suffix, prefix='', dir=parentdir)
- def mktemp(self, suffix=None, parentdir=None):
+ def mktemp(self, suffix='', parentdir='', path=None, data=None,
+ owner=None, mode=None):
"""
Make a remote temporary file
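+        If 'data' is provided, it is written to the file at 'path'.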
Returns: the path of the temp file created.
"""
from tempfile import mktemp
- return mktemp(suffix=suffix, dir=parentdir)
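+        # tempfile.mktemp() only generates a name and does not create the
+        # file; when no parentdir is given, anchor the path under /tmp.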
+ if not path:
+ path = mktemp(suffix=suffix, dir=parentdir)
+ if not parentdir:
+ path = os.path.join('/tmp', path)
+
+ if data:
+            # sudo is set to False since even the root user can't write to
+            # files in /tmp that are owned by other users.
+ self.write_file(path=path, data=data, sudo=False)
+
+ return path
def write_file(self, path, data, sudo=False, mode=None, owner=None,
mkdir=False, append=False):
# vstart_runner.py.
def _do_run(self, args, check_status=True, wait=True, stdout=None,
stderr=None, cwd=None, stdin=None, logger=None, label=None,
- env=None, timeout=None, omit_sudo=True):
+ env=None, timeout=None, omit_sudo=True, shell=True):
args = self._perform_checks_and_return_list_of_args(args, omit_sudo)
# We have to use shell=True if any run.Raw was present, e.g. &&
- shell = any([a for a in args if isinstance(a, Raw)])
+ if not shell:
+ shell = any([a for a in args if isinstance(a, Raw)])
# Filter out helper tools that don't exist in a vstart environment
args = [a for a in args if a not in ('adjust-ulimits',
local_bin = os.path.join(BIN_PREFIX, args[0])
if os.path.exists(local_bin):
args = [local_bin] + args[1:]
- else:
- log.debug("'{0}' is not a binary in the Ceph build dir".format(
- args[0]
- ))
log.debug('> ' +
' '.join([str(a.value) if isinstance(a, Raw) else a for a in args]))
# as long as the input buffer is "small"
if isinstance(stdin, str):
subproc.stdin.write(stdin.encode())
+ elif stdin == subprocess.PIPE or stdin == PIPE:
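+            # The PIPE sentinel means the caller will write to stdin
+            # themselves later, so leave the stream untouched here.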
+ pass
+ elif isinstance(stdin, StringIO):
+            subproc.stdin.write(bytes(stdin.getvalue(), encoding='utf8'))
else:
subproc.stdin.write(stdin)
if line.find("ceph-{0} -i {1}".format(self.daemon_type, self.daemon_id)) != -1:
log.debug("Found ps line for daemon: {0}".format(line))
return int(line.split()[0])
- if opt_log_ps_output:
- log.debug("No match for {0} {1}: {2}".format(
- self.daemon_type, self.daemon_id, ps_txt))
- else:
- log.debug("No match for {0} {1}".format(self.daemon_type,
- self.daemon_id))
- return None
+ if not opt_log_ps_output:
+ ps_txt = '(omitted)'
+ log.debug("No match for {0} {1}: {2}".format(
+ self.daemon_type, self.daemon_id, ps_txt))
+ return None
def wait(self, timeout):
waited = 0
return
pid = self._get_pid()
+ if pid is None:
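+            # No PID found in the ps output; the daemon is already gone.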
+ return
log.debug("Killing PID {0} for {1}.{2}".format(pid, self.daemon_type, self.daemon_id))
os.kill(pid, signal.SIGTERM)
return True
return False
-class LocalKernelMount(KernelMount):
- def __init__(self, ctx, test_dir, client_id=None,
- client_keyring_path=None, client_remote=None,
- hostfs_mntpt=None, cephfs_name=None, cephfs_mntpt=None,
- brxnet=None):
- super(LocalKernelMount, self).__init__(ctx=ctx, test_dir=test_dir,
- client_id=client_id, client_keyring_path=client_keyring_path,
- client_remote=LocalRemote(), hostfs_mntpt=hostfs_mntpt,
- cephfs_name=cephfs_name, cephfs_mntpt=cephfs_mntpt, brxnet=brxnet)
-
+class LocalCephFSMount():
@property
def config_path(self):
return "./ceph.conf"
else:
return keyring_path
- def setupfs(self, name=None):
- if name is None and self.fs is not None:
- # Previous mount existed, reuse the old name
- name = self.fs.name
- self.fs = LocalFilesystem(self.ctx, name=name)
- log.debug('Wait for MDS to reach steady state...')
- self.fs.wait_for_daemons()
- log.debug('Ready to start {}...'.format(type(self).__name__))
-
@property
def _prefix(self):
return BIN_PREFIX
def _asok_path(self):
- # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
- # run foreground. When running it daemonized however, the asok is named after
- # the PID of the launching process, not the long running ceph-fuse process. Therefore
- # we need to give an exact path here as the logic for checking /proc/ for which
- # asok is alive does not work.
+ # In teuthology, the asok is named after the PID of the ceph-fuse
+ # process, because it's run foreground. When running it daemonized
+ # however, the asok is named after the PID of the launching process,
+ # not the long running ceph-fuse process. Therefore we need to give
+ # an exact path here as the logic for checking /proc/ for which asok
+ # is alive does not work.
# Load the asok path from ceph.conf as vstart.sh now puts admin sockets
# in a tmpdir. All of the paths are the same, so no need to select
path = "{0}/client.{1}.*.asok".format(d, self.client_id)
return path
- def mount(self, mntopts=[], createfs=True, check_status=True, **kwargs):
- self.update_attrs(**kwargs)
- self.assert_and_log_minimum_mount_details()
-
- if opt_use_ns:
- self.using_namespace = True
- self.setup_netns()
- else:
- self.using_namespace = False
-
- if not self.cephfs_mntpt:
- self.cephfs_mntpt = "/"
- # TODO: don't call setupfs() from within mount()
- if createfs:
- self.setupfs(name=self.cephfs_name)
-
- opts = 'norequire_active_mds'
- if self.client_id:
- opts += ',name=' + self.client_id
- if self.client_keyring_path and self.client_id:
- opts += ",secret=" + self.get_key_from_keyfile()
- if self.config_path:
- opts += ',conf=' + self.config_path
- if self.cephfs_name:
- opts += ",mds_namespace={0}".format(self.cephfs_name)
- if mntopts:
- opts += ',' + ','.join(mntopts)
-
- stderr = StringIO()
- try:
- self.client_remote.run(args=['mkdir', '--', self.hostfs_mntpt],
- timeout=(5*60), stderr=stderr)
- except CommandFailedError:
- if 'file exists' not in stderr.getvalue().lower():
- raise
-
- if self.cephfs_mntpt is None:
- self.cephfs_mntpt = "/"
- cmdargs = ['sudo']
- if self.using_namespace:
- cmdargs += ['nsenter',
- '--net=/var/run/netns/{0}'.format(self.netns_name)]
- cmdargs += ['./bin/mount.ceph', ':' + self.cephfs_mntpt,
- self.hostfs_mntpt, '-v', '-o', opts]
-
- mountcmd_stdout, mountcmd_stderr = StringIO(), StringIO()
- try:
- self.client_remote.run(args=cmdargs, timeout=(30*60),
- omit_sudo=False, stdout=mountcmd_stdout,
- stderr=mountcmd_stderr)
- except CommandFailedError as e:
- if check_status:
- raise
- else:
- return (e, mountcmd_stdout.getvalue(),
- mountcmd_stderr.getvalue())
-
- stderr = StringIO()
- try:
- self.client_remote.run(args=['sudo', 'chmod', '1777',
- self.hostfs_mntpt], stderr=stderr,
- timeout=(5*60))
- except CommandFailedError:
- # the client does not have write permissions in cap it holds for
- # the Ceph FS that was just mounted.
- if 'permission denied' in stderr.getvalue().lower():
- pass
-
- self.mounted = True
-
- def cleanup_netns(self):
- if self.using_namespace:
- super(type(self), self).cleanup_netns()
-
- def _run_python(self, pyscript, py_version='python'):
+ def _run_python(self, pyscript, py_version='python', sudo=False):
"""
Override this to remove the daemon-helper prefix that is used otherwise
to make the process killable.
"""
- return self.client_remote.run(args=[py_version, '-c', pyscript],
+ args = []
+ if sudo:
+ args.append('sudo')
+ args += [py_version, '-c', pyscript]
+ return self.client_remote.run(args=args,
wait=False, stdout=StringIO())
-class LocalFuseMount(FuseMount):
- def __init__(self, ctx, test_dir, client_id, client_keyring_path=None,
- client_remote=None, hostfs_mntpt=None, cephfs_name=None,
- cephfs_mntpt=None, brxnet=None):
- super(LocalFuseMount, self).__init__(ctx=ctx, client_config=None,
- test_dir=test_dir, client_id=client_id,
- client_keyring_path=client_keyring_path,
- client_remote=LocalRemote(), hostfs_mntpt=hostfs_mntpt,
- cephfs_name=cephfs_name, cephfs_mntpt=cephfs_mntpt, brxnet=brxnet)
+ def setup_netns(self):
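+        # Network namespaces are optional for vstart runs; only set one up
+        # when the run was started with namespaces enabled (opt_use_ns).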
+ if opt_use_ns:
+ super(type(self), self).setup_netns()
@property
- def config_path(self):
- return "./ceph.conf"
-
- def get_keyring_path(self):
- # This is going to end up in a config file, so use an absolute path
- # to avoid assumptions about daemons' pwd
- return os.path.abspath("./client.{0}.keyring".format(self.client_id))
+ def _nsenter_args(self):
+ if opt_use_ns:
+ return super(type(self), self)._nsenter_args
+ else:
+ return []
def setupfs(self, name=None):
if name is None and self.fs is not None:
# Previous mount existed, reuse the old name
name = self.fs.name
self.fs = LocalFilesystem(self.ctx, name=name)
- log.debug('Wait for MDS to reach steady state...')
+ log.info('Wait for MDS to reach steady state...')
self.fs.wait_for_daemons()
- log.debug('Ready to start {}...'.format(type(self).__name__))
-
- @property
- def _prefix(self):
- return BIN_PREFIX
-
- def _asok_path(self):
- # In teuthology, the asok is named after the PID of the ceph-fuse process, because it's
- # run foreground. When running it daemonized however, the asok is named after
- # the PID of the launching process, not the long running ceph-fuse process. Therefore
- # we need to give an exact path here as the logic for checking /proc/ for which
- # asok is alive does not work.
-
- # Load the asok path from ceph.conf as vstart.sh now puts admin sockets
- # in a tmpdir. All of the paths are the same, so no need to select
- # based off of the service type.
- d = "./out"
- with open(self.config_path) as f:
- for line in f:
- asok_conf = re.search("^\s*admin\s+socket\s*=\s*(.*?)[^/]+$", line)
- if asok_conf:
- d = asok_conf.groups(1)[0]
- break
- path = "{0}/client.{1}.*.asok".format(d, self.client_id)
- return path
+ log.info('Ready to start {}...'.format(type(self).__name__))
- def mount(self, mntopts=[], createfs=True, check_status=True, **kwargs):
- self.update_attrs(**kwargs)
- self.assert_and_log_minimum_mount_details()
- if opt_use_ns:
- self.using_namespace = True
- self.setup_netns()
- else:
- self.using_namespace = False
-
- # TODO: don't call setupfs() from within mount()
- if createfs:
- self.setupfs(name=self.cephfs_name)
+class LocalKernelMount(LocalCephFSMount, KernelMount):
+ def __init__(self, ctx, test_dir, client_id=None,
+ client_keyring_path=None, client_remote=None,
+ hostfs_mntpt=None, cephfs_name=None, cephfs_mntpt=None,
+ brxnet=None):
+ super(LocalKernelMount, self).__init__(ctx=ctx, test_dir=test_dir,
+ client_id=client_id, client_keyring_path=client_keyring_path,
+ client_remote=LocalRemote(), hostfs_mntpt=hostfs_mntpt,
+ cephfs_name=cephfs_name, cephfs_mntpt=cephfs_mntpt, brxnet=brxnet)
- stderr = StringIO()
- try:
- self.client_remote.run(args=['mkdir', '-p', self.hostfs_mntpt],
- stderr=stderr)
- except CommandFailedError:
- if 'file exists' not in stderr.getvalue().lower():
- raise
+        # Make vstart_runner compatible with teuthology and qa/tasks/cephfs.
+        self._mount_bin = [os.path.join(BIN_PREFIX, 'mount.ceph')]
- def list_connections():
- self.client_remote.run(
- args=["mount", "-t", "fusectl", "/sys/fs/fuse/connections", "/sys/fs/fuse/connections"],
- check_status=False
- )
- p = self.client_remote.run(args=["ls", "/sys/fs/fuse/connections"],
- check_status=False, stdout=StringIO())
- if p.exitstatus != 0:
- log.warning("ls conns failed with {0}, assuming none".format(p.exitstatus))
- return []
+class LocalFuseMount(LocalCephFSMount, FuseMount):
+ def __init__(self, ctx, test_dir, client_id, client_keyring_path=None,
+ client_remote=None, hostfs_mntpt=None, cephfs_name=None,
+ cephfs_mntpt=None, brxnet=None):
+ super(LocalFuseMount, self).__init__(ctx=ctx, test_dir=test_dir,
+ client_id=client_id, client_keyring_path=client_keyring_path,
+ client_remote=LocalRemote(), hostfs_mntpt=hostfs_mntpt,
+ cephfs_name=cephfs_name, cephfs_mntpt=cephfs_mntpt, brxnet=brxnet)
- ls_str = p.stdout.getvalue().strip()
- if ls_str:
- return [int(n) for n in ls_str.split("\n")]
- else:
- return []
+        # The following block makes tests written for teuthology compatible
+        # with vstart_runner.
+ self._mount_bin = [os.path.join(BIN_PREFIX, 'ceph-fuse')]
+ self._mount_cmd_cwd, self._mount_cmd_logger, \
+ self._mount_cmd_stdin = None, None, None
- # Before starting ceph-fuse process, note the contents of
- # /sys/fs/fuse/connections
- pre_mount_conns = list_connections()
- log.debug("Pre-mount connections: {0}".format(pre_mount_conns))
-
- cmdargs = []
- if self.using_namespace:
- cmdargs = ['sudo', 'nsenter',
- '--net=/var/run/netns/{0}'.format(self.netns_name),
- '--setuid', str(os.getuid())]
- cmdargs += [os.path.join(BIN_PREFIX, 'ceph-fuse'), self.hostfs_mntpt,
- '-f']
- if self.client_id is not None:
- cmdargs += ["--id", self.client_id]
- if self.client_keyring_path and self.client_id is not None:
- cmdargs.extend(['-k', self.client_keyring_path])
- if self.cephfs_name:
- cmdargs += ["--client_fs=" + self.cephfs_name]
- if self.cephfs_mntpt:
- cmdargs += ["--client_mountpoint=" + self.cephfs_mntpt]
- if os.getuid() != 0:
- cmdargs += ["--client_die_on_failed_dentry_invalidate=false"]
- if mntopts:
- cmdargs += mntopts
+    # XXX: CephFSMount._create_mntpt() sets the mountpoint's permission mode
+    # to 0000, which doesn't work for vstart_runner since superuser privileges
+    # are not used for mounting Ceph FS with FUSE.
+ def _create_mntpt(self):
+ self.client_remote.run(args=f'mkdir -p -v {self.hostfs_mntpt}')
- mountcmd_stdout, mountcmd_stderr = StringIO(), StringIO()
- self.fuse_daemon = self.client_remote.run(args=cmdargs, wait=False,
- omit_sudo=False, stdout=mountcmd_stdout, stderr=mountcmd_stderr)
+ def _run_mount_cmd(self, mntopts, check_status):
+ super(type(self), self)._run_mount_cmd(mntopts, check_status)
self._set_fuse_daemon_pid(check_status)
- log.debug("Mounting client.{0} with pid "
- "{1}".format(self.client_id, self.fuse_daemon.subproc.pid))
- # Wait for the connection reference to appear in /sys
- waited = 0
- post_mount_conns = list_connections()
- while len(post_mount_conns) <= len(pre_mount_conns):
- if self.fuse_daemon.finished:
- # Did mount fail? Raise the CommandFailedError instead of
- # hitting the "failed to populate /sys/" timeout
- try:
- self.fuse_daemon.wait()
- except CommandFailedError as e:
- if check_status:
- raise
- else:
- return (e, mountcmd_stdout.getvalue(),
- mountcmd_stderr.getvalue())
- time.sleep(1)
- waited += 1
- if waited > 30:
- raise RuntimeError("Fuse mount failed to populate /sys/ after {0} seconds".format(
- waited
- ))
- post_mount_conns = list_connections()
+ def _get_mount_cmd(self, mntopts):
+ mount_cmd = super(type(self), self)._get_mount_cmd(mntopts)
- log.debug("Post-mount connections: {0}".format(post_mount_conns))
+ if os.getuid() != 0:
+ mount_cmd += ['--client_die_on_failed_dentry_invalidate=false']
- # Record our fuse connection number so that we can use it when
- # forcing an unmount
- new_conns = list(set(post_mount_conns) - set(pre_mount_conns))
- if len(new_conns) == 0:
- raise RuntimeError("New fuse connection directory not found ({0})".format(new_conns))
- elif len(new_conns) > 1:
- raise RuntimeError("Unexpectedly numerous fuse connections {0}".format(new_conns))
- else:
- self._fuse_conn = new_conns[0]
+ return mount_cmd
- self.gather_mount_info()
+ @property
+ def _fuse_conn_check_timeout(self):
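+        # Seconds to wait for the new fuse connection to show up under
+        # /sys/fs/fuse/connections after mounting.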
+ return 30
- self.mounted = True
+ def _add_valgrind_args(self, mount_cmd):
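+        # vstart_runner never wraps ceph-fuse in valgrind, so no extra
+        # arguments are needed.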
+ return []
def _set_fuse_daemon_pid(self, check_status):
# NOTE: When a command <args> is launched with sudo, two processes are
else:
pass
- def cleanup_netns(self):
- if self.using_namespace:
- super(type(self), self).cleanup_netns()
-
- def _run_python(self, pyscript, py_version='python'):
- """
- Override this to remove the daemon-helper prefix that is used otherwise
- to make the process killable.
- """
- return self.client_remote.run(args=[py_version, '-c', pyscript],
- wait=False, stdout=StringIO())
-
# XXX: this class has nothing to do with the Ceph daemon (ceph-mgr) of
# the same name.
class LocalCephManager(CephManager):
# methods to work though.
self.pools = {}
+        # NOTE: These variables are overridden here so that the parent class
+        # picks them up.
+ self.cephadm = False
+ self.rook = False
+ self.testdir = None
+ self.run_cluster_cmd_prefix = [CEPH_CMD]
+        # XXX: The Ceph API test CI job fails because the "ceph -w" process
+        # launched by run_ceph_w() crashes when shell is set to True.
+        # See https://tracker.ceph.com/issues/49644.
+        #
+        # The two possible workarounds are either setting "shell" to False
+        # when the "ceph -w" command is executed, or prepending "exec sudo"
+        # to the command arguments. We go with the latter, since the former
+        # would require passing a "shell" parameter to the run() method,
+        # which is incompatible with teuthology.orchestra.run's run() as
+        # that method doesn't accept "shell" as a parameter.
+ self.run_ceph_w_prefix = ['exec', 'sudo', CEPH_CMD]
+
def find_remote(self, daemon_type, daemon_id):
"""
daemon_type like 'mds', 'osd'
"""
return LocalRemote()
- def run_ceph_w(self, watch_channel=None):
- """
- :param watch_channel: Specifies the channel to be watched.
- This can be 'cluster', 'audit', ...
- :type watch_channel: str
- """
- args = [os.path.join(BIN_PREFIX, "ceph"), "-w"]
- if watch_channel is not None:
- args.append("--watch-channel")
- args.append(watch_channel)
- proc = self.controller.run(args=args, wait=False, stdout=StringIO())
- return proc
-
- def run_cluster_cmd(self, **kwargs):
- """
- Run a Ceph command and the object representing the process for the
- command.
-
- Accepts arguments same as teuthology.orchestra.remote.run().
- """
- kwargs['args'] = [os.path.join(BIN_PREFIX,'ceph')]+list(kwargs['args'])
- return self.controller.run(**kwargs)
-
- def raw_cluster_cmd(self, *args, **kwargs) -> str:
- """
- args like ["osd", "dump"}
- return stdout string
- """
- kwargs['args'] = args
- if kwargs.get('stdout') is None:
- kwargs['stdout'] = StringIO()
- return self.run_cluster_cmd(**kwargs).stdout.getvalue()
-
- def raw_cluster_cmd_result(self, *args, **kwargs):
- """
- like raw_cluster_cmd but don't check status, just return rc
- """
- kwargs['args'], kwargs['check_status'] = args, False
- return self.run_cluster_cmd(**kwargs).exitstatus
-
def admin_socket(self, daemon_type, daemon_id, command, check_status=True,
timeout=None, stdout=None):
if stdout is None:
stdout = StringIO()
- return self.controller.run(
- args=[os.path.join(BIN_PREFIX, "ceph"), "daemon",
- "{0}.{1}".format(daemon_type, daemon_id)] + command,
- check_status=check_status, timeout=timeout, stdout=stdout)
-
- def get_mon_socks(self):
- """
- Get monitor sockets.
-
- :return socks: tuple of strings; strings are individual sockets.
- """
- from json import loads
-
- output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
- socks = []
- for mon in output['mons']:
- for addrvec_mem in mon['public_addrs']['addrvec']:
- socks.append(addrvec_mem['addr'])
- return tuple(socks)
-
- def get_msgrv1_mon_socks(self):
- """
- Get monitor sockets that use msgrv2 to operate.
-
- :return socks: tuple of strings; strings are individual sockets.
- """
- from json import loads
-
- output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
- socks = []
- for mon in output['mons']:
- for addrvec_mem in mon['public_addrs']['addrvec']:
- if addrvec_mem['type'] == 'v1':
- socks.append(addrvec_mem['addr'])
- return tuple(socks)
-
- def get_msgrv2_mon_socks(self):
- """
- Get monitor sockets that use msgrv2 to operate.
-
- :return socks: tuple of strings; strings are individual sockets.
- """
- from json import loads
-
- output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
- socks = []
- for mon in output['mons']:
- for addrvec_mem in mon['public_addrs']['addrvec']:
- if addrvec_mem['type'] == 'v2':
- socks.append(addrvec_mem['addr'])
- return tuple(socks)
+        args = [CEPH_CMD, "daemon", f"{daemon_type}.{daemon_id}"] + command
+ return self.controller.run(args=args, check_status=check_status,
+ timeout=timeout, stdout=stdout)
class LocalCephCluster(CephCluster):
class LocalContext(object):
def __init__(self):
- self.config = {}
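+        # The cluster name is looked up below when registering the manager;
+        # vstart always uses the default name 'ceph'.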
+ self.config = {'cluster': 'ceph'}
self.teuthology_config = teuth_config
self.cluster = LocalCluster()
self.daemons = DaemonGroup()
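+        # Tests written for teuthology find the manager through
+        # ctx.managers['<cluster name>'], so register one here.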
+ if not hasattr(self, 'managers'):
+ self.managers = {}
+ self.managers[self.config['cluster']] = LocalCephManager()
# Shove some LocalDaemons into the ctx.daemons DaemonGroup instance so that any
# tests that want to look these up via ctx can do so.
def scan_tests(modules):
overall_suite = load_tests(modules, loader.TestLoader())
-
max_required_mds = 0
max_required_clients = 0
max_required_mgr = 0
def teardown_cluster():
log.info('\ntearing down the cluster...')
remote.run(args=[os.path.join(SRC_PREFIX, "stop.sh")], timeout=60)
+ log.info('\nceph cluster torn down')
remote.run(args=['rm', '-rf', './dev', './out'])
def clear_old_log():
- from os import stat
-
try:
- stat(logpath)
- # would need an update when making this py3 compatible. Use FileNotFound
- # instead.
- except OSError:
+ os.stat(logpath)
+ except FileNotFoundError:
return
else:
os.remove(logpath)
with open(logpath, 'w') as logfile:
logfile.write('')
- init_log()
+ init_log(log.level)
log.debug('logging in a fresh file now...')
-    # tools that the tests might want to use (add more here if needed)
-    require_binaries = ["ceph-dencoder", "cephfs-journal-tool", "cephfs-data-scan",
-                        "cephfs-table-tool", "ceph-fuse", "rados", "cephfs-meta-injection"]
+    # What binaries may be required is task specific.
+    require_binaries = ["ceph-dencoder", "rados"]
missing_binaries = [b for b in require_binaries if not os.path.exists(os.path.join(BIN_PREFIX, b))]
if missing_binaries and not opt_ignore_missing_binaries:
log.error("Some ceph binaries missing, please build them: {0}".format(" ".join(missing_binaries)))
if opt_verbose:
args.append("-d")
+ log.info('\nrunning vstart.sh now...')
# usually, i get vstart.sh running completely in less than 100
# seconds.
remote.run(args=args, env=vstart_env, timeout=(3 * 60))
+ log.info('\nvstart.sh finished running')
# Wait for OSD to come up so that subsequent injectargs etc will
# definitely succeed
client_name = "client.{0}".format(client_id)
if client_name not in open("./keyring").read():
- p = remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "auth", "get-or-create", client_name,
+ p = remote.run(args=[CEPH_CMD, "auth", "get-or-create", client_name,
"osd", "allow rw",
"mds", "allow",
"mon", "allow r"], stdout=StringIO())
# For the benefit of polling tests like test_full -- in teuthology land we set this
# in a .yaml, here it's just a hardcoded thing for the developer's pleasure.
- remote.run(args=[os.path.join(BIN_PREFIX, "ceph"), "tell", "osd.*", "injectargs", "--osd-mon-report-interval", "5"])
+ remote.run(args=[CEPH_CMD, "tell", "osd.*", "injectargs", "--osd-mon-report-interval", "5"])
ceph_cluster.set_ceph_conf("osd", "osd_mon_report_interval", "5")
# Vstart defaults to two segments, which very easily gets a "behind on trimming" health warning