import random
import traceback
-from io import BytesIO
-from six import StringIO
+from io import BytesIO, StringIO
+from errno import EBUSY
from teuthology.exceptions import CommandFailedError
from teuthology import misc
from teuthology.nuke import clear_firewall
from teuthology.parallel import parallel
+from teuthology import contextutil
from tasks.ceph_manager import write_conf
from tasks import ceph_manager
def __str__(self):
return "Object not found: '{0}'".format(self._object_name)
class FSMissing(Exception):
    """Raised when a file system (looked up by id or name) is absent from the FSMap."""

    def __init__(self, ident):
        self.ident = ident

    def __str__(self):
        return "File system {} does not exist in the map".format(self.ident)
+
class FSStatus(object):
"""
Operations on a snapshot of the FSMap.
for fs in self.map['filesystems']:
if fscid is None or fs['id'] == fscid:
return fs
- raise RuntimeError("FSCID {0} not in map".format(fscid))
+ raise FSMissing(fscid)
def get_fsmap_byname(self, name):
"""
for fs in self.map['filesystems']:
if name is None or fs['mdsmap']['fs_name'] == name:
return fs
- raise RuntimeError("FS {0} not in map".format(name))
+ raise FSMissing(name)
def get_replays(self, fscid):
"""
log.warning(json.dumps(list(self.get_all()), indent=2)) # dump for debugging
raise RuntimeError("MDS id '{0}' not found in map".format(name))
+ def get_mds_addrs(self, name):
+ """
+ Return the instance addr as a string, like "[10.214.133.138:6807 10.214.133.138:6808]"
+ """
+ info = self.get_mds(name)
+ if info:
+ return [e['addr'] for e in info['addrs']['addrvec']]
+ else:
+ log.warn(json.dumps(list(self.get_all()), indent=2)) # dump for debugging
+ raise RuntimeError("MDS id '{0}' not found in map".format(name))
+
def get_mds_gid(self, gid):
"""
Get the info for the given MDS gid.
(result,) = self._ctx.cluster.only(first_mon).remotes.keys()
return result
    def __init__(self, ctx) -> None:
        # Keep the teuthology context; all cluster state is reached through it.
        self._ctx = ctx
        # mon_manager wraps "ceph ..." CLI invocations run on the admin remote.
        self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
log.debug("_json_asok output empty")
return None
+ def is_addr_blocklisted(self, addr=None):
+ if addr is None:
+ log.warn("Couldn't get the client address, so the blocklisted "
+ "status undetermined")
+ return False
+
+ blocklist = json.loads(self.mon_manager.run_cluster_cmd(
+ args=["osd", "blocklist", "ls", "--format=json"],
+ stdout=StringIO()).stdout.getvalue())
+ for b in blocklist:
+ if addr == b["addr"]:
+ return True
+ return False
+
class MDSCluster(CephCluster):
"""
a parent of Filesystem. The correct way to use MDSCluster going forward is
as a separate instance outside of your (multiple) Filesystem instances.
"""
+
def __init__(self, ctx):
super(MDSCluster, self).__init__(ctx)
- self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))
-
- if len(self.mds_ids) == 0:
- raise RuntimeError("This task requires at least one MDS")
+ @property
+ def mds_ids(self):
+ # do this dynamically because the list of ids may change periodically with cephadm
+ return list(misc.all_roles_of_type(self._ctx.cluster, 'mds'))
- if hasattr(self._ctx, "daemons"):
- # Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
- self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
+ @property
+ def mds_daemons(self):
+ return dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
def _one_or_all(self, mds_id, cb, in_parallel=True):
"""
:param cb: Callback taking single argument of MDS daemon name
:param in_parallel: whether to invoke callbacks concurrently (else one after the other)
"""
+
if mds_id is None:
if in_parallel:
with parallel() as p:
    def status(self):
        # Take a fresh snapshot of the FSMap, wrapped for convenient queries.
        return FSStatus(self.mon_manager)
