# ceph/qa/tasks/cephfs/filesystem.py

from StringIO import StringIO
import json
import logging
import datetime
import errno
import os
import random
import re
import time

from gevent import Greenlet

from teuthology.exceptions import CommandFailedError
from teuthology import misc
from teuthology.nuke import clear_firewall
from teuthology.parallel import parallel
from tasks.ceph_manager import write_conf
from tasks import ceph_manager


log = logging.getLogger(__name__)


DAEMON_WAIT_TIMEOUT = 120


class ObjectNotFound(Exception):
    def __init__(self, object_name):
        self._object_name = object_name

    def __str__(self):
        return "Object not found: '{0}'".format(self._object_name)


class FSStatus(object):
    """
    Operations on a snapshot of the FSMap.
    """
    def __init__(self, mon_manager):
        self.mon = mon_manager
        self.map = json.loads(self.mon.raw_cluster_cmd("fs", "dump", "--format=json"))

    def __str__(self):
        return json.dumps(self.map, indent=2, sort_keys=True)

    # Expose the fsmap for manual inspection.
    def __getitem__(self, key):
        """
        Get a field from the fsmap.
        """
        return self.map[key]

    def get_filesystems(self):
        """
        Iterator for all filesystems.
        """
        for fs in self.map['filesystems']:
            yield fs

    def get_all(self):
        """
        Iterator for all the mds_info components in the FSMap.
        """
        for info in self.get_standbys():
            yield info
        for fs in self.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                yield info

    def get_standbys(self):
        """
        Iterator for all standbys.
        """
        for info in self.map['standbys']:
            yield info

    def get_fsmap(self, fscid):
        """
        Get the fsmap for the given FSCID.
        """
        for fs in self.map['filesystems']:
            if fscid is None or fs['id'] == fscid:
                return fs
        raise RuntimeError("FSCID {0} not in map".format(fscid))

    def get_fsmap_byname(self, name):
        """
        Get the fsmap for the given file system name.
        """
        for fs in self.map['filesystems']:
            if name is None or fs['mdsmap']['fs_name'] == name:
                return fs
        raise RuntimeError("FS {0} not in map".format(name))

    def get_replays(self, fscid):
        """
        Get the standby:replay MDS for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['state'] == 'up:standby-replay':
                yield info

    def get_ranks(self, fscid):
        """
        Get the ranks for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['rank'] >= 0:
                yield info

    def get_rank(self, fscid, rank):
        """
        Get the rank for the given FSCID.
        """
        for info in self.get_ranks(fscid):
            if info['rank'] == rank:
                return info
        raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))

    def get_mds(self, name):
        """
        Get the info for the given MDS name.
        """
        for info in self.get_all():
            if info['name'] == name:
                return info
        return None

    def get_mds_addr(self, name):
        """
        Return the instance addr as a string, like "10.214.133.138:6807\/10825"
        """
        info = self.get_mds(name)
        if info:
            return info['addr']
        else:
            log.warn(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
            raise RuntimeError("MDS id '{0}' not found in map".format(name))


class CephCluster(object):
    @property
    def admin_remote(self):
        first_mon = misc.get_first_mon(self._ctx, None)
        (result,) = self._ctx.cluster.only(first_mon).remotes.iterkeys()
        return result

    def __init__(self, ctx):
        self._ctx = ctx
        self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx,
                                                    logger=log.getChild('ceph_manager'))

    def get_config(self, key, service_type=None):
        """
        Get config from mon by default, or a specific service if caller asks for it
        """
        if service_type is None:
            service_type = 'mon'

        service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def set_ceph_conf(self, subsys, key, value):
        if subsys not in self._ctx.ceph['ceph'].conf:
            self._ctx.ceph['ceph'].conf[subsys] = {}
        self._ctx.ceph['ceph'].conf[subsys][key] = value
        write_conf(self._ctx)  # XXX because we don't have the ceph task's config object, if they
                               # used a different config path this won't work.

    def clear_ceph_conf(self, subsys, key):
        del self._ctx.ceph['ceph'].conf[subsys][key]
        write_conf(self._ctx)

    def json_asok(self, command, service_type, service_id):
        proc = self.mon_manager.admin_socket(service_type, service_id, command)
        response_data = proc.stdout.getvalue()
        log.info("_json_asok output: {0}".format(response_data))
        if response_data.strip():
            return json.loads(response_data)
        else:
            return None


class MDSCluster(CephCluster):
    """
    Collective operations on all the MDS daemons in the Ceph cluster.  These
    daemons may be in use by various Filesystems.

    For the benefit of pre-multi-filesystem tests, this class is also
    a parent of Filesystem.  The correct way to use MDSCluster going forward is
    as a separate instance outside of your (multiple) Filesystem instances.
    """
    def __init__(self, ctx):
        super(MDSCluster, self).__init__(ctx)

        self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))

        if len(self.mds_ids) == 0:
            raise RuntimeError("This task requires at least one MDS")

        if hasattr(self._ctx, "daemons"):
            # Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
            self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id))
                                     for mds_id in self.mds_ids])

    def _one_or_all(self, mds_id, cb, in_parallel=True):
        """
        Call a callback for a single named MDS, or for all.

        Note that the parallelism here isn't for performance, it's to avoid being overly kind
        to the cluster by waiting a graceful ssh-latency of time between doing things, and to
        avoid being overly kind by executing them in a particular order.  However, some actions
        don't cope with being done in parallel, so it's optional (`in_parallel`)

        :param mds_id: MDS daemon name, or None
        :param cb: Callback taking single argument of MDS daemon name
        :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
        """
        if mds_id is None:
            if in_parallel:
                with parallel() as p:
                    for mds_id in self.mds_ids:
                        p.spawn(cb, mds_id)
            else:
                for mds_id in self.mds_ids:
                    cb(mds_id)
        else:
            cb(mds_id)

    def get_config(self, key, service_type=None):
        """
        get_config specialization of service_type="mds"
        """
        if service_type != "mds":
            return super(MDSCluster, self).get_config(key, service_type)

        # Some tests stop MDS daemons, don't send commands to a dead one:
        service_id = random.sample(filter(lambda i: self.mds_daemons[i].running(), self.mds_daemons), 1)[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def mds_stop(self, mds_id=None):
        """
        Stop the MDS daemon process(es).  If it held a rank, that rank
        will eventually go laggy.
        """
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())

    def mds_fail(self, mds_id=None):
        """
        Inform MDSMonitor of the death of the daemon process(es).  If it held
        a rank, that rank will be relinquished.
        """
        self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))

    def mds_restart(self, mds_id=None):
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())

    def mds_fail_restart(self, mds_id=None):
        """
        Variation on restart that includes marking MDSs as failed, so that doing this
        operation followed by waiting for healthy daemon states guarantees that they
        have gone down and come up, rather than potentially seeing the healthy states
        that existed before the restart.
        """
        def _fail_restart(id_):
            self.mds_daemons[id_].stop()
            self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
            self.mds_daemons[id_].restart()

        self._one_or_all(mds_id, _fail_restart)

    def newfs(self, name='cephfs', create=True):
        return Filesystem(self._ctx, name=name, create=create)

    def status(self):
        return FSStatus(self.mon_manager)
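
    # Example (hypothetical test usage, with `mds_cluster` standing in for an
    # MDSCluster built from the teuthology ctx): create a filesystem via this
    # helper and wait for its MDS daemons to settle.
    #
    #   fs = mds_cluster.newfs(name='cephfs', create=True)
    #   fs.wait_for_daemons()
    #   assert fs.get_active_names()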

    def delete_all_filesystems(self):
        """
        Remove all filesystems that exist, and any pools in use by them.
        """
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        pool_id_name = {}
        for pool in pools:
            pool_id_name[pool['pool']] = pool['pool_name']

        # mark cluster down for each fs to prevent churn during deletion
        status = self.status()
        for fs in status.get_filesystems():
            self.mon_manager.raw_cluster_cmd("fs", "set", fs['mdsmap']['fs_name'], "cluster_down", "true")

        # get a new copy as actives may have since changed
        status = self.status()
        for fs in status.get_filesystems():
            mdsmap = fs['mdsmap']
            metadata_pool = pool_id_name[mdsmap['metadata_pool']]

            for gid in mdsmap['up'].values():
                self.mon_manager.raw_cluster_cmd('mds', 'fail', gid.__str__())

            self.mon_manager.raw_cluster_cmd('fs', 'rm', mdsmap['fs_name'], '--yes-i-really-mean-it')
            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
                                             metadata_pool, metadata_pool,
                                             '--yes-i-really-really-mean-it')
            for data_pool in mdsmap['data_pools']:
                data_pool = pool_id_name[data_pool]
                try:
                    self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
                                                     data_pool, data_pool,
                                                     '--yes-i-really-really-mean-it')
                except CommandFailedError as e:
                    if e.exitstatus == 16:  # EBUSY, this data pool is used
                        pass                # by two metadata pools, let the 2nd
                    else:                   # pass delete it
                        raise

    def get_standby_daemons(self):
        return set([s['name'] for s in self.status().get_standbys()])

    def get_mds_hostnames(self):
        result = set()
        for mds_id in self.mds_ids:
            mds_remote = self.mon_manager.find_remote('mds', mds_id)
            result.add(mds_remote.hostname)

        return list(result)

    def set_clients_block(self, blocked, mds_id=None):
        """
        Block (using iptables) client communications to this MDS.  Be careful: if
        other services are running on this MDS, or other MDSs try to talk to this
        MDS, their communications may also be blocked as collateral damage.

        :param mds_id: Optional ID of MDS to block, default to all
        """
        da_flag = "-A" if blocked else "-D"

        def set_block(_mds_id):
            remote = self.mon_manager.find_remote('mds', _mds_id)
            status = self.status()

            addr = status.get_mds_addr(_mds_id)
            ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()

            remote.run(
                args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])
            remote.run(
                args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])

        self._one_or_all(mds_id, set_block, in_parallel=False)
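
    # Worked example: given an MDS addr string like "10.214.133.138:6807/10825"
    # (see FSStatus.get_mds_addr), the re.match above yields
    # ip_str="10.214.133.138", port_str="6807", inst_str="10825"; only the port
    # is used to build the iptables REJECT rules.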

    def clear_firewall(self):
        clear_firewall(self._ctx)

    def get_mds_info(self, mds_id):
        return FSStatus(self.mon_manager).get_mds(mds_id)

    def is_full(self):
        flags = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['flags']
        return 'full' in flags

    def is_pool_full(self, pool_name):
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        for pool in pools:
            if pool['pool_name'] == pool_name:
                return 'full' in pool['flags_names'].split(",")

        raise RuntimeError("Pool not found '{0}'".format(pool_name))


class Filesystem(MDSCluster):
    """
    This object is for driving a CephFS filesystem.  The MDS daemons driven by
    MDSCluster may be shared with other Filesystems.
    """
    def __init__(self, ctx, fscid=None, name=None, create=False):
        super(Filesystem, self).__init__(ctx)

        self.name = name
        self.id = None
        self.metadata_pool_name = None
        self.metadata_overlay = False
        self.data_pool_name = None
        self.data_pools = None

        client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
        self.client_id = client_list[0]
        self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]

        if name is not None:
            if fscid is not None:
                raise RuntimeError("cannot specify fscid when creating fs")
            if create and not self.legacy_configured():
                self.create()
        else:
            if fscid is not None:
                self.id = fscid
            self.getinfo(refresh=True)

        # Stash a reference to the first created filesystem on ctx, so
        # that if someone drops to the interactive shell they can easily
        # poke at it.
        if not hasattr(self._ctx, "filesystem"):
            self._ctx.filesystem = self

    def getinfo(self, refresh=False):
        status = self.status()
        if self.id is not None:
            fsmap = status.get_fsmap(self.id)
        elif self.name is not None:
            fsmap = status.get_fsmap_byname(self.name)
        else:
            fss = [fs for fs in status.get_filesystems()]
            if len(fss) == 1:
                fsmap = fss[0]
            elif len(fss) == 0:
                raise RuntimeError("no file system available")
            else:
                raise RuntimeError("more than one file system available")
        self.id = fsmap['id']
        self.name = fsmap['mdsmap']['fs_name']
        self.get_pool_names(status=status, refresh=refresh)
        return status

    def set_metadata_overlay(self, overlay):
        if self.id is not None:
            raise RuntimeError("cannot specify fscid when configuring overlay")
        self.metadata_overlay = overlay

    def deactivate(self, rank):
        if rank < 0:
            raise RuntimeError("invalid rank")
        elif rank == 0:
            raise RuntimeError("cannot deactivate rank 0")
        self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))

    def set_max_mds(self, max_mds):
        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "max_mds", "%d" % max_mds)

    def set_allow_dirfrags(self, yes):
        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "allow_dirfrags", str(yes).lower(),
                                         '--yes-i-really-mean-it')

    def get_pgs_per_fs_pool(self):
        """
        Calculate how many PGs to use when creating a pool, in order to avoid raising any
        health warnings about mon_pg_warn_min_per_osd

        :return: an integer number of PGs
        """
        pg_warn_min_per_osd = int(self.get_config('mon_pg_warn_min_per_osd'))
        osd_count = len(list(misc.all_roles_of_type(self._ctx.cluster, 'osd')))
        return pg_warn_min_per_osd * osd_count
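
    # Worked example: with mon_pg_warn_min_per_osd at its common default of 30
    # and a 3-OSD test cluster, pools are created with 30 * 3 = 90 PGs, keeping
    # the per-OSD PG count above the warning threshold.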

    def create(self):
        if self.name is None:
            self.name = "cephfs"
        if self.metadata_pool_name is None:
            self.metadata_pool_name = "{0}_metadata".format(self.name)
        if self.data_pool_name is None:
            data_pool_name = "{0}_data".format(self.name)
        else:
            data_pool_name = self.data_pool_name

        log.info("Creating filesystem '{0}'".format(self.name))

        pgs_per_fs_pool = self.get_pgs_per_fs_pool()

        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                         self.metadata_pool_name, pgs_per_fs_pool.__str__())
        if self.metadata_overlay:
            self.mon_manager.raw_cluster_cmd('fs', 'new',
                                             self.name, self.metadata_pool_name, data_pool_name,
                                             '--allow-dangerous-metadata-overlay')
        else:
            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                             data_pool_name, pgs_per_fs_pool.__str__())
            self.mon_manager.raw_cluster_cmd('fs', 'new',
                                             self.name, self.metadata_pool_name, data_pool_name)
        self.check_pool_application(self.metadata_pool_name)
        self.check_pool_application(data_pool_name)
        # Turn off spurious standby count warnings from modifying max_mds in tests.
        try:
            self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
        except CommandFailedError as e:
            if e.exitstatus == 22:
                # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
                pass
            else:
                raise

        self.getinfo(refresh=True)

    def check_pool_application(self, pool_name):
        osd_map = self.mon_manager.get_osd_dump_json()
        for pool in osd_map['pools']:
            if pool['pool_name'] == pool_name:
                if "application_metadata" in pool:
                    if not "cephfs" in pool['application_metadata']:
                        raise RuntimeError("Pool {0} does not name cephfs as application!".
                                           format(pool_name))

    def __del__(self):
        if getattr(self._ctx, "filesystem", None) == self:
            delattr(self._ctx, "filesystem")

    def exists(self):
        """
        Whether a filesystem exists in the mon's filesystem list
        """
        fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
        return self.name in [fs['name'] for fs in fs_list]

    def legacy_configured(self):
        """
        Check if a legacy (i.e. pre "fs new") filesystem configuration is present.  If this is
        the case, the caller should avoid using Filesystem.create
        """
        try:
            out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
            pools = json.loads(out_text)
            metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
            if metadata_pool_exists:
                self.metadata_pool_name = 'metadata'
        except CommandFailedError as e:
            # For use in upgrade tests, Ceph cuttlefish and earlier don't support
            # structured output (--format) from the CLI.
            if e.exitstatus == 22:
                metadata_pool_exists = True
            else:
                raise

        return metadata_pool_exists

    def _df(self):
        return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))

    def get_mds_map(self):
        return self.status().get_fsmap(self.id)['mdsmap']

    def add_data_pool(self, name):
        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name, self.get_pgs_per_fs_pool().__str__())
        self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
        self.get_pool_names(refresh=True)
        for poolid, fs_name in self.data_pools.items():
            if name == fs_name:
                return poolid
        raise RuntimeError("could not get just created pool '{0}'".format(name))

    def get_pool_names(self, refresh=False, status=None):
        if refresh or self.metadata_pool_name is None or self.data_pools is None:
            if status is None:
                status = self.status()
            fsmap = status.get_fsmap(self.id)

            osd_map = self.mon_manager.get_osd_dump_json()
            id_to_name = {}
            for p in osd_map['pools']:
                id_to_name[p['pool']] = p['pool_name']

            self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
            self.data_pools = {}
            for data_pool in fsmap['mdsmap']['data_pools']:
                self.data_pools[data_pool] = id_to_name[data_pool]

    def get_data_pool_name(self, refresh=False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh=True)
        assert(len(self.data_pools) == 1)
        return self.data_pools.values()[0]

    def get_data_pool_id(self, refresh=False):
        """
        Don't call this if you have multiple data pools
        """
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh=True)
        assert(len(self.data_pools) == 1)
        return self.data_pools.keys()[0]

    def get_data_pool_names(self, refresh=False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh=True)
        return self.data_pools.values()

    def get_metadata_pool_name(self):
        return self.metadata_pool_name

    def set_data_pool_name(self, name):
        if self.id is not None:
            raise RuntimeError("can't set filesystem name if its fscid is set")
        self.data_pool_name = name

    def get_namespace_id(self):
        return self.id

    def get_pool_df(self, pool_name):
        """
        Return a dict like:
            {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
        """
        for pool_df in self._df()['pools']:
            if pool_df['name'] == pool_name:
                return pool_df['stats']

        raise RuntimeError("Pool name '{0}' not found".format(pool_name))

    def get_usage(self):
        return self._df()['stats']['total_used_bytes']

    def are_daemons_healthy(self):
        """
        Return true if all daemons are in one of active, standby, standby-replay, and
        at least max_mds daemons are in 'active'.

        Unlike most of Filesystem, this function is tolerant of new-style `fs`
        commands being missing, because we are part of the ceph installation
        process during upgrade suites, so must fall back to old style commands
        when we get an EINVAL on a new style command.
        """
        active_count = 0
        try:
            mds_map = self.get_mds_map()
        except CommandFailedError as cfe:
            # Old version, fall back to non-multi-fs commands
            if cfe.exitstatus == errno.EINVAL:
                mds_map = json.loads(
                    self.mon_manager.raw_cluster_cmd('mds', 'dump', '--format=json'))
            else:
                raise

        log.info("are_daemons_healthy: mds map: {0}".format(mds_map))

        for mds_id, mds_status in mds_map['info'].items():
            if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
                log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
                return False
            elif mds_status['state'] == 'up:active':
                active_count += 1

        log.info("are_daemons_healthy: {0}/{1}".format(
            active_count, mds_map['max_mds']
        ))

        if active_count >= mds_map['max_mds']:
            # The MDSMap says these guys are active, but let's check they really are
            for mds_id, mds_status in mds_map['info'].items():
                if mds_status['state'] == 'up:active':
                    try:
                        daemon_status = self.mds_asok(["status"], mds_id=mds_status['name'])
                    except CommandFailedError as cfe:
                        if cfe.exitstatus == errno.EINVAL:
                            # Old version, can't do this check
                            continue
                        else:
                            # MDS not even running
                            return False

                    if daemon_status['state'] != 'up:active':
                        # MDS hasn't taken the latest map yet
                        return False

            return True
        else:
            return False

    def get_daemon_names(self, state=None):
        """
        Return MDS daemon names of those daemons in the given state
        """
        status = self.get_mds_map()
        result = []
        for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
            if mds_status['state'] == state or state is None:
                result.append(mds_status['name'])

        return result

    def get_active_names(self):
        """
        Return MDS daemon names of those daemons holding ranks
        in state up:active

        :return: list of strings like ['a', 'b'], sorted by rank
        """
        return self.get_daemon_names("up:active")

    def get_all_mds_rank(self):
        status = self.get_mds_map()
        result = []
        for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['rank'])

        return result

    def get_rank_names(self):
        """
        Return MDS daemon names of those daemons holding a rank,
        sorted by rank.  This includes e.g. up:replay/reconnect
        as well as active, but does not include standby or
        standby-replay.
        """
        status = self.get_mds_map()
        result = []
        for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['name'])

        return result
, timeout
=None):
722 Wait until all daemons are healthy
727 timeout
= DAEMON_WAIT_TIMEOUT
731 if self
.are_daemons_healthy():
737 if elapsed
> timeout
:
738 raise RuntimeError("Timed out waiting for MDS daemons to become healthy")

    def get_lone_mds_id(self):
        """
        Get a single MDS ID: the only one if there is only one
        configured, else the only one currently holding a rank,
        else raise an error.
        """
        if len(self.mds_ids) != 1:
            alive = self.get_rank_names()
            if len(alive) == 1:
                return alive[0]
            else:
                raise ValueError("Explicit MDS argument required when multiple MDSs in use")
        else:
            return self.mds_ids[0]

    def recreate(self):
        log.info("Creating new filesystem")
        self.delete_all_filesystems()
        self.id = None
        self.create()

    def put_metadata_object_raw(self, object_id, infile):
        """
        Save an object to the metadata pool
        """
        temp_bin_path = infile
        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'put', object_id, temp_bin_path
        ])

    def get_metadata_object_raw(self, object_id):
        """
        Retrieve an object from the metadata pool and store it in a file.
        """
        temp_bin_path = '/tmp/' + object_id + '.bin'

        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
        ])

        return temp_bin_path
, object_type
, object_id
):
784 Retrieve an object from the metadata pool, pass it through
785 ceph-dencoder to dump it to JSON, and return the decoded object.
787 temp_bin_path
= '/tmp/out.bin'
789 self
.client_remote
.run(args
=[
790 'sudo', os
.path
.join(self
._prefix
, 'rados'), '-p', self
.metadata_pool_name
, 'get', object_id
, temp_bin_path
794 self
.client_remote
.run(args
=[
795 'sudo', os
.path
.join(self
._prefix
, 'ceph-dencoder'), 'type', object_type
, 'import', temp_bin_path
, 'decode', 'dump_json'
797 dump_json
= stdout
.getvalue().strip()
799 dump
= json
.loads(dump_json
)
800 except (TypeError, ValueError):
801 log
.error("Failed to decode JSON: '{0}'".format(dump_json
))

    def get_journal_version(self):
        """
        Read the JournalPointer and Journal::Header objects to learn the version of
        encoding in use.
        """
        journal_pointer_object = '400.00000000'
        journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
        journal_ino = journal_pointer_dump['journal_pointer']['front']

        journal_header_object = "{0:x}.00000000".format(journal_ino)
        journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)

        version = journal_header_dump['journal_header']['stream_format']
        log.info("Read journal version {0}".format(version))

        return version
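
    # Worked example: rank 0's journal pointer lives in object "400.00000000";
    # if its 'front' field is inode 0x200, the header is then read from object
    # "200.00000000" and its 'stream_format' field is the journal version
    # (see the obj_prefix() multipliers in erase_mds_objects above).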

    def mds_asok(self, command, mds_id=None):
        if mds_id is None:
            mds_id = self.get_lone_mds_id()

        return self.json_asok(command, 'mds', mds_id)

    def read_cache(self, path, depth=None):
        cmd = ["dump", "tree", path]
        if depth is not None:
            cmd.append(depth.__str__())
        result = self.mds_asok(cmd)
        if len(result) == 0:
            raise RuntimeError("Path not found in cache: {0}".format(path))

        return result

    def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
        """
        Block until the MDS reaches a particular state, or a failure condition
        is met.

        When there are multiple MDSs, succeed when exactly one MDS is in the
        goal state, or fail when any MDS is in the reject state.

        :param goal_state: Return once the MDS is in this state
        :param reject: Fail if the MDS enters this state before the goal state
        :param timeout: Fail if this many seconds pass before reaching goal
        :return: number of seconds waited, rounded down to integer
        """
        started_at = time.time()
        while True:
            status = self.status()
            if rank is not None:
                mds_info = status.get_rank(self.id, rank)
                current_state = mds_info['state'] if mds_info else None
                log.info("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
            elif mds_id is not None:
                # mds_info is None if no daemon with this ID exists in the map
                mds_info = status.get_mds(mds_id)
                current_state = mds_info['state'] if mds_info else None
                log.info("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
            else:
                # In general, look for a single MDS
                states = [m['state'] for m in status.get_ranks(self.id)]
                if [s for s in states if s == goal_state] == [goal_state]:
                    current_state = goal_state
                elif reject in states:
                    current_state = reject
                else:
                    current_state = None
                log.info("mapped states {0} to {1}".format(states, current_state))

            elapsed = time.time() - started_at
            if current_state == goal_state:
                log.info("reached state '{0}' in {1}s".format(current_state, elapsed))
                return elapsed
            elif reject is not None and current_state == reject:
                raise RuntimeError("MDS in reject state {0}".format(current_state))
            elif timeout is not None and elapsed > timeout:
                log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
                raise RuntimeError(
                    "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
                        elapsed, goal_state, current_state
                    ))
            else:
                time.sleep(1)
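
    # Example (hypothetical): wait up to 60 seconds for rank 0 of this
    # filesystem to come back as active after a restart:
    #
    #   fs.wait_for_state("up:active", timeout=60, rank=0)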

    def _read_data_xattr(self, ino_no, xattr_name, type, pool):
        mds_id = self.mds_ids[0]
        remote = self.mds_daemons[mds_id].remote
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)

        args = [
            os.path.join(self._prefix, "rados"), "-p", pool, "getxattr", obj_name, xattr_name
        ]
        try:
            proc = remote.run(
                args=args,
                stdout=StringIO())
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(obj_name)

        data = proc.stdout.getvalue()

        p = remote.run(
            args=[os.path.join(self._prefix, "ceph-dencoder"), "type", type, "import", "-", "decode", "dump_json"],
            stdout=StringIO(),
            stdin=data
        )

        return json.loads(p.stdout.getvalue().strip())

    def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
        """
        Write to an xattr of the 0th data object of an inode.  Will
        succeed whether the object and/or xattr already exist or not.

        :param ino_no: integer inode number
        :param xattr_name: string name of the xattr
        :param data: byte array data to write to the xattr
        :param pool: name of data pool or None to use primary data pool
        """
        remote = self.mds_daemons[self.mds_ids[0]].remote
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)
        args = [
            os.path.join(self._prefix, "rados"), "-p", pool, "setxattr",
            obj_name, xattr_name, data
        ]
        remote.run(
            args=args,
            stdout=StringIO())

    def read_backtrace(self, ino_no, pool=None):
        """
        Read the backtrace from the data pool, return a dict in the format
        given by inode_backtrace_t::dump, which is something like:

            rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
            ceph-dencoder type inode_backtrace_t import out.bin decode dump_json

            { "ino": 1099511627778, ... }

        :param pool: name of pool to read backtrace from.  If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)

    def read_layout(self, ino_no, pool=None):
        """
        Read 'layout' xattr of an inode and parse the result, returning a dict like:

            {
                "stripe_unit": 4194304,
                "stripe_count": 1,
                "object_size": 4194304,
                "pool_id": 1,
                "pool_ns": "",
            }

        :param pool: name of pool to read backtrace from.  If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)

    def _enumerate_data_objects(self, ino, size):
        """
        Get the list of expected data objects for a range, and the list of objects
        that really exist.

        :return a tuple of two lists of strings (expected, actual)
        """
        stripe_size = 1024 * 1024 * 4

        size = max(stripe_size, size)

        want_objects = [
            "{0:x}.{1:08x}".format(ino, n)
            for n in range(0, ((size - 1) / stripe_size) + 1)
        ]

        exist_objects = self.rados(["ls"], pool=self.get_data_pool_name()).split("\n")

        return want_objects, exist_objects
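
    # Worked example: for ino 0x10000000002 and size 9 MiB with the 4 MiB
    # stripe assumed above, want_objects is ["10000000002.00000000",
    # "10000000002.00000001", "10000000002.00000002"].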

    def data_objects_present(self, ino, size):
        """
        Check that *all* the expected data objects for an inode are present in the data pool
        """
        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        missing = set(want_objects) - set(exist_objects)

        if missing:
            log.info("Objects missing (ino {0}, size {1}): {2}".format(
                ino, size, missing
            ))
            return False
        else:
            log.info("All objects for ino {0} size {1} found".format(ino, size))
            return True

    def data_objects_absent(self, ino, size):
        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        present = set(want_objects) & set(exist_objects)

        if present:
            log.info("Objects not absent (ino {0}, size {1}): {2}".format(
                ino, size, present
            ))
            return False
        else:
            log.info("All objects for ino {0} size {1} are absent".format(ino, size))
            return True

    def dirfrag_exists(self, ino, frag):
        try:
            self.rados(["stat", "{0:x}.{1:08x}".format(ino, frag)])
        except CommandFailedError as e:
            return False
        else:
            return True

    def rados(self, args, pool=None, namespace=None, stdin_data=None):
        """
        Call into the `rados` CLI from an MDS
        """
        if pool is None:
            pool = self.get_metadata_pool_name()

        # Doesn't matter which MDS we use to run rados commands, they all
        # have access to the pools
        mds_id = self.mds_ids[0]
        remote = self.mds_daemons[mds_id].remote

        # NB we could alternatively use librados pybindings for this, but it's a one-liner
        # using the `rados` CLI
        args = ([os.path.join(self._prefix, "rados"), "-p", pool] +
                (["--namespace", namespace] if namespace else []) +
                args)
        p = remote.run(
            args=args,
            stdin=stdin_data,
            stdout=StringIO())
        return p.stdout.getvalue().strip()

    def list_dirfrag(self, dir_ino):
        """
        Read the named object and return the list of omap keys

        :return a list of 0 or more strings
        """
        dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)

        try:
            key_list_str = self.rados(["listomapkeys", dirfrag_obj_name])
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(dirfrag_obj_name)

        return key_list_str.split("\n") if key_list_str else []

    def erase_metadata_objects(self, prefix):
        """
        For all objects in the metadata pool matching the prefix,
        erase them.

        This is O(N) with the number of objects in the pool, so only suitable
        for use on toy test filesystems.
        """
        all_objects = self.rados(["ls"]).split("\n")
        matching_objects = [o for o in all_objects if o.startswith(prefix)]
        for o in matching_objects:
            self.rados(["rm", o])

    def erase_mds_objects(self, rank):
        """
        Erase all the per-MDS objects for a particular rank.  This includes
        inotable, sessiontable, journal
        """

        def obj_prefix(multiplier):
            """
            MDS object naming conventions like rank 1's
            journal is at 201.***
            """
            return "%x." % (multiplier * 0x100 + rank)

        # MDS_INO_LOG_OFFSET
        self.erase_metadata_objects(obj_prefix(2))
        # MDS_INO_LOG_BACKUP_OFFSET
        self.erase_metadata_objects(obj_prefix(3))
        # MDS_INO_LOG_POINTER_OFFSET
        self.erase_metadata_objects(obj_prefix(4))
        # MDSTables & SessionMap
        self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))
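
    # Worked example: for rank 1, obj_prefix(2) evaluates to
    # "%x." % (2 * 0x100 + 1) == "201.", so journal objects 201.00000000,
    # 201.00000001, ... are erased; obj_prefix(4) similarly yields "401."
    # for the journal pointer object.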

    @property
    def _prefix(self):
        """
        Override this to set a different path prefix for the Ceph binaries
        invoked below ("rados", "ceph-dencoder", the cephfs-* tools).
        """
        return ""

    def _run_tool(self, tool, args, rank=None, quiet=False):
        # Tests frequently have [client] configuration that jacks up
        # the objecter log level (unlikely to be interesting here)
        # and does not set the mds log level (very interesting here)
        if quiet:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
        else:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']

        if rank is not None:
            base_args.extend(["--rank", "%d" % rank])

        t1 = datetime.datetime.now()
        r = self.tool_remote.run(
            args=base_args + args,
            stdout=StringIO()).stdout.getvalue().strip()
        duration = datetime.datetime.now() - t1
        log.info("Ran {0} in time {1}, result:\n{2}".format(
            base_args + args, duration, r
        ))
        return r

    @property
    def tool_remote(self):
        """
        An arbitrary remote to use when invoking recovery tools.  Use an MDS host because
        it'll definitely have keys with perms to access cephfs metadata pool.  This is public
        so that tests can use this remote to go get locally written output files from the tools.
        """
        mds_id = self.mds_ids[0]
        return self.mds_daemons[mds_id].remote

    def journal_tool(self, args, rank=None, quiet=False):
        """
        Invoke cephfs-journal-tool with the passed arguments, and return its stdout
        """
        return self._run_tool("cephfs-journal-tool", args, rank, quiet)

    def table_tool(self, args, quiet=False):
        """
        Invoke cephfs-table-tool with the passed arguments, and return its stdout
        """
        return self._run_tool("cephfs-table-tool", args, None, quiet)

    def data_scan(self, args, quiet=False, worker_count=1):
        """
        Invoke cephfs-data-scan with the passed arguments, and return its stdout

        :param worker_count: if greater than 1, multiple workers will be run
                             in parallel and the return value will be None
        """
        workers = []

        for n in range(0, worker_count):
            if worker_count > 1:
                # data-scan args first token is a command, followed by args to it.
                # insert worker arguments after the command.
                cmd = args[0]
                worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
            else:
                worker_args = args

            workers.append(Greenlet.spawn(lambda wargs=worker_args:
                           self._run_tool("cephfs-data-scan", wargs, None, quiet)))

        for w in workers:
            w.get()

        if worker_count == 1:
            return workers[0].value
        else:
            return None
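
    # Worked example: data_scan(["scan_inodes"], worker_count=4) spawns four
    # greenlets; worker 0 runs "cephfs-data-scan scan_inodes --worker_n 0
    # --worker_m 4", and the call returns None because per-worker output is
    # not aggregated.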