def json_asok(self, command, service_type, service_id, timeout=None):
    """
    Run an admin-socket command against a daemon and parse its JSON output.

    :param command: list of command words; '--format=json' is prepended.
    :param service_type: daemon type, e.g. 'mds' or 'osd'.
    :param service_id: daemon id to address.
    :param timeout: seconds to wait for the command; defaults to 300.
    :return: parsed JSON (when the daemon produced any output).
    """
    if timeout is None:
        timeout = 300
    command.insert(0, '--format=json')
    proc = self.mon_manager.admin_socket(service_type, service_id, command, timeout=timeout)
    response_data = proc.stdout.getvalue().strip()
    if len(response_data) > 0:

        def get_nonnumeric_values(value):
            # json.loads() invokes this for the bare constants NaN /
            # Infinity / -Infinity; map them to the equivalent floats.
            c = {"NaN": float("nan"), "Infinity": float("inf"),
                 "-Infinity": -float("inf")}
            return c[value]

        # Ceph daemons may emit bare 'inf' tokens, which strict JSON
        # rejects; rewrite them to 'Infinity' first.  NOTE(review): this
        # substring replace would also touch a literal 'inf' inside any
        # string value — acceptable for admin-socket output, but worth
        # confirming if reused elsewhere.
        j = json.loads(response_data.replace('inf', 'Infinity'),
                       parse_constant=get_nonnumeric_values)
        pretty = json.dumps(j, sort_keys=True, indent=2)
        log.debug(f"_json_asok output\n{pretty}")
        return j
"""
self.mds_daemons[mds_id].signal(sig, silent);
def mds_is_running(self, mds_id):
    """Return whether the MDS daemon registered as *mds_id* is running."""
    return self.mds_daemons[mds_id].running()
def newfs(self, name='cephfs', create=True):
    """Return a Filesystem object for *name*; *create* is forwarded to it."""
    fs_kwargs = dict(name=name, create=create)
    return Filesystem(self._ctx, **fs_kwargs)
ip_str, port_str = re.match("(.+):(.+)", addr).groups()
remote.run(
args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
- "comment", "--comment", "teuthology"])
+ "comment", "--comment", "teuthology"], omit_sudo=False)
mds = mds_ids[1]
ip_str, port_str = re.match("(.+):(.+)", addr).groups()
remote.run(
args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
- "comment", "--comment", "teuthology"])
+ "comment", "--comment", "teuthology"], omit_sudo=False)
remote.run(
args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
- "comment", "--comment", "teuthology"])
+ "comment", "--comment", "teuthology"], omit_sudo=False)
self._one_or_all((mds_rank_1, mds_rank_2), set_block, in_parallel=False)
class Filesystem(MDSCluster):
@classmethod
def get_all_fs(cls, ctx):
    """
    Generator for all Filesystems in the cluster.

    Queries the FSMap once via a temporary MDSCluster and yields one
    instance of *cls* per filesystem id found.
    """
    mdsc = MDSCluster(ctx)
    status = mdsc.status()
    for fs in status.get_filesystems():
        yield cls(ctx, fscid=fs['id'])
"""
This object is for driving a CephFS filesystem. The MDS daemons driven by
MDSCluster may be shared with other Filesystems.
self.name = name
self.id = None
self.metadata_pool_name = None
- self.metadata_overlay = False
self.data_pool_name = None
self.data_pools = None
self.fs_config = fs_config
self.get_pool_names(status = status, refresh = refresh)
return status
- def set_metadata_overlay(self, overlay):
- if self.id is not None:
- raise RuntimeError("cannot specify fscid when configuring overlay")
- self.metadata_overlay = overlay
-
def reach_max_mds(self):
status = self.wait_for_daemons()
mds_map = self.get_mds_map(status=status)
def set_allow_new_snaps(self, yes):
    """Toggle the filesystem's 'allow_new_snaps' flag."""
    flag_args = ("allow_new_snaps", yes, '--yes-i-really-mean-it')
    self.set_var(*flag_args)
def set_bal_rank_mask(self, bal_rank_mask):
    """Set the filesystem's 'bal_rank_mask' (balancer rank mask) variable."""
    self.set_var("bal_rank_mask", bal_rank_mask)
def set_refuse_client_session(self, yes):
    """Set the filesystem's 'refuse_client_session' flag."""
    self.set_var("refuse_client_session", yes)
def compat(self, *args):
    """Issue 'fs compat <name> ...' with all arguments lower-cased strings."""
    lowered = [str(arg).lower() for arg in args]
    self.mon_manager.raw_cluster_cmd("fs", "compat", self.name, *lowered)
