# ceph/qa/tasks/cephfs/filesystem.py
# stdlib imports used throughout this module
import json
import logging
import datetime
import errno
import os
import random
import re
import time
import traceback

from gevent import Greenlet

from io import BytesIO, StringIO
from errno import EBUSY

from teuthology.exceptions import CommandFailedError
from teuthology import misc
from teuthology.nuke import clear_firewall
from teuthology.parallel import parallel
from teuthology import contextutil
from tasks.ceph_manager import write_conf
from tasks import ceph_manager
log = logging.getLogger(__name__)

DAEMON_WAIT_TIMEOUT = 120
class FileLayout(object):
    def __init__(self, pool=None, pool_namespace=None, stripe_unit=None,
                 stripe_count=None, object_size=None):
        self.pool = pool
        self.pool_namespace = pool_namespace
        self.stripe_unit = stripe_unit
        self.stripe_count = stripe_count
        self.object_size = object_size
    @staticmethod  # takes no self, so presumably a staticmethod
    def load_from_ceph(layout_str):
        # parser body not present in this excerpt
        ...
    def items(self):
        if self.pool is not None:
            yield ("pool", self.pool)
        if self.pool_namespace:
            yield ("pool_namespace", self.pool_namespace)
        if self.stripe_unit is not None:
            yield ("stripe_unit", self.stripe_unit)
        if self.stripe_count is not None:
            yield ("stripe_count", self.stripe_count)
        if self.object_size is not None:
            yield ("object_size", self.object_size)  # was self.stripe_size, an attribute that doesn't exist
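
    # Usage sketch (illustrative only): items() turns a layout into key/value
    # pairs suitable for "setfattr"-style commands, e.g.:
    #
    #   layout = FileLayout(pool="cephfs_data", stripe_count=2)
    #   for k, v in layout.items():
    #       print("ceph.dir.layout.{} = {}".format(k, v))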
class ObjectNotFound(Exception):
    def __init__(self, object_name):
        self._object_name = object_name

    def __str__(self):
        return "Object not found: '{0}'".format(self._object_name)
class FSMissing(Exception):
    def __init__(self, ident):
        self.ident = ident

    def __str__(self):
        return f"File system {self.ident} does not exist in the map"
class FSStatus(object):
    """
    Operations on a snapshot of the FSMap.
    """
    def __init__(self, mon_manager, epoch=None):
        self.mon = mon_manager
        cmd = ["fs", "dump", "--format=json"]
        if epoch is not None:
            cmd.append(str(epoch))
        self.map = json.loads(self.mon.raw_cluster_cmd(*cmd))

    def __str__(self):
        return json.dumps(self.map, indent=2, sort_keys=True)
    # Expose the fsmap for manual inspection.
    def __getitem__(self, key):
        """
        Get a field from the fsmap.
        """
        return self.map[key]
    def get_filesystems(self):
        """
        Iterator for all filesystems.
        """
        for fs in self.map['filesystems']:
            yield fs
    def get_all(self):
        """
        Iterator for all the mds_info components in the FSMap.
        """
        for info in self.map['standbys']:
            yield info
        for fs in self.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                yield info
    def get_standbys(self):
        """
        Iterator for all standbys.
        """
        for info in self.map['standbys']:
            yield info
    def get_fsmap(self, fscid):
        """
        Get the fsmap for the given FSCID.
        """
        for fs in self.map['filesystems']:
            if fscid is None or fs['id'] == fscid:
                return fs
        raise FSMissing(fscid)
    def get_fsmap_byname(self, name):
        """
        Get the fsmap for the given file system name.
        """
        for fs in self.map['filesystems']:
            if name is None or fs['mdsmap']['fs_name'] == name:
                return fs
        raise FSMissing(name)
    def get_replays(self, fscid):
        """
        Get the standby:replay MDS for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['state'] == 'up:standby-replay':
                yield info
    def get_ranks(self, fscid):
        """
        Get the ranks for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['rank'] >= 0 and info['state'] != 'up:standby-replay':
                yield info
    def get_damaged(self, fscid):
        """
        Get the damaged ranks for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        return fs['mdsmap']['damaged']
    def get_rank(self, fscid, rank):
        """
        Get the rank for the given FSCID.
        """
        for info in self.get_ranks(fscid):
            if info['rank'] == rank:
                return info
        raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))
    def get_mds(self, name):
        """
        Get the info for the given MDS name.
        """
        for info in self.get_all():
            if info['name'] == name:
                return info
        return None
    def get_mds_addr(self, name):
        """
        Return the instance addr as a string, like "10.214.133.138:6807\/10825"
        """
        info = self.get_mds(name)
        if info:
            return info['addr']
        else:
            log.warning(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
            raise RuntimeError("MDS id '{0}' not found in map".format(name))
    def get_mds_addrs(self, name):
        """
        Return the instance addrs as a list of strings, like
        ["10.214.133.138:6807", "10.214.133.138:6808"]
        """
        info = self.get_mds(name)
        if info:
            return [e['addr'] for e in info['addrs']['addrvec']]
        else:
            log.warning(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
            raise RuntimeError("MDS id '{0}' not found in map".format(name))
    def get_mds_gid(self, gid):
        """
        Get the info for the given MDS gid.
        """
        for info in self.get_all():
            if info['gid'] == gid:
                return info
        return None
    def hadfailover(self, status):
        """
        Compares two statuses for mds failovers.
        Returns True if there is a failover.
        """
        for fs in status.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                oldinfo = self.get_mds_gid(info['gid'])
                if oldinfo is None or oldinfo['incarnation'] != info['incarnation']:
                    return True
        return False
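
    # Usage sketch (illustrative only): snapshot the FSMap before an operation,
    # then ask the old snapshot whether any MDS incarnation changed:
    #
    #   old = fs.status()
    #   fs.mds_fail_restart()
    #   assert old.hadfailover(fs.status())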
class CephCluster(object):
    @property  # referenced without parentheses in __init__ below
    def admin_remote(self):
        first_mon = misc.get_first_mon(self._ctx, None)
        (result,) = self._ctx.cluster.only(first_mon).remotes.keys()
        return result
    def __init__(self, ctx) -> None:
        self._ctx = ctx
        self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx,
                                                    logger=log.getChild('ceph_manager'))
    def get_config(self, key, service_type=None):
        """
        Get config from mon by default, or a specific service if caller asks for it
        """
        if service_type is None:
            service_type = 'mon'

        service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]
    def set_ceph_conf(self, subsys, key, value):
        if subsys not in self._ctx.ceph['ceph'].conf:
            self._ctx.ceph['ceph'].conf[subsys] = {}
        self._ctx.ceph['ceph'].conf[subsys][key] = value
        # XXX because we don't have the ceph task's config object, if they
        # used a different config path this won't work.
        write_conf(self._ctx)
    def clear_ceph_conf(self, subsys, key):
        del self._ctx.ceph['ceph'].conf[subsys][key]
        write_conf(self._ctx)
    def json_asok(self, command, service_type, service_id, timeout=None):
        command.insert(0, '--format=json')
        proc = self.mon_manager.admin_socket(service_type, service_id, command, timeout=timeout)
        response_data = proc.stdout.getvalue().strip()
        if len(response_data) > 0:
            j = json.loads(response_data)
            pretty = json.dumps(j, sort_keys=True, indent=2)
            log.debug(f"_json_asok output\n{pretty}")
            return j
        else:
            log.debug("_json_asok output empty")
            return None
    def is_addr_blocklisted(self, addr=None):
        if addr is None:
            log.warning("Couldn't get the client address, so the blocklisted "
                        "status undetermined")
            return False

        blocklist = json.loads(self.mon_manager.run_cluster_cmd(
            args=["osd", "blocklist", "ls", "--format=json"],
            stdout=StringIO()).stdout.getvalue())
        for b in blocklist:
            if addr == b["addr"]:
                return True
        return False
class MDSCluster(CephCluster):
    """
    Collective operations on all the MDS daemons in the Ceph cluster.  These
    daemons may be in use by various Filesystems.

    For the benefit of pre-multi-filesystem tests, this class is also
    a parent of Filesystem.  The correct way to use MDSCluster going forward is
    as a separate instance outside of your (multiple) Filesystem instances.
    """

    def __init__(self, ctx):
        super(MDSCluster, self).__init__(ctx)
    @property  # accessed as an attribute below
    def mds_ids(self):
        # do this dynamically because the list of ids may change periodically with cephadm
        return list(misc.all_roles_of_type(self._ctx.cluster, 'mds'))
    @property  # accessed as an attribute below
    def mds_daemons(self):
        return dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
    def _one_or_all(self, mds_id, cb, in_parallel=True):
        """
        Call a callback for a single named MDS, or for all.

        Note that the parallelism here isn't for performance, it's to avoid being overly kind
        to the cluster by waiting a graceful ssh-latency of time between doing things, and to
        avoid being overly kind by executing them in a particular order.  However, some actions
        don't cope with being done in parallel, so it's optional (`in_parallel`)

        :param mds_id: MDS daemon name, or None
        :param cb: Callback taking single argument of MDS daemon name
        :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
        """
        if mds_id is None:
            if in_parallel:
                with parallel() as p:
                    for mds_id in self.mds_ids:
                        p.spawn(cb, mds_id)
            else:
                for mds_id in self.mds_ids:
                    cb(mds_id)
        else:
            cb(mds_id)
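
    # Usage sketch (illustrative only): restart every MDS one at a time, since
    # a rolling restart is exactly the kind of action that doesn't cope with
    # being done in parallel:
    #
    #   cluster._one_or_all(None,
    #                       lambda id_: cluster.mds_daemons[id_].restart(),
    #                       in_parallel=False)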
    def get_config(self, key, service_type=None):
        """
        get_config specialization of service_type="mds"
        """
        if service_type != "mds":
            return super(MDSCluster, self).get_config(key, service_type)

        # Some tests stop MDS daemons, don't send commands to a dead one:
        running_daemons = [i for i, mds in self.mds_daemons.items() if mds.running()]
        service_id = random.sample(running_daemons, 1)[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]
    def mds_stop(self, mds_id=None):
        """
        Stop the MDS daemon process(es).  If it held a rank, that rank
        will eventually go laggy.
        """
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())
    def mds_fail(self, mds_id=None):
        """
        Inform MDSMonitor of the death of the daemon process(es).  If it held
        a rank, that rank will be relinquished.
        """
        self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))
    def mds_restart(self, mds_id=None):
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())
    def mds_fail_restart(self, mds_id=None):
        """
        Variation on restart that includes marking MDSs as failed, so that doing this
        operation followed by waiting for healthy daemon states guarantees that they
        have gone down and come up, rather than potentially seeing the healthy states
        that existed before the restart.
        """
        def _fail_restart(id_):
            self.mds_daemons[id_].stop()
            self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
            self.mds_daemons[id_].restart()

        self._one_or_all(mds_id, _fail_restart)
    def mds_signal(self, mds_id, sig, silent=False):
        """
        signal a MDS daemon
        """
        self.mds_daemons[mds_id].signal(sig, silent)
    def newfs(self, name='cephfs', create=True):
        return Filesystem(self._ctx, name=name, create=create)
    def status(self, epoch=None):
        return FSStatus(self.mon_manager, epoch)
    def get_standby_daemons(self):
        return set([s['name'] for s in self.status().get_standbys()])
    def get_mds_hostnames(self):
        result = set()
        for mds_id in self.mds_ids:
            mds_remote = self.mon_manager.find_remote('mds', mds_id)
            result.add(mds_remote.hostname)

        return list(result)
    def set_clients_block(self, blocked, mds_id=None):
        """
        Block (using iptables) client communications to this MDS.  Be careful: if
        other services are running on this MDS, or other MDSs try to talk to this
        MDS, their communications may also be blocked as collateral damage.

        :param mds_id: Optional ID of MDS to block, default to all
        """
        da_flag = "-A" if blocked else "-D"

        def set_block(_mds_id):
            remote = self.mon_manager.find_remote('mds', _mds_id)
            status = self.status()

            addr = status.get_mds_addr(_mds_id)
            ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()

            remote.run(
                args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])
            remote.run(
                args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])

        self._one_or_all(mds_id, set_block, in_parallel=False)
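
    # Worked example (illustrative only) of the address parsing above: an MDS
    # addr such as "10.214.133.138:6807/10825" splits into IP, port and nonce:
    #
    #   re.match("(.+):(.+)/(.+)", "10.214.133.138:6807/10825").groups()
    #   # -> ('10.214.133.138', '6807', '10825')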
    def set_inter_mds_block(self, blocked, mds_rank_1, mds_rank_2):
        """
        Block (using iptables) communications from a provided MDS to other MDSs.
        Block all ports that an MDS uses for communication.

        :param blocked: True to block the MDS, False otherwise
        :param mds_rank_1: MDS rank
        :param mds_rank_2: MDS rank
        """
        da_flag = "-A" if blocked else "-D"

        def set_block(mds_ids):
            status = self.status()

            # first MDS: block inbound traffic to all of its ports
            mds = mds_ids[0]
            remote = self.mon_manager.find_remote('mds', mds)
            addrs = status.get_mds_addrs(mds)
            for addr in addrs:
                ip_str, port_str = re.match("(.+):(.+)", addr).groups()
                remote.run(
                    args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                          "comment", "--comment", "teuthology"])

            # second MDS: block traffic in both directions
            mds = mds_ids[1]
            remote = self.mon_manager.find_remote('mds', mds)
            addrs = status.get_mds_addrs(mds)
            for addr in addrs:
                ip_str, port_str = re.match("(.+):(.+)", addr).groups()
                remote.run(
                    args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
                          "comment", "--comment", "teuthology"])
                remote.run(
                    args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                          "comment", "--comment", "teuthology"])

        self._one_or_all((mds_rank_1, mds_rank_2), set_block, in_parallel=False)
    def clear_firewall(self):
        clear_firewall(self._ctx)
    def get_mds_info(self, mds_id):
        return FSStatus(self.mon_manager).get_mds(mds_id)
    def is_pool_full(self, pool_name):
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        for pool in pools:
            if pool['pool_name'] == pool_name:
                return 'full' in pool['flags_names'].split(",")

        raise RuntimeError("Pool not found '{0}'".format(pool_name))
    def delete_all_filesystems(self):
        """
        Remove all filesystems that exist, and any pools in use by them.
        """
        for fs in self.status().get_filesystems():
            Filesystem(ctx=self._ctx, fscid=fs['id']).destroy()
class Filesystem(MDSCluster):
    """
    This object is for driving a CephFS filesystem.  The MDS daemons driven by
    MDSCluster may be shared with other Filesystems.
    """
    def __init__(self, ctx, fs_config={}, fscid=None, name=None, create=False):
        super(Filesystem, self).__init__(ctx)
        self.name = name
        self.id = None
        self.metadata_pool_name = None
        self.metadata_overlay = False
        self.data_pool_name = None
        self.data_pools = None
        self.fs_config = fs_config
        self.ec_profile = fs_config.get('ec_profile')

        client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
        self.client_id = client_list[0]
        self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
        if name is not None:
            if fscid is not None:
                raise RuntimeError("cannot specify fscid when creating fs")
            if create and not self.legacy_configured():
                self.create()
        else:
            if fscid is not None:
                self.id = fscid
            self.getinfo(refresh=True)

        # Stash a reference to the first created filesystem on ctx, so
        # that if someone drops to the interactive shell they can easily
        # get at it.
        if not hasattr(self._ctx, "filesystem"):
            self._ctx.filesystem = self
    def dead(self):  # method name inferred from destroy(), which checks self.dead()
        return not bool(self.get_mds_map())
    def get_task_status(self, status_key):
        return self.mon_manager.get_service_task_status("mds", status_key)
    def getinfo(self, refresh=False):
        status = self.status()
        if self.id is not None:
            fsmap = status.get_fsmap(self.id)
        elif self.name is not None:
            fsmap = status.get_fsmap_byname(self.name)
        else:
            fss = [fs for fs in status.get_filesystems()]
            if len(fss) == 1:
                fsmap = fss[0]
            elif len(fss) == 0:
                raise RuntimeError("no file system available")
            else:
                raise RuntimeError("more than one file system available")
        self.id = fsmap['id']
        self.name = fsmap['mdsmap']['fs_name']
        self.get_pool_names(status=status, refresh=refresh)
        return status
    def set_metadata_overlay(self, overlay):
        if self.id is not None:
            raise RuntimeError("cannot specify fscid when configuring overlay")
        self.metadata_overlay = overlay
    def deactivate(self, rank):
        if rank < 0:
            raise RuntimeError("invalid rank")
        elif rank == 0:
            raise RuntimeError("cannot deactivate rank 0")
        self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))
    def reach_max_mds(self):
        # Try to reach rank count == max_mds, up or down (UPGRADE SENSITIVE!)
        status = self.getinfo()
        mds_map = self.get_mds_map(status=status)
        max_mds = mds_map['max_mds']

        count = len(list(self.get_ranks(status=status)))
        if count > max_mds:
            try:
                # deactivate mds in descending order
                status = self.wait_for_daemons(status=status, skip_max_mds_check=True)
                while count > max_mds:
                    targets = sorted(self.get_ranks(status=status), key=lambda r: r['rank'], reverse=True)
                    target = targets[0]
                    log.debug("deactivating rank %d" % target['rank'])
                    self.deactivate(target['rank'])
                    status = self.wait_for_daemons(skip_max_mds_check=True)
                    count = len(list(self.get_ranks(status=status)))
            except Exception:
                # In Mimic, deactivation is done automatically:
                log.info("Error:\n{}".format(traceback.format_exc()))
                status = self.wait_for_daemons()
        else:
            status = self.wait_for_daemons()

        mds_map = self.get_mds_map(status=status)
        assert(mds_map['max_mds'] == max_mds)
        assert(mds_map['in'] == list(range(0, max_mds)))
    def reset(self):
        self.mon_manager.raw_cluster_cmd("fs", "reset", str(self.name), '--yes-i-really-mean-it')

    def fail(self):
        self.mon_manager.raw_cluster_cmd("fs", "fail", str(self.name))
    def set_flag(self, var, *args):
        a = map(lambda x: str(x).lower(), args)
        self.mon_manager.raw_cluster_cmd("fs", "flag", "set", var, *a)
    def set_allow_multifs(self, yes=True):
        self.set_flag("enable_multiple", yes)
    def set_var(self, var, *args):
        a = map(lambda x: str(x).lower(), args)
        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, var, *a)
    def set_down(self, down=True):
        self.set_var("down", str(down).lower())

    def set_joinable(self, joinable=True):
        self.set_var("joinable", joinable)

    def set_max_mds(self, max_mds):
        self.set_var("max_mds", "%d" % max_mds)

    def set_session_timeout(self, timeout):
        self.set_var("session_timeout", "%d" % timeout)

    def set_allow_standby_replay(self, yes):
        self.set_var("allow_standby_replay", yes)

    def set_allow_new_snaps(self, yes):
        self.set_var("allow_new_snaps", yes, '--yes-i-really-mean-it')
    def compat(self, *args):
        a = map(lambda x: str(x).lower(), args)
        self.mon_manager.raw_cluster_cmd("fs", "compat", self.name, *a)

    def add_compat(self, *args):
        self.compat("add_compat", *args)

    def add_incompat(self, *args):
        self.compat("add_incompat", *args)

    def rm_compat(self, *args):
        self.compat("rm_compat", *args)

    def rm_incompat(self, *args):
        self.compat("rm_incompat", *args)
    def required_client_features(self, *args, **kwargs):
        c = ["fs", "required_client_features", self.name, *args]
        return self.mon_manager.run_cluster_cmd(args=c, **kwargs)
    # Since v15.1.0 the pg autoscale mode has been enabled as default, which
    # will let the pg autoscale mode calculate the pg_num as needed.  We set
    # pg_num_min to 64 to make sure that the pg autoscale mode won't set the
    # pg_num too low (see Tracker#45434).
    pg_num = 64
    pg_num_min = 64
    target_size_ratio = 0.9
    target_size_ratio_ec = 0.9
    def create(self):
        if self.name is None:
            self.name = "cephfs"
        if self.metadata_pool_name is None:
            self.metadata_pool_name = "{0}_metadata".format(self.name)
        if self.data_pool_name is None:
            data_pool_name = "{0}_data".format(self.name)
        else:
            data_pool_name = self.data_pool_name

        # With an EC profile configured, the fs will use the ec pool to store
        # the data and a small amount of metadata still goes to the primary
        # data pool for all files.
        if not self.metadata_overlay and self.ec_profile and 'disabled' not in self.ec_profile:
            self.target_size_ratio = 0.05

        log.debug("Creating filesystem '{0}'".format(self.name))

        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                         self.metadata_pool_name, str(self.pg_num),
                                         '--pg_num_min', str(self.pg_num_min))

        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                         data_pool_name, str(self.pg_num),
                                         '--pg_num_min', str(self.pg_num_min),
                                         '--target_size_ratio',
                                         str(self.target_size_ratio))
        if self.metadata_overlay:
            self.mon_manager.raw_cluster_cmd('fs', 'new',
                                             self.name, self.metadata_pool_name, data_pool_name,
                                             '--allow-dangerous-metadata-overlay')
        else:
            self.mon_manager.raw_cluster_cmd('fs', 'new',
                                             self.name,
                                             self.metadata_pool_name,
                                             data_pool_name)
            if self.ec_profile and 'disabled' not in self.ec_profile:
                ec_data_pool_name = data_pool_name + "_ec"
                log.debug("EC profile is %s", self.ec_profile)
                cmd = ['osd', 'erasure-code-profile', 'set', ec_data_pool_name]
                cmd.extend(self.ec_profile)
                self.mon_manager.raw_cluster_cmd(*cmd)
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'create', ec_data_pool_name,
                    'erasure', ec_data_pool_name,
                    '--pg_num_min', str(self.pg_num_min),
                    '--target_size_ratio', str(self.target_size_ratio_ec))
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'set',
                    ec_data_pool_name, 'allow_ec_overwrites', 'true')
                self.add_data_pool(ec_data_pool_name, create=False)
                self.check_pool_application(ec_data_pool_name)

                self.run_client_payload(f"setfattr -n ceph.dir.layout.pool -v {ec_data_pool_name} . && getfattr -n ceph.dir.layout .")
        self.check_pool_application(self.metadata_pool_name)
        self.check_pool_application(data_pool_name)
        # Turn off spurious standby count warnings from modifying max_mds in tests.
        try:
            self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
        except CommandFailedError as e:
            if e.exitstatus == 22:
                # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
                pass
            else:
                raise
        if self.fs_config is not None:
            max_mds = self.fs_config.get('max_mds', 1)
            if max_mds > 1:
                self.set_max_mds(max_mds)

            standby_replay = self.fs_config.get('standby_replay', False)
            self.set_allow_standby_replay(standby_replay)

            # If absent will use the default value (60 seconds)
            session_timeout = self.fs_config.get('session_timeout', 60)
            if session_timeout != 60:
                self.set_session_timeout(session_timeout)
        self.getinfo(refresh=True)

        # wait for pgs to be clean
        self.mon_manager.wait_for_clean()
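
    # A sketch of the fs_config dict that create() consumes (keys shown are the
    # ones read above; the values here are hypothetical):
    #
    #   fs_config = {
    #       'max_mds': 2,                  # ranks to activate after creation
    #       'standby_replay': True,        # allow standby-replay daemons
    #       'session_timeout': 120,        # only applied when != 60
    #       'ec_profile': ['k=2', 'm=1'],  # enables the EC data pool path
    #   }
    #   Filesystem(ctx, fs_config=fs_config, name='cephfs', create=True)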
    def run_client_payload(self, cmd):
        # avoid circular dep by importing here:
        from tasks.cephfs.fuse_mount import FuseMount

        d = misc.get_testdir(self._ctx)
        m = FuseMount(self._ctx, {}, d, "admin", self.client_remote, cephfs_name=self.name)
        m.mount_wait()  # mount before running the payload
        m.run_shell_payload(cmd)
        m.umount_wait(require_clean=True)
    def _remove_pool(self, name, **kwargs):
        c = f'osd pool rm {name} {name} --yes-i-really-really-mean-it'
        return self.mon_manager.ceph(c, **kwargs)
    def rm(self, **kwargs):
        c = f'fs rm {self.name} --yes-i-really-mean-it'
        return self.mon_manager.ceph(c, **kwargs)
    def remove_pools(self, data_pools):
        self._remove_pool(self.get_metadata_pool_name())
        for poolname in data_pools:
            try:
                self._remove_pool(poolname)
            except CommandFailedError as e:
                # EBUSY, this data pool is used by two metadata pools, let the
                # second pass delete it
                if e.exitstatus == EBUSY:
                    pass
                else:
                    raise
    def destroy(self, reset_obj_attrs=True):
        log.info(f'Destroying file system {self.name} and related pools')

        if self.dead():
            log.debug('already dead...')
            return

        data_pools = self.get_data_pool_names(refresh=True)

        # make sure no MDSs are attached to given FS.
        self.fail()
        self.rm()

        self.remove_pools(data_pools)

        if reset_obj_attrs:
            self.id = None
            self.name = None
            self.metadata_pool_name = None
            self.data_pool_name = None
            self.data_pools = None

    def recreate(self):
        # tear the file system down, rebuild it, then re-read its state
        self.destroy()
        self.create()
        self.getinfo(refresh=True)
    def check_pool_application(self, pool_name):
        osd_map = self.mon_manager.get_osd_dump_json()
        for pool in osd_map['pools']:
            if pool['pool_name'] == pool_name:
                if "application_metadata" in pool:
                    if "cephfs" not in pool['application_metadata']:
                        raise RuntimeError("Pool {pool_name} does not name cephfs as application!".format(
                            pool_name=pool_name))
    def __del__(self):
        # drop the ctx stash set up in __init__
        if getattr(self._ctx, "filesystem", None) == self:
            delattr(self._ctx, "filesystem")
    def exists(self):
        """
        Whether a filesystem exists in the mon's filesystem list
        """
        fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
        return self.name in [fs['name'] for fs in fs_list]
    def legacy_configured(self):
        """
        Check if a legacy (i.e. pre "fs new") filesystem configuration is present.  If this is
        the case, the caller should avoid using Filesystem.create
        """
        try:
            out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
            pools = json.loads(out_text)
            metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
            if metadata_pool_exists:
                self.metadata_pool_name = 'metadata'
        except CommandFailedError as e:
            # For use in upgrade tests, Ceph cuttlefish and earlier don't support
            # structured output (--format) from the CLI.
            if e.exitstatus == 22:
                metadata_pool_exists = True
            else:
                raise

        return metadata_pool_exists
    def _df(self):
        return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))
    # may raise FSMissing
    def get_mds_map(self, status=None):
        if status is None:
            status = self.status()
        return status.get_fsmap(self.id)['mdsmap']
    def get_var(self, var, status=None):
        return self.get_mds_map(status=status)[var]
    def set_dir_layout(self, mount, path, layout):
        for name, value in layout.items():
            mount.run_shell(args=["setfattr", "-n", "ceph.dir.layout." + name, "-v", str(value), path])
    def add_data_pool(self, name, create=True):
        if create:
            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name,
                                             '--pg_num_min', str(self.pg_num_min))
        self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
        self.get_pool_names(refresh=True)
        for poolid, fs_name in self.data_pools.items():
            if name == fs_name:
                return poolid
        raise RuntimeError("could not get just created pool '{0}'".format(name))
    def get_pool_names(self, refresh=False, status=None):
        if refresh or self.metadata_pool_name is None or self.data_pools is None:
            if status is None:
                status = self.status()
            fsmap = status.get_fsmap(self.id)

            osd_map = self.mon_manager.get_osd_dump_json()
            id_to_name = {}
            for p in osd_map['pools']:
                id_to_name[p['pool']] = p['pool_name']

            self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
            self.data_pools = {}
            for data_pool in fsmap['mdsmap']['data_pools']:
                self.data_pools[data_pool] = id_to_name[data_pool]
    def get_data_pool_name(self, refresh=False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh=True)
        assert(len(self.data_pools) == 1)
        return next(iter(self.data_pools.values()))
    def get_data_pool_id(self, refresh=False):
        """
        Don't call this if you have multiple data pools
        :return: integer
        """
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh=True)
        assert(len(self.data_pools) == 1)
        return next(iter(self.data_pools.keys()))
    def get_data_pool_names(self, refresh=False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh=True)
        return list(self.data_pools.values())

    def get_metadata_pool_name(self):
        return self.metadata_pool_name
    def set_data_pool_name(self, name):
        if self.id is not None:
            raise RuntimeError("can't set data pool name if the fscid is already set")
        self.data_pool_name = name
    def get_pool_pg_num(self, pool_name):
        pgs = json.loads(self.mon_manager.raw_cluster_cmd('osd', 'pool', 'get',
                                                          pool_name, 'pg_num',
                                                          '--format=json-pretty'))
        return int(pgs['pg_num'])
    def get_namespace_id(self):
        return self.id  # the fscid doubles as the namespace id
    def get_pool_df(self, pool_name):
        """
        Example:
        {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
        """
        for pool_df in self._df()['pools']:
            if pool_df['name'] == pool_name:
                return pool_df['stats']

        raise RuntimeError("Pool name '{0}' not found".format(pool_name))

    def get_usage(self):
        return self._df()['stats']['total_used_bytes']
    def are_daemons_healthy(self, status=None, skip_max_mds_check=False):
        """
        Return true if all daemons are in one of active, standby, standby-replay, and
        at least max_mds daemons are in 'active'.

        Unlike most of Filesystem, this function is tolerant of new-style `fs`
        commands being missing, because we are part of the ceph installation
        process during upgrade suites, so must fall back to old style commands
        when we get an EINVAL on a new style command.

        :return:
        """
        # First, check to see that processes haven't exited with an error code
        for mds in self._ctx.daemons.iter_daemons_of_role('mds'):
            mds.check_status()

        active_count = 0
        mds_map = self.get_mds_map(status=status)

        log.debug("are_daemons_healthy: mds map: {0}".format(mds_map))

        for mds_id, mds_status in mds_map['info'].items():
            if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
                log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
                return False
            elif mds_status['state'] == 'up:active':
                active_count += 1

        log.debug("are_daemons_healthy: {0}/{1}".format(
            active_count, mds_map['max_mds']
        ))

        if not skip_max_mds_check:
            if active_count > mds_map['max_mds']:
                log.debug("are_daemons_healthy: number of actives is greater than max_mds: {0}".format(mds_map))
                return False
            elif active_count == mds_map['max_mds']:
                # The MDSMap says these guys are active, but let's check they really are
                for mds_id, mds_status in mds_map['info'].items():
                    if mds_status['state'] == 'up:active':
                        try:
                            daemon_status = self.mds_tell(["status"], mds_id=mds_status['name'])
                        except CommandFailedError as cfe:
                            if cfe.exitstatus == errno.EINVAL:
                                # Old version, can't do this check
                                continue
                            else:
                                # MDS not even running
                                return False

                        if daemon_status['state'] != 'up:active':
                            # MDS hasn't taken the latest map yet
                            return False

                return True
            else:
                return False
        else:
            log.debug("are_daemons_healthy: skipping max_mds check")
            return True
    def get_daemon_names(self, state=None, status=None):
        """
        Return MDS daemon names of those daemons in the given state

        :param state:
        :return:
        """
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['state'] == state or state is None:
                result.append(mds_status['name'])

        return result
    def get_active_names(self, status=None):
        """
        Return MDS daemon names of those daemons holding ranks
        in state up:active

        :return: list of strings like ['a', 'b'], sorted by rank
        """
        return self.get_daemon_names("up:active", status=status)
    def get_all_mds_rank(self, status=None):
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['rank'])

        return result
    def get_rank(self, rank=None, status=None):
        if status is None:
            status = self.getinfo()
        if rank is None:
            rank = 0
        return status.get_rank(self.id, rank)
    def rank_restart(self, rank=0, status=None):
        name = self.get_rank(rank=rank, status=status)['name']
        self.mds_restart(mds_id=name)

    def rank_signal(self, signal, rank=0, status=None):
        name = self.get_rank(rank=rank, status=status)['name']
        self.mds_signal(name, signal)
    def rank_freeze(self, yes, rank=0):
        self.mon_manager.raw_cluster_cmd("mds", "freeze", "{}:{}".format(self.id, rank), str(yes).lower())

    def rank_fail(self, rank=0):
        self.mon_manager.raw_cluster_cmd("mds", "fail", "{}:{}".format(self.id, rank))
    def get_ranks(self, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_ranks(self.id)

    def get_damaged(self, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_damaged(self.id)

    def get_replays(self, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_replays(self.id)
    def get_replay(self, rank=0, status=None):
        for replay in self.get_replays(status=status):
            if replay['rank'] == rank:
                return replay
        return None
    def get_rank_names(self, status=None):
        """
        Return MDS daemon names of those daemons holding a rank,
        sorted by rank.  This includes e.g. up:replay/reconnect
        as well as active, but does not include standby or
        standby-replay.
        """
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['name'])

        return result
    def wait_for_daemons(self, timeout=None, skip_max_mds_check=False, status=None):
        """
        Wait until all daemons are healthy
        :return:
        """
        if timeout is None:
            timeout = DAEMON_WAIT_TIMEOUT

        if status is None:
            status = self.status()

        elapsed = 0
        while True:
            if self.are_daemons_healthy(status=status, skip_max_mds_check=skip_max_mds_check):
                return status
            else:
                time.sleep(1)
                elapsed += 1

            if elapsed > timeout:
                log.debug("status = {0}".format(status))
                raise RuntimeError("Timed out waiting for MDS daemons to become healthy")

            status = self.status()
    def dencoder(self, obj_type, obj_blob):
        args = [os.path.join(self._prefix, "ceph-dencoder"), 'type', obj_type, 'import', '-', 'decode', 'dump_json']
        p = self.mon_manager.controller.run(args=args, stdin=BytesIO(obj_blob), stdout=BytesIO())
        return p.stdout.getvalue()
    def rados(self, *args, **kwargs):
        """
        Callout to rados CLI.
        """
        return self.mon_manager.do_rados(*args, **kwargs)

    def radosm(self, *args, **kwargs):
        """
        Interact with the metadata pool via rados CLI.
        """
        return self.rados(*args, **kwargs, pool=self.get_metadata_pool_name())

    def radosmo(self, *args, stdout=BytesIO(), **kwargs):
        """
        Interact with the metadata pool via rados CLI. Get the stdout.
        """
        return self.radosm(*args, **kwargs, stdout=stdout).stdout.getvalue()
    def get_metadata_object(self, object_type, object_id):
        """
        Retrieve an object from the metadata pool, pass it through
        ceph-dencoder to dump it to JSON, and return the decoded object.
        """
        o = self.radosmo(['get', object_id, '-'])
        j = self.dencoder(object_type, o)
        try:
            return json.loads(j)
        except (TypeError, ValueError):
            log.error("Failed to decode JSON: '{0}'".format(j))
            raise
    def get_journal_version(self):
        """
        Read the JournalPointer and Journal::Header objects to learn the version of
        encoding in use.
        """
        journal_pointer_object = '400.00000000'
        journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
        journal_ino = journal_pointer_dump['journal_pointer']['front']

        journal_header_object = "{0:x}.00000000".format(journal_ino)
        journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)

        version = journal_header_dump['journal_header']['stream_format']
        log.debug("Read journal version {0}".format(version))

        return version
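
    # Worked example (illustrative only) of the object naming used above: the
    # JournalPointer lives at "400.00000000"; if it points at journal inode
    # 0x201, the journal header object is:
    #
    #   "{0:x}.00000000".format(0x201)  # -> "201.00000000"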
    def mds_asok(self, command, mds_id=None, timeout=None):
        if mds_id is None:
            return self.rank_asok(command, timeout=timeout)

        return self.json_asok(command, 'mds', mds_id, timeout=timeout)
    def mds_tell(self, command, mds_id=None):
        if mds_id is None:
            return self.rank_tell(command)

        return json.loads(self.mon_manager.raw_cluster_cmd("tell", f"mds.{mds_id}", *command))
    def rank_asok(self, command, rank=0, status=None, timeout=None):
        info = self.get_rank(rank=rank, status=status)
        return self.json_asok(command, 'mds', info['name'], timeout=timeout)

    def rank_tell(self, command, rank=0, status=None):
        return json.loads(self.mon_manager.raw_cluster_cmd("tell", f"mds.{self.id}:{rank}", *command))
    def ranks_tell(self, command, status=None):
        if status is None:
            status = self.status()
        out = []
        for r in status.get_ranks(self.id):
            result = self.rank_tell(command, rank=r['rank'], status=status)
            out.append((r['rank'], result))
        return sorted(out)
    def ranks_perf(self, f, status=None):
        perf = self.ranks_tell(["perf", "dump"], status=status)
        out = []
        for rank, perf in perf:
            out.append((rank, f(perf)))
        return out
    def read_cache(self, path, depth=None):
        cmd = ["dump", "tree", path]
        if depth is not None:
            cmd.append(str(depth))
        result = self.mds_asok(cmd)
        if len(result) == 0:
            raise RuntimeError("Path not found in cache: {0}".format(path))

        return result
    def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
        """
        Block until the MDS reaches a particular state, or a failure condition
        is met.

        When there are multiple MDSs, succeed when exactly one MDS is in the
        goal state, or fail when any MDS is in the reject state.

        :param goal_state: Return once the MDS is in this state
        :param reject: Fail if the MDS enters this state before the goal state
        :param timeout: Fail if this many seconds pass before reaching goal
        :return: number of seconds waited, rounded down to integer
        """
        started_at = time.time()
        while True:
            status = self.status()
            if rank is not None:
                try:
                    mds_info = status.get_rank(self.id, rank)
                    current_state = mds_info['state'] if mds_info else None
                    log.debug("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
                except Exception:
                    mdsmap = self.get_mds_map(status=status)
                    if rank in mdsmap['failed']:
                        log.debug("Waiting for rank {0} to come back.".format(rank))
                        current_state = None
                    else:
                        raise
            elif mds_id is not None:
                # mds_info is None if no daemon with this ID exists in the map
                mds_info = status.get_mds(mds_id)
                current_state = mds_info['state'] if mds_info else None
                log.debug("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
            else:
                # In general, look for a single MDS
                states = [m['state'] for m in status.get_ranks(self.id)]
                if [s for s in states if s == goal_state] == [goal_state]:
                    current_state = goal_state
                elif reject in states:
                    current_state = reject
                else:
                    current_state = None
                log.debug("mapped states {0} to {1}".format(states, current_state))

            elapsed = time.time() - started_at
            if current_state == goal_state:
                log.debug("reached state '{0}' in {1}s".format(current_state, elapsed))
                return elapsed
            elif reject is not None and current_state == reject:
                raise RuntimeError("MDS in reject state {0}".format(current_state))
            elif timeout is not None and elapsed > timeout:
                log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
                raise RuntimeError(
                    "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
                        elapsed, goal_state, current_state
                    ))
            else:
                time.sleep(1)
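
    # Usage sketch (illustrative only): wait for the lone MDS to become active
    # again after a restart, failing the test after 60 seconds:
    #
    #   fs.mds_fail_restart()
    #   waited = fs.wait_for_state("up:active", timeout=60)
    #   log.debug("became active after %ss", waited)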
    def _read_data_xattr(self, ino_no, xattr_name, obj_type, pool):
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)

        args = ["getxattr", obj_name, xattr_name]
        try:
            proc = self.rados(args, pool=pool, stdout=BytesIO())
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(obj_name)

        obj_blob = proc.stdout.getvalue()
        return json.loads(self.dencoder(obj_type, obj_blob).strip())
    def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
        """
        Write to an xattr of the 0th data object of an inode.  Will
        succeed whether the object and/or xattr already exist or not.

        :param ino_no: integer inode number
        :param xattr_name: string name of the xattr
        :param data: byte array data to write to the xattr
        :param pool: name of data pool or None to use primary data pool
        """
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)
        args = ["setxattr", obj_name, xattr_name, data]
        self.rados(args, pool=pool)
    def read_backtrace(self, ino_no, pool=None):
        """
        Read the backtrace from the data pool, return a dict in the format
        given by inode_backtrace_t::dump, which is something like:

        ::

            rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
            ceph-dencoder type inode_backtrace_t import out.bin decode dump_json

            { "ino": 1099511627778,
              ... }

        :param pool: name of pool to read backtrace from.  If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)
    def read_layout(self, ino_no, pool=None):
        """
        Read 'layout' xattr of an inode and parse the result, returning a dict like:

        ::

            {
                "stripe_unit": 4194304,
                "object_size": 4194304,
                ...
            }

        :param pool: name of pool to read the layout from.  If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)
    def _enumerate_data_objects(self, ino, size):
        """
        Get the list of expected data objects for a range, and the list of objects
        that really exist.

        :return a tuple of two lists of strings (expected, actual)
        """
        stripe_size = 1024 * 1024 * 4

        size = max(stripe_size, size)

        want_objects = [
            "{0:x}.{1:08x}".format(ino, n)
            for n in range(0, ((size - 1) // stripe_size) + 1)
        ]

        exist_objects = self.rados(["ls"], pool=self.get_data_pool_name(), stdout=StringIO()).stdout.getvalue().split("\n")

        return want_objects, exist_objects
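
    # Worked example (illustrative only): with the 4MiB stripe_size above, a
    # 10MiB file at inode 0x10000000002 is expected to span three objects:
    #
    #   ["10000000002.00000000", "10000000002.00000001", "10000000002.00000002"]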
    def data_objects_present(self, ino, size):
        """
        Check that *all* the expected data objects for an inode are present in the data pool
        """
        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        missing = set(want_objects) - set(exist_objects)

        if missing:
            log.debug("Objects missing (ino {0}, size {1}): {2}".format(
                ino, size, missing
            ))
            return False
        else:
            log.debug("All objects for ino {0} size {1} found".format(ino, size))
            return True
    def data_objects_absent(self, ino, size):
        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        present = set(want_objects) & set(exist_objects)

        if present:
            log.debug("Objects not absent (ino {0}, size {1}): {2}".format(
                ino, size, present
            ))
            return False
        else:
            log.debug("All objects for ino {0} size {1} are absent".format(ino, size))
            return True
    def dirfrag_exists(self, ino, frag):
        try:
            self.radosm(["stat", "{0:x}.{1:08x}".format(ino, frag)])
        except CommandFailedError:
            return False
        else:
            return True
    def list_dirfrag(self, dir_ino):
        """
        Read the named object and return the list of omap keys

        :return a list of 0 or more strings
        """
        dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
        try:
            key_list_str = self.radosmo(["listomapkeys", dirfrag_obj_name], stdout=StringIO())
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(dirfrag_obj_name)

        return key_list_str.strip().split("\n") if key_list_str else []
    def get_meta_of_fs_file(self, dir_ino, obj_name, out):
        """
        Get metadata from the parent to verify the correctness of the data format
        encoded by the tool cephfs-meta-injection.
        Warning: the splitting of directories is not considered here.
        """
        dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
        try:
            self.radosm(["getomapval", dirfrag_obj_name, obj_name + "_head", out])
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(dir_ino)
    def erase_metadata_objects(self, prefix):
        """
        For all objects in the metadata pool matching the prefix,
        erase them.

        This is O(N) with the number of objects in the pool, so only suitable
        for use on toy test filesystems.
        """
        all_objects = self.radosmo(["ls"], stdout=StringIO()).strip().split("\n")
        matching_objects = [o for o in all_objects if o.startswith(prefix)]
        for o in matching_objects:
            self.radosm(["rm", o])
    def erase_mds_objects(self, rank):
        """
        Erase all the per-MDS objects for a particular rank.  This includes
        inotable, sessiontable, journal
        """

        def obj_prefix(multiplier):
            """
            MDS object naming conventions like rank 1's
            journal is at 201.***
            """
            return "%x." % (multiplier * 0x100 + rank)

        # MDS_INO_LOG_OFFSET
        self.erase_metadata_objects(obj_prefix(2))
        # MDS_INO_LOG_BACKUP_OFFSET
        self.erase_metadata_objects(obj_prefix(3))
        # MDS_INO_LOG_POINTER_OFFSET
        self.erase_metadata_objects(obj_prefix(4))
        # MDSTables & SessionMap
        self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))
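
    # Worked example (illustrative only) of obj_prefix for rank 1: the journal
    # prefix is "%x." % (2 * 0x100 + 1) -> "201.", which matches the "201.***"
    # naming convention described in the docstring above.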
    @property
    def _prefix(self):
        """
        Override this to set a different path prefix for the tool binaries.
        """
        return ""

    def _make_rank(self, rank):
        return "{}:{}".format(self.name, rank)
    def _run_tool(self, tool, args, rank=None, quiet=False):
        # Tests frequently have [client] configuration that jacks up
        # the objecter log level (unlikely to be interesting here)
        # and does not set the mds log level (very interesting here)
        if quiet:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
        else:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']

        if rank is not None:
            base_args.extend(["--rank", "%s" % str(rank)])

        t1 = datetime.datetime.now()
        r = self.tool_remote.sh(script=base_args + args, stdout=StringIO()).strip()
        duration = datetime.datetime.now() - t1
        log.debug("Ran {0} in time {1}, result:\n{2}".format(
            base_args + args, duration, r
        ))
        return r
    @property
    def tool_remote(self):
        """
        An arbitrary remote to use when invoking recovery tools.  Use an MDS host because
        it'll definitely have keys with perms to access cephfs metadata pool.  This is public
        so that tests can use this remote to go get locally written output files from the tools.
        """
        return self.mon_manager.controller
    def journal_tool(self, args, rank, quiet=False):
        """
        Invoke cephfs-journal-tool with the passed arguments for a rank, and return its stdout
        """
        fs_rank = self._make_rank(rank)
        return self._run_tool("cephfs-journal-tool", args, fs_rank, quiet)

    def meta_tool(self, args, rank, quiet=False):
        """
        Invoke cephfs-meta-injection with the passed arguments for a rank, and return its stdout
        """
        fs_rank = self._make_rank(rank)
        return self._run_tool("cephfs-meta-injection", args, fs_rank, quiet)
    def table_tool(self, args, quiet=False):
        """
        Invoke cephfs-table-tool with the passed arguments, and return its stdout
        """
        return self._run_tool("cephfs-table-tool", args, None, quiet)
    def data_scan(self, args, quiet=False, worker_count=1):
        """
        Invoke cephfs-data-scan with the passed arguments, and return its stdout

        :param worker_count: if greater than 1, multiple workers will be run
                             in parallel and the return value will be None
        """
        workers = []

        for n in range(0, worker_count):
            if worker_count > 1:
                # data-scan args first token is a command, followed by args to it.
                # insert worker arguments after the command.
                cmd = args[0]
                worker_args = [cmd] + ["--worker_n", str(n), "--worker_m", str(worker_count)] + args[1:]
            else:
                worker_args = args

            workers.append(Greenlet.spawn(lambda wargs=worker_args:
                           self._run_tool("cephfs-data-scan", wargs, None, quiet)))

        for w in workers:
            w.join()

        if worker_count == 1:
            return workers[0].value
        else:
            return None
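
    # Worked example (illustrative only) of the worker argument splicing above:
    # data_scan(["scan_extents", "cephfs_data"], worker_count=2) spawns
    #
    #   cephfs-data-scan scan_extents --worker_n 0 --worker_m 2 cephfs_data
    #   cephfs-data-scan scan_extents --worker_n 1 --worker_m 2 cephfs_data
    #
    # in parallel greenlets and returns None.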
    def is_full(self):
        return self.is_pool_full(self.get_data_pool_name())
    def authorize(self, client_id, caps=('/', 'rw')):
        """
        Run "ceph fs authorize" and run "ceph auth get" to get and return the
        keyring.

        client_id: client id that will be authorized
        caps: tuple containing the path and permission (can be r or rw)
              respectively
        """
        client_name = 'client.' + client_id
        return self.mon_manager.raw_cluster_cmd('fs', 'authorize', self.name,
                                                client_name, *caps)
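
    # Usage sketch (illustrative only): authorize client.0 read-only on /pub;
    # the caps tuple is passed straight through to "ceph fs authorize":
    #
    #   keyring = fs.authorize('0', caps=('/pub', 'r'))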
    def grow(self, new_max_mds, status=None):
        oldmax = self.get_var('max_mds', status=status)
        assert(new_max_mds > oldmax)
        self.set_max_mds(new_max_mds)
        return self.wait_for_daemons()

    def shrink(self, new_max_mds, status=None):
        oldmax = self.get_var('max_mds', status=status)
        assert(new_max_mds < oldmax)
        self.set_max_mds(new_max_mds)
        return self.wait_for_daemons()
    def run_scrub(self, cmd, rank=0):
        return self.rank_tell(["scrub"] + cmd, rank)

    def get_scrub_status(self, rank=0):
        return self.run_scrub(["status"], rank)
    def wait_until_scrub_complete(self, result=None, tag=None, rank=0, sleep=30,
                                  timeout=300, reverse=False):
        # time out after "timeout" seconds and assume as done
        if result is None:
            result = "no active scrubs running"
        with contextutil.safe_while(sleep=sleep, tries=timeout//sleep) as proceed:
            while proceed():
                out_json = self.rank_tell(["scrub", "status"], rank=rank)
                assert out_json is not None
                if not reverse:
                    if result in out_json['status']:
                        log.info("all active scrubs completed")
                        return True
                else:
                    if result not in out_json['status']:
                        log.info("all active scrubs completed")
                        return True

                if tag is not None:
                    status = out_json['scrubs'][tag]
                    if status is not None:
                        log.info(f"scrub status for tag:{tag} - {status}")
                    else:
                        log.info(f"scrub has completed for tag:{tag}")
                        return True

        # timed out waiting for scrub to complete
        return False