# ceph/qa/tasks/cephfs/filesystem.py
from StringIO import StringIO
import json
import logging
import datetime
import errno
import os
import random
import re
import time

from gevent import Greenlet

from teuthology.exceptions import CommandFailedError
from teuthology import misc
from teuthology.nuke import clear_firewall
from teuthology.parallel import parallel
from tasks.ceph_manager import write_conf
from tasks import ceph_manager


log = logging.getLogger(__name__)


DAEMON_WAIT_TIMEOUT = 120

class ObjectNotFound(Exception):
    def __init__(self, object_name):
        self._object_name = object_name

    def __str__(self):
        return "Object not found: '{0}'".format(self._object_name)


class FSStatus(object):
    """
    Operations on a snapshot of the FSMap.
    """
    def __init__(self, mon_manager):
        self.mon = mon_manager
        self.map = json.loads(self.mon.raw_cluster_cmd("fs", "dump", "--format=json"))

    def __str__(self):
        return json.dumps(self.map, indent = 2, sort_keys = True)

    # Expose the fsmap for manual inspection.
    def __getitem__(self, key):
        """
        Get a field from the fsmap.
        """
        return self.map[key]

    def get_filesystems(self):
        """
        Iterator for all filesystems.
        """
        for fs in self.map['filesystems']:
            yield fs

    def get_all(self):
        """
        Iterator for all the mds_info components in the FSMap.
        """
        for info in self.get_standbys():
            yield info
        for fs in self.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                yield info

    def get_standbys(self):
        """
        Iterator for all standbys.
        """
        for info in self.map['standbys']:
            yield info

    def get_fsmap(self, fscid):
        """
        Get the fsmap for the given FSCID.
        """
        for fs in self.map['filesystems']:
            if fscid is None or fs['id'] == fscid:
                return fs
        raise RuntimeError("FSCID {0} not in map".format(fscid))

    def get_fsmap_byname(self, name):
        """
        Get the fsmap for the given file system name.
        """
        for fs in self.map['filesystems']:
            if name is None or fs['mdsmap']['fs_name'] == name:
                return fs
        raise RuntimeError("FS {0} not in map".format(name))

    def get_replays(self, fscid):
        """
        Get the standby:replay MDS for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['state'] == 'up:standby-replay':
                yield info

    def get_ranks(self, fscid):
        """
        Get the ranks for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['rank'] >= 0:
                yield info

    def get_rank(self, fscid, rank):
        """
        Get the rank for the given FSCID.
        """
        for info in self.get_ranks(fscid):
            if info['rank'] == rank:
                return info
        raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))

    def get_mds(self, name):
        """
        Get the info for the given MDS name.
        """
        for info in self.get_all():
            if info['name'] == name:
                return info
        return None

    def get_mds_addr(self, name):
        """
        Return the instance addr as a string, like "10.214.133.138:6807\/10825"
        """
        info = self.get_mds(name)
        if info:
            return info['addr']
        else:
            log.warn(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
            raise RuntimeError("MDS id '{0}' not found in map".format(name))


class CephCluster(object):
    @property
    def admin_remote(self):
        first_mon = misc.get_first_mon(self._ctx, None)
        (result,) = self._ctx.cluster.only(first_mon).remotes.iterkeys()
        return result

    def __init__(self, ctx):
        self._ctx = ctx
        self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))

    def get_config(self, key, service_type=None):
        """
        Get config from mon by default, or a specific service if caller asks for it
        """
        if service_type is None:
            service_type = 'mon'

        service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def set_ceph_conf(self, subsys, key, value):
        if subsys not in self._ctx.ceph['ceph'].conf:
            self._ctx.ceph['ceph'].conf[subsys] = {}
        self._ctx.ceph['ceph'].conf[subsys][key] = value
        write_conf(self._ctx)  # XXX because we don't have the ceph task's config object, if they
                               # used a different config path this won't work.

    def clear_ceph_conf(self, subsys, key):
        del self._ctx.ceph['ceph'].conf[subsys][key]
        write_conf(self._ctx)

    def json_asok(self, command, service_type, service_id, timeout=None):
        proc = self.mon_manager.admin_socket(service_type, service_id, command, timeout=timeout)
        response_data = proc.stdout.getvalue()
        log.info("_json_asok output: {0}".format(response_data))
        if response_data.strip():
            return json.loads(response_data)
        else:
            return None

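    # Illustrative usage (added commentary, not from the original file):
    # json_asok() wraps "ceph daemon <type>.<id> <command>" and parses the JSON
    # it prints, e.g.
    #     cluster.json_asok(['config', 'get', 'mds_cache_memory_limit'], 'mds', 'a')
    # returns a dict, or None when the daemon produced no output.
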

class MDSCluster(CephCluster):
    """
    Collective operations on all the MDS daemons in the Ceph cluster.  These
    daemons may be in use by various Filesystems.

    For the benefit of pre-multi-filesystem tests, this class is also
    a parent of Filesystem.  The correct way to use MDSCluster going forward is
    as a separate instance outside of your (multiple) Filesystem instances.
    """
    def __init__(self, ctx):
        super(MDSCluster, self).__init__(ctx)

        self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))

        if len(self.mds_ids) == 0:
            raise RuntimeError("This task requires at least one MDS")

        if hasattr(self._ctx, "daemons"):
            # Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
            self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])

    def _one_or_all(self, mds_id, cb, in_parallel=True):
        """
        Call a callback for a single named MDS, or for all.

        Note that the parallelism here isn't for performance, it's to avoid being overly kind
        to the cluster by waiting a graceful ssh-latency of time between doing things, and to
        avoid being overly kind by executing them in a particular order.  However, some actions
        don't cope with being done in parallel, so it's optional (`in_parallel`)

        :param mds_id: MDS daemon name, or None
        :param cb: Callback taking single argument of MDS daemon name
        :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
        """
        if mds_id is None:
            if in_parallel:
                with parallel() as p:
                    for mds_id in self.mds_ids:
                        p.spawn(cb, mds_id)
            else:
                for mds_id in self.mds_ids:
                    cb(mds_id)
        else:
            cb(mds_id)

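    # Illustration (added commentary, not from the original file): passing
    # mds_id=None fans the callback out to every configured MDS, e.g.
    #     self._one_or_all(None, lambda id_: self.mds_daemons[id_].stop())
    # stops all MDS daemons, while a concrete mds_id acts on just that one.
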
    def get_config(self, key, service_type=None):
        """
        get_config specialization of service_type="mds"
        """
        if service_type != "mds":
            return super(MDSCluster, self).get_config(key, service_type)

        # Some tests stop MDS daemons, don't send commands to a dead one:
        service_id = random.sample(filter(lambda i: self.mds_daemons[i].running(), self.mds_daemons), 1)[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def mds_stop(self, mds_id=None):
        """
        Stop the MDS daemon process(es).  If it held a rank, that rank
        will eventually go laggy.
        """
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())

    def mds_fail(self, mds_id=None):
        """
        Inform MDSMonitor of the death of the daemon process(es).  If it held
        a rank, that rank will be relinquished.
        """
        self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))

    def mds_restart(self, mds_id=None):
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())

    def mds_fail_restart(self, mds_id=None):
        """
        Variation on restart that includes marking MDSs as failed, so that doing this
        operation followed by waiting for healthy daemon states guarantees that they
        have gone down and come up, rather than potentially seeing the healthy states
        that existed before the restart.
        """
        def _fail_restart(id_):
            self.mds_daemons[id_].stop()
            self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
            self.mds_daemons[id_].restart()

        self._one_or_all(mds_id, _fail_restart)

    def mds_signal(self, mds_id, sig, silent=False):
        self.mds_daemons[mds_id].signal(sig, silent)

    def newfs(self, name='cephfs', create=True):
        return Filesystem(self._ctx, name=name, create=create)

    def status(self):
        return FSStatus(self.mon_manager)

    def delete_all_filesystems(self):
        """
        Remove all filesystems that exist, and any pools in use by them.
        """
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        pool_id_name = {}
        for pool in pools:
            pool_id_name[pool['pool']] = pool['pool_name']

        # mark cluster down for each fs to prevent churn during deletion
        status = self.status()
        for fs in status.get_filesystems():
            self.mon_manager.raw_cluster_cmd("fs", "set", fs['mdsmap']['fs_name'], "cluster_down", "true")

        # get a new copy as actives may have since changed
        status = self.status()
        for fs in status.get_filesystems():
            mdsmap = fs['mdsmap']
            metadata_pool = pool_id_name[mdsmap['metadata_pool']]

            for gid in mdsmap['up'].values():
                self.mon_manager.raw_cluster_cmd('mds', 'fail', gid.__str__())

            self.mon_manager.raw_cluster_cmd('fs', 'rm', mdsmap['fs_name'], '--yes-i-really-mean-it')
            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
                                             metadata_pool, metadata_pool,
                                             '--yes-i-really-really-mean-it')
            for data_pool in mdsmap['data_pools']:
                data_pool = pool_id_name[data_pool]
                try:
                    self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
                                                     data_pool, data_pool,
                                                     '--yes-i-really-really-mean-it')
                except CommandFailedError as e:
                    if e.exitstatus == 16:
                        # EBUSY: this data pool is shared with another
                        # filesystem, so let the second pass delete it
                        pass
                    else:
                        raise

    def get_standby_daemons(self):
        return set([s['name'] for s in self.status().get_standbys()])

    def get_mds_hostnames(self):
        result = set()
        for mds_id in self.mds_ids:
            mds_remote = self.mon_manager.find_remote('mds', mds_id)
            result.add(mds_remote.hostname)

        return list(result)

    def set_clients_block(self, blocked, mds_id=None):
        """
        Block (using iptables) client communications to this MDS.  Be careful: if
        other services are running on this MDS, or other MDSs try to talk to this
        MDS, their communications may also be blocked as collateral damage.

        :param mds_id: Optional ID of MDS to block, default to all
        :return:
        """
        da_flag = "-A" if blocked else "-D"

        def set_block(_mds_id):
            remote = self.mon_manager.find_remote('mds', _mds_id)
            status = self.status()

            addr = status.get_mds_addr(_mds_id)
            ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()

            remote.run(
                args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])
            remote.run(
                args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])

        self._one_or_all(mds_id, set_block, in_parallel=False)

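    # Note (added commentary, not from the original file): the MDS addr has the
    # form "ip:port/nonce", so the regex above yields the TCP port the daemon is
    # bound to; REJECTing on --sport/--dport for that port cuts off traffic
    # leaving and reaching the MDS respectively, until the rules are removed
    # again by calling this with blocked=False (-D).
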
    def clear_firewall(self):
        clear_firewall(self._ctx)

    def get_mds_info(self, mds_id):
        return FSStatus(self.mon_manager).get_mds(mds_id)

    def is_pool_full(self, pool_name):
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        for pool in pools:
            if pool['pool_name'] == pool_name:
                return 'full' in pool['flags_names'].split(",")

        raise RuntimeError("Pool not found '{0}'".format(pool_name))


class Filesystem(MDSCluster):
    """
    This object is for driving a CephFS filesystem.  The MDS daemons driven by
    MDSCluster may be shared with other Filesystems.
    """
    def __init__(self, ctx, fscid=None, name=None, create=False,
                 ec_profile=None):
        super(Filesystem, self).__init__(ctx)

        self.name = name
        self.ec_profile = ec_profile
        self.id = None
        self.metadata_pool_name = None
        self.metadata_overlay = False
        self.data_pool_name = None
        self.data_pools = None

        client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
        self.client_id = client_list[0]
        self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]

        if name is not None:
            if fscid is not None:
                raise RuntimeError("cannot specify fscid when creating fs")
            if create and not self.legacy_configured():
                self.create()
        else:
            if fscid is not None:
                self.id = fscid
            self.getinfo(refresh = True)

        # Stash a reference to the first created filesystem on ctx, so
        # that if someone drops to the interactive shell they can easily
        # poke our methods.
        if not hasattr(self._ctx, "filesystem"):
            self._ctx.filesystem = self

    def getinfo(self, refresh = False):
        status = self.status()
        if self.id is not None:
            fsmap = status.get_fsmap(self.id)
        elif self.name is not None:
            fsmap = status.get_fsmap_byname(self.name)
        else:
            fss = [fs for fs in status.get_filesystems()]
            if len(fss) == 1:
                fsmap = fss[0]
            elif len(fss) == 0:
                raise RuntimeError("no file system available")
            else:
                raise RuntimeError("more than one file system available")
        self.id = fsmap['id']
        self.name = fsmap['mdsmap']['fs_name']
        self.get_pool_names(status = status, refresh = refresh)
        return status

    def set_metadata_overlay(self, overlay):
        if self.id is not None:
            raise RuntimeError("cannot specify fscid when configuring overlay")
        self.metadata_overlay = overlay

    def deactivate(self, rank):
        if rank < 0:
            raise RuntimeError("invalid rank")
        elif rank == 0:
            raise RuntimeError("cannot deactivate rank 0")
        self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))

    def set_var(self, var, *args):
        a = map(str, args)
        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, var, *a)

    def set_max_mds(self, max_mds):
        self.set_var("max_mds", "%d" % max_mds)

    def set_allow_dirfrags(self, yes):
        self.set_var("allow_dirfrags", str(yes).lower(), '--yes-i-really-mean-it')

    def get_pgs_per_fs_pool(self):
        """
        Calculate how many PGs to use when creating a pool, in order to avoid raising any
        health warnings about mon_pg_warn_min_per_osd

        :return: an integer number of PGs
        """
        pg_warn_min_per_osd = int(self.get_config('mon_pg_warn_min_per_osd'))
        osd_count = len(list(misc.all_roles_of_type(self._ctx.cluster, 'osd')))
        return pg_warn_min_per_osd * osd_count

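    # Worked example (added commentary, not from the original file): if
    # mon_pg_warn_min_per_osd were, say, 10 and the test cluster defined 3 osd
    # roles, each new pool would be created with 10 * 3 = 30 PGs, keeping the
    # per-OSD PG count above the warning threshold.
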
    def create(self):
        if self.name is None:
            self.name = "cephfs"
        if self.metadata_pool_name is None:
            self.metadata_pool_name = "{0}_metadata".format(self.name)
        if self.data_pool_name is None:
            data_pool_name = "{0}_data".format(self.name)
        else:
            data_pool_name = self.data_pool_name

        log.info("Creating filesystem '{0}'".format(self.name))

        pgs_per_fs_pool = self.get_pgs_per_fs_pool()

        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                         self.metadata_pool_name, pgs_per_fs_pool.__str__())
        if self.metadata_overlay:
            self.mon_manager.raw_cluster_cmd('fs', 'new',
                                             self.name, self.metadata_pool_name, data_pool_name,
                                             '--allow-dangerous-metadata-overlay')
        else:
            if self.ec_profile and 'disabled' not in self.ec_profile:
                log.info("EC profile is %s", self.ec_profile)
                cmd = ['osd', 'erasure-code-profile', 'set', data_pool_name]
                cmd.extend(self.ec_profile)
                self.mon_manager.raw_cluster_cmd(*cmd)
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'create',
                    data_pool_name, pgs_per_fs_pool.__str__(), 'erasure',
                    data_pool_name)
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'set',
                    data_pool_name, 'allow_ec_overwrites', 'true')
            else:
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'create',
                    data_pool_name, pgs_per_fs_pool.__str__())
            self.mon_manager.raw_cluster_cmd('fs', 'new',
                                             self.name, self.metadata_pool_name, data_pool_name)

        self.check_pool_application(self.metadata_pool_name)
        self.check_pool_application(data_pool_name)

        # Turn off spurious standby count warnings from modifying max_mds in tests.
        try:
            self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
        except CommandFailedError as e:
            if e.exitstatus == 22:
                # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
                pass
            else:
                raise

        self.getinfo(refresh = True)

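    # Illustration (added commentary, not from the original file): with the
    # default name 'cephfs' and no overlay or EC profile, create() ends up
    # running roughly:
    #     ceph osd pool create cephfs_metadata <pgs>
    #     ceph osd pool create cephfs_data <pgs>
    #     ceph fs new cephfs cephfs_metadata cephfs_data
    # followed by the standby_count_wanted tweak above.
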
    def check_pool_application(self, pool_name):
        osd_map = self.mon_manager.get_osd_dump_json()
        for pool in osd_map['pools']:
            if pool['pool_name'] == pool_name:
                if "application_metadata" in pool:
                    if not "cephfs" in pool['application_metadata']:
                        raise RuntimeError(
                            "Pool {0} does not name cephfs as application!".format(pool_name))

    def __del__(self):
        if getattr(self._ctx, "filesystem", None) == self:
            delattr(self._ctx, "filesystem")

    def exists(self):
        """
        Whether a filesystem exists in the mon's filesystem list
        """
        fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
        return self.name in [fs['name'] for fs in fs_list]

    def legacy_configured(self):
        """
        Check if a legacy (i.e. pre "fs new") filesystem configuration is present.  If this is
        the case, the caller should avoid using Filesystem.create
        """
        try:
            out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
            pools = json.loads(out_text)
            metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
            if metadata_pool_exists:
                self.metadata_pool_name = 'metadata'
        except CommandFailedError as e:
            # For use in upgrade tests, Ceph cuttlefish and earlier don't support
            # structured output (--format) from the CLI.
            if e.exitstatus == 22:
                metadata_pool_exists = True
            else:
                raise

        return metadata_pool_exists

    def _df(self):
        return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))

    def get_mds_map(self, status=None):
        if status is None:
            status = self.status()
        return status.get_fsmap(self.id)['mdsmap']

    def get_var(self, var):
        return self.status().get_fsmap(self.id)['mdsmap'][var]

    def add_data_pool(self, name):
        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name, self.get_pgs_per_fs_pool().__str__())
        self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
        self.get_pool_names(refresh = True)
        for poolid, fs_name in self.data_pools.items():
            if name == fs_name:
                return poolid
        raise RuntimeError("could not get just created pool '{0}'".format(name))

    def get_pool_names(self, refresh = False, status = None):
        if refresh or self.metadata_pool_name is None or self.data_pools is None:
            if status is None:
                status = self.status()
            fsmap = status.get_fsmap(self.id)

            osd_map = self.mon_manager.get_osd_dump_json()
            id_to_name = {}
            for p in osd_map['pools']:
                id_to_name[p['pool']] = p['pool_name']

            self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
            self.data_pools = {}
            for data_pool in fsmap['mdsmap']['data_pools']:
                self.data_pools[data_pool] = id_to_name[data_pool]

    def get_data_pool_name(self, refresh = False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return self.data_pools.values()[0]

    def get_data_pool_id(self, refresh = False):
        """
        Don't call this if you have multiple data pools
        """
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return self.data_pools.keys()[0]

    def get_data_pool_names(self, refresh = False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        return self.data_pools.values()

    def get_metadata_pool_name(self):
        return self.metadata_pool_name

    def set_data_pool_name(self, name):
        if self.id is not None:
            raise RuntimeError("can't set filesystem name if its fscid is set")
        self.data_pool_name = name

    def get_namespace_id(self):
        return self.id

    def get_pool_df(self, pool_name):
        """
        Return a dict like:
        {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
        """
        for pool_df in self._df()['pools']:
            if pool_df['name'] == pool_name:
                return pool_df['stats']

        raise RuntimeError("Pool name '{0}' not found".format(pool_name))

    def get_usage(self):
        return self._df()['stats']['total_used_bytes']

    def are_daemons_healthy(self):
        """
        Return true if all daemons are in one of active, standby, standby-replay, and
        at least max_mds daemons are in 'active'.

        Unlike most of Filesystem, this function is tolerant of new-style `fs`
        commands being missing, because we are part of the ceph installation
        process during upgrade suites, so must fall back to old style commands
        when we get an EINVAL on a new style command.

        :return:
        """

        active_count = 0
        try:
            mds_map = self.get_mds_map()
        except CommandFailedError as cfe:
            # Old version, fall back to non-multi-fs commands
            if cfe.exitstatus == errno.EINVAL:
                mds_map = json.loads(
                        self.mon_manager.raw_cluster_cmd('mds', 'dump', '--format=json'))
            else:
                raise

        log.info("are_daemons_healthy: mds map: {0}".format(mds_map))

        for mds_id, mds_status in mds_map['info'].items():
            if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
                log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
                return False
            elif mds_status['state'] == 'up:active':
                active_count += 1

        log.info("are_daemons_healthy: {0}/{1}".format(
            active_count, mds_map['max_mds']
        ))

        if active_count >= mds_map['max_mds']:
            # The MDSMap says these guys are active, but let's check they really are
            for mds_id, mds_status in mds_map['info'].items():
                if mds_status['state'] == 'up:active':
                    try:
                        daemon_status = self.mds_asok(["status"], mds_id=mds_status['name'])
                    except CommandFailedError as cfe:
                        if cfe.exitstatus == errno.EINVAL:
                            # Old version, can't do this check
                            continue
                        else:
                            # MDS not even running
                            return False

                    if daemon_status['state'] != 'up:active':
                        # MDS hasn't taken the latest map yet
                        return False

            return True
        else:
            return False

, state
=None):
704 Return MDS daemon names of those daemons in the given state
708 status
= self
.get_mds_map()
710 for mds_status
in sorted(status
['info'].values(), lambda a
, b
: cmp(a
['rank'], b
['rank'])):
711 if mds_status
['state'] == state
or state
is None:
712 result
.append(mds_status
['name'])
716 def get_active_names(self
):
718 Return MDS daemon names of those daemons holding ranks
721 :return: list of strings like ['a', 'b'], sorted by rank
723 return self
.get_daemon_names("up:active")
725 def get_all_mds_rank(self
):
726 status
= self
.get_mds_map()
728 for mds_status
in sorted(status
['info'].values(), lambda a
, b
: cmp(a
['rank'], b
['rank'])):
729 if mds_status
['rank'] != -1 and mds_status
['state'] != 'up:standby-replay':
730 result
.append(mds_status
['rank'])
    def get_rank(self, rank=0, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_rank(self.id, rank)

    def get_ranks(self, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_ranks(self.id)

    def get_rank_names(self, status=None):
        """
        Return MDS daemon names of those daemons holding a rank,
        sorted by rank.  This includes e.g. up:replay/reconnect
        as well as active, but does not include standby or
        standby-replay.
        """
        status = self.get_mds_map()
        result = []
        for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['name'])

        return result

    def wait_for_daemons(self, timeout=None):
        """
        Wait until all daemons are healthy
        :return:
        """

        if timeout is None:
            timeout = DAEMON_WAIT_TIMEOUT

        elapsed = 0
        while True:
            if self.are_daemons_healthy():
                return
            else:
                time.sleep(1)
                elapsed += 1

            if elapsed > timeout:
                raise RuntimeError("Timed out waiting for MDS daemons to become healthy")

    def get_lone_mds_id(self):
        """
        Get a single MDS ID: the only one if there is only one
        configured, else the only one currently holding a rank,
        else raise an error.
        """
        if len(self.mds_ids) != 1:
            alive = self.get_rank_names()
            if len(alive) == 1:
                return alive[0]
            else:
                raise ValueError("Explicit MDS argument required when multiple MDSs in use")
        else:
            return self.mds_ids[0]

    def recreate(self):
        log.info("Creating new filesystem")
        self.delete_all_filesystems()
        self.id = None
        self.create()

    def put_metadata_object_raw(self, object_id, infile):
        """
        Save an object to the metadata pool
        """
        temp_bin_path = infile
        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'put', object_id, temp_bin_path
        ])

    def get_metadata_object_raw(self, object_id):
        """
        Retrieve an object from the metadata pool and store it in a file.
        """
        temp_bin_path = '/tmp/' + object_id + '.bin'

        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
        ])

        return temp_bin_path

    def get_metadata_object(self, object_type, object_id):
        """
        Retrieve an object from the metadata pool, pass it through
        ceph-dencoder to dump it to JSON, and return the decoded object.
        """
        temp_bin_path = '/tmp/out.bin'

        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
        ])

        stdout = StringIO()
        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'ceph-dencoder'), 'type', object_type, 'import', temp_bin_path, 'decode', 'dump_json'
        ], stdout=stdout)
        dump_json = stdout.getvalue().strip()
        try:
            dump = json.loads(dump_json)
        except (TypeError, ValueError):
            log.error("Failed to decode JSON: '{0}'".format(dump_json))
            raise

        return dump

    def get_journal_version(self):
        """
        Read the JournalPointer and Journal::Header objects to learn the version of
        encoding in use.
        """
        journal_pointer_object = '400.00000000'
        journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
        journal_ino = journal_pointer_dump['journal_pointer']['front']

        journal_header_object = "{0:x}.00000000".format(journal_ino)
        journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)

        version = journal_header_dump['journal_header']['stream_format']
        log.info("Read journal version {0}".format(version))

        return version

, command
, mds_id
=None, timeout
=None):
864 mds_id
= self
.get_lone_mds_id()
866 return self
.json_asok(command
, 'mds', mds_id
, timeout
=timeout
)
868 def rank_asok(self
, command
, rank
=0, status
=None, timeout
=None):
869 info
= self
.get_rank(rank
=rank
, status
=status
)
870 return self
.json_asok(command
, 'mds', info
['name'], timeout
=timeout
)
872 def read_cache(self
, path
, depth
=None):
873 cmd
= ["dump", "tree", path
]
874 if depth
is not None:
875 cmd
.append(depth
.__str
__())
876 result
= self
.mds_asok(cmd
)
878 raise RuntimeError("Path not found in cache: {0}".format(path
))
    def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
        """
        Block until the MDS reaches a particular state, or a failure condition
        is met.

        When there are multiple MDSs, succeed when exactly one MDS is in the
        goal state, or fail when any MDS is in the reject state.

        :param goal_state: Return once the MDS is in this state
        :param reject: Fail if the MDS enters this state before the goal state
        :param timeout: Fail if this many seconds pass before reaching goal
        :return: number of seconds waited, rounded down to integer
        """

        started_at = time.time()
        while True:
            status = self.status()
            if rank is not None:
                try:
                    mds_info = status.get_rank(self.id, rank)
                    current_state = mds_info['state'] if mds_info else None
                    log.info("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
                except RuntimeError:
                    mdsmap = self.get_mds_map(status=status)
                    if rank in mdsmap['failed']:
                        log.info("Waiting for rank {0} to come back.".format(rank))
                        current_state = None
                    else:
                        raise
            elif mds_id is not None:
                # mds_info is None if no daemon with this ID exists in the map
                mds_info = status.get_mds(mds_id)
                current_state = mds_info['state'] if mds_info else None
                log.info("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
            else:
                # In general, look for a single MDS
                states = [m['state'] for m in status.get_ranks(self.id)]
                if [s for s in states if s == goal_state] == [goal_state]:
                    current_state = goal_state
                elif reject in states:
                    current_state = reject
                else:
                    current_state = None
                log.info("mapped states {0} to {1}".format(states, current_state))

            elapsed = time.time() - started_at
            if current_state == goal_state:
                log.info("reached state '{0}' in {1}s".format(current_state, elapsed))
                return elapsed
            elif reject is not None and current_state == reject:
                raise RuntimeError("MDS in reject state {0}".format(current_state))
            elif timeout is not None and elapsed > timeout:
                log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
                raise RuntimeError(
                    "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
                        elapsed, goal_state, current_state
                    ))
            else:
                time.sleep(1)

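    # Illustrative usage (added commentary, not from the original file):
    #     fs.wait_for_state('up:active', rank=0, timeout=60)
    # polls the FSMap once a second until rank 0 reports up:active, and raises
    # if 60 seconds pass or a reject state is seen first.
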
    def _read_data_xattr(self, ino_no, xattr_name, type, pool):
        mds_id = self.mds_ids[0]
        remote = self.mds_daemons[mds_id].remote
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)

        args = [
            os.path.join(self._prefix, "rados"), "-p", pool, "getxattr", obj_name, xattr_name
        ]
        try:
            proc = remote.run(
                args=args,
                stdout=StringIO())
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(obj_name)

        data = proc.stdout.getvalue()

        p = remote.run(
            args=[os.path.join(self._prefix, "ceph-dencoder"), "type", type, "import", "-", "decode", "dump_json"],
            stdout=StringIO(),
            stdin=data
        )

        return json.loads(p.stdout.getvalue().strip())

    def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
        """
        Write to an xattr of the 0th data object of an inode.  Will
        succeed whether the object and/or xattr already exist or not.

        :param ino_no: integer inode number
        :param xattr_name: string name of the xattr
        :param data: byte array data to write to the xattr
        :param pool: name of data pool or None to use primary data pool
        :return: None
        """
        remote = self.mds_daemons[self.mds_ids[0]].remote
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)
        args = [
            os.path.join(self._prefix, "rados"), "-p", pool, "setxattr",
            obj_name, xattr_name, data
        ]
        remote.run(
            args=args,
            stdout=StringIO())

, ino_no
, pool
=None):
997 Read the backtrace from the data pool, return a dict in the format
998 given by inode_backtrace_t::dump, which is something like:
1002 rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
1003 ceph-dencoder type inode_backtrace_t import out.bin decode dump_json
1005 { "ino": 1099511627778,
1013 :param pool: name of pool to read backtrace from. If omitted, FS must have only
1014 one data pool and that will be used.
1016 return self
._read
_data
_xattr
(ino_no
, "parent", "inode_backtrace_t", pool
)
1018 def read_layout(self
, ino_no
, pool
=None):
1020 Read 'layout' xattr of an inode and parse the result, returning a dict like:
1023 "stripe_unit": 4194304,
1025 "object_size": 4194304,
1030 :param pool: name of pool to read backtrace from. If omitted, FS must have only
1031 one data pool and that will be used.
1033 return self
._read
_data
_xattr
(ino_no
, "layout", "file_layout_t", pool
)
    def _enumerate_data_objects(self, ino, size):
        """
        Get the list of expected data objects for a range, and the list of objects
        that really exist.

        :return a tuple of two lists of strings (expected, actual)
        """
        stripe_size = 1024 * 1024 * 4

        size = max(stripe_size, size)

        want_objects = [
            "{0:x}.{1:08x}".format(ino, n)
            for n in range(0, ((size - 1) / stripe_size) + 1)
        ]

        exist_objects = self.rados(["ls"], pool=self.get_data_pool_name()).split("\n")

        return want_objects, exist_objects

, ino
, size
):
1057 Check that *all* the expected data objects for an inode are present in the data pool
1060 want_objects
, exist_objects
= self
._enumerate
_data
_objects
(ino
, size
)
1061 missing
= set(want_objects
) - set(exist_objects
)
1064 log
.info("Objects missing (ino {0}, size {1}): {2}".format(
1069 log
.info("All objects for ino {0} size {1} found".format(ino
, size
))
1072 def data_objects_absent(self
, ino
, size
):
1073 want_objects
, exist_objects
= self
._enumerate
_data
_objects
(ino
, size
)
1074 present
= set(want_objects
) & set(exist_objects
)
1077 log
.info("Objects not absent (ino {0}, size {1}): {2}".format(
1082 log
.info("All objects for ino {0} size {1} are absent".format(ino
, size
))
1085 def dirfrag_exists(self
, ino
, frag
):
1087 self
.rados(["stat", "{0:x}.{1:08x}".format(ino
, frag
)])
1088 except CommandFailedError
as e
:
    def rados(self, args, pool=None, namespace=None, stdin_data=None,
              stdin_file=None):
        """
        Call into the `rados` CLI from an MDS
        """

        if pool is None:
            pool = self.get_metadata_pool_name()

        # Doesn't matter which MDS we use to run rados commands, they all
        # have access to the pools
        mds_id = self.mds_ids[0]
        remote = self.mds_daemons[mds_id].remote

        # NB we could alternatively use librados pybindings for this, but it's a one-liner
        # using the `rados` CLI
        args = ([os.path.join(self._prefix, "rados"), "-p", pool] +
                (["--namespace", namespace] if namespace else []) +
                args)

        if stdin_file is not None:
            args = ["bash", "-c", "cat " + stdin_file + " | " + " ".join(args)]

        p = remote.run(
            args=args,
            stdin=stdin_data,
            stdout=StringIO())
        return p.stdout.getvalue().strip()

    def list_dirfrag(self, dir_ino):
        """
        Read the named object and return the list of omap keys

        :return a list of 0 or more strings
        """

        dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)

        try:
            key_list_str = self.rados(["listomapkeys", dirfrag_obj_name])
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(dirfrag_obj_name)

        return key_list_str.split("\n") if key_list_str else []

    def erase_metadata_objects(self, prefix):
        """
        For all objects in the metadata pool matching the prefix,
        erase them.

        This is O(N) with the number of objects in the pool, so only suitable
        for use on toy test filesystems.
        """
        all_objects = self.rados(["ls"]).split("\n")
        matching_objects = [o for o in all_objects if o.startswith(prefix)]
        for o in matching_objects:
            self.rados(["rm", o])

    def erase_mds_objects(self, rank):
        """
        Erase all the per-MDS objects for a particular rank.  This includes
        inotable, sessiontable, journal
        """

        def obj_prefix(multiplier):
            """
            MDS object naming conventions like rank 1's
            journal is at 201.***
            """
            return "%x." % (multiplier * 0x100 + rank)

        # MDS_INO_LOG_OFFSET
        self.erase_metadata_objects(obj_prefix(2))
        # MDS_INO_LOG_BACKUP_OFFSET
        self.erase_metadata_objects(obj_prefix(3))
        # MDS_INO_LOG_POINTER_OFFSET
        self.erase_metadata_objects(obj_prefix(4))
        # MDSTables & SessionMap
        self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))

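    # Worked example (added commentary, not from the original file): for rank 1,
    # obj_prefix(2) is "201." (0x200 + 1), so the journal objects 201.00000000,
    # 201.00000001, ... are removed, along with the 301.* backup, the 401.*
    # pointer and the "mds1_" table/session objects.
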
    @property
    def _prefix(self):
        """
        Override this to set a different
        """
        return ""

    def _make_rank(self, rank):
        return "{}:{}".format(self.name, rank)

    def _run_tool(self, tool, args, rank=None, quiet=False):
        # Tests frequently have [client] configuration that jacks up
        # the objecter log level (unlikely to be interesting here)
        # and does not set the mds log level (very interesting here)
        if quiet:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
        else:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']

        if rank is not None:
            base_args.extend(["--rank", "%s" % str(rank)])

        t1 = datetime.datetime.now()
        r = self.tool_remote.run(
            args=base_args + args,
            stdout=StringIO()).stdout.getvalue().strip()
        duration = datetime.datetime.now() - t1
        log.info("Ran {0} in time {1}, result:\n{2}".format(
            base_args + args, duration, r
        ))
        return r

    @property
    def tool_remote(self):
        """
        An arbitrary remote to use when invoking recovery tools.  Use an MDS host because
        it'll definitely have keys with perms to access cephfs metadata pool.  This is public
        so that tests can use this remote to go get locally written output files from the tools.
        """
        mds_id = self.mds_ids[0]
        return self.mds_daemons[mds_id].remote

    def journal_tool(self, args, rank, quiet=False):
        """
        Invoke cephfs-journal-tool with the passed arguments for a rank, and return its stdout
        """
        fs_rank = self._make_rank(rank)
        return self._run_tool("cephfs-journal-tool", args, fs_rank, quiet)

    def table_tool(self, args, quiet=False):
        """
        Invoke cephfs-table-tool with the passed arguments, and return its stdout
        """
        return self._run_tool("cephfs-table-tool", args, None, quiet)

    def data_scan(self, args, quiet=False, worker_count=1):
        """
        Invoke cephfs-data-scan with the passed arguments, and return its stdout

        :param worker_count: if greater than 1, multiple workers will be run
                             in parallel and the return value will be None
        """

        workers = []

        for n in range(0, worker_count):
            if worker_count > 1:
                # data-scan args first token is a command, followed by args to it.
                # insert worker arguments after the command.
                cmd = args[0]
                worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
            else:
                worker_args = args

            workers.append(Greenlet.spawn(lambda wargs=worker_args:
                           self._run_tool("cephfs-data-scan", wargs, None, quiet)))

        for w in workers:
            w.get()

        if worker_count == 1:
            return workers[0].value
        else:
            return None

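    # Illustration (added commentary, not from the original file): with
    # worker_count=2 and args like ["scan_extents", "cephfs_data"], worker 0
    # runs cephfs-data-scan with
    #     ["scan_extents", "--worker_n", "0", "--worker_m", "2", "cephfs_data"]
    # and worker 1 with "--worker_n", "1"; each invocation runs in its own
    # greenlet.
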
    def is_full(self):
        return self.is_pool_full(self.get_data_pool_name())