target_size_ratio = 0.9
target_size_ratio_ec = 0.9
- def create(self):
+ def create(self, recover=False, metadata_overlay=False):
if self.name is None:
self.name = "cephfs"
if self.metadata_pool_name is None:
# will use the ec pool to store the data and a small amount of
# metadata still goes to the primary data pool for all files.
- if not self.metadata_overlay and self.ec_profile and 'disabled' not in self.ec_profile:
+ if not metadata_overlay and self.ec_profile and 'disabled' not in self.ec_profile:
self.target_size_ratio = 0.05
log.debug("Creating filesystem '{0}'".format(self.name))
else:
raise
- if self.metadata_overlay:
- self.mon_manager.raw_cluster_cmd('fs', 'new',
- self.name, self.metadata_pool_name, data_pool_name,
- '--allow-dangerous-metadata-overlay')
- else:
- self.mon_manager.raw_cluster_cmd('fs', 'new',
- self.name,
- self.metadata_pool_name,
- data_pool_name)
+ args = ["fs", "new", self.name, self.metadata_pool_name, data_pool_name]
+ if recover:
+ args.append('--recover')
+ if metadata_overlay:
+ args.append('--allow-dangerous-metadata-overlay')
+ self.mon_manager.raw_cluster_cmd(*args)
+ if not recover:
if self.ec_profile and 'disabled' not in self.ec_profile:
ec_data_pool_name = data_pool_name + "_ec"
log.debug("EC profile is %s", self.ec_profile)
raise
if self.fs_config is not None:
+ log.debug(f"fs_config: {self.fs_config}")
max_mds = self.fs_config.get('max_mds', 1)
if max_mds > 1:
self.set_max_mds(max_mds)
if session_timeout != 60:
self.set_session_timeout(session_timeout)
+ if self.fs_config.get('subvols', None) is not None:
+ log.debug(f"Creating {self.fs_config.get('subvols')} subvols "
+ f"for filesystem '{self.name}'")
+ if not hasattr(self._ctx, "created_subvols"):
+ self._ctx.created_subvols = dict()
+
+ subvols = self.fs_config.get('subvols')
+ assert(isinstance(subvols, dict))
+ assert(isinstance(subvols['create'], int))
+ assert(subvols['create'] > 0)
+
+ for sv in range(0, subvols['create']):
+ sv_name = f'sv_{sv}'
+ self.mon_manager.raw_cluster_cmd(
+ 'fs', 'subvolume', 'create', self.name, sv_name,
+ self.fs_config.get('subvol_options', ''))
+
+ if self.name not in self._ctx.created_subvols:
+ self._ctx.created_subvols[self.name] = []
+
+ subvol_path = self.mon_manager.raw_cluster_cmd(
+ 'fs', 'subvolume', 'getpath', self.name, sv_name)
+ subvol_path = subvol_path.strip()
+ self._ctx.created_subvols[self.name].append(subvol_path)
+ else:
+ log.debug(f"Not Creating any subvols for filesystem '{self.name}'")
+
+
self.getinfo(refresh = True)
# wait pgs to be clean
def rank_freeze(self, yes, rank=0):
    """Issue 'mds freeze <fscid>:<rank> <true|false>' for this filesystem."""
    target = "{}:{}".format(self.id, rank)
    self.mon_manager.raw_cluster_cmd("mds", "freeze", target, str(yes).lower())
def rank_repaired(self, rank):
    """Mark *rank* repaired via 'mds repaired <fscid>:<rank>'."""
    self.mon_manager.raw_cluster_cmd("mds", "repaired", "{}:{}".format(self.id, rank))
def rank_fail(self, rank=0):
    """Fail MDS *rank* of this filesystem via 'mds fail <fscid>:<rank>'."""
    target = "{}:{}".format(self.id, rank)
    self.mon_manager.raw_cluster_cmd("mds", "fail", target)
def rank_is_running(self, rank=0, status=None):
    """
    Return whether the daemon currently holding *rank* is running.

    Resolves the rank to its daemon name, then delegates to
    mds_is_running().
    """
    name = self.get_rank(rank=rank, status=status)['name']
    return self.mds_is_running(name)
def get_ranks(self, status=None):
if status is None:
status = self.getinfo()
if timeout is None:
timeout = DAEMON_WAIT_TIMEOUT
+ if self.id is None:
+ status = self.getinfo(refresh=True)
+
if status is None:
status = self.status()
out.append((rank, f(perf)))
return out
def read_cache(self, path, depth=None, rank=None):
    """
    Dump the in-memory cache subtree at *path* from an MDS.

    :param path: filesystem path to look up in the MDS cache.
    :param depth: optional dump depth, appended as a string argument.
    :param rank: MDS rank to query; forwarded to rank_asok().
    :raises RuntimeError: if the path yields no cache entries.
    :return: the 'dump tree' result.
    """
    cmd = ["dump", "tree", path]
    if depth is not None:
        cmd.append(str(depth))
    result = self.rank_asok(cmd, rank=rank)
    # rank_asok may return None (no output) — treat that like empty.
    if result is None or len(result) == 0:
        raise RuntimeError("Path not found in cache: {0}".format(path))
    return result
if quiet:
base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
else:
- base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']
+ base_args = [os.path.join(self._prefix, tool), '--debug-mds=20', '--debug-ms=1', '--debug-objecter=1']
if rank is not None:
base_args.extend(["--rank", "%s" % str(rank)])
caps: tuple containing the path and permission (can be r or rw)
respectively.
"""
+ if isinstance(caps[0], (tuple, list)):
+ x = []
+ for c in caps:
+ x.extend(c)
+ caps = tuple(x)
+
client_name = 'client.' + client_id
return self.mon_manager.raw_cluster_cmd('fs', 'authorize', self.name,
client_name, *caps)
def get_scrub_status(self, rank=0):
    """Shorthand for run_scrub(['status']) against *rank*."""
    status_cmd = ["status"]
    return self.run_scrub(status_cmd, rank)
def flush(self, rank=0):
    """Tell MDS *rank* to flush its journal; return the tell result."""
    return self.rank_tell(["flush", "journal"], rank=rank)
def wait_until_scrub_complete(self, result=None, tag=None, rank=0, sleep=30,
timeout=300, reverse=False):
# time out after "timeout" seconds and assume as done
# timed out waiting for scrub to complete
return False
def get_damage(self, rank=None):
    """
    Return the MDS damage table(s).

    :param rank: when None, query every active rank and return a dict
        mapping rank -> its 'damage ls' output; otherwise return the
        single rank's 'damage ls' output.
    """
    if rank is None:
        # Fan out: one recursive per-rank query for each active rank.
        return {info['rank']: self.get_damage(rank=info['rank'])
                for info in self.get_ranks()}
    return self.rank_tell(['damage', 'ls'], rank=rank)