# ceph/qa/tasks/cephfs/filesystem.py

import json
import logging
import datetime
import errno
import random
import re
import time
import os
import traceback

from gevent import Greenlet

from io import BytesIO
from six import StringIO

from teuthology.exceptions import CommandFailedError
from teuthology import misc
from teuthology.nuke import clear_firewall
from teuthology.parallel import parallel
from tasks.ceph_manager import write_conf
from tasks import ceph_manager


log = logging.getLogger(__name__)


DAEMON_WAIT_TIMEOUT = 120


class FileLayout(object):
    def __init__(self, pool=None, pool_namespace=None, stripe_unit=None, stripe_count=None, object_size=None):
        self.pool = pool
        self.pool_namespace = pool_namespace
        self.stripe_unit = stripe_unit
        self.stripe_count = stripe_count
        self.object_size = object_size

    def load_from_ceph(layout_str):
        ...

    def items(self):
        if self.pool is not None:
            yield ("pool", self.pool)
        if self.pool_namespace:
            yield ("pool_namespace", self.pool_namespace)
        if self.stripe_unit is not None:
            yield ("stripe_unit", self.stripe_unit)
        if self.stripe_count is not None:
            yield ("stripe_count", self.stripe_count)
        if self.object_size is not None:
            yield ("object_size", self.object_size)
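
    # Illustrative sketch (not part of the original file): items() yields only the
    # fields that were explicitly set, so a layout can be applied selectively, for
    # example via setfattr on a directory.  The `mount.setfattr` helper below is an
    # assumption for the example, not something this module defines.
    #
    #     layout = FileLayout(pool="cephfs_data", stripe_count=2)
    #     for name, value in layout.items():
    #         mount.setfattr("mydir", "ceph.dir.layout." + name, str(value))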


class ObjectNotFound(Exception):
    def __init__(self, object_name):
        self._object_name = object_name

    def __str__(self):
        return "Object not found: '{0}'".format(self._object_name)


class FSStatus(object):
    """
    Operations on a snapshot of the FSMap.
    """
    def __init__(self, mon_manager):
        self.mon = mon_manager
        self.map = json.loads(self.mon.raw_cluster_cmd("fs", "dump", "--format=json"))

    def __str__(self):
        return json.dumps(self.map, indent = 2, sort_keys = True)

    # Expose the fsmap for manual inspection.
    def __getitem__(self, key):
        """
        Get a field from the fsmap.
        """
        return self.map[key]

    def get_filesystems(self):
        """
        Iterator for all filesystems.
        """
        for fs in self.map['filesystems']:
            yield fs

    def get_all(self):
        """
        Iterator for all the mds_info components in the FSMap.
        """
        for info in self.map['standbys']:
            yield info
        for fs in self.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                yield info

    def get_standbys(self):
        """
        Iterator for all standbys.
        """
        for info in self.map['standbys']:
            yield info

    def get_fsmap(self, fscid):
        """
        Get the fsmap for the given FSCID.
        """
        for fs in self.map['filesystems']:
            if fscid is None or fs['id'] == fscid:
                return fs
        raise RuntimeError("FSCID {0} not in map".format(fscid))

    def get_fsmap_byname(self, name):
        """
        Get the fsmap for the given file system name.
        """
        for fs in self.map['filesystems']:
            if name is None or fs['mdsmap']['fs_name'] == name:
                return fs
        raise RuntimeError("FS {0} not in map".format(name))

    def get_replays(self, fscid):
        """
        Get the standby:replay MDS for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['state'] == 'up:standby-replay':
                yield info

    def get_ranks(self, fscid):
        """
        Get the ranks for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['rank'] >= 0 and info['state'] != 'up:standby-replay':
                yield info

    def get_rank(self, fscid, rank):
        """
        Get the rank for the given FSCID.
        """
        for info in self.get_ranks(fscid):
            if info['rank'] == rank:
                return info
        raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))

    def get_mds(self, name):
        """
        Get the info for the given MDS name.
        """
        for info in self.get_all():
            if info['name'] == name:
                return info
        return None

    def get_mds_addr(self, name):
        """
        Return the instance addr as a string, like "10.214.133.138:6807\/10825"
        """
        info = self.get_mds(name)
        if info:
            return info['addr']
        else:
            log.warning(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
            raise RuntimeError("MDS id '{0}' not found in map".format(name))

    def get_mds_gid(self, gid):
        """
        Get the info for the given MDS gid.
        """
        for info in self.get_all():
            if info['gid'] == gid:
                return info
        return None

    def hadfailover(self, status):
        """
        Compares two statuses for mds failovers.
        Returns True if there is a failover.
        """
        for fs in status.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                oldinfo = self.get_mds_gid(info['gid'])
                if oldinfo is None or oldinfo['incarnation'] != info['incarnation']:
                    return True
        return False
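
    # Usage sketch (illustrative, not from the original source): FSStatus wraps a
    # single "fs dump" snapshot, so repeated queries against one instance are
    # consistent; take a fresh snapshot to observe changes.
    #
    #     status = FSStatus(mon_manager)            # one snapshot of the FSMap
    #     for info in status.get_ranks(fscid):      # ranks of one filesystem
    #         print(info['rank'], info['name'], info['state'])
    #     failed_over = status.hadfailover(FSStatus(mon_manager))  # compare snapshots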


class CephCluster(object):
    @property
    def admin_remote(self):
        first_mon = misc.get_first_mon(self._ctx, None)
        (result,) = self._ctx.cluster.only(first_mon).remotes.keys()
        return result

    def __init__(self, ctx):
        self._ctx = ctx
        self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))

    def get_config(self, key, service_type=None):
        """
        Get config from mon by default, or a specific service if caller asks for it
        """
        if service_type is None:
            service_type = 'mon'

        service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def set_ceph_conf(self, subsys, key, value):
        if subsys not in self._ctx.ceph['ceph'].conf:
            self._ctx.ceph['ceph'].conf[subsys] = {}
        self._ctx.ceph['ceph'].conf[subsys][key] = value
        write_conf(self._ctx)  # XXX because we don't have the ceph task's config object, if they
                               # used a different config path this won't work.

    def clear_ceph_conf(self, subsys, key):
        del self._ctx.ceph['ceph'].conf[subsys][key]
        write_conf(self._ctx)

    def json_asok(self, command, service_type, service_id, timeout=None):
        command.insert(0, '--format=json')
        proc = self.mon_manager.admin_socket(service_type, service_id, command, timeout=timeout)
        response_data = proc.stdout.getvalue().strip()
        if len(response_data) > 0:
            j = json.loads(response_data)
            pretty = json.dumps(j, sort_keys=True, indent=2)
            log.debug(f"_json_asok output\n{pretty}")
            return j
        else:
            log.debug("_json_asok output empty")
            return None
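
    # Illustrative note (not in the original): json_asok drives a daemon admin
    # socket command and parses the JSON reply, so callers index straight into the
    # result.  The daemon id 'a' below is an example only.
    #
    #     limit = cluster.json_asok(['config', 'get', 'mds_cache_memory_limit'],
    #                               'mds', 'a')['mds_cache_memory_limit']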


class MDSCluster(CephCluster):
    """
    Collective operations on all the MDS daemons in the Ceph cluster.  These
    daemons may be in use by various Filesystems.

    For the benefit of pre-multi-filesystem tests, this class is also
    a parent of Filesystem.  The correct way to use MDSCluster going forward is
    as a separate instance outside of your (multiple) Filesystem instances.
    """
    def __init__(self, ctx):
        super(MDSCluster, self).__init__(ctx)

        self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))

        if len(self.mds_ids) == 0:
            raise RuntimeError("This task requires at least one MDS")

        if hasattr(self._ctx, "daemons"):
            # Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
            self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])

    def _one_or_all(self, mds_id, cb, in_parallel=True):
        """
        Call a callback for a single named MDS, or for all.

        Note that the parallelism here isn't for performance, it's to avoid being overly kind
        to the cluster by waiting a graceful ssh-latency of time between doing things, and to
        avoid being overly kind by executing them in a particular order.  However, some actions
        don't cope with being done in parallel, so it's optional (`in_parallel`)

        :param mds_id: MDS daemon name, or None
        :param cb: Callback taking single argument of MDS daemon name
        :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
        """
        if mds_id is None:
            if in_parallel:
                with parallel() as p:
                    for mds_id in self.mds_ids:
                        p.spawn(cb, mds_id)
            else:
                for mds_id in self.mds_ids:
                    cb(mds_id)
        else:
            cb(mds_id)
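
    # Sketch of the callback pattern (illustrative only): the mds_stop/mds_fail
    # helpers below are thin wrappers that pass a per-daemon lambda through
    # _one_or_all; `some_callback` is a hypothetical name.
    #
    #     self._one_or_all(None, lambda id_: log.info("would touch mds.%s" % id_))
    #     self._one_or_all("a", some_callback, in_parallel=False)   # single daemon, serial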

    def get_config(self, key, service_type=None):
        """
        get_config specialization of service_type="mds"
        """
        if service_type != "mds":
            return super(MDSCluster, self).get_config(key, service_type)

        # Some tests stop MDS daemons, don't send commands to a dead one:
        running_daemons = [i for i, mds in self.mds_daemons.items() if mds.running()]
        service_id = random.sample(running_daemons, 1)[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def mds_stop(self, mds_id=None):
        """
        Stop the MDS daemon process(es).  If it held a rank, that rank
        will eventually go laggy.
        """
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())

    def mds_fail(self, mds_id=None):
        """
        Inform MDSMonitor of the death of the daemon process(es).  If it held
        a rank, that rank will be relinquished.
        """
        self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))

    def mds_restart(self, mds_id=None):
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())

    def mds_fail_restart(self, mds_id=None):
        """
        Variation on restart that includes marking MDSs as failed, so that doing this
        operation followed by waiting for healthy daemon states guarantees that they
        have gone down and come up, rather than potentially seeing the healthy states
        that existed before the restart.
        """
        def _fail_restart(id_):
            self.mds_daemons[id_].stop()
            self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
            self.mds_daemons[id_].restart()

        self._one_or_all(mds_id, _fail_restart)

    def mds_signal(self, mds_id, sig, silent=False):
        self.mds_daemons[mds_id].signal(sig, silent)
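
    # Typical restart pattern from tests (illustrative, assumes a Filesystem
    # instance named fs): fail-restart all MDS daemons, then block until ranks
    # are healthy again so later assertions see the post-restart incarnations.
    #
    #     fs.mds_fail_restart()
    #     fs.wait_for_daemons()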

    def newfs(self, name='cephfs', create=True):
        return Filesystem(self._ctx, name=name, create=create)

    def status(self):
        return FSStatus(self.mon_manager)

    def delete_all_filesystems(self):
        """
        Remove all filesystems that exist, and any pools in use by them.
        """
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        pool_id_name = {}
        for pool in pools:
            pool_id_name[pool['pool']] = pool['pool_name']

        # mark cluster down for each fs to prevent churn during deletion
        status = self.status()
        for fs in status.get_filesystems():
            self.mon_manager.raw_cluster_cmd("fs", "fail", str(fs['mdsmap']['fs_name']))

        # get a new copy as actives may have since changed
        status = self.status()
        for fs in status.get_filesystems():
            mdsmap = fs['mdsmap']
            metadata_pool = pool_id_name[mdsmap['metadata_pool']]

            self.mon_manager.raw_cluster_cmd('fs', 'rm', mdsmap['fs_name'], '--yes-i-really-mean-it')
            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
                                             metadata_pool, metadata_pool,
                                             '--yes-i-really-really-mean-it')
            for data_pool in mdsmap['data_pools']:
                data_pool = pool_id_name[data_pool]
                try:
                    self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
                                                     data_pool, data_pool,
                                                     '--yes-i-really-really-mean-it')
                except CommandFailedError as e:
                    if e.exitstatus == 16:  # EBUSY, this data pool is used
                        pass                # by two metadata pools, let the 2nd
                    else:                   # pass delete it
                        raise

    def get_standby_daemons(self):
        return set([s['name'] for s in self.status().get_standbys()])

    def get_mds_hostnames(self):
        result = set()
        for mds_id in self.mds_ids:
            mds_remote = self.mon_manager.find_remote('mds', mds_id)
            result.add(mds_remote.hostname)

        return list(result)

    def set_clients_block(self, blocked, mds_id=None):
        """
        Block (using iptables) client communications to this MDS.  Be careful: if
        other services are running on this MDS, or other MDSs try to talk to this
        MDS, their communications may also be blocked as collateral damage.

        :param mds_id: Optional ID of MDS to block, default to all
        """
        da_flag = "-A" if blocked else "-D"

        def set_block(_mds_id):
            remote = self.mon_manager.find_remote('mds', _mds_id)
            status = self.status()

            addr = status.get_mds_addr(_mds_id)
            ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()

            remote.run(
                args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])
            remote.run(
                args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])

        self._one_or_all(mds_id, set_block, in_parallel=False)
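
    # Illustrative usage (not part of the original file): blocking and later
    # unblocking client traffic to one MDS; clear_firewall() below removes any
    # leftover rules at teardown.
    #
    #     mdscluster.set_clients_block(True, mds_id='a')    # add REJECT rules
    #     mdscluster.set_clients_block(False, mds_id='a')   # remove them again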

    def clear_firewall(self):
        clear_firewall(self._ctx)

    def get_mds_info(self, mds_id):
        return FSStatus(self.mon_manager).get_mds(mds_id)

    def is_pool_full(self, pool_name):
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        for pool in pools:
            if pool['pool_name'] == pool_name:
                return 'full' in pool['flags_names'].split(",")

        raise RuntimeError("Pool not found '{0}'".format(pool_name))


class Filesystem(MDSCluster):
    """
    This object is for driving a CephFS filesystem.  The MDS daemons driven by
    MDSCluster may be shared with other Filesystems.
    """
    def __init__(self, ctx, fs_config=None, fscid=None, name=None, create=False,
                 ec_profile=None):
        super(Filesystem, self).__init__(ctx)

        self.name = name
        self.ec_profile = ec_profile
        self.id = None
        self.metadata_pool_name = None
        self.metadata_overlay = False
        self.data_pool_name = None
        self.data_pools = None
        self.fs_config = fs_config

        client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
        self.client_id = client_list[0]
        self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]

        if name is not None:
            if fscid is not None:
                raise RuntimeError("cannot specify fscid when creating fs")
            if create and not self.legacy_configured():
                self.create()
        else:
            if fscid is not None:
                self.id = fscid
        self.getinfo(refresh = True)

        # Stash a reference to the first created filesystem on ctx, so
        # that if someone drops to the interactive shell they can easily
        # poke our methods.
        if not hasattr(self._ctx, "filesystem"):
            self._ctx.filesystem = self

    def get_task_status(self, status_key):
        return self.mon_manager.get_service_task_status("mds", status_key)
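
    # Construction sketch (illustrative, assuming a teuthology ctx with mds and
    # client roles): create=True builds the pools and runs "fs new" via create().
    #
    #     fs = Filesystem(ctx, name='cephfs', create=True)
    #     fs.wait_for_daemons()
    #     print(fs.get_metadata_pool_name(), fs.get_data_pool_name())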

    def getinfo(self, refresh = False):
        status = self.status()
        if self.id is not None:
            fsmap = status.get_fsmap(self.id)
        elif self.name is not None:
            fsmap = status.get_fsmap_byname(self.name)
        else:
            fss = [fs for fs in status.get_filesystems()]
            if len(fss) == 1:
                fsmap = fss[0]
            elif len(fss) == 0:
                raise RuntimeError("no file system available")
            else:
                raise RuntimeError("more than one file system available")
        self.id = fsmap['id']
        self.name = fsmap['mdsmap']['fs_name']
        self.get_pool_names(status = status, refresh = refresh)
        return status

    def set_metadata_overlay(self, overlay):
        if self.id is not None:
            raise RuntimeError("cannot specify fscid when configuring overlay")
        self.metadata_overlay = overlay

    def deactivate(self, rank):
        if rank < 0:
            raise RuntimeError("invalid rank")
        elif rank == 0:
            raise RuntimeError("cannot deactivate rank 0")
        self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))

    def reach_max_mds(self):
        # Try to reach rank count == max_mds, up or down (UPGRADE SENSITIVE!)
        status = self.getinfo()
        mds_map = self.get_mds_map(status=status)
        max_mds = mds_map['max_mds']

        count = len(list(self.get_ranks(status=status)))
        if count > max_mds:
            try:
                # deactivate mds in descending order
                status = self.wait_for_daemons(status=status, skip_max_mds_check=True)
                while count > max_mds:
                    targets = sorted(self.get_ranks(status=status), key=lambda r: r['rank'], reverse=True)
                    target = targets[0]
                    log.debug("deactivating rank %d" % target['rank'])
                    self.deactivate(target['rank'])
                    status = self.wait_for_daemons(skip_max_mds_check=True)
                    count = len(list(self.get_ranks(status=status)))
            except:
                # In Mimic, deactivation is done automatically:
                log.info("Error:\n{}".format(traceback.format_exc()))
                status = self.wait_for_daemons()
        else:
            status = self.wait_for_daemons()

        mds_map = self.get_mds_map(status=status)
        assert(mds_map['max_mds'] == max_mds)
        assert(mds_map['in'] == list(range(0, max_mds)))

    def fail(self):
        self.mon_manager.raw_cluster_cmd("fs", "fail", str(self.name))

    def set_flag(self, var, *args):
        a = map(lambda x: str(x).lower(), args)
        self.mon_manager.raw_cluster_cmd("fs", "flag", "set", var, *a)

    def set_allow_multifs(self, yes=True):
        self.set_flag("enable_multiple", yes)

    def set_var(self, var, *args):
        a = map(lambda x: str(x).lower(), args)
        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, var, *a)

    def set_down(self, down=True):
        self.set_var("down", str(down).lower())

    def set_joinable(self, joinable=True):
        self.set_var("joinable", joinable)

    def set_max_mds(self, max_mds):
        self.set_var("max_mds", "%d" % max_mds)

    def set_session_timeout(self, timeout):
        self.set_var("session_timeout", "%d" % timeout)

    def set_allow_standby_replay(self, yes):
        self.set_var("allow_standby_replay", yes)

    def set_allow_new_snaps(self, yes):
        self.set_var("allow_new_snaps", yes, '--yes-i-really-mean-it')
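
    # Mapping sketch (illustrative): each helper above is a thin wrapper over the
    # "ceph fs set <name> <var> <value>" CLI, e.g.
    #
    #     fs.set_max_mds(2)                  # ceph fs set <name> max_mds 2
    #     fs.set_joinable(False)             # ceph fs set <name> joinable false
    #     fs.set_allow_standby_replay(True)  # ceph fs set <name> allow_standby_replay true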

    # In Octopus+, the PG count can be omitted to use the default. We keep the
    # hard-coded value for deployments of Mimic/Nautilus.
    pgs_per_fs_pool = 8

    def create(self):
        if self.name is None:
            self.name = "cephfs"
        if self.metadata_pool_name is None:
            self.metadata_pool_name = "{0}_metadata".format(self.name)
        if self.data_pool_name is None:
            data_pool_name = "{0}_data".format(self.name)
        else:
            data_pool_name = self.data_pool_name

        log.debug("Creating filesystem '{0}'".format(self.name))

        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                         self.metadata_pool_name, self.pgs_per_fs_pool.__str__())
        if self.metadata_overlay:
            self.mon_manager.raw_cluster_cmd('fs', 'new',
                                             self.name, self.metadata_pool_name, data_pool_name,
                                             '--allow-dangerous-metadata-overlay')
        else:
            if self.ec_profile and 'disabled' not in self.ec_profile:
                log.debug("EC profile is %s", self.ec_profile)
                cmd = ['osd', 'erasure-code-profile', 'set', data_pool_name]
                cmd.extend(self.ec_profile)
                self.mon_manager.raw_cluster_cmd(*cmd)
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'create',
                    data_pool_name, self.pgs_per_fs_pool.__str__(), 'erasure',
                    data_pool_name)
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'set',
                    data_pool_name, 'allow_ec_overwrites', 'true')
            else:
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'create',
                    data_pool_name, self.pgs_per_fs_pool.__str__())
            self.mon_manager.raw_cluster_cmd('fs', 'new',
                                             self.name,
                                             self.metadata_pool_name,
                                             data_pool_name)

        self.check_pool_application(self.metadata_pool_name)
        self.check_pool_application(data_pool_name)
        # Turn off spurious standby count warnings from modifying max_mds in tests.
        try:
            self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
        except CommandFailedError as e:
            if e.exitstatus == 22:
                # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
                pass
            else:
                raise

        if self.fs_config is not None:
            max_mds = self.fs_config.get('max_mds', 1)
            if max_mds > 1:
                self.set_max_mds(max_mds)

            # If absent will use the default value (60 seconds)
            session_timeout = self.fs_config.get('session_timeout', 60)
            if session_timeout != 60:
                self.set_session_timeout(session_timeout)

        self.getinfo(refresh = True)

    def check_pool_application(self, pool_name):
        osd_map = self.mon_manager.get_osd_dump_json()
        for pool in osd_map['pools']:
            if pool['pool_name'] == pool_name:
                if "application_metadata" in pool:
                    if not "cephfs" in pool['application_metadata']:
                        raise RuntimeError("Pool {pool_name} does not name cephfs as application!".\
                            format(pool_name=pool_name))

    def __del__(self):
        if getattr(self._ctx, "filesystem", None) == self:
            delattr(self._ctx, "filesystem")

    def exists(self):
        """
        Whether a filesystem exists in the mon's filesystem list
        """
        fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
        return self.name in [fs['name'] for fs in fs_list]

    def legacy_configured(self):
        """
        Check if a legacy (i.e. pre "fs new") filesystem configuration is present.  If this is
        the case, the caller should avoid using Filesystem.create
        """
        try:
            out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
            pools = json.loads(out_text)
            metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
            if metadata_pool_exists:
                self.metadata_pool_name = 'metadata'
        except CommandFailedError as e:
            # For use in upgrade tests, Ceph cuttlefish and earlier don't support
            # structured output (--format) from the CLI.
            if e.exitstatus == 22:
                metadata_pool_exists = True
            else:
                raise

        return metadata_pool_exists

    def _df(self):
        return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))

    def get_mds_map(self, status=None):
        if status is None:
            status = self.status()
        return status.get_fsmap(self.id)['mdsmap']

    def get_var(self, var, status=None):
        return self.get_mds_map(status=status)[var]

    def set_dir_layout(self, mount, path, layout):
        for name, value in layout.items():
            mount.run_shell(args=["setfattr", "-n", "ceph.dir.layout."+name, "-v", str(value), path])

    def add_data_pool(self, name, create=True):
        if create:
            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name, self.pgs_per_fs_pool.__str__())
        self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
        self.get_pool_names(refresh = True)
        for poolid, fs_name in self.data_pools.items():
            if name == fs_name:
                return poolid
        raise RuntimeError("could not get just created pool '{0}'".format(name))

    def get_pool_names(self, refresh = False, status = None):
        if refresh or self.metadata_pool_name is None or self.data_pools is None:
            if status is None:
                status = self.status()
            fsmap = status.get_fsmap(self.id)

            osd_map = self.mon_manager.get_osd_dump_json()
            id_to_name = {}
            for p in osd_map['pools']:
                id_to_name[p['pool']] = p['pool_name']

            self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
            self.data_pools = {}
            for data_pool in fsmap['mdsmap']['data_pools']:
                self.data_pools[data_pool] = id_to_name[data_pool]

    def get_data_pool_name(self, refresh = False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return next(iter(self.data_pools.values()))

    def get_data_pool_id(self, refresh = False):
        """
        Don't call this if you have multiple data pools
        """
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return next(iter(self.data_pools.keys()))

    def get_data_pool_names(self, refresh = False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        return list(self.data_pools.values())

    def get_metadata_pool_name(self):
        return self.metadata_pool_name

    def set_data_pool_name(self, name):
        if self.id is not None:
            raise RuntimeError("can't set filesystem name if its fscid is set")
        self.data_pool_name = name

    def get_namespace_id(self):
        return self.id

    def get_pool_df(self, pool_name):
        """
        Example:
        {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
        """
        for pool_df in self._df()['pools']:
            if pool_df['name'] == pool_name:
                return pool_df['stats']

        raise RuntimeError("Pool name '{0}' not found".format(pool_name))

    def get_usage(self):
        return self._df()['stats']['total_used_bytes']

    def are_daemons_healthy(self, status=None, skip_max_mds_check=False):
        """
        Return true if all daemons are in one of active, standby, standby-replay, and
        at least max_mds daemons are in 'active'.

        Unlike most of Filesystem, this function is tolerant of new-style `fs`
        commands being missing, because we are part of the ceph installation
        process during upgrade suites, so must fall back to old style commands
        when we get an EINVAL on a new style command.
        """
        # First, check to see that processes haven't exited with an error code
        for mds in self._ctx.daemons.iter_daemons_of_role('mds'):
            mds.check_status()

        active_count = 0
        try:
            mds_map = self.get_mds_map(status=status)
        except CommandFailedError as cfe:
            # Old version, fall back to non-multi-fs commands
            if cfe.exitstatus == errno.EINVAL:
                mds_map = json.loads(
                        self.mon_manager.raw_cluster_cmd('mds', 'dump', '--format=json'))
            else:
                raise

        log.debug("are_daemons_healthy: mds map: {0}".format(mds_map))

        for mds_id, mds_status in mds_map['info'].items():
            if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
                log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
                return False
            elif mds_status['state'] == 'up:active':
                active_count += 1

        log.debug("are_daemons_healthy: {0}/{1}".format(
            active_count, mds_map['max_mds']
        ))

        if not skip_max_mds_check:
            if active_count > mds_map['max_mds']:
                log.debug("are_daemons_healthy: number of actives is greater than max_mds: {0}".format(mds_map))
                return False
            elif active_count == mds_map['max_mds']:
                # The MDSMap says these guys are active, but let's check they really are
                for mds_id, mds_status in mds_map['info'].items():
                    if mds_status['state'] == 'up:active':
                        try:
                            daemon_status = self.mds_asok(["status"], mds_id=mds_status['name'])
                        except CommandFailedError as cfe:
                            if cfe.exitstatus == errno.EINVAL:
                                # Old version, can't do this check
                                continue
                            else:
                                # MDS not even running
                                return False

                        if daemon_status['state'] != 'up:active':
                            # MDS hasn't taken the latest map yet
                            return False

                return True
            else:
                return False
        else:
            log.debug("are_daemons_healthy: skipping max_mds check")
            return True

    def get_daemon_names(self, state=None, status=None):
        """
        Return MDS daemon names of those daemons in the given state
        """
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['state'] == state or state is None:
                result.append(mds_status['name'])

        return result

    def get_active_names(self, status=None):
        """
        Return MDS daemon names of those daemons holding ranks
        in state up:active

        :return: list of strings like ['a', 'b'], sorted by rank
        """
        return self.get_daemon_names("up:active", status=status)

    def get_all_mds_rank(self, status=None):
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['rank'])

        return result

    def get_rank(self, rank=None, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_rank(self.id, rank)

    def rank_restart(self, rank=0, status=None):
        name = self.get_rank(rank=rank, status=status)['name']
        self.mds_restart(mds_id=name)

    def rank_signal(self, signal, rank=0, status=None):
        name = self.get_rank(rank=rank, status=status)['name']
        self.mds_signal(name, signal)

    def rank_freeze(self, yes, rank=0):
        self.mon_manager.raw_cluster_cmd("mds", "freeze", "{}:{}".format(self.id, rank), str(yes).lower())

    def rank_fail(self, rank=0):
        self.mon_manager.raw_cluster_cmd("mds", "fail", "{}:{}".format(self.id, rank))

    def get_ranks(self, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_ranks(self.id)

    def get_replays(self, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_replays(self.id)

    def get_replay(self, rank=0, status=None):
        for replay in self.get_replays(status=status):
            if replay['rank'] == rank:
                return replay
        return None

    def get_rank_names(self, status=None):
        """
        Return MDS daemon names of those daemons holding a rank,
        sorted by rank.  This includes e.g. up:replay/reconnect
        as well as active, but does not include standby or
        standby-replay.
        """
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['name'])

        return result

    def wait_for_daemons(self, timeout=None, skip_max_mds_check=False, status=None):
        """
        Wait until all daemons are healthy
        """

        if timeout is None:
            timeout = DAEMON_WAIT_TIMEOUT

        if status is None:
            status = self.status()

        elapsed = 0
        while True:
            if self.are_daemons_healthy(status=status, skip_max_mds_check=skip_max_mds_check):
                return status
            else:
                time.sleep(1)
                elapsed += 1

            if elapsed > timeout:
                log.debug("status = {0}".format(status))
                raise RuntimeError("Timed out waiting for MDS daemons to become healthy")

            status = self.status()
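
    # Illustrative behaviour note (not in the original): the loop above polls
    # are_daemons_healthy() roughly once per second and raises after
    # DAEMON_WAIT_TIMEOUT (120s by default) if the cluster never settles.
    #
    #     fs.set_max_mds(2)
    #     status = fs.wait_for_daemons()    # returns the final FSStatus snapshot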

    def get_lone_mds_id(self):
        """
        Get a single MDS ID: the only one if there is only one
        configured, else the only one currently holding a rank,
        else raise an error.
        """
        if len(self.mds_ids) != 1:
            alive = self.get_rank_names()
            if len(alive) == 1:
                return alive[0]
            else:
                raise ValueError("Explicit MDS argument required when multiple MDSs in use")
        else:
            return self.mds_ids[0]

    def recreate(self):
        log.info("Creating new filesystem")
        self.delete_all_filesystems()

    def put_metadata_object_raw(self, object_id, infile):
        """
        Save an object to the metadata pool
        """
        temp_bin_path = infile
        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'put', object_id, temp_bin_path
        ])

    def get_metadata_object_raw(self, object_id):
        """
        Retrieve an object from the metadata pool and store it in a file.
        """
        temp_bin_path = '/tmp/' + object_id + '.bin'

        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
        ])

        return temp_bin_path

    def get_metadata_object(self, object_type, object_id):
        """
        Retrieve an object from the metadata pool, pass it through
        ceph-dencoder to dump it to JSON, and return the decoded object.
        """
        temp_bin_path = '/tmp/out.bin'

        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
        ])

        dump_json = self.client_remote.sh([
            'sudo', os.path.join(self._prefix, 'ceph-dencoder'), 'type', object_type, 'import', temp_bin_path, 'decode', 'dump_json'
        ]).strip()
        try:
            dump = json.loads(dump_json)
        except (TypeError, ValueError):
            log.error("Failed to decode JSON: '{0}'".format(dump_json))
            raise

        return dump

    def get_journal_version(self):
        """
        Read the JournalPointer and Journal::Header objects to learn the version of
        encoding in use.
        """
        journal_pointer_object = '400.00000000'
        journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
        journal_ino = journal_pointer_dump['journal_pointer']['front']

        journal_header_object = "{0:x}.00000000".format(journal_ino)
        journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)

        version = journal_header_dump['journal_header']['stream_format']
        log.debug("Read journal version {0}".format(version))

        return version

    def mds_asok(self, command, mds_id=None, timeout=None):
        if mds_id is None:
            mds_id = self.get_lone_mds_id()

        return self.json_asok(command, 'mds', mds_id, timeout=timeout)

    def rank_asok(self, command, rank=0, status=None, timeout=None):
        info = self.get_rank(rank=rank, status=status)
        return self.json_asok(command, 'mds', info['name'], timeout=timeout)

    def rank_tell(self, command, rank=0, status=None):
        info = self.get_rank(rank=rank, status=status)
        return json.loads(self.mon_manager.raw_cluster_cmd("tell", 'mds.{0}'.format(info['name']), *command))

    def ranks_tell(self, command, status=None):
        if status is None:
            status = self.status()
        out = []
        for r in status.get_ranks(self.id):
            result = self.rank_tell(command, rank=r['rank'], status=status)
            out.append((r['rank'], result))
        return sorted(out)

    def ranks_perf(self, f, status=None):
        perf = self.ranks_tell(["perf", "dump"], status=status)
        out = []
        for rank, perf in perf:
            out.append((rank, f(perf)))
        return out

    def read_cache(self, path, depth=None):
        cmd = ["dump", "tree", path]
        if depth is not None:
            cmd.append(depth.__str__())
        result = self.mds_asok(cmd)
        if len(result) == 0:
            raise RuntimeError("Path not found in cache: {0}".format(path))

        return result

    def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
        """
        Block until the MDS reaches a particular state, or a failure condition
        is met.

        When there are multiple MDSs, succeed when exactly one MDS is in the
        goal state, or fail when any MDS is in the reject state.

        :param goal_state: Return once the MDS is in this state
        :param reject: Fail if the MDS enters this state before the goal state
        :param timeout: Fail if this many seconds pass before reaching goal
        :return: number of seconds waited, rounded down to integer
        """

        started_at = time.time()
        while True:
            status = self.status()
            if rank is not None:
                try:
                    mds_info = status.get_rank(self.id, rank)
                    current_state = mds_info['state'] if mds_info else None
                    log.debug("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
                except:
                    mdsmap = self.get_mds_map(status=status)
                    if rank in mdsmap['failed']:
                        log.debug("Waiting for rank {0} to come back.".format(rank))
                        current_state = None
                    else:
                        raise
            elif mds_id is not None:
                # mds_info is None if no daemon with this ID exists in the map
                mds_info = status.get_mds(mds_id)
                current_state = mds_info['state'] if mds_info else None
                log.debug("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
            else:
                # In general, look for a single MDS
                states = [m['state'] for m in status.get_ranks(self.id)]
                if [s for s in states if s == goal_state] == [goal_state]:
                    current_state = goal_state
                elif reject in states:
                    current_state = reject
                else:
                    current_state = None
                log.debug("mapped states {0} to {1}".format(states, current_state))

            elapsed = time.time() - started_at
            if current_state == goal_state:
                log.debug("reached state '{0}' in {1}s".format(current_state, elapsed))
                return elapsed
            elif reject is not None and current_state == reject:
                raise RuntimeError("MDS in reject state {0}".format(current_state))
            elif timeout is not None and elapsed > timeout:
                log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
                raise RuntimeError(
                    "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
                        elapsed, goal_state, current_state
                    ))
            else:
                time.sleep(1)

    def _read_data_xattr(self, ino_no, xattr_name, type, pool):
        mds_id = self.mds_ids[0]
        remote = self.mds_daemons[mds_id].remote
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)

        args = [
            os.path.join(self._prefix, "rados"), "-p", pool, "getxattr", obj_name, xattr_name
        ]
        try:
            proc = remote.run(args=args, stdout=BytesIO())
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(obj_name)

        data = proc.stdout.getvalue()
        dump = remote.sh(
            [os.path.join(self._prefix, "ceph-dencoder"),
             "type", type,
             "import", "-",
             "decode", "dump_json"],
            stdin=data
        )

        return json.loads(dump.strip())

    def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
        """
        Write to an xattr of the 0th data object of an inode.  Will
        succeed whether the object and/or xattr already exist or not.

        :param ino_no: integer inode number
        :param xattr_name: string name of the xattr
        :param data: byte array data to write to the xattr
        :param pool: name of data pool or None to use primary data pool
        """
        remote = self.mds_daemons[self.mds_ids[0]].remote
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)
        args = [
            os.path.join(self._prefix, "rados"), "-p", pool, "setxattr",
            obj_name, xattr_name, data
        ]
        remote.sh(args)

    def read_backtrace(self, ino_no, pool=None):
        """
        Read the backtrace from the data pool, return a dict in the format
        given by inode_backtrace_t::dump, which is something like:

            rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
            ceph-dencoder type inode_backtrace_t import out.bin decode dump_json

            { "ino": 1099511627778,
              ... }

        :param pool: name of pool to read backtrace from.  If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)

    def read_layout(self, ino_no, pool=None):
        """
        Read 'layout' xattr of an inode and parse the result, returning a dict like:

            {
                "stripe_unit": 4194304,
                ...
                "object_size": 4194304,
                ...
            }

        :param pool: name of pool to read backtrace from.  If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)

    def _enumerate_data_objects(self, ino, size):
        """
        Get the list of expected data objects for a range, and the list of objects
        that really exist.

        :return a tuple of two lists of strings (expected, actual)
        """
        stripe_size = 1024 * 1024 * 4

        size = max(stripe_size, size)

        want_objects = [
            "{0:x}.{1:08x}".format(ino, n)
            for n in range(0, ((size - 1) // stripe_size) + 1)
        ]

        exist_objects = self.rados(["ls"], pool=self.get_data_pool_name()).split("\n")

        return want_objects, exist_objects
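
    # Worked example (illustrative): with the 4 MiB stripe size above, an inode
    # 0x10000000001 of size 9 MiB is expected to map to three data objects:
    #   10000000001.00000000, 10000000001.00000001, 10000000001.00000002
    # i.e. range(0, ((size - 1) // stripe_size) + 1) == range(0, 3).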

    def data_objects_present(self, ino, size):
        """
        Check that *all* the expected data objects for an inode are present in the data pool
        """

        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        missing = set(want_objects) - set(exist_objects)

        if missing:
            log.debug("Objects missing (ino {0}, size {1}): {2}".format(
                ino, size, missing
            ))
            return False
        else:
            log.debug("All objects for ino {0} size {1} found".format(ino, size))
            return True

    def data_objects_absent(self, ino, size):
        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        present = set(want_objects) & set(exist_objects)

        if present:
            log.debug("Objects not absent (ino {0}, size {1}): {2}".format(
                ino, size, present
            ))
            return False
        else:
            log.debug("All objects for ino {0} size {1} are absent".format(ino, size))
            return True

    def dirfrag_exists(self, ino, frag):
        try:
            self.rados(["stat", "{0:x}.{1:08x}".format(ino, frag)])
        except CommandFailedError:
            return False
        else:
            return True

    def rados(self, args, pool=None, namespace=None, stdin_data=None,
              stdin_file=None, stdout_data=None):
        """
        Call into the `rados` CLI from an MDS
        """

        if pool is None:
            pool = self.get_metadata_pool_name()

        # Doesn't matter which MDS we use to run rados commands, they all
        # have access to the pools
        mds_id = self.mds_ids[0]
        remote = self.mds_daemons[mds_id].remote

        # NB we could alternatively use librados pybindings for this, but it's a one-liner
        # using the `rados` CLI
        args = ([os.path.join(self._prefix, "rados"), "-p", pool] +
                (["--namespace", namespace] if namespace else []) +
                args)

        if stdin_file is not None:
            args = ["bash", "-c", "cat " + stdin_file + " | " + " ".join(args)]
        if stdout_data is None:
            stdout_data = StringIO()

        p = remote.run(args=args,
                       stdin=stdin_data,
                       stdout=stdout_data)
        return p.stdout.getvalue().strip()

    def list_dirfrag(self, dir_ino):
        """
        Read the named object and return the list of omap keys

        :return a list of 0 or more strings
        """

        dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)

        try:
            key_list_str = self.rados(["listomapkeys", dirfrag_obj_name])
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(dirfrag_obj_name)

        return key_list_str.split("\n") if key_list_str else []

    def erase_metadata_objects(self, prefix):
        """
        For all objects in the metadata pool matching the prefix,
        erase them.

        This is O(N) with the number of objects in the pool, so only suitable
        for use on toy test filesystems.
        """
        all_objects = self.rados(["ls"]).split("\n")
        matching_objects = [o for o in all_objects if o.startswith(prefix)]
        for o in matching_objects:
            self.rados(["rm", o])

    def erase_mds_objects(self, rank):
        """
        Erase all the per-MDS objects for a particular rank.  This includes
        inotable, sessiontable, journal
        """

        def obj_prefix(multiplier):
            """
            MDS object naming conventions like rank 1's
            journal is at 201.***
            """
            return "%x." % (multiplier * 0x100 + rank)

        # MDS_INO_LOG_OFFSET
        self.erase_metadata_objects(obj_prefix(2))
        # MDS_INO_LOG_BACKUP_OFFSET
        self.erase_metadata_objects(obj_prefix(3))
        # MDS_INO_LOG_POINTER_OFFSET
        self.erase_metadata_objects(obj_prefix(4))
        # MDSTables & SessionMap
        self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))
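
    # Worked example (illustrative): for rank 1, obj_prefix(2) == "201.", so the
    # journal objects 201.00000000, 201.00000001, ... are erased, along with the
    # 301. (journal backup), 401. (journal pointer) and "mds1_" table objects.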

    @property
    def _prefix(self):
        """
        Override this to set a different
        """
        return ""

    def _make_rank(self, rank):
        return "{}:{}".format(self.name, rank)

    def _run_tool(self, tool, args, rank=None, quiet=False):
        # Tests frequently have [client] configuration that jacks up
        # the objecter log level (unlikely to be interesting here)
        # and does not set the mds log level (very interesting here)
        if quiet:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
        else:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']

        if rank is not None:
            base_args.extend(["--rank", "%s" % str(rank)])

        t1 = datetime.datetime.now()
        r = self.tool_remote.sh(script=base_args + args, stdout=StringIO()).strip()
        duration = datetime.datetime.now() - t1
        log.debug("Ran {0} in time {1}, result:\n{2}".format(
            base_args + args, duration, r
        ))
        return r

    @property
    def tool_remote(self):
        """
        An arbitrary remote to use when invoking recovery tools.  Use an MDS host because
        it'll definitely have keys with perms to access cephfs metadata pool.  This is public
        so that tests can use this remote to go get locally written output files from the tools.
        """
        mds_id = self.mds_ids[0]
        return self.mds_daemons[mds_id].remote

    def journal_tool(self, args, rank, quiet=False):
        """
        Invoke cephfs-journal-tool with the passed arguments for a rank, and return its stdout
        """
        fs_rank = self._make_rank(rank)
        return self._run_tool("cephfs-journal-tool", args, fs_rank, quiet)

    def table_tool(self, args, quiet=False):
        """
        Invoke cephfs-table-tool with the passed arguments, and return its stdout
        """
        return self._run_tool("cephfs-table-tool", args, None, quiet)

    def data_scan(self, args, quiet=False, worker_count=1):
        """
        Invoke cephfs-data-scan with the passed arguments, and return its stdout

        :param worker_count: if greater than 1, multiple workers will be run
                             in parallel and the return value will be None
        """

        workers = []

        for n in range(0, worker_count):
            if worker_count > 1:
                # data-scan args first token is a command, followed by args to it.
                # insert worker arguments after the command.
                cmd = args[0]
                worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
            else:
                worker_args = args

            workers.append(Greenlet.spawn(lambda wargs=worker_args:
                           self._run_tool("cephfs-data-scan", wargs, None, quiet)))

        for w in workers:
            w.get()

        if worker_count == 1:
            return workers[0].value
        else:
            return None

    def is_full(self):
        return self.is_pool_full(self.get_data_pool_name())