# ceph/qa/tasks/cephfs/filesystem.py
# (snapshot at commit 8788070e7170c31df6ca33deb0bc3d6d925c070f)
import json
import logging
import os
import time
import datetime
import re
import errno
import random

from gevent import Greenlet

from io import BytesIO, StringIO
from errno import EBUSY

from teuthology.exceptions import CommandFailedError
from teuthology import misc
from teuthology.nuke import clear_firewall
from teuthology.parallel import parallel
from teuthology import contextutil
from tasks.ceph_manager import write_conf
from tasks import ceph_manager


log = logging.getLogger(__name__)


DAEMON_WAIT_TIMEOUT = 120
class FileLayout(object):
    def __init__(self, pool=None, pool_namespace=None, stripe_unit=None, stripe_count=None, object_size=None):
        self.pool = pool
        self.pool_namespace = pool_namespace
        self.stripe_unit = stripe_unit
        self.stripe_count = stripe_count
        self.object_size = object_size

    def load_from_ceph(layout_str):
        # TODO
        pass

    def items(self):
        if self.pool is not None:
            yield ("pool", self.pool)
        if self.pool_namespace:
            yield ("pool_namespace", self.pool_namespace)
        if self.stripe_unit is not None:
            yield ("stripe_unit", self.stripe_unit)
        if self.stripe_count is not None:
            yield ("stripe_count", self.stripe_count)
        if self.object_size is not None:
            yield ("object_size", self.object_size)
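
# A minimal usage sketch (illustrative comment only, not part of the original
# module): a FileLayout built from keyword arguments can be flattened into the
# name/value pairs that set_dir_layout() feeds to "setfattr -n ceph.dir.layout.*":
#
#   layout = FileLayout(pool="cephfs_data", stripe_unit=1048576,
#                       stripe_count=2, object_size=4194304)
#   attrs = dict(layout.items())
#   # -> {'pool': 'cephfs_data', 'stripe_unit': 1048576,
#   #     'stripe_count': 2, 'object_size': 4194304}
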
class ObjectNotFound(Exception):
    def __init__(self, object_name):
        self._object_name = object_name

    def __str__(self):
        return "Object not found: '{0}'".format(self._object_name)
class FSMissing(Exception):
    def __init__(self, ident):
        self.ident = ident

    def __str__(self):
        return f"File system {self.ident} does not exist in the map"
class FSStatus(object):
    """
    Operations on a snapshot of the FSMap.
    """
    def __init__(self, mon_manager, epoch=None):
        self.mon = mon_manager
        cmd = ["fs", "dump", "--format=json"]
        if epoch is not None:
            cmd.append(str(epoch))
        self.map = json.loads(self.mon.raw_cluster_cmd(*cmd))

    def __str__(self):
        return json.dumps(self.map, indent = 2, sort_keys = True)

    # Expose the fsmap for manual inspection.
    def __getitem__(self, key):
        """
        Get a field from the fsmap.
        """
        return self.map[key]

    def get_filesystems(self):
        """
        Iterator for all filesystems.
        """
        for fs in self.map['filesystems']:
            yield fs

    def get_all(self):
        """
        Iterator for all the mds_info components in the FSMap.
        """
        for info in self.map['standbys']:
            yield info
        for fs in self.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                yield info

    def get_standbys(self):
        """
        Iterator for all standbys.
        """
        for info in self.map['standbys']:
            yield info

    def get_fsmap(self, fscid):
        """
        Get the fsmap for the given FSCID.
        """
        for fs in self.map['filesystems']:
            if fscid is None or fs['id'] == fscid:
                return fs
        raise FSMissing(fscid)

    def get_fsmap_byname(self, name):
        """
        Get the fsmap for the given file system name.
        """
        for fs in self.map['filesystems']:
            if name is None or fs['mdsmap']['fs_name'] == name:
                return fs
        raise FSMissing(name)

    def get_replays(self, fscid):
        """
        Get the standby:replay MDS for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['state'] == 'up:standby-replay':
                yield info

    def get_ranks(self, fscid):
        """
        Get the ranks for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['rank'] >= 0 and info['state'] != 'up:standby-replay':
                yield info

    def get_damaged(self, fscid):
        """
        Get the damaged ranks for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        return fs['mdsmap']['damaged']

    def get_rank(self, fscid, rank):
        """
        Get the rank for the given FSCID.
        """
        for info in self.get_ranks(fscid):
            if info['rank'] == rank:
                return info
        raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))

    def get_mds(self, name):
        """
        Get the info for the given MDS name.
        """
        for info in self.get_all():
            if info['name'] == name:
                return info
        return None

    def get_mds_addr(self, name):
        """
        Return the instance addr as a string, like "10.214.133.138:6807\/10825"
        """
        info = self.get_mds(name)
        if info:
            return info['addr']
        else:
            log.warning(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
            raise RuntimeError("MDS id '{0}' not found in map".format(name))

    def get_mds_addrs(self, name):
        """
        Return the instance addrs as a list of strings, like ["10.214.133.138:6807", "10.214.133.138:6808"]
        """
        info = self.get_mds(name)
        if info:
            return [e['addr'] for e in info['addrs']['addrvec']]
        else:
            log.warning(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
            raise RuntimeError("MDS id '{0}' not found in map".format(name))

    def get_mds_gid(self, gid):
        """
        Get the info for the given MDS gid.
        """
        for info in self.get_all():
            if info['gid'] == gid:
                return info
        return None

    def hadfailover(self, status):
        """
        Compares two statuses for mds failovers.
        Returns True if there is a failover.
        """
        for fs in status.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                oldinfo = self.get_mds_gid(info['gid'])
                if oldinfo is None or oldinfo['incarnation'] != info['incarnation']:
                    return True
        # no failover detected
        return False
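
# Illustrative sketch (not part of the original file): hadfailover() compares
# gid/incarnation pairs between two FSStatus snapshots, so the usual pattern is
# to grab a snapshot before an operation and compare afterwards. Assuming an
# `fs` Filesystem object:
#
#   old_status = fs.status()
#   fs.mds_fail_restart()
#   fs.wait_for_daemons()
#   if old_status.hadfailover(fs.status()):
#       log.info("an MDS failover occurred")
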
class CephCluster(object):
    @property
    def admin_remote(self):
        first_mon = misc.get_first_mon(self._ctx, None)
        (result,) = self._ctx.cluster.only(first_mon).remotes.keys()
        return result

    def __init__(self, ctx) -> None:
        self._ctx = ctx
        self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))

    def get_config(self, key, service_type=None):
        """
        Get config from mon by default, or a specific service if caller asks for it
        """
        if service_type is None:
            service_type = 'mon'

        service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def set_ceph_conf(self, subsys, key, value):
        if subsys not in self._ctx.ceph['ceph'].conf:
            self._ctx.ceph['ceph'].conf[subsys] = {}
        self._ctx.ceph['ceph'].conf[subsys][key] = value
        write_conf(self._ctx)  # XXX because we don't have the ceph task's config object, if they
                               # used a different config path this won't work.

    def clear_ceph_conf(self, subsys, key):
        del self._ctx.ceph['ceph'].conf[subsys][key]
        write_conf(self._ctx)

    def json_asok(self, command, service_type, service_id, timeout=None):
        command.insert(0, '--format=json')
        proc = self.mon_manager.admin_socket(service_type, service_id, command, timeout=timeout)
        response_data = proc.stdout.getvalue().strip()
        if len(response_data) > 0:
            j = json.loads(response_data)
            pretty = json.dumps(j, sort_keys=True, indent=2)
            log.debug(f"_json_asok output\n{pretty}")
            return j
        else:
            log.debug("_json_asok output empty")
            return None

    def is_addr_blocklisted(self, addr):
        blocklist = json.loads(self.mon_manager.raw_cluster_cmd(
            "osd", "dump", "--format=json"))['blocklist']
        if addr in blocklist:
            return True
        log.warning(f'The address {addr} is not blocklisted')
        return False
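
# Illustrative sketch (not part of the original file): CephCluster wraps cluster
# commands and admin-socket queries so tests can read daemon state as parsed
# JSON. Assuming a teuthology ctx with a running mon and an MDS named "a":
#
#   cluster = CephCluster(ctx)
#   grace = cluster.get_config("mds_beacon_grace")        # asks the mon by default
#   ops = cluster.json_asok(["dump_ops_in_flight"], "mds", "a")
#   # json_asok() returns the decoded JSON, or None if the daemon printed nothing.
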
class MDSCluster(CephCluster):
    """
    Collective operations on all the MDS daemons in the Ceph cluster.  These
    daemons may be in use by various Filesystems.

    For the benefit of pre-multi-filesystem tests, this class is also
    a parent of Filesystem.  The correct way to use MDSCluster going forward is
    as a separate instance outside of your (multiple) Filesystem instances.
    """

    def __init__(self, ctx):
        super(MDSCluster, self).__init__(ctx)

    @property
    def mds_ids(self):
        # do this dynamically because the list of ids may change periodically with cephadm
        return list(misc.all_roles_of_type(self._ctx.cluster, 'mds'))

    @property
    def mds_daemons(self):
        return dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])

    def _one_or_all(self, mds_id, cb, in_parallel=True):
        """
        Call a callback for a single named MDS, or for all.

        Note that the parallelism here isn't for performance, it's to avoid being overly kind
        to the cluster by waiting a graceful ssh-latency of time between doing things, and to
        avoid being overly kind by executing them in a particular order.  However, some actions
        don't cope with being done in parallel, so it's optional (`in_parallel`)

        :param mds_id: MDS daemon name, or None
        :param cb: Callback taking single argument of MDS daemon name
        :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
        """

        if mds_id is None:
            if in_parallel:
                with parallel() as p:
                    for mds_id in self.mds_ids:
                        p.spawn(cb, mds_id)
            else:
                for mds_id in self.mds_ids:
                    cb(mds_id)
        else:
            cb(mds_id)
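
    # Illustrative sketch (not part of the original file): the helpers below are
    # thin wrappers that hand a per-daemon callback to _one_or_all(). Stopping
    # every MDS versus a single named one looks like:
    #
    #   mdsc = MDSCluster(ctx)
    #   mdsc._one_or_all(None, lambda id_: mdsc.mds_daemons[id_].stop())  # all, in parallel
    #   mdsc._one_or_all("a", lambda id_: mdsc.mds_daemons[id_].stop())   # just mds.a
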
    def get_config(self, key, service_type=None):
        """
        get_config specialization of service_type="mds"
        """
        if service_type != "mds":
            return super(MDSCluster, self).get_config(key, service_type)

        # Some tests stop MDS daemons, don't send commands to a dead one:
        running_daemons = [i for i, mds in self.mds_daemons.items() if mds.running()]
        service_id = random.sample(running_daemons, 1)[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def mds_stop(self, mds_id=None):
        """
        Stop the MDS daemon process(es).  If it held a rank, that rank
        will eventually go laggy.
        """
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())

    def mds_fail(self, mds_id=None):
        """
        Inform MDSMonitor of the death of the daemon process(es).  If it held
        a rank, that rank will be relinquished.
        """
        self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))

    def mds_restart(self, mds_id=None):
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())

    def mds_fail_restart(self, mds_id=None):
        """
        Variation on restart that includes marking MDSs as failed, so that doing this
        operation followed by waiting for healthy daemon states guarantees that they
        have gone down and come up, rather than potentially seeing the healthy states
        that existed before the restart.
        """
        def _fail_restart(id_):
            self.mds_daemons[id_].stop()
            self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
            self.mds_daemons[id_].restart()

        self._one_or_all(mds_id, _fail_restart)

    def mds_signal(self, mds_id, sig, silent=False):
        """
        Send a signal to the named MDS daemon.
        """
        self.mds_daemons[mds_id].signal(sig, silent)

    def newfs(self, name='cephfs', create=True):
        return Filesystem(self._ctx, name=name, create=create)

    def status(self, epoch=None):
        return FSStatus(self.mon_manager, epoch)

    def get_standby_daemons(self):
        return set([s['name'] for s in self.status().get_standbys()])

    def get_mds_hostnames(self):
        result = set()
        for mds_id in self.mds_ids:
            mds_remote = self.mon_manager.find_remote('mds', mds_id)
            result.add(mds_remote.hostname)

        return list(result)
    def set_clients_block(self, blocked, mds_id=None):
        """
        Block (using iptables) client communications to this MDS.  Be careful: if
        other services are running on this MDS, or other MDSs try to talk to this
        MDS, their communications may also be blocked as collateral damage.

        :param mds_id: Optional ID of MDS to block, default to all
        :return:
        """
        da_flag = "-A" if blocked else "-D"

        def set_block(_mds_id):
            remote = self.mon_manager.find_remote('mds', _mds_id)
            status = self.status()

            addr = status.get_mds_addr(_mds_id)
            ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()

            remote.run(
                args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])
            remote.run(
                args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])

        self._one_or_all(mds_id, set_block, in_parallel=False)

    def set_inter_mds_block(self, blocked, mds_rank_1, mds_rank_2):
        """
        Block (using iptables) communications from a provided MDS to other MDSs.
        Block all ports that an MDS uses for communication.

        :param blocked: True to block the MDS, False otherwise
        :param mds_rank_1: MDS rank
        :param mds_rank_2: MDS rank
        :return:
        """
        da_flag = "-A" if blocked else "-D"

        def set_block(mds_ids):
            status = self.status()

            mds = mds_ids[0]
            remote = self.mon_manager.find_remote('mds', mds)
            addrs = status.get_mds_addrs(mds)
            for addr in addrs:
                ip_str, port_str = re.match("(.+):(.+)", addr).groups()
                remote.run(
                    args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                          "comment", "--comment", "teuthology"])

            mds = mds_ids[1]
            remote = self.mon_manager.find_remote('mds', mds)
            addrs = status.get_mds_addrs(mds)
            for addr in addrs:
                ip_str, port_str = re.match("(.+):(.+)", addr).groups()
                remote.run(
                    args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
                          "comment", "--comment", "teuthology"])
                remote.run(
                    args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                          "comment", "--comment", "teuthology"])

        self._one_or_all((mds_rank_1, mds_rank_2), set_block, in_parallel=False)
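
    # Illustrative note (not part of the original file): both helpers above build
    # iptables REJECT rules keyed on the MDS port parsed out of the fsmap address.
    # For an MDS address such as "10.214.133.138:6807/10825", set_clients_block()
    # effectively runs, on the MDS host:
    #
    #   sudo iptables -A OUTPUT -p tcp --sport 6807 -j REJECT -m comment --comment teuthology
    #   sudo iptables -A INPUT  -p tcp --dport 6807 -j REJECT -m comment --comment teuthology
    #
    # and the same commands with -D instead of -A when blocked=False.
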
    def clear_firewall(self):
        clear_firewall(self._ctx)

    def get_mds_info(self, mds_id):
        return FSStatus(self.mon_manager).get_mds(mds_id)

    def is_pool_full(self, pool_name):
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        for pool in pools:
            if pool['pool_name'] == pool_name:
                return 'full' in pool['flags_names'].split(",")

        raise RuntimeError("Pool not found '{0}'".format(pool_name))

    def delete_all_filesystems(self):
        """
        Remove all filesystems that exist, and any pools in use by them.
        """
        for fs in self.status().get_filesystems():
            Filesystem(ctx=self._ctx, fscid=fs['id']).destroy()
    @property
    def beacon_timeout(self):
        """
        Generate an acceptable timeout for the mons to drive some MDSMap change
        because of missed beacons from some MDS. This involves looking up the
        grace period in use by the mons and adding an acceptable buffer.
        """

        grace = float(self.get_config("mds_beacon_grace", service_type="mon"))
        return grace * 2 + 15
class Filesystem(MDSCluster):
    """
    This object is for driving a CephFS filesystem.  The MDS daemons driven by
    MDSCluster may be shared with other Filesystems.
    """
    def __init__(self, ctx, fs_config={}, fscid=None, name=None, create=False):
        super(Filesystem, self).__init__(ctx)

        self.name = name
        self.id = None
        self.metadata_pool_name = None
        self.metadata_overlay = False
        self.data_pool_name = None
        self.data_pools = None
        self.fs_config = fs_config
        self.ec_profile = fs_config.get('ec_profile')

        client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
        self.client_id = client_list[0]
        self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]

        if name is not None:
            if fscid is not None:
                raise RuntimeError("cannot specify fscid when creating fs")
            if create and not self.legacy_configured():
                self.create()
        else:
            if fscid is not None:
                self.id = fscid
            self.getinfo(refresh = True)

        # Stash a reference to the first created filesystem on ctx, so
        # that if someone drops to the interactive shell they can easily
        # find it.
        if not hasattr(self._ctx, "filesystem"):
            self._ctx.filesystem = self
    def dead(self):
        try:
            return not bool(self.get_mds_map())
        except FSMissing:
            return True
    def get_task_status(self, status_key):
        return self.mon_manager.get_service_task_status("mds", status_key)

    def getinfo(self, refresh = False):
        status = self.status()
        if self.id is not None:
            fsmap = status.get_fsmap(self.id)
        elif self.name is not None:
            fsmap = status.get_fsmap_byname(self.name)
        else:
            fss = [fs for fs in status.get_filesystems()]
            if len(fss) == 1:
                fsmap = fss[0]
            elif len(fss) == 0:
                raise RuntimeError("no file system available")
            else:
                raise RuntimeError("more than one file system available")
        self.id = fsmap['id']
        self.name = fsmap['mdsmap']['fs_name']
        self.get_pool_names(status = status, refresh = refresh)
        return status
    def set_metadata_overlay(self, overlay):
        if self.id is not None:
            raise RuntimeError("cannot specify fscid when configuring overlay")
        self.metadata_overlay = overlay

    def reach_max_mds(self):
        status = self.wait_for_daemons()
        mds_map = self.get_mds_map(status=status)
        assert(mds_map['in'] == list(range(0, mds_map['max_mds'])))

    def reset(self):
        self.mon_manager.raw_cluster_cmd("fs", "reset", str(self.name), '--yes-i-really-mean-it')

    def fail(self):
        self.mon_manager.raw_cluster_cmd("fs", "fail", str(self.name))

    def set_flag(self, var, *args):
        a = map(lambda x: str(x).lower(), args)
        self.mon_manager.raw_cluster_cmd("fs", "flag", "set", var, *a)

    def set_allow_multifs(self, yes=True):
        self.set_flag("enable_multiple", yes)

    def set_var(self, var, *args):
        a = map(lambda x: str(x).lower(), args)
        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, var, *a)

    def set_down(self, down=True):
        self.set_var("down", str(down).lower())

    def set_joinable(self, joinable=True):
        self.set_var("joinable", joinable)

    def set_max_mds(self, max_mds):
        self.set_var("max_mds", "%d" % max_mds)

    def set_session_timeout(self, timeout):
        self.set_var("session_timeout", "%d" % timeout)

    def set_allow_standby_replay(self, yes):
        self.set_var("allow_standby_replay", yes)

    def set_allow_new_snaps(self, yes):
        self.set_var("allow_new_snaps", yes, '--yes-i-really-mean-it')

    def compat(self, *args):
        a = map(lambda x: str(x).lower(), args)
        self.mon_manager.raw_cluster_cmd("fs", "compat", self.name, *a)

    def add_compat(self, *args):
        self.compat("add_compat", *args)

    def add_incompat(self, *args):
        self.compat("add_incompat", *args)

    def rm_compat(self, *args):
        self.compat("rm_compat", *args)

    def rm_incompat(self, *args):
        self.compat("rm_incompat", *args)

    def required_client_features(self, *args, **kwargs):
        c = ["fs", "required_client_features", self.name, *args]
        return self.mon_manager.run_cluster_cmd(args=c, **kwargs)
    # Since v15.1.0 the pg autoscale mode has been enabled as default,
    # which will let the pg autoscale mode calculate the pg_num as needed.
    # We set the pg_num_min to 64 to make sure that pg autoscale mode
    # won't set the pg_num too low, to fix Tracker#45434.
    pg_num = 64
    pg_num_min = 64
    target_size_ratio = 0.9
    target_size_ratio_ec = 0.9

    def create(self):
        if self.name is None:
            self.name = "cephfs"
        if self.metadata_pool_name is None:
            self.metadata_pool_name = "{0}_metadata".format(self.name)
        if self.data_pool_name is None:
            data_pool_name = "{0}_data".format(self.name)
        else:
            data_pool_name = self.data_pool_name

        # With an EC profile we will use the ec pool to store the data; a small amount of
        # metadata still goes to the primary data pool for all files.
        if not self.metadata_overlay and self.ec_profile and 'disabled' not in self.ec_profile:
            self.target_size_ratio = 0.05

        log.debug("Creating filesystem '{0}'".format(self.name))

        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                         self.metadata_pool_name,
                                         '--pg_num_min', str(self.pg_num_min))

        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                         data_pool_name, str(self.pg_num),
                                         '--pg_num_min', str(self.pg_num_min),
                                         '--target_size_ratio',
                                         str(self.target_size_ratio))

        if self.metadata_overlay:
            self.mon_manager.raw_cluster_cmd('fs', 'new',
                                             self.name, self.metadata_pool_name, data_pool_name,
                                             '--allow-dangerous-metadata-overlay')
        else:
            self.mon_manager.raw_cluster_cmd('fs', 'new',
                                             self.name,
                                             self.metadata_pool_name,
                                             data_pool_name)

            if self.ec_profile and 'disabled' not in self.ec_profile:
                ec_data_pool_name = data_pool_name + "_ec"
                log.debug("EC profile is %s", self.ec_profile)
                cmd = ['osd', 'erasure-code-profile', 'set', ec_data_pool_name]
                cmd.extend(self.ec_profile)
                self.mon_manager.raw_cluster_cmd(*cmd)
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'create', ec_data_pool_name,
                    'erasure', ec_data_pool_name,
                    '--pg_num_min', str(self.pg_num_min),
                    '--target_size_ratio', str(self.target_size_ratio_ec))
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'set',
                    ec_data_pool_name, 'allow_ec_overwrites', 'true')
                self.add_data_pool(ec_data_pool_name, create=False)
                self.check_pool_application(ec_data_pool_name)

                self.run_client_payload(f"setfattr -n ceph.dir.layout.pool -v {ec_data_pool_name} . && getfattr -n ceph.dir.layout .")

        self.check_pool_application(self.metadata_pool_name)
        self.check_pool_application(data_pool_name)

        # Turn off spurious standby count warnings from modifying max_mds in tests.
        try:
            self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
        except CommandFailedError as e:
            if e.exitstatus == 22:
                # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
                pass
            else:
                raise

        if self.fs_config is not None:
            max_mds = self.fs_config.get('max_mds', 1)
            if max_mds > 1:
                self.set_max_mds(max_mds)

            standby_replay = self.fs_config.get('standby_replay', False)
            self.set_allow_standby_replay(standby_replay)

            # If absent will use the default value (60 seconds)
            session_timeout = self.fs_config.get('session_timeout', 60)
            if session_timeout != 60:
                self.set_session_timeout(session_timeout)

        self.getinfo(refresh = True)

        # wait for pgs to be clean
        self.mon_manager.wait_for_clean()
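
    # Illustrative sketch (not part of the original file): create() is normally
    # driven through the constructor; the fs_config keys shown here are the ones
    # create() actually reads. An EC-backed filesystem could be set up like:
    #
    #   fs = Filesystem(ctx, name="cephfs", create=True, fs_config={
    #       'max_mds': 2,
    #       'standby_replay': True,
    #       'session_timeout': 90,
    #       'ec_profile': ['k=2', 'm=1', 'crush-failure-domain=osd'],
    #   })
    #   fs.wait_for_daemons()
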
    def run_client_payload(self, cmd):
        # avoid circular dep by importing here:
        from tasks.cephfs.fuse_mount import FuseMount

        # Wait for the MDS daemons to be ready before mounting the
        # ceph-fuse client in run_client_payload()
        self.wait_for_daemons()

        d = misc.get_testdir(self._ctx)
        m = FuseMount(self._ctx, d, "admin", self.client_remote, cephfs_name=self.name)
        m.mount_wait()
        m.run_shell_payload(cmd)
        m.umount_wait(require_clean=True)
    def _remove_pool(self, name, **kwargs):
        c = f'osd pool rm {name} {name} --yes-i-really-really-mean-it'
        return self.mon_manager.ceph(c, **kwargs)

    def rm(self, **kwargs):
        c = f'fs rm {self.name} --yes-i-really-mean-it'
        return self.mon_manager.ceph(c, **kwargs)

    def remove_pools(self, data_pools):
        self._remove_pool(self.get_metadata_pool_name())
        for poolname in data_pools:
            try:
                self._remove_pool(poolname)
            except CommandFailedError as e:
                # EBUSY, this data pool is used by two metadata pools, let the
                # second pass remove it
                if e.exitstatus == EBUSY:
                    pass
                else:
                    raise

    def destroy(self, reset_obj_attrs=True):
        log.info(f'Destroying file system {self.name} and related pools')

        if self.dead():
            log.debug('already dead...')
            return

        data_pools = self.get_data_pool_names(refresh=True)

        # make sure no MDSs are attached to given FS.
        self.fail()
        self.rm()

        self.remove_pools(data_pools)

        if reset_obj_attrs:
            self.id = None
            self.name = None
            self.metadata_pool_name = None
            self.data_pool_name = None
            self.data_pools = None
    def recreate(self):
        self.destroy()

        self.create()
        self.getinfo(refresh=True)
    def check_pool_application(self, pool_name):
        osd_map = self.mon_manager.get_osd_dump_json()
        for pool in osd_map['pools']:
            if pool['pool_name'] == pool_name:
                if "application_metadata" in pool:
                    if not "cephfs" in pool['application_metadata']:
                        raise RuntimeError("Pool {pool_name} does not name cephfs as application!".\
                            format(pool_name=pool_name))

    def __del__(self):
        if getattr(self._ctx, "filesystem", None) == self:
            delattr(self._ctx, "filesystem")
    def exists(self):
        """
        Whether a filesystem exists in the mon's filesystem list
        """
        fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
        return self.name in [fs['name'] for fs in fs_list]

    def legacy_configured(self):
        """
        Check if a legacy (i.e. pre "fs new") filesystem configuration is present.  If this is
        the case, the caller should avoid using Filesystem.create
        """
        try:
            out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
            pools = json.loads(out_text)
            metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
            if metadata_pool_exists:
                self.metadata_pool_name = 'metadata'
        except CommandFailedError as e:
            # For use in upgrade tests, Ceph cuttlefish and earlier don't support
            # structured output (--format) from the CLI.
            if e.exitstatus == 22:
                metadata_pool_exists = True
            else:
                raise

        return metadata_pool_exists

    def _df(self):
        return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))
    # may raise FSMissing
    def get_mds_map(self, status=None):
        if status is None:
            status = self.status()
        return status.get_fsmap(self.id)['mdsmap']

    def get_var(self, var, status=None):
        return self.get_mds_map(status=status)[var]

    def set_dir_layout(self, mount, path, layout):
        for name, value in layout.items():
            mount.run_shell(args=["setfattr", "-n", "ceph.dir.layout."+name, "-v", str(value), path])

    def add_data_pool(self, name, create=True):
        if create:
            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name,
                                             '--pg_num_min', str(self.pg_num_min))
        self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
        self.get_pool_names(refresh = True)
        for poolid, fs_name in self.data_pools.items():
            if name == fs_name:
                return poolid
        raise RuntimeError("could not get just created pool '{0}'".format(name))
    def get_pool_names(self, refresh = False, status = None):
        if refresh or self.metadata_pool_name is None or self.data_pools is None:
            if status is None:
                status = self.status()
            fsmap = status.get_fsmap(self.id)

            osd_map = self.mon_manager.get_osd_dump_json()
            id_to_name = {}
            for p in osd_map['pools']:
                id_to_name[p['pool']] = p['pool_name']

            self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
            self.data_pools = {}
            for data_pool in fsmap['mdsmap']['data_pools']:
                self.data_pools[data_pool] = id_to_name[data_pool]

    def get_data_pool_name(self, refresh = False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return next(iter(self.data_pools.values()))

    def get_data_pool_id(self, refresh = False):
        """
        Don't call this if you have multiple data pools
        """
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return next(iter(self.data_pools.keys()))

    def get_data_pool_names(self, refresh = False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        return list(self.data_pools.values())

    def get_metadata_pool_name(self):
        return self.metadata_pool_name

    def set_data_pool_name(self, name):
        if self.id is not None:
            raise RuntimeError("can't set filesystem name if its fscid is set")
        self.data_pool_name = name

    def get_pool_pg_num(self, pool_name):
        pgs = json.loads(self.mon_manager.raw_cluster_cmd('osd', 'pool', 'get',
                                                          pool_name, 'pg_num',
                                                          '--format=json-pretty'))
        return int(pgs['pg_num'])

    def get_namespace_id(self):
        return self.id

    def get_pool_df(self, pool_name):
        """
        Return a dict like:
        {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
        """
        for pool_df in self._df()['pools']:
            if pool_df['name'] == pool_name:
                return pool_df['stats']

        raise RuntimeError("Pool name '{0}' not found".format(pool_name))

    def get_usage(self):
        return self._df()['stats']['total_used_bytes']
    def are_daemons_healthy(self, status=None, skip_max_mds_check=False):
        """
        Return true if all daemons are in one of active, standby, standby-replay, and
        at least max_mds daemons are in 'active'.

        Unlike most of Filesystem, this function is tolerant of new-style `fs`
        commands being missing, because we are part of the ceph installation
        process during upgrade suites, so must fall back to old style commands
        when we get an EINVAL on a new style command.

        :return:
        """
        active_count = 0

        # First, check to see that processes haven't exited with an error code
        for mds in self._ctx.daemons.iter_daemons_of_role('mds'):
            mds.check_status()

        mds_map = self.get_mds_map(status=status)

        log.debug("are_daemons_healthy: mds map: {0}".format(mds_map))

        for mds_id, mds_status in mds_map['info'].items():
            if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
                log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
                return False
            elif mds_status['state'] == 'up:active':
                active_count += 1

        log.debug("are_daemons_healthy: {0}/{1}".format(
            active_count, mds_map['max_mds']
        ))

        if not skip_max_mds_check:
            if active_count > mds_map['max_mds']:
                log.debug("are_daemons_healthy: number of actives is greater than max_mds: {0}".format(mds_map))
                return False
            elif active_count == mds_map['max_mds']:
                # The MDSMap says these guys are active, but let's check they really are
                for mds_id, mds_status in mds_map['info'].items():
                    if mds_status['state'] == 'up:active':
                        try:
                            daemon_status = self.mds_tell(["status"], mds_id=mds_status['name'])
                        except CommandFailedError as cfe:
                            if cfe.exitstatus == errno.EINVAL:
                                # Old version, can't do this check
                                continue
                            else:
                                # MDS not even running
                                return False

                        if daemon_status['state'] != 'up:active':
                            # MDS hasn't taken the latest map yet
                            return False

                return True
            else:
                return False
        else:
            log.debug("are_daemons_healthy: skipping max_mds check")
            return True
    def get_daemon_names(self, state=None, status=None):
        """
        Return MDS daemon names of those daemons in the given state
        """
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['state'] == state or state is None:
                result.append(mds_status['name'])

        return result

    def get_active_names(self, status=None):
        """
        Return MDS daemon names of those daemons holding ranks
        in state up:active

        :return: list of strings like ['a', 'b'], sorted by rank
        """
        return self.get_daemon_names("up:active", status=status)
    def get_all_mds_rank(self, status=None):
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['rank'])

        return result

    def get_rank(self, rank=None, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_rank(self.id, rank)

    def rank_restart(self, rank=0, status=None):
        name = self.get_rank(rank=rank, status=status)['name']
        self.mds_restart(mds_id=name)

    def rank_signal(self, signal, rank=0, status=None):
        name = self.get_rank(rank=rank, status=status)['name']
        self.mds_signal(name, signal)

    def rank_freeze(self, yes, rank=0):
        self.mon_manager.raw_cluster_cmd("mds", "freeze", "{}:{}".format(self.id, rank), str(yes).lower())

    def rank_fail(self, rank=0):
        self.mon_manager.raw_cluster_cmd("mds", "fail", "{}:{}".format(self.id, rank))

    def get_ranks(self, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_ranks(self.id)

    def get_damaged(self, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_damaged(self.id)

    def get_replays(self, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_replays(self.id)

    def get_replay(self, rank=0, status=None):
        for replay in self.get_replays(status=status):
            if replay['rank'] == rank:
                return replay
        return None

    def get_rank_names(self, status=None):
        """
        Return MDS daemon names of those daemons holding a rank,
        sorted by rank.  This includes e.g. up:replay/reconnect
        as well as active, but does not include standby or
        standby-replay.
        """
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['name'])

        return result
    def wait_for_daemons(self, timeout=None, skip_max_mds_check=False, status=None):
        """
        Wait until all daemons are healthy
        :return:
        """

        if timeout is None:
            timeout = DAEMON_WAIT_TIMEOUT

        if status is None:
            status = self.status()

        elapsed = 0
        while True:
            if self.are_daemons_healthy(status=status, skip_max_mds_check=skip_max_mds_check):
                return status
            else:
                time.sleep(1)
                elapsed += 1

            if elapsed > timeout:
                log.debug("status = {0}".format(status))
                raise RuntimeError("Timed out waiting for MDS daemons to become healthy")

            status = self.status()
    def dencoder(self, obj_type, obj_blob):
        args = [os.path.join(self._prefix, "ceph-dencoder"), 'type', obj_type, 'import', '-', 'decode', 'dump_json']
        p = self.mon_manager.controller.run(args=args, stdin=BytesIO(obj_blob), stdout=BytesIO())
        return p.stdout.getvalue()

    def rados(self, *args, **kwargs):
        """
        Callout to rados CLI.
        """

        return self.mon_manager.do_rados(*args, **kwargs)

    def radosm(self, *args, **kwargs):
        """
        Interact with the metadata pool via rados CLI.
        """

        return self.rados(*args, **kwargs, pool=self.get_metadata_pool_name())

    def radosmo(self, *args, stdout=BytesIO(), **kwargs):
        """
        Interact with the metadata pool via rados CLI. Get the stdout.
        """

        return self.radosm(*args, **kwargs, stdout=stdout).stdout.getvalue()
    def get_metadata_object(self, object_type, object_id):
        """
        Retrieve an object from the metadata pool, pass it through
        ceph-dencoder to dump it to JSON, and return the decoded object.
        """
        o = self.radosmo(['get', object_id, '-'])
        j = self.dencoder(object_type, o)
        try:
            return json.loads(j)
        except (TypeError, ValueError):
            log.error("Failed to decode JSON: '{0}'".format(j))
            raise

    def get_journal_version(self):
        """
        Read the JournalPointer and Journal::Header objects to learn the version of
        encoding in use.
        """
        journal_pointer_object = '400.00000000'
        journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
        journal_ino = journal_pointer_dump['journal_pointer']['front']

        journal_header_object = "{0:x}.00000000".format(journal_ino)
        journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)

        version = journal_header_dump['journal_header']['stream_format']
        log.debug("Read journal version {0}".format(version))

        return version
    def mds_asok(self, command, mds_id=None, timeout=None):
        if mds_id is None:
            return self.rank_asok(command, timeout=timeout)

        return self.json_asok(command, 'mds', mds_id, timeout=timeout)

    def mds_tell(self, command, mds_id=None):
        if mds_id is None:
            return self.rank_tell(command)

        return json.loads(self.mon_manager.raw_cluster_cmd("tell", f"mds.{mds_id}", *command))

    def rank_asok(self, command, rank=0, status=None, timeout=None):
        info = self.get_rank(rank=rank, status=status)
        return self.json_asok(command, 'mds', info['name'], timeout=timeout)

    def rank_tell(self, command, rank=0, status=None):
        try:
            out = self.mon_manager.raw_cluster_cmd("tell", f"mds.{self.id}:{rank}", *command)
            return json.loads(out)
        except json.decoder.JSONDecodeError:
            log.error("could not decode: {}".format(out))
            raise

    def ranks_tell(self, command, status=None):
        if status is None:
            status = self.status()
        out = []
        for r in status.get_ranks(self.id):
            result = self.rank_tell(command, rank=r['rank'], status=status)
            out.append((r['rank'], result))
        return out

    def ranks_perf(self, f, status=None):
        perf = self.ranks_tell(["perf", "dump"], status=status)
        out = []
        for rank, perf in perf:
            out.append((rank, f(perf)))
        return out

    def read_cache(self, path, depth=None):
        cmd = ["dump", "tree", path]
        if depth is not None:
            cmd.append(depth.__str__())
        result = self.mds_asok(cmd)
        if len(result) == 0:
            raise RuntimeError("Path not found in cache: {0}".format(path))

        return result
    def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
        """
        Block until the MDS reaches a particular state, or a failure condition
        is met.

        When there are multiple MDSs, succeed when exactly one MDS is in the
        goal state, or fail when any MDS is in the reject state.

        :param goal_state: Return once the MDS is in this state
        :param reject: Fail if the MDS enters this state before the goal state
        :param timeout: Fail if this many seconds pass before reaching goal
        :return: number of seconds waited, rounded down to integer
        """

        started_at = time.time()
        while True:
            status = self.status()
            if rank is not None:
                try:
                    mds_info = status.get_rank(self.id, rank)
                    current_state = mds_info['state'] if mds_info else None
                    log.debug("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
                except:
                    mdsmap = self.get_mds_map(status=status)
                    if rank in mdsmap['failed']:
                        log.debug("Waiting for rank {0} to come back.".format(rank))
                        current_state = None
                    else:
                        raise
            elif mds_id is not None:
                # mds_info is None if no daemon with this ID exists in the map
                mds_info = status.get_mds(mds_id)
                current_state = mds_info['state'] if mds_info else None
                log.debug("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
            else:
                # In general, look for a single MDS
                states = [m['state'] for m in status.get_ranks(self.id)]
                if [s for s in states if s == goal_state] == [goal_state]:
                    current_state = goal_state
                elif reject in states:
                    current_state = reject
                else:
                    current_state = None
                log.debug("mapped states {0} to {1}".format(states, current_state))

            elapsed = time.time() - started_at
            if current_state == goal_state:
                log.debug("reached state '{0}' in {1}s".format(current_state, elapsed))
                return elapsed
            elif reject is not None and current_state == reject:
                raise RuntimeError("MDS in reject state {0}".format(current_state))
            elif timeout is not None and elapsed > timeout:
                log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
                raise RuntimeError(
                    "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
                        elapsed, goal_state, current_state
                    ))
            else:
                time.sleep(1)
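
    # Illustrative sketch (not part of the original file): wait_for_state() is the
    # low-level polling primitive behind helpers such as wait_for_daemons(). A test
    # that fails rank 0 and then waits for a replacement daemon to take the rank
    # might do:
    #
    #   fs.rank_fail(rank=0)
    #   fs.wait_for_state('up:active', rank=0, timeout=60, reject='up:damaged')
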
    def _read_data_xattr(self, ino_no, xattr_name, obj_type, pool):
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)

        args = ["getxattr", obj_name, xattr_name]
        try:
            proc = self.rados(args, pool=pool, stdout=BytesIO())
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(obj_name)

        obj_blob = proc.stdout.getvalue()
        return json.loads(self.dencoder(obj_type, obj_blob).strip())

    def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
        """
        Write to an xattr of the 0th data object of an inode.  Will
        succeed whether the object and/or xattr already exist or not.

        :param ino_no: integer inode number
        :param xattr_name: string name of the xattr
        :param data: byte array data to write to the xattr
        :param pool: name of data pool or None to use primary data pool
        :return: None
        """
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)
        args = ["setxattr", obj_name, xattr_name, data]
        self.rados(args, pool=pool)

    def read_symlink(self, ino_no, pool=None):
        return self._read_data_xattr(ino_no, "symlink", "string_wrapper", pool)

    def read_backtrace(self, ino_no, pool=None):
        """
        Read the backtrace from the data pool, return a dict in the format
        given by inode_backtrace_t::dump, which is something like:

        ::

            rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
            ceph-dencoder type inode_backtrace_t import out.bin decode dump_json

            { "ino": 1099511627778,
              "ancestors": [
                    { "dirino": 1,
                      "dname": "blah",
                      "version": 11}],
              "pool": 1,
              "old_pools": []}

        :param pool: name of pool to read backtrace from.  If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)

    def read_layout(self, ino_no, pool=None):
        """
        Read 'layout' xattr of an inode and parse the result, returning a dict like:
        ::
            {
                "stripe_unit": 4194304,
                "stripe_count": 1,
                "object_size": 4194304,
                "pool_id": 1,
                "pool_ns": "",
            }

        :param pool: name of pool to read layout from.  If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)
    def _enumerate_data_objects(self, ino, size):
        """
        Get the list of expected data objects for a range, and the list of objects
        that really exist.

        :return a tuple of two lists of strings (expected, actual)
        """
        stripe_size = 1024 * 1024 * 4

        size = max(stripe_size, size)

        want_objects = [
            "{0:x}.{1:08x}".format(ino, n)
            for n in range(0, ((size - 1) // stripe_size) + 1)
        ]

        exist_objects = self.rados(["ls"], pool=self.get_data_pool_name(), stdout=StringIO()).stdout.getvalue().split("\n")

        return want_objects, exist_objects
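
    # Worked example (illustrative, not part of the original file): CephFS names
    # data objects "<ino hex>.<object index, 8 hex digits>" and this helper assumes
    # the default 4 MiB stripe. For ino 0x10000000002 and a 9 MiB file it expects
    # three objects:
    #
    #   "{0:x}.{1:08x}".format(0x10000000002, 0)  ->  "10000000002.00000000"
    #   "{0:x}.{1:08x}".format(0x10000000002, 1)  ->  "10000000002.00000001"
    #   "{0:x}.{1:08x}".format(0x10000000002, 2)  ->  "10000000002.00000002"
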
    def data_objects_present(self, ino, size):
        """
        Check that *all* the expected data objects for an inode are present in the data pool
        """

        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        missing = set(want_objects) - set(exist_objects)

        if missing:
            log.debug("Objects missing (ino {0}, size {1}): {2}".format(
                ino, size, missing
            ))
            return False
        else:
            log.debug("All objects for ino {0} size {1} found".format(ino, size))
            return True

    def data_objects_absent(self, ino, size):
        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        present = set(want_objects) & set(exist_objects)

        if present:
            log.debug("Objects not absent (ino {0}, size {1}): {2}".format(
                ino, size, present
            ))
            return False
        else:
            log.debug("All objects for ino {0} size {1} are absent".format(ino, size))
            return True
    def dirfrag_exists(self, ino, frag):
        try:
            self.radosm(["stat", "{0:x}.{1:08x}".format(ino, frag)])
        except CommandFailedError:
            return False
        else:
            return True

    def list_dirfrag(self, dir_ino):
        """
        Read the named object and return the list of omap keys

        :return a list of 0 or more strings
        """

        dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)

        try:
            key_list_str = self.radosmo(["listomapkeys", dirfrag_obj_name], stdout=StringIO())
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(dirfrag_obj_name)

        return key_list_str.strip().split("\n") if key_list_str else []
    def get_meta_of_fs_file(self, dir_ino, obj_name, out):
        """
        Get metadata from the parent directory object to verify the correctness of
        the data format encoded by the tool cephfs-meta-injection.
        Warning: the splitting of directories is not considered here.
        """
        dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
        try:
            self.radosm(["getomapval", dirfrag_obj_name, obj_name+"_head", out])
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(dir_ino)
    def erase_metadata_objects(self, prefix):
        """
        For all objects in the metadata pool matching the prefix,
        erase them.

        This is O(N) with the number of objects in the pool, so only suitable
        for use on toy test filesystems.
        """
        all_objects = self.radosmo(["ls"], stdout=StringIO()).strip().split("\n")
        matching_objects = [o for o in all_objects if o.startswith(prefix)]
        for o in matching_objects:
            self.radosm(["rm", o])
    def erase_mds_objects(self, rank):
        """
        Erase all the per-MDS objects for a particular rank.  This includes
        inotable, sessiontable, journal
        """

        def obj_prefix(multiplier):
            """
            MDS object naming conventions like rank 1's
            journal is at 201.***
            """
            return "%x." % (multiplier * 0x100 + rank)

        # MDS_INO_LOG_OFFSET
        self.erase_metadata_objects(obj_prefix(2))
        # MDS_INO_LOG_BACKUP_OFFSET
        self.erase_metadata_objects(obj_prefix(3))
        # MDS_INO_LOG_POINTER_OFFSET
        self.erase_metadata_objects(obj_prefix(4))
        # MDSTables & SessionMap
        self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))
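
    # Worked example (illustrative, not part of the original file): obj_prefix()
    # encodes "multiplier * 0x100 + rank" in hex, so for rank 1:
    #
    #   obj_prefix(2) -> "%x." % (2 * 0x100 + 1) -> "201."   # journal objects
    #   obj_prefix(3) -> "301."                              # journal backup
    #   obj_prefix(4) -> "401."                              # journal pointer
    #
    # which is why rank 1's journal lives in objects 201.00000000, 201.00000001, ...
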
    @property
    def _prefix(self):
        """
        Override this to set a different
        """
        return ""
    def _make_rank(self, rank):
        return "{}:{}".format(self.name, rank)

    def _run_tool(self, tool, args, rank=None, quiet=False):
        # Tests frequently have [client] configuration that jacks up
        # the objecter log level (unlikely to be interesting here)
        # and does not set the mds log level (very interesting here)
        if quiet:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
        else:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']

        if rank is not None:
            base_args.extend(["--rank", "%s" % str(rank)])

        t1 = datetime.datetime.now()
        r = self.tool_remote.sh(script=base_args + args, stdout=StringIO()).strip()
        duration = datetime.datetime.now() - t1
        log.debug("Ran {0} in time {1}, result:\n{2}".format(
            base_args + args, duration, r
        ))
        return r

    @property
    def tool_remote(self):
        """
        An arbitrary remote to use when invoking recovery tools. Use an MDS host because
        it'll definitely have keys with perms to access cephfs metadata pool. This is public
        so that tests can use this remote to go get locally written output files from the tools.
        """
        return self.mon_manager.controller
    def journal_tool(self, args, rank, quiet=False):
        """
        Invoke cephfs-journal-tool with the passed arguments for a rank, and return its stdout
        """
        fs_rank = self._make_rank(rank)
        return self._run_tool("cephfs-journal-tool", args, fs_rank, quiet)

    def meta_tool(self, args, rank, quiet=False):
        """
        Invoke cephfs-meta-injection with the passed arguments for a rank, and return its stdout
        """
        fs_rank = self._make_rank(rank)
        return self._run_tool("cephfs-meta-injection", args, fs_rank, quiet)

    def table_tool(self, args, quiet=False):
        """
        Invoke cephfs-table-tool with the passed arguments, and return its stdout
        """
        return self._run_tool("cephfs-table-tool", args, None, quiet)

    def data_scan(self, args, quiet=False, worker_count=1):
        """
        Invoke cephfs-data-scan with the passed arguments, and return its stdout

        :param worker_count: if greater than 1, multiple workers will be run
                             in parallel and the return value will be None
        """

        workers = []

        for n in range(0, worker_count):
            if worker_count > 1:
                # data-scan args first token is a command, followed by args to it.
                # insert worker arguments after the command.
                cmd = args[0]
                worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
            else:
                worker_args = args

            workers.append(Greenlet.spawn(lambda wargs=worker_args:
                           self._run_tool("cephfs-data-scan", wargs, None, quiet)))

        for w in workers:
            w.join()

        if worker_count == 1:
            return workers[0].value
        else:
            return None

    def is_full(self):
        return self.is_pool_full(self.get_data_pool_name())
    def authorize(self, client_id, caps=('/', 'rw')):
        """
        Run "ceph fs authorize" and run "ceph auth get" to get and return the
        keyring.

        client_id: client id that will be authorized
        caps: tuple containing the path and permission (can be r or rw)
        """
        client_name = 'client.' + client_id
        return self.mon_manager.raw_cluster_cmd('fs', 'authorize', self.name,
                                                client_name, *caps)
    def grow(self, new_max_mds, status=None):
        oldmax = self.get_var('max_mds', status=status)
        assert(new_max_mds > oldmax)
        self.set_max_mds(new_max_mds)
        return self.wait_for_daemons()

    def shrink(self, new_max_mds, status=None):
        oldmax = self.get_var('max_mds', status=status)
        assert(new_max_mds < oldmax)
        self.set_max_mds(new_max_mds)
        return self.wait_for_daemons()
    def run_scrub(self, cmd, rank=0):
        return self.rank_tell(["scrub"] + cmd, rank)

    def get_scrub_status(self, rank=0):
        return self.run_scrub(["status"], rank)
    def wait_until_scrub_complete(self, result=None, tag=None, rank=0, sleep=30,
                                  timeout=300, reverse=False):
        # time out after "timeout" seconds and assume as done
        if result is None:
            result = "no active scrubs running"
        with contextutil.safe_while(sleep=sleep, tries=timeout//sleep) as proceed:
            while proceed():
                out_json = self.rank_tell(["scrub", "status"], rank=rank)
                assert out_json is not None
                if not reverse:
                    if result in out_json['status']:
                        log.info("all active scrubs completed")
                        return True
                else:
                    if result not in out_json['status']:
                        log.info("all active scrubs completed")
                        return True

                if tag is not None:
                    status = out_json['scrubs'][tag]
                    if status is not None:
                        log.info(f"scrub status for tag:{tag} - {status}")
                    else:
                        log.info(f"scrub has completed for tag:{tag}")
                        return True

        # timed out waiting for scrub to complete
        return False
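
    # Illustrative sketch (not part of the original file): a test that scrubs the
    # whole tree on rank 0 and blocks until the scrub queue drains might do:
    #
    #   out_json = fs.run_scrub(["start", "/", "recursive"])
    #   assert fs.wait_until_scrub_complete(tag=out_json['scrub_tag'])
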