- def delete_all_filesystems(self):
- """
- Remove all filesystems that exist, and any pools in use by them.
- """
- pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
- pool_id_name = {}
- for pool in pools:
- pool_id_name[pool['pool']] = pool['pool_name']
-
- # mark cluster down for each fs to prevent churn during deletion
- status = self.status()
- for fs in status.get_filesystems():
- self.mon_manager.raw_cluster_cmd("fs", "fail", str(fs['mdsmap']['fs_name']))
-
- # get a new copy as actives may have since changed
- status = self.status()
- for fs in status.get_filesystems():
- mdsmap = fs['mdsmap']
- metadata_pool = pool_id_name[mdsmap['metadata_pool']]
-
- self.mon_manager.raw_cluster_cmd('fs', 'rm', mdsmap['fs_name'], '--yes-i-really-mean-it')
- self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
- metadata_pool, metadata_pool,
- '--yes-i-really-really-mean-it')
- for data_pool in mdsmap['data_pools']:
- data_pool = pool_id_name[data_pool]
- try:
- self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
- data_pool, data_pool,
- '--yes-i-really-really-mean-it')
- except CommandFailedError as e:
- if e.exitstatus == 16: # EBUSY, this data pool is used
- pass # by two metadata pools, let the 2nd
- else: # pass delete it
- raise
-
def get_standby_daemons(self):
return set([s['name'] for s in self.status().get_standbys()])
self._one_or_all(mds_id, set_block, in_parallel=False)
+ def set_inter_mds_block(self, blocked, mds_rank_1, mds_rank_2):
+ """
+ Block (using iptables) communications from a provided MDS to other MDSs.
+ Block all ports that an MDS uses for communication.
+
+ :param blocked: True to block the MDS, False otherwise
+ :param mds_rank_1: MDS rank
+ :param mds_rank_2: MDS rank
+ :return:
+ """
+ da_flag = "-A" if blocked else "-D"
+
+ def set_block(mds_ids):
+ status = self.status()
+
+ mds = mds_ids[0]
+ remote = self.mon_manager.find_remote('mds', mds)
+ addrs = status.get_mds_addrs(mds)
+ for addr in addrs:
+ ip_str, port_str = re.match("(.+):(.+)", addr).groups()
+ remote.run(
+ args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
+ "comment", "--comment", "teuthology"])
+
+
+ mds = mds_ids[1]
+ remote = self.mon_manager.find_remote('mds', mds)
+ addrs = status.get_mds_addrs(mds)
+ for addr in addrs:
+ ip_str, port_str = re.match("(.+):(.+)", addr).groups()
+ remote.run(
+ args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
+ "comment", "--comment", "teuthology"])
+ remote.run(
+ args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
+ "comment", "--comment", "teuthology"])
+
+ self._one_or_all((mds_rank_1, mds_rank_2), set_block, in_parallel=False)
+
    def clear_firewall(self):
        # Remove all teuthology-tagged iptables rules on every remote.
        clear_firewall(self._ctx)
raise RuntimeError("Pool not found '{0}'".format(pool_name))
+ def delete_all_filesystems(self):
+ """
+ Remove all filesystems that exist, and any pools in use by them.
+ """
+ for fs in self.status().get_filesystems():
+ Filesystem(ctx=self._ctx, fscid=fs['id']).destroy()
+
+
class Filesystem(MDSCluster):
"""
This object is for driving a CephFS filesystem. The MDS daemons driven by
MDSCluster may be shared with other Filesystems.
"""
- def __init__(self, ctx, fs_config=None, fscid=None, name=None, create=False,
- ec_profile=None):
+ def __init__(self, ctx, fs_config={}, fscid=None, name=None, create=False):
super(Filesystem, self).__init__(ctx)
self.name = name
- self.ec_profile = ec_profile
self.id = None
self.metadata_pool_name = None
self.metadata_overlay = False
self.data_pool_name = None
self.data_pools = None
self.fs_config = fs_config
+ self.ec_profile = fs_config.get('ec_profile')
client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
self.client_id = client_list[0]
if not hasattr(self._ctx, "filesystem"):
self._ctx.filesystem = self
+ def dead(self):
+ try:
+ return not bool(self.get_mds_map())
+ except FSMissing:
+ return True
+
    def get_task_status(self, status_key):
        # Query the per-MDS service task status map for the given key.
        return self.mon_manager.get_service_task_status("mds", status_key)
