]>
git.proxmox.com Git - ceph.git/blob - ceph/qa/tasks/cephfs/filesystem.py
4 from gevent
import Greenlet
12 from io
import BytesIO
, StringIO
13 from errno
import EBUSY
15 from teuthology
.exceptions
import CommandFailedError
16 from teuthology
import misc
17 from teuthology
.nuke
import clear_firewall
18 from teuthology
.parallel
import parallel
19 from teuthology
import contextutil
20 from tasks
.ceph_manager
import write_conf
21 from tasks
import ceph_manager
24 log
= logging
.getLogger(__name__
)
27 DAEMON_WAIT_TIMEOUT
= 120
class FileLayout(object):
    """
    In-memory representation of a CephFS file layout: the pool, pool
    namespace and striping parameters that map to the
    ceph.dir.layout.* / ceph.file.layout.* vxattrs.
    """
    def __init__(self, pool=None, pool_namespace=None, stripe_unit=None, stripe_count=None, object_size=None):
        self.pool = pool
        self.pool_namespace = pool_namespace
        self.stripe_unit = stripe_unit
        self.stripe_count = stripe_count
        self.object_size = object_size

    def load_from_ceph(layout_str):
        # Placeholder hook for parsing a layout string reported by ceph;
        # not implemented.
        pass

    def items(self):
        """
        Yield (field_name, value) pairs for each layout field that is set,
        suitable for applying as ceph.dir.layout.* xattrs.
        """
        if self.pool is not None:
            yield ("pool", self.pool)
        if self.pool_namespace:
            yield ("pool_namespace", self.pool_namespace)
        if self.stripe_unit is not None:
            yield ("stripe_unit", self.stripe_unit)
        if self.stripe_count is not None:
            yield ("stripe_count", self.stripe_count)
        if self.object_size is not None:
            # BUG FIX: previously yielded self.stripe_size, an attribute that
            # is never set (AttributeError when object_size was provided).
            yield ("object_size", self.object_size)
class ObjectNotFound(Exception):
    """Raised when a named RADOS object cannot be found."""
    def __init__(self, object_name):
        # Name of the missing object, reported by __str__.
        self._object_name = object_name

    def __str__(self):
        return "Object not found: '{0}'".format(self._object_name)
class FSMissing(Exception):
    """Raised when a file system id or name is absent from the FSMap."""
    def __init__(self, ident):
        # File system id or name that was looked up and not found.
        self.ident = ident

    def __str__(self):
        return f"File system {self.ident} does not exist in the map"
69 class FSStatus(object):
71 Operations on a snapshot of the FSMap.
73 def __init__(self
, mon_manager
, epoch
=None):
74 self
.mon
= mon_manager
75 cmd
= ["fs", "dump", "--format=json"]
77 cmd
.append(str(epoch
))
78 self
.map = json
.loads(self
.mon
.raw_cluster_cmd(*cmd
))
81 return json
.dumps(self
.map, indent
= 2, sort_keys
= True)
83 # Expose the fsmap for manual inspection.
84 def __getitem__(self
, key
):
86 Get a field from the fsmap.
90 def get_filesystems(self
):
92 Iterator for all filesystems.
94 for fs
in self
.map['filesystems']:
99 Iterator for all the mds_info components in the FSMap.
101 for info
in self
.map['standbys']:
103 for fs
in self
.map['filesystems']:
104 for info
in fs
['mdsmap']['info'].values():
107 def get_standbys(self
):
109 Iterator for all standbys.
111 for info
in self
.map['standbys']:
114 def get_fsmap(self
, fscid
):
116 Get the fsmap for the given FSCID.
118 for fs
in self
.map['filesystems']:
119 if fscid
is None or fs
['id'] == fscid
:
121 raise FSMissing(fscid
)
123 def get_fsmap_byname(self
, name
):
125 Get the fsmap for the given file system name.
127 for fs
in self
.map['filesystems']:
128 if name
is None or fs
['mdsmap']['fs_name'] == name
:
130 raise FSMissing(name
)
132 def get_replays(self
, fscid
):
134 Get the standby:replay MDS for the given FSCID.
136 fs
= self
.get_fsmap(fscid
)
137 for info
in fs
['mdsmap']['info'].values():
138 if info
['state'] == 'up:standby-replay':
141 def get_ranks(self
, fscid
):
143 Get the ranks for the given FSCID.
145 fs
= self
.get_fsmap(fscid
)
146 for info
in fs
['mdsmap']['info'].values():
147 if info
['rank'] >= 0 and info
['state'] != 'up:standby-replay':
150 def get_damaged(self
, fscid
):
152 Get the damaged ranks for the given FSCID.
154 fs
= self
.get_fsmap(fscid
)
155 return fs
['mdsmap']['damaged']
157 def get_rank(self
, fscid
, rank
):
159 Get the rank for the given FSCID.
161 for info
in self
.get_ranks(fscid
):
162 if info
['rank'] == rank
:
164 raise RuntimeError("FSCID {0} has no rank {1}".format(fscid
, rank
))
166 def get_mds(self
, name
):
168 Get the info for the given MDS name.
170 for info
in self
.get_all():
171 if info
['name'] == name
:
175 def get_mds_addr(self
, name
):
177 Return the instance addr as a string, like "10.214.133.138:6807\/10825"
179 info
= self
.get_mds(name
)
183 log
.warning(json
.dumps(list(self
.get_all()), indent
=2)) # dump for debugging
184 raise RuntimeError("MDS id '{0}' not found in map".format(name
))
186 def get_mds_addrs(self
, name
):
188 Return the instance addr as a string, like "[10.214.133.138:6807 10.214.133.138:6808]"
190 info
= self
.get_mds(name
)
192 return [e
['addr'] for e
in info
['addrs']['addrvec']]
194 log
.warn(json
.dumps(list(self
.get_all()), indent
=2)) # dump for debugging
195 raise RuntimeError("MDS id '{0}' not found in map".format(name
))
197 def get_mds_gid(self
, gid
):
199 Get the info for the given MDS gid.
201 for info
in self
.get_all():
202 if info
['gid'] == gid
:
206 def hadfailover(self
, status
):
208 Compares two statuses for mds failovers.
209 Returns True if there is a failover.
211 for fs
in status
.map['filesystems']:
212 for info
in fs
['mdsmap']['info'].values():
213 oldinfo
= self
.get_mds_gid(info
['gid'])
214 if oldinfo
is None or oldinfo
['incarnation'] != info
['incarnation']:
219 class CephCluster(object):
221 def admin_remote(self
):
222 first_mon
= misc
.get_first_mon(self
._ctx
, None)
223 (result
,) = self
._ctx
.cluster
.only(first_mon
).remotes
.keys()
226 def __init__(self
, ctx
) -> None:
228 self
.mon_manager
= ceph_manager
.CephManager(self
.admin_remote
, ctx
=ctx
, logger
=log
.getChild('ceph_manager'))
230 def get_config(self
, key
, service_type
=None):
232 Get config from mon by default, or a specific service if caller asks for it
234 if service_type
is None:
237 service_id
= sorted(misc
.all_roles_of_type(self
._ctx
.cluster
, service_type
))[0]
238 return self
.json_asok(['config', 'get', key
], service_type
, service_id
)[key
]
    def set_ceph_conf(self, subsys, key, value):
        """
        Set `key` = `value` in the `subsys` section of the in-memory ceph.conf
        held on the test context, then rewrite the conf file on the remotes.
        """
        if subsys not in self._ctx.ceph['ceph'].conf:
            # First key for this section: create the section dict.
            self._ctx.ceph['ceph'].conf[subsys] = {}
        self._ctx.ceph['ceph'].conf[subsys][key] = value
        write_conf(self._ctx) # XXX because we don't have the ceph task's config object, if they
                              # used a different config path this won't work.
    def clear_ceph_conf(self, subsys, key):
        """
        Remove `key` from the `subsys` section of the in-memory ceph.conf and
        rewrite the conf file on the remotes.

        Raises KeyError if the section or key is not present.
        """
        del self._ctx.ceph['ceph'].conf[subsys][key]
        write_conf(self._ctx)
