# ceph/qa/tasks/cephfs/filesystem.py
import json
import logging
import datetime
import errno
import os
import random
import re
import time
import traceback

from gevent import Greenlet

from io import BytesIO, StringIO
from errno import EBUSY

from teuthology.exceptions import CommandFailedError
from teuthology import misc
from teuthology.nuke import clear_firewall
from teuthology.parallel import parallel
from teuthology import contextutil
from tasks.ceph_manager import write_conf
from tasks import ceph_manager


log = logging.getLogger(__name__)


DAEMON_WAIT_TIMEOUT = 120

class FileLayout(object):
    def __init__(self, pool=None, pool_namespace=None, stripe_unit=None, stripe_count=None, object_size=None):
        self.pool = pool
        self.pool_namespace = pool_namespace
        self.stripe_unit = stripe_unit
        self.stripe_count = stripe_count
        self.object_size = object_size

    @staticmethod
    def load_from_ceph(layout_str):
        # not implemented
        pass

    def items(self):
        if self.pool is not None:
            yield ("pool", self.pool)
        if self.pool_namespace:
            yield ("pool_namespace", self.pool_namespace)
        if self.stripe_unit is not None:
            yield ("stripe_unit", self.stripe_unit)
        if self.stripe_count is not None:
            yield ("stripe_count", self.stripe_count)
        if self.object_size is not None:
            yield ("object_size", self.object_size)

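# Illustrative sketch (assumes nothing beyond this class): items() emits only
# the attributes that were actually set, so the result can be fed straight to
# setfattr-style consumers such as Filesystem.set_dir_layout further below:
#
#   layout = FileLayout(pool="cephfs_data", stripe_count=2)
#   dict(layout.items())  # {'pool': 'cephfs_data', 'stripe_count': 2}
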
class ObjectNotFound(Exception):
    def __init__(self, object_name):
        self._object_name = object_name

    def __str__(self):
        return "Object not found: '{0}'".format(self._object_name)

class FSMissing(Exception):
    def __init__(self, ident):
        self.ident = ident

    def __str__(self):
        return f"File system {self.ident} does not exist in the map"

class FSStatus(object):
    """
    Operations on a snapshot of the FSMap.
    """
    def __init__(self, mon_manager, epoch=None):
        self.mon = mon_manager
        cmd = ["fs", "dump", "--format=json"]
        if epoch is not None:
            cmd.append(str(epoch))
        self.map = json.loads(self.mon.raw_cluster_cmd(*cmd))

    def __str__(self):
        return json.dumps(self.map, indent = 2, sort_keys = True)

    # Expose the fsmap for manual inspection.
    def __getitem__(self, key):
        """
        Get a field from the fsmap.
        """
        return self.map[key]

    def get_filesystems(self):
        """
        Iterator for all filesystems.
        """
        for fs in self.map['filesystems']:
            yield fs

    def get_all(self):
        """
        Iterator for all the mds_info components in the FSMap.
        """
        for info in self.map['standbys']:
            yield info
        for fs in self.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                yield info

    def get_standbys(self):
        """
        Iterator for all standbys.
        """
        for info in self.map['standbys']:
            yield info

    def get_fsmap(self, fscid):
        """
        Get the fsmap for the given FSCID.
        """
        for fs in self.map['filesystems']:
            if fscid is None or fs['id'] == fscid:
                return fs
        raise FSMissing(fscid)

    def get_fsmap_byname(self, name):
        """
        Get the fsmap for the given file system name.
        """
        for fs in self.map['filesystems']:
            if name is None or fs['mdsmap']['fs_name'] == name:
                return fs
        raise FSMissing(name)

    def get_replays(self, fscid):
        """
        Get the standby:replay MDS for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['state'] == 'up:standby-replay':
                yield info

    def get_ranks(self, fscid):
        """
        Get the ranks for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['rank'] >= 0 and info['state'] != 'up:standby-replay':
                yield info

    def get_rank(self, fscid, rank):
        """
        Get the rank for the given FSCID.
        """
        for info in self.get_ranks(fscid):
            if info['rank'] == rank:
                return info
        raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))

    def get_mds(self, name):
        """
        Get the info for the given MDS name.
        """
        for info in self.get_all():
            if info['name'] == name:
                return info
        return None

    def get_mds_addr(self, name):
        """
        Return the instance addr as a string, like "10.214.133.138:6807\/10825"
        """
        info = self.get_mds(name)
        if info:
            return info['addr']
        else:
            log.warning(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
            raise RuntimeError("MDS id '{0}' not found in map".format(name))

    def get_mds_addrs(self, name):
        """
        Return the instance addr as a string, like "[10.214.133.138:6807 10.214.133.138:6808]"
        """
        info = self.get_mds(name)
        if info:
            return [e['addr'] for e in info['addrs']['addrvec']]
        else:
            log.warning(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
            raise RuntimeError("MDS id '{0}' not found in map".format(name))

    def get_mds_gid(self, gid):
        """
        Get the info for the given MDS gid.
        """
        for info in self.get_all():
            if info['gid'] == gid:
                return info
        return None

    def hadfailover(self, status):
        """
        Compares two statuses for mds failovers.
        Returns True if there is a failover.
        """
        for fs in status.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                oldinfo = self.get_mds_gid(info['gid'])
                if oldinfo is None or oldinfo['incarnation'] != info['incarnation']:
                    return True
        # no failover
        return False

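# Illustrative sketch: FSStatus is a point-in-time snapshot of the FSMap, so
# two snapshots can be compared to detect failovers (`mon_manager` is assumed
# to come from a CephCluster instance in a running teuthology job):
#
#   before = FSStatus(mon_manager)
#   # ... perturb the cluster ...
#   after = FSStatus(mon_manager)
#   if before.hadfailover(after):
#       log.info("an MDS failed over")
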
class CephCluster(object):
    @property
    def admin_remote(self):
        first_mon = misc.get_first_mon(self._ctx, None)
        (result,) = self._ctx.cluster.only(first_mon).remotes.keys()
        return result

    def __init__(self, ctx) -> None:
        self._ctx = ctx
        self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))

    def get_config(self, key, service_type=None):
        """
        Get config from mon by default, or a specific service if caller asks for it
        """
        if service_type is None:
            service_type = 'mon'

        service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def set_ceph_conf(self, subsys, key, value):
        if subsys not in self._ctx.ceph['ceph'].conf:
            self._ctx.ceph['ceph'].conf[subsys] = {}
        self._ctx.ceph['ceph'].conf[subsys][key] = value
        write_conf(self._ctx)  # XXX because we don't have the ceph task's config object, if they
                               # used a different config path this won't work.

    def clear_ceph_conf(self, subsys, key):
        del self._ctx.ceph['ceph'].conf[subsys][key]
        write_conf(self._ctx)

    def json_asok(self, command, service_type, service_id, timeout=None):
        command.insert(0, '--format=json')
        proc = self.mon_manager.admin_socket(service_type, service_id, command, timeout=timeout)
        response_data = proc.stdout.getvalue().strip()
        if len(response_data) > 0:
            j = json.loads(response_data)
            pretty = json.dumps(j, sort_keys=True, indent=2)
            log.debug(f"_json_asok output\n{pretty}")
            return j
        else:
            log.debug("_json_asok output empty")
            return None

    def is_addr_blocklisted(self, addr=None):
        if addr is None:
            log.warning("Couldn't get the client address, so the blocklisted "
                        "status undetermined")
            return False

        blocklist = json.loads(self.mon_manager.run_cluster_cmd(
            args=["osd", "blocklist", "ls", "--format=json"],
            stdout=StringIO()).stdout.getvalue())
        for b in blocklist:
            if addr == b["addr"]:
                return True
        return False

class MDSCluster(CephCluster):
    """
    Collective operations on all the MDS daemons in the Ceph cluster.  These
    daemons may be in use by various Filesystems.

    For the benefit of pre-multi-filesystem tests, this class is also
    a parent of Filesystem.  The correct way to use MDSCluster going forward is
    as a separate instance outside of your (multiple) Filesystem instances.
    """

    def __init__(self, ctx):
        super(MDSCluster, self).__init__(ctx)

    @property
    def mds_ids(self):
        # do this dynamically because the list of ids may change periodically with cephadm
        return list(misc.all_roles_of_type(self._ctx.cluster, 'mds'))

    @property
    def mds_daemons(self):
        return dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])

    def _one_or_all(self, mds_id, cb, in_parallel=True):
        """
        Call a callback for a single named MDS, or for all.

        Note that the parallelism here isn't for performance, it's to avoid being overly kind
        to the cluster by waiting a graceful ssh-latency of time between doing things, and to
        avoid being overly kind by executing them in a particular order.  However, some actions
        don't cope with being done in parallel, so it's optional (`in_parallel`)

        :param mds_id: MDS daemon name, or None
        :param cb: Callback taking single argument of MDS daemon name
        :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
        """
        if mds_id is None:
            if in_parallel:
                with parallel() as p:
                    for mds_id in self.mds_ids:
                        p.spawn(cb, mds_id)
            else:
                for mds_id in self.mds_ids:
                    cb(mds_id)
        else:
            cb(mds_id)
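
    # Illustrative sketch: passing mds_id=None fans the callback out over all
    # MDS ids (in parallel by default), while a concrete id runs it once:
    #
    #   mdsc._one_or_all(None, lambda id_: log.info("mds.%s", id_))
    #   mdsc._one_or_all('a', lambda id_: log.info("only mds.%s", id_))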
    def get_config(self, key, service_type=None):
        """
        get_config specialization of service_type="mds"
        """
        if service_type != "mds":
            return super(MDSCluster, self).get_config(key, service_type)

        # Some tests stop MDS daemons, don't send commands to a dead one:
        running_daemons = [i for i, mds in self.mds_daemons.items() if mds.running()]
        service_id = random.sample(running_daemons, 1)[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]
    def mds_stop(self, mds_id=None):
        """
        Stop the MDS daemon process(se).  If it held a rank, that rank
        will eventually go laggy.
        """
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())

    def mds_fail(self, mds_id=None):
        """
        Inform MDSMonitor of the death of the daemon process(es).  If it held
        a rank, that rank will be relinquished.
        """
        self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))

    def mds_restart(self, mds_id=None):
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())

    def mds_fail_restart(self, mds_id=None):
        """
        Variation on restart that includes marking MDSs as failed, so that doing this
        operation followed by waiting for healthy daemon states guarantees that they
        have gone down and come up, rather than potentially seeing the healthy states
        that existed before the restart.
        """
        def _fail_restart(id_):
            self.mds_daemons[id_].stop()
            self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
            self.mds_daemons[id_].restart()

        self._one_or_all(mds_id, _fail_restart)

    def mds_signal(self, mds_id, sig, silent=False):
        """
        signal a MDS daemon
        """
        self.mds_daemons[mds_id].signal(sig, silent)
    def newfs(self, name='cephfs', create=True):
        return Filesystem(self._ctx, name=name, create=create)

    def status(self, epoch=None):
        return FSStatus(self.mon_manager, epoch)

    def get_standby_daemons(self):
        return set([s['name'] for s in self.status().get_standbys()])

    def get_mds_hostnames(self):
        result = set()
        for mds_id in self.mds_ids:
            mds_remote = self.mon_manager.find_remote('mds', mds_id)
            result.add(mds_remote.hostname)

        return list(result)
    def set_clients_block(self, blocked, mds_id=None):
        """
        Block (using iptables) client communications to this MDS.  Be careful: if
        other services are running on this MDS, or other MDSs try to talk to this
        MDS, their communications may also be blocked as collateral damage.

        :param mds_id: Optional ID of MDS to block, default to all
        """
        da_flag = "-A" if blocked else "-D"

        def set_block(_mds_id):
            remote = self.mon_manager.find_remote('mds', _mds_id)
            status = self.status()

            addr = status.get_mds_addr(_mds_id)
            ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()

            remote.run(
                args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])
            remote.run(
                args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])

        self._one_or_all(mds_id, set_block, in_parallel=False)
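
    # Illustrative sketch: the iptables rules are tagged with a "teuthology"
    # comment, so a block can be applied and later undone (or swept away
    # wholesale with clear_firewall below):
    #
    #   mdsc.set_clients_block(True, mds_id='a')   # adds REJECT rules (-A)
    #   mdsc.set_clients_block(False, mds_id='a')  # deletes them again (-D)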
    def set_inter_mds_block(self, blocked, mds_rank_1, mds_rank_2):
        """
        Block (using iptables) communications from a provided MDS to other MDSs.
        Block all ports that an MDS uses for communication.

        :param blocked: True to block the MDS, False otherwise
        :param mds_rank_1: MDS rank
        :param mds_rank_2: MDS rank
        """
        da_flag = "-A" if blocked else "-D"

        def set_block(mds_ids):
            status = self.status()

            mds = mds_ids[0]
            remote = self.mon_manager.find_remote('mds', mds)
            addrs = status.get_mds_addrs(mds)
            for addr in addrs:
                ip_str, port_str = re.match("(.+):(.+)", addr).groups()
                remote.run(
                    args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                          "comment", "--comment", "teuthology"])

            mds = mds_ids[1]
            remote = self.mon_manager.find_remote('mds', mds)
            addrs = status.get_mds_addrs(mds)
            for addr in addrs:
                ip_str, port_str = re.match("(.+):(.+)", addr).groups()
                remote.run(
                    args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
                          "comment", "--comment", "teuthology"])
                remote.run(
                    args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                          "comment", "--comment", "teuthology"])

        self._one_or_all((mds_rank_1, mds_rank_2), set_block, in_parallel=False)
    def clear_firewall(self):
        clear_firewall(self._ctx)

    def get_mds_info(self, mds_id):
        return FSStatus(self.mon_manager).get_mds(mds_id)

    def is_pool_full(self, pool_name):
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        for pool in pools:
            if pool['pool_name'] == pool_name:
                return 'full' in pool['flags_names'].split(",")

        raise RuntimeError("Pool not found '{0}'".format(pool_name))

    def delete_all_filesystems(self):
        """
        Remove all filesystems that exist, and any pools in use by them.
        """
        for fs in self.status().get_filesystems():
            Filesystem(ctx=self._ctx, fscid=fs['id']).destroy()

class Filesystem(MDSCluster):
    """
    This object is for driving a CephFS filesystem.  The MDS daemons driven by
    MDSCluster may be shared with other Filesystems.
    """
    def __init__(self, ctx, fs_config={}, fscid=None, name=None, create=False):
        super(Filesystem, self).__init__(ctx)

        self.name = name
        self.id = None
        self.metadata_pool_name = None
        self.metadata_overlay = False
        self.data_pool_name = None
        self.data_pools = None
        self.fs_config = fs_config
        self.ec_profile = fs_config.get('ec_profile')

        client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
        self.client_id = client_list[0]
        self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]

        if name is not None:
            if fscid is not None:
                raise RuntimeError("cannot specify fscid when creating fs")
            if create and not self.legacy_configured():
                self.create()
        else:
            if fscid is not None:
                self.id = fscid
                self.getinfo(refresh = True)

        # Stash a reference to the first created filesystem on ctx, so
        # that if someone drops to the interactive shell they can easily
        # poke our methods.
        if not hasattr(self._ctx, "filesystem"):
            self._ctx.filesystem = self

    def dead(self):
        try:
            return not bool(self.get_mds_map())
        except FSMissing:
            return True
    def get_task_status(self, status_key):
        return self.mon_manager.get_service_task_status("mds", status_key)

    def getinfo(self, refresh = False):
        status = self.status()
        if self.id is not None:
            fsmap = status.get_fsmap(self.id)
        elif self.name is not None:
            fsmap = status.get_fsmap_byname(self.name)
        else:
            fss = [fs for fs in status.get_filesystems()]
            if len(fss) == 1:
                fsmap = fss[0]
            elif len(fss) == 0:
                raise RuntimeError("no file system available")
            else:
                raise RuntimeError("more than one file system available")
        self.id = fsmap['id']
        self.name = fsmap['mdsmap']['fs_name']
        self.get_pool_names(status = status, refresh = refresh)
        return status
    def set_metadata_overlay(self, overlay):
        if self.id is not None:
            raise RuntimeError("cannot specify fscid when configuring overlay")
        self.metadata_overlay = overlay

    def deactivate(self, rank):
        if rank < 0:
            raise RuntimeError("invalid rank")
        elif rank == 0:
            raise RuntimeError("cannot deactivate rank 0")
        self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))
    def reach_max_mds(self):
        # Try to reach rank count == max_mds, up or down (UPGRADE SENSITIVE!)
        status = self.getinfo()
        mds_map = self.get_mds_map(status=status)
        max_mds = mds_map['max_mds']

        count = len(list(self.get_ranks(status=status)))
        if count > max_mds:
            try:
                # deactivate mds in descending order
                status = self.wait_for_daemons(status=status, skip_max_mds_check=True)
                while count > max_mds:
                    targets = sorted(self.get_ranks(status=status), key=lambda r: r['rank'], reverse=True)
                    target = targets[0]
                    log.debug("deactivating rank %d" % target['rank'])
                    self.deactivate(target['rank'])
                    status = self.wait_for_daemons(skip_max_mds_check=True)
                    count = len(list(self.get_ranks(status=status)))
            except:
                # In Mimic, deactivation is done automatically:
                log.info("Error:\n{}".format(traceback.format_exc()))
                status = self.wait_for_daemons()
        else:
            status = self.wait_for_daemons()

        mds_map = self.get_mds_map(status=status)
        assert(mds_map['max_mds'] == max_mds)
        assert(mds_map['in'] == list(range(0, max_mds)))
    def reset(self):
        self.mon_manager.raw_cluster_cmd("fs", "reset", str(self.name), '--yes-i-really-mean-it')

    def fail(self):
        self.mon_manager.raw_cluster_cmd("fs", "fail", str(self.name))
    def set_flag(self, var, *args):
        a = map(lambda x: str(x).lower(), args)
        self.mon_manager.raw_cluster_cmd("fs", "flag", "set", var, *a)

    def set_allow_multifs(self, yes=True):
        self.set_flag("enable_multiple", yes)

    def set_var(self, var, *args):
        a = map(lambda x: str(x).lower(), args)
        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, var, *a)

    def set_down(self, down=True):
        self.set_var("down", str(down).lower())

    def set_joinable(self, joinable=True):
        self.set_var("joinable", joinable)

    def set_max_mds(self, max_mds):
        self.set_var("max_mds", "%d" % max_mds)

    def set_session_timeout(self, timeout):
        self.set_var("session_timeout", "%d" % timeout)

    def set_allow_standby_replay(self, yes):
        self.set_var("allow_standby_replay", yes)

    def set_allow_new_snaps(self, yes):
        self.set_var("allow_new_snaps", yes, '--yes-i-really-mean-it')

    def compat(self, *args):
        a = map(lambda x: str(x).lower(), args)
        self.mon_manager.raw_cluster_cmd("fs", "compat", self.name, *a)

    def add_compat(self, *args):
        self.compat("add_compat", *args)

    def add_incompat(self, *args):
        self.compat("add_incompat", *args)

    def rm_compat(self, *args):
        self.compat("rm_compat", *args)

    def rm_incompat(self, *args):
        self.compat("rm_incompat", *args)

    def required_client_features(self, *args, **kwargs):
        c = ["fs", "required_client_features", self.name, *args]
        return self.mon_manager.run_cluster_cmd(args=c, **kwargs)
    # Since v15.1.0 the pg autoscale mode has been enabled as default,
    # will let the pg autoscale mode to calculate the pg_num as needed.
    # We set the pg_num_min to 64 to make sure that pg autoscale mode
    # won't set the pg_num to low to fix Tracker#45434.
    pg_num = 64
    pg_num_min = 64
    target_size_ratio = 0.9
    target_size_ratio_ec = 0.9

    def create(self):
        if self.name is None:
            self.name = "cephfs"
        if self.metadata_pool_name is None:
            self.metadata_pool_name = "{0}_metadata".format(self.name)
        if self.data_pool_name is None:
            data_pool_name = "{0}_data".format(self.name)
        else:
            data_pool_name = self.data_pool_name

        # will use the ec pool to store the data and a small amount of
        # metadata still goes to the primary data pool for all files.
        if not self.metadata_overlay and self.ec_profile and 'disabled' not in self.ec_profile:
            self.target_size_ratio = 0.05

        log.debug("Creating filesystem '{0}'".format(self.name))

        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                         self.metadata_pool_name, str(self.pg_num),
                                         '--pg_num_min', str(self.pg_num_min))

        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                         data_pool_name, str(self.pg_num),
                                         '--pg_num_min', str(self.pg_num_min),
                                         '--target_size_ratio',
                                         str(self.target_size_ratio))

        if self.metadata_overlay:
            self.mon_manager.raw_cluster_cmd('fs', 'new',
                                             self.name, self.metadata_pool_name, data_pool_name,
                                             '--allow-dangerous-metadata-overlay')
        else:
            self.mon_manager.raw_cluster_cmd('fs', 'new',
                                             self.name,
                                             self.metadata_pool_name,
                                             data_pool_name)

            if self.ec_profile and 'disabled' not in self.ec_profile:
                ec_data_pool_name = data_pool_name + "_ec"
                log.debug("EC profile is %s", self.ec_profile)
                cmd = ['osd', 'erasure-code-profile', 'set', ec_data_pool_name]
                cmd.extend(self.ec_profile)
                self.mon_manager.raw_cluster_cmd(*cmd)
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'create', ec_data_pool_name,
                    'erasure', ec_data_pool_name,
                    '--pg_num_min', str(self.pg_num_min),
                    '--target_size_ratio', str(self.target_size_ratio_ec))
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'set',
                    ec_data_pool_name, 'allow_ec_overwrites', 'true')
                self.add_data_pool(ec_data_pool_name, create=False)
                self.check_pool_application(ec_data_pool_name)

                self.run_client_payload(f"setfattr -n ceph.dir.layout.pool -v {ec_data_pool_name} . && getfattr -n ceph.dir.layout .")

        self.check_pool_application(self.metadata_pool_name)
        self.check_pool_application(data_pool_name)

        # Turn off spurious standby count warnings from modifying max_mds in tests.
        try:
            self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
        except CommandFailedError as e:
            if e.exitstatus == 22:
                # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
                pass
            else:
                raise

        if self.fs_config is not None:
            max_mds = self.fs_config.get('max_mds', 1)
            if max_mds > 1:
                self.set_max_mds(max_mds)

            standby_replay = self.fs_config.get('standby_replay', False)
            self.set_allow_standby_replay(standby_replay)

            # If absent will use the default value (60 seconds)
            session_timeout = self.fs_config.get('session_timeout', 60)
            if session_timeout != 60:
                self.set_session_timeout(session_timeout)

        self.getinfo(refresh = True)

        # wait pgs to be clean
        self.mon_manager.wait_for_clean()
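
    # Illustrative sketch: tests normally go through MDSCluster.newfs, which
    # drives create() and the pool setup above in one step (`ctx` is assumed
    # to come from the surrounding teuthology task):
    #
    #   fs = MDSCluster(ctx).newfs(name='cephfs')
    #   fs.wait_for_daemons()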
    def run_client_payload(self, cmd):
        # avoid circular dep by importing here:
        from tasks.cephfs.fuse_mount import FuseMount

        d = misc.get_testdir(self._ctx)
        m = FuseMount(self._ctx, {}, d, "admin", self.client_remote, cephfs_name=self.name)
        m.mount_wait()
        m.run_shell_payload(cmd)
        m.umount_wait(require_clean=True)
    def _remove_pool(self, name, **kwargs):
        c = f'osd pool rm {name} {name} --yes-i-really-really-mean-it'
        return self.mon_manager.ceph(c, **kwargs)

    def rm(self, **kwargs):
        c = f'fs rm {self.name} --yes-i-really-mean-it'
        return self.mon_manager.ceph(c, **kwargs)
    def remove_pools(self, data_pools):
        self._remove_pool(self.get_metadata_pool_name())
        for poolname in data_pools:
            try:
                self._remove_pool(poolname)
            except CommandFailedError as e:
                # EBUSY, this data pool is used by two metadata pools, let the
                # 2nd pass delete it
                if e.exitstatus == EBUSY:
                    pass
                else:
                    raise
    def destroy(self, reset_obj_attrs=True):
        log.info(f'Destroying file system {self.name} and related pools')

        if self.dead():
            log.debug('already dead...')
            return

        data_pools = self.get_data_pool_names(refresh=True)

        # make sure no MDSs are attached to given FS.
        self.fail()
        self.rm()

        self.remove_pools(data_pools)

        if reset_obj_attrs:
            self.id = None
            self.name = None
            self.metadata_pool_name = None
            self.data_pool_name = None
            self.data_pools = None

    def recreate(self):
        self.destroy()

        self.create()
        self.getinfo(refresh=True)
    def check_pool_application(self, pool_name):
        osd_map = self.mon_manager.get_osd_dump_json()
        for pool in osd_map['pools']:
            if pool['pool_name'] == pool_name:
                if "application_metadata" in pool:
                    if not "cephfs" in pool['application_metadata']:
                        raise RuntimeError("Pool {pool_name} does not name cephfs as application!".\
                            format(pool_name=pool_name))
    def __del__(self):
        if getattr(self._ctx, "filesystem", None) == self:
            delattr(self._ctx, "filesystem")
    def exists(self):
        """
        Whether a filesystem exists in the mon's filesystem list
        """
        fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
        return self.name in [fs['name'] for fs in fs_list]
    def legacy_configured(self):
        """
        Check if a legacy (i.e. pre "fs new") filesystem configuration is present.  If this is
        the case, the caller should avoid using Filesystem.create
        """
        try:
            out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
            pools = json.loads(out_text)
            metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
            if metadata_pool_exists:
                self.metadata_pool_name = 'metadata'
        except CommandFailedError as e:
            # For use in upgrade tests, Ceph cuttlefish and earlier don't support
            # structured output (--format) from the CLI.
            if e.exitstatus == 22:
                metadata_pool_exists = True
            else:
                raise

        return metadata_pool_exists
    def _df(self):
        return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))
    # may raise FSMissing
    def get_mds_map(self, status=None):
        if status is None:
            status = self.status()
        return status.get_fsmap(self.id)['mdsmap']
    def get_var(self, var, status=None):
        return self.get_mds_map(status=status)[var]

    def set_dir_layout(self, mount, path, layout):
        for name, value in layout.items():
            mount.run_shell(args=["setfattr", "-n", "ceph.dir.layout."+name, "-v", str(value), path])
    def add_data_pool(self, name, create=True):
        if create:
            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name,
                                             '--pg_num_min', str(self.pg_num_min))
        self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
        self.get_pool_names(refresh = True)
        for poolid, fs_name in self.data_pools.items():
            if name == fs_name:
                return poolid
        raise RuntimeError("could not get just created pool '{0}'".format(name))
    def get_pool_names(self, refresh = False, status = None):
        if refresh or self.metadata_pool_name is None or self.data_pools is None:
            if status is None:
                status = self.status()
            fsmap = status.get_fsmap(self.id)

            osd_map = self.mon_manager.get_osd_dump_json()
            id_to_name = {}
            for p in osd_map['pools']:
                id_to_name[p['pool']] = p['pool_name']

            self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
            self.data_pools = {}
            for data_pool in fsmap['mdsmap']['data_pools']:
                self.data_pools[data_pool] = id_to_name[data_pool]
    def get_data_pool_name(self, refresh = False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return next(iter(self.data_pools.values()))

    def get_data_pool_id(self, refresh = False):
        """
        Don't call this if you have multiple data pools
        :return: integer
        """
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return next(iter(self.data_pools.keys()))

    def get_data_pool_names(self, refresh = False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        return list(self.data_pools.values())

    def get_metadata_pool_name(self):
        return self.metadata_pool_name

    def set_data_pool_name(self, name):
        if self.id is not None:
            raise RuntimeError("can't set filesystem name if its fscid is set")
        self.data_pool_name = name

    def get_pool_pg_num(self, pool_name):
        pgs = json.loads(self.mon_manager.raw_cluster_cmd('osd', 'pool', 'get',
                                                          pool_name, 'pg_num',
                                                          '--format=json-pretty'))
        return int(pgs['pg_num'])
    def get_namespace_id(self):
        return self.id
    def get_pool_df(self, pool_name):
        """
        Return a dict like:
        {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
        """
        for pool_df in self._df()['pools']:
            if pool_df['name'] == pool_name:
                return pool_df['stats']

        raise RuntimeError("Pool name '{0}' not found".format(pool_name))
    def get_usage(self):
        return self._df()['stats']['total_used_bytes']
    def are_daemons_healthy(self, status=None, skip_max_mds_check=False):
        """
        Return true if all daemons are in one of active, standby, standby-replay, and
        at least max_mds daemons are in 'active'.

        Unlike most of Filesystem, this function is tolerant of new-style `fs`
        commands being missing, because we are part of the ceph installation
        process during upgrade suites, so must fall back to old style commands
        when we get an EINVAL on a new style command.
        """
        # First, check to see that processes haven't exited with an error code
        for mds in self._ctx.daemons.iter_daemons_of_role('mds'):
            mds.check_status()

        active_count = 0
        mds_map = self.get_mds_map(status=status)

        log.debug("are_daemons_healthy: mds map: {0}".format(mds_map))

        for mds_id, mds_status in mds_map['info'].items():
            if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
                log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
                return False
            elif mds_status['state'] == 'up:active':
                active_count += 1

        log.debug("are_daemons_healthy: {0}/{1}".format(
            active_count, mds_map['max_mds']
        ))

        if not skip_max_mds_check:
            if active_count > mds_map['max_mds']:
                log.debug("are_daemons_healthy: number of actives is greater than max_mds: {0}".format(mds_map))
                return False
            elif active_count == mds_map['max_mds']:
                # The MDSMap says these guys are active, but let's check they really are
                for mds_id, mds_status in mds_map['info'].items():
                    if mds_status['state'] == 'up:active':
                        try:
                            daemon_status = self.mds_tell(["status"], mds_id=mds_status['name'])
                        except CommandFailedError as cfe:
                            if cfe.exitstatus == errno.EINVAL:
                                # Old version, can't do this check
                                continue
                            else:
                                # MDS not even running
                                return False

                        if daemon_status['state'] != 'up:active':
                            # MDS hasn't taken the latest map yet
                            return False

                return True
            else:
                return False
        else:
            log.debug("are_daemons_healthy: skipping max_mds check")
            return True
    def get_daemon_names(self, state=None, status=None):
        """
        Return MDS daemon names of those daemons in the given state
        """
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['state'] == state or state is None:
                result.append(mds_status['name'])

        return result
    def get_active_names(self, status=None):
        """
        Return MDS daemon names of those daemons holding ranks
        in state up:active

        :return: list of strings like ['a', 'b'], sorted by rank
        """
        return self.get_daemon_names("up:active", status=status)
    def get_all_mds_rank(self, status=None):
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['rank'])

        return result
    def get_rank(self, rank=None, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_rank(self.id, rank)
    def rank_restart(self, rank=0, status=None):
        name = self.get_rank(rank=rank, status=status)['name']
        self.mds_restart(mds_id=name)

    def rank_signal(self, signal, rank=0, status=None):
        name = self.get_rank(rank=rank, status=status)['name']
        self.mds_signal(name, signal)

    def rank_freeze(self, yes, rank=0):
        self.mon_manager.raw_cluster_cmd("mds", "freeze", "{}:{}".format(self.id, rank), str(yes).lower())

    def rank_fail(self, rank=0):
        self.mon_manager.raw_cluster_cmd("mds", "fail", "{}:{}".format(self.id, rank))
    def get_ranks(self, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_ranks(self.id)

    def get_replays(self, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_replays(self.id)

    def get_replay(self, rank=0, status=None):
        for replay in self.get_replays(status=status):
            if replay['rank'] == rank:
                return replay
        return None
    def get_rank_names(self, status=None):
        """
        Return MDS daemon names of those daemons holding a rank,
        sorted by rank.  This includes e.g. up:replay/reconnect
        as well as active, but does not include standby or
        standby-replay.
        """
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['name'])

        return result
    def wait_for_daemons(self, timeout=None, skip_max_mds_check=False, status=None):
        """
        Wait until all daemons are healthy
        """

        if timeout is None:
            timeout = DAEMON_WAIT_TIMEOUT

        if status is None:
            status = self.status()

        elapsed = 0
        while True:
            if self.are_daemons_healthy(status=status, skip_max_mds_check=skip_max_mds_check):
                return status
            else:
                time.sleep(1)
                elapsed += 1

            if elapsed > timeout:
                log.debug("status = {0}".format(status))
                raise RuntimeError("Timed out waiting for MDS daemons to become healthy")

            status = self.status()
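
    # Illustrative sketch: a typical recovery pattern in tests is to fail
    # everything and then block until the map settles again:
    #
    #   fs.mds_fail_restart()
    #   fs.wait_for_daemons(timeout=120)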
    def dencoder(self, obj_type, obj_blob):
        args = [os.path.join(self._prefix, "ceph-dencoder"), 'type', obj_type, 'import', '-', 'decode', 'dump_json']
        p = self.mon_manager.controller.run(args=args, stdin=BytesIO(obj_blob), stdout=BytesIO())
        return p.stdout.getvalue()
    def rados(self, *args, **kwargs):
        """
        Callout to rados CLI.
        """
        return self.mon_manager.do_rados(*args, **kwargs)

    def radosm(self, *args, **kwargs):
        """
        Interact with the metadata pool via rados CLI.
        """
        return self.rados(*args, **kwargs, pool=self.get_metadata_pool_name())

    def radosmo(self, *args, stdout=BytesIO(), **kwargs):
        """
        Interact with the metadata pool via rados CLI. Get the stdout.
        """
        return self.radosm(*args, **kwargs, stdout=stdout).stdout.getvalue()
    def get_metadata_object(self, object_type, object_id):
        """
        Retrieve an object from the metadata pool, pass it through
        ceph-dencoder to dump it to JSON, and return the decoded object.
        """
        o = self.radosmo(['get', object_id, '-'])
        j = self.dencoder(object_type, o)
        try:
            return json.loads(j)
        except (TypeError, ValueError):
            log.error("Failed to decode JSON: '{0}'".format(j))
            raise
    def get_journal_version(self):
        """
        Read the JournalPointer and Journal::Header objects to learn the version of
        encoding in use.
        """
        journal_pointer_object = '400.00000000'
        journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
        journal_ino = journal_pointer_dump['journal_pointer']['front']

        journal_header_object = "{0:x}.00000000".format(journal_ino)
        journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)

        version = journal_header_dump['journal_header']['stream_format']
        log.debug("Read journal version {0}".format(version))

        return version
    def mds_asok(self, command, mds_id=None, timeout=None):
        if mds_id is None:
            return self.rank_asok(command, timeout=timeout)

        return self.json_asok(command, 'mds', mds_id, timeout=timeout)

    def mds_tell(self, command, mds_id=None):
        if mds_id is None:
            return self.rank_tell(command)

        return json.loads(self.mon_manager.raw_cluster_cmd("tell", f"mds.{mds_id}", *command))
    def rank_asok(self, command, rank=0, status=None, timeout=None):
        info = self.get_rank(rank=rank, status=status)
        return self.json_asok(command, 'mds', info['name'], timeout=timeout)

    def rank_tell(self, command, rank=0, status=None):
        return json.loads(self.mon_manager.raw_cluster_cmd("tell", f"mds.{self.id}:{rank}", *command))
    def ranks_tell(self, command, status=None):
        if status is None:
            status = self.status()
        out = []
        for r in status.get_ranks(self.id):
            result = self.rank_tell(command, rank=r['rank'], status=status)
            out.append((r['rank'], result))
        return sorted(out)

    def ranks_perf(self, f, status=None):
        perf = self.ranks_tell(["perf", "dump"], status=status)
        out = []
        for rank, perf in perf:
            out.append((rank, f(perf)))
        return out
    def read_cache(self, path, depth=None):
        cmd = ["dump", "tree", path]
        if depth is not None:
            cmd.append(depth.__str__())
        result = self.mds_asok(cmd)
        if len(result) == 0:
            raise RuntimeError("Path not found in cache: {0}".format(path))

        return result
    def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
        """
        Block until the MDS reaches a particular state, or a failure condition
        is met.

        When there are multiple MDSs, succeed when exactly one MDS is in the
        goal state, or fail when any MDS is in the reject state.

        :param goal_state: Return once the MDS is in this state
        :param reject: Fail if the MDS enters this state before the goal state
        :param timeout: Fail if this many seconds pass before reaching goal
        :return: number of seconds waited, rounded down to integer
        """

        started_at = time.time()
        while True:
            status = self.status()
            if rank is not None:
                try:
                    mds_info = status.get_rank(self.id, rank)
                    current_state = mds_info['state'] if mds_info else None
                    log.debug("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
                except:
                    mdsmap = self.get_mds_map(status=status)
                    if rank in mdsmap['failed']:
                        log.debug("Waiting for rank {0} to come back.".format(rank))
                        current_state = None
                    else:
                        raise
            elif mds_id is not None:
                # mds_info is None if no daemon with this ID exists in the map
                mds_info = status.get_mds(mds_id)
                current_state = mds_info['state'] if mds_info else None
                log.debug("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
            else:
                # In general, look for a single MDS
                states = [m['state'] for m in status.get_ranks(self.id)]
                if [s for s in states if s == goal_state] == [goal_state]:
                    current_state = goal_state
                elif reject in states:
                    current_state = reject
                else:
                    current_state = None
                log.debug("mapped states {0} to {1}".format(states, current_state))

            elapsed = time.time() - started_at
            if current_state == goal_state:
                log.debug("reached state '{0}' in {1}s".format(current_state, elapsed))
                return elapsed
            elif reject is not None and current_state == reject:
                raise RuntimeError("MDS in reject state {0}".format(current_state))
            elif timeout is not None and elapsed > timeout:
                log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
                raise RuntimeError(
                    "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
                        elapsed, goal_state, current_state
                    ))
            else:
                time.sleep(1)
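
    # Illustrative sketch: wait for rank 0 to replay its journal and go
    # active, bounding the wait to a minute:
    #
    #   fs.wait_for_state('up:active', timeout=60, rank=0)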
    def _read_data_xattr(self, ino_no, xattr_name, obj_type, pool):
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)

        args = ["getxattr", obj_name, xattr_name]
        try:
            proc = self.rados(args, pool=pool, stdout=BytesIO())
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(obj_name)

        obj_blob = proc.stdout.getvalue()
        return json.loads(self.dencoder(obj_type, obj_blob).strip())
    def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
        """
        Write to an xattr of the 0th data object of an inode.  Will
        succeed whether the object and/or xattr already exist or not.

        :param ino_no: integer inode number
        :param xattr_name: string name of the xattr
        :param data: byte array data to write to the xattr
        :param pool: name of data pool or None to use primary data pool
        """
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)
        args = ["setxattr", obj_name, xattr_name, data]
        self.rados(args, pool=pool)
    def read_backtrace(self, ino_no, pool=None):
        """
        Read the backtrace from the data pool, return a dict in the format
        given by inode_backtrace_t::dump, which is something like:

        ::

            rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
            ceph-dencoder type inode_backtrace_t import out.bin decode dump_json

            { "ino": 1099511627778,
              ...}

        :param pool: name of pool to read backtrace from.  If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)
    def read_layout(self, ino_no, pool=None):
        """
        Read 'layout' xattr of an inode and parse the result, returning a dict like:
        ::
            {
                "stripe_unit": 4194304,
                "stripe_count": 1,
                "object_size": 4194304,
                ...
            }

        :param pool: name of pool to read backtrace from.  If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)
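
    # Illustrative sketch: both helpers decode xattrs of the inode's first
    # data object via ceph-dencoder (0x10000000002 is a made-up inode number):
    #
    #   bt = fs.read_backtrace(0x10000000002)
    #   layout = fs.read_layout(0x10000000002)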
    def _enumerate_data_objects(self, ino, size):
        """
        Get the list of expected data objects for a range, and the list of objects
        that really exist.

        :return a tuple of two lists of strings (expected, actual)
        """
        stripe_size = 1024 * 1024 * 4

        size = max(stripe_size, size)

        want_objects = [
            "{0:x}.{1:08x}".format(ino, n)
            for n in range(0, ((size - 1) // stripe_size) + 1)
        ]

        exist_objects = self.rados(["ls"], pool=self.get_data_pool_name(), stdout=StringIO()).stdout.getvalue().split("\n")

        return want_objects, exist_objects
    def data_objects_present(self, ino, size):
        """
        Check that *all* the expected data objects for an inode are present in the data pool
        """
        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        missing = set(want_objects) - set(exist_objects)

        if missing:
            log.debug("Objects missing (ino {0}, size {1}): {2}".format(
                ino, size, missing
            ))
            return False
        else:
            log.debug("All objects for ino {0} size {1} found".format(ino, size))
            return True
    def data_objects_absent(self, ino, size):
        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        present = set(want_objects) & set(exist_objects)

        if present:
            log.debug("Objects not absent (ino {0}, size {1}): {2}".format(
                ino, size, present
            ))
            return False
        else:
            log.debug("All objects for ino {0} size {1} are absent".format(ino, size))
            return True
    def dirfrag_exists(self, ino, frag):
        try:
            self.radosm(["stat", "{0:x}.{1:08x}".format(ino, frag)])
        except CommandFailedError:
            return False
        else:
            return True
    def list_dirfrag(self, dir_ino):
        """
        Read the named object and return the list of omap keys

        :return a list of 0 or more strings
        """

        dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)

        try:
            key_list_str = self.radosmo(["listomapkeys", dirfrag_obj_name], stdout=StringIO())
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(dirfrag_obj_name)

        return key_list_str.strip().split("\n") if key_list_str else []
    def get_meta_of_fs_file(self, dir_ino, obj_name, out):
        """
        Get metadata from the parent to verify the correctness of the data
        format encoded by the tool, cephfs-meta-injection.
        Warning: the splitting of directories is not considered here.
        """

        dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
        try:
            self.radosm(["getomapval", dirfrag_obj_name, obj_name+"_head", out])
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(dir_ino)
    def erase_metadata_objects(self, prefix):
        """
        For all objects in the metadata pool matching the prefix,
        erase them.

        This is O(N) with the number of objects in the pool, so only suitable
        for use on toy test filesystems.
        """
        all_objects = self.radosmo(["ls"], stdout=StringIO()).strip().split("\n")
        matching_objects = [o for o in all_objects if o.startswith(prefix)]
        for o in matching_objects:
            self.radosm(["rm", o])
    def erase_mds_objects(self, rank):
        """
        Erase all the per-MDS objects for a particular rank.  This includes
        inotable, sessiontable, journal
        """

        def obj_prefix(multiplier):
            """
            MDS object naming conventions like rank 1's
            journal is at 201.***
            """
            return "%x." % (multiplier * 0x100 + rank)

        # MDS_INO_LOG_OFFSET
        self.erase_metadata_objects(obj_prefix(2))
        # MDS_INO_LOG_BACKUP_OFFSET
        self.erase_metadata_objects(obj_prefix(3))
        # MDS_INO_LOG_POINTER_OFFSET
        self.erase_metadata_objects(obj_prefix(4))
        # MDSTables & SessionMap
        self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))
    @property
    def _prefix(self):
        """
        Override this to set a different
        """
        return ""

    def _make_rank(self, rank):
        return "{}:{}".format(self.name, rank)
    def _run_tool(self, tool, args, rank=None, quiet=False):
        # Tests frequently have [client] configuration that jacks up
        # the objecter log level (unlikely to be interesting here)
        # and does not set the mds log level (very interesting here)
        if quiet:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
        else:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']

        if rank is not None:
            base_args.extend(["--rank", "%s" % str(rank)])

        t1 = datetime.datetime.now()
        r = self.tool_remote.sh(script=base_args + args, stdout=StringIO()).strip()
        duration = datetime.datetime.now() - t1
        log.debug("Ran {0} in time {1}, result:\n{2}".format(
            base_args + args, duration, r
        ))
        return r
):
1493 An arbitrary remote to use when invoking recovery tools. Use an MDS host because
1494 it'll definitely have keys with perms to access cephfs metadata pool. This is public
1495 so that tests can use this remote to go get locally written output files from the tools.
1497 return self
.mon_manager
.controller
1499 def journal_tool(self
, args
, rank
, quiet
=False):
1501 Invoke cephfs-journal-tool with the passed arguments for a rank, and return its stdout
1503 fs_rank
= self
._make
_rank
(rank
)
1504 return self
._run
_tool
("cephfs-journal-tool", args
, fs_rank
, quiet
)
1506 def meta_tool(self
, args
, rank
, quiet
=False):
1508 Invoke cephfs-meta-injection with the passed arguments for a rank, and return its stdout
1510 fs_rank
= self
._make
_rank
(rank
)
1511 return self
._run
_tool
("cephfs-meta-injection", args
, fs_rank
, quiet
)
1513 def table_tool(self
, args
, quiet
=False):
1515 Invoke cephfs-table-tool with the passed arguments, and return its stdout
1517 return self
._run
_tool
("cephfs-table-tool", args
, None, quiet
)
    def data_scan(self, args, quiet=False, worker_count=1):
        """
        Invoke cephfs-data-scan with the passed arguments, and return its stdout

        :param worker_count: if greater than 1, multiple workers will be run
                             in parallel and the return value will be None
        """

        workers = []

        for n in range(0, worker_count):
            if worker_count > 1:
                # data-scan args first token is a command, followed by args to it.
                # insert worker arguments after the command.
                cmd = args[0]
                worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
            else:
                worker_args = args

            workers.append(Greenlet.spawn(lambda wargs=worker_args:
                           self._run_tool("cephfs-data-scan", wargs, None, quiet)))

        for w in workers:
            w.join()

        if worker_count == 1:
            return workers[0].value
        else:
            return None
.is_pool_full(self
.get_data_pool_name())
1552 def authorize(self
, client_id
, caps
=('/', 'rw')):
1554 Run "ceph fs authorize" and run "ceph auth get" to get and returnt the
1557 client_id: client id that will be authorized
1558 caps: tuple containing the path and permission (can be r or rw)
1561 client_name
= 'client.' + client_id
1562 return self
.mon_manager
.raw_cluster_cmd('fs', 'authorize', self
.name
,
1565 def grow(self
, new_max_mds
, status
=None):
1566 oldmax
= self
.get_var('max_mds', status
=status
)
1567 assert(new_max_mds
> oldmax
)
1568 self
.set_max_mds(new_max_mds
)
1569 return self
.wait_for_daemons()
1571 def shrink(self
, new_max_mds
, status
=None):
1572 oldmax
= self
.get_var('max_mds', status
=status
)
1573 assert(new_max_mds
< oldmax
)
1574 self
.set_max_mds(new_max_mds
)
1575 return self
.wait_for_daemons()
    def run_scrub(self, cmd, rank=0):
        return self.rank_tell(["scrub"] + cmd, rank)

    def get_scrub_status(self, rank=0):
        return self.run_scrub(["status"], rank)
    def wait_until_scrub_complete(self, result=None, tag=None, rank=0, sleep=30,
                                  timeout=300, reverse=False):
        # time out after "timeout" seconds and assume as done
        if result is None:
            result = "no active scrubs running"
        with contextutil.safe_while(sleep=sleep, tries=timeout//sleep) as proceed:
            while proceed():
                out_json = self.rank_tell(["scrub", "status"], rank=rank)
                assert out_json is not None
                if not reverse:
                    if result in out_json['status']:
                        log.info("all active scrubs completed")
                        return True
                else:
                    if result not in out_json['status']:
                        log.info("all active scrubs completed")
                        return True

                if tag is not None:
                    status = out_json['scrubs'][tag]
                    if status is not None:
                        log.info(f"scrub status for tag:{tag} - {status}")
                    else:
                        log.info(f"scrub has completed for tag:{tag}")
                        return True

        # timed out waiting for scrub to complete
        return False
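
# Illustrative sketch: kick off a recursive scrub and block until the MDS
# reports no active scrubs (the 'scrub_tag' key is assumed to be present in
# the JSON returned by "scrub start"):
#
#   out = fs.run_scrub(["start", "/", "recursive"])
#   fs.wait_until_scrub_complete(tag=out['scrub_tag'])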