assert(mds_map['max_mds'] == max_mds)
assert(mds_map['in'] == list(range(0, max_mds)))
+ def reset(self):
+ self.mon_manager.raw_cluster_cmd("fs", "reset", str(self.name), '--yes-i-really-mean-it')
+
    def fail(self):
        # Issue "ceph fs fail <name>" via the mon admin interface.
        self.mon_manager.raw_cluster_cmd("fs", "fail", str(self.name))
    def set_allow_new_snaps(self, yes):
        # Toggle the fs "allow_new_snaps" flag; the confirmation flag is
        # required by the mon for this particular setting.
        self.set_var("allow_new_snaps", yes, '--yes-i-really-mean-it')
+ def required_client_features(self, *args, **kwargs):
+ c = ["fs", "required_client_features", self.name, *args]
+ return self.mon_manager.run_cluster_cmd(args=c, **kwargs)
+
# In Octopus+, the PG count can be omitted to use the default. We keep the
# hard-coded value for deployments of Mimic/Nautilus.
pgs_per_fs_pool = 8
self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
self.metadata_pool_name, self.pgs_per_fs_pool.__str__())
+
+ self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
+ data_pool_name, self.pgs_per_fs_pool.__str__())
+
if self.metadata_overlay:
self.mon_manager.raw_cluster_cmd('fs', 'new',
self.name, self.metadata_pool_name, data_pool_name,
'--allow-dangerous-metadata-overlay')
else:
+ self.mon_manager.raw_cluster_cmd('fs', 'new',
+ self.name,
+ self.metadata_pool_name,
+ data_pool_name)
+
if self.ec_profile and 'disabled' not in self.ec_profile:
+ ec_data_pool_name = data_pool_name + "_ec"
log.debug("EC profile is %s", self.ec_profile)
- cmd = ['osd', 'erasure-code-profile', 'set', data_pool_name]
+ cmd = ['osd', 'erasure-code-profile', 'set', ec_data_pool_name]
cmd.extend(self.ec_profile)
self.mon_manager.raw_cluster_cmd(*cmd)
self.mon_manager.raw_cluster_cmd(
'osd', 'pool', 'create',
- data_pool_name, self.pgs_per_fs_pool.__str__(), 'erasure',
- data_pool_name)
+ ec_data_pool_name, self.pgs_per_fs_pool.__str__(), 'erasure',
+ ec_data_pool_name)
self.mon_manager.raw_cluster_cmd(
'osd', 'pool', 'set',
- data_pool_name, 'allow_ec_overwrites', 'true')
- else:
- self.mon_manager.raw_cluster_cmd(
- 'osd', 'pool', 'create',
- data_pool_name, self.pgs_per_fs_pool.__str__())
- self.mon_manager.raw_cluster_cmd('fs', 'new',
- self.name,
- self.metadata_pool_name,
- data_pool_name,
- "--force")
+ ec_data_pool_name, 'allow_ec_overwrites', 'true')
+ self.add_data_pool(ec_data_pool_name, create=False)
+ self.check_pool_application(ec_data_pool_name)
+
+ self.run_client_payload(f"setfattr -n ceph.dir.layout.pool -v {ec_data_pool_name} . && getfattr -n ceph.dir.layout .")
+
self.check_pool_application(self.metadata_pool_name)
self.check_pool_application(data_pool_name)
+
# Turn off spurious standby count warnings from modifying max_mds in tests.
try:
self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
if max_mds > 1:
self.set_max_mds(max_mds)
+ standby_replay = self.fs_config.get('standby_replay', False)
+ self.set_allow_standby_replay(standby_replay)
+
# If absent will use the default value (60 seconds)
session_timeout = self.fs_config.get('session_timeout', 60)
if session_timeout != 60:
self.getinfo(refresh = True)
-
+ def run_client_payload(self, cmd):
+ # avoid circular dep by importing here:
+ from tasks.cephfs.fuse_mount import FuseMount
+ d = misc.get_testdir(self._ctx)
+ m = FuseMount(self._ctx, {}, d, "admin", self.client_remote, cephfs_name=self.name)
+ m.mount()
+ m.run_shell_payload(cmd)
+ m.umount_wait(require_clean=True)
+
+ def _remove_pool(self, name, **kwargs):
+ c = f'osd pool rm {name} {name} --yes-i-really-really-mean-it'
+ return self.mon_manager.ceph(c, **kwargs)
+
+ def rm(self, **kwargs):
+ c = f'fs rm {self.name} --yes-i-really-mean-it'
+ return self.mon_manager.ceph(c, **kwargs)
+
+ def remove_pools(self, data_pools):
+ self._remove_pool(self.get_metadata_pool_name())
+ for poolname in data_pools:
+ try:
+ self._remove_pool(poolname)
+ except CommandFailedError as e:
+ # EBUSY, this data pool is used by two metadata pools, let the
+ # 2nd pass delete it
+ if e.exitstatus == EBUSY:
+ pass
+ else:
+ raise
+
+ def destroy(self, reset_obj_attrs=True):
+ log.info(f'Destroying file system {self.name} and related pools')
+
+ if self.dead():
+ log.debug('already dead...')
+ return
+
+ data_pools = self.get_data_pool_names(refresh=True)
+
+ # make sure no MDSs are attached to given FS.
+ self.fail()
+ self.rm()
+
+ self.remove_pools(data_pools)
+
+ if reset_obj_attrs:
+ self.id = None
+ self.name = None
+ self.metadata_pool_name = None
+ self.data_pool_name = None
+ self.data_pools = None
+
+ def recreate(self):
+ self.destroy()
+
+ self.create()
+ self.getinfo(refresh=True)
+
    def check_pool_application(self, pool_name):
        # Verify pools carry the "cephfs" application tag.
        # NOTE(review): as written this raises for ANY pool missing the tag,
        # not just pool_name — confirm whether a pool_name filter is intended.
        osd_map = self.mon_manager.get_osd_dump_json()
        for pool in osd_map['pools']:
            if not "cephfs" in pool['application_metadata']:
                raise RuntimeError("Pool {pool_name} does not name cephfs as application!".\
                    format(pool_name=pool_name))
