# ceph/qa/tasks/cephfs/filesystem.py

from StringIO import StringIO
import json
import logging
import datetime
import errno
import os
import random
import re
import time

from gevent import Greenlet

from teuthology.exceptions import CommandFailedError
from teuthology import misc
from teuthology.nuke import clear_firewall
from teuthology.parallel import parallel
from tasks.ceph_manager import write_conf
from tasks import ceph_manager


log = logging.getLogger(__name__)


DAEMON_WAIT_TIMEOUT = 120


class ObjectNotFound(Exception):
    def __init__(self, object_name):
        self._object_name = object_name

    def __str__(self):
        return "Object not found: '{0}'".format(self._object_name)


class FSStatus(object):
    """
    Operations on a snapshot of the FSMap.
    """
    def __init__(self, mon_manager):
        self.mon = mon_manager
        self.map = json.loads(self.mon.raw_cluster_cmd("fs", "dump", "--format=json"))

    def __str__(self):
        return json.dumps(self.map, indent=2, sort_keys=True)

    # Expose the fsmap for manual inspection.
    def __getitem__(self, key):
        """
        Get a field from the fsmap.
        """
        return self.map[key]

    def get_filesystems(self):
        """
        Iterator for all filesystems.
        """
        for fs in self.map['filesystems']:
            yield fs

    def get_all(self):
        """
        Iterator for all the mds_info components in the FSMap.
        """
        for info in self.get_standbys():
            yield info
        for fs in self.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                yield info

    def get_standbys(self):
        """
        Iterator for all standbys.
        """
        for info in self.map['standbys']:
            yield info

    def get_fsmap(self, fscid):
        """
        Get the fsmap for the given FSCID.
        """
        for fs in self.map['filesystems']:
            if fscid is None or fs['id'] == fscid:
                return fs
        raise RuntimeError("FSCID {0} not in map".format(fscid))

    def get_fsmap_byname(self, name):
        """
        Get the fsmap for the given file system name.
        """
        for fs in self.map['filesystems']:
            if name is None or fs['mdsmap']['fs_name'] == name:
                return fs
        raise RuntimeError("FS {0} not in map".format(name))

    def get_replays(self, fscid):
        """
        Get the standby:replay MDS for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['state'] == 'up:standby-replay':
                yield info

    def get_ranks(self, fscid):
        """
        Get the ranks for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['rank'] >= 0:
                yield info

    def get_rank(self, fscid, rank):
        """
        Get the rank for the given FSCID.
        """
        for info in self.get_ranks(fscid):
            if info['rank'] == rank:
                return info
        raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))

    def get_mds(self, name):
        """
        Get the info for the given MDS name.
        """
        for info in self.get_all():
            if info['name'] == name:
                return info
        return None

    def get_mds_addr(self, name):
        """
        Return the instance addr as a string, like "10.214.133.138:6807/10825"
        """
        info = self.get_mds(name)
        if info:
            return info['addr']
        else:
            log.warn(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
            raise RuntimeError("MDS id '{0}' not found in map".format(name))
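
# A minimal usage sketch of FSStatus (illustrative only; assumes a live
# "mon_manager" from the surrounding teuthology context):
#
#     status = FSStatus(mon_manager)
#     for fs in status.get_filesystems():
#         print(fs['mdsmap']['fs_name'])
#     for info in status.get_ranks(fscid=None):
#         print(info['rank'], info['name'], info['state'])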


class CephCluster(object):
    @property
    def admin_remote(self):
        first_mon = misc.get_first_mon(self._ctx, None)
        (result,) = self._ctx.cluster.only(first_mon).remotes.iterkeys()
        return result

    def __init__(self, ctx):
        self._ctx = ctx
        self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))

    def get_config(self, key, service_type=None):
        """
        Get config from mon by default, or a specific service if caller asks for it
        """
        if service_type is None:
            service_type = 'mon'

        service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]
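
    # Illustrative calls (hypothetical test code, not from this file):
    #
    #     cluster.get_config("mon_pg_warn_min_per_osd")              # ask a mon
    #     cluster.get_config("mds_cache_size", service_type="mds")   # ask an MDS instead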

    def set_ceph_conf(self, subsys, key, value):
        if subsys not in self._ctx.ceph['ceph'].conf:
            self._ctx.ceph['ceph'].conf[subsys] = {}
        self._ctx.ceph['ceph'].conf[subsys][key] = value
        write_conf(self._ctx)  # XXX because we don't have the ceph task's config object, if they
                               # used a different config path this won't work.

    def clear_ceph_conf(self, subsys, key):
        del self._ctx.ceph['ceph'].conf[subsys][key]
        write_conf(self._ctx)

    def json_asok(self, command, service_type, service_id):
        proc = self.mon_manager.admin_socket(service_type, service_id, command)
        response_data = proc.stdout.getvalue()
        log.info("_json_asok output: {0}".format(response_data))
        if response_data.strip():
            return json.loads(response_data)
        else:
            return None


class MDSCluster(CephCluster):
    """
    Collective operations on all the MDS daemons in the Ceph cluster.  These
    daemons may be in use by various Filesystems.

    For the benefit of pre-multi-filesystem tests, this class is also
    a parent of Filesystem.  The correct way to use MDSCluster going forward is
    as a separate instance outside of your (multiple) Filesystem instances.
    """
    def __init__(self, ctx):
        super(MDSCluster, self).__init__(ctx)

        self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))

        if len(self.mds_ids) == 0:
            raise RuntimeError("This task requires at least one MDS")

        if hasattr(self._ctx, "daemons"):
            # Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
            self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])

    def _one_or_all(self, mds_id, cb, in_parallel=True):
        """
        Call a callback for a single named MDS, or for all.

        Note that the parallelism here isn't for performance, it's to avoid being overly kind
        to the cluster by waiting a graceful ssh-latency of time between doing things, and to
        avoid being overly kind by executing them in a particular order.  However, some actions
        don't cope with being done in parallel, so it's optional (`in_parallel`)

        :param mds_id: MDS daemon name, or None
        :param cb: Callback taking single argument of MDS daemon name
        :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
        """
        if mds_id is None:
            if in_parallel:
                with parallel() as p:
                    for mds_id in self.mds_ids:
                        p.spawn(cb, mds_id)
            else:
                for mds_id in self.mds_ids:
                    cb(mds_id)
        else:
            cb(mds_id)
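
    # Sketch of the callback pattern (illustrative only): restart every MDS,
    # or just "mds.a" when a name is given, exactly as mds_restart() below does.
    #
    #     mdscluster._one_or_all(None, lambda id_: mdscluster.mds_daemons[id_].restart())
    #     mdscluster._one_or_all('a', lambda id_: mdscluster.mds_daemons[id_].restart())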

    def get_config(self, key, service_type=None):
        """
        get_config specialization of service_type="mds"
        """
        if service_type != "mds":
            return super(MDSCluster, self).get_config(key, service_type)

        # Some tests stop MDS daemons, don't send commands to a dead one:
        service_id = random.sample(filter(lambda i: self.mds_daemons[i].running(), self.mds_daemons), 1)[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def mds_stop(self, mds_id=None):
        """
        Stop the MDS daemon process(es).  If it held a rank, that rank
        will eventually go laggy.
        """
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())

    def mds_fail(self, mds_id=None):
        """
        Inform MDSMonitor of the death of the daemon process(es).  If it held
        a rank, that rank will be relinquished.
        """
        self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))

    def mds_restart(self, mds_id=None):
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())

    def mds_fail_restart(self, mds_id=None):
        """
        Variation on restart that includes marking MDSs as failed, so that doing this
        operation followed by waiting for healthy daemon states guarantees that they
        have gone down and come up, rather than potentially seeing the healthy states
        that existed before the restart.
        """
        def _fail_restart(id_):
            self.mds_daemons[id_].stop()
            self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
            self.mds_daemons[id_].restart()

        self._one_or_all(mds_id, _fail_restart)
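
    # Typical use of the fail-restart guarantee described above (illustrative;
    # "fs" is assumed to be a Filesystem instance, which inherits this method
    # and adds wait_for_daemons()):
    #
    #     fs.mds_fail_restart()
    #     fs.wait_for_daemons()   # only returns once the daemons are healthy again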

    def mds_signal(self, mds_id, sig, silent=False):
        """
        Send a signal to the named MDS daemon.
        """
        self.mds_daemons[mds_id].signal(sig, silent)

    def newfs(self, name='cephfs', create=True):
        return Filesystem(self._ctx, name=name, create=create)

    def status(self):
        return FSStatus(self.mon_manager)

    def delete_all_filesystems(self):
        """
        Remove all filesystems that exist, and any pools in use by them.
        """
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        pool_id_name = {}
        for pool in pools:
            pool_id_name[pool['pool']] = pool['pool_name']

        # mark cluster down for each fs to prevent churn during deletion
        status = self.status()
        for fs in status.get_filesystems():
            self.mon_manager.raw_cluster_cmd("fs", "set", fs['mdsmap']['fs_name'], "cluster_down", "true")

        # get a new copy as actives may have since changed
        status = self.status()
        for fs in status.get_filesystems():
            mdsmap = fs['mdsmap']
            metadata_pool = pool_id_name[mdsmap['metadata_pool']]

            for gid in mdsmap['up'].values():
                self.mon_manager.raw_cluster_cmd('mds', 'fail', gid.__str__())

            self.mon_manager.raw_cluster_cmd('fs', 'rm', mdsmap['fs_name'], '--yes-i-really-mean-it')
            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
                                             metadata_pool, metadata_pool,
                                             '--yes-i-really-really-mean-it')
            for data_pool in mdsmap['data_pools']:
                data_pool = pool_id_name[data_pool]
                try:
                    self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
                                                     data_pool, data_pool,
                                                     '--yes-i-really-really-mean-it')
                except CommandFailedError as e:
                    if e.exitstatus == 16:  # EBUSY, this data pool is used
                        pass                # by two metadata pools, let the 2nd
                    else:                   # pass delete it
                        raise

    def get_standby_daemons(self):
        return set([s['name'] for s in self.status().get_standbys()])

    def get_mds_hostnames(self):
        result = set()
        for mds_id in self.mds_ids:
            mds_remote = self.mon_manager.find_remote('mds', mds_id)
            result.add(mds_remote.hostname)

        return list(result)

    def set_clients_block(self, blocked, mds_id=None):
        """
        Block (using iptables) client communications to this MDS.  Be careful: if
        other services are running on this MDS, or other MDSs try to talk to this
        MDS, their communications may also be blocked as collateral damage.

        :param mds_id: Optional ID of MDS to block, default to all
        """
        da_flag = "-A" if blocked else "-D"

        def set_block(_mds_id):
            remote = self.mon_manager.find_remote('mds', _mds_id)
            status = self.status()

            addr = status.get_mds_addr(_mds_id)
            ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()

            remote.run(
                args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])
            remote.run(
                args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])

        self._one_or_all(mds_id, set_block, in_parallel=False)
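
    # Illustrative block/unblock cycle (hypothetical test code): the same
    # helper adds the iptables REJECT rules with blocked=True and removes
    # them with blocked=False; clear_firewall() below is the blanket cleanup.
    #
    #     mdscluster.set_clients_block(True, mds_id='a')    # "-A" adds rules
    #     mdscluster.set_clients_block(False, mds_id='a')   # "-D" deletes them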

    def clear_firewall(self):
        clear_firewall(self._ctx)

    def get_mds_info(self, mds_id):
        return FSStatus(self.mon_manager).get_mds(mds_id)

    def is_pool_full(self, pool_name):
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        for pool in pools:
            if pool['pool_name'] == pool_name:
                return 'full' in pool['flags_names'].split(",")

        raise RuntimeError("Pool not found '{0}'".format(pool_name))


class Filesystem(MDSCluster):
    """
    This object is for driving a CephFS filesystem.  The MDS daemons driven by
    MDSCluster may be shared with other Filesystems.
    """
    def __init__(self, ctx, fscid=None, name=None, create=False,
                 ec_profile=None):
        super(Filesystem, self).__init__(ctx)

        self.name = name
        self.ec_profile = ec_profile
        self.id = None
        self.metadata_pool_name = None
        self.metadata_overlay = False
        self.data_pool_name = None
        self.data_pools = None

        client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
        self.client_id = client_list[0]
        self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]

        if name is not None:
            if fscid is not None:
                raise RuntimeError("cannot specify fscid when creating fs")
            if create and not self.legacy_configured():
                self.create()
        else:
            if fscid is not None:
                self.id = fscid
            self.getinfo(refresh = True)

        # Stash a reference to the first created filesystem on ctx, so
        # that if someone drops to the interactive shell they can easily
        # debug it.
        if not hasattr(self._ctx, "filesystem"):
            self._ctx.filesystem = self

    def getinfo(self, refresh = False):
        status = self.status()
        if self.id is not None:
            fsmap = status.get_fsmap(self.id)
        elif self.name is not None:
            fsmap = status.get_fsmap_byname(self.name)
        else:
            fss = [fs for fs in status.get_filesystems()]
            if len(fss) == 1:
                fsmap = fss[0]
            elif len(fss) == 0:
                raise RuntimeError("no file system available")
            else:
                raise RuntimeError("more than one file system available")
        self.id = fsmap['id']
        self.name = fsmap['mdsmap']['fs_name']
        self.get_pool_names(status = status, refresh = refresh)
        return status

    def set_metadata_overlay(self, overlay):
        if self.id is not None:
            raise RuntimeError("cannot specify fscid when configuring overlay")
        self.metadata_overlay = overlay

    def deactivate(self, rank):
        if rank < 0:
            raise RuntimeError("invalid rank")
        elif rank == 0:
            raise RuntimeError("cannot deactivate rank 0")
        self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))

    def set_var(self, var, *args):
        a = map(str, args)
        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, var, *a)

    def set_max_mds(self, max_mds):
        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "max_mds", "%d" % max_mds)

    def set_allow_dirfrags(self, yes):
        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "allow_dirfrags", str(yes).lower(), '--yes-i-really-mean-it')

    def get_pgs_per_fs_pool(self):
        """
        Calculate how many PGs to use when creating a pool, in order to avoid raising any
        health warnings about mon_pg_warn_min_per_osd

        :return: an integer number of PGs
        """
        pg_warn_min_per_osd = int(self.get_config('mon_pg_warn_min_per_osd'))
        osd_count = len(list(misc.all_roles_of_type(self._ctx.cluster, 'osd')))
        return pg_warn_min_per_osd * osd_count
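
    # Worked example of the PG sizing above (assumed, typical test-cluster
    # values): with mon_pg_warn_min_per_osd = 30 and three OSD roles, each
    # filesystem pool is created with 30 * 3 = 90 PGs, keeping the per-OSD PG
    # count comfortably above the warning threshold.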

    def create(self):
        if self.name is None:
            self.name = "cephfs"
        if self.metadata_pool_name is None:
            self.metadata_pool_name = "{0}_metadata".format(self.name)
        if self.data_pool_name is None:
            data_pool_name = "{0}_data".format(self.name)
        else:
            data_pool_name = self.data_pool_name

        log.info("Creating filesystem '{0}'".format(self.name))

        pgs_per_fs_pool = self.get_pgs_per_fs_pool()

        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                         self.metadata_pool_name, pgs_per_fs_pool.__str__())
        if self.metadata_overlay:
            self.mon_manager.raw_cluster_cmd('fs', 'new',
                                             self.name, self.metadata_pool_name, data_pool_name,
                                             '--allow-dangerous-metadata-overlay')
        else:
            if self.ec_profile and 'disabled' not in self.ec_profile:
                log.info("EC profile is %s", self.ec_profile)
                cmd = ['osd', 'erasure-code-profile', 'set', data_pool_name]
                cmd.extend(self.ec_profile)
                self.mon_manager.raw_cluster_cmd(*cmd)
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'create',
                    data_pool_name, pgs_per_fs_pool.__str__(), 'erasure',
                    data_pool_name)
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'set',
                    data_pool_name, 'allow_ec_overwrites', 'true')
            else:
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'create',
                    data_pool_name, pgs_per_fs_pool.__str__())
            self.mon_manager.raw_cluster_cmd('fs', 'new',
                                             self.name, self.metadata_pool_name, data_pool_name)

        self.check_pool_application(self.metadata_pool_name)
        self.check_pool_application(data_pool_name)

        # Turn off spurious standby count warnings from modifying max_mds in tests.
        try:
            self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
        except CommandFailedError as e:
            if e.exitstatus == 22:
                # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
                pass
            else:
                raise

        self.getinfo(refresh = True)

    def check_pool_application(self, pool_name):
        osd_map = self.mon_manager.get_osd_dump_json()
        for pool in osd_map['pools']:
            if pool['pool_name'] == pool_name:
                if "application_metadata" in pool:
                    if not "cephfs" in pool['application_metadata']:
                        raise RuntimeError(
                            "Pool {0} does not name cephfs as application!".format(pool_name))

    def __del__(self):
        if getattr(self._ctx, "filesystem", None) == self:
            delattr(self._ctx, "filesystem")

    def exists(self):
        """
        Whether a filesystem exists in the mon's filesystem list
        """
        fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
        return self.name in [fs['name'] for fs in fs_list]

    def legacy_configured(self):
        """
        Check if a legacy (i.e. pre "fs new") filesystem configuration is present.  If this is
        the case, the caller should avoid using Filesystem.create
        """
        try:
            out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
            pools = json.loads(out_text)
            metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
            if metadata_pool_exists:
                self.metadata_pool_name = 'metadata'
        except CommandFailedError as e:
            # For use in upgrade tests, Ceph cuttlefish and earlier don't support
            # structured output (--format) from the CLI.
            if e.exitstatus == 22:
                metadata_pool_exists = True
            else:
                raise

        return metadata_pool_exists

    def _df(self):
        return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))

    def get_mds_map(self):
        return self.status().get_fsmap(self.id)['mdsmap']

    def get_var(self, var):
        return self.status().get_fsmap(self.id)['mdsmap'][var]

    def add_data_pool(self, name):
        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name, self.get_pgs_per_fs_pool().__str__())
        self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
        self.get_pool_names(refresh = True)
        for poolid, fs_name in self.data_pools.items():
            if name == fs_name:
                return poolid
        raise RuntimeError("could not get just created pool '{0}'".format(name))

    def get_pool_names(self, refresh = False, status = None):
        if refresh or self.metadata_pool_name is None or self.data_pools is None:
            if status is None:
                status = self.status()
            fsmap = status.get_fsmap(self.id)

            osd_map = self.mon_manager.get_osd_dump_json()
            id_to_name = {}
            for p in osd_map['pools']:
                id_to_name[p['pool']] = p['pool_name']

            self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
            self.data_pools = {}
            for data_pool in fsmap['mdsmap']['data_pools']:
                self.data_pools[data_pool] = id_to_name[data_pool]

    def get_data_pool_name(self, refresh = False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return self.data_pools.values()[0]

    def get_data_pool_id(self, refresh = False):
        """
        Don't call this if you have multiple data pools
        """
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return self.data_pools.keys()[0]

    def get_data_pool_names(self, refresh = False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        return self.data_pools.values()

    def get_metadata_pool_name(self):
        return self.metadata_pool_name

    def set_data_pool_name(self, name):
        if self.id is not None:
            raise RuntimeError("can't set filesystem name if its fscid is set")
        self.data_pool_name = name

    def get_namespace_id(self):
        return self.id

    def get_pool_df(self, pool_name):
        """
        Return the per-pool stats from `ceph df`, a dict like:

        {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
        """
        for pool_df in self._df()['pools']:
            if pool_df['name'] == pool_name:
                return pool_df['stats']

        raise RuntimeError("Pool name '{0}' not found".format(pool_name))

    def get_usage(self):
        return self._df()['stats']['total_used_bytes']

    def are_daemons_healthy(self):
        """
        Return true if all daemons are in one of active, standby, standby-replay, and
        at least max_mds daemons are in 'active'.

        Unlike most of Filesystem, this function is tolerant of new-style `fs`
        commands being missing, because we are part of the ceph installation
        process during upgrade suites, so must fall back to old style commands
        when we get an EINVAL on a new style command.
        """
        active_count = 0
        try:
            mds_map = self.get_mds_map()
        except CommandFailedError as cfe:
            # Old version, fall back to non-multi-fs commands
            if cfe.exitstatus == errno.EINVAL:
                mds_map = json.loads(
                        self.mon_manager.raw_cluster_cmd('mds', 'dump', '--format=json'))
            else:
                raise

        log.info("are_daemons_healthy: mds map: {0}".format(mds_map))

        for mds_id, mds_status in mds_map['info'].items():
            if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
                log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
                return False
            elif mds_status['state'] == 'up:active':
                active_count += 1

        log.info("are_daemons_healthy: {0}/{1}".format(
            active_count, mds_map['max_mds']
        ))

        if active_count >= mds_map['max_mds']:
            # The MDSMap says these guys are active, but let's check they really are
            for mds_id, mds_status in mds_map['info'].items():
                if mds_status['state'] == 'up:active':
                    try:
                        daemon_status = self.mds_asok(["status"], mds_id=mds_status['name'])
                    except CommandFailedError as cfe:
                        if cfe.exitstatus == errno.EINVAL:
                            # Old version, can't do this check
                            continue
                        else:
                            # MDS not even running
                            return False

                    if daemon_status['state'] != 'up:active':
                        # MDS hasn't taken the latest map yet
                        return False

            return True
        else:
            return False

    def get_daemon_names(self, state=None):
        """
        Return MDS daemon names of those daemons in the given state
        """
        status = self.get_mds_map()
        result = []
        for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
            if mds_status['state'] == state or state is None:
                result.append(mds_status['name'])

        return result

    def get_active_names(self):
        """
        Return MDS daemon names of those daemons holding ranks
        in state up:active

        :return: list of strings like ['a', 'b'], sorted by rank
        """
        return self.get_daemon_names("up:active")

    def get_all_mds_rank(self):
        status = self.get_mds_map()
        result = []
        for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['rank'])

        return result

    def get_rank(self, rank=0, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_rank(self.id, rank)

    def get_ranks(self, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_ranks(self.id)

    def get_rank_names(self, status=None):
        """
        Return MDS daemon names of those daemons holding a rank,
        sorted by rank.  This includes e.g. up:replay/reconnect
        as well as active, but does not include standby or
        standby-replay.
        """
        status = self.get_mds_map()
        result = []
        for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['name'])

        return result

    def wait_for_daemons(self, timeout=None):
        """
        Wait until all daemons are healthy
        """
        if timeout is None:
            timeout = DAEMON_WAIT_TIMEOUT

        elapsed = 0
        while True:
            if self.are_daemons_healthy():
                return
            else:
                time.sleep(1)
                elapsed += 1

            if elapsed > timeout:
                raise RuntimeError("Timed out waiting for MDS daemons to become healthy")

    def get_lone_mds_id(self):
        """
        Get a single MDS ID: the only one if there is only one
        configured, else the only one currently holding a rank,
        else raise an error.
        """
        if len(self.mds_ids) != 1:
            alive = self.get_rank_names()
            if len(alive) == 1:
                return alive[0]
            else:
                raise ValueError("Explicit MDS argument required when multiple MDSs in use")
        else:
            return self.mds_ids[0]

    def recreate(self):
        log.info("Creating new filesystem")
        self.delete_all_filesystems()
        self.id = None
        self.create()

    def put_metadata_object_raw(self, object_id, infile):
        """
        Save an object to the metadata pool
        """
        temp_bin_path = infile
        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'put', object_id, temp_bin_path
        ])

    def get_metadata_object_raw(self, object_id):
        """
        Retrieve an object from the metadata pool and store it in a file.
        """
        temp_bin_path = '/tmp/' + object_id + '.bin'

        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
        ])

        return temp_bin_path

    def get_metadata_object(self, object_type, object_id):
        """
        Retrieve an object from the metadata pool, pass it through
        ceph-dencoder to dump it to JSON, and return the decoded object.
        """
        temp_bin_path = '/tmp/out.bin'

        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
        ])

        stdout = StringIO()
        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'ceph-dencoder'), 'type', object_type, 'import', temp_bin_path, 'decode', 'dump_json'
        ], stdout=stdout)
        dump_json = stdout.getvalue().strip()
        try:
            dump = json.loads(dump_json)
        except (TypeError, ValueError):
            log.error("Failed to decode JSON: '{0}'".format(dump_json))
            raise

        return dump

    def get_journal_version(self):
        """
        Read the JournalPointer and Journal::Header objects to learn the version of
        encoding in use.
        """
        journal_pointer_object = '400.00000000'
        journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
        journal_ino = journal_pointer_dump['journal_pointer']['front']

        journal_header_object = "{0:x}.00000000".format(journal_ino)
        journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)

        version = journal_header_dump['journal_header']['stream_format']
        log.info("Read journal version {0}".format(version))

        return version

    def mds_asok(self, command, mds_id=None):
        if mds_id is None:
            mds_id = self.get_lone_mds_id()

        return self.json_asok(command, 'mds', mds_id)

    def rank_asok(self, command, rank=0):
        info = self.get_rank(rank=rank)
        return self.json_asok(command, 'mds', info['name'])

    def read_cache(self, path, depth=None):
        cmd = ["dump", "tree", path]
        if depth is not None:
            cmd.append(depth.__str__())
        result = self.mds_asok(cmd)
        if len(result) == 0:
            raise RuntimeError("Path not found in cache: {0}".format(path))

        return result

    def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
        """
        Block until the MDS reaches a particular state, or a failure condition
        is met.

        When there are multiple MDSs, succeed when exactly one MDS is in the
        goal state, or fail when any MDS is in the reject state.

        :param goal_state: Return once the MDS is in this state
        :param reject: Fail if the MDS enters this state before the goal state
        :param timeout: Fail if this many seconds pass before reaching goal
        :return: number of seconds waited, rounded down to integer
        """
        started_at = time.time()
        while True:
            status = self.status()
            if rank is not None:
                mds_info = status.get_rank(self.id, rank)
                current_state = mds_info['state'] if mds_info else None
                log.info("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
            elif mds_id is not None:
                # mds_info is None if no daemon with this ID exists in the map
                mds_info = status.get_mds(mds_id)
                current_state = mds_info['state'] if mds_info else None
                log.info("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
            else:
                # In general, look for a single MDS
                states = [m['state'] for m in status.get_ranks(self.id)]
                if [s for s in states if s == goal_state] == [goal_state]:
                    current_state = goal_state
                elif reject in states:
                    current_state = reject
                else:
                    current_state = None
                log.info("mapped states {0} to {1}".format(states, current_state))

            elapsed = time.time() - started_at
            if current_state == goal_state:
                log.info("reached state '{0}' in {1}s".format(current_state, elapsed))
                return elapsed
            elif reject is not None and current_state == reject:
                raise RuntimeError("MDS in reject state {0}".format(current_state))
            elif timeout is not None and elapsed > timeout:
                log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
                raise RuntimeError(
                    "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
                        elapsed, goal_state, current_state
                    ))
            else:
                time.sleep(1)
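
    # Hedged usage sketch: a test restarts an MDS and blocks until an MDS
    # reports up:active again, failing fast on up:damaged (the state names are
    # real MDS states; the call site and timeout are illustrative):
    #
    #     fs.mds_fail_restart('a')
    #     fs.wait_for_state('up:active', reject='up:damaged', timeout=120)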

    def _read_data_xattr(self, ino_no, xattr_name, type, pool):
        mds_id = self.mds_ids[0]
        remote = self.mds_daemons[mds_id].remote
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)

        args = [
            os.path.join(self._prefix, "rados"), "-p", pool, "getxattr", obj_name, xattr_name
        ]
        try:
            proc = remote.run(
                args=args,
                stdout=StringIO())
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(obj_name)

        data = proc.stdout.getvalue()

        p = remote.run(
            args=[os.path.join(self._prefix, "ceph-dencoder"), "type", type, "import", "-", "decode", "dump_json"],
            stdout=StringIO(),
            stdin=data
        )

        return json.loads(p.stdout.getvalue().strip())

    def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
        """
        Write to an xattr of the 0th data object of an inode.  Will
        succeed whether the object and/or xattr already exist or not.

        :param ino_no: integer inode number
        :param xattr_name: string name of the xattr
        :param data: byte array data to write to the xattr
        :param pool: name of data pool or None to use primary data pool
        """
        remote = self.mds_daemons[self.mds_ids[0]].remote
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)
        remote.run(args=[
            os.path.join(self._prefix, "rados"), "-p", pool, "setxattr",
            obj_name, xattr_name, data
        ])

    def read_backtrace(self, ino_no, pool=None):
        """
        Read the backtrace from the data pool, return a dict in the format
        given by inode_backtrace_t::dump, i.e. the decode produced by:

            rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
            ceph-dencoder type inode_backtrace_t import out.bin decode dump_json

            { "ino": 1099511627778, ... }

        :param pool: name of pool to read backtrace from.  If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)

    def read_layout(self, ino_no, pool=None):
        """
        Read 'layout' xattr of an inode and parse the result, returning a dict like:

            {
                "stripe_unit": 4194304,
                ...
                "object_size": 4194304,
                ...
            }

        :param pool: name of pool to read backtrace from.  If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)

    def _enumerate_data_objects(self, ino, size):
        """
        Get the list of expected data objects for a range, and the list of objects
        that really exist.

        :return a tuple of two lists of strings (expected, actual)
        """
        stripe_size = 1024 * 1024 * 4

        size = max(stripe_size, size)

        want_objects = [
            "{0:x}.{1:08x}".format(ino, n)
            for n in range(0, ((size - 1) / stripe_size) + 1)
        ]

        exist_objects = self.rados(["ls"], pool=self.get_data_pool_name()).split("\n")

        return want_objects, exist_objects
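
    # Worked example of the object naming above (assuming the default 4 MiB
    # stripe size): inode 0x10000000002 with size 10 MiB spans three objects,
    #
    #     10000000002.00000000
    #     10000000002.00000001
    #     10000000002.00000002
    #
    # i.e. "{ino:x}.{object index:08x}" for indices 0 .. ceil(size/stripe) - 1.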

    def data_objects_present(self, ino, size):
        """
        Check that *all* the expected data objects for an inode are present in the data pool
        """
        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        missing = set(want_objects) - set(exist_objects)

        if missing:
            log.info("Objects missing (ino {0}, size {1}): {2}".format(
                ino, size, missing
            ))
            return False
        else:
            log.info("All objects for ino {0} size {1} found".format(ino, size))
            return True
, ino
, size
):
1061 want_objects
, exist_objects
= self
._enumerate
_data
_objects
(ino
, size
)
1062 present
= set(want_objects
) & set(exist_objects
)
1065 log
.info("Objects not absent (ino {0}, size {1}): {2}".format(
1070 log
.info("All objects for ino {0} size {1} are absent".format(ino
, size
))

    def dirfrag_exists(self, ino, frag):
        try:
            self.rados(["stat", "{0:x}.{1:08x}".format(ino, frag)])
        except CommandFailedError:
            return False
        else:
            return True

    def rados(self, args, pool=None, namespace=None, stdin_data=None,
              stdin_file=None):
        """
        Call into the `rados` CLI from an MDS
        """
        if pool is None:
            pool = self.get_metadata_pool_name()

        # Doesn't matter which MDS we use to run rados commands, they all
        # have access to the pools
        mds_id = self.mds_ids[0]
        remote = self.mds_daemons[mds_id].remote

        # NB we could alternatively use librados pybindings for this, but it's a one-liner
        # using the `rados` CLI
        args = ([os.path.join(self._prefix, "rados"), "-p", pool] +
                (["--namespace", namespace] if namespace else []) +
                args)

        if stdin_file is not None:
            args = ["bash", "-c", "cat " + stdin_file + " | " + " ".join(args)]

        p = remote.run(
            args=args,
            stdin=stdin_data,
            stdout=StringIO())
        return p.stdout.getvalue().strip()

    def list_dirfrag(self, dir_ino):
        """
        Read the named object and return the list of omap keys

        :return a list of 0 or more strings
        """
        dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)

        try:
            key_list_str = self.rados(["listomapkeys", dirfrag_obj_name])
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(dirfrag_obj_name)

        return key_list_str.split("\n") if key_list_str else []

    def erase_metadata_objects(self, prefix):
        """
        For all objects in the metadata pool matching the prefix,
        erase them.

        This is O(N) with the number of objects in the pool, so only suitable
        for use on toy test filesystems.
        """
        all_objects = self.rados(["ls"]).split("\n")
        matching_objects = [o for o in all_objects if o.startswith(prefix)]
        for o in matching_objects:
            self.rados(["rm", o])

    def erase_mds_objects(self, rank):
        """
        Erase all the per-MDS objects for a particular rank.  This includes
        inotable, sessiontable, journal
        """
        def obj_prefix(multiplier):
            """
            MDS object naming conventions like rank 1's
            journal is at 201.***
            """
            return "%x." % (multiplier * 0x100 + rank)

        # MDS_INO_LOG_OFFSET
        self.erase_metadata_objects(obj_prefix(2))
        # MDS_INO_LOG_BACKUP_OFFSET
        self.erase_metadata_objects(obj_prefix(3))
        # MDS_INO_LOG_POINTER_OFFSET
        self.erase_metadata_objects(obj_prefix(4))
        # MDSTables & SessionMap
        self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))
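
    # Illustration of obj_prefix() above for rank 1: journal objects live
    # under prefix "201." (2 * 0x100 + 1 = 0x201), the journal backup under
    # "301.", and the journal pointer under "401." -- consistent with the
    # rank-0 JournalPointer object 400.00000000 read in get_journal_version().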

    @property
    def _prefix(self):
        """
        Override this to set a different path prefix for the Ceph binaries.
        """
        return ""

    def _run_tool(self, tool, args, rank=None, quiet=False):
        # Tests frequently have [client] configuration that jacks up
        # the objecter log level (unlikely to be interesting here)
        # and does not set the mds log level (very interesting here)
        if quiet:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
        else:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']

        if rank is not None:
            base_args.extend(["--rank", "%d" % rank])

        t1 = datetime.datetime.now()
        r = self.tool_remote.run(
            args=base_args + args,
            stdout=StringIO()).stdout.getvalue().strip()
        duration = datetime.datetime.now() - t1
        log.info("Ran {0} in time {1}, result:\n{2}".format(
            base_args + args, duration, r
        ))
        return r

    @property
    def tool_remote(self):
        """
        An arbitrary remote to use when invoking recovery tools.  Use an MDS host because
        it'll definitely have keys with perms to access cephfs metadata pool.  This is public
        so that tests can use this remote to go get locally written output files from the tools.
        """
        mds_id = self.mds_ids[0]
        return self.mds_daemons[mds_id].remote

    def journal_tool(self, args, rank=None, quiet=False):
        """
        Invoke cephfs-journal-tool with the passed arguments, and return its stdout
        """
        return self._run_tool("cephfs-journal-tool", args, rank, quiet)

    def table_tool(self, args, quiet=False):
        """
        Invoke cephfs-table-tool with the passed arguments, and return its stdout
        """
        return self._run_tool("cephfs-table-tool", args, None, quiet)

    def data_scan(self, args, quiet=False, worker_count=1):
        """
        Invoke cephfs-data-scan with the passed arguments, and return its stdout

        :param worker_count: if greater than 1, multiple workers will be run
                             in parallel and the return value will be None
        """
        workers = []

        for n in range(0, worker_count):
            if worker_count > 1:
                # data-scan args first token is a command, followed by args to it.
                # insert worker arguments after the command.
                cmd = args[0]
                worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
            else:
                worker_args = args

            workers.append(Greenlet.spawn(lambda wargs=worker_args:
                           self._run_tool("cephfs-data-scan", wargs, None, quiet)))

        for w in workers:
            w.get()

        if worker_count == 1:
            return workers[0].value
        else:
            return None
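
    # Hedged usage sketch of the worker splitting above (the tool name is real,
    # the call site is illustrative): with worker_count=4 each greenlet runs
    # cephfs-data-scan with "--worker_n <n> --worker_m 4" inserted after the
    # sub-command, so the four scans partition the pool between themselves.
    #
    #     fs.data_scan(["scan_extents", fs.get_data_pool_name()], worker_count=4)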

    def is_full(self):
        return self.is_pool_full(self.get_data_pool_name())