251 def json_asok(self
, command
, service_type
, service_id
, timeout
=None):
254 command
.insert(0, '--format=json')
255 proc
= self
.mon_manager
.admin_socket(service_type
, service_id
, command
, timeout
=timeout
)
256 response_data
= proc
.stdout
.getvalue().strip()
257 if len(response_data
) > 0:
259 def get_nonnumeric_values(value
):
260 c
= {"NaN": float("nan"), "Infinity": float("inf"),
261 "-Infinity": -float("inf")}
264 j
= json
.loads(response_data
.replace('inf', 'Infinity'),
265 parse_constant
=get_nonnumeric_values
)
266 pretty
= json
.dumps(j
, sort_keys
=True, indent
=2)
267 log
.debug(f
"_json_asok output\n{pretty}")
270 log
.debug("_json_asok output empty")
273 def is_addr_blocklisted(self
, addr
):
274 blocklist
= json
.loads(self
.mon_manager
.raw_cluster_cmd(
275 "osd", "dump", "--format=json"))['blocklist']
276 if addr
in blocklist
:
278 log
.warn(f
'The address {addr} is not blocklisted')
282 class MDSCluster(CephCluster
):
284 Collective operations on all the MDS daemons in the Ceph cluster. These
285 daemons may be in use by various Filesystems.
287 For the benefit of pre-multi-filesystem tests, this class is also
288 a parent of Filesystem. The correct way to use MDSCluster going forward is
289 as a separate instance outside of your (multiple) Filesystem instances.
292 def __init__(self
, ctx
):
293 super(MDSCluster
, self
).__init
__(ctx
)
297 # do this dynamically because the list of ids may change periodically with cephadm
298 return list(misc
.all_roles_of_type(self
._ctx
.cluster
, 'mds'))
301 def mds_daemons(self
):
302 return dict([(mds_id
, self
._ctx
.daemons
.get_daemon('mds', mds_id
)) for mds_id
in self
.mds_ids
])
304 def _one_or_all(self
, mds_id
, cb
, in_parallel
=True):
306 Call a callback for a single named MDS, or for all.
308 Note that the parallelism here isn't for performance, it's to avoid being overly kind
309 to the cluster by waiting a graceful ssh-latency of time between doing things, and to
310 avoid being overly kind by executing them in a particular order. However, some actions
311 don't cope with being done in parallel, so it's optional (`in_parallel`)
313 :param mds_id: MDS daemon name, or None
314 :param cb: Callback taking single argument of MDS daemon name
315 :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
320 with
parallel() as p
:
321 for mds_id
in self
.mds_ids
:
324 for mds_id
in self
.mds_ids
:
329 def get_config(self
, key
, service_type
=None):
331 get_config specialization of service_type="mds"
333 if service_type
!= "mds":
334 return super(MDSCluster
, self
).get_config(key
, service_type
)
336 # Some tests stop MDS daemons, don't send commands to a dead one:
337 running_daemons
= [i
for i
, mds
in self
.mds_daemons
.items() if mds
.running()]
338 service_id
= random
.sample(running_daemons
, 1)[0]
339 return self
.json_asok(['config', 'get', key
], service_type
, service_id
)[key
]
    def mds_stop(self, mds_id=None):
        """
        Stop the MDS daemon process(es).  If it held a rank, that rank
        will eventually go laggy.

        :param mds_id: daemon name to stop, or None to stop all of them
        """
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())
    def mds_fail(self, mds_id=None):
        """
        Inform MDSMonitor of the death of the daemon process(es).  If it held
        a rank, that rank will be relinquished.

        :param mds_id: daemon name to mark failed, or None for all of them
        """
        self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))
    def mds_restart(self, mds_id=None):
        # Restart the daemon process(es) directly, without first marking them
        # failed at the monitor (contrast with mds_fail_restart).
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())
    def mds_fail_restart(self, mds_id=None):
        """
        Variation on restart that includes marking MDSs as failed, so that doing this
        operation followed by waiting for healthy daemon states guarantees that they
        have gone down and come up, rather than potentially seeing the healthy states
        that existed before the restart.

        :param mds_id: daemon name, or None for all daemons
        """
        def _fail_restart(id_):
            # Stop the process first, then tell the monitor it failed, then
            # bring the process back up.
            self.mds_daemons[id_].stop()
            self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
            self.mds_daemons[id_].restart()

        self._one_or_all(mds_id, _fail_restart)
372 def mds_signal(self
, mds_id
, sig
, silent
=False):
376 self
.mds_daemons
[mds_id
].signal(sig
, silent
);
    def mds_is_running(self, mds_id):
        # True if the named MDS daemon process is currently running.
        return self.mds_daemons[mds_id].running()
    def newfs(self, name='cephfs', create=True):
        # Build a Filesystem handle against this cluster's context,
        # optionally creating the file system.
        return Filesystem(self._ctx, name=name, create=create)
    def status(self, epoch=None):
        # Take a snapshot of the FSMap (optionally at a specific epoch).
        return FSStatus(self.mon_manager, epoch)