-
def __del__(self):
if getattr(self._ctx, "filesystem", None) == self:
def _df(self):
return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))
+ # may raise FSMissing
def get_mds_map(self, status=None):
if status is None:
status = self.status()
mds.check_status()
active_count = 0
- try:
- mds_map = self.get_mds_map(status=status)
- except CommandFailedError as cfe:
- # Old version, fall back to non-multi-fs commands
- if cfe.exitstatus == errno.EINVAL:
- mds_map = json.loads(
- self.mon_manager.raw_cluster_cmd('mds', 'dump', '--format=json'))
- else:
- raise
+ mds_map = self.get_mds_map(status=status)
log.debug("are_daemons_healthy: mds map: {0}".format(mds_map))
for mds_id, mds_status in mds_map['info'].items():
if mds_status['state'] == 'up:active':
try:
- daemon_status = self.mds_asok(["status"], mds_id=mds_status['name'])
+ daemon_status = self.mds_tell(["status"], mds_id=mds_status['name'])
except CommandFailedError as cfe:
if cfe.exitstatus == errno.EINVAL:
# Old version, can't do this check
status = self.status()
- def get_lone_mds_id(self):
+ def dencoder(self, obj_type, obj_blob):
+ args = [os.path.join(self._prefix, "ceph-dencoder"), 'type', obj_type, 'import', '-', 'decode', 'dump_json']
+ p = self.mon_manager.controller.run(args=args, stdin=BytesIO(obj_blob), stdout=BytesIO())
+ return p.stdout.getvalue()
+
+ def rados(self, *args, **kwargs):
"""
- Get a single MDS ID: the only one if there is only one
- configured, else the only one currently holding a rank,
- else raise an error.
+ Callout to rados CLI.
"""
- if len(self.mds_ids) != 1:
- alive = self.get_rank_names()
- if len(alive) == 1:
- return alive[0]
- else:
- raise ValueError("Explicit MDS argument required when multiple MDSs in use")
- else:
- return self.mds_ids[0]
- def recreate(self):
- log.info("Creating new filesystem")
- self.delete_all_filesystems()
- self.id = None
- self.create()
+ return self.mon_manager.do_rados(*args, **kwargs)
- def put_metadata_object_raw(self, object_id, infile):
+ def radosm(self, *args, **kwargs):
"""
- Save an object to the metadata pool
+ Interact with the metadata pool via rados CLI.
"""
- temp_bin_path = infile
- self.client_remote.run(args=[
- 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'put', object_id, temp_bin_path
- ])
- def get_metadata_object_raw(self, object_id):
+ return self.rados(*args, **kwargs, pool=self.get_metadata_pool_name())
+
+ def radosmo(self, *args, stdout=BytesIO(), **kwargs):
"""
- Retrieve an object from the metadata pool and store it in a file.
+ Interact with the metadata pool via rados CLI. Get the stdout.
"""
- temp_bin_path = '/tmp/' + object_id + '.bin'
- self.client_remote.run(args=[
- 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
- ])
-
- return temp_bin_path
+ return self.radosm(*args, **kwargs, stdout=stdout).stdout.getvalue()
def get_metadata_object(self, object_type, object_id):
"""
Retrieve an object from the metadata pool, pass it through
ceph-dencoder to dump it to JSON, and return the decoded object.
"""
- temp_bin_path = '/tmp/out.bin'
-
- self.client_remote.run(args=[
- 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
- ])
- dump_json = self.client_remote.sh([
- 'sudo', os.path.join(self._prefix, 'ceph-dencoder'), 'type', object_type, 'import', temp_bin_path, 'decode', 'dump_json'
- ]).strip()
+ o = self.radosmo(['get', object_id, '-'])
+ j = self.dencoder(object_type, o)
try:
- dump = json.loads(dump_json)
+ return json.loads(j)
except (TypeError, ValueError):
- log.error("Failed to decode JSON: '{0}'".format(dump_json))
+ log.error("Failed to decode JSON: '{0}'".format(j))
raise
- return dump
-
def get_journal_version(self):
"""
Read the JournalPointer and Journal::Header objects to learn the version of
def mds_asok(self, command, mds_id=None, timeout=None):
if mds_id is None:
- mds_id = self.get_lone_mds_id()
+ return self.rank_asok(command, timeout=timeout)
return self.json_asok(command, 'mds', mds_id, timeout=timeout)
+ def mds_tell(self, command, mds_id=None):
+ if mds_id is None:
+ return self.rank_tell(command)
+
+ return json.loads(self.mon_manager.raw_cluster_cmd("tell", f"mds.{mds_id}", *command))
+
    def rank_asok(self, command, rank=0, status=None, timeout=None):
        # Resolve the rank to a daemon name, then hit its admin socket.
        info = self.get_rank(rank=rank, status=status)
        return self.json_asok(command, 'mds', info['name'], timeout=timeout)
def rank_tell(self, command, rank=0, status=None):
- info = self.get_rank(rank=rank, status=status)
- return json.loads(self.mon_manager.raw_cluster_cmd("tell", 'mds.{0}'.format(info['name']), *command))
+ return json.loads(self.mon_manager.raw_cluster_cmd("tell", f"mds.{self.id}:{rank}", *command))
def ranks_tell(self, command, status=None):
if status is None:
else:
time.sleep(1)
- def _read_data_xattr(self, ino_no, xattr_name, type, pool):
- mds_id = self.mds_ids[0]
- remote = self.mds_daemons[mds_id].remote
+ def _read_data_xattr(self, ino_no, xattr_name, obj_type, pool):
if pool is None:
pool = self.get_data_pool_name()
obj_name = "{0:x}.00000000".format(ino_no)
- args = [
- os.path.join(self._prefix, "rados"), "-p", pool, "getxattr", obj_name, xattr_name
- ]
+ args = ["getxattr", obj_name, xattr_name]
try:
- proc = remote.run(args=args, stdout=BytesIO())
+ proc = self.rados(args, pool=pool, stdout=BytesIO())
except CommandFailedError as e:
log.error(e.__str__())
raise ObjectNotFound(obj_name)
- data = proc.stdout.getvalue()
- dump = remote.sh(
- [os.path.join(self._prefix, "ceph-dencoder"),
- "type", type,
- "import", "-",
- "decode", "dump_json"],
- stdin=data
- )
-
- return json.loads(dump.strip())
+ obj_blob = proc.stdout.getvalue()
+ return json.loads(self.dencoder(obj_type, obj_blob).strip())
def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
"""
:param pool: name of data pool or None to use primary data pool
:return: None
"""
- remote = self.mds_daemons[self.mds_ids[0]].remote
if pool is None:
pool = self.get_data_pool_name()
obj_name = "{0:x}.00000000".format(ino_no)
- args = [
- os.path.join(self._prefix, "rados"), "-p", pool, "setxattr",
- obj_name, xattr_name, data
- ]
- remote.sh(args)
+ args = ["setxattr", obj_name, xattr_name, data]
+ self.rados(args, pool=pool)
def read_backtrace(self, ino_no, pool=None):
"""
for n in range(0, ((size - 1) // stripe_size) + 1)
]
- exist_objects = self.rados(["ls"], pool=self.get_data_pool_name()).split("\n")
+ exist_objects = self.rados(["ls"], pool=self.get_data_pool_name(), stdout=StringIO()).stdout.getvalue().split("\n")
return want_objects, exist_objects
def dirfrag_exists(self, ino, frag):
try:
- self.rados(["stat", "{0:x}.{1:08x}".format(ino, frag)])
+ self.radosm(["stat", "{0:x}.{1:08x}".format(ino, frag)])
except CommandFailedError:
return False
else:
return True
- def rados(self, args, pool=None, namespace=None, stdin_data=None,
- stdin_file=None,
- stdout_data=None):
- """
- Call into the `rados` CLI from an MDS
- """
-
- if pool is None:
- pool = self.get_metadata_pool_name()
-
- # Doesn't matter which MDS we use to run rados commands, they all
- # have access to the pools
- mds_id = self.mds_ids[0]
- remote = self.mds_daemons[mds_id].remote
-
- # NB we could alternatively use librados pybindings for this, but it's a one-liner
- # using the `rados` CLI
- args = ([os.path.join(self._prefix, "rados"), "-p", pool] +
- (["--namespace", namespace] if namespace else []) +
- args)
-
- if stdin_file is not None:
- args = ["bash", "-c", "cat " + stdin_file + " | " + " ".join(args)]
- if stdout_data is None:
- stdout_data = StringIO()
-
- p = remote.run(args=args,
- stdin=stdin_data,
- stdout=stdout_data)
- return p.stdout.getvalue().strip()
-
def list_dirfrag(self, dir_ino):
"""
Read the named object and return the list of omap keys
dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
try:
- key_list_str = self.rados(["listomapkeys", dirfrag_obj_name])
+ key_list_str = self.radosmo(["listomapkeys", dirfrag_obj_name], stdout=StringIO())
except CommandFailedError as e:
log.error(e.__str__())
raise ObjectNotFound(dirfrag_obj_name)
- return key_list_str.split("\n") if key_list_str else []
+ return key_list_str.strip().split("\n") if key_list_str else []
+
+ def get_meta_of_fs_file(self, dir_ino, obj_name, out):
+ """
+ get metadata from parent to verify the correctness of the data format encoded by the tool, cephfs-meta-injection.
+ warning : The splitting of directory is not considered here.
+ """
+
+ dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
+ try:
+ self.radosm(["getomapval", dirfrag_obj_name, obj_name+"_head", out])
+ except CommandFailedError as e:
+ log.error(e.__str__())
+ raise ObjectNotFound(dir_ino)
def erase_metadata_objects(self, prefix):
"""
This O(N) with the number of objects in the pool, so only suitable
for use on toy test filesystems.
"""
- all_objects = self.rados(["ls"]).split("\n")
+ all_objects = self.radosmo(["ls"], stdout=StringIO()).strip().split("\n")
matching_objects = [o for o in all_objects if o.startswith(prefix)]
for o in matching_objects:
- self.rados(["rm", o])
+ self.radosm(["rm", o])
def erase_mds_objects(self, rank):
"""
it'll definitely have keys with perms to access cephfs metadata pool. This is public
so that tests can use this remote to go get locally written output files from the tools.
"""
- mds_id = self.mds_ids[0]
- return self.mds_daemons[mds_id].remote
+ return self.mon_manager.controller
def journal_tool(self, args, rank, quiet=False):
"""
fs_rank = self._make_rank(rank)
return self._run_tool("cephfs-journal-tool", args, fs_rank, quiet)
+ def meta_tool(self, args, rank, quiet=False):
+ """
+ Invoke cephfs-meta-injection with the passed arguments for a rank, and return its stdout
+ """
+ fs_rank = self._make_rank(rank)
+ return self._run_tool("cephfs-meta-injection", args, fs_rank, quiet)
+
def table_tool(self, args, quiet=False):
"""
Invoke cephfs-table-tool with the passed arguments, and return its stdout
    def is_full(self):
        # Full when the primary data pool reports as full.
        return self.is_pool_full(self.get_data_pool_name())
+
+ def authorize(self, client_id, caps=('/', 'rw')):
+ """
+ Run "ceph fs authorize" and run "ceph auth get" to get and returnt the
+ keyring.
+
+ client_id: client id that will be authorized
+ caps: tuple containing the path and permission (can be r or rw)
+ respectively.
+ """
+ client_name = 'client.' + client_id
+ return self.mon_manager.raw_cluster_cmd('fs', 'authorize', self.name,
+ client_name, *caps)
+
+ def grow(self, new_max_mds, status=None):
+ oldmax = self.get_var('max_mds', status=status)
+ assert(new_max_mds > oldmax)
+ self.set_max_mds(new_max_mds)
+ return self.wait_for_daemons()
+
+ def shrink(self, new_max_mds, status=None):
+ oldmax = self.get_var('max_mds', status=status)
+ assert(new_max_mds < oldmax)
+ self.set_max_mds(new_max_mds)
+ return self.wait_for_daemons()
+
+ def run_scrub(self, cmd, rank=0):
+ return self.rank_tell(["scrub"] + cmd, rank)
+
+ def get_scrub_status(self, rank=0):
+ return self.run_scrub(["status"], rank)
+
    def wait_until_scrub_complete(self, result=None, tag=None, rank=0, sleep=30,
                                  timeout=300, reverse=False):
        """
        Poll "scrub status" on the given rank until *result* appears in (or,
        with reverse=True, disappears from) the status string, or until a
        given *tag* is no longer listed among the scrubs.

        :param result: substring looked for in the status; defaults to
            "no active scrubs running".
        :param reverse: invert the substring test on the status field.
        :returns: True once the condition is met.
        """
        # time out after "timeout" seconds and assume as done
        # NOTE(review): teuthology's safe_while raises when its tries are
        # exhausted unless told otherwise, so the trailing "return False" may
        # be unreachable — confirm against contextutil.safe_while.
        if result is None:
            result = "no active scrubs running"
        with contextutil.safe_while(sleep=sleep, tries=timeout//sleep) as proceed:
            while proceed():
                out_json = self.rank_tell(["scrub", "status"], rank=rank)
                assert out_json is not None
                if not reverse:
                    if result in out_json['status']:
                        log.info("all active scrubs completed")
                        return True
                else:
                    if result not in out_json['status']:
                        log.info("all active scrubs completed")
                        return True

                if tag is not None:
                    # a non-None entry for the tag means that scrub is still listed
                    status = out_json['scrubs'][tag]
                    if status is not None:
                        log.info(f"scrub status for tag:{tag} - {status}")
                    else:
                        log.info(f"scrub has completed for tag:{tag}")
                        return True

        # timed out waiting for scrub to complete
        return False