# ceph/qa/tasks/cephfs/filesystem.py

import json
import logging
import datetime
import errno
import os
import random
import re
import time

from gevent import Greenlet

from io import BytesIO, StringIO
from errno import EBUSY

from teuthology.exceptions import CommandFailedError
from teuthology import misc
from teuthology.nuke import clear_firewall
from teuthology.parallel import parallel
from teuthology import contextutil
from tasks.ceph_manager import write_conf
from tasks import ceph_manager

log = logging.getLogger(__name__)

DAEMON_WAIT_TIMEOUT = 120

class FileLayout(object):
    def __init__(self, pool=None, pool_namespace=None, stripe_unit=None,
                 stripe_count=None, object_size=None):
        self.pool = pool
        self.pool_namespace = pool_namespace
        self.stripe_unit = stripe_unit
        self.stripe_count = stripe_count
        self.object_size = object_size

    @staticmethod
    def load_from_ceph(layout_str):
        pass

    def items(self):
        if self.pool is not None:
            yield ("pool", self.pool)
        if self.pool_namespace:
            yield ("pool_namespace", self.pool_namespace)
        if self.stripe_unit is not None:
            yield ("stripe_unit", self.stripe_unit)
        if self.stripe_count is not None:
            yield ("stripe_count", self.stripe_count)
        if self.object_size is not None:
            yield ("object_size", self.object_size)
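
# Illustrative sketch (not part of the upstream file): the (key, value) pairs
# from FileLayout.items() map onto "ceph.dir.layout.<key>" xattrs, so a test
# could translate a layout into setfattr calls; `mount.setfattr` below is an
# assumed helper name used only for illustration:
#
#   layout = FileLayout(pool="cephfs_data", stripe_count=2)
#   for k, v in layout.items():
#       mount.setfattr(path, "ceph.dir.layout." + k, str(v))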

class ObjectNotFound(Exception):
    def __init__(self, object_name):
        self._object_name = object_name

    def __str__(self):
        return "Object not found: '{0}'".format(self._object_name)


class FSMissing(Exception):
    def __init__(self, ident):
        self.ident = ident

    def __str__(self):
        return f"File system {self.ident} does not exist in the map"


class FSStatus(object):
    """
    Operations on a snapshot of the FSMap.
    """
    def __init__(self, mon_manager, epoch=None):
        self.mon = mon_manager
        cmd = ["fs", "dump", "--format=json"]
        if epoch is not None:
            cmd.append(str(epoch))
        self.map = json.loads(self.mon.raw_cluster_cmd(*cmd))

    def __str__(self):
        return json.dumps(self.map, indent=2, sort_keys=True)

    # Expose the fsmap for manual inspection.
    def __getitem__(self, key):
        """
        Get a field from the fsmap.
        """
        return self.map[key]

    def get_filesystems(self):
        """
        Iterator for all filesystems.
        """
        for fs in self.map['filesystems']:
            yield fs

    def get_all(self):
        """
        Iterator for all the mds_info components in the FSMap.
        """
        for info in self.map['standbys']:
            yield info
        for fs in self.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                yield info

    def get_standbys(self):
        """
        Iterator for all standbys.
        """
        for info in self.map['standbys']:
            yield info

    def get_fsmap(self, fscid):
        """
        Get the fsmap for the given FSCID.
        """
        for fs in self.map['filesystems']:
            if fscid is None or fs['id'] == fscid:
                return fs
        raise FSMissing(fscid)

    def get_fsmap_byname(self, name):
        """
        Get the fsmap for the given file system name.
        """
        for fs in self.map['filesystems']:
            if name is None or fs['mdsmap']['fs_name'] == name:
                return fs
        raise FSMissing(name)

    def get_replays(self, fscid):
        """
        Get the standby:replay MDS for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['state'] == 'up:standby-replay':
                yield info

    def get_ranks(self, fscid):
        """
        Get the ranks for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['rank'] >= 0 and info['state'] != 'up:standby-replay':
                yield info

    def get_damaged(self, fscid):
        """
        Get the damaged ranks for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        return fs['mdsmap']['damaged']

    def get_rank(self, fscid, rank):
        """
        Get the rank for the given FSCID.
        """
        for info in self.get_ranks(fscid):
            if info['rank'] == rank:
                return info
        raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))

    def get_mds(self, name):
        """
        Get the info for the given MDS name.
        """
        for info in self.get_all():
            if info['name'] == name:
                return info
        return None

    def get_mds_addr(self, name):
        """
        Return the instance addr as a string, like "10.214.133.138:6807/10825"
        """
        info = self.get_mds(name)
        if info:
            return info['addr']
        else:
            log.warning(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
            raise RuntimeError("MDS id '{0}' not found in map".format(name))

    def get_mds_addrs(self, name):
        """
        Return the instance addrs as a list of strings, like
        ["10.214.133.138:6807", "10.214.133.138:6808"]
        """
        info = self.get_mds(name)
        if info:
            return [e['addr'] for e in info['addrs']['addrvec']]
        else:
            log.warning(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
            raise RuntimeError("MDS id '{0}' not found in map".format(name))

    def get_mds_gid(self, gid):
        """
        Get the info for the given MDS gid.
        """
        for info in self.get_all():
            if info['gid'] == gid:
                return info
        return None

    def hadfailover(self, status):
        """
        Compares two statuses for mds failovers.
        Returns True if there is a failover.
        """
        for fs in status.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                oldinfo = self.get_mds_gid(info['gid'])
                if oldinfo is None or oldinfo['incarnation'] != info['incarnation']:
                    return True
        # no failover occurred
        return False
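
    # Usage sketch: an FSStatus snapshot is typically taken once and queried
    # repeatedly, e.g.
    #
    #   status = FSStatus(mon_manager)
    #   for info in status.get_ranks(fscid):
    #       log.debug("rank %d is %s", info['rank'], info['state'])
    #
    # and hadfailover() compares the 'incarnation' of each gid across two
    # snapshots to detect daemon restarts in between.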

class CephCluster(object):
    @property
    def admin_remote(self):
        first_mon = misc.get_first_mon(self._ctx, None)
        (result,) = self._ctx.cluster.only(first_mon).remotes.keys()
        return result

    def __init__(self, ctx) -> None:
        self._ctx = ctx
        self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))

    def get_config(self, key, service_type=None):
        """
        Get config from mon by default, or a specific service if caller asks for it
        """
        if service_type is None:
            service_type = 'mon'

        service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def set_ceph_conf(self, subsys, key, value):
        if subsys not in self._ctx.ceph['ceph'].conf:
            self._ctx.ceph['ceph'].conf[subsys] = {}
        self._ctx.ceph['ceph'].conf[subsys][key] = value
        # XXX because we don't have the ceph task's config object, if they
        # used a different config path this won't work.
        write_conf(self._ctx)

    def clear_ceph_conf(self, subsys, key):
        del self._ctx.ceph['ceph'].conf[subsys][key]
        write_conf(self._ctx)

    def json_asok(self, command, service_type, service_id, timeout=None):
        command.insert(0, '--format=json')
        proc = self.mon_manager.admin_socket(service_type, service_id, command, timeout=timeout)
        response_data = proc.stdout.getvalue().strip()
        if len(response_data) > 0:

            def get_nonnumeric_values(value):
                c = {"NaN": float("nan"), "Infinity": float("inf"),
                     "-Infinity": -float("inf")}
                return c[value]

            j = json.loads(response_data.replace('inf', 'Infinity'),
                           parse_constant=get_nonnumeric_values)
            pretty = json.dumps(j, sort_keys=True, indent=2)
            log.debug(f"_json_asok output\n{pretty}")
            return j
        else:
            log.debug("_json_asok output empty")
            return None

    def is_addr_blocklisted(self, addr):
        blocklist = json.loads(self.mon_manager.raw_cluster_cmd(
            "osd", "dump", "--format=json"))['blocklist']
        if addr in blocklist:
            return True
        log.warning(f'The address {addr} is not blocklisted')
        return False


class MDSCluster(CephCluster):
    """
    Collective operations on all the MDS daemons in the Ceph cluster.  These
    daemons may be in use by various Filesystems.

    For the benefit of pre-multi-filesystem tests, this class is also
    a parent of Filesystem.  The correct way to use MDSCluster going forward is
    as a separate instance outside of your (multiple) Filesystem instances.
    """

    def __init__(self, ctx):
        super(MDSCluster, self).__init__(ctx)

    @property
    def mds_ids(self):
        # do this dynamically because the list of ids may change periodically with cephadm
        return list(misc.all_roles_of_type(self._ctx.cluster, 'mds'))

    @property
    def mds_daemons(self):
        return dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])

    def _one_or_all(self, mds_id, cb, in_parallel=True):
        """
        Call a callback for a single named MDS, or for all.

        Note that the parallelism here isn't for performance, it's to avoid being overly kind
        to the cluster by waiting a graceful ssh-latency of time between doing things, and to
        avoid being overly kind by executing them in a particular order.  However, some actions
        don't cope with being done in parallel, so it's optional (`in_parallel`)

        :param mds_id: MDS daemon name, or None
        :param cb: Callback taking single argument of MDS daemon name
        :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
        """

        if mds_id is None:
            if in_parallel:
                with parallel() as p:
                    for mds_id in self.mds_ids:
                        p.spawn(cb, mds_id)
            else:
                for mds_id in self.mds_ids:
                    cb(mds_id)
        else:
            cb(mds_id)
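
    # Example: mds_stop() below passes a one-argument callback, so
    #
    #   self._one_or_all(None, lambda id_: self.mds_daemons[id_].stop())
    #
    # stops every known MDS in parallel, while passing a concrete mds_id
    # invokes the callback once for that daemon only.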

    def get_config(self, key, service_type=None):
        """
        get_config specialization of service_type="mds"
        """
        if service_type != "mds":
            return super(MDSCluster, self).get_config(key, service_type)

        # Some tests stop MDS daemons, don't send commands to a dead one:
        running_daemons = [i for i, mds in self.mds_daemons.items() if mds.running()]
        service_id = random.sample(running_daemons, 1)[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def mds_stop(self, mds_id=None):
        """
        Stop the MDS daemon process(es).  If it held a rank, that rank
        will eventually go laggy.
        """
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())

    def mds_fail(self, mds_id=None):
        """
        Inform MDSMonitor of the death of the daemon process(es).  If it held
        a rank, that rank will be relinquished.
        """
        self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))

    def mds_restart(self, mds_id=None):
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())

    def mds_fail_restart(self, mds_id=None):
        """
        Variation on restart that includes marking MDSs as failed, so that doing this
        operation followed by waiting for healthy daemon states guarantees that they
        have gone down and come up, rather than potentially seeing the healthy states
        that existed before the restart.
        """
        def _fail_restart(id_):
            self.mds_daemons[id_].stop()
            self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
            self.mds_daemons[id_].restart()

        self._one_or_all(mds_id, _fail_restart)

    def mds_signal(self, mds_id, sig, silent=False):
        """
        signal a MDS daemon
        """
        self.mds_daemons[mds_id].signal(sig, silent)

    def mds_is_running(self, mds_id):
        return self.mds_daemons[mds_id].running()

    def newfs(self, name='cephfs', create=True):
        return Filesystem(self._ctx, name=name, create=create)

    def status(self, epoch=None):
        return FSStatus(self.mon_manager, epoch)

    def get_standby_daemons(self):
        return set([s['name'] for s in self.status().get_standbys()])

    def get_mds_hostnames(self):
        result = set()
        for mds_id in self.mds_ids:
            mds_remote = self.mon_manager.find_remote('mds', mds_id)
            result.add(mds_remote.hostname)

        return list(result)

    def set_clients_block(self, blocked, mds_id=None):
        """
        Block (using iptables) client communications to this MDS.  Be careful: if
        other services are running on this MDS, or other MDSs try to talk to this
        MDS, their communications may also be blocked as collateral damage.

        :param mds_id: Optional ID of MDS to block, default to all
        :return:
        """
        da_flag = "-A" if blocked else "-D"

        def set_block(_mds_id):
            remote = self.mon_manager.find_remote('mds', _mds_id)
            status = self.status()

            addr = status.get_mds_addr(_mds_id)
            ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()

            remote.run(
                args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])
            remote.run(
                args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])

        self._one_or_all(mds_id, set_block, in_parallel=False)
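
    # Sketch of the intended call pattern (inferred from da_flag above):
    #
    #   self.set_clients_block(True, mds_id='a')   # iptables -A ... REJECT
    #   ... exercise client reconnect/recovery ...
    #   self.set_clients_block(False, mds_id='a')  # iptables -D removes the rules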

    def set_inter_mds_block(self, blocked, mds_rank_1, mds_rank_2):
        """
        Block (using iptables) communications from a provided MDS to other MDSs.
        Block all ports that an MDS uses for communication.

        :param blocked: True to block the MDS, False otherwise
        :param mds_rank_1: MDS rank
        :param mds_rank_2: MDS rank
        :return:
        """
        da_flag = "-A" if blocked else "-D"

        def set_block(mds_ids):
            status = self.status()

            mds = mds_ids[0]
            remote = self.mon_manager.find_remote('mds', mds)
            addrs = status.get_mds_addrs(mds)
            for addr in addrs:
                ip_str, port_str = re.match("(.+):(.+)", addr).groups()
                remote.run(
                    args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                          "comment", "--comment", "teuthology"], omit_sudo=False)

            mds = mds_ids[1]
            remote = self.mon_manager.find_remote('mds', mds)
            addrs = status.get_mds_addrs(mds)
            for addr in addrs:
                ip_str, port_str = re.match("(.+):(.+)", addr).groups()
                remote.run(
                    args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
                          "comment", "--comment", "teuthology"], omit_sudo=False)
                remote.run(
                    args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                          "comment", "--comment", "teuthology"], omit_sudo=False)

        self._one_or_all((mds_rank_1, mds_rank_2), set_block, in_parallel=False)

    def clear_firewall(self):
        clear_firewall(self._ctx)

    def get_mds_info(self, mds_id):
        return FSStatus(self.mon_manager).get_mds(mds_id)

    def is_pool_full(self, pool_name):
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        for pool in pools:
            if pool['pool_name'] == pool_name:
                return 'full' in pool['flags_names'].split(",")

        raise RuntimeError("Pool not found '{0}'".format(pool_name))

    def delete_all_filesystems(self):
        """
        Remove all filesystems that exist, and any pools in use by them.
        """
        for fs in self.status().get_filesystems():
            Filesystem(ctx=self._ctx, fscid=fs['id']).destroy()

    @property
    def beacon_timeout(self):
        """
        Generate an acceptable timeout for the mons to drive some MDSMap change
        because of missed beacons from some MDS.  This involves looking up the
        grace period in use by the mons and adding an acceptable buffer.
        """

        grace = float(self.get_config("mds_beacon_grace", service_type="mon"))
        return grace * 2 + 15

class Filesystem(MDSCluster):
    """
    This object is for driving a CephFS filesystem.  The MDS daemons driven by
    MDSCluster may be shared with other Filesystems.
    """

    @classmethod
    def get_all_fs(cls, ctx):
        """
        Generator for all Filesystems in the cluster.
        """
        mdsc = MDSCluster(ctx)
        status = mdsc.status()
        for fs in status.get_filesystems():
            yield cls(ctx, fscid=fs['id'])

    def __init__(self, ctx, fs_config={}, fscid=None, name=None, create=False):
        super(Filesystem, self).__init__(ctx)

        self.name = name
        self.id = None
        self.metadata_pool_name = None
        self.data_pool_name = None
        self.data_pools = None
        self.fs_config = fs_config
        self.ec_profile = fs_config.get('ec_profile')

        client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
        self.client_id = client_list[0]
        self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]

        if name is not None:
            if fscid is not None:
                raise RuntimeError("cannot specify fscid when creating fs")
            if create and not self.legacy_configured():
                self.create()
        else:
            if fscid is not None:
                self.id = fscid
            self.getinfo(refresh = True)

        # Stash a reference to the first created filesystem on ctx, so
        # that if someone drops to the interactive shell they can easily
        # poke our methods.
        if not hasattr(self._ctx, "filesystem"):
            self._ctx.filesystem = self

    def dead(self):
        try:
            return not bool(self.get_mds_map())
        except FSMissing:
            return True

    def get_task_status(self, status_key):
        return self.mon_manager.get_service_task_status("mds", status_key)

    def getinfo(self, refresh = False):
        status = self.status()
        if self.id is not None:
            fsmap = status.get_fsmap(self.id)
        elif self.name is not None:
            fsmap = status.get_fsmap_byname(self.name)
        else:
            fss = [fs for fs in status.get_filesystems()]
            if len(fss) == 1:
                fsmap = fss[0]
            elif len(fss) == 0:
                raise RuntimeError("no file system available")
            else:
                raise RuntimeError("more than one file system available")
        self.id = fsmap['id']
        self.name = fsmap['mdsmap']['fs_name']
        self.get_pool_names(status = status, refresh = refresh)
        return status

    def reach_max_mds(self):
        status = self.wait_for_daemons()
        mds_map = self.get_mds_map(status=status)
        assert(mds_map['in'] == list(range(0, mds_map['max_mds'])))

    def reset(self):
        self.mon_manager.raw_cluster_cmd("fs", "reset", str(self.name), '--yes-i-really-mean-it')

    def fail(self):
        self.mon_manager.raw_cluster_cmd("fs", "fail", str(self.name))

    def set_flag(self, var, *args):
        a = map(lambda x: str(x).lower(), args)
        self.mon_manager.raw_cluster_cmd("fs", "flag", "set", var, *a)

    def set_allow_multifs(self, yes=True):
        self.set_flag("enable_multiple", yes)

    def set_var(self, var, *args):
        a = map(lambda x: str(x).lower(), args)
        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, var, *a)

    def set_down(self, down=True):
        self.set_var("down", str(down).lower())

    def set_joinable(self, joinable=True):
        self.set_var("joinable", joinable)

    def set_max_mds(self, max_mds):
        self.set_var("max_mds", "%d" % max_mds)

    def set_session_timeout(self, timeout):
        self.set_var("session_timeout", "%d" % timeout)

    def set_allow_standby_replay(self, yes):
        self.set_var("allow_standby_replay", yes)

    def set_allow_new_snaps(self, yes):
        self.set_var("allow_new_snaps", yes, '--yes-i-really-mean-it')

    def set_bal_rank_mask(self, bal_rank_mask):
        self.set_var("bal_rank_mask", bal_rank_mask)

    def set_refuse_client_session(self, yes):
        self.set_var("refuse_client_session", yes)

    def compat(self, *args):
        a = map(lambda x: str(x).lower(), args)
        self.mon_manager.raw_cluster_cmd("fs", "compat", self.name, *a)

    def add_compat(self, *args):
        self.compat("add_compat", *args)

    def add_incompat(self, *args):
        self.compat("add_incompat", *args)

    def rm_compat(self, *args):
        self.compat("rm_compat", *args)

    def rm_incompat(self, *args):
        self.compat("rm_incompat", *args)

    def required_client_features(self, *args, **kwargs):
        c = ["fs", "required_client_features", self.name, *args]
        return self.mon_manager.run_cluster_cmd(args=c, **kwargs)

    # Since v15.1.0 the pg autoscale mode has been enabled as default,
    # which will let the pg autoscale mode calculate the pg_num as needed.
    # We set the pg_num_min to 64 to make sure that pg autoscale mode
    # won't set the pg_num too low, to fix Tracker#45434.
    pg_num = 64
    pg_num_min = 64
    target_size_ratio = 0.9
    target_size_ratio_ec = 0.9

    def create(self, recover=False, metadata_overlay=False):
        if self.name is None:
            self.name = "cephfs"
        if self.metadata_pool_name is None:
            self.metadata_pool_name = "{0}_metadata".format(self.name)
        if self.data_pool_name is None:
            data_pool_name = "{0}_data".format(self.name)
        else:
            data_pool_name = self.data_pool_name

        # With an EC profile, the filesystem will use the ec pool to store
        # the data, and a small amount of metadata still goes to the primary
        # data pool for all files.
        if not metadata_overlay and self.ec_profile and 'disabled' not in self.ec_profile:
            self.target_size_ratio = 0.05

        log.debug("Creating filesystem '{0}'".format(self.name))

        try:
            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                             self.metadata_pool_name,
                                             '--pg_num_min', str(self.pg_num_min))

            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                             data_pool_name, str(self.pg_num),
                                             '--pg_num_min', str(self.pg_num_min),
                                             '--target_size_ratio',
                                             str(self.target_size_ratio))
        except CommandFailedError as e:
            if e.exitstatus == 22: # nautilus couldn't specify --pg_num_min option
                self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                                 self.metadata_pool_name,
                                                 str(self.pg_num_min))

                self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                                 data_pool_name, str(self.pg_num),
                                                 str(self.pg_num_min))
            else:
                raise

        args = ["fs", "new", self.name, self.metadata_pool_name, data_pool_name]
        if recover:
            args.append('--recover')
        if metadata_overlay:
            args.append('--allow-dangerous-metadata-overlay')
        self.mon_manager.raw_cluster_cmd(*args)

        if self.ec_profile and 'disabled' not in self.ec_profile:
            ec_data_pool_name = data_pool_name + "_ec"
            log.debug("EC profile is %s", self.ec_profile)
            cmd = ['osd', 'erasure-code-profile', 'set', ec_data_pool_name]
            cmd.extend(self.ec_profile)
            self.mon_manager.raw_cluster_cmd(*cmd)
            try:
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'create', ec_data_pool_name,
                    'erasure', ec_data_pool_name,
                    '--pg_num_min', str(self.pg_num_min),
                    '--target_size_ratio', str(self.target_size_ratio_ec))
            except CommandFailedError as e:
                if e.exitstatus == 22: # nautilus couldn't specify --pg_num_min option
                    self.mon_manager.raw_cluster_cmd(
                        'osd', 'pool', 'create', ec_data_pool_name,
                        str(self.pg_num_min), 'erasure', ec_data_pool_name)
                else:
                    raise
            self.mon_manager.raw_cluster_cmd(
                'osd', 'pool', 'set',
                ec_data_pool_name, 'allow_ec_overwrites', 'true')
            self.add_data_pool(ec_data_pool_name, create=False)
            self.check_pool_application(ec_data_pool_name)

            self.run_client_payload(f"setfattr -n ceph.dir.layout.pool -v {ec_data_pool_name} . && getfattr -n ceph.dir.layout .")

        self.check_pool_application(self.metadata_pool_name)
        self.check_pool_application(data_pool_name)

        # Turn off spurious standby count warnings from modifying max_mds in tests.
        try:
            self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
        except CommandFailedError as e:
            if e.exitstatus == 22:
                # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
                pass
            else:
                raise

        if self.fs_config is not None:
            log.debug(f"fs_config: {self.fs_config}")

            max_mds = self.fs_config.get('max_mds', 1)
            if max_mds > 1:
                self.set_max_mds(max_mds)

            standby_replay = self.fs_config.get('standby_replay', False)
            self.set_allow_standby_replay(standby_replay)

            # If absent will use the default value (60 seconds)
            session_timeout = self.fs_config.get('session_timeout', 60)
            if session_timeout != 60:
                self.set_session_timeout(session_timeout)

            if self.fs_config.get('subvols', None) is not None:
                log.debug(f"Creating {self.fs_config.get('subvols')} subvols "
                          f"for filesystem '{self.name}'")
                if not hasattr(self._ctx, "created_subvols"):
                    self._ctx.created_subvols = dict()

                subvols = self.fs_config.get('subvols')
                assert(isinstance(subvols, dict))
                assert(isinstance(subvols['create'], int))
                assert(subvols['create'] > 0)

                for sv in range(0, subvols['create']):
                    sv_name = f'sv_{sv}'
                    self.mon_manager.raw_cluster_cmd(
                        'fs', 'subvolume', 'create', self.name, sv_name,
                        self.fs_config.get('subvol_options', ''))

                    if self.name not in self._ctx.created_subvols:
                        self._ctx.created_subvols[self.name] = []

                    subvol_path = self.mon_manager.raw_cluster_cmd(
                        'fs', 'subvolume', 'getpath', self.name, sv_name)
                    subvol_path = subvol_path.strip()
                    self._ctx.created_subvols[self.name].append(subvol_path)
            else:
                log.debug(f"Not creating any subvols for filesystem '{self.name}'")

        self.getinfo(refresh = True)

        # wait for pgs to be clean
        self.mon_manager.wait_for_clean()
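
    # Illustrative fs_config (assumed shape, matching the keys read above):
    #
    #   fs = Filesystem(ctx, name='cephfs', create=True,
    #                   fs_config={'max_mds': 2,
    #                              'standby_replay': True,
    #                              'subvols': {'create': 3}})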

    def run_client_payload(self, cmd):
        # avoid circular dep by importing here:
        from tasks.cephfs.fuse_mount import FuseMount

        # Wait for the MDS daemons to be ready before mounting the
        # ceph-fuse client in run_client_payload()
        self.wait_for_daemons()

        d = misc.get_testdir(self._ctx)
        m = FuseMount(self._ctx, d, "admin", self.client_remote, cephfs_name=self.name)
        m.mount_wait()
        m.run_shell_payload(cmd)
        m.umount_wait(require_clean=True)

    def _remove_pool(self, name, **kwargs):
        c = f'osd pool rm {name} {name} --yes-i-really-really-mean-it'
        return self.mon_manager.ceph(c, **kwargs)

    def rm(self, **kwargs):
        c = f'fs rm {self.name} --yes-i-really-mean-it'
        return self.mon_manager.ceph(c, **kwargs)

    def remove_pools(self, data_pools):
        self._remove_pool(self.get_metadata_pool_name())
        for poolname in data_pools:
            try:
                self._remove_pool(poolname)
            except CommandFailedError as e:
                # EBUSY, this data pool is used by two metadata pools, let the
                # 2nd pass delete it
                if e.exitstatus == EBUSY:
                    pass
                else:
                    raise

    def destroy(self, reset_obj_attrs=True):
        log.info(f'Destroying file system {self.name} and related pools')

        if self.dead():
            log.debug('already dead...')
            return

        data_pools = self.get_data_pool_names(refresh=True)

        # make sure no MDSs are attached to given FS.
        self.fail()
        self.rm()

        self.remove_pools(data_pools)

        if reset_obj_attrs:
            self.id = None
            self.name = None
            self.metadata_pool_name = None
            self.data_pool_name = None
            self.data_pools = None

    def recreate(self):
        self.destroy()

        self.create()
        self.getinfo(refresh=True)

    def check_pool_application(self, pool_name):
        osd_map = self.mon_manager.get_osd_dump_json()
        for pool in osd_map['pools']:
            if pool['pool_name'] == pool_name:
                if "application_metadata" in pool:
                    if "cephfs" not in pool['application_metadata']:
                        raise RuntimeError("Pool {pool_name} does not name cephfs as application!".\
                            format(pool_name=pool_name))

    def __del__(self):
        if getattr(self._ctx, "filesystem", None) == self:
            delattr(self._ctx, "filesystem")

    def exists(self):
        """
        Whether a filesystem exists in the mon's filesystem list
        """
        fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
        return self.name in [fs['name'] for fs in fs_list]

    def legacy_configured(self):
        """
        Check if a legacy (i.e. pre "fs new") filesystem configuration is present.  If this is
        the case, the caller should avoid using Filesystem.create
        """
        try:
            out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
            pools = json.loads(out_text)
            metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
            if metadata_pool_exists:
                self.metadata_pool_name = 'metadata'
        except CommandFailedError as e:
            # For use in upgrade tests, Ceph cuttlefish and earlier don't support
            # structured output (--format) from the CLI.
            if e.exitstatus == 22:
                metadata_pool_exists = True
            else:
                raise

        return metadata_pool_exists

    def _df(self):
        return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))

    # may raise FSMissing
    def get_mds_map(self, status=None):
        if status is None:
            status = self.status()
        return status.get_fsmap(self.id)['mdsmap']

    def get_var(self, var, status=None):
        return self.get_mds_map(status=status)[var]

    def set_dir_layout(self, mount, path, layout):
        for name, value in layout.items():
            mount.run_shell(args=["setfattr", "-n", "ceph.dir.layout."+name, "-v", str(value), path])

    def add_data_pool(self, name, create=True):
        if create:
            try:
                self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name,
                                                 '--pg_num_min', str(self.pg_num_min))
            except CommandFailedError as e:
                if e.exitstatus == 22: # nautilus couldn't specify --pg_num_min option
                    self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name,
                                                     str(self.pg_num_min))
                else:
                    raise
        self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
        self.get_pool_names(refresh = True)
        for poolid, fs_name in self.data_pools.items():
            if name == fs_name:
                return poolid
        raise RuntimeError("could not get just created pool '{0}'".format(name))

    def get_pool_names(self, refresh = False, status = None):
        if refresh or self.metadata_pool_name is None or self.data_pools is None:
            if status is None:
                status = self.status()
            fsmap = status.get_fsmap(self.id)

            osd_map = self.mon_manager.get_osd_dump_json()
            id_to_name = {}
            for p in osd_map['pools']:
                id_to_name[p['pool']] = p['pool_name']

            self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
            self.data_pools = {}
            for data_pool in fsmap['mdsmap']['data_pools']:
                self.data_pools[data_pool] = id_to_name[data_pool]

    def get_data_pool_name(self, refresh = False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return next(iter(self.data_pools.values()))

    def get_data_pool_id(self, refresh = False):
        """
        Don't call this if you have multiple data pools
        :return: integer
        """
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return next(iter(self.data_pools.keys()))

    def get_data_pool_names(self, refresh = False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        return list(self.data_pools.values())

    def get_metadata_pool_name(self):
        return self.metadata_pool_name

    def set_data_pool_name(self, name):
        if self.id is not None:
            raise RuntimeError("can't set data pool name once the filesystem's fscid is set")
        self.data_pool_name = name

    def get_pool_pg_num(self, pool_name):
        pgs = json.loads(self.mon_manager.raw_cluster_cmd('osd', 'pool', 'get',
                                                          pool_name, 'pg_num',
                                                          '--format=json-pretty'))
        return int(pgs['pg_num'])

    def get_namespace_id(self):
        return self.id

    def get_pool_df(self, pool_name):
        """
        Return a dict like:
        {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
        """
        for pool_df in self._df()['pools']:
            if pool_df['name'] == pool_name:
                return pool_df['stats']

        raise RuntimeError("Pool name '{0}' not found".format(pool_name))

    def get_usage(self):
        return self._df()['stats']['total_used_bytes']

    def are_daemons_healthy(self, status=None, skip_max_mds_check=False):
        """
        Return true if all daemons are in one of active, standby, standby-replay, and
        at least max_mds daemons are in 'active'.

        Unlike most of Filesystem, this function is tolerant of new-style `fs`
        commands being missing, because we are part of the ceph installation
        process during upgrade suites, so must fall back to old style commands
        when we get an EINVAL on a new style command.

        :return:
        """
        active_count = 0

        # First, check to see that processes haven't exited with an error code
        for mds in self._ctx.daemons.iter_daemons_of_role('mds'):
            mds.check_status()

        mds_map = self.get_mds_map(status=status)

        log.debug("are_daemons_healthy: mds map: {0}".format(mds_map))

        for mds_id, mds_status in mds_map['info'].items():
            if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
                log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
                return False
            elif mds_status['state'] == 'up:active':
                active_count += 1

        log.debug("are_daemons_healthy: {0}/{1}".format(
            active_count, mds_map['max_mds']
        ))

        if not skip_max_mds_check:
            if active_count > mds_map['max_mds']:
                log.debug("are_daemons_healthy: number of actives is greater than max_mds: {0}".format(mds_map))
                return False
            elif active_count == mds_map['max_mds']:
                # The MDSMap says these guys are active, but let's check they really are
                for mds_id, mds_status in mds_map['info'].items():
                    if mds_status['state'] == 'up:active':
                        try:
                            daemon_status = self.mds_tell(["status"], mds_id=mds_status['name'])
                        except CommandFailedError as cfe:
                            if cfe.exitstatus == errno.EINVAL:
                                # Old version, can't do this check
                                continue
                            else:
                                # MDS not even running
                                return False

                        if daemon_status['state'] != 'up:active':
                            # MDS hasn't taken the latest map yet
                            return False

                return True
            else:
                return False
        else:
            log.debug("are_daemons_healthy: skipping max_mds check")
            return True

    def get_daemon_names(self, state=None, status=None):
        """
        Return MDS daemon names of those daemons in the given state
        """
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['state'] == state or state is None:
                result.append(mds_status['name'])

        return result

    def get_active_names(self, status=None):
        """
        Return MDS daemon names of those daemons holding ranks
        in state up:active

        :return: list of strings like ['a', 'b'], sorted by rank
        """
        return self.get_daemon_names("up:active", status=status)

    def get_all_mds_rank(self, status=None):
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['rank'])

        return result

    def get_rank(self, rank=None, status=None):
        if status is None:
            status = self.getinfo()
        if rank is None:
            rank = 0
        return status.get_rank(self.id, rank)

    def rank_restart(self, rank=0, status=None):
        name = self.get_rank(rank=rank, status=status)['name']
        self.mds_restart(mds_id=name)

    def rank_signal(self, signal, rank=0, status=None):
        name = self.get_rank(rank=rank, status=status)['name']
        self.mds_signal(name, signal)

    def rank_freeze(self, yes, rank=0):
        self.mon_manager.raw_cluster_cmd("mds", "freeze", "{}:{}".format(self.id, rank), str(yes).lower())

    def rank_repaired(self, rank):
        self.mon_manager.raw_cluster_cmd("mds", "repaired", "{}:{}".format(self.id, rank))

    def rank_fail(self, rank=0):
        self.mon_manager.raw_cluster_cmd("mds", "fail", "{}:{}".format(self.id, rank))

    def rank_is_running(self, rank=0, status=None):
        name = self.get_rank(rank=rank, status=status)['name']
        return self.mds_is_running(name)

    def get_ranks(self, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_ranks(self.id)

    def get_damaged(self, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_damaged(self.id)

    def get_replays(self, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_replays(self.id)

    def get_replay(self, rank=0, status=None):
        for replay in self.get_replays(status=status):
            if replay['rank'] == rank:
                return replay
        return None

    def get_rank_names(self, status=None):
        """
        Return MDS daemon names of those daemons holding a rank,
        sorted by rank.  This includes e.g. up:replay/reconnect
        as well as active, but does not include standby or
        standby-replay.
        """
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['name'])

        return result

    def wait_for_daemons(self, timeout=None, skip_max_mds_check=False, status=None):
        """
        Wait until all daemons are healthy
        :return:
        """

        if timeout is None:
            timeout = DAEMON_WAIT_TIMEOUT

        if status is None:
            status = self.status()

        elapsed = 0
        while True:
            if self.are_daemons_healthy(status=status, skip_max_mds_check=skip_max_mds_check):
                return status
            else:
                time.sleep(1)
                elapsed += 1

            if elapsed > timeout:
                log.debug("status = {0}".format(status))
                raise RuntimeError("Timed out waiting for MDS daemons to become healthy")

            status = self.status()

    def dencoder(self, obj_type, obj_blob):
        args = [os.path.join(self._prefix, "ceph-dencoder"), 'type', obj_type, 'import', '-', 'decode', 'dump_json']
        p = self.mon_manager.controller.run(args=args, stdin=BytesIO(obj_blob), stdout=BytesIO())
        return p.stdout.getvalue()

    def rados(self, *args, **kwargs):
        """
        Callout to rados CLI.
        """
        return self.mon_manager.do_rados(*args, **kwargs)

    def radosm(self, *args, **kwargs):
        """
        Interact with the metadata pool via rados CLI.
        """
        return self.rados(*args, **kwargs, pool=self.get_metadata_pool_name())

    def radosmo(self, *args, stdout=BytesIO(), **kwargs):
        """
        Interact with the metadata pool via rados CLI.  Get the stdout.
        """
        return self.radosm(*args, **kwargs, stdout=stdout).stdout.getvalue()

    def get_metadata_object(self, object_type, object_id):
        """
        Retrieve an object from the metadata pool, pass it through
        ceph-dencoder to dump it to JSON, and return the decoded object.
        """
        o = self.radosmo(['get', object_id, '-'])
        j = self.dencoder(object_type, o)
        try:
            return json.loads(j)
        except (TypeError, ValueError):
            log.error("Failed to decode JSON: '{0}'".format(j))
            raise

    def get_journal_version(self):
        """
        Read the JournalPointer and Journal::Header objects to learn the version of
        encoding in use.
        """
        journal_pointer_object = '400.00000000'
        journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
        journal_ino = journal_pointer_dump['journal_pointer']['front']

        journal_header_object = "{0:x}.00000000".format(journal_ino)
        journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)

        version = journal_header_dump['journal_header']['stream_format']
        log.debug("Read journal version {0}".format(version))

        return version
, command
, mds_id
=None, timeout
=None):
1232 return self
.rank_asok(command
, timeout
=timeout
)
1234 return self
.json_asok(command
, 'mds', mds_id
, timeout
=timeout
)
1236 def mds_tell(self
, command
, mds_id
=None):
1238 return self
.rank_tell(command
)
1240 return json
.loads(self
.mon_manager
.raw_cluster_cmd("tell", f
"mds.{mds_id}", *command
))
1242 def rank_asok(self
, command
, rank
=0, status
=None, timeout
=None):
1243 info
= self
.get_rank(rank
=rank
, status
=status
)
1244 return self
.json_asok(command
, 'mds', info
['name'], timeout
=timeout
)
1246 def rank_tell(self
, command
, rank
=0, status
=None):
1248 out
= self
.mon_manager
.raw_cluster_cmd("tell", f
"mds.{self.id}:{rank}", *command
)
1249 return json
.loads(out
)
1250 except json
.decoder
.JSONDecodeError
:
1251 log
.error("could not decode: {}".format(out
))
1254 def ranks_tell(self
, command
, status
=None):
1256 status
= self
.status()
1258 for r
in status
.get_ranks(self
.id):
1259 result
= self
.rank_tell(command
, rank
=r
['rank'], status
=status
)
1260 out
.append((r
['rank'], result
))
1263 def ranks_perf(self
, f
, status
=None):
1264 perf
= self
.ranks_tell(["perf", "dump"], status
=status
)
1266 for rank
, perf
in perf
:
1267 out
.append((rank
, f(perf
)))

    def read_cache(self, path, depth=None, rank=None):
        cmd = ["dump", "tree", path]
        if depth is not None:
            cmd.append(str(depth))
        result = self.rank_asok(cmd, rank=rank)
        if result is None or len(result) == 0:
            raise RuntimeError("Path not found in cache: {0}".format(path))

        return result

    def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
        """
        Block until the MDS reaches a particular state, or a failure condition
        is met.

        When there are multiple MDSs, succeed when exactly one MDS is in the
        goal state, or fail when any MDS is in the reject state.

        :param goal_state: Return once the MDS is in this state
        :param reject: Fail if the MDS enters this state before the goal state
        :param timeout: Fail if this many seconds pass before reaching goal
        :return: number of seconds waited, rounded down to integer
        """

        started_at = time.time()
        while True:
            status = self.status()
            if rank is not None:
                try:
                    mds_info = status.get_rank(self.id, rank)
                    current_state = mds_info['state'] if mds_info else None
                    log.debug("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
                except:
                    mdsmap = self.get_mds_map(status=status)
                    if rank in mdsmap['failed']:
                        log.debug("Waiting for rank {0} to come back.".format(rank))
                        current_state = None
                    else:
                        raise
            elif mds_id is not None:
                # mds_info is None if no daemon with this ID exists in the map
                mds_info = status.get_mds(mds_id)
                current_state = mds_info['state'] if mds_info else None
                log.debug("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
            else:
                # In general, look for a single MDS
                states = [m['state'] for m in status.get_ranks(self.id)]
                if [s for s in states if s == goal_state] == [goal_state]:
                    current_state = goal_state
                elif reject in states:
                    current_state = reject
                else:
                    current_state = None
                log.debug("mapped states {0} to {1}".format(states, current_state))

            elapsed = time.time() - started_at
            if current_state == goal_state:
                log.debug("reached state '{0}' in {1}s".format(current_state, elapsed))
                return elapsed
            elif reject is not None and current_state == reject:
                raise RuntimeError("MDS in reject state {0}".format(current_state))
            elif timeout is not None and elapsed > timeout:
                log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
                raise RuntimeError(
                    "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
                        elapsed, goal_state, current_state
                    ))
            else:
                time.sleep(1)
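
    # Examples (sketch): wait_for_state('up:active', rank=0, timeout=60)
    # blocks until rank 0 is active; with neither rank nor mds_id given it
    # succeeds once exactly one rank is in the goal state, per the docstring.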

    def _read_data_xattr(self, ino_no, xattr_name, obj_type, pool):
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)

        args = ["getxattr", obj_name, xattr_name]
        try:
            proc = self.rados(args, pool=pool, stdout=BytesIO())
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(obj_name)

        obj_blob = proc.stdout.getvalue()
        return json.loads(self.dencoder(obj_type, obj_blob).strip())

    def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
        """
        Write to an xattr of the 0th data object of an inode.  Will
        succeed whether the object and/or xattr already exist or not.

        :param ino_no: integer inode number
        :param xattr_name: string name of the xattr
        :param data: byte array data to write to the xattr
        :param pool: name of data pool or None to use primary data pool
        :return: None
        """
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)
        args = ["setxattr", obj_name, xattr_name, data]
        self.rados(args, pool=pool)

    def read_symlink(self, ino_no, pool=None):
        return self._read_data_xattr(ino_no, "symlink", "string_wrapper", pool)

    def read_backtrace(self, ino_no, pool=None):
        """
        Read the backtrace from the data pool, return a dict in the format
        given by inode_backtrace_t::dump, which is something like:

        ::

            rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
            ceph-dencoder type inode_backtrace_t import out.bin decode dump_json

            { "ino": 1099511627778,
              ... }

        :param pool: name of pool to read backtrace from.  If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)

    def read_layout(self, ino_no, pool=None):
        """
        Read 'layout' xattr of an inode and parse the result, returning a dict like:
        ::
            {
                "stripe_unit": 4194304,
                "stripe_count": 1,
                "object_size": 4194304,
                "pool_id": 1,
                "pool_ns": "",
            }

        :param pool: name of pool to read layout from.  If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)

    def _enumerate_data_objects(self, ino, size):
        """
        Get the list of expected data objects for a range, and the list of objects
        that really exist.

        :return a tuple of two lists of strings (expected, actual)
        """
        stripe_size = 1024 * 1024 * 4

        size = max(stripe_size, size)

        want_objects = [
            "{0:x}.{1:08x}".format(ino, n)
            for n in range(0, ((size - 1) // stripe_size) + 1)
        ]

        exist_objects = self.rados(["ls"], pool=self.get_data_pool_name(), stdout=StringIO()).stdout.getvalue().split("\n")

        return want_objects, exist_objects
, ino
, size
):
1439 Check that *all* the expected data objects for an inode are present in the data pool
1442 want_objects
, exist_objects
= self
._enumerate
_data
_objects
(ino
, size
)
1443 missing
= set(want_objects
) - set(exist_objects
)
1446 log
.debug("Objects missing (ino {0}, size {1}): {2}".format(
1451 log
.debug("All objects for ino {0} size {1} found".format(ino
, size
))
1454 def data_objects_absent(self
, ino
, size
):
1455 want_objects
, exist_objects
= self
._enumerate
_data
_objects
(ino
, size
)
1456 present
= set(want_objects
) & set(exist_objects
)
1459 log
.debug("Objects not absent (ino {0}, size {1}): {2}".format(
1464 log
.debug("All objects for ino {0} size {1} are absent".format(ino
, size
))
1467 def dirfrag_exists(self
, ino
, frag
):
1469 self
.radosm(["stat", "{0:x}.{1:08x}".format(ino
, frag
)])
1470 except CommandFailedError
:
1475 def list_dirfrag(self
, dir_ino
):
1477 Read the named object and return the list of omap keys
1479 :return a list of 0 or more strings
1482 dirfrag_obj_name
= "{0:x}.00000000".format(dir_ino
)
1485 key_list_str
= self
.radosmo(["listomapkeys", dirfrag_obj_name
], stdout
=StringIO())
1486 except CommandFailedError
as e
:
1487 log
.error(e
.__str
__())
1488 raise ObjectNotFound(dirfrag_obj_name
)
1490 return key_list_str
.strip().split("\n") if key_list_str
else []
1492 def get_meta_of_fs_file(self
, dir_ino
, obj_name
, out
):
1494 get metadata from parent to verify the correctness of the data format encoded by the tool, cephfs-meta-injection.
1495 warning : The splitting of directory is not considered here.
1498 dirfrag_obj_name
= "{0:x}.00000000".format(dir_ino
)
1500 self
.radosm(["getomapval", dirfrag_obj_name
, obj_name
+"_head", out
])
1501 except CommandFailedError
as e
:
1502 log
.error(e
.__str
__())
1503 raise ObjectNotFound(dir_ino
)

    def erase_metadata_objects(self, prefix):
        """
        For all objects in the metadata pool matching the prefix,
        erase them.

        This is O(N) with the number of objects in the pool, so only suitable
        for use on toy test filesystems.
        """
        all_objects = self.radosmo(["ls"], stdout=StringIO()).strip().split("\n")
        matching_objects = [o for o in all_objects if o.startswith(prefix)]
        for o in matching_objects:
            self.radosm(["rm", o])

    def erase_mds_objects(self, rank):
        """
        Erase all the per-MDS objects for a particular rank.  This includes
        inotable, sessiontable, journal
        """

        def obj_prefix(multiplier):
            """
            MDS object naming conventions like rank 1's
            journal is at 201.***
            """
            return "%x." % (multiplier * 0x100 + rank)

        # MDS_INO_LOG_OFFSET
        self.erase_metadata_objects(obj_prefix(2))
        # MDS_INO_LOG_BACKUP_OFFSET
        self.erase_metadata_objects(obj_prefix(3))
        # MDS_INO_LOG_POINTER_OFFSET
        self.erase_metadata_objects(obj_prefix(4))
        # MDSTables & SessionMap
        self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))
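
    # Worked example of obj_prefix(): for rank 1, multiplier 2 gives
    # "%x." % (2 * 0x100 + 1) == "201.", matching the journal objects
    # (201.00000000, 201.00000001, ...) mentioned in the docstring above.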

    @property
    def _prefix(self):
        """
        Override this to set a different install prefix for the CephFS
        tool binaries.
        """
        return ""

    def _make_rank(self, rank):
        return "{}:{}".format(self.name, rank)

    def _run_tool(self, tool, args, rank=None, quiet=False):
        # Tests frequently have [client] configuration that jacks up
        # the objecter log level (unlikely to be interesting here)
        # and does not set the mds log level (very interesting here)
        if quiet:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
        else:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=20', '--debug-ms=1', '--debug-objecter=1']

        if rank is not None:
            base_args.extend(["--rank", "%s" % str(rank)])

        t1 = datetime.datetime.now()
        r = self.tool_remote.sh(script=base_args + args, stdout=StringIO()).strip()
        duration = datetime.datetime.now() - t1
        log.debug("Ran {0} in time {1}, result:\n{2}".format(
            base_args + args, duration, r
        ))
        return r

    @property
    def tool_remote(self):
        """
        An arbitrary remote to use when invoking recovery tools.  Use an MDS host because
        it'll definitely have keys with perms to access cephfs metadata pool.  This is public
        so that tests can use this remote to go get locally written output files from the tools.
        """
        return self.mon_manager.controller

    def journal_tool(self, args, rank, quiet=False):
        """
        Invoke cephfs-journal-tool with the passed arguments for a rank, and return its stdout
        """
        fs_rank = self._make_rank(rank)
        return self._run_tool("cephfs-journal-tool", args, fs_rank, quiet)

    def meta_tool(self, args, rank, quiet=False):
        """
        Invoke cephfs-meta-injection with the passed arguments for a rank, and return its stdout
        """
        fs_rank = self._make_rank(rank)
        return self._run_tool("cephfs-meta-injection", args, fs_rank, quiet)

    def table_tool(self, args, quiet=False):
        """
        Invoke cephfs-table-tool with the passed arguments, and return its stdout
        """
        return self._run_tool("cephfs-table-tool", args, None, quiet)

    def data_scan(self, args, quiet=False, worker_count=1):
        """
        Invoke cephfs-data-scan with the passed arguments, and return its stdout

        :param worker_count: if greater than 1, multiple workers will be run
                             in parallel and the return value will be None
        """

        workers = []

        for n in range(0, worker_count):
            if worker_count > 1:
                # data-scan args first token is a command, followed by args to it.
                # insert worker arguments after the command.
                cmd = args[0]
                worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
            else:
                worker_args = args

            workers.append(Greenlet.spawn(lambda wargs=worker_args:
                           self._run_tool("cephfs-data-scan", wargs, None, quiet)))

        for w in workers:
            w.get()

        if worker_count == 1:
            return workers[0].value
        else:
            return None
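
    # Example (sketch): splitting a scan across four workers,
    #
    #   fs.data_scan(["scan_extents"], worker_count=4)
    #
    # inserts --worker_n/--worker_m after the subcommand so that each
    # cephfs-data-scan process covers a disjoint share of the objects; the
    # combined return value is None in that case, per the docstring.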

    def is_full(self):
        return self.is_pool_full(self.get_data_pool_name())

    def authorize(self, client_id, caps=('/', 'rw')):
        """
        Run "ceph fs authorize" and run "ceph auth get" to get and return the
        keyring.

        client_id: client id that will be authorized
        caps: tuple containing the path and permission (can be r or rw)
              respectively.
        """
        if isinstance(caps[0], (tuple, list)):
            x = []
            for c in caps:
                x.extend(c)
            caps = tuple(x)

        client_name = 'client.' + client_id
        return self.mon_manager.raw_cluster_cmd('fs', 'authorize', self.name,
                                                client_name, *caps)

    def grow(self, new_max_mds, status=None):
        oldmax = self.get_var('max_mds', status=status)
        assert(new_max_mds > oldmax)
        self.set_max_mds(new_max_mds)
        return self.wait_for_daemons()

    def shrink(self, new_max_mds, status=None):
        oldmax = self.get_var('max_mds', status=status)
        assert(new_max_mds < oldmax)
        self.set_max_mds(new_max_mds)
        return self.wait_for_daemons()

    def run_scrub(self, cmd, rank=0):
        return self.rank_tell(["scrub"] + cmd, rank)

    def get_scrub_status(self, rank=0):
        return self.run_scrub(["status"], rank)

    def flush(self, rank=0):
        return self.rank_tell(["flush", "journal"], rank=rank)

    def wait_until_scrub_complete(self, result=None, tag=None, rank=0, sleep=30,
                                  timeout=300, reverse=False):
        # time out after "timeout" seconds and assume as done
        if result is None:
            result = "no active scrubs running"
        with contextutil.safe_while(sleep=sleep, tries=timeout//sleep) as proceed:
            while proceed():
                out_json = self.rank_tell(["scrub", "status"], rank=rank)
                assert out_json is not None
                if not reverse:
                    if result in out_json['status']:
                        log.info("all active scrubs completed")
                        return True
                else:
                    if result not in out_json['status']:
                        log.info("all active scrubs completed")
                        return True

                if tag is not None:
                    status = out_json['scrubs'][tag]
                    if status is not None:
                        log.info(f"scrub status for tag:{tag} - {status}")
                    else:
                        log.info(f"scrub has completed for tag:{tag}")
                        return True

        # timed out waiting for scrub to complete
        return False
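
    # Example (sketch): after starting a recursive scrub,
    #
    #   fs.run_scrub(["start", "/", "recursive"])
    #   fs.wait_until_scrub_complete(timeout=600)
    #
    # polls "scrub status" every `sleep` seconds until it reports
    # "no active scrubs running" or safe_while() exhausts its tries.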

    def get_damage(self, rank=None):
        if rank is None:
            result = {}
            for info in self.get_ranks():
                rank = info['rank']
                result[rank] = self.get_damage(rank=rank)
            return result
        else:
            return self.rank_tell(['damage', 'ls'], rank=rank)