# ceph/qa/tasks/cephfs/filesystem.py
import json
import logging
import datetime
import errno
import os
import random
import re
import time
import traceback

from gevent import Greenlet

from teuthology.exceptions import CommandFailedError
from teuthology import misc
from teuthology.nuke import clear_firewall
from teuthology.parallel import parallel
from tasks.ceph_manager import write_conf
from tasks import ceph_manager


log = logging.getLogger(__name__)


DAEMON_WAIT_TIMEOUT = 120
class FileLayout(object):
    def __init__(self, pool=None, pool_namespace=None, stripe_unit=None,
                 stripe_count=None, object_size=None):
        self.pool = pool
        self.pool_namespace = pool_namespace
        self.stripe_unit = stripe_unit
        self.stripe_count = stripe_count
        self.object_size = object_size

    def load_from_ceph(layout_str):
        pass

    def items(self):
        if self.pool is not None:
            yield ("pool", self.pool)
        if self.pool_namespace:
            yield ("pool_namespace", self.pool_namespace)
        if self.stripe_unit is not None:
            yield ("stripe_unit", self.stripe_unit)
        if self.stripe_count is not None:
            yield ("stripe_count", self.stripe_count)
        if self.object_size is not None:
            yield ("object_size", self.object_size)
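
# Usage sketch (illustrative, not part of the original file; the pool name is
# hypothetical): iterating the non-default fields of a layout, e.g. to feed
# Filesystem.set_dir_layout() further down.
#
#   layout = FileLayout(pool="cephfs_data", stripe_count=2)
#   dict(layout.items())  # -> {"pool": "cephfs_data", "stripe_count": 2}
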
class ObjectNotFound(Exception):
    def __init__(self, object_name):
        self._object_name = object_name

    def __str__(self):
        return "Object not found: '{0}'".format(self._object_name)
class FSStatus(object):
    """
    Operations on a snapshot of the FSMap.
    """
    def __init__(self, mon_manager):
        self.mon = mon_manager
        self.map = json.loads(self.mon.raw_cluster_cmd("fs", "dump", "--format=json"))

    def __str__(self):
        return json.dumps(self.map, indent = 2, sort_keys = True)

    # Expose the fsmap for manual inspection.
    def __getitem__(self, key):
        """
        Get a field from the fsmap.
        """
        return self.map[key]

    def get_filesystems(self):
        """
        Iterator for all filesystems.
        """
        for fs in self.map['filesystems']:
            yield fs

    def get_all(self):
        """
        Iterator for all the mds_info components in the FSMap.
        """
        for info in self.map['standbys']:
            yield info
        for fs in self.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                yield info

    def get_standbys(self):
        """
        Iterator for all standbys.
        """
        for info in self.map['standbys']:
            yield info

    def get_fsmap(self, fscid):
        """
        Get the fsmap for the given FSCID.
        """
        for fs in self.map['filesystems']:
            if fscid is None or fs['id'] == fscid:
                return fs
        raise RuntimeError("FSCID {0} not in map".format(fscid))

    def get_fsmap_byname(self, name):
        """
        Get the fsmap for the given file system name.
        """
        for fs in self.map['filesystems']:
            if name is None or fs['mdsmap']['fs_name'] == name:
                return fs
        raise RuntimeError("FS {0} not in map".format(name))

    def get_replays(self, fscid):
        """
        Get the standby:replay MDS for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['state'] == 'up:standby-replay':
                yield info

    def get_ranks(self, fscid):
        """
        Get the ranks for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['rank'] >= 0 and info['state'] != 'up:standby-replay':
                yield info

    def get_rank(self, fscid, rank):
        """
        Get the rank for the given FSCID.
        """
        for info in self.get_ranks(fscid):
            if info['rank'] == rank:
                return info
        raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))

    def get_mds(self, name):
        """
        Get the info for the given MDS name.
        """
        for info in self.get_all():
            if info['name'] == name:
                return info
        return None

    def get_mds_addr(self, name):
        """
        Return the instance addr as a string, like "10.214.133.138:6807\/10825"
        """
        info = self.get_mds(name)
        if info:
            return info['addr']
        else:
            log.warn(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
            raise RuntimeError("MDS id '{0}' not found in map".format(name))

    def get_mds_gid(self, gid):
        """
        Get the info for the given MDS gid.
        """
        for info in self.get_all():
            if info['gid'] == gid:
                return info
        return None

    def hadfailover(self, status):
        """
        Compares two statuses for mds failovers.
        Returns True if there is a failover.
        """
        for fs in status.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                oldinfo = self.get_mds_gid(info['gid'])
                if oldinfo is None or oldinfo['incarnation'] != info['incarnation']:
                    return True
        return False
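
# Usage sketch (illustrative): FSStatus is a point-in-time snapshot of the
# FSMap, so callers typically take one snapshot and run several queries
# against it; fscid below is a placeholder.
#
#   status = FSStatus(mon_manager)
#   rank0 = status.get_rank(fscid, 0)                       # info dict for rank 0
#   standby_names = [s['name'] for s in status.get_standbys()]
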
class CephCluster(object):
    @property
    def admin_remote(self):
        first_mon = misc.get_first_mon(self._ctx, None)
        (result,) = self._ctx.cluster.only(first_mon).remotes.keys()
        return result

    def __init__(self, ctx):
        self._ctx = ctx
        self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))

    def get_config(self, key, service_type=None):
        """
        Get config from mon by default, or a specific service if caller asks for it
        """
        if service_type is None:
            service_type = 'mon'

        service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def set_ceph_conf(self, subsys, key, value):
        if subsys not in self._ctx.ceph['ceph'].conf:
            self._ctx.ceph['ceph'].conf[subsys] = {}
        self._ctx.ceph['ceph'].conf[subsys][key] = value
        write_conf(self._ctx)  # XXX because we don't have the ceph task's config object, if they
                               # used a different config path this won't work.

    def clear_ceph_conf(self, subsys, key):
        del self._ctx.ceph['ceph'].conf[subsys][key]
        write_conf(self._ctx)

    def json_asok(self, command, service_type, service_id, timeout=None):
        proc = self.mon_manager.admin_socket(service_type, service_id, command, timeout=timeout)
        response_data = proc.stdout.getvalue()
        log.info("_json_asok output: {0}".format(response_data))
        if response_data.strip():
            return json.loads(response_data)
        else:
            return None
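
# Usage sketch (illustrative): json_asok() drives a daemon's admin socket via
# CephManager.admin_socket() and decodes the JSON reply; the service id 'a'
# and the config key below are placeholders.
#
#   cluster = CephCluster(ctx)
#   value = cluster.json_asok(['config', 'get', 'mds_cache_memory_limit'], 'mds', 'a')
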
class MDSCluster(CephCluster):
    """
    Collective operations on all the MDS daemons in the Ceph cluster.  These
    daemons may be in use by various Filesystems.

    For the benefit of pre-multi-filesystem tests, this class is also
    a parent of Filesystem.  The correct way to use MDSCluster going forward is
    as a separate instance outside of your (multiple) Filesystem instances.
    """
    def __init__(self, ctx):
        super(MDSCluster, self).__init__(ctx)

        self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))

        if len(self.mds_ids) == 0:
            raise RuntimeError("This task requires at least one MDS")

        if hasattr(self._ctx, "daemons"):
            # Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
            self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])

    def _one_or_all(self, mds_id, cb, in_parallel=True):
        """
        Call a callback for a single named MDS, or for all.

        Note that the parallelism here isn't for performance, it's to avoid being overly kind
        to the cluster by waiting a graceful ssh-latency of time between doing things, and to
        avoid being overly kind by executing them in a particular order.  However, some actions
        don't cope with being done in parallel, so it's optional (`in_parallel`).

        :param mds_id: MDS daemon name, or None
        :param cb: Callback taking single argument of MDS daemon name
        :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
        """
        if mds_id is None:
            if in_parallel:
                with parallel() as p:
                    for mds_id in self.mds_ids:
                        p.spawn(cb, mds_id)
            else:
                for mds_id in self.mds_ids:
                    cb(mds_id)
        else:
            cb(mds_id)
    def get_config(self, key, service_type=None):
        """
        get_config specialization of service_type="mds"
        """
        if service_type != "mds":
            return super(MDSCluster, self).get_config(key, service_type)

        # Some tests stop MDS daemons, don't send commands to a dead one:
        service_id = random.sample(filter(lambda i: self.mds_daemons[i].running(), self.mds_daemons), 1)[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def mds_stop(self, mds_id=None):
        """
        Stop the MDS daemon process(es).  If it held a rank, that rank
        will eventually go laggy.
        """
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())

    def mds_fail(self, mds_id=None):
        """
        Inform MDSMonitor of the death of the daemon process(es).  If it held
        a rank, that rank will be relinquished.
        """
        self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))

    def mds_restart(self, mds_id=None):
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())

    def mds_fail_restart(self, mds_id=None):
        """
        Variation on restart that includes marking MDSs as failed, so that doing this
        operation followed by waiting for healthy daemon states guarantees that they
        have gone down and come up, rather than potentially seeing the healthy states
        that existed before the restart.
        """
        def _fail_restart(id_):
            self.mds_daemons[id_].stop()
            self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
            self.mds_daemons[id_].restart()

        self._one_or_all(mds_id, _fail_restart)

    def mds_signal(self, mds_id, sig, silent=False):
        """
        Signal a MDS daemon.
        """
        self.mds_daemons[mds_id].signal(sig, silent)
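
    # Usage sketch (illustrative): bouncing every MDS.  mds_fail_restart()
    # stops, marks failed and restarts each daemon via _one_or_all(); pairing
    # it with Filesystem.wait_for_daemons() (further down) guarantees the
    # daemons really went down and came back.
    #
    #   mds_cluster = MDSCluster(ctx)
    #   mds_cluster.mds_fail_restart()
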
    def newfs(self, name='cephfs', create=True):
        return Filesystem(self._ctx, name=name, create=create)

    def status(self):
        return FSStatus(self.mon_manager)

    def delete_all_filesystems(self):
        """
        Remove all filesystems that exist, and any pools in use by them.
        """
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        pool_id_name = {}
        for pool in pools:
            pool_id_name[pool['pool']] = pool['pool_name']

        # mark cluster down for each fs to prevent churn during deletion
        status = self.status()
        for fs in status.get_filesystems():
            self.mon_manager.raw_cluster_cmd("fs", "fail", str(fs['mdsmap']['fs_name']))

        # get a new copy as actives may have since changed
        status = self.status()
        for fs in status.get_filesystems():
            mdsmap = fs['mdsmap']
            metadata_pool = pool_id_name[mdsmap['metadata_pool']]

            self.mon_manager.raw_cluster_cmd('fs', 'rm', mdsmap['fs_name'], '--yes-i-really-mean-it')
            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
                                             metadata_pool, metadata_pool,
                                             '--yes-i-really-really-mean-it')
            for data_pool in mdsmap['data_pools']:
                data_pool = pool_id_name[data_pool]
                try:
                    self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
                                                     data_pool, data_pool,
                                                     '--yes-i-really-really-mean-it')
                except CommandFailedError as e:
                    if e.exitstatus == 16:  # EBUSY, this data pool is used
                        pass                # by two metadata pools, let the 2nd
                    else:                   # pass delete it
                        raise
    def get_standby_daemons(self):
        return set([s['name'] for s in self.status().get_standbys()])

    def get_mds_hostnames(self):
        result = set()
        for mds_id in self.mds_ids:
            mds_remote = self.mon_manager.find_remote('mds', mds_id)
            result.add(mds_remote.hostname)

        return list(result)

    def set_clients_block(self, blocked, mds_id=None):
        """
        Block (using iptables) client communications to this MDS.  Be careful: if
        other services are running on this MDS, or other MDSs try to talk to this
        MDS, their communications may also be blocked as collateral damage.

        :param mds_id: Optional ID of MDS to block, default to all
        """
        da_flag = "-A" if blocked else "-D"

        def set_block(_mds_id):
            remote = self.mon_manager.find_remote('mds', _mds_id)
            status = self.status()

            addr = status.get_mds_addr(_mds_id)
            ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()

            remote.run(
                args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])
            remote.run(
                args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])

        self._one_or_all(mds_id, set_block, in_parallel=False)

    def clear_firewall(self):
        clear_firewall(self._ctx)

    def get_mds_info(self, mds_id):
        return FSStatus(self.mon_manager).get_mds(mds_id)

    def is_pool_full(self, pool_name):
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        for pool in pools:
            if pool['pool_name'] == pool_name:
                return 'full' in pool['flags_names'].split(",")

        raise RuntimeError("Pool not found '{0}'".format(pool_name))
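
# Usage sketch (illustrative): MDSCluster is the usual entry point both for
# cluster-wide queries and for obtaining a Filesystem handle.
#
#   mds_cluster = MDSCluster(ctx)
#   fs = mds_cluster.newfs(name='cephfs', create=True)
#   standbys = mds_cluster.get_standby_daemons()
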
class Filesystem(MDSCluster):
    """
    This object is for driving a CephFS filesystem.  The MDS daemons driven by
    MDSCluster may be shared with other Filesystems.
    """
    def __init__(self, ctx, fscid=None, name=None, create=False,
                 ec_profile=None):
        super(Filesystem, self).__init__(ctx)

        self.name = name
        self.ec_profile = ec_profile
        self.id = None
        self.metadata_pool_name = None
        self.metadata_overlay = False
        self.data_pool_name = None
        self.data_pools = None

        client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
        self.client_id = client_list[0]
        self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]

        if name is not None:
            if fscid is not None:
                raise RuntimeError("cannot specify fscid when creating fs")
            if create and not self.legacy_configured():
                self.create()
        else:
            if fscid is not None:
                self.id = fscid
            self.getinfo(refresh = True)

        # Stash a reference to the first created filesystem on ctx, so
        # that if someone drops to the interactive shell they can easily
        # get hold of it.
        if not hasattr(self._ctx, "filesystem"):
            self._ctx.filesystem = self

    def get_task_status(self, status_key):
        return self.mon_manager.get_service_task_status("mds", status_key)
    def getinfo(self, refresh = False):
        status = self.status()
        if self.id is not None:
            fsmap = status.get_fsmap(self.id)
        elif self.name is not None:
            fsmap = status.get_fsmap_byname(self.name)
        else:
            fss = [fs for fs in status.get_filesystems()]
            if len(fss) == 1:
                fsmap = fss[0]
            elif len(fss) == 0:
                raise RuntimeError("no file system available")
            else:
                raise RuntimeError("more than one file system available")
        self.id = fsmap['id']
        self.name = fsmap['mdsmap']['fs_name']
        self.get_pool_names(status = status, refresh = refresh)
        return status

    def set_metadata_overlay(self, overlay):
        if self.id is not None:
            raise RuntimeError("cannot specify fscid when configuring overlay")
        self.metadata_overlay = overlay

    def deactivate(self, rank):
        if rank < 0:
            raise RuntimeError("invalid rank")
        elif rank == 0:
            raise RuntimeError("cannot deactivate rank 0")
        self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))
    def reach_max_mds(self):
        # Try to reach rank count == max_mds, up or down (UPGRADE SENSITIVE!)
        status = self.getinfo()
        mds_map = self.get_mds_map(status=status)
        max_mds = mds_map['max_mds']

        count = len(list(self.get_ranks(status=status)))
        if count > max_mds:
            try:
                # deactivate mds in descending order
                status = self.wait_for_daemons(status=status, skip_max_mds_check=True)
                while count > max_mds:
                    targets = sorted(self.get_ranks(status=status), key=lambda r: r['rank'], reverse=True)
                    target = targets[0]
                    log.info("deactivating rank %d" % target['rank'])
                    self.deactivate(target['rank'])
                    status = self.wait_for_daemons(skip_max_mds_check=True)
                    count = len(list(self.get_ranks(status=status)))
            except:
                # In Mimic, deactivation is done automatically:
                log.info("Error:\n{}".format(traceback.format_exc()))
                status = self.wait_for_daemons()
        else:
            status = self.wait_for_daemons()

        mds_map = self.get_mds_map(status=status)
        assert(mds_map['max_mds'] == max_mds)
        assert(mds_map['in'] == list(range(0, max_mds)))

    def fail(self):
        self.mon_manager.raw_cluster_cmd("fs", "fail", str(self.name))
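
    # Usage sketch (illustrative): growing or shrinking the set of active
    # ranks is done by changing max_mds (set_max_mds() below) and then calling
    # reach_max_mds(), which waits for ranks to come up (or, pre-Mimic,
    # explicitly deactivates extra ranks) until the actual count matches.
    #
    #   fs.set_max_mds(2)
    #   fs.reach_max_mds()
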
    def set_flag(self, var, *args):
        a = map(lambda x: str(x).lower(), args)
        self.mon_manager.raw_cluster_cmd("fs", "flag", "set", var, *a)

    def set_allow_multifs(self, yes=True):
        self.set_flag("enable_multiple", yes)

    def set_var(self, var, *args):
        a = map(lambda x: str(x).lower(), args)
        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, var, *a)

    def set_down(self, down=True):
        self.set_var("down", str(down).lower())

    def set_joinable(self, joinable=True):
        self.set_var("joinable", joinable)

    def set_max_mds(self, max_mds):
        self.set_var("max_mds", "%d" % max_mds)

    def set_allow_standby_replay(self, yes):
        self.set_var("allow_standby_replay", yes)

    def set_allow_new_snaps(self, yes):
        self.set_var("allow_new_snaps", yes, '--yes-i-really-mean-it')
    # In Octopus+, the PG count can be omitted to use the default. We keep the
    # hard-coded value for deployments of Mimic/Nautilus.

    def create(self):
        if self.name is None:
            self.name = "cephfs"
        if self.metadata_pool_name is None:
            self.metadata_pool_name = "{0}_metadata".format(self.name)
        if self.data_pool_name is None:
            data_pool_name = "{0}_data".format(self.name)
        else:
            data_pool_name = self.data_pool_name

        log.info("Creating filesystem '{0}'".format(self.name))

        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                         self.metadata_pool_name, self.pgs_per_fs_pool.__str__())
        if self.metadata_overlay:
            self.mon_manager.raw_cluster_cmd('fs', 'new',
                                             self.name, self.metadata_pool_name, data_pool_name,
                                             '--allow-dangerous-metadata-overlay')
        else:
            if self.ec_profile and 'disabled' not in self.ec_profile:
                log.info("EC profile is %s", self.ec_profile)
                cmd = ['osd', 'erasure-code-profile', 'set', data_pool_name]
                cmd.extend(self.ec_profile)
                self.mon_manager.raw_cluster_cmd(*cmd)
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'create',
                    data_pool_name, self.pgs_per_fs_pool.__str__(), 'erasure',
                    data_pool_name)
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'set',
                    data_pool_name, 'allow_ec_overwrites', 'true')
            else:
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'create',
                    data_pool_name, self.pgs_per_fs_pool.__str__())
            self.mon_manager.raw_cluster_cmd('fs', 'new',
                                             self.name,
                                             self.metadata_pool_name,
                                             data_pool_name)

        self.check_pool_application(self.metadata_pool_name)
        self.check_pool_application(data_pool_name)
        # Turn off spurious standby count warnings from modifying max_mds in tests.
        try:
            self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
        except CommandFailedError as e:
            if e.exitstatus == 22:
                # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
                pass
            else:
                raise

        self.getinfo(refresh = True)
, pool_name
):
608 osd_map
= self
.mon_manager
.get_osd_dump_json()
609 for pool
in osd_map
['pools']:
610 if pool
['pool_name'] == pool_name
:
611 if "application_metadata" in pool
:
612 if not "cephfs" in pool
['application_metadata']:
613 raise RuntimeError("Pool {pool_name} does not name cephfs as application!".\
614 format(pool_name
=pool_name
))
618 if getattr(self
._ctx
, "filesystem", None) == self
:
619 delattr(self
._ctx
, "filesystem")
623 Whether a filesystem exists in the mon's filesystem list
625 fs_list
= json
.loads(self
.mon_manager
.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
626 return self
.name
in [fs
['name'] for fs
in fs_list
]
628 def legacy_configured(self
):
630 Check if a legacy (i.e. pre "fs new") filesystem configuration is present. If this is
631 the case, the caller should avoid using Filesystem.create
634 out_text
= self
.mon_manager
.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
635 pools
= json
.loads(out_text
)
636 metadata_pool_exists
= 'metadata' in [p
['poolname'] for p
in pools
]
637 if metadata_pool_exists
:
638 self
.metadata_pool_name
= 'metadata'
639 except CommandFailedError
as e
:
640 # For use in upgrade tests, Ceph cuttlefish and earlier don't support
641 # structured output (--format) from the CLI.
642 if e
.exitstatus
== 22:
643 metadata_pool_exists
= True
647 return metadata_pool_exists
650 return json
.loads(self
.mon_manager
.raw_cluster_cmd("df", "--format=json-pretty"))
    def get_mds_map(self, status=None):
        if status is None:
            status = self.status()
        return status.get_fsmap(self.id)['mdsmap']

    def get_var(self, var, status=None):
        return self.get_mds_map(status=status)[var]

    def set_dir_layout(self, mount, path, layout):
        for name, value in layout.items():
            mount.run_shell(args=["setfattr", "-n", "ceph.dir.layout."+name, "-v", str(value), path])

    def add_data_pool(self, name, create=True):
        if create:
            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name, self.pgs_per_fs_pool.__str__())
        self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
        self.get_pool_names(refresh = True)
        for poolid, fs_name in self.data_pools.items():
            if name == fs_name:
                return poolid
        raise RuntimeError("could not get just created pool '{0}'".format(name))
    def get_pool_names(self, refresh = False, status = None):
        if refresh or self.metadata_pool_name is None or self.data_pools is None:
            if status is None:
                status = self.status()
            fsmap = status.get_fsmap(self.id)

            osd_map = self.mon_manager.get_osd_dump_json()
            id_to_name = {}
            for p in osd_map['pools']:
                id_to_name[p['pool']] = p['pool_name']

            self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
            self.data_pools = {}
            for data_pool in fsmap['mdsmap']['data_pools']:
                self.data_pools[data_pool] = id_to_name[data_pool]

    def get_data_pool_name(self, refresh = False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return self.data_pools.values()[0]

    def get_data_pool_id(self, refresh = False):
        """
        Don't call this if you have multiple data pools
        """
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return self.data_pools.keys()[0]

    def get_data_pool_names(self, refresh = False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        return self.data_pools.values()

    def get_metadata_pool_name(self):
        return self.metadata_pool_name

    def set_data_pool_name(self, name):
        if self.id is not None:
            raise RuntimeError("can't set filesystem name if its fscid is set")
        self.data_pool_name = name

    def get_namespace_id(self):
        return self.id

    def get_pool_df(self, pool_name):
        """
        Return a dict like:
            {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
        """
        for pool_df in self._df()['pools']:
            if pool_df['name'] == pool_name:
                return pool_df['stats']

        raise RuntimeError("Pool name '{0}' not found".format(pool_name))

    def get_usage(self):
        return self._df()['stats']['total_used_bytes']
    def are_daemons_healthy(self, status=None, skip_max_mds_check=False):
        """
        Return true if all daemons are in one of active, standby, standby-replay, and
        at least max_mds daemons are in 'active'.

        Unlike most of Filesystem, this function is tolerant of new-style `fs`
        commands being missing, because we are part of the ceph installation
        process during upgrade suites, so must fall back to old style commands
        when we get an EINVAL on a new style command.
        """
        # First, check to see that processes haven't exited with an error code
        for mds in self._ctx.daemons.iter_daemons_of_role('mds'):
            mds.check_status()

        active_count = 0
        try:
            mds_map = self.get_mds_map(status=status)
        except CommandFailedError as cfe:
            # Old version, fall back to non-multi-fs commands
            if cfe.exitstatus == errno.EINVAL:
                mds_map = json.loads(
                        self.mon_manager.raw_cluster_cmd('mds', 'dump', '--format=json'))
            else:
                raise

        log.info("are_daemons_healthy: mds map: {0}".format(mds_map))

        for mds_id, mds_status in mds_map['info'].items():
            if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
                log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
                return False
            elif mds_status['state'] == 'up:active':
                active_count += 1

        log.info("are_daemons_healthy: {0}/{1}".format(
            active_count, mds_map['max_mds']
        ))

        if not skip_max_mds_check:
            if active_count > mds_map['max_mds']:
                log.info("are_daemons_healthy: number of actives is greater than max_mds: {0}".format(mds_map))
                return False
            elif active_count == mds_map['max_mds']:
                # The MDSMap says these guys are active, but let's check they really are
                for mds_id, mds_status in mds_map['info'].items():
                    if mds_status['state'] == 'up:active':
                        try:
                            daemon_status = self.mds_asok(["status"], mds_id=mds_status['name'])
                        except CommandFailedError as cfe:
                            if cfe.exitstatus == errno.EINVAL:
                                # Old version, can't do this check
                                continue
                            else:
                                # MDS not even running
                                return False

                        if daemon_status['state'] != 'up:active':
                            # MDS hasn't taken the latest map yet
                            return False

                return True
            else:
                return False
        else:
            log.info("are_daemons_healthy: skipping max_mds check")
            return True
    def get_daemon_names(self, state=None, status=None):
        """
        Return MDS daemon names of those daemons in the given state
        """
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['state'] == state or state is None:
                result.append(mds_status['name'])

        return result

    def get_active_names(self, status=None):
        """
        Return MDS daemon names of those daemons holding ranks
        in state up:active

        :return: list of strings like ['a', 'b'], sorted by rank
        """
        return self.get_daemon_names("up:active", status=status)

    def get_all_mds_rank(self, status=None):
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['rank'])

        return result

    def get_rank(self, rank=0, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_rank(self.id, rank)
    def rank_restart(self, rank=0, status=None):
        name = self.get_rank(rank=rank, status=status)['name']
        self.mds_restart(mds_id=name)

    def rank_signal(self, signal, rank=0, status=None):
        name = self.get_rank(rank=rank, status=status)['name']
        self.mds_signal(name, signal)

    def rank_freeze(self, yes, rank=0):
        self.mon_manager.raw_cluster_cmd("mds", "freeze", "{}:{}".format(self.id, rank), str(yes).lower())

    def rank_fail(self, rank=0):
        self.mon_manager.raw_cluster_cmd("mds", "fail", "{}:{}".format(self.id, rank))

    def get_ranks(self, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_ranks(self.id)

    def get_replays(self, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_replays(self.id)

    def get_replay(self, rank=0, status=None):
        for replay in self.get_replays(status=status):
            if replay['rank'] == rank:
                return replay
        return None
    def get_rank_names(self, status=None):
        """
        Return MDS daemon names of those daemons holding a rank,
        sorted by rank.  This includes e.g. up:replay/reconnect
        as well as active, but does not include standby or
        standby-replay.
        """
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['name'])

        return result

    def wait_for_daemons(self, timeout=None, skip_max_mds_check=False, status=None):
        """
        Wait until all daemons are healthy
        """
        if timeout is None:
            timeout = DAEMON_WAIT_TIMEOUT

        if status is None:
            status = self.status()

        elapsed = 0
        while True:
            if self.are_daemons_healthy(status=status, skip_max_mds_check=skip_max_mds_check):
                return status
            else:
                time.sleep(1)
                elapsed += 1

            if elapsed > timeout:
                log.info("status = {0}".format(status))
                raise RuntimeError("Timed out waiting for MDS daemons to become healthy")

            status = self.status()

    def get_lone_mds_id(self):
        """
        Get a single MDS ID: the only one if there is only one
        configured, else the only one currently holding a rank,
        else raise an error.
        """
        if len(self.mds_ids) != 1:
            alive = self.get_rank_names()
            if len(alive) == 1:
                return alive[0]
            else:
                raise ValueError("Explicit MDS argument required when multiple MDSs in use")
        else:
            return self.mds_ids[0]
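
    # Usage sketch (illustrative): the common restart-and-settle pattern used
    # by the CephFS test suites.
    #
    #   fs.mds_fail_restart()
    #   status = fs.wait_for_daemons(timeout=DAEMON_WAIT_TIMEOUT)
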
    def recreate(self):
        log.info("Creating new filesystem")
        self.delete_all_filesystems()
        self.create()

    def put_metadata_object_raw(self, object_id, infile):
        """
        Save an object to the metadata pool
        """
        temp_bin_path = infile
        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'put', object_id, temp_bin_path
        ])

    def get_metadata_object_raw(self, object_id):
        """
        Retrieve an object from the metadata pool and store it in a file.
        """
        temp_bin_path = '/tmp/' + object_id + '.bin'

        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
        ])

        return temp_bin_path

    def get_metadata_object(self, object_type, object_id):
        """
        Retrieve an object from the metadata pool, pass it through
        ceph-dencoder to dump it to JSON, and return the decoded object.
        """
        temp_bin_path = '/tmp/out.bin'

        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
        ])

        dump_json = self.client_remote.sh([
            'sudo', os.path.join(self._prefix, 'ceph-dencoder'), 'type', object_type, 'import', temp_bin_path, 'decode', 'dump_json'
        ])
        try:
            dump = json.loads(dump_json)
        except (TypeError, ValueError):
            log.error("Failed to decode JSON: '{0}'".format(dump_json))
            raise

        return dump

    def get_journal_version(self):
        """
        Read the JournalPointer and Journal::Header objects to learn the version of
        encoding in use.
        """
        journal_pointer_object = '400.00000000'
        journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
        journal_ino = journal_pointer_dump['journal_pointer']['front']

        journal_header_object = "{0:x}.00000000".format(journal_ino)
        journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)

        version = journal_header_dump['journal_header']['stream_format']
        log.info("Read journal version {0}".format(version))

        return version
    def mds_asok(self, command, mds_id=None, timeout=None):
        if mds_id is None:
            mds_id = self.get_lone_mds_id()

        return self.json_asok(command, 'mds', mds_id, timeout=timeout)

    def rank_asok(self, command, rank=0, status=None, timeout=None):
        info = self.get_rank(rank=rank, status=status)
        return self.json_asok(command, 'mds', info['name'], timeout=timeout)

    def rank_tell(self, command, rank=0, status=None):
        info = self.get_rank(rank=rank, status=status)
        return json.loads(self.mon_manager.raw_cluster_cmd("tell", 'mds.{0}'.format(info['name']), *command))

    def read_cache(self, path, depth=None):
        cmd = ["dump", "tree", path]
        if depth is not None:
            cmd.append(depth.__str__())
        result = self.mds_asok(cmd)
        if len(result) == 0:
            raise RuntimeError("Path not found in cache: {0}".format(path))

        return result
    def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
        """
        Block until the MDS reaches a particular state, or a failure condition
        is met.

        When there are multiple MDSs, succeed when exactly one MDS is in the
        goal state, or fail when any MDS is in the reject state.

        :param goal_state: Return once the MDS is in this state
        :param reject: Fail if the MDS enters this state before the goal state
        :param timeout: Fail if this many seconds pass before reaching goal
        :return: number of seconds waited, rounded down to integer
        """

        started_at = time.time()
        while True:
            status = self.status()
            if rank is not None:
                try:
                    mds_info = status.get_rank(self.id, rank)
                    current_state = mds_info['state'] if mds_info else None
                    log.info("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
                except:
                    mdsmap = self.get_mds_map(status=status)
                    if rank in mdsmap['failed']:
                        log.info("Waiting for rank {0} to come back.".format(rank))
                        current_state = None
                    else:
                        raise
            elif mds_id is not None:
                # mds_info is None if no daemon with this ID exists in the map
                mds_info = status.get_mds(mds_id)
                current_state = mds_info['state'] if mds_info else None
                log.info("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
            else:
                # In general, look for a single MDS
                states = [m['state'] for m in status.get_ranks(self.id)]
                if [s for s in states if s == goal_state] == [goal_state]:
                    current_state = goal_state
                elif reject in states:
                    current_state = reject
                else:
                    current_state = None
                log.info("mapped states {0} to {1}".format(states, current_state))

            elapsed = time.time() - started_at
            if current_state == goal_state:
                log.info("reached state '{0}' in {1}s".format(current_state, elapsed))
                return elapsed
            elif reject is not None and current_state == reject:
                raise RuntimeError("MDS in reject state {0}".format(current_state))
            elif timeout is not None and elapsed > timeout:
                log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
                raise RuntimeError(
                    "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
                        elapsed, goal_state, current_state
                    ))
            else:
                time.sleep(1)
    def _read_data_xattr(self, ino_no, xattr_name, type, pool):
        mds_id = self.mds_ids[0]
        remote = self.mds_daemons[mds_id].remote
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)
        args = [
            os.path.join(self._prefix, "rados"), "-p", pool, "getxattr", obj_name, xattr_name
        ]
        try:
            data = remote.sh(args)
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(obj_name)

        dump = remote.sh(
            [os.path.join(self._prefix, "ceph-dencoder"),
             "type", type,
             "import", "-",
             "decode", "dump_json"],
            stdin=data
        )

        return json.loads(dump.strip())

    def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
        """
        Write to an xattr of the 0th data object of an inode.  Will
        succeed whether the object and/or xattr already exist or not.

        :param ino_no: integer inode number
        :param xattr_name: string name of the xattr
        :param data: byte array data to write to the xattr
        :param pool: name of data pool or None to use primary data pool
        """
        remote = self.mds_daemons[self.mds_ids[0]].remote
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)
        args = [
            os.path.join(self._prefix, "rados"), "-p", pool, "setxattr",
            obj_name, xattr_name, data
        ]
        remote.sh(args)
    def read_backtrace(self, ino_no, pool=None):
        """
        Read the backtrace from the data pool, return a dict in the format
        given by inode_backtrace_t::dump, which is something like:

        ::

            rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
            ceph-dencoder type inode_backtrace_t import out.bin decode dump_json

            { "ino": 1099511627778,
              ... }

        :param pool: name of pool to read backtrace from.  If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)

    def read_layout(self, ino_no, pool=None):
        """
        Read 'layout' xattr of an inode and parse the result, returning a dict like:

        ::

            {
                "stripe_unit": 4194304,
                "object_size": 4194304,
                ...
            }

        :param pool: name of pool to read backtrace from.  If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)
    def _enumerate_data_objects(self, ino, size):
        """
        Get the list of expected data objects for a range, and the list of objects
        that really exist.

        :return a tuple of two lists of strings (expected, actual)
        """
        stripe_size = 1024 * 1024 * 4

        size = max(stripe_size, size)

        want_objects = [
            "{0:x}.{1:08x}".format(ino, n)
            for n in range(0, ((size - 1) / stripe_size) + 1)
        ]

        exist_objects = self.rados(["ls"], pool=self.get_data_pool_name()).split("\n")

        return want_objects, exist_objects

    def data_objects_present(self, ino, size):
        """
        Check that *all* the expected data objects for an inode are present in the data pool
        """
        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        missing = set(want_objects) - set(exist_objects)

        if missing:
            log.info("Objects missing (ino {0}, size {1}): {2}".format(
                ino, size, missing
            ))
            return False
        else:
            log.info("All objects for ino {0} size {1} found".format(ino, size))
            return True

    def data_objects_absent(self, ino, size):
        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        present = set(want_objects) & set(exist_objects)

        if present:
            log.info("Objects not absent (ino {0}, size {1}): {2}".format(
                ino, size, present
            ))
            return False
        else:
            log.info("All objects for ino {0} size {1} are absent".format(ino, size))
            return True
    def dirfrag_exists(self, ino, frag):
        try:
            self.rados(["stat", "{0:x}.{1:08x}".format(ino, frag)])
        except CommandFailedError:
            return False
        else:
            return True

    def rados(self, args, pool=None, namespace=None, stdin_data=None,
              stdin_file=None):
        """
        Call into the `rados` CLI from an MDS
        """
        if pool is None:
            pool = self.get_metadata_pool_name()

        # Doesn't matter which MDS we use to run rados commands, they all
        # have access to the pools
        mds_id = self.mds_ids[0]
        remote = self.mds_daemons[mds_id].remote

        # NB we could alternatively use librados pybindings for this, but it's a one-liner
        # using the `rados` CLI
        args = ([os.path.join(self._prefix, "rados"), "-p", pool] +
                (["--namespace", namespace] if namespace else []) +
                args)

        if stdin_file is not None:
            args = ["bash", "-c", "cat " + stdin_file + " | " + " ".join(args)]

        output = remote.sh(args, stdin=stdin_data)
        return output.strip()
    def list_dirfrag(self, dir_ino):
        """
        Read the named object and return the list of omap keys

        :return a list of 0 or more strings
        """

        dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)

        try:
            key_list_str = self.rados(["listomapkeys", dirfrag_obj_name])
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(dirfrag_obj_name)

        return key_list_str.split("\n") if key_list_str else []

    def erase_metadata_objects(self, prefix):
        """
        For all objects in the metadata pool matching the prefix,
        erase them.

        This is O(N) with the number of objects in the pool, so only suitable
        for use on toy test filesystems.
        """
        all_objects = self.rados(["ls"]).split("\n")
        matching_objects = [o for o in all_objects if o.startswith(prefix)]
        for o in matching_objects:
            self.rados(["rm", o])

    def erase_mds_objects(self, rank):
        """
        Erase all the per-MDS objects for a particular rank.  This includes
        inotable, sessiontable, journal
        """

        def obj_prefix(multiplier):
            """
            MDS object naming conventions like rank 1's
            journal is at 201.***
            """
            return "%x." % (multiplier * 0x100 + rank)

        # MDS_INO_LOG_OFFSET
        self.erase_metadata_objects(obj_prefix(2))
        # MDS_INO_LOG_BACKUP_OFFSET
        self.erase_metadata_objects(obj_prefix(3))
        # MDS_INO_LOG_POINTER_OFFSET
        self.erase_metadata_objects(obj_prefix(4))
        # MDSTables & SessionMap
        self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))
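
    # For reference (illustrative): obj_prefix() above computes the per-rank
    # prefixes, e.g. for rank 1 the journal prefix is
    # "%x." % (2 * 0x100 + 1) == "201." and the journal pointer prefix is
    # "%x." % (4 * 0x100 + 1) == "401.".
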
    @property
    def _prefix(self):
        """
        Override this to set a different install prefix for the Ceph binaries.
        """
        return ""

    def _make_rank(self, rank):
        return "{}:{}".format(self.name, rank)
    def _run_tool(self, tool, args, rank=None, quiet=False):
        # Tests frequently have [client] configuration that jacks up
        # the objecter log level (unlikely to be interesting here)
        # and does not set the mds log level (very interesting here)
        if quiet:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
        else:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']

        if rank is not None:
            base_args.extend(["--rank", "%s" % str(rank)])

        t1 = datetime.datetime.now()
        r = self.tool_remote.sh(base_args + args).strip()
        duration = datetime.datetime.now() - t1
        log.info("Ran {0} in time {1}, result:\n{2}".format(
            base_args + args, duration, r
        ))
        return r
    @property
    def tool_remote(self):
        """
        An arbitrary remote to use when invoking recovery tools.  Use an MDS host because
        it'll definitely have keys with perms to access cephfs metadata pool.  This is public
        so that tests can use this remote to go get locally written output files from the tools.
        """
        mds_id = self.mds_ids[0]
        return self.mds_daemons[mds_id].remote

    def journal_tool(self, args, rank, quiet=False):
        """
        Invoke cephfs-journal-tool with the passed arguments for a rank, and return its stdout
        """
        fs_rank = self._make_rank(rank)
        return self._run_tool("cephfs-journal-tool", args, fs_rank, quiet)

    def table_tool(self, args, quiet=False):
        """
        Invoke cephfs-table-tool with the passed arguments, and return its stdout
        """
        return self._run_tool("cephfs-table-tool", args, None, quiet)

    def data_scan(self, args, quiet=False, worker_count=1):
        """
        Invoke cephfs-data-scan with the passed arguments, and return its stdout

        :param worker_count: if greater than 1, multiple workers will be run
                             in parallel and the return value will be None
        """
        workers = []

        for n in range(0, worker_count):
            if worker_count > 1:
                # data-scan args first token is a command, followed by args to it.
                # insert worker arguments after the command.
                cmd = args[0]
                worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
            else:
                worker_args = args

            workers.append(Greenlet.spawn(lambda wargs=worker_args:
                                          self._run_tool("cephfs-data-scan", wargs, None, quiet)))

        for w in workers:
            w.get()

        if worker_count == 1:
            return workers[0].value
        else:
            return None

    def is_full(self):
        return self.is_pool_full(self.get_data_pool_name())
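
# Usage sketch (illustrative): fanning a recovery tool out over several
# workers; the subcommand and pool arguments are examples only.  With
# worker_count > 1 the per-worker outputs are not aggregated, so the return
# value is None.
#
#   fs.data_scan(["scan_extents", fs.get_data_pool_name()], worker_count=4)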