]>
git.proxmox.com Git - ceph.git/blob - ceph/qa/tasks/cephfs/filesystem.py
01e1ca588c1e8ee3076ad5f9b0fd10efa44a63c5
2 from StringIO
import StringIO
5 from gevent
import Greenlet
12 from teuthology
.exceptions
import CommandFailedError
13 from teuthology
import misc
14 from teuthology
.nuke
import clear_firewall
15 from teuthology
.parallel
import parallel
16 from tasks
.ceph_manager
import write_conf
17 from tasks
import ceph_manager
20 log
= logging
.getLogger(__name__
)
23 DAEMON_WAIT_TIMEOUT
= 120
27 class ObjectNotFound(Exception):
28 def __init__(self
, object_name
):
29 self
._object
_name
= object_name
32 return "Object not found: '{0}'".format(self
._object
_name
)
34 class FSStatus(object):
36 Operations on a snapshot of the FSMap.
38 def __init__(self
, mon_manager
):
39 self
.mon
= mon_manager
40 self
.map = json
.loads(self
.mon
.raw_cluster_cmd("fs", "dump", "--format=json"))
43 return json
.dumps(self
.map, indent
= 2, sort_keys
= True)
45 # Expose the fsmap for manual inspection.
46 def __getitem__(self
, key
):
48 Get a field from the fsmap.
52 def get_filesystems(self
):
54 Iterator for all filesystems.
56 for fs
in self
.map['filesystems']:
61 Iterator for all the mds_info components in the FSMap.
63 for info
in self
.get_standbys():
65 for fs
in self
.map['filesystems']:
66 for info
in fs
['mdsmap']['info'].values():
69 def get_standbys(self
):
71 Iterator for all standbys.
73 for info
in self
.map['standbys']:
76 def get_fsmap(self
, fscid
):
78 Get the fsmap for the given FSCID.
80 for fs
in self
.map['filesystems']:
81 if fscid
is None or fs
['id'] == fscid
:
83 raise RuntimeError("FSCID {0} not in map".format(fscid
))
85 def get_fsmap_byname(self
, name
):
87 Get the fsmap for the given file system name.
89 for fs
in self
.map['filesystems']:
90 if name
is None or fs
['mdsmap']['fs_name'] == name
:
92 raise RuntimeError("FS {0} not in map".format(name
))
94 def get_replays(self
, fscid
):
96 Get the standby:replay MDS for the given FSCID.
98 fs
= self
.get_fsmap(fscid
)
99 for info
in fs
['mdsmap']['info'].values():
100 if info
['state'] == 'up:standby-replay':
103 def get_ranks(self
, fscid
):
105 Get the ranks for the given FSCID.
107 fs
= self
.get_fsmap(fscid
)
108 for info
in fs
['mdsmap']['info'].values():
109 if info
['rank'] >= 0:
112 def get_rank(self
, fscid
, rank
):
114 Get the rank for the given FSCID.
116 for info
in self
.get_ranks(fscid
):
117 if info
['rank'] == rank
:
119 raise RuntimeError("FSCID {0} has no rank {1}".format(fscid
, rank
))
121 def get_mds(self
, name
):
123 Get the info for the given MDS name.
125 for info
in self
.get_all():
126 if info
['name'] == name
:
130 def get_mds_addr(self
, name
):
132 Return the instance addr as a string, like "10.214.133.138:6807\/10825"
134 info
= self
.get_mds(name
)
138 log
.warn(json
.dumps(list(self
.get_all()), indent
=2)) # dump for debugging
139 raise RuntimeError("MDS id '{0}' not found in map".format(name
))
141 class CephCluster(object):
143 def admin_remote(self
):
144 first_mon
= misc
.get_first_mon(self
._ctx
, None)
145 (result
,) = self
._ctx
.cluster
.only(first_mon
).remotes
.iterkeys()
148 def __init__(self
, ctx
):
150 self
.mon_manager
= ceph_manager
.CephManager(self
.admin_remote
, ctx
=ctx
, logger
=log
.getChild('ceph_manager'))
152 def get_config(self
, key
, service_type
=None):
154 Get config from mon by default, or a specific service if caller asks for it
156 if service_type
is None:
159 service_id
= sorted(misc
.all_roles_of_type(self
._ctx
.cluster
, service_type
))[0]
160 return self
.json_asok(['config', 'get', key
], service_type
, service_id
)[key
]
162 def set_ceph_conf(self
, subsys
, key
, value
):
163 if subsys
not in self
._ctx
.ceph
['ceph'].conf
:
164 self
._ctx
.ceph
['ceph'].conf
[subsys
] = {}
165 self
._ctx
.ceph
['ceph'].conf
[subsys
][key
] = value
166 write_conf(self
._ctx
) # XXX because we don't have the ceph task's config object, if they
167 # used a different config path this won't work.
169 def clear_ceph_conf(self
, subsys
, key
):
170 del self
._ctx
.ceph
['ceph'].conf
[subsys
][key
]
171 write_conf(self
._ctx
)
173 def json_asok(self
, command
, service_type
, service_id
):
174 proc
= self
.mon_manager
.admin_socket(service_type
, service_id
, command
)
175 response_data
= proc
.stdout
.getvalue()
176 log
.info("_json_asok output: {0}".format(response_data
))
177 if response_data
.strip():
178 return json
.loads(response_data
)
183 class MDSCluster(CephCluster
):
185 Collective operations on all the MDS daemons in the Ceph cluster. These
186 daemons may be in use by various Filesystems.
188 For the benefit of pre-multi-filesystem tests, this class is also
189 a parent of Filesystem. The correct way to use MDSCluster going forward is
190 as a separate instance outside of your (multiple) Filesystem instances.
192 def __init__(self
, ctx
):
193 super(MDSCluster
, self
).__init
__(ctx
)
195 self
.mds_ids
= list(misc
.all_roles_of_type(ctx
.cluster
, 'mds'))
197 if len(self
.mds_ids
) == 0:
198 raise RuntimeError("This task requires at least one MDS")
200 if hasattr(self
._ctx
, "daemons"):
201 # Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
202 self
.mds_daemons
= dict([(mds_id
, self
._ctx
.daemons
.get_daemon('mds', mds_id
)) for mds_id
in self
.mds_ids
])
204 def _one_or_all(self
, mds_id
, cb
, in_parallel
=True):
206 Call a callback for a single named MDS, or for all.
208 Note that the parallelism here isn't for performance, it's to avoid being overly kind
209 to the cluster by waiting a graceful ssh-latency of time between doing things, and to
210 avoid being overly kind by executing them in a particular order. However, some actions
211 don't cope with being done in parallel, so it's optional (`in_parallel`)
213 :param mds_id: MDS daemon name, or None
214 :param cb: Callback taking single argument of MDS daemon name
215 :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
219 with
parallel() as p
:
220 for mds_id
in self
.mds_ids
:
223 for mds_id
in self
.mds_ids
:
228 def mds_stop(self
, mds_id
=None):
230 Stop the MDS daemon process(se). If it held a rank, that rank
231 will eventually go laggy.
233 self
._one
_or
_all
(mds_id
, lambda id_
: self
.mds_daemons
[id_
].stop())
235 def mds_fail(self
, mds_id
=None):
237 Inform MDSMonitor of the death of the daemon process(es). If it held
238 a rank, that rank will be relinquished.
240 self
._one
_or
_all
(mds_id
, lambda id_
: self
.mon_manager
.raw_cluster_cmd("mds", "fail", id_
))
242 def mds_restart(self
, mds_id
=None):
243 self
._one
_or
_all
(mds_id
, lambda id_
: self
.mds_daemons
[id_
].restart())
245 def mds_fail_restart(self
, mds_id
=None):
247 Variation on restart that includes marking MDSs as failed, so that doing this
248 operation followed by waiting for healthy daemon states guarantees that they
249 have gone down and come up, rather than potentially seeing the healthy states
250 that existed before the restart.
252 def _fail_restart(id_
):
253 self
.mds_daemons
[id_
].stop()
254 self
.mon_manager
.raw_cluster_cmd("mds", "fail", id_
)
255 self
.mds_daemons
[id_
].restart()
257 self
._one
_or
_all
(mds_id
, _fail_restart
)
259 def newfs(self
, name
):
260 return Filesystem(self
._ctx
, create
=name
)
263 return FSStatus(self
.mon_manager
)
265 def delete_all_filesystems(self
):
267 Remove all filesystems that exist, and any pools in use by them.
269 pools
= json
.loads(self
.mon_manager
.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
272 pool_id_name
[pool
['pool']] = pool
['pool_name']
274 # mark cluster down for each fs to prevent churn during deletion
275 status
= self
.status()
276 for fs
in status
.get_filesystems():
277 self
.mon_manager
.raw_cluster_cmd("fs", "set", fs
['mdsmap']['fs_name'], "cluster_down", "true")
279 # get a new copy as actives may have since changed
280 status
= self
.status()
281 for fs
in status
.get_filesystems():
282 mdsmap
= fs
['mdsmap']
283 metadata_pool
= pool_id_name
[mdsmap
['metadata_pool']]
285 for gid
in mdsmap
['up'].values():
286 self
.mon_manager
.raw_cluster_cmd('mds', 'fail', gid
.__str
__())
288 self
.mon_manager
.raw_cluster_cmd('fs', 'rm', mdsmap
['fs_name'], '--yes-i-really-mean-it')
289 self
.mon_manager
.raw_cluster_cmd('osd', 'pool', 'delete',
290 metadata_pool
, metadata_pool
,
291 '--yes-i-really-really-mean-it')
292 for data_pool
in mdsmap
['data_pools']:
293 data_pool
= pool_id_name
[data_pool
]
295 self
.mon_manager
.raw_cluster_cmd('osd', 'pool', 'delete',
296 data_pool
, data_pool
,
297 '--yes-i-really-really-mean-it')
298 except CommandFailedError
as e
:
299 if e
.exitstatus
== 16: # EBUSY, this data pool is used
300 pass # by two metadata pools, let the 2nd
301 else: # pass delete it
304 def get_standby_daemons(self
):
305 return set([s
['name'] for s
in self
.status().get_standbys()])
307 def get_mds_hostnames(self
):
309 for mds_id
in self
.mds_ids
:
310 mds_remote
= self
.mon_manager
.find_remote('mds', mds_id
)
311 result
.add(mds_remote
.hostname
)
315 def set_clients_block(self
, blocked
, mds_id
=None):
317 Block (using iptables) client communications to this MDS. Be careful: if
318 other services are running on this MDS, or other MDSs try to talk to this
319 MDS, their communications may also be blocked as collatoral damage.
321 :param mds_id: Optional ID of MDS to block, default to all
324 da_flag
= "-A" if blocked
else "-D"
326 def set_block(_mds_id
):
327 remote
= self
.mon_manager
.find_remote('mds', _mds_id
)
328 status
= self
.status()
330 addr
= status
.get_mds_addr(_mds_id
)
331 ip_str
, port_str
, inst_str
= re
.match("(.+):(.+)/(.+)", addr
).groups()
334 args
=["sudo", "iptables", da_flag
, "OUTPUT", "-p", "tcp", "--sport", port_str
, "-j", "REJECT", "-m",
335 "comment", "--comment", "teuthology"])
337 args
=["sudo", "iptables", da_flag
, "INPUT", "-p", "tcp", "--dport", port_str
, "-j", "REJECT", "-m",
338 "comment", "--comment", "teuthology"])
340 self
._one
_or
_all
(mds_id
, set_block
, in_parallel
=False)
342 def clear_firewall(self
):
343 clear_firewall(self
._ctx
)
345 def get_mds_info(self
, mds_id
):
346 return FSStatus(self
.mon_manager
).get_mds(mds_id
)
349 flags
= json
.loads(self
.mon_manager
.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['flags']
350 return 'full' in flags
352 def is_pool_full(self
, pool_name
):
353 pools
= json
.loads(self
.mon_manager
.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
355 if pool
['pool_name'] == pool_name
:
356 return 'full' in pool
['flags_names'].split(",")
358 raise RuntimeError("Pool not found '{0}'".format(pool_name
))
360 class Filesystem(MDSCluster
):
362 This object is for driving a CephFS filesystem. The MDS daemons driven by
363 MDSCluster may be shared with other Filesystems.
365 def __init__(self
, ctx
, fscid
=None, create
=None):
366 super(Filesystem
, self
).__init
__(ctx
)
370 self
.metadata_pool_name
= None
371 self
.data_pools
= None
373 client_list
= list(misc
.all_roles_of_type(self
._ctx
.cluster
, 'client'))
374 self
.client_id
= client_list
[0]
375 self
.client_remote
= list(misc
.get_clients(ctx
=ctx
, roles
=["client.{0}".format(self
.client_id
)]))[0][1]
377 if create
is not None:
378 if fscid
is not None:
379 raise RuntimeError("cannot specify fscid when creating fs")
384 if not self
.legacy_configured():
386 elif fscid
is not None:
388 self
.getinfo(refresh
= True)
390 # Stash a reference to the first created filesystem on ctx, so
391 # that if someone drops to the interactive shell they can easily
393 if not hasattr(self
._ctx
, "filesystem"):
394 self
._ctx
.filesystem
= self
396 def getinfo(self
, refresh
= False):
397 status
= self
.status()
398 if self
.id is not None:
399 fsmap
= status
.get_fsmap(self
.id)
400 elif self
.name
is not None:
401 fsmap
= status
.get_fsmap_byname(self
.name
)
403 fss
= [fs
for fs
in status
.get_filesystems()]
407 raise RuntimeError("no file system available")
409 raise RuntimeError("more than one file system available")
410 self
.id = fsmap
['id']
411 self
.name
= fsmap
['mdsmap']['fs_name']
412 self
.get_pool_names(status
= status
, refresh
= refresh
)
415 def deactivate(self
, rank
):
417 raise RuntimeError("invalid rank")
419 raise RuntimeError("cannot deactivate rank 0")
420 self
.mon_manager
.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self
.id, rank
))
422 def set_max_mds(self
, max_mds
):
423 self
.mon_manager
.raw_cluster_cmd("fs", "set", self
.name
, "max_mds", "%d" % max_mds
)
425 def set_allow_dirfrags(self
, yes
):
426 self
.mon_manager
.raw_cluster_cmd("fs", "set", self
.name
, "allow_dirfrags", str(yes
).lower(), '--yes-i-really-mean-it')
428 def set_allow_multimds(self
, yes
):
429 self
.mon_manager
.raw_cluster_cmd("fs", "set", self
.name
, "allow_multimds", str(yes
).lower(), '--yes-i-really-mean-it')
431 def get_pgs_per_fs_pool(self
):
433 Calculate how many PGs to use when creating a pool, in order to avoid raising any
434 health warnings about mon_pg_warn_min_per_osd
436 :return: an integer number of PGs
438 pg_warn_min_per_osd
= int(self
.get_config('mon_pg_warn_min_per_osd'))
439 osd_count
= len(list(misc
.all_roles_of_type(self
._ctx
.cluster
, 'osd')))
440 return pg_warn_min_per_osd
* osd_count
443 if self
.name
is None:
445 if self
.metadata_pool_name
is None:
446 self
.metadata_pool_name
= "{0}_metadata".format(self
.name
)
447 data_pool_name
= "{0}_data".format(self
.name
)
449 log
.info("Creating filesystem '{0}'".format(self
.name
))
451 pgs_per_fs_pool
= self
.get_pgs_per_fs_pool()
453 self
.mon_manager
.raw_cluster_cmd('osd', 'pool', 'create',
454 self
.metadata_pool_name
, pgs_per_fs_pool
.__str
__())
455 self
.mon_manager
.raw_cluster_cmd('osd', 'pool', 'create',
456 data_pool_name
, pgs_per_fs_pool
.__str
__())
457 self
.mon_manager
.raw_cluster_cmd('fs', 'new',
458 self
.name
, self
.metadata_pool_name
, data_pool_name
)
459 # Turn off spurious standby count warnings from modifying max_mds in tests.
461 self
.mon_manager
.raw_cluster_cmd('fs', 'set', self
.name
, 'standby_count_wanted', '0')
462 except CommandFailedError
as e
:
463 if e
.exitstatus
== 22:
464 # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
469 self
.getinfo(refresh
= True)
472 if getattr(self
._ctx
, "filesystem", None) == self
:
473 delattr(self
._ctx
, "filesystem")
477 Whether a filesystem exists in the mon's filesystem list
479 fs_list
= json
.loads(self
.mon_manager
.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
480 return self
.name
in [fs
['name'] for fs
in fs_list
]
482 def legacy_configured(self
):
484 Check if a legacy (i.e. pre "fs new") filesystem configuration is present. If this is
485 the case, the caller should avoid using Filesystem.create
488 out_text
= self
.mon_manager
.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
489 pools
= json
.loads(out_text
)
490 metadata_pool_exists
= 'metadata' in [p
['poolname'] for p
in pools
]
491 if metadata_pool_exists
:
492 self
.metadata_pool_name
= 'metadata'
493 except CommandFailedError
as e
:
494 # For use in upgrade tests, Ceph cuttlefish and earlier don't support
495 # structured output (--format) from the CLI.
496 if e
.exitstatus
== 22:
497 metadata_pool_exists
= True
501 return metadata_pool_exists
504 return json
.loads(self
.mon_manager
.raw_cluster_cmd("df", "--format=json-pretty"))
506 def get_mds_map(self
):
507 return self
.status().get_fsmap(self
.id)['mdsmap']
509 def add_data_pool(self
, name
):
510 self
.mon_manager
.raw_cluster_cmd('osd', 'pool', 'create', name
, self
.get_pgs_per_fs_pool().__str
__())
511 self
.mon_manager
.raw_cluster_cmd('fs', 'add_data_pool', self
.name
, name
)
512 self
.get_pool_names(refresh
= True)
513 for poolid
, fs_name
in self
.data_pools
.items():
516 raise RuntimeError("could not get just created pool '{0}'".format(name
))
518 def get_pool_names(self
, refresh
= False, status
= None):
519 if refresh
or self
.metadata_pool_name
is None or self
.data_pools
is None:
521 status
= self
.status()
522 fsmap
= status
.get_fsmap(self
.id)
524 osd_map
= self
.mon_manager
.get_osd_dump_json()
526 for p
in osd_map
['pools']:
527 id_to_name
[p
['pool']] = p
['pool_name']
529 self
.metadata_pool_name
= id_to_name
[fsmap
['mdsmap']['metadata_pool']]
531 for data_pool
in fsmap
['mdsmap']['data_pools']:
532 self
.data_pools
[data_pool
] = id_to_name
[data_pool
]
534 def get_data_pool_name(self
, refresh
= False):
535 if refresh
or self
.data_pools
is None:
536 self
.get_pool_names(refresh
= True)
537 assert(len(self
.data_pools
) == 1)
538 return self
.data_pools
.values()[0]
540 def get_data_pool_id(self
, refresh
= False):
542 Don't call this if you have multiple data pools
545 if refresh
or self
.data_pools
is None:
546 self
.get_pool_names(refresh
= True)
547 assert(len(self
.data_pools
) == 1)
548 return self
.data_pools
.keys()[0]
550 def get_data_pool_names(self
, refresh
= False):
551 if refresh
or self
.data_pools
is None:
552 self
.get_pool_names(refresh
= True)
553 return self
.data_pools
.values()
555 def get_metadata_pool_name(self
):
556 return self
.metadata_pool_name
558 def get_namespace_id(self
):
561 def get_pool_df(self
, pool_name
):
564 {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
566 for pool_df
in self
._df
()['pools']:
567 if pool_df
['name'] == pool_name
:
568 return pool_df
['stats']
570 raise RuntimeError("Pool name '{0}' not found".format(pool_name
))
573 return self
._df
()['stats']['total_used_bytes']
575 def are_daemons_healthy(self
):
577 Return true if all daemons are in one of active, standby, standby-replay, and
578 at least max_mds daemons are in 'active'.
580 Unlike most of Filesystem, this function is tolerant of new-style `fs`
581 commands being missing, because we are part of the ceph installation
582 process during upgrade suites, so must fall back to old style commands
583 when we get an EINVAL on a new style command.
590 mds_map
= self
.get_mds_map()
591 except CommandFailedError
as cfe
:
592 # Old version, fall back to non-multi-fs commands
593 if cfe
.exitstatus
== errno
.EINVAL
:
594 mds_map
= json
.loads(
595 self
.mon_manager
.raw_cluster_cmd('mds', 'dump', '--format=json'))
599 log
.info("are_daemons_healthy: mds map: {0}".format(mds_map
))
601 for mds_id
, mds_status
in mds_map
['info'].items():
602 if mds_status
['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
603 log
.warning("Unhealthy mds state {0}:{1}".format(mds_id
, mds_status
['state']))
605 elif mds_status
['state'] == 'up:active':
608 log
.info("are_daemons_healthy: {0}/{1}".format(
609 active_count
, mds_map
['max_mds']
612 if active_count
>= mds_map
['max_mds']:
613 # The MDSMap says these guys are active, but let's check they really are
614 for mds_id
, mds_status
in mds_map
['info'].items():
615 if mds_status
['state'] == 'up:active':
617 daemon_status
= self
.mds_asok(["status"], mds_id
=mds_status
['name'])
618 except CommandFailedError
as cfe
:
619 if cfe
.exitstatus
== errno
.EINVAL
:
620 # Old version, can't do this check
623 # MDS not even running
626 if daemon_status
['state'] != 'up:active':
627 # MDS hasn't taken the latest map yet
634 def get_daemon_names(self
, state
=None):
636 Return MDS daemon names of those daemons in the given state
640 status
= self
.get_mds_map()
642 for mds_status
in sorted(status
['info'].values(), lambda a
, b
: cmp(a
['rank'], b
['rank'])):
643 if mds_status
['state'] == state
or state
is None:
644 result
.append(mds_status
['name'])
648 def get_active_names(self
):
650 Return MDS daemon names of those daemons holding ranks
653 :return: list of strings like ['a', 'b'], sorted by rank
655 return self
.get_daemon_names("up:active")
657 def get_all_mds_rank(self
):
658 status
= self
.get_mds_map()
660 for mds_status
in sorted(status
['info'].values(), lambda a
, b
: cmp(a
['rank'], b
['rank'])):
661 if mds_status
['rank'] != -1 and mds_status
['state'] != 'up:standby-replay':
662 result
.append(mds_status
['rank'])
666 def get_rank_names(self
):
668 Return MDS daemon names of those daemons holding a rank,
669 sorted by rank. This includes e.g. up:replay/reconnect
670 as well as active, but does not include standby or
673 status
= self
.get_mds_map()
675 for mds_status
in sorted(status
['info'].values(), lambda a
, b
: cmp(a
['rank'], b
['rank'])):
676 if mds_status
['rank'] != -1 and mds_status
['state'] != 'up:standby-replay':
677 result
.append(mds_status
['name'])
681 def wait_for_daemons(self
, timeout
=None):
683 Wait until all daemons are healthy
688 timeout
= DAEMON_WAIT_TIMEOUT
692 if self
.are_daemons_healthy():
698 if elapsed
> timeout
:
699 raise RuntimeError("Timed out waiting for MDS daemons to become healthy")
701 def get_lone_mds_id(self
):
703 Get a single MDS ID: the only one if there is only one
704 configured, else the only one currently holding a rank,
707 if len(self
.mds_ids
) != 1:
708 alive
= self
.get_rank_names()
712 raise ValueError("Explicit MDS argument required when multiple MDSs in use")
714 return self
.mds_ids
[0]
717 log
.info("Creating new filesystem")
718 self
.delete_all_filesystems()
722 def put_metadata_object_raw(self
, object_id
, infile
):
724 Save an object to the metadata pool
726 temp_bin_path
= infile
727 self
.client_remote
.run(args
=[
728 'sudo', os
.path
.join(self
._prefix
, 'rados'), '-p', self
.metadata_pool_name
, 'put', object_id
, temp_bin_path
731 def get_metadata_object_raw(self
, object_id
):
733 Retrieve an object from the metadata pool and store it in a file.
735 temp_bin_path
= '/tmp/' + object_id
+ '.bin'
737 self
.client_remote
.run(args
=[
738 'sudo', os
.path
.join(self
._prefix
, 'rados'), '-p', self
.metadata_pool_name
, 'get', object_id
, temp_bin_path
743 def get_metadata_object(self
, object_type
, object_id
):
745 Retrieve an object from the metadata pool, pass it through
746 ceph-dencoder to dump it to JSON, and return the decoded object.
748 temp_bin_path
= '/tmp/out.bin'
750 self
.client_remote
.run(args
=[
751 'sudo', os
.path
.join(self
._prefix
, 'rados'), '-p', self
.metadata_pool_name
, 'get', object_id
, temp_bin_path
755 self
.client_remote
.run(args
=[
756 'sudo', os
.path
.join(self
._prefix
, 'ceph-dencoder'), 'type', object_type
, 'import', temp_bin_path
, 'decode', 'dump_json'
758 dump_json
= stdout
.getvalue().strip()
760 dump
= json
.loads(dump_json
)
761 except (TypeError, ValueError):
762 log
.error("Failed to decode JSON: '{0}'".format(dump_json
))
767 def get_journal_version(self
):
769 Read the JournalPointer and Journal::Header objects to learn the version of
772 journal_pointer_object
= '400.00000000'
773 journal_pointer_dump
= self
.get_metadata_object("JournalPointer", journal_pointer_object
)
774 journal_ino
= journal_pointer_dump
['journal_pointer']['front']
776 journal_header_object
= "{0:x}.00000000".format(journal_ino
)
777 journal_header_dump
= self
.get_metadata_object('Journaler::Header', journal_header_object
)
779 version
= journal_header_dump
['journal_header']['stream_format']
780 log
.info("Read journal version {0}".format(version
))
784 def mds_asok(self
, command
, mds_id
=None):
786 mds_id
= self
.get_lone_mds_id()
788 return self
.json_asok(command
, 'mds', mds_id
)
790 def read_cache(self
, path
, depth
=None):
791 cmd
= ["dump", "tree", path
]
792 if depth
is not None:
793 cmd
.append(depth
.__str
__())
794 result
= self
.mds_asok(cmd
)
796 raise RuntimeError("Path not found in cache: {0}".format(path
))
800 def wait_for_state(self
, goal_state
, reject
=None, timeout
=None, mds_id
=None, rank
=None):
802 Block until the MDS reaches a particular state, or a failure condition
805 When there are multiple MDSs, succeed when exaclty one MDS is in the
806 goal state, or fail when any MDS is in the reject state.
808 :param goal_state: Return once the MDS is in this state
809 :param reject: Fail if the MDS enters this state before the goal state
810 :param timeout: Fail if this many seconds pass before reaching goal
811 :return: number of seconds waited, rounded down to integer
814 started_at
= time
.time()
816 status
= self
.status()
818 mds_info
= status
.get_rank(self
.id, rank
)
819 current_state
= mds_info
['state'] if mds_info
else None
820 log
.info("Looked up MDS state for mds.{0}: {1}".format(rank
, current_state
))
821 elif mds_id
is not None:
822 # mds_info is None if no daemon with this ID exists in the map
823 mds_info
= status
.get_mds(mds_id
)
824 current_state
= mds_info
['state'] if mds_info
else None
825 log
.info("Looked up MDS state for {0}: {1}".format(mds_id
, current_state
))
827 # In general, look for a single MDS
828 states
= [m
['state'] for m
in status
.get_ranks(self
.id)]
829 if [s
for s
in states
if s
== goal_state
] == [goal_state
]:
830 current_state
= goal_state
831 elif reject
in states
:
832 current_state
= reject
835 log
.info("mapped states {0} to {1}".format(states
, current_state
))
837 elapsed
= time
.time() - started_at
838 if current_state
== goal_state
:
839 log
.info("reached state '{0}' in {1}s".format(current_state
, elapsed
))
841 elif reject
is not None and current_state
== reject
:
842 raise RuntimeError("MDS in reject state {0}".format(current_state
))
843 elif timeout
is not None and elapsed
> timeout
:
844 log
.error("MDS status at timeout: {0}".format(status
.get_fsmap(self
.id)))
846 "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
847 elapsed
, goal_state
, current_state
852 def _read_data_xattr(self
, ino_no
, xattr_name
, type, pool
):
853 mds_id
= self
.mds_ids
[0]
854 remote
= self
.mds_daemons
[mds_id
].remote
856 pool
= self
.get_data_pool_name()
858 obj_name
= "{0:x}.00000000".format(ino_no
)
861 os
.path
.join(self
._prefix
, "rados"), "-p", pool
, "getxattr", obj_name
, xattr_name
867 except CommandFailedError
as e
:
868 log
.error(e
.__str
__())
869 raise ObjectNotFound(obj_name
)
871 data
= proc
.stdout
.getvalue()
874 args
=[os
.path
.join(self
._prefix
, "ceph-dencoder"), "type", type, "import", "-", "decode", "dump_json"],
879 return json
.loads(p
.stdout
.getvalue().strip())
881 def _write_data_xattr(self
, ino_no
, xattr_name
, data
, pool
=None):
883 Write to an xattr of the 0th data object of an inode. Will
884 succeed whether the object and/or xattr already exist or not.
886 :param ino_no: integer inode number
887 :param xattr_name: string name of the xattr
888 :param data: byte array data to write to the xattr
889 :param pool: name of data pool or None to use primary data pool
892 remote
= self
.mds_daemons
[self
.mds_ids
[0]].remote
894 pool
= self
.get_data_pool_name()
896 obj_name
= "{0:x}.00000000".format(ino_no
)
898 os
.path
.join(self
._prefix
, "rados"), "-p", pool
, "setxattr",
899 obj_name
, xattr_name
, data
905 def read_backtrace(self
, ino_no
, pool
=None):
907 Read the backtrace from the data pool, return a dict in the format
908 given by inode_backtrace_t::dump, which is something like:
912 rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
913 ceph-dencoder type inode_backtrace_t import out.bin decode dump_json
915 { "ino": 1099511627778,
923 :param pool: name of pool to read backtrace from. If omitted, FS must have only
924 one data pool and that will be used.
926 return self
._read
_data
_xattr
(ino_no
, "parent", "inode_backtrace_t", pool
)
928 def read_layout(self
, ino_no
, pool
=None):
930 Read 'layout' xattr of an inode and parse the result, returning a dict like:
933 "stripe_unit": 4194304,
935 "object_size": 4194304,
940 :param pool: name of pool to read backtrace from. If omitted, FS must have only
941 one data pool and that will be used.
943 return self
._read
_data
_xattr
(ino_no
, "layout", "file_layout_t", pool
)
945 def _enumerate_data_objects(self
, ino
, size
):
947 Get the list of expected data objects for a range, and the list of objects
950 :return a tuple of two lists of strings (expected, actual)
952 stripe_size
= 1024 * 1024 * 4
954 size
= max(stripe_size
, size
)
957 "{0:x}.{1:08x}".format(ino
, n
)
958 for n
in range(0, ((size
- 1) / stripe_size
) + 1)
961 exist_objects
= self
.rados(["ls"], pool
=self
.get_data_pool_name()).split("\n")
963 return want_objects
, exist_objects
965 def data_objects_present(self
, ino
, size
):
967 Check that *all* the expected data objects for an inode are present in the data pool
970 want_objects
, exist_objects
= self
._enumerate
_data
_objects
(ino
, size
)
971 missing
= set(want_objects
) - set(exist_objects
)
974 log
.info("Objects missing (ino {0}, size {1}): {2}".format(
979 log
.info("All objects for ino {0} size {1} found".format(ino
, size
))
982 def data_objects_absent(self
, ino
, size
):
983 want_objects
, exist_objects
= self
._enumerate
_data
_objects
(ino
, size
)
984 present
= set(want_objects
) & set(exist_objects
)
987 log
.info("Objects not absent (ino {0}, size {1}): {2}".format(
992 log
.info("All objects for ino {0} size {1} are absent".format(ino
, size
))
995 def dirfrag_exists(self
, ino
, frag
):
997 self
.rados(["stat", "{0:x}.{1:08x}".format(ino
, frag
)])
998 except CommandFailedError
as e
:
1003 def rados(self
, args
, pool
=None, namespace
=None, stdin_data
=None):
1005 Call into the `rados` CLI from an MDS
1009 pool
= self
.get_metadata_pool_name()
1011 # Doesn't matter which MDS we use to run rados commands, they all
1012 # have access to the pools
1013 mds_id
= self
.mds_ids
[0]
1014 remote
= self
.mds_daemons
[mds_id
].remote
1016 # NB we could alternatively use librados pybindings for this, but it's a one-liner
1017 # using the `rados` CLI
1018 args
= ([os
.path
.join(self
._prefix
, "rados"), "-p", pool
] +
1019 (["--namespace", namespace
] if namespace
else []) +
1025 return p
.stdout
.getvalue().strip()
1027 def list_dirfrag(self
, dir_ino
):
1029 Read the named object and return the list of omap keys
1031 :return a list of 0 or more strings
1034 dirfrag_obj_name
= "{0:x}.00000000".format(dir_ino
)
1037 key_list_str
= self
.rados(["listomapkeys", dirfrag_obj_name
])
1038 except CommandFailedError
as e
:
1039 log
.error(e
.__str
__())
1040 raise ObjectNotFound(dirfrag_obj_name
)
1042 return key_list_str
.split("\n") if key_list_str
else []
1044 def erase_metadata_objects(self
, prefix
):
1046 For all objects in the metadata pool matching the prefix,
1049 This O(N) with the number of objects in the pool, so only suitable
1050 for use on toy test filesystems.
1052 all_objects
= self
.rados(["ls"]).split("\n")
1053 matching_objects
= [o
for o
in all_objects
if o
.startswith(prefix
)]
1054 for o
in matching_objects
:
1055 self
.rados(["rm", o
])
1057 def erase_mds_objects(self
, rank
):
1059 Erase all the per-MDS objects for a particular rank. This includes
1060 inotable, sessiontable, journal
1063 def obj_prefix(multiplier
):
1065 MDS object naming conventions like rank 1's
1066 journal is at 201.***
1068 return "%x." % (multiplier
* 0x100 + rank
)
1070 # MDS_INO_LOG_OFFSET
1071 self
.erase_metadata_objects(obj_prefix(2))
1072 # MDS_INO_LOG_BACKUP_OFFSET
1073 self
.erase_metadata_objects(obj_prefix(3))
1074 # MDS_INO_LOG_POINTER_OFFSET
1075 self
.erase_metadata_objects(obj_prefix(4))
1076 # MDSTables & SessionMap
1077 self
.erase_metadata_objects("mds{rank:d}_".format(rank
=rank
))
1082 Override this to set a different
1086 def _run_tool(self
, tool
, args
, rank
=None, quiet
=False):
1087 # Tests frequently have [client] configuration that jacks up
1088 # the objecter log level (unlikely to be interesting here)
1089 # and does not set the mds log level (very interesting here)
1091 base_args
= [os
.path
.join(self
._prefix
, tool
), '--debug-mds=1', '--debug-objecter=1']
1093 base_args
= [os
.path
.join(self
._prefix
, tool
), '--debug-mds=4', '--debug-objecter=1']
1095 if rank
is not None:
1096 base_args
.extend(["--rank", "%d" % rank
])
1098 t1
= datetime
.datetime
.now()
1099 r
= self
.tool_remote
.run(
1100 args
=base_args
+ args
,
1101 stdout
=StringIO()).stdout
.getvalue().strip()
1102 duration
= datetime
.datetime
.now() - t1
1103 log
.info("Ran {0} in time {1}, result:\n{2}".format(
1104 base_args
+ args
, duration
, r
1109 def tool_remote(self
):
1111 An arbitrary remote to use when invoking recovery tools. Use an MDS host because
1112 it'll definitely have keys with perms to access cephfs metadata pool. This is public
1113 so that tests can use this remote to go get locally written output files from the tools.
1115 mds_id
= self
.mds_ids
[0]
1116 return self
.mds_daemons
[mds_id
].remote
1118 def journal_tool(self
, args
, rank
=None, quiet
=False):
1120 Invoke cephfs-journal-tool with the passed arguments, and return its stdout
1122 return self
._run
_tool
("cephfs-journal-tool", args
, rank
, quiet
)
1124 def table_tool(self
, args
, quiet
=False):
1126 Invoke cephfs-table-tool with the passed arguments, and return its stdout
1128 return self
._run
_tool
("cephfs-table-tool", args
, None, quiet
)
1130 def data_scan(self
, args
, quiet
=False, worker_count
=1):
1132 Invoke cephfs-data-scan with the passed arguments, and return its stdout
1134 :param worker_count: if greater than 1, multiple workers will be run
1135 in parallel and the return value will be None
1140 for n
in range(0, worker_count
):
1141 if worker_count
> 1:
1142 # data-scan args first token is a command, followed by args to it.
1143 # insert worker arguments after the command.
1145 worker_args
= [cmd
] + ["--worker_n", n
.__str
__(), "--worker_m", worker_count
.__str
__()] + args
[1:]
1149 workers
.append(Greenlet
.spawn(lambda wargs
=worker_args
:
1150 self
._run
_tool
("cephfs-data-scan", wargs
, None, quiet
)))
1155 if worker_count
== 1:
1156 return workers
[0].value