387 def get_standby_daemons(self
):
388 return set([s
['name'] for s
in self
.status().get_standbys()])
390 def get_mds_hostnames(self
):
392 for mds_id
in self
.mds_ids
:
393 mds_remote
= self
.mon_manager
.find_remote('mds', mds_id
)
394 result
.add(mds_remote
.hostname
)
398 def set_clients_block(self
, blocked
, mds_id
=None):
400 Block (using iptables) client communications to this MDS. Be careful: if
401 other services are running on this MDS, or other MDSs try to talk to this
402 MDS, their communications may also be blocked as collatoral damage.
404 :param mds_id: Optional ID of MDS to block, default to all
407 da_flag
= "-A" if blocked
else "-D"
409 def set_block(_mds_id
):
410 remote
= self
.mon_manager
.find_remote('mds', _mds_id
)
411 status
= self
.status()
413 addr
= status
.get_mds_addr(_mds_id
)
414 ip_str
, port_str
, inst_str
= re
.match("(.+):(.+)/(.+)", addr
).groups()
417 args
=["sudo", "iptables", da_flag
, "OUTPUT", "-p", "tcp", "--sport", port_str
, "-j", "REJECT", "-m",
418 "comment", "--comment", "teuthology"])
420 args
=["sudo", "iptables", da_flag
, "INPUT", "-p", "tcp", "--dport", port_str
, "-j", "REJECT", "-m",
421 "comment", "--comment", "teuthology"])
423 self
._one
_or
_all
(mds_id
, set_block
, in_parallel
=False)
425 def set_inter_mds_block(self
, blocked
, mds_rank_1
, mds_rank_2
):
427 Block (using iptables) communications from a provided MDS to other MDSs.
428 Block all ports that an MDS uses for communication.
430 :param blocked: True to block the MDS, False otherwise
431 :param mds_rank_1: MDS rank
432 :param mds_rank_2: MDS rank
435 da_flag
= "-A" if blocked
else "-D"
437 def set_block(mds_ids
):
438 status
= self
.status()
441 remote
= self
.mon_manager
.find_remote('mds', mds
)
442 addrs
= status
.get_mds_addrs(mds
)
444 ip_str
, port_str
= re
.match("(.+):(.+)", addr
).groups()
446 args
=["sudo", "iptables", da_flag
, "INPUT", "-p", "tcp", "--dport", port_str
, "-j", "REJECT", "-m",
447 "comment", "--comment", "teuthology"], omit_sudo
=False)
451 remote
= self
.mon_manager
.find_remote('mds', mds
)
452 addrs
= status
.get_mds_addrs(mds
)
454 ip_str
, port_str
= re
.match("(.+):(.+)", addr
).groups()
456 args
=["sudo", "iptables", da_flag
, "OUTPUT", "-p", "tcp", "--sport", port_str
, "-j", "REJECT", "-m",
457 "comment", "--comment", "teuthology"], omit_sudo
=False)
459 args
=["sudo", "iptables", da_flag
, "INPUT", "-p", "tcp", "--dport", port_str
, "-j", "REJECT", "-m",
460 "comment", "--comment", "teuthology"], omit_sudo
=False)
462 self
._one
_or
_all
((mds_rank_1
, mds_rank_2
), set_block
, in_parallel
=False)
464 def clear_firewall(self
):
465 clear_firewall(self
._ctx
)
467 def get_mds_info(self
, mds_id
):
468 return FSStatus(self
.mon_manager
).get_mds(mds_id
)
470 def is_pool_full(self
, pool_name
):
471 pools
= json
.loads(self
.mon_manager
.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
473 if pool
['pool_name'] == pool_name
:
474 return 'full' in pool
['flags_names'].split(",")
476 raise RuntimeError("Pool not found '{0}'".format(pool_name
))
478 def delete_all_filesystems(self
):
480 Remove all filesystems that exist, and any pools in use by them.
482 for fs
in self
.status().get_filesystems():
483 Filesystem(ctx
=self
._ctx
, fscid
=fs
['id']).destroy()
486 def beacon_timeout(self
):
488 Generate an acceptable timeout for the mons to drive some MDSMap change
489 because of missed beacons from some MDS. This involves looking up the
490 grace period in use by the mons and adding an acceptable buffer.
493 grace
= float(self
.get_config("mds_beacon_grace", service_type
="mon"))
497 class Filesystem(MDSCluster
):
500 Generator for all Filesystems in the cluster.
503 def get_all_fs(cls
, ctx
):
504 mdsc
= MDSCluster(ctx
)
505 status
= mdsc
.status()
506 for fs
in status
.get_filesystems():
507 yield cls(ctx
, fscid
=fs
['id'])
510 This object is for driving a CephFS filesystem. The MDS daemons driven by
511 MDSCluster may be shared with other Filesystems.
513 def __init__(self
, ctx
, fs_config
={}, fscid
=None, name
=None, create
=False):
514 super(Filesystem
, self
).__init
__(ctx
)
518 self
.metadata_pool_name
= None
519 self
.data_pool_name
= None
520 self
.data_pools
= None
521 self
.fs_config
= fs_config
522 self
.ec_profile
= fs_config
.get('ec_profile')
524 client_list
= list(misc
.all_roles_of_type(self
._ctx
.cluster
, 'client'))
525 self
.client_id
= client_list
[0]
526 self
.client_remote
= list(misc
.get_clients(ctx
=ctx
, roles
=["client.{0}".format(self
.client_id
)]))[0][1]
529 if fscid
is not None:
530 raise RuntimeError("cannot specify fscid when creating fs")
531 if create
and not self
.legacy_configured():
534 if fscid
is not None:
536 self
.getinfo(refresh
= True)
538 # Stash a reference to the first created filesystem on ctx, so
539 # that if someone drops to the interactive shell they can easily
541 if not hasattr(self
._ctx
, "filesystem"):
542 self
._ctx
.filesystem
= self
546 return not bool(self
.get_mds_map())
550 def get_task_status(self
, status_key
):
551 return self
.mon_manager
.get_service_task_status("mds", status_key
)
553 def getinfo(self
, refresh
= False):
554 status
= self
.status()
555 if self
.id is not None:
556 fsmap
= status
.get_fsmap(self
.id)
557 elif self
.name
is not None:
558 fsmap
= status
.get_fsmap_byname(self
.name
)
560 fss
= [fs
for fs
in status
.get_filesystems()]
564 raise RuntimeError("no file system available")
566 raise RuntimeError("more than one file system available")
567 self
.id = fsmap
['id']
568 self
.name
= fsmap
['mdsmap']['fs_name']
569 self
.get_pool_names(status
= status
, refresh
= refresh
)
572 def reach_max_mds(self
):
573 status
= self
.wait_for_daemons()
574 mds_map
= self
.get_mds_map(status
=status
)
575 assert(mds_map
['in'] == list(range(0, mds_map
['max_mds'])))
578 self
.mon_manager
.raw_cluster_cmd("fs", "reset", str(self
.name
), '--yes-i-really-mean-it')
581 self
.mon_manager
.raw_cluster_cmd("fs", "fail", str(self
.name
))
583 def set_flag(self
, var
, *args
):
584 a
= map(lambda x
: str(x
).lower(), args
)
585 self
.mon_manager
.raw_cluster_cmd("fs", "flag", "set", var
, *a
)
587 def set_allow_multifs(self
, yes
=True):
588 self
.set_flag("enable_multiple", yes
)
590 def set_var(self
, var
, *args
):
591 a
= map(lambda x
: str(x
).lower(), args
)
592 self
.mon_manager
.raw_cluster_cmd("fs", "set", self
.name
, var
, *a
)
594 def set_down(self
, down
=True):
595 self
.set_var("down", str(down
).lower())
597 def set_joinable(self
, joinable
=True):
598 self
.set_var("joinable", joinable
)
600 def set_max_mds(self
, max_mds
):
601 self
.set_var("max_mds", "%d" % max_mds
)
603 def set_session_timeout(self
, timeout
):
604 self
.set_var("session_timeout", "%d" % timeout
)
606 def set_allow_standby_replay(self
, yes
):
607 self
.set_var("allow_standby_replay", yes
)
609 def set_allow_new_snaps(self
, yes
):
610 self
.set_var("allow_new_snaps", yes
, '--yes-i-really-mean-it')
612 def set_bal_rank_mask(self
, bal_rank_mask
):
613 self
.set_var("bal_rank_mask", bal_rank_mask
)
615 def set_refuse_client_session(self
, yes
):
616 self
.set_var("refuse_client_session", yes
)
618 def compat(self
, *args
):
619 a
= map(lambda x
: str(x
).lower(), args
)
620 self
.mon_manager
.raw_cluster_cmd("fs", "compat", self
.name
, *a
)
622 def add_compat(self
, *args
):
623 self
.compat("add_compat", *args
)
625 def add_incompat(self
, *args
):
626 self
.compat("add_incompat", *args
)
628 def rm_compat(self
, *args
):
629 self
.compat("rm_compat", *args
)
631 def rm_incompat(self
, *args
):
632 self
.compat("rm_incompat", *args
)
634 def required_client_features(self
, *args
, **kwargs
):
635 c
= ["fs", "required_client_features", self
.name
, *args
]
636 return self
.mon_manager
.run_cluster_cmd(args
=c
, **kwargs
)
638 # Since v15.1.0 the pg autoscale mode has been enabled as default,
639 # will let the pg autoscale mode to calculate the pg_num as needed.
640 # We set the pg_num_min to 64 to make sure that pg autoscale mode
641 # won't set the pg_num to low to fix Tracker#45434.
644 target_size_ratio
= 0.9
645 target_size_ratio_ec
= 0.9
647 def create(self
, recover
=False, metadata_overlay
=False):
648 if self
.name
is None:
650 if self
.metadata_pool_name
is None:
651 self
.metadata_pool_name
= "{0}_metadata".format(self
.name
)
652 if self
.data_pool_name
is None:
653 data_pool_name
= "{0}_data".format(self
.name
)
655 data_pool_name
= self
.data_pool_name
657 # will use the ec pool to store the data and a small amount of
658 # metadata still goes to the primary data pool for all files.
659 if not metadata_overlay
and self
.ec_profile
and 'disabled' not in self
.ec_profile
:
660 self
.target_size_ratio
= 0.05
662 log
.debug("Creating filesystem '{0}'".format(self
.name
))
665 self
.mon_manager
.raw_cluster_cmd('osd', 'pool', 'create',
666 self
.metadata_pool_name
,
667 '--pg_num_min', str(self
.pg_num_min
))
669 self
.mon_manager
.raw_cluster_cmd('osd', 'pool', 'create',
670 data_pool_name
, str(self
.pg_num
),
671 '--pg_num_min', str(self
.pg_num_min
),
672 '--target_size_ratio',
673 str(self
.target_size_ratio
))
674 except CommandFailedError
as e
:
675 if e
.exitstatus
== 22: # nautilus couldn't specify --pg_num_min option
676 self
.mon_manager
.raw_cluster_cmd('osd', 'pool', 'create',
677 self
.metadata_pool_name
,
678 str(self
.pg_num_min
))
680 self
.mon_manager
.raw_cluster_cmd('osd', 'pool', 'create',
681 data_pool_name
, str(self
.pg_num
),
682 str(self
.pg_num_min
))
686 args
= ["fs", "new", self
.name
, self
.metadata_pool_name
, data_pool_name
]
688 args
.append('--recover')
690 args
.append('--allow-dangerous-metadata-overlay')
691 self
.mon_manager
.raw_cluster_cmd(*args
)
694 if self
.ec_profile
and 'disabled' not in self
.ec_profile
:
695 ec_data_pool_name
= data_pool_name
+ "_ec"
696 log
.debug("EC profile is %s", self
.ec_profile
)
697 cmd
= ['osd', 'erasure-code-profile', 'set', ec_data_pool_name
]
698 cmd
.extend(self
.ec_profile
)
699 self
.mon_manager
.raw_cluster_cmd(*cmd
)
701 self
.mon_manager
.raw_cluster_cmd(
702 'osd', 'pool', 'create', ec_data_pool_name
,
703 'erasure', ec_data_pool_name
,
704 '--pg_num_min', str(self
.pg_num_min
),
705 '--target_size_ratio', str(self
.target_size_ratio_ec
))
706 except CommandFailedError
as e
:
707 if e
.exitstatus
== 22: # nautilus couldn't specify --pg_num_min option
708 self
.mon_manager
.raw_cluster_cmd(
709 'osd', 'pool', 'create', ec_data_pool_name
,
710 str(self
.pg_num_min
), 'erasure', ec_data_pool_name
)
713 self
.mon_manager
.raw_cluster_cmd(
714 'osd', 'pool', 'set',
715 ec_data_pool_name
, 'allow_ec_overwrites', 'true')
716 self
.add_data_pool(ec_data_pool_name
, create
=False)
717 self
.check_pool_application(ec_data_pool_name
)
719 self
.run_client_payload(f
"setfattr -n ceph.dir.layout.pool -v {ec_data_pool_name} . && getfattr -n ceph.dir.layout .")
721 self
.check_pool_application(self
.metadata_pool_name
)
722 self
.check_pool_application(data_pool_name
)
724 # Turn off spurious standby count warnings from modifying max_mds in tests.
726 self
.mon_manager
.raw_cluster_cmd('fs', 'set', self
.name
, 'standby_count_wanted', '0')
727 except CommandFailedError
as e
:
728 if e
.exitstatus
== 22:
729 # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
734 if self
.fs_config
is not None:
735 log
.debug(f
"fs_config: {self.fs_config}")
736 max_mds
= self
.fs_config
.get('max_mds', 1)
738 self
.set_max_mds(max_mds
)
740 standby_replay
= self
.fs_config
.get('standby_replay', False)
741 self
.set_allow_standby_replay(standby_replay
)
743 # If absent will use the default value (60 seconds)
744 session_timeout
= self
.fs_config
.get('session_timeout', 60)
745 if session_timeout
!= 60:
746 self
.set_session_timeout(session_timeout
)
748 if self
.fs_config
.get('subvols', None) is not None:
749 log
.debug(f
"Creating {self.fs_config.get('subvols')} subvols "
750 f
"for filesystem '{self.name}'")
751 if not hasattr(self
._ctx
, "created_subvols"):
752 self
._ctx
.created_subvols
= dict()
754 subvols
= self
.fs_config
.get('subvols')
755 assert(isinstance(subvols
, dict))
756 assert(isinstance(subvols
['create'], int))
757 assert(subvols
['create'] > 0)
759 for sv
in range(0, subvols
['create']):
761 self
.mon_manager
.raw_cluster_cmd(
762 'fs', 'subvolume', 'create', self
.name
, sv_name
,
763 self
.fs_config
.get('subvol_options', ''))
765 if self
.name
not in self
._ctx
.created_subvols
:
766 self
._ctx
.created_subvols
[self
.name
] = []
768 subvol_path
= self
.mon_manager
.raw_cluster_cmd(
769 'fs', 'subvolume', 'getpath', self
.name
, sv_name
)
770 subvol_path
= subvol_path
.strip()
771 self
._ctx
.created_subvols
[self
.name
].append(subvol_path
)
773 log
.debug(f
"Not Creating any subvols for filesystem '{self.name}'")
776 self
.getinfo(refresh
= True)
778 # wait pgs to be clean
779 self
.mon_manager
.wait_for_clean()
781 def run_client_payload(self
, cmd
):
782 # avoid circular dep by importing here:
783 from tasks
.cephfs
.fuse_mount
import FuseMount
785 # Wait for at MDS daemons to be ready before mounting the
786 # ceph-fuse client in run_client_payload()
787 self
.wait_for_daemons()
789 d
= misc
.get_testdir(self
._ctx
)
790 m
= FuseMount(self
._ctx
, d
, "admin", self
.client_remote
, cephfs_name
=self
.name
)
792 m
.run_shell_payload(cmd
)
793 m
.umount_wait(require_clean
=True)
795 def _remove_pool(self
, name
, **kwargs
):
796 c
= f
'osd pool rm {name} {name} --yes-i-really-really-mean-it'
797 return self
.mon_manager
.ceph(c
, **kwargs
)
799 def rm(self
, **kwargs
):
800 c
= f
'fs rm {self.name} --yes-i-really-mean-it'
801 return self
.mon_manager
.ceph(c
, **kwargs
)
803 def remove_pools(self
, data_pools
):
804 self
._remove
_pool
(self
.get_metadata_pool_name())
805 for poolname
in data_pools
:
807 self
._remove
_pool
(poolname
)
808 except CommandFailedError
as e
:
809 # EBUSY, this data pool is used by two metadata pools, let the
811 if e
.exitstatus
== EBUSY
:
816 def destroy(self
, reset_obj_attrs
=True):
817 log
.info(f
'Destroying file system {self.name} and related pools')
820 log
.debug('already dead...')
823 data_pools
= self
.get_data_pool_names(refresh
=True)
825 # make sure no MDSs are attached to given FS.
829 self
.remove_pools(data_pools
)
834 self
.metadata_pool_name
= None
835 self
.data_pool_name
= None
836 self
.data_pools
= None
842 self
.getinfo(refresh
=True)
844 def check_pool_application(self
, pool_name
):
845 osd_map
= self
.mon_manager
.get_osd_dump_json()
846 for pool
in osd_map
['pools']:
847 if pool
['pool_name'] == pool_name
:
848 if "application_metadata" in pool
:
849 if not "cephfs" in pool
['application_metadata']:
850 raise RuntimeError("Pool {pool_name} does not name cephfs as application!".\
851 format(pool_name
=pool_name
))
854 if getattr(self
._ctx
, "filesystem", None) == self
:
855 delattr(self
._ctx
, "filesystem")
859 Whether a filesystem exists in the mon's filesystem list
861 fs_list
= json
.loads(self
.mon_manager
.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
862 return self
.name
in [fs
['name'] for fs
in fs_list
]
864 def legacy_configured(self
):
866 Check if a legacy (i.e. pre "fs new") filesystem configuration is present. If this is
867 the case, the caller should avoid using Filesystem.create
870 out_text
= self
.mon_manager
.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
871 pools
= json
.loads(out_text
)
872 metadata_pool_exists
= 'metadata' in [p
['poolname'] for p
in pools
]
873 if metadata_pool_exists
:
874 self
.metadata_pool_name
= 'metadata'
875 except CommandFailedError
as e
:
876 # For use in upgrade tests, Ceph cuttlefish and earlier don't support
877 # structured output (--format) from the CLI.
878 if e
.exitstatus
== 22:
879 metadata_pool_exists
= True
883 return metadata_pool_exists
886 return json
.loads(self
.mon_manager
.raw_cluster_cmd("df", "--format=json-pretty"))
888 # may raise FSMissing
889 def get_mds_map(self
, status
=None):
891 status
= self
.status()
892 return status
.get_fsmap(self
.id)['mdsmap']
    def get_var(self, var, status=None):
        # Read a single field out of this filesystem's mdsmap.
        return self.get_mds_map(status=status)[var]
897 def set_dir_layout(self
, mount
, path
, layout
):
898 for name
, value
in layout
.items():
899 mount
.run_shell(args
=["setfattr", "-n", "ceph.dir.layout."+name
, "-v", str(value
), path
])
901 def add_data_pool(self
, name
, create
=True):
904 self
.mon_manager
.raw_cluster_cmd('osd', 'pool', 'create', name
,
905 '--pg_num_min', str(self
.pg_num_min
))
906 except CommandFailedError
as e
:
907 if e
.exitstatus
== 22: # nautilus couldn't specify --pg_num_min option
908 self
.mon_manager
.raw_cluster_cmd('osd', 'pool', 'create', name
,
909 str(self
.pg_num_min
))
912 self
.mon_manager
.raw_cluster_cmd('fs', 'add_data_pool', self
.name
, name
)
913 self
.get_pool_names(refresh
= True)
914 for poolid
, fs_name
in self
.data_pools
.items():
917 raise RuntimeError("could not get just created pool '{0}'".format(name
))
919 def get_pool_names(self
, refresh
= False, status
= None):
920 if refresh
or self
.metadata_pool_name
is None or self
.data_pools
is None:
922 status
= self
.status()
923 fsmap
= status
.get_fsmap(self
.id)
925 osd_map
= self
.mon_manager
.get_osd_dump_json()
927 for p
in osd_map
['pools']:
928 id_to_name
[p
['pool']] = p
['pool_name']
930 self
.metadata_pool_name
= id_to_name
[fsmap
['mdsmap']['metadata_pool']]
932 for data_pool
in fsmap
['mdsmap']['data_pools']:
933 self
.data_pools
[data_pool
] = id_to_name
[data_pool
]
    def get_data_pool_name(self, refresh = False):
        # Name of the single data pool; asserts the fs has exactly one.
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return next(iter(self.data_pools.values()))
941 def get_data_pool_id(self
, refresh
= False):
943 Don't call this if you have multiple data pools
946 if refresh
or self
.data_pools
is None:
947 self
.get_pool_names(refresh
= True)
948 assert(len(self
.data_pools
) == 1)
949 return next(iter(self
.data_pools
.keys()))
    def get_data_pool_names(self, refresh = False):
        # Names of all data pools, refreshing the cached map if asked or unset.
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        return list(self.data_pools.values())
    def get_metadata_pool_name(self):
        # Cached metadata pool name; populated by get_pool_names().
        return self.metadata_pool_name
    def set_data_pool_name(self, name):
        # Override the data pool name used at creation time; only valid before
        # the fs exists (fscid unset).
        if self.id is not None:
            raise RuntimeError("can't set filesystem name if its fscid is set")
        self.data_pool_name = name
964 def get_pool_pg_num(self
, pool_name
):
965 pgs
= json
.loads(self
.mon_manager
.raw_cluster_cmd('osd', 'pool', 'get',
967 '--format=json-pretty'))
968 return int(pgs
['pg_num'])
970 def get_namespace_id(self
):
973 def get_pool_df(self
, pool_name
):
976 {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
978 for pool_df
in self
._df
()['pools']:
979 if pool_df
['name'] == pool_name
:
980 return pool_df
['stats']
982 raise RuntimeError("Pool name '{0}' not found".format(pool_name
))
985 return self
._df
()['stats']['total_used_bytes']
987 def are_daemons_healthy(self
, status
=None, skip_max_mds_check
=False):
989 Return true if all daemons are in one of active, standby, standby-replay, and
990 at least max_mds daemons are in 'active'.
992 Unlike most of Filesystem, this function is tolerant of new-style `fs`
993 commands being missing, because we are part of the ceph installation
994 process during upgrade suites, so must fall back to old style commands
995 when we get an EINVAL on a new style command.
999 # First, check to see that processes haven't exited with an error code
1000 for mds
in self
._ctx
.daemons
.iter_daemons_of_role('mds'):
1004 mds_map
= self
.get_mds_map(status
=status
)
1006 log
.debug("are_daemons_healthy: mds map: {0}".format(mds_map
))
1008 for mds_id
, mds_status
in mds_map
['info'].items():
1009 if mds_status
['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
1010 log
.warning("Unhealthy mds state {0}:{1}".format(mds_id
, mds_status
['state']))
1012 elif mds_status
['state'] == 'up:active':
1015 log
.debug("are_daemons_healthy: {0}/{1}".format(
1016 active_count
, mds_map
['max_mds']
1019 if not skip_max_mds_check
:
1020 if active_count
> mds_map
['max_mds']:
1021 log
.debug("are_daemons_healthy: number of actives is greater than max_mds: {0}".format(mds_map
))
1023 elif active_count
== mds_map
['max_mds']:
1024 # The MDSMap says these guys are active, but let's check they really are
1025 for mds_id
, mds_status
in mds_map
['info'].items():
1026 if mds_status
['state'] == 'up:active':
1028 daemon_status
= self
.mds_tell(["status"], mds_id
=mds_status
['name'])
1029 except CommandFailedError
as cfe
:
1030 if cfe
.exitstatus
== errno
.EINVAL
:
1031 # Old version, can't do this check
1034 # MDS not even running
1037 if daemon_status
['state'] != 'up:active':
1038 # MDS hasn't taken the latest map yet
1045 log
.debug("are_daemons_healthy: skipping max_mds check")
1048 def get_daemon_names(self
, state
=None, status
=None):
1050 Return MDS daemon names of those daemons in the given state
1054 mdsmap
= self
.get_mds_map(status
)
1056 for mds_status
in sorted(mdsmap
['info'].values(),
1057 key
=lambda _
: _
['rank']):
1058 if mds_status
['state'] == state
or state
is None:
1059 result
.append(mds_status
['name'])
1063 def get_active_names(self
, status
=None):
1065 Return MDS daemon names of those daemons holding ranks
1068 :return: list of strings like ['a', 'b'], sorted by rank
1070 return self
.get_daemon_names("up:active", status
=status
)
1072 def get_all_mds_rank(self
, status
=None):
1073 mdsmap
= self
.get_mds_map(status
)
1075 for mds_status
in sorted(mdsmap
['info'].values(),
1076 key
=lambda _
: _
['rank']):
1077 if mds_status
['rank'] != -1 and mds_status
['state'] != 'up:standby-replay':
1078 result
.append(mds_status
['rank'])
1082 def get_rank(self
, rank
=None, status
=None):
1084 status
= self
.getinfo()
1087 return status
.get_rank(self
.id, rank
)
1089 def rank_restart(self
, rank
=0, status
=None):
1090 name
= self
.get_rank(rank
=rank
, status
=status
)['name']
1091 self
.mds_restart(mds_id
=name
)
1093 def rank_signal(self
, signal
, rank
=0, status
=None):
1094 name
= self
.get_rank(rank
=rank
, status
=status
)['name']
1095 self
.mds_signal(name
, signal
)
1097 def rank_freeze(self
, yes
, rank
=0):
1098 self
.mon_manager
.raw_cluster_cmd("mds", "freeze", "{}:{}".format(self
.id, rank
), str(yes
).lower())
1100 def rank_repaired(self
, rank
):
1101 self
.mon_manager
.raw_cluster_cmd("mds", "repaired", "{}:{}".format(self
.id, rank
))
1103 def rank_fail(self
, rank
=0):
1104 self
.mon_manager
.raw_cluster_cmd("mds", "fail", "{}:{}".format(self
.id, rank
))
1106 def rank_is_running(self
, rank
=0, status
=None):
1107 name
= self
.get_rank(rank
=rank
, status
=status
)['name']
1108 return self
.mds_is_running(name
)
1110 def get_ranks(self
, status
=None):
1112 status
= self
.getinfo()
1113 return status
.get_ranks(self
.id)
1115 def get_damaged(self
, status
=None):
1117 status
= self
.getinfo()
1118 return status
.get_damaged(self
.id)
1120 def get_replays(self
, status
=None):
1122 status
= self
.getinfo()
1123 return status
.get_replays(self
.id)
1125 def get_replay(self
, rank
=0, status
=None):
1126 for replay
in self
.get_replays(status
=status
):
1127 if replay
['rank'] == rank
:
1131 def get_rank_names(self
, status
=None):
1133 Return MDS daemon names of those daemons holding a rank,
1134 sorted by rank. This includes e.g. up:replay/reconnect
1135 as well as active, but does not include standby or
1138 mdsmap
= self
.get_mds_map(status
)
1140 for mds_status
in sorted(mdsmap
['info'].values(),
1141 key
=lambda _
: _
['rank']):
1142 if mds_status
['rank'] != -1 and mds_status
['state'] != 'up:standby-replay':
1143 result
.append(mds_status
['name'])
1147 def wait_for_daemons(self
, timeout
=None, skip_max_mds_check
=False, status
=None):
1149 Wait until all daemons are healthy
1154 timeout
= DAEMON_WAIT_TIMEOUT
1157 status
= self
.getinfo(refresh
=True)
1160 status
= self
.status()
1164 if self
.are_daemons_healthy(status
=status
, skip_max_mds_check
=skip_max_mds_check
):
1170 if elapsed
> timeout
:
1171 log
.debug("status = {0}".format(status
))
1172 raise RuntimeError("Timed out waiting for MDS daemons to become healthy")
1174 status
= self
.status()
1176 def dencoder(self
, obj_type
, obj_blob
):
1177 args
= [os
.path
.join(self
._prefix
, "ceph-dencoder"), 'type', obj_type
, 'import', '-', 'decode', 'dump_json']
1178 p
= self
.mon_manager
.controller
.run(args
=args
, stdin
=BytesIO(obj_blob
), stdout
=BytesIO())
1179 return p
.stdout
.getvalue()
1181 def rados(self
, *args
, **kwargs
):
1183 Callout to rados CLI.
1186 return self
.mon_manager
.do_rados(*args
, **kwargs
)
1188 def radosm(self
, *args
, **kwargs
):
1190 Interact with the metadata pool via rados CLI.
1193 return self
.rados(*args
, **kwargs
, pool
=self
.get_metadata_pool_name())
1195 def radosmo(self
, *args
, stdout
=BytesIO(), **kwargs
):
1197 Interact with the metadata pool via rados CLI. Get the stdout.
1200 return self
.radosm(*args
, **kwargs
, stdout
=stdout
).stdout
.getvalue()
1202 def get_metadata_object(self
, object_type
, object_id
):
1204 Retrieve an object from the metadata pool, pass it through
1205 ceph-dencoder to dump it to JSON, and return the decoded object.
1208 o
= self
.radosmo(['get', object_id
, '-'])
1209 j
= self
.dencoder(object_type
, o
)
1211 return json
.loads(j
)
1212 except (TypeError, ValueError):
1213 log
.error("Failed to decode JSON: '{0}'".format(j
))
1216 def get_journal_version(self
):
1218 Read the JournalPointer and Journal::Header objects to learn the version of
1221 journal_pointer_object
= '400.00000000'
1222 journal_pointer_dump
= self
.get_metadata_object("JournalPointer", journal_pointer_object
)
1223 journal_ino
= journal_pointer_dump
['journal_pointer']['front']
1225 journal_header_object
= "{0:x}.00000000".format(journal_ino
)
1226 journal_header_dump
= self
.get_metadata_object('Journaler::Header', journal_header_object
)
1228 version
= journal_header_dump
['journal_header']['stream_format']
1229 log
.debug("Read journal version {0}".format(version
))
1233 def mds_asok(self
, command
, mds_id
=None, timeout
=None):
1235 return self
.rank_asok(command
, timeout
=timeout
)
1237 return self
.json_asok(command
, 'mds', mds_id
, timeout
=timeout
)
1239 def mds_tell(self
, command
, mds_id
=None):
1241 return self
.rank_tell(command
)
1243 return json
.loads(self
.mon_manager
.raw_cluster_cmd("tell", f
"mds.{mds_id}", *command
))
1245 def rank_asok(self
, command
, rank
=0, status
=None, timeout
=None):
1246 info
= self
.get_rank(rank
=rank
, status
=status
)
1247 return self
.json_asok(command
, 'mds', info
['name'], timeout
=timeout
)
1249 def rank_tell(self
, command
, rank
=0, status
=None):
1251 out
= self
.mon_manager
.raw_cluster_cmd("tell", f
"mds.{self.id}:{rank}", *command
)
1252 return json
.loads(out
)
1253 except json
.decoder
.JSONDecodeError
:
1254 log
.error("could not decode: {}".format(out
))
1257 def ranks_tell(self
, command
, status
=None):
1259 status
= self
.status()
1261 for r
in status
.get_ranks(self
.id):
1262 result
= self
.rank_tell(command
, rank
=r
['rank'], status
=status
)
1263 out
.append((r
['rank'], result
))
1266 def ranks_perf(self
, f
, status
=None):
1267 perf
= self
.ranks_tell(["perf", "dump"], status
=status
)
1269 for rank
, perf
in perf
:
1270 out
.append((rank
, f(perf
)))
1273 def read_cache(self
, path
, depth
=None, rank
=None):
1274 cmd
= ["dump", "tree", path
]
1275 if depth
is not None:
1276 cmd
.append(depth
.__str
__())
1277 result
= self
.rank_asok(cmd
, rank
=rank
)
1278 if result
is None or len(result
) == 0:
1279 raise RuntimeError("Path not found in cache: {0}".format(path
))
1283 def wait_for_state(self
, goal_state
, reject
=None, timeout
=None, mds_id
=None, rank
=None):
1285 Block until the MDS reaches a particular state, or a failure condition
1288 When there are multiple MDSs, succeed when exaclty one MDS is in the
1289 goal state, or fail when any MDS is in the reject state.
1291 :param goal_state: Return once the MDS is in this state
1292 :param reject: Fail if the MDS enters this state before the goal state
1293 :param timeout: Fail if this many seconds pass before reaching goal
1294 :return: number of seconds waited, rounded down to integer
1297 started_at
= time
.time()
1299 status
= self
.status()
1300 if rank
is not None:
1302 mds_info
= status
.get_rank(self
.id, rank
)
1303 current_state
= mds_info
['state'] if mds_info
else None
1304 log
.debug("Looked up MDS state for mds.{0}: {1}".format(rank
, current_state
))
1306 mdsmap
= self
.get_mds_map(status
=status
)
1307 if rank
in mdsmap
['failed']:
1308 log
.debug("Waiting for rank {0} to come back.".format(rank
))
1309 current_state
= None
1312 elif mds_id
is not None:
1313 # mds_info is None if no daemon with this ID exists in the map
1314 mds_info
= status
.get_mds(mds_id
)
1315 current_state
= mds_info
['state'] if mds_info
else None
1316 log
.debug("Looked up MDS state for {0}: {1}".format(mds_id
, current_state
))
1318 # In general, look for a single MDS
1319 states
= [m
['state'] for m
in status
.get_ranks(self
.id)]
1320 if [s
for s
in states
if s
== goal_state
] == [goal_state
]:
1321 current_state
= goal_state
1322 elif reject
in states
:
1323 current_state
= reject
1325 current_state
= None
1326 log
.debug("mapped states {0} to {1}".format(states
, current_state
))
1328 elapsed
= time
.time() - started_at
1329 if current_state
== goal_state
:
1330 log
.debug("reached state '{0}' in {1}s".format(current_state
, elapsed
))
1332 elif reject
is not None and current_state
== reject
:
1333 raise RuntimeError("MDS in reject state {0}".format(current_state
))
1334 elif timeout
is not None and elapsed
> timeout
:
1335 log
.error("MDS status at timeout: {0}".format(status
.get_fsmap(self
.id)))
1337 "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
1338 elapsed
, goal_state
, current_state
1343 def _read_data_xattr(self
, ino_no
, xattr_name
, obj_type
, pool
):
1345 pool
= self
.get_data_pool_name()
1347 obj_name
= "{0:x}.00000000".format(ino_no
)
1349 args
= ["getxattr", obj_name
, xattr_name
]
1351 proc
= self
.rados(args
, pool
=pool
, stdout
=BytesIO())
1352 except CommandFailedError
as e
:
1353 log
.error(e
.__str
__())
1354 raise ObjectNotFound(obj_name
)
1356 obj_blob
= proc
.stdout
.getvalue()
1357 return json
.loads(self
.dencoder(obj_type
, obj_blob
).strip())
1359 def _write_data_xattr(self
, ino_no
, xattr_name
, data
, pool
=None):
1361 Write to an xattr of the 0th data object of an inode. Will
1362 succeed whether the object and/or xattr already exist or not.
1364 :param ino_no: integer inode number
1365 :param xattr_name: string name of the xattr
1366 :param data: byte array data to write to the xattr
1367 :param pool: name of data pool or None to use primary data pool
1371 pool
= self
.get_data_pool_name()
1373 obj_name
= "{0:x}.00000000".format(ino_no
)
1374 args
= ["setxattr", obj_name
, xattr_name
, data
]
1375 self
.rados(args
, pool
=pool
)
1377 def read_symlink(self
, ino_no
, pool
=None):
1378 return self
._read
_data
_xattr
(ino_no
, "symlink", "string_wrapper", pool
)
1380 def read_backtrace(self
, ino_no
, pool
=None):
1382 Read the backtrace from the data pool, return a dict in the format
1383 given by inode_backtrace_t::dump, which is something like:
1387 rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
1388 ceph-dencoder type inode_backtrace_t import out.bin decode dump_json
1390 { "ino": 1099511627778,
1398 :param pool: name of pool to read backtrace from. If omitted, FS must have only
1399 one data pool and that will be used.
1401 return self
._read
_data
_xattr
(ino_no
, "parent", "inode_backtrace_t", pool
)
1403 def read_layout(self
, ino_no
, pool
=None):
1405 Read 'layout' xattr of an inode and parse the result, returning a dict like:
1408 "stripe_unit": 4194304,
1410 "object_size": 4194304,
1415 :param pool: name of pool to read backtrace from. If omitted, FS must have only
1416 one data pool and that will be used.
1418 return self
._read
_data
_xattr
(ino_no
, "layout", "file_layout_t", pool
)
1420 def _enumerate_data_objects(self
, ino
, size
):
1422 Get the list of expected data objects for a range, and the list of objects
1425 :return a tuple of two lists of strings (expected, actual)
1427 stripe_size
= 1024 * 1024 * 4
1429 size
= max(stripe_size
, size
)
1432 "{0:x}.{1:08x}".format(ino
, n
)
1433 for n
in range(0, ((size
- 1) // stripe_size
) + 1)
1436 exist_objects
= self
.rados(["ls"], pool
=self
.get_data_pool_name(), stdout
=StringIO()).stdout
.getvalue().split("\n")
1438 return want_objects
, exist_objects
1440 def data_objects_present(self
, ino
, size
):
1442 Check that *all* the expected data objects for an inode are present in the data pool
1445 want_objects
, exist_objects
= self
._enumerate
_data
_objects
(ino
, size
)
1446 missing
= set(want_objects
) - set(exist_objects
)
1449 log
.debug("Objects missing (ino {0}, size {1}): {2}".format(
1454 log
.debug("All objects for ino {0} size {1} found".format(ino
, size
))
1457 def data_objects_absent(self
, ino
, size
):
1458 want_objects
, exist_objects
= self
._enumerate
_data
_objects
(ino
, size
)
1459 present
= set(want_objects
) & set(exist_objects
)
1462 log
.debug("Objects not absent (ino {0}, size {1}): {2}".format(
1467 log
.debug("All objects for ino {0} size {1} are absent".format(ino
, size
))
1470 def dirfrag_exists(self
, ino
, frag
):
1472 self
.radosm(["stat", "{0:x}.{1:08x}".format(ino
, frag
)])
1473 except CommandFailedError
:
1478 def list_dirfrag(self
, dir_ino
):
1480 Read the named object and return the list of omap keys
1482 :return a list of 0 or more strings
1485 dirfrag_obj_name
= "{0:x}.00000000".format(dir_ino
)
1488 key_list_str
= self
.radosmo(["listomapkeys", dirfrag_obj_name
], stdout
=StringIO())
1489 except CommandFailedError
as e
:
1490 log
.error(e
.__str
__())
1491 raise ObjectNotFound(dirfrag_obj_name
)
1493 return key_list_str
.strip().split("\n") if key_list_str
else []
1495 def get_meta_of_fs_file(self
, dir_ino
, obj_name
, out
):
1497 get metadata from parent to verify the correctness of the data format encoded by the tool, cephfs-meta-injection.
1498 warning : The splitting of directory is not considered here.
1501 dirfrag_obj_name
= "{0:x}.00000000".format(dir_ino
)
1503 self
.radosm(["getomapval", dirfrag_obj_name
, obj_name
+"_head", out
])
1504 except CommandFailedError
as e
:
1505 log
.error(e
.__str
__())
1506 raise ObjectNotFound(dir_ino
)
1508 def erase_metadata_objects(self
, prefix
):
1510 For all objects in the metadata pool matching the prefix,
1513 This O(N) with the number of objects in the pool, so only suitable
1514 for use on toy test filesystems.
1516 all_objects
= self
.radosmo(["ls"], stdout
=StringIO()).strip().split("\n")
1517 matching_objects
= [o
for o
in all_objects
if o
.startswith(prefix
)]
1518 for o
in matching_objects
:
1519 self
.radosm(["rm", o
])
1521 def erase_mds_objects(self
, rank
):
1523 Erase all the per-MDS objects for a particular rank. This includes
1524 inotable, sessiontable, journal
1527 def obj_prefix(multiplier
):
1529 MDS object naming conventions like rank 1's
1530 journal is at 201.***
1532 return "%x." % (multiplier
* 0x100 + rank
)
1534 # MDS_INO_LOG_OFFSET
1535 self
.erase_metadata_objects(obj_prefix(2))
1536 # MDS_INO_LOG_BACKUP_OFFSET
1537 self
.erase_metadata_objects(obj_prefix(3))
1538 # MDS_INO_LOG_POINTER_OFFSET
1539 self
.erase_metadata_objects(obj_prefix(4))
1540 # MDSTables & SessionMap
1541 self
.erase_metadata_objects("mds{rank:d}_".format(rank
=rank
))
1546 Override this to set a different
1550 def _make_rank(self
, rank
):
1551 return "{}:{}".format(self
.name
, rank
)
1553 def _run_tool(self
, tool
, args
, rank
=None, quiet
=False):
1554 # Tests frequently have [client] configuration that jacks up
1555 # the objecter log level (unlikely to be interesting here)
1556 # and does not set the mds log level (very interesting here)
1558 base_args
= [os
.path
.join(self
._prefix
, tool
), '--debug-mds=1', '--debug-objecter=1']
1560 base_args
= [os
.path
.join(self
._prefix
, tool
), '--debug-mds=20', '--debug-ms=1', '--debug-objecter=1']
1562 if rank
is not None:
1563 base_args
.extend(["--rank", "%s" % str(rank
)])
1565 t1
= datetime
.datetime
.now()
1566 r
= self
.tool_remote
.sh(script
=base_args
+ args
, stdout
=StringIO()).strip()
1567 duration
= datetime
.datetime
.now() - t1
1568 log
.debug("Ran {0} in time {1}, result:\n{2}".format(
1569 base_args
+ args
, duration
, r
1574 def tool_remote(self
):
1576 An arbitrary remote to use when invoking recovery tools. Use an MDS host because
1577 it'll definitely have keys with perms to access cephfs metadata pool. This is public
1578 so that tests can use this remote to go get locally written output files from the tools.
1580 return self
.mon_manager
.controller
1582 def journal_tool(self
, args
, rank
, quiet
=False):
1584 Invoke cephfs-journal-tool with the passed arguments for a rank, and return its stdout
1586 fs_rank
= self
._make
_rank
(rank
)
1587 return self
._run
_tool
("cephfs-journal-tool", args
, fs_rank
, quiet
)
1589 def meta_tool(self
, args
, rank
, quiet
=False):
1591 Invoke cephfs-meta-injection with the passed arguments for a rank, and return its stdout
1593 fs_rank
= self
._make
_rank
(rank
)
1594 return self
._run
_tool
("cephfs-meta-injection", args
, fs_rank
, quiet
)
1596 def table_tool(self
, args
, quiet
=False):
1598 Invoke cephfs-table-tool with the passed arguments, and return its stdout
1600 return self
._run
_tool
("cephfs-table-tool", args
, None, quiet
)
1602 def data_scan(self
, args
, quiet
=False, worker_count
=1):
1604 Invoke cephfs-data-scan with the passed arguments, and return its stdout
1606 :param worker_count: if greater than 1, multiple workers will be run
1607 in parallel and the return value will be None
1612 for n
in range(0, worker_count
):
1613 if worker_count
> 1:
1614 # data-scan args first token is a command, followed by args to it.
1615 # insert worker arguments after the command.
1617 worker_args
= [cmd
] + ["--worker_n", n
.__str
__(), "--worker_m", worker_count
.__str
__()] + args
[1:]
1621 workers
.append(Greenlet
.spawn(lambda wargs
=worker_args
:
1622 self
._run
_tool
("cephfs-data-scan", wargs
, None, quiet
)))
1627 if worker_count
== 1:
1628 return workers
[0].value
1633 return self
.is_pool_full(self
.get_data_pool_name())
1635 def authorize(self
, client_id
, caps
=('/', 'rw')):
1637 Run "ceph fs authorize" and run "ceph auth get" to get and returnt the
1640 client_id: client id that will be authorized
1641 caps: tuple containing the path and permission (can be r or rw)
1644 if isinstance(caps
[0], (tuple, list)):
1650 client_name
= 'client.' + client_id
1651 return self
.mon_manager
.raw_cluster_cmd('fs', 'authorize', self
.name
,
1654 def grow(self
, new_max_mds
, status
=None):
1655 oldmax
= self
.get_var('max_mds', status
=status
)
1656 assert(new_max_mds
> oldmax
)
1657 self
.set_max_mds(new_max_mds
)
1658 return self
.wait_for_daemons()
1660 def shrink(self
, new_max_mds
, status
=None):
1661 oldmax
= self
.get_var('max_mds', status
=status
)
1662 assert(new_max_mds
< oldmax
)
1663 self
.set_max_mds(new_max_mds
)
1664 return self
.wait_for_daemons()
1666 def run_scrub(self
, cmd
, rank
=0):
1667 return self
.rank_tell(["scrub"] + cmd
, rank
)
1669 def get_scrub_status(self
, rank
=0):
1670 return self
.run_scrub(["status"], rank
)
1672 def flush(self
, rank
=0):
1673 return self
.rank_tell(["flush", "journal"], rank
=rank
)
1675 def wait_until_scrub_complete(self
, result
=None, tag
=None, rank
=0, sleep
=30,
1676 timeout
=300, reverse
=False):
1677 # time out after "timeout" seconds and assume as done
1679 result
= "no active scrubs running"
1680 with contextutil
.safe_while(sleep
=sleep
, tries
=timeout
//sleep
) as proceed
:
1682 out_json
= self
.rank_tell(["scrub", "status"], rank
=rank
)
1683 assert out_json
is not None
1685 if result
in out_json
['status']:
1686 log
.info("all active scrubs completed")
1689 if result
not in out_json
['status']:
1690 log
.info("all active scrubs completed")
1694 status
= out_json
['scrubs'][tag
]
1695 if status
is not None:
1696 log
.info(f
"scrub status for tag:{tag} - {status}")
1698 log
.info(f
"scrub has completed for tag:{tag}")
1701 # timed out waiting for scrub to complete
1704 def get_damage(self
, rank
=None):
1707 for info
in self
.get_ranks():
1709 result
[rank
] = self
.get_damage(rank
=rank
)
1712 return self
.rank_tell(['damage', 'ls'], rank
=rank
)