import datetime
import errno
import json
import logging
import os
import random
import re
import time
import traceback

from gevent import Greenlet

from io import BytesIO, StringIO
from errno import EBUSY

from teuthology.exceptions import CommandFailedError
from teuthology import misc
from teuthology.nuke import clear_firewall
from teuthology.parallel import parallel
from teuthology import contextutil
from tasks.ceph_manager import write_conf
from tasks import ceph_manager


log = logging.getLogger(__name__)


DAEMON_WAIT_TIMEOUT = 120

class FileLayout(object):
    def __init__(self, pool=None, pool_namespace=None, stripe_unit=None,
                 stripe_count=None, object_size=None):
        self.pool = pool
        self.pool_namespace = pool_namespace
        self.stripe_unit = stripe_unit
        self.stripe_count = stripe_count
        self.object_size = object_size

    @staticmethod
    def load_from_ceph(layout_str):
        # Parsing a layout string reported by ceph is not implemented.
        raise NotImplementedError

    def items(self):
        if self.pool is not None:
            yield ("pool", self.pool)
        if self.pool_namespace:
            yield ("pool_namespace", self.pool_namespace)
        if self.stripe_unit is not None:
            yield ("stripe_unit", self.stripe_unit)
        if self.stripe_count is not None:
            yield ("stripe_count", self.stripe_count)
        if self.object_size is not None:
            yield ("object_size", self.object_size)

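# Example (illustrative only, values are hypothetical): items() yields only the
# fields that were explicitly set, which is convenient for building e.g.
# setfattr arguments:
#
#   layout = FileLayout(pool="cephfs_data", stripe_unit=1048576)
#   dict(layout.items())  # -> {"pool": "cephfs_data", "stripe_unit": 1048576}
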
class ObjectNotFound(Exception):
    def __init__(self, object_name):
        self._object_name = object_name

    def __str__(self):
        return "Object not found: '{0}'".format(self._object_name)

class FSMissing(Exception):
    def __init__(self, ident):
        self.ident = ident

    def __str__(self):
        return f"File system {self.ident} does not exist in the map"

class FSStatus(object):
    """
    Operations on a snapshot of the FSMap.
    """
    def __init__(self, mon_manager):
        self.mon = mon_manager
        self.map = json.loads(self.mon.raw_cluster_cmd("fs", "dump", "--format=json"))

    def __str__(self):
        return json.dumps(self.map, indent=2, sort_keys=True)

    # Expose the fsmap for manual inspection.
    def __getitem__(self, key):
        """
        Get a field from the fsmap.
        """
        return self.map[key]

    def get_filesystems(self):
        """
        Iterator for all filesystems.
        """
        for fs in self.map['filesystems']:
            yield fs

    def get_all(self):
        """
        Iterator for all the mds_info components in the FSMap.
        """
        for info in self.map['standbys']:
            yield info
        for fs in self.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                yield info

    def get_standbys(self):
        """
        Iterator for all standbys.
        """
        for info in self.map['standbys']:
            yield info

    def get_fsmap(self, fscid):
        """
        Get the fsmap for the given FSCID.
        """
        for fs in self.map['filesystems']:
            if fscid is None or fs['id'] == fscid:
                return fs
        raise FSMissing(fscid)

    def get_fsmap_byname(self, name):
        """
        Get the fsmap for the given file system name.
        """
        for fs in self.map['filesystems']:
            if name is None or fs['mdsmap']['fs_name'] == name:
                return fs
        raise FSMissing(name)

    def get_replays(self, fscid):
        """
        Get the standby:replay MDS for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['state'] == 'up:standby-replay':
                yield info

    def get_ranks(self, fscid):
        """
        Get the ranks for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['rank'] >= 0 and info['state'] != 'up:standby-replay':
                yield info

    def get_rank(self, fscid, rank):
        """
        Get the rank for the given FSCID.
        """
        for info in self.get_ranks(fscid):
            if info['rank'] == rank:
                return info
        raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))

    def get_mds(self, name):
        """
        Get the info for the given MDS name.
        """
        for info in self.get_all():
            if info['name'] == name:
                return info
        return None

    def get_mds_addr(self, name):
        """
        Return the instance addr as a string, like "10.214.133.138:6807/10825"
        """
        info = self.get_mds(name)
        if info:
            return info['addr']
        else:
            log.warning(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
            raise RuntimeError("MDS id '{0}' not found in map".format(name))

    def get_mds_addrs(self, name):
        """
        Return the instance addresses as a list of strings, like
        ["10.214.133.138:6807", "10.214.133.138:6808"]
        """
        info = self.get_mds(name)
        if info:
            return [e['addr'] for e in info['addrs']['addrvec']]
        else:
            log.warning(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
            raise RuntimeError("MDS id '{0}' not found in map".format(name))

    def get_mds_gid(self, gid):
        """
        Get the info for the given MDS gid.
        """
        for info in self.get_all():
            if info['gid'] == gid:
                return info
        return None

    def hadfailover(self, status):
        """
        Compares two statuses for mds failovers.
        Returns True if there is a failover.
        """
        for fs in status.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                oldinfo = self.get_mds_gid(info['gid'])
                if oldinfo is None or oldinfo['incarnation'] != info['incarnation']:
                    return True
        # no failover
        return False

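# Usage sketch (illustrative, variable names are hypothetical): tests typically
# snapshot the FSMap before an action and compare afterwards, e.g.
#
#   old_status = fs.status()      # FSStatus snapshot
#   # ... restart or kill an MDS ...
#   assert old_status.hadfailover(fs.status())
#
# hadfailover() is called on the *old* snapshot with the *new* status as its
# argument; it reports True when any daemon's gid is gone or its incarnation
# has changed.
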
class CephCluster(object):
    @property
    def admin_remote(self):
        first_mon = misc.get_first_mon(self._ctx, None)
        (result,) = self._ctx.cluster.only(first_mon).remotes.keys()
        return result

    def __init__(self, ctx) -> None:
        self._ctx = ctx
        self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx,
                                                    logger=log.getChild('ceph_manager'))

    def get_config(self, key, service_type=None):
        """
        Get config from mon by default, or a specific service if caller asks for it
        """
        if service_type is None:
            service_type = 'mon'

        service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def set_ceph_conf(self, subsys, key, value):
        if subsys not in self._ctx.ceph['ceph'].conf:
            self._ctx.ceph['ceph'].conf[subsys] = {}
        self._ctx.ceph['ceph'].conf[subsys][key] = value
        write_conf(self._ctx)  # XXX because we don't have the ceph task's config object, if they
                               # used a different config path this won't work.

    def clear_ceph_conf(self, subsys, key):
        del self._ctx.ceph['ceph'].conf[subsys][key]
        write_conf(self._ctx)

    def json_asok(self, command, service_type, service_id, timeout=None):
        command.insert(0, '--format=json')
        proc = self.mon_manager.admin_socket(service_type, service_id, command, timeout=timeout)
        response_data = proc.stdout.getvalue().strip()
        if len(response_data) > 0:
            j = json.loads(response_data)
            pretty = json.dumps(j, sort_keys=True, indent=2)
            log.debug(f"_json_asok output\n{pretty}")
            return j
        else:
            log.debug("_json_asok output empty")
            return None

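    # Example (illustrative): fetching a config value over the admin socket,
    #
    #   cluster.json_asok(['config', 'get', 'mds_cache_memory_limit'], 'mds', 'a')
    #
    # returns the parsed JSON as a dict, or None when the daemon produced no output.
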
    def is_addr_blocklisted(self, addr=None):
        if addr is None:
            log.warning("Couldn't get the client address, so the blocklisted "
                        "status is undetermined")
            return False

        blocklist = json.loads(self.mon_manager.run_cluster_cmd(
            args=["osd", "blocklist", "ls", "--format=json"],
            stdout=StringIO()).stdout.getvalue())
        for b in blocklist:
            if addr == b["addr"]:
                return True
        return False

class MDSCluster(CephCluster):
    """
    Collective operations on all the MDS daemons in the Ceph cluster.  These
    daemons may be in use by various Filesystems.

    For the benefit of pre-multi-filesystem tests, this class is also
    a parent of Filesystem.  The correct way to use MDSCluster going forward is
    as a separate instance outside of your (multiple) Filesystem instances.
    """

    def __init__(self, ctx):
        super(MDSCluster, self).__init__(ctx)

    @property
    def mds_ids(self):
        # do this dynamically because the list of ids may change periodically with cephadm
        return list(misc.all_roles_of_type(self._ctx.cluster, 'mds'))

    @property
    def mds_daemons(self):
        return dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id))
                     for mds_id in self.mds_ids])

    def _one_or_all(self, mds_id, cb, in_parallel=True):
        """
        Call a callback for a single named MDS, or for all.

        Note that the parallelism here isn't for performance, it's to avoid being overly kind
        to the cluster by waiting a graceful ssh-latency of time between doing things, and to
        avoid being overly kind by executing them in a particular order.  However, some actions
        don't cope with being done in parallel, so it's optional (`in_parallel`).

        :param mds_id: MDS daemon name, or None
        :param cb: Callback taking single argument of MDS daemon name
        :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
        """
        if mds_id is None:
            if in_parallel:
                with parallel() as p:
                    for mds_id in self.mds_ids:
                        p.spawn(cb, mds_id)
            else:
                for mds_id in self.mds_ids:
                    cb(mds_id)
        else:
            cb(mds_id)

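    # The wrappers below (mds_stop, mds_fail, mds_restart, ...) are thin users of
    # _one_or_all(); a sketch of the pattern:
    #
    #   self._one_or_all(None, lambda id_: self.mds_daemons[id_].stop())
    #
    # runs the callback for every known MDS id, in parallel by default.
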
    def get_config(self, key, service_type=None):
        """
        get_config specialization of service_type="mds"
        """
        if service_type != "mds":
            return super(MDSCluster, self).get_config(key, service_type)

        # Some tests stop MDS daemons, don't send commands to a dead one:
        running_daemons = [i for i, mds in self.mds_daemons.items() if mds.running()]
        service_id = random.sample(running_daemons, 1)[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def mds_stop(self, mds_id=None):
        """
        Stop the MDS daemon process(es).  If it held a rank, that rank
        will eventually go laggy.
        """
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())

    def mds_fail(self, mds_id=None):
        """
        Inform MDSMonitor of the death of the daemon process(es).  If it held
        a rank, that rank will be relinquished.
        """
        self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))

    def mds_restart(self, mds_id=None):
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())

    def mds_fail_restart(self, mds_id=None):
        """
        Variation on restart that includes marking MDSs as failed, so that doing this
        operation followed by waiting for healthy daemon states guarantees that they
        have gone down and come up, rather than potentially seeing the healthy states
        that existed before the restart.
        """
        def _fail_restart(id_):
            self.mds_daemons[id_].stop()
            self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
            self.mds_daemons[id_].restart()

        self._one_or_all(mds_id, _fail_restart)

    def mds_signal(self, mds_id, sig, silent=False):
        """
        Signal a MDS daemon.
        """
        self.mds_daemons[mds_id].signal(sig, silent)

    def newfs(self, name='cephfs', create=True):
        return Filesystem(self._ctx, name=name, create=create)

    def status(self):
        return FSStatus(self.mon_manager)

    def get_standby_daemons(self):
        return set([s['name'] for s in self.status().get_standbys()])

    def get_mds_hostnames(self):
        result = set()
        for mds_id in self.mds_ids:
            mds_remote = self.mon_manager.find_remote('mds', mds_id)
            result.add(mds_remote.hostname)

        return list(result)

    def set_clients_block(self, blocked, mds_id=None):
        """
        Block (using iptables) client communications to this MDS.  Be careful: if
        other services are running on this MDS, or other MDSs try to talk to this
        MDS, their communications may also be blocked as collateral damage.

        :param mds_id: Optional ID of MDS to block, default to all
        """
        da_flag = "-A" if blocked else "-D"

        def set_block(_mds_id):
            remote = self.mon_manager.find_remote('mds', _mds_id)
            status = self.status()

            addr = status.get_mds_addr(_mds_id)
            ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()

            remote.run(
                args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])
            remote.run(
                args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])

        self._one_or_all(mds_id, set_block, in_parallel=False)

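    # For reference, with blocked=True the commands issued per MDS look roughly
    # like (port number is illustrative):
    #
    #   sudo iptables -A OUTPUT -p tcp --sport 6807 -j REJECT -m comment --comment teuthology
    #   sudo iptables -A INPUT  -p tcp --dport 6807 -j REJECT -m comment --comment teuthology
    #
    # and blocked=False switches -A to -D to remove the same rules.
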
    def set_inter_mds_block(self, blocked, mds_rank_1, mds_rank_2):
        """
        Block (using iptables) communications from a provided MDS to other MDSs.
        Block all ports that an MDS uses for communication.

        :param blocked: True to block the MDS, False otherwise
        :param mds_rank_1: MDS rank
        :param mds_rank_2: MDS rank
        """
        da_flag = "-A" if blocked else "-D"

        def set_block(mds_ids):
            status = self.status()

            mds = mds_ids[0]
            remote = self.mon_manager.find_remote('mds', mds)
            addrs = status.get_mds_addrs(mds)
            for addr in addrs:
                ip_str, port_str = re.match("(.+):(.+)", addr).groups()
                remote.run(
                    args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                          "comment", "--comment", "teuthology"])

            mds = mds_ids[1]
            remote = self.mon_manager.find_remote('mds', mds)
            addrs = status.get_mds_addrs(mds)
            for addr in addrs:
                ip_str, port_str = re.match("(.+):(.+)", addr).groups()
                remote.run(
                    args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
                          "comment", "--comment", "teuthology"])
                remote.run(
                    args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                          "comment", "--comment", "teuthology"])

        self._one_or_all((mds_rank_1, mds_rank_2), set_block, in_parallel=False)

    def clear_firewall(self):
        clear_firewall(self._ctx)

    def get_mds_info(self, mds_id):
        return FSStatus(self.mon_manager).get_mds(mds_id)

    def is_pool_full(self, pool_name):
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        for pool in pools:
            if pool['pool_name'] == pool_name:
                return 'full' in pool['flags_names'].split(",")

        raise RuntimeError("Pool not found '{0}'".format(pool_name))

    def delete_all_filesystems(self):
        """
        Remove all filesystems that exist, and any pools in use by them.
        """
        for fs in self.status().get_filesystems():
            Filesystem(ctx=self._ctx, fscid=fs['id']).destroy()

class Filesystem(MDSCluster):
    """
    This object is for driving a CephFS filesystem.  The MDS daemons driven by
    MDSCluster may be shared with other Filesystems.
    """
    def __init__(self, ctx, fs_config={}, fscid=None, name=None, create=False):
        super(Filesystem, self).__init__(ctx)

        self.name = name
        self.id = None
        self.metadata_pool_name = None
        self.metadata_overlay = False
        self.data_pool_name = None
        self.data_pools = None
        self.fs_config = fs_config
        self.ec_profile = fs_config.get('ec_profile')

        client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
        self.client_id = client_list[0]
        self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]

        if name is not None:
            if fscid is not None:
                raise RuntimeError("cannot specify fscid when creating fs")
            if create and not self.legacy_configured():
                self.create()
        else:
            if fscid is not None:
                self.id = fscid
                self.getinfo(refresh=True)

        # Stash a reference to the first created filesystem on ctx, so
        # that if someone drops to the interactive shell they can easily
        # get at it.
        if not hasattr(self._ctx, "filesystem"):
            self._ctx.filesystem = self

    def dead(self):
        try:
            return not bool(self.get_mds_map())
        except FSMissing:
            return True

    def get_task_status(self, status_key):
        return self.mon_manager.get_service_task_status("mds", status_key)

    def getinfo(self, refresh=False):
        status = self.status()
        if self.id is not None:
            fsmap = status.get_fsmap(self.id)
        elif self.name is not None:
            fsmap = status.get_fsmap_byname(self.name)
        else:
            fss = [fs for fs in status.get_filesystems()]
            if len(fss) == 1:
                fsmap = fss[0]
            elif len(fss) == 0:
                raise RuntimeError("no file system available")
            else:
                raise RuntimeError("more than one file system available")
        self.id = fsmap['id']
        self.name = fsmap['mdsmap']['fs_name']
        self.get_pool_names(status=status, refresh=refresh)
        return status

    def set_metadata_overlay(self, overlay):
        if self.id is not None:
            raise RuntimeError("cannot specify fscid when configuring overlay")
        self.metadata_overlay = overlay

    def deactivate(self, rank):
        if rank < 0:
            raise RuntimeError("invalid rank")
        elif rank == 0:
            raise RuntimeError("cannot deactivate rank 0")
        self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))

    def reach_max_mds(self):
        # Try to reach rank count == max_mds, up or down (UPGRADE SENSITIVE!)
        status = self.getinfo()
        mds_map = self.get_mds_map(status=status)
        max_mds = mds_map['max_mds']

        count = len(list(self.get_ranks(status=status)))
        if count > max_mds:
            try:
                # deactivate mds in descending order
                status = self.wait_for_daemons(status=status, skip_max_mds_check=True)
                while count > max_mds:
                    targets = sorted(self.get_ranks(status=status), key=lambda r: r['rank'], reverse=True)
                    target = targets[0]
                    log.debug("deactivating rank %d" % target['rank'])
                    self.deactivate(target['rank'])
                    status = self.wait_for_daemons(skip_max_mds_check=True)
                    count = len(list(self.get_ranks(status=status)))
            except Exception:
                # In Mimic, deactivation is done automatically:
                log.info("Error:\n{}".format(traceback.format_exc()))
                status = self.wait_for_daemons()
        else:
            status = self.wait_for_daemons()

        mds_map = self.get_mds_map(status=status)
        assert(mds_map['max_mds'] == max_mds)
        assert(mds_map['in'] == list(range(0, max_mds)))

    def reset(self):
        self.mon_manager.raw_cluster_cmd("fs", "reset", str(self.name), '--yes-i-really-mean-it')

    def fail(self):
        self.mon_manager.raw_cluster_cmd("fs", "fail", str(self.name))

    def set_flag(self, var, *args):
        a = map(lambda x: str(x).lower(), args)
        self.mon_manager.raw_cluster_cmd("fs", "flag", "set", var, *a)

    def set_allow_multifs(self, yes=True):
        self.set_flag("enable_multiple", yes)

    def set_var(self, var, *args):
        a = map(lambda x: str(x).lower(), args)
        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, var, *a)

    def set_down(self, down=True):
        self.set_var("down", str(down).lower())

    def set_joinable(self, joinable=True):
        self.set_var("joinable", joinable)

    def set_max_mds(self, max_mds):
        self.set_var("max_mds", "%d" % max_mds)

    def set_session_timeout(self, timeout):
        self.set_var("session_timeout", "%d" % timeout)

    def set_allow_standby_replay(self, yes):
        self.set_var("allow_standby_replay", yes)

    def set_allow_new_snaps(self, yes):
        self.set_var("allow_new_snaps", yes, '--yes-i-really-mean-it')

    def required_client_features(self, *args, **kwargs):
        c = ["fs", "required_client_features", self.name, *args]
        return self.mon_manager.run_cluster_cmd(args=c, **kwargs)

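    # Sketch of the resulting CLI calls: set_var() lower-cases its arguments and
    # issues "ceph fs set <fs_name> <var> <value>", so e.g. set_max_mds(2) ends
    # up running roughly:
    #
    #   ceph fs set cephfs max_mds 2
    #
    # (filesystem name illustrative).
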
    # In Octopus+, the PG count can be omitted to use the default. We keep the
    # hard-coded value for deployments of Mimic/Nautilus.
    pgs_per_fs_pool = 8

    def create(self):
        if self.name is None:
            self.name = "cephfs"
        if self.metadata_pool_name is None:
            self.metadata_pool_name = "{0}_metadata".format(self.name)
        if self.data_pool_name is None:
            data_pool_name = "{0}_data".format(self.name)
        else:
            data_pool_name = self.data_pool_name

        log.debug("Creating filesystem '{0}'".format(self.name))

        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                         self.metadata_pool_name,
                                         self.pgs_per_fs_pool.__str__())

        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                         data_pool_name,
                                         self.pgs_per_fs_pool.__str__())

        if self.metadata_overlay:
            self.mon_manager.raw_cluster_cmd('fs', 'new',
                                             self.name, self.metadata_pool_name, data_pool_name,
                                             '--allow-dangerous-metadata-overlay')
        else:
            self.mon_manager.raw_cluster_cmd('fs', 'new',
                                             self.name,
                                             self.metadata_pool_name,
                                             data_pool_name)

            if self.ec_profile and 'disabled' not in self.ec_profile:
                ec_data_pool_name = data_pool_name + "_ec"
                log.debug("EC profile is %s", self.ec_profile)
                cmd = ['osd', 'erasure-code-profile', 'set', ec_data_pool_name]
                cmd.extend(self.ec_profile)
                self.mon_manager.raw_cluster_cmd(*cmd)
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'create',
                    ec_data_pool_name, self.pgs_per_fs_pool.__str__(), 'erasure',
                    ec_data_pool_name)
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'set',
                    ec_data_pool_name, 'allow_ec_overwrites', 'true')
                self.add_data_pool(ec_data_pool_name, create=False)
                self.check_pool_application(ec_data_pool_name)

                self.run_client_payload(f"setfattr -n ceph.dir.layout.pool -v {ec_data_pool_name} . && getfattr -n ceph.dir.layout .")

        self.check_pool_application(self.metadata_pool_name)
        self.check_pool_application(data_pool_name)

        # Turn off spurious standby count warnings from modifying max_mds in tests.
        try:
            self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
        except CommandFailedError as e:
            if e.exitstatus == 22:
                # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
                pass
            else:
                raise

        if self.fs_config is not None:
            max_mds = self.fs_config.get('max_mds', 1)
            if max_mds > 1:
                self.set_max_mds(max_mds)

            standby_replay = self.fs_config.get('standby_replay', False)
            self.set_allow_standby_replay(standby_replay)

            # If absent will use the default value (60 seconds)
            session_timeout = self.fs_config.get('session_timeout', 60)
            if session_timeout != 60:
                self.set_session_timeout(session_timeout)

        self.getinfo(refresh=True)

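    # For reference (sketch, default names): create() for a filesystem named
    # "cephfs" with no EC profile issues roughly:
    #
    #   ceph osd pool create cephfs_metadata <pgs_per_fs_pool>
    #   ceph osd pool create cephfs_data <pgs_per_fs_pool>
    #   ceph fs new cephfs cephfs_metadata cephfs_data
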
    def run_client_payload(self, cmd):
        # avoid circular dep by importing here:
        from tasks.cephfs.fuse_mount import FuseMount

        d = misc.get_testdir(self._ctx)
        m = FuseMount(self._ctx, {}, d, "admin", self.client_remote, cephfs_name=self.name)
        m.mount_wait()
        m.run_shell_payload(cmd)
        m.umount_wait(require_clean=True)

    def _remove_pool(self, name, **kwargs):
        c = f'osd pool rm {name} {name} --yes-i-really-really-mean-it'
        return self.mon_manager.ceph(c, **kwargs)

    def rm(self, **kwargs):
        c = f'fs rm {self.name} --yes-i-really-mean-it'
        return self.mon_manager.ceph(c, **kwargs)

    def remove_pools(self, data_pools):
        self._remove_pool(self.get_metadata_pool_name())
        for poolname in data_pools:
            try:
                self._remove_pool(poolname)
            except CommandFailedError as e:
                # EBUSY, this data pool is used by two metadata pools, let the
                # second pass delete it
                if e.exitstatus == EBUSY:
                    pass
                else:
                    raise

    def destroy(self, reset_obj_attrs=True):
        log.info(f'Destroying file system {self.name} and related pools')

        if self.dead():
            log.debug('already dead...')
            return

        data_pools = self.get_data_pool_names(refresh=True)

        # make sure no MDSs are attached to given FS.
        self.fail()
        self.rm()

        self.remove_pools(data_pools)

        if reset_obj_attrs:
            self.id = None
            self.name = None
            self.metadata_pool_name = None
            self.data_pool_name = None
            self.data_pools = None

    def recreate(self):
        self.destroy()
        self.create()
        self.getinfo(refresh=True)

    def check_pool_application(self, pool_name):
        osd_map = self.mon_manager.get_osd_dump_json()
        for pool in osd_map['pools']:
            if pool['pool_name'] == pool_name:
                if "application_metadata" in pool:
                    if "cephfs" not in pool['application_metadata']:
                        raise RuntimeError("Pool {pool_name} does not name cephfs as application!".format(
                            pool_name=pool_name))

    def __del__(self):
        if getattr(self._ctx, "filesystem", None) == self:
            delattr(self._ctx, "filesystem")

    def exists(self):
        """
        Whether a filesystem exists in the mon's filesystem list
        """
        fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
        return self.name in [fs['name'] for fs in fs_list]

    def legacy_configured(self):
        """
        Check if a legacy (i.e. pre "fs new") filesystem configuration is present.  If this is
        the case, the caller should avoid using Filesystem.create.
        """
        try:
            out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
            pools = json.loads(out_text)
            metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
            if metadata_pool_exists:
                self.metadata_pool_name = 'metadata'
        except CommandFailedError as e:
            # For use in upgrade tests, Ceph cuttlefish and earlier don't support
            # structured output (--format) from the CLI.
            if e.exitstatus == 22:
                metadata_pool_exists = True
            else:
                raise

        return metadata_pool_exists

    def _df(self):
        return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))

    # may raise FSMissing
    def get_mds_map(self, status=None):
        if status is None:
            status = self.status()
        return status.get_fsmap(self.id)['mdsmap']

    def get_var(self, var, status=None):
        return self.get_mds_map(status=status)[var]

    def set_dir_layout(self, mount, path, layout):
        for name, value in layout.items():
            mount.run_shell(args=["setfattr", "-n", "ceph.dir.layout." + name, "-v", str(value), path])

    def add_data_pool(self, name, create=True):
        if create:
            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name, self.pgs_per_fs_pool.__str__())
        self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
        self.get_pool_names(refresh=True)
        for poolid, fs_name in self.data_pools.items():
            if name == fs_name:
                return poolid
        raise RuntimeError("could not get just created pool '{0}'".format(name))

    def get_pool_names(self, refresh=False, status=None):
        if refresh or self.metadata_pool_name is None or self.data_pools is None:
            if status is None:
                status = self.status()
            fsmap = status.get_fsmap(self.id)

            osd_map = self.mon_manager.get_osd_dump_json()
            id_to_name = {}
            for p in osd_map['pools']:
                id_to_name[p['pool']] = p['pool_name']

            self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
            self.data_pools = {}
            for data_pool in fsmap['mdsmap']['data_pools']:
                self.data_pools[data_pool] = id_to_name[data_pool]

    def get_data_pool_name(self, refresh=False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh=True)
        assert(len(self.data_pools) == 1)
        return next(iter(self.data_pools.values()))

    def get_data_pool_id(self, refresh=False):
        """
        Don't call this if you have multiple data pools
        """
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh=True)
        assert(len(self.data_pools) == 1)
        return next(iter(self.data_pools.keys()))

    def get_data_pool_names(self, refresh=False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh=True)
        return list(self.data_pools.values())

    def get_metadata_pool_name(self):
        return self.metadata_pool_name

    def set_data_pool_name(self, name):
        if self.id is not None:
            raise RuntimeError("can't set filesystem name if its fscid is set")
        self.data_pool_name = name

    def get_namespace_id(self):
        return self.id

    def get_pool_df(self, pool_name):
        """
        Return the stats dict for the named pool, e.g.:

        {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
        """
        for pool_df in self._df()['pools']:
            if pool_df['name'] == pool_name:
                return pool_df['stats']

        raise RuntimeError("Pool name '{0}' not found".format(pool_name))

    def get_usage(self):
        return self._df()['stats']['total_used_bytes']

    def are_daemons_healthy(self, status=None, skip_max_mds_check=False):
        """
        Return true if all daemons are in one of active, standby, standby-replay, and
        at least max_mds daemons are in 'active'.

        Unlike most of Filesystem, this function is tolerant of new-style `fs`
        commands being missing, because we are part of the ceph installation
        process during upgrade suites, so must fall back to old style commands
        when we get an EINVAL on a new style command.
        """
        # First, check to see that processes haven't exited with an error code
        for mds in self._ctx.daemons.iter_daemons_of_role('mds'):
            mds.check_status()

        active_count = 0
        mds_map = self.get_mds_map(status=status)

        log.debug("are_daemons_healthy: mds map: {0}".format(mds_map))

        for mds_id, mds_status in mds_map['info'].items():
            if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
                log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
                return False
            elif mds_status['state'] == 'up:active':
                active_count += 1

        log.debug("are_daemons_healthy: {0}/{1}".format(
            active_count, mds_map['max_mds']
        ))

        if not skip_max_mds_check:
            if active_count > mds_map['max_mds']:
                log.debug("are_daemons_healthy: number of actives is greater than max_mds: {0}".format(mds_map))
                return False
            elif active_count == mds_map['max_mds']:
                # The MDSMap says these guys are active, but let's check they really are
                for mds_id, mds_status in mds_map['info'].items():
                    if mds_status['state'] == 'up:active':
                        try:
                            daemon_status = self.mds_tell(["status"], mds_id=mds_status['name'])
                        except CommandFailedError as cfe:
                            if cfe.exitstatus == errno.EINVAL:
                                # Old version, can't do this check
                                continue
                            else:
                                # MDS not even running
                                return False

                        if daemon_status['state'] != 'up:active':
                            # MDS hasn't taken the latest map yet
                            return False

                return True
            else:
                return False
        else:
            log.debug("are_daemons_healthy: skipping max_mds check")
            return True

    def get_daemon_names(self, state=None, status=None):
        """
        Return MDS daemon names of those daemons in the given state.
        """
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['state'] == state or state is None:
                result.append(mds_status['name'])

        return result

    def get_active_names(self, status=None):
        """
        Return MDS daemon names of those daemons holding ranks
        in state up:active.

        :return: list of strings like ['a', 'b'], sorted by rank
        """
        return self.get_daemon_names("up:active", status=status)

    def get_all_mds_rank(self, status=None):
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['rank'])

        return result

    def get_rank(self, rank=None, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_rank(self.id, rank)

    def rank_restart(self, rank=0, status=None):
        name = self.get_rank(rank=rank, status=status)['name']
        self.mds_restart(mds_id=name)

    def rank_signal(self, signal, rank=0, status=None):
        name = self.get_rank(rank=rank, status=status)['name']
        self.mds_signal(name, signal)

    def rank_freeze(self, yes, rank=0):
        self.mon_manager.raw_cluster_cmd("mds", "freeze", "{}:{}".format(self.id, rank), str(yes).lower())

    def rank_fail(self, rank=0):
        self.mon_manager.raw_cluster_cmd("mds", "fail", "{}:{}".format(self.id, rank))

    def get_ranks(self, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_ranks(self.id)

    def get_replays(self, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_replays(self.id)

    def get_replay(self, rank=0, status=None):
        for replay in self.get_replays(status=status):
            if replay['rank'] == rank:
                return replay
        return None

    def get_rank_names(self, status=None):
        """
        Return MDS daemon names of those daemons holding a rank,
        sorted by rank.  This includes e.g. up:replay/reconnect
        as well as active, but does not include standby or
        standby-replay.
        """
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['name'])

        return result

    def wait_for_daemons(self, timeout=None, skip_max_mds_check=False, status=None):
        """
        Wait until all daemons are healthy.
        """
        if timeout is None:
            timeout = DAEMON_WAIT_TIMEOUT

        if status is None:
            status = self.status()

        elapsed = 0
        while True:
            if self.are_daemons_healthy(status=status, skip_max_mds_check=skip_max_mds_check):
                return status
            else:
                time.sleep(1)
                elapsed += 1

            if elapsed > timeout:
                log.debug("status = {0}".format(status))
                raise RuntimeError("Timed out waiting for MDS daemons to become healthy")

            status = self.status()

    def dencoder(self, obj_type, obj_blob):
        args = [os.path.join(self._prefix, "ceph-dencoder"), 'type', obj_type, 'import', '-', 'decode', 'dump_json']
        p = self.mon_manager.controller.run(args=args, stdin=BytesIO(obj_blob), stdout=BytesIO())
        return p.stdout.getvalue()

    def rados(self, *args, **kwargs):
        """
        Callout to rados CLI.
        """
        return self.mon_manager.do_rados(*args, **kwargs)

    def radosm(self, *args, **kwargs):
        """
        Interact with the metadata pool via rados CLI.
        """
        return self.rados(*args, **kwargs, pool=self.get_metadata_pool_name())

    def radosmo(self, *args, stdout=BytesIO(), **kwargs):
        """
        Interact with the metadata pool via rados CLI.  Get the stdout.
        """
        return self.radosm(*args, **kwargs, stdout=stdout).stdout.getvalue()

    def get_metadata_object(self, object_type, object_id):
        """
        Retrieve an object from the metadata pool, pass it through
        ceph-dencoder to dump it to JSON, and return the decoded object.
        """
        o = self.radosmo(['get', object_id, '-'])
        j = self.dencoder(object_type, o)
        try:
            return json.loads(j)
        except (TypeError, ValueError):
            log.error("Failed to decode JSON: '{0}'".format(j))
            raise

    def get_journal_version(self):
        """
        Read the JournalPointer and Journal::Header objects to learn the version of
        encoding in use.
        """
        journal_pointer_object = '400.00000000'
        journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
        journal_ino = journal_pointer_dump['journal_pointer']['front']

        journal_header_object = "{0:x}.00000000".format(journal_ino)
        journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)

        version = journal_header_dump['journal_header']['stream_format']
        log.debug("Read journal version {0}".format(version))

        return version

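    # The lookup above is roughly equivalent to this shell pipeline
    # (pool name illustrative):
    #
    #   rados -p cephfs_metadata get 400.00000000 - |
    #       ceph-dencoder type JournalPointer import - decode dump_json
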
    def mds_asok(self, command, mds_id=None, timeout=None):
        if mds_id is None:
            return self.rank_asok(command, timeout=timeout)

        return self.json_asok(command, 'mds', mds_id, timeout=timeout)

    def mds_tell(self, command, mds_id=None):
        if mds_id is None:
            return self.rank_tell(command)

        return json.loads(self.mon_manager.raw_cluster_cmd("tell", f"mds.{mds_id}", *command))

    def rank_asok(self, command, rank=0, status=None, timeout=None):
        info = self.get_rank(rank=rank, status=status)
        return self.json_asok(command, 'mds', info['name'], timeout=timeout)

    def rank_tell(self, command, rank=0, status=None):
        return json.loads(self.mon_manager.raw_cluster_cmd("tell", f"mds.{self.id}:{rank}", *command))

    def ranks_tell(self, command, status=None):
        if status is None:
            status = self.status()
        out = []
        for r in status.get_ranks(self.id):
            result = self.rank_tell(command, rank=r['rank'], status=status)
            out.append((r['rank'], result))
        return sorted(out)

    def ranks_perf(self, f, status=None):
        perf = self.ranks_tell(["perf", "dump"], status=status)
        out = []
        for rank, perf in perf:
            out.append((rank, f(perf)))
        return out

    def read_cache(self, path, depth=None):
        cmd = ["dump", "tree", path]
        if depth is not None:
            cmd.append(depth.__str__())
        result = self.mds_asok(cmd)
        if len(result) == 0:
            raise RuntimeError("Path not found in cache: {0}".format(path))

        return result

    def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
        """
        Block until the MDS reaches a particular state, or a failure condition
        is met.

        When there are multiple MDSs, succeed when exactly one MDS is in the
        goal state, or fail when any MDS is in the reject state.

        :param goal_state: Return once the MDS is in this state
        :param reject: Fail if the MDS enters this state before the goal state
        :param timeout: Fail if this many seconds pass before reaching goal
        :return: number of seconds waited, rounded down to integer
        """

        started_at = time.time()
        while True:
            status = self.status()
            if rank is not None:
                try:
                    mds_info = status.get_rank(self.id, rank)
                    current_state = mds_info['state'] if mds_info else None
                    log.debug("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
                except RuntimeError:
                    mdsmap = self.get_mds_map(status=status)
                    if rank in mdsmap['failed']:
                        log.debug("Waiting for rank {0} to come back.".format(rank))
                        current_state = None
                    else:
                        raise
            elif mds_id is not None:
                # mds_info is None if no daemon with this ID exists in the map
                mds_info = status.get_mds(mds_id)
                current_state = mds_info['state'] if mds_info else None
                log.debug("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
            else:
                # In general, look for a single MDS
                states = [m['state'] for m in status.get_ranks(self.id)]
                if [s for s in states if s == goal_state] == [goal_state]:
                    current_state = goal_state
                elif reject in states:
                    current_state = reject
                else:
                    current_state = None
                log.debug("mapped states {0} to {1}".format(states, current_state))

            elapsed = time.time() - started_at
            if current_state == goal_state:
                log.debug("reached state '{0}' in {1}s".format(current_state, elapsed))
                return elapsed
            elif reject is not None and current_state == reject:
                raise RuntimeError("MDS in reject state {0}".format(current_state))
            elif timeout is not None and elapsed > timeout:
                log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
                raise RuntimeError(
                    "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
                        elapsed, goal_state, current_state
                    ))
            else:
                time.sleep(1)

    def _read_data_xattr(self, ino_no, xattr_name, obj_type, pool):
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)

        args = ["getxattr", obj_name, xattr_name]
        try:
            proc = self.rados(args, pool=pool, stdout=BytesIO())
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(obj_name)

        obj_blob = proc.stdout.getvalue()
        return json.loads(self.dencoder(obj_type, obj_blob).strip())

    def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
        """
        Write to an xattr of the 0th data object of an inode.  Will
        succeed whether the object and/or xattr already exist or not.

        :param ino_no: integer inode number
        :param xattr_name: string name of the xattr
        :param data: byte array data to write to the xattr
        :param pool: name of data pool or None to use primary data pool
        """
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)
        args = ["setxattr", obj_name, xattr_name, data]
        self.rados(args, pool=pool)

    def read_backtrace(self, ino_no, pool=None):
        """
        Read the backtrace from the data pool, return a dict in the format
        given by inode_backtrace_t::dump, which is something like:

        ::

            rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
            ceph-dencoder type inode_backtrace_t import out.bin decode dump_json

            { "ino": 1099511627778,
              ...}

        :param pool: name of pool to read backtrace from.  If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)

    def read_layout(self, ino_no, pool=None):
        """
        Read 'layout' xattr of an inode and parse the result, returning a dict like:

        ::

            {
                "stripe_unit": 4194304,
                "object_size": 4194304,
                ...
            }

        :param pool: name of pool to read backtrace from.  If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)

    def _enumerate_data_objects(self, ino, size):
        """
        Get the list of expected data objects for a range, and the list of objects
        that actually exist.

        :return a tuple of two lists of strings (expected, actual)
        """
        stripe_size = 1024 * 1024 * 4

        size = max(stripe_size, size)

        want_objects = [
            "{0:x}.{1:08x}".format(ino, n)
            for n in range(0, ((size - 1) // stripe_size) + 1)
        ]

        exist_objects = self.rados(["ls"], pool=self.get_data_pool_name(),
                                   stdout=StringIO()).stdout.getvalue().split("\n")

        return want_objects, exist_objects

    def data_objects_present(self, ino, size):
        """
        Check that *all* the expected data objects for an inode are present in the data pool
        """
        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        missing = set(want_objects) - set(exist_objects)

        if missing:
            log.debug("Objects missing (ino {0}, size {1}): {2}".format(
                ino, size, missing
            ))
            return False
        else:
            log.debug("All objects for ino {0} size {1} found".format(ino, size))
            return True

    def data_objects_absent(self, ino, size):
        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        present = set(want_objects) & set(exist_objects)

        if present:
            log.debug("Objects not absent (ino {0}, size {1}): {2}".format(
                ino, size, present
            ))
            return False
        else:
            log.debug("All objects for ino {0} size {1} are absent".format(ino, size))
            return True

    def dirfrag_exists(self, ino, frag):
        try:
            self.radosm(["stat", "{0:x}.{1:08x}".format(ino, frag)])
        except CommandFailedError:
            return False
        else:
            return True

    def list_dirfrag(self, dir_ino):
        """
        Read the named object and return the list of omap keys

        :return a list of 0 or more strings
        """

        dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)

        try:
            key_list_str = self.radosmo(["listomapkeys", dirfrag_obj_name], stdout=StringIO())
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(dirfrag_obj_name)

        return key_list_str.strip().split("\n") if key_list_str else []

    def get_meta_of_fs_file(self, dir_ino, obj_name, out):
        """
        Get metadata from the parent to verify the correctness of the data format
        encoded by the tool cephfs-meta-injection.
        Warning: the splitting of directories is not considered here.
        """
        dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
        try:
            self.radosm(["getomapval", dirfrag_obj_name, obj_name + "_head", out])
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(dir_ino)

    def erase_metadata_objects(self, prefix):
        """
        For all objects in the metadata pool matching the prefix,
        erase them.

        This is O(N) in the number of objects in the pool, so only suitable
        for use on toy test filesystems.
        """
        all_objects = self.radosmo(["ls"], stdout=StringIO()).strip().split("\n")
        matching_objects = [o for o in all_objects if o.startswith(prefix)]
        for o in matching_objects:
            self.radosm(["rm", o])

    def erase_mds_objects(self, rank):
        """
        Erase all the per-MDS objects for a particular rank.  This includes
        inotable, sessiontable, journal
        """

        def obj_prefix(multiplier):
            """
            MDS object naming conventions like rank 1's
            journal is at 201.***
            """
            return "%x." % (multiplier * 0x100 + rank)

        # MDS_INO_LOG_OFFSET
        self.erase_metadata_objects(obj_prefix(2))
        # MDS_INO_LOG_BACKUP_OFFSET
        self.erase_metadata_objects(obj_prefix(3))
        # MDS_INO_LOG_POINTER_OFFSET
        self.erase_metadata_objects(obj_prefix(4))
        # MDSTables & SessionMap
        self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))

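    # For example (derived from obj_prefix above): for rank 1 this erases objects
    # with prefixes 201. (journal), 301. (journal backup), 401. (journal pointer)
    # and mds1_ (tables/sessionmap).
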
    @property
    def _prefix(self):
        """
        Override this to set a different prefix for tool invocations.
        """
        return ""

    def _make_rank(self, rank):
        return "{}:{}".format(self.name, rank)

    def _run_tool(self, tool, args, rank=None, quiet=False):
        # Tests frequently have [client] configuration that jacks up
        # the objecter log level (unlikely to be interesting here)
        # and does not set the mds log level (very interesting here)
        if quiet:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
        else:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']

        if rank is not None:
            base_args.extend(["--rank", "%s" % str(rank)])

        t1 = datetime.datetime.now()
        r = self.tool_remote.sh(script=base_args + args, stdout=StringIO()).strip()
        duration = datetime.datetime.now() - t1
        log.debug("Ran {0} in time {1}, result:\n{2}".format(
            base_args + args, duration, r
        ))
        return r

    @property
    def tool_remote(self):
        """
        An arbitrary remote to use when invoking recovery tools.  Use an MDS host because
        it'll definitely have keys with perms to access the cephfs metadata pool.  This is public
        so that tests can use this remote to go get locally written output files from the tools.
        """
        return self.mon_manager.controller

    def journal_tool(self, args, rank, quiet=False):
        """
        Invoke cephfs-journal-tool with the passed arguments for a rank, and return its stdout
        """
        fs_rank = self._make_rank(rank)
        return self._run_tool("cephfs-journal-tool", args, fs_rank, quiet)

    def meta_tool(self, args, rank, quiet=False):
        """
        Invoke cephfs-meta-injection with the passed arguments for a rank, and return its stdout
        """
        fs_rank = self._make_rank(rank)
        return self._run_tool("cephfs-meta-injection", args, fs_rank, quiet)

    def table_tool(self, args, quiet=False):
        """
        Invoke cephfs-table-tool with the passed arguments, and return its stdout
        """
        return self._run_tool("cephfs-table-tool", args, None, quiet)

    def data_scan(self, args, quiet=False, worker_count=1):
        """
        Invoke cephfs-data-scan with the passed arguments, and return its stdout

        :param worker_count: if greater than 1, multiple workers will be run
                             in parallel and the return value will be None
        """
        workers = []

        for n in range(0, worker_count):
            if worker_count > 1:
                # data-scan args first token is a command, followed by args to it.
                # insert worker arguments after the command.
                cmd = args[0]
                worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
            else:
                worker_args = args

            workers.append(Greenlet.spawn(lambda wargs=worker_args:
                           self._run_tool("cephfs-data-scan", wargs, None, quiet)))

        for w in workers:
            w.join()

        if worker_count == 1:
            return workers[0].value
        else:
            return None

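    # Sketch: with worker_count=3, a call such as data_scan(["scan_extents", "cephfs_data"])
    # rewrites each worker's arguments to
    #
    #   ["scan_extents", "--worker_n", "<n>", "--worker_m", "3", "cephfs_data"]
    #
    # and runs the three invocations in separate greenlets (pool name illustrative).
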
    def is_full(self):
        return self.is_pool_full(self.get_data_pool_name())

    def authorize(self, client_id, caps=('/', 'rw')):
        """
        Run "ceph fs authorize" and run "ceph auth get" to get and return the
        keyring generated.

        client_id: client id that will be authorized
        caps: tuple containing the path and permission (can be r or rw)
              respectively
        """
        client_name = 'client.' + client_id
        return self.mon_manager.raw_cluster_cmd('fs', 'authorize', self.name,
                                                client_name, *caps)

    def grow(self, new_max_mds, status=None):
        oldmax = self.get_var('max_mds', status=status)
        assert(new_max_mds > oldmax)
        self.set_max_mds(new_max_mds)
        return self.wait_for_daemons()

    def shrink(self, new_max_mds, status=None):
        oldmax = self.get_var('max_mds', status=status)
        assert(new_max_mds < oldmax)
        self.set_max_mds(new_max_mds)
        return self.wait_for_daemons()

    def run_scrub(self, cmd, rank=0):
        return self.rank_tell(["scrub"] + cmd, rank)

    def get_scrub_status(self, rank=0):
        return self.run_scrub(["status"], rank)

    def wait_until_scrub_complete(self, result=None, tag=None, rank=0, sleep=30,
                                  timeout=300, reverse=False):
        # time out after "timeout" seconds and assume as done
        if result is None:
            result = "no active scrubs running"
        with contextutil.safe_while(sleep=sleep, tries=timeout//sleep) as proceed:
            while proceed():
                out_json = self.rank_tell(["scrub", "status"], rank=rank)
                assert out_json is not None
                if not reverse:
                    if result in out_json['status']:
                        log.info("all active scrubs completed")
                        return True
                else:
                    if result not in out_json['status']:
                        log.info("all active scrubs completed")
                        return True

                if tag is not None:
                    status = out_json['scrubs'][tag]
                    if status is not None:
                        log.info(f"scrub status for tag:{tag} - {status}")
                    else:
                        log.info(f"scrub has completed for tag:{tag}")
                        return True

        # timed out waiting for scrub to complete
        return False

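    # Typical usage sketch (command arguments illustrative):
    #
    #   fs.run_scrub(["start", "/", "recursive"])
    #   assert fs.wait_until_scrub_complete(timeout=300)
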