]>
git.proxmox.com Git - ceph.git/blob - ceph/qa/tasks/cephfs/filesystem.py
2 from StringIO
import StringIO
5 from gevent
import Greenlet
14 from teuthology
.exceptions
import CommandFailedError
15 from teuthology
import misc
16 from teuthology
.nuke
import clear_firewall
17 from teuthology
.parallel
import parallel
18 from tasks
.ceph_manager
import write_conf
19 from tasks
import ceph_manager
22 log
= logging
.getLogger(__name__
)
25 DAEMON_WAIT_TIMEOUT
= 120
28 class FileLayout(object):
29 def __init__(self
, pool
=None, pool_namespace
=None, stripe_unit
=None, stripe_count
=None, object_size
=None):
31 self
.pool_namespace
= pool_namespace
32 self
.stripe_unit
= stripe_unit
33 self
.stripe_count
= stripe_count
34 self
.object_size
= object_size
37 def load_from_ceph(layout_str
):
42 if self
.pool
is not None:
43 yield ("pool", self
.pool
)
44 if self
.pool_namespace
:
45 yield ("pool_namespace", self
.pool_namespace
)
46 if self
.stripe_unit
is not None:
47 yield ("stripe_unit", self
.stripe_unit
)
48 if self
.stripe_count
is not None:
49 yield ("stripe_count", self
.stripe_count
)
50 if self
.object_size
is not None:
51 yield ("object_size", self
.stripe_size
)
53 class ObjectNotFound(Exception):
54 def __init__(self
, object_name
):
55 self
._object
_name
= object_name
58 return "Object not found: '{0}'".format(self
._object
_name
)
60 class FSStatus(object):
62 Operations on a snapshot of the FSMap.
64 def __init__(self
, mon_manager
):
65 self
.mon
= mon_manager
66 self
.map = json
.loads(self
.mon
.raw_cluster_cmd("fs", "dump", "--format=json"))
69 return json
.dumps(self
.map, indent
= 2, sort_keys
= True)
71 # Expose the fsmap for manual inspection.
72 def __getitem__(self
, key
):
74 Get a field from the fsmap.
78 def get_filesystems(self
):
80 Iterator for all filesystems.
82 for fs
in self
.map['filesystems']:
87 Iterator for all the mds_info components in the FSMap.
89 for info
in self
.get_standbys():
91 for fs
in self
.map['filesystems']:
92 for info
in fs
['mdsmap']['info'].values():
95 def get_standbys(self
):
97 Iterator for all standbys.
99 for info
in self
.map['standbys']:
102 def get_fsmap(self
, fscid
):
104 Get the fsmap for the given FSCID.
106 for fs
in self
.map['filesystems']:
107 if fscid
is None or fs
['id'] == fscid
:
109 raise RuntimeError("FSCID {0} not in map".format(fscid
))
111 def get_fsmap_byname(self
, name
):
113 Get the fsmap for the given file system name.
115 for fs
in self
.map['filesystems']:
116 if name
is None or fs
['mdsmap']['fs_name'] == name
:
118 raise RuntimeError("FS {0} not in map".format(name
))
120 def get_replays(self
, fscid
):
122 Get the standby:replay MDS for the given FSCID.
124 fs
= self
.get_fsmap(fscid
)
125 for info
in fs
['mdsmap']['info'].values():
126 if info
['state'] == 'up:standby-replay':
129 def get_ranks(self
, fscid
):
131 Get the ranks for the given FSCID.
133 fs
= self
.get_fsmap(fscid
)
134 for info
in fs
['mdsmap']['info'].values():
135 if info
['rank'] >= 0 and info
['state'] != 'up:standby-replay':
138 def get_rank(self
, fscid
, rank
):
140 Get the rank for the given FSCID.
142 for info
in self
.get_ranks(fscid
):
143 if info
['rank'] == rank
:
145 raise RuntimeError("FSCID {0} has no rank {1}".format(fscid
, rank
))
147 def get_mds(self
, name
):
149 Get the info for the given MDS name.
151 for info
in self
.get_all():
152 if info
['name'] == name
:
156 def get_mds_addr(self
, name
):
158 Return the instance addr as a string, like "10.214.133.138:6807\/10825"
160 info
= self
.get_mds(name
)
164 log
.warn(json
.dumps(list(self
.get_all()), indent
=2)) # dump for debugging
165 raise RuntimeError("MDS id '{0}' not found in map".format(name
))
167 class CephCluster(object):
169 def admin_remote(self
):
170 first_mon
= misc
.get_first_mon(self
._ctx
, None)
171 (result
,) = self
._ctx
.cluster
.only(first_mon
).remotes
.iterkeys()
174 def __init__(self
, ctx
):
176 self
.mon_manager
= ceph_manager
.CephManager(self
.admin_remote
, ctx
=ctx
, logger
=log
.getChild('ceph_manager'))
178 def get_config(self
, key
, service_type
=None):
180 Get config from mon by default, or a specific service if caller asks for it
182 if service_type
is None:
185 service_id
= sorted(misc
.all_roles_of_type(self
._ctx
.cluster
, service_type
))[0]
186 return self
.json_asok(['config', 'get', key
], service_type
, service_id
)[key
]
188 def set_ceph_conf(self
, subsys
, key
, value
):
189 if subsys
not in self
._ctx
.ceph
['ceph'].conf
:
190 self
._ctx
.ceph
['ceph'].conf
[subsys
] = {}
191 self
._ctx
.ceph
['ceph'].conf
[subsys
][key
] = value
192 write_conf(self
._ctx
) # XXX because we don't have the ceph task's config object, if they
193 # used a different config path this won't work.
195 def clear_ceph_conf(self
, subsys
, key
):
196 del self
._ctx
.ceph
['ceph'].conf
[subsys
][key
]
197 write_conf(self
._ctx
)
199 def json_asok(self
, command
, service_type
, service_id
, timeout
=None):
202 proc
= self
.mon_manager
.admin_socket(service_type
, service_id
, command
, timeout
=timeout
)
203 response_data
= proc
.stdout
.getvalue()
204 log
.info("_json_asok output: {0}".format(response_data
))
205 if response_data
.strip():
206 return json
.loads(response_data
)
211 class MDSCluster(CephCluster
):
213 Collective operations on all the MDS daemons in the Ceph cluster. These
214 daemons may be in use by various Filesystems.
216 For the benefit of pre-multi-filesystem tests, this class is also
217 a parent of Filesystem. The correct way to use MDSCluster going forward is
218 as a separate instance outside of your (multiple) Filesystem instances.
220 def __init__(self
, ctx
):
221 super(MDSCluster
, self
).__init
__(ctx
)
223 self
.mds_ids
= list(misc
.all_roles_of_type(ctx
.cluster
, 'mds'))
225 if len(self
.mds_ids
) == 0:
226 raise RuntimeError("This task requires at least one MDS")
228 if hasattr(self
._ctx
, "daemons"):
229 # Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
230 self
.mds_daemons
= dict([(mds_id
, self
._ctx
.daemons
.get_daemon('mds', mds_id
)) for mds_id
in self
.mds_ids
])
232 def _one_or_all(self
, mds_id
, cb
, in_parallel
=True):
234 Call a callback for a single named MDS, or for all.
236 Note that the parallelism here isn't for performance, it's to avoid being overly kind
237 to the cluster by waiting a graceful ssh-latency of time between doing things, and to
238 avoid being overly kind by executing them in a particular order. However, some actions
239 don't cope with being done in parallel, so it's optional (`in_parallel`)
241 :param mds_id: MDS daemon name, or None
242 :param cb: Callback taking single argument of MDS daemon name
243 :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
247 with
parallel() as p
:
248 for mds_id
in self
.mds_ids
:
251 for mds_id
in self
.mds_ids
:
256 def get_config(self
, key
, service_type
=None):
258 get_config specialization of service_type="mds"
260 if service_type
!= "mds":
261 return super(MDSCluster
, self
).get_config(key
, service_type
)
263 # Some tests stop MDS daemons, don't send commands to a dead one:
264 service_id
= random
.sample(filter(lambda i
: self
.mds_daemons
[i
].running(), self
.mds_daemons
), 1)[0]
265 return self
.json_asok(['config', 'get', key
], service_type
, service_id
)[key
]
267 def mds_stop(self
, mds_id
=None):
269 Stop the MDS daemon process(se). If it held a rank, that rank
270 will eventually go laggy.
272 self
._one
_or
_all
(mds_id
, lambda id_
: self
.mds_daemons
[id_
].stop())
274 def mds_fail(self
, mds_id
=None):
276 Inform MDSMonitor of the death of the daemon process(es). If it held
277 a rank, that rank will be relinquished.
279 self
._one
_or
_all
(mds_id
, lambda id_
: self
.mon_manager
.raw_cluster_cmd("mds", "fail", id_
))
281 def mds_restart(self
, mds_id
=None):
282 self
._one
_or
_all
(mds_id
, lambda id_
: self
.mds_daemons
[id_
].restart())
284 def mds_fail_restart(self
, mds_id
=None):
286 Variation on restart that includes marking MDSs as failed, so that doing this
287 operation followed by waiting for healthy daemon states guarantees that they
288 have gone down and come up, rather than potentially seeing the healthy states
289 that existed before the restart.
291 def _fail_restart(id_
):
292 self
.mds_daemons
[id_
].stop()
293 self
.mon_manager
.raw_cluster_cmd("mds", "fail", id_
)
294 self
.mds_daemons
[id_
].restart()
296 self
._one
_or
_all
(mds_id
, _fail_restart
)
298 def mds_signal(self
, mds_id
, sig
, silent
=False):
302 self
.mds_daemons
[mds_id
].signal(sig
, silent
);
304 def newfs(self
, name
='cephfs', create
=True):
305 return Filesystem(self
._ctx
, name
=name
, create
=create
)
308 return FSStatus(self
.mon_manager
)
310 def delete_all_filesystems(self
):
312 Remove all filesystems that exist, and any pools in use by them.
314 pools
= json
.loads(self
.mon_manager
.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
317 pool_id_name
[pool
['pool']] = pool
['pool_name']
319 # mark cluster down for each fs to prevent churn during deletion
320 status
= self
.status()
321 for fs
in status
.get_filesystems():
322 self
.mon_manager
.raw_cluster_cmd("fs", "fail", str(fs
['mdsmap']['fs_name']))
324 # get a new copy as actives may have since changed
325 status
= self
.status()
326 for fs
in status
.get_filesystems():
327 mdsmap
= fs
['mdsmap']
328 metadata_pool
= pool_id_name
[mdsmap
['metadata_pool']]
330 self
.mon_manager
.raw_cluster_cmd('fs', 'rm', mdsmap
['fs_name'], '--yes-i-really-mean-it')
331 self
.mon_manager
.raw_cluster_cmd('osd', 'pool', 'delete',
332 metadata_pool
, metadata_pool
,
333 '--yes-i-really-really-mean-it')
334 for data_pool
in mdsmap
['data_pools']:
335 data_pool
= pool_id_name
[data_pool
]
337 self
.mon_manager
.raw_cluster_cmd('osd', 'pool', 'delete',
338 data_pool
, data_pool
,
339 '--yes-i-really-really-mean-it')
340 except CommandFailedError
as e
:
341 if e
.exitstatus
== 16: # EBUSY, this data pool is used
342 pass # by two metadata pools, let the 2nd
343 else: # pass delete it
346 def get_standby_daemons(self
):
347 return set([s
['name'] for s
in self
.status().get_standbys()])
349 def get_mds_hostnames(self
):
351 for mds_id
in self
.mds_ids
:
352 mds_remote
= self
.mon_manager
.find_remote('mds', mds_id
)
353 result
.add(mds_remote
.hostname
)
357 def set_clients_block(self
, blocked
, mds_id
=None):
359 Block (using iptables) client communications to this MDS. Be careful: if
360 other services are running on this MDS, or other MDSs try to talk to this
361 MDS, their communications may also be blocked as collatoral damage.
363 :param mds_id: Optional ID of MDS to block, default to all
366 da_flag
= "-A" if blocked
else "-D"
368 def set_block(_mds_id
):
369 remote
= self
.mon_manager
.find_remote('mds', _mds_id
)
370 status
= self
.status()
372 addr
= status
.get_mds_addr(_mds_id
)
373 ip_str
, port_str
, inst_str
= re
.match("(.+):(.+)/(.+)", addr
).groups()
376 args
=["sudo", "iptables", da_flag
, "OUTPUT", "-p", "tcp", "--sport", port_str
, "-j", "REJECT", "-m",
377 "comment", "--comment", "teuthology"])
379 args
=["sudo", "iptables", da_flag
, "INPUT", "-p", "tcp", "--dport", port_str
, "-j", "REJECT", "-m",
380 "comment", "--comment", "teuthology"])
382 self
._one
_or
_all
(mds_id
, set_block
, in_parallel
=False)
384 def clear_firewall(self
):
385 clear_firewall(self
._ctx
)
387 def get_mds_info(self
, mds_id
):
388 return FSStatus(self
.mon_manager
).get_mds(mds_id
)
390 def is_pool_full(self
, pool_name
):
391 pools
= json
.loads(self
.mon_manager
.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
393 if pool
['pool_name'] == pool_name
:
394 return 'full' in pool
['flags_names'].split(",")
396 raise RuntimeError("Pool not found '{0}'".format(pool_name
))
398 class Filesystem(MDSCluster
):
400 This object is for driving a CephFS filesystem. The MDS daemons driven by
401 MDSCluster may be shared with other Filesystems.
403 def __init__(self
, ctx
, fscid
=None, name
=None, create
=False,
405 super(Filesystem
, self
).__init
__(ctx
)
408 self
.ec_profile
= ec_profile
410 self
.metadata_pool_name
= None
411 self
.metadata_overlay
= False
412 self
.data_pool_name
= None
413 self
.data_pools
= None
415 client_list
= list(misc
.all_roles_of_type(self
._ctx
.cluster
, 'client'))
416 self
.client_id
= client_list
[0]
417 self
.client_remote
= list(misc
.get_clients(ctx
=ctx
, roles
=["client.{0}".format(self
.client_id
)]))[0][1]
420 if fscid
is not None:
421 raise RuntimeError("cannot specify fscid when creating fs")
422 if create
and not self
.legacy_configured():
425 if fscid
is not None:
427 self
.getinfo(refresh
= True)
429 # Stash a reference to the first created filesystem on ctx, so
430 # that if someone drops to the interactive shell they can easily
432 if not hasattr(self
._ctx
, "filesystem"):
433 self
._ctx
.filesystem
= self
435 def getinfo(self
, refresh
= False):
436 status
= self
.status()
437 if self
.id is not None:
438 fsmap
= status
.get_fsmap(self
.id)
439 elif self
.name
is not None:
440 fsmap
= status
.get_fsmap_byname(self
.name
)
442 fss
= [fs
for fs
in status
.get_filesystems()]
446 raise RuntimeError("no file system available")
448 raise RuntimeError("more than one file system available")
449 self
.id = fsmap
['id']
450 self
.name
= fsmap
['mdsmap']['fs_name']
451 self
.get_pool_names(status
= status
, refresh
= refresh
)
454 def set_metadata_overlay(self
, overlay
):
455 if self
.id is not None:
456 raise RuntimeError("cannot specify fscid when configuring overlay")
457 self
.metadata_overlay
= overlay
459 def deactivate(self
, rank
):
461 raise RuntimeError("invalid rank")
463 raise RuntimeError("cannot deactivate rank 0")
464 self
.mon_manager
.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self
.id, rank
))
466 def reach_max_mds(self
):
467 # Try to reach rank count == max_mds, up or down (UPGRADE SENSITIVE!)
468 status
= self
.getinfo()
469 mds_map
= self
.get_mds_map(status
=status
)
470 max_mds
= mds_map
['max_mds']
472 count
= len(list(self
.get_ranks(status
=status
)))
475 # deactivate mds in decending order
476 status
= self
.wait_for_daemons(status
=status
, skip_max_mds_check
=True)
477 while count
> max_mds
:
478 targets
= sorted(self
.get_ranks(status
=status
), key
=lambda r
: r
['rank'], reverse
=True)
480 log
.info("deactivating rank %d" % target
['rank'])
481 self
.deactivate(target
['rank'])
482 status
= self
.wait_for_daemons(skip_max_mds_check
=True)
483 count
= len(list(self
.get_ranks(status
=status
)))
485 # In Mimic, deactivation is done automatically:
486 log
.info("Error:\n{}".format(traceback
.format_exc()))
487 status
= self
.wait_for_daemons()
489 status
= self
.wait_for_daemons()
491 mds_map
= self
.get_mds_map(status
=status
)
492 assert(mds_map
['max_mds'] == max_mds
)
493 assert(mds_map
['in'] == list(range(0, max_mds
)))
496 self
.mon_manager
.raw_cluster_cmd("fs", "fail", str(self
.name
))
498 def set_var(self
, var
, *args
):
500 self
.mon_manager
.raw_cluster_cmd("fs", "set", self
.name
, var
, *a
)
502 def set_down(self
, down
=True):
503 self
.set_var("down", str(down
).lower())
505 def set_joinable(self
, joinable
=True):
506 self
.set_var("joinable", str(joinable
).lower())
508 def set_max_mds(self
, max_mds
):
509 self
.set_var("max_mds", "%d" % max_mds
)
511 def set_allow_standby_replay(self
, yes
):
512 self
.set_var("allow_standby_replay", str(yes
).lower())
514 def set_allow_new_snaps(self
, yes
):
515 self
.set_var("allow_new_snaps", str(yes
).lower(), '--yes-i-really-mean-it')
517 def get_pgs_per_fs_pool(self
):
519 Calculate how many PGs to use when creating a pool, in order to avoid raising any
520 health warnings about mon_pg_warn_min_per_osd
522 :return: an integer number of PGs
524 pg_warn_min_per_osd
= int(self
.get_config('mon_pg_warn_min_per_osd'))
525 osd_count
= len(list(misc
.all_roles_of_type(self
._ctx
.cluster
, 'osd')))
526 return pg_warn_min_per_osd
* osd_count
529 if self
.name
is None:
531 if self
.metadata_pool_name
is None:
532 self
.metadata_pool_name
= "{0}_metadata".format(self
.name
)
533 if self
.data_pool_name
is None:
534 data_pool_name
= "{0}_data".format(self
.name
)
536 data_pool_name
= self
.data_pool_name
538 log
.info("Creating filesystem '{0}'".format(self
.name
))
540 pgs_per_fs_pool
= self
.get_pgs_per_fs_pool()
542 self
.mon_manager
.raw_cluster_cmd('osd', 'pool', 'create',
543 self
.metadata_pool_name
, pgs_per_fs_pool
.__str
__())
544 if self
.metadata_overlay
:
545 self
.mon_manager
.raw_cluster_cmd('fs', 'new',
546 self
.name
, self
.metadata_pool_name
, data_pool_name
,
547 '--allow-dangerous-metadata-overlay')
549 if self
.ec_profile
and 'disabled' not in self
.ec_profile
:
550 log
.info("EC profile is %s", self
.ec_profile
)
551 cmd
= ['osd', 'erasure-code-profile', 'set', data_pool_name
]
552 cmd
.extend(self
.ec_profile
)
553 self
.mon_manager
.raw_cluster_cmd(*cmd
)
554 self
.mon_manager
.raw_cluster_cmd(
555 'osd', 'pool', 'create',
556 data_pool_name
, pgs_per_fs_pool
.__str
__(), 'erasure',
558 self
.mon_manager
.raw_cluster_cmd(
559 'osd', 'pool', 'set',
560 data_pool_name
, 'allow_ec_overwrites', 'true')
562 self
.mon_manager
.raw_cluster_cmd(
563 'osd', 'pool', 'create',
564 data_pool_name
, pgs_per_fs_pool
.__str
__())
565 self
.mon_manager
.raw_cluster_cmd('fs', 'new',
567 self
.metadata_pool_name
,
570 self
.check_pool_application(self
.metadata_pool_name
)
571 self
.check_pool_application(data_pool_name
)
572 # Turn off spurious standby count warnings from modifying max_mds in tests.
574 self
.mon_manager
.raw_cluster_cmd('fs', 'set', self
.name
, 'standby_count_wanted', '0')
575 except CommandFailedError
as e
:
576 if e
.exitstatus
== 22:
577 # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
582 self
.getinfo(refresh
= True)
585 def check_pool_application(self
, pool_name
):
586 osd_map
= self
.mon_manager
.get_osd_dump_json()
587 for pool
in osd_map
['pools']:
588 if pool
['pool_name'] == pool_name
:
589 if "application_metadata" in pool
:
590 if not "cephfs" in pool
['application_metadata']:
591 raise RuntimeError("Pool %p does not name cephfs as application!".\
596 if getattr(self
._ctx
, "filesystem", None) == self
:
597 delattr(self
._ctx
, "filesystem")
601 Whether a filesystem exists in the mon's filesystem list
603 fs_list
= json
.loads(self
.mon_manager
.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
604 return self
.name
in [fs
['name'] for fs
in fs_list
]
606 def legacy_configured(self
):
608 Check if a legacy (i.e. pre "fs new") filesystem configuration is present. If this is
609 the case, the caller should avoid using Filesystem.create
612 out_text
= self
.mon_manager
.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
613 pools
= json
.loads(out_text
)
614 metadata_pool_exists
= 'metadata' in [p
['poolname'] for p
in pools
]
615 if metadata_pool_exists
:
616 self
.metadata_pool_name
= 'metadata'
617 except CommandFailedError
as e
:
618 # For use in upgrade tests, Ceph cuttlefish and earlier don't support
619 # structured output (--format) from the CLI.
620 if e
.exitstatus
== 22:
621 metadata_pool_exists
= True
625 return metadata_pool_exists
628 return json
.loads(self
.mon_manager
.raw_cluster_cmd("df", "--format=json-pretty"))
630 def get_mds_map(self
, status
=None):
632 status
= self
.status()
633 return status
.get_fsmap(self
.id)['mdsmap']
635 def get_var(self
, var
, status
=None):
636 return self
.get_mds_map(status
=status
)[var
]
638 def set_dir_layout(self
, mount
, path
, layout
):
639 for name
, value
in layout
.items():
640 mount
.run_shell(args
=["setfattr", "-n", "ceph.dir.layout."+name
, "-v", str(value
), path
])
642 def add_data_pool(self
, name
, create
=True):
644 self
.mon_manager
.raw_cluster_cmd('osd', 'pool', 'create', name
, self
.get_pgs_per_fs_pool().__str
__())
645 self
.mon_manager
.raw_cluster_cmd('fs', 'add_data_pool', self
.name
, name
)
646 self
.get_pool_names(refresh
= True)
647 for poolid
, fs_name
in self
.data_pools
.items():
650 raise RuntimeError("could not get just created pool '{0}'".format(name
))
652 def get_pool_names(self
, refresh
= False, status
= None):
653 if refresh
or self
.metadata_pool_name
is None or self
.data_pools
is None:
655 status
= self
.status()
656 fsmap
= status
.get_fsmap(self
.id)
658 osd_map
= self
.mon_manager
.get_osd_dump_json()
660 for p
in osd_map
['pools']:
661 id_to_name
[p
['pool']] = p
['pool_name']
663 self
.metadata_pool_name
= id_to_name
[fsmap
['mdsmap']['metadata_pool']]
665 for data_pool
in fsmap
['mdsmap']['data_pools']:
666 self
.data_pools
[data_pool
] = id_to_name
[data_pool
]
668 def get_data_pool_name(self
, refresh
= False):
669 if refresh
or self
.data_pools
is None:
670 self
.get_pool_names(refresh
= True)
671 assert(len(self
.data_pools
) == 1)
672 return self
.data_pools
.values()[0]
674 def get_data_pool_id(self
, refresh
= False):
676 Don't call this if you have multiple data pools
679 if refresh
or self
.data_pools
is None:
680 self
.get_pool_names(refresh
= True)
681 assert(len(self
.data_pools
) == 1)
682 return self
.data_pools
.keys()[0]
684 def get_data_pool_names(self
, refresh
= False):
685 if refresh
or self
.data_pools
is None:
686 self
.get_pool_names(refresh
= True)
687 return self
.data_pools
.values()
689 def get_metadata_pool_name(self
):
690 return self
.metadata_pool_name
692 def set_data_pool_name(self
, name
):
693 if self
.id is not None:
694 raise RuntimeError("can't set filesystem name if its fscid is set")
695 self
.data_pool_name
= name
697 def get_namespace_id(self
):
700 def get_pool_df(self
, pool_name
):
703 {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
705 for pool_df
in self
._df
()['pools']:
706 if pool_df
['name'] == pool_name
:
707 return pool_df
['stats']
709 raise RuntimeError("Pool name '{0}' not found".format(pool_name
))
712 return self
._df
()['stats']['total_used_bytes']
714 def are_daemons_healthy(self
, status
=None, skip_max_mds_check
=False):
716 Return true if all daemons are in one of active, standby, standby-replay, and
717 at least max_mds daemons are in 'active'.
719 Unlike most of Filesystem, this function is tolerant of new-style `fs`
720 commands being missing, because we are part of the ceph installation
721 process during upgrade suites, so must fall back to old style commands
722 when we get an EINVAL on a new style command.
726 # First, check to see that processes haven't exited with an error code
727 for mds
in self
._ctx
.daemons
.iter_daemons_of_role('mds'):
732 mds_map
= self
.get_mds_map(status
=status
)
733 except CommandFailedError
as cfe
:
734 # Old version, fall back to non-multi-fs commands
735 if cfe
.exitstatus
== errno
.EINVAL
:
736 mds_map
= json
.loads(
737 self
.mon_manager
.raw_cluster_cmd('mds', 'dump', '--format=json'))
741 log
.info("are_daemons_healthy: mds map: {0}".format(mds_map
))
743 for mds_id
, mds_status
in mds_map
['info'].items():
744 if mds_status
['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
745 log
.warning("Unhealthy mds state {0}:{1}".format(mds_id
, mds_status
['state']))
747 elif mds_status
['state'] == 'up:active':
750 log
.info("are_daemons_healthy: {0}/{1}".format(
751 active_count
, mds_map
['max_mds']
754 if not skip_max_mds_check
:
755 if active_count
> mds_map
['max_mds']:
756 log
.info("are_daemons_healthy: number of actives is greater than max_mds: {0}".format(mds_map
))
758 elif active_count
== mds_map
['max_mds']:
759 # The MDSMap says these guys are active, but let's check they really are
760 for mds_id
, mds_status
in mds_map
['info'].items():
761 if mds_status
['state'] == 'up:active':
763 daemon_status
= self
.mds_asok(["status"], mds_id
=mds_status
['name'])
764 except CommandFailedError
as cfe
:
765 if cfe
.exitstatus
== errno
.EINVAL
:
766 # Old version, can't do this check
769 # MDS not even running
772 if daemon_status
['state'] != 'up:active':
773 # MDS hasn't taken the latest map yet
780 log
.info("are_daemons_healthy: skipping max_mds check")
783 def get_daemon_names(self
, state
=None, status
=None):
785 Return MDS daemon names of those daemons in the given state
789 mdsmap
= self
.get_mds_map(status
)
791 for mds_status
in sorted(mdsmap
['info'].values(), lambda a
, b
: cmp(a
['rank'], b
['rank'])):
792 if mds_status
['state'] == state
or state
is None:
793 result
.append(mds_status
['name'])
797 def get_active_names(self
):
799 Return MDS daemon names of those daemons holding ranks
802 :return: list of strings like ['a', 'b'], sorted by rank
804 return self
.get_daemon_names("up:active")
806 def get_all_mds_rank(self
, status
=None):
807 mdsmap
= self
.get_mds_map(status
)
809 for mds_status
in sorted(mdsmap
['info'].values(), lambda a
, b
: cmp(a
['rank'], b
['rank'])):
810 if mds_status
['rank'] != -1 and mds_status
['state'] != 'up:standby-replay':
811 result
.append(mds_status
['rank'])
815 def get_rank(self
, rank
=0, status
=None):
817 status
= self
.getinfo()
818 return status
.get_rank(self
.id, rank
)
820 def rank_restart(self
, rank
=0, status
=None):
821 name
= self
.get_rank(rank
=rank
, status
=status
)['name']
822 self
.mds_restart(mds_id
=name
)
824 def rank_signal(self
, signal
, rank
=0, status
=None):
825 name
= self
.get_rank(rank
=rank
, status
=status
)['name']
826 self
.mds_signal(name
, signal
)
828 def rank_freeze(self
, yes
, rank
=0):
829 self
.mon_manager
.raw_cluster_cmd("mds", "freeze", "{}:{}".format(self
.id, rank
), str(yes
).lower())
831 def rank_fail(self
, rank
=0):
832 self
.mon_manager
.raw_cluster_cmd("mds", "fail", "{}:{}".format(self
.id, rank
))
834 def get_ranks(self
, status
=None):
836 status
= self
.getinfo()
837 return status
.get_ranks(self
.id)
839 def get_replays(self
, status
=None):
841 status
= self
.getinfo()
842 return status
.get_replays(self
.id)
844 def get_replay(self
, rank
=0, status
=None):
845 for replay
in self
.get_replays(status
=status
):
846 if replay
['rank'] == rank
:
850 def get_rank_names(self
, status
=None):
852 Return MDS daemon names of those daemons holding a rank,
853 sorted by rank. This includes e.g. up:replay/reconnect
854 as well as active, but does not include standby or
857 mdsmap
= self
.get_mds_map(status
)
859 for mds_status
in sorted(mdsmap
['info'].values(), lambda a
, b
: cmp(a
['rank'], b
['rank'])):
860 if mds_status
['rank'] != -1 and mds_status
['state'] != 'up:standby-replay':
861 result
.append(mds_status
['name'])
865 def wait_for_daemons(self
, timeout
=None, skip_max_mds_check
=False, status
=None):
867 Wait until all daemons are healthy
872 timeout
= DAEMON_WAIT_TIMEOUT
875 status
= self
.status()
879 if self
.are_daemons_healthy(status
=status
, skip_max_mds_check
=skip_max_mds_check
):
885 if elapsed
> timeout
:
886 log
.info("status = {0}".format(status
))
887 raise RuntimeError("Timed out waiting for MDS daemons to become healthy")
889 status
= self
.status()
891 def get_lone_mds_id(self
):
893 Get a single MDS ID: the only one if there is only one
894 configured, else the only one currently holding a rank,
897 if len(self
.mds_ids
) != 1:
898 alive
= self
.get_rank_names()
902 raise ValueError("Explicit MDS argument required when multiple MDSs in use")
904 return self
.mds_ids
[0]
907 log
.info("Creating new filesystem")
908 self
.delete_all_filesystems()
912 def put_metadata_object_raw(self
, object_id
, infile
):
914 Save an object to the metadata pool
916 temp_bin_path
= infile
917 self
.client_remote
.run(args
=[
918 'sudo', os
.path
.join(self
._prefix
, 'rados'), '-p', self
.metadata_pool_name
, 'put', object_id
, temp_bin_path
921 def get_metadata_object_raw(self
, object_id
):
923 Retrieve an object from the metadata pool and store it in a file.
925 temp_bin_path
= '/tmp/' + object_id
+ '.bin'
927 self
.client_remote
.run(args
=[
928 'sudo', os
.path
.join(self
._prefix
, 'rados'), '-p', self
.metadata_pool_name
, 'get', object_id
, temp_bin_path
933 def get_metadata_object(self
, object_type
, object_id
):
935 Retrieve an object from the metadata pool, pass it through
936 ceph-dencoder to dump it to JSON, and return the decoded object.
938 temp_bin_path
= '/tmp/out.bin'
940 self
.client_remote
.run(args
=[
941 'sudo', os
.path
.join(self
._prefix
, 'rados'), '-p', self
.metadata_pool_name
, 'get', object_id
, temp_bin_path
945 self
.client_remote
.run(args
=[
946 'sudo', os
.path
.join(self
._prefix
, 'ceph-dencoder'), 'type', object_type
, 'import', temp_bin_path
, 'decode', 'dump_json'
948 dump_json
= stdout
.getvalue().strip()
950 dump
= json
.loads(dump_json
)
951 except (TypeError, ValueError):
952 log
.error("Failed to decode JSON: '{0}'".format(dump_json
))
957 def get_journal_version(self
):
959 Read the JournalPointer and Journal::Header objects to learn the version of
962 journal_pointer_object
= '400.00000000'
963 journal_pointer_dump
= self
.get_metadata_object("JournalPointer", journal_pointer_object
)
964 journal_ino
= journal_pointer_dump
['journal_pointer']['front']
966 journal_header_object
= "{0:x}.00000000".format(journal_ino
)
967 journal_header_dump
= self
.get_metadata_object('Journaler::Header', journal_header_object
)
969 version
= journal_header_dump
['journal_header']['stream_format']
970 log
.info("Read journal version {0}".format(version
))
974 def mds_asok(self
, command
, mds_id
=None, timeout
=None):
976 mds_id
= self
.get_lone_mds_id()
978 return self
.json_asok(command
, 'mds', mds_id
, timeout
=timeout
)
980 def rank_asok(self
, command
, rank
=0, status
=None, timeout
=None):
981 info
= self
.get_rank(rank
=rank
, status
=status
)
982 return self
.json_asok(command
, 'mds', info
['name'], timeout
=timeout
)
984 def rank_tell(self
, command
, rank
=0, status
=None):
985 info
= self
.get_rank(rank
=rank
, status
=status
)
986 return json
.loads(self
.mon_manager
.raw_cluster_cmd("tell", 'mds.{0}'.format(info
['name']), *command
))
988 def read_cache(self
, path
, depth
=None):
989 cmd
= ["dump", "tree", path
]
990 if depth
is not None:
991 cmd
.append(depth
.__str
__())
992 result
= self
.mds_asok(cmd
)
994 raise RuntimeError("Path not found in cache: {0}".format(path
))
998 def wait_for_state(self
, goal_state
, reject
=None, timeout
=None, mds_id
=None, rank
=None):
1000 Block until the MDS reaches a particular state, or a failure condition
1003 When there are multiple MDSs, succeed when exaclty one MDS is in the
1004 goal state, or fail when any MDS is in the reject state.
1006 :param goal_state: Return once the MDS is in this state
1007 :param reject: Fail if the MDS enters this state before the goal state
1008 :param timeout: Fail if this many seconds pass before reaching goal
1009 :return: number of seconds waited, rounded down to integer
1012 started_at
= time
.time()
1014 status
= self
.status()
1015 if rank
is not None:
1017 mds_info
= status
.get_rank(self
.id, rank
)
1018 current_state
= mds_info
['state'] if mds_info
else None
1019 log
.info("Looked up MDS state for mds.{0}: {1}".format(rank
, current_state
))
1021 mdsmap
= self
.get_mds_map(status
=status
)
1022 if rank
in mdsmap
['failed']:
1023 log
.info("Waiting for rank {0} to come back.".format(rank
))
1024 current_state
= None
1027 elif mds_id
is not None:
1028 # mds_info is None if no daemon with this ID exists in the map
1029 mds_info
= status
.get_mds(mds_id
)
1030 current_state
= mds_info
['state'] if mds_info
else None
1031 log
.info("Looked up MDS state for {0}: {1}".format(mds_id
, current_state
))
1033 # In general, look for a single MDS
1034 states
= [m
['state'] for m
in status
.get_ranks(self
.id)]
1035 if [s
for s
in states
if s
== goal_state
] == [goal_state
]:
1036 current_state
= goal_state
1037 elif reject
in states
:
1038 current_state
= reject
1040 current_state
= None
1041 log
.info("mapped states {0} to {1}".format(states
, current_state
))
1043 elapsed
= time
.time() - started_at
1044 if current_state
== goal_state
:
1045 log
.info("reached state '{0}' in {1}s".format(current_state
, elapsed
))
1047 elif reject
is not None and current_state
== reject
:
1048 raise RuntimeError("MDS in reject state {0}".format(current_state
))
1049 elif timeout
is not None and elapsed
> timeout
:
1050 log
.error("MDS status at timeout: {0}".format(status
.get_fsmap(self
.id)))
1052 "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
1053 elapsed
, goal_state
, current_state
1058 def _read_data_xattr(self
, ino_no
, xattr_name
, type, pool
):
1059 mds_id
= self
.mds_ids
[0]
1060 remote
= self
.mds_daemons
[mds_id
].remote
1062 pool
= self
.get_data_pool_name()
1064 obj_name
= "{0:x}.00000000".format(ino_no
)
1067 os
.path
.join(self
._prefix
, "rados"), "-p", pool
, "getxattr", obj_name
, xattr_name
1073 except CommandFailedError
as e
:
1074 log
.error(e
.__str
__())
1075 raise ObjectNotFound(obj_name
)
1077 data
= proc
.stdout
.getvalue()
1080 args
=[os
.path
.join(self
._prefix
, "ceph-dencoder"), "type", type, "import", "-", "decode", "dump_json"],
1085 return json
.loads(p
.stdout
.getvalue().strip())
1087 def _write_data_xattr(self
, ino_no
, xattr_name
, data
, pool
=None):
1089 Write to an xattr of the 0th data object of an inode. Will
1090 succeed whether the object and/or xattr already exist or not.
1092 :param ino_no: integer inode number
1093 :param xattr_name: string name of the xattr
1094 :param data: byte array data to write to the xattr
1095 :param pool: name of data pool or None to use primary data pool
1098 remote
= self
.mds_daemons
[self
.mds_ids
[0]].remote
1100 pool
= self
.get_data_pool_name()
1102 obj_name
= "{0:x}.00000000".format(ino_no
)
1104 os
.path
.join(self
._prefix
, "rados"), "-p", pool
, "setxattr",
1105 obj_name
, xattr_name
, data
1111 def read_backtrace(self
, ino_no
, pool
=None):
1113 Read the backtrace from the data pool, return a dict in the format
1114 given by inode_backtrace_t::dump, which is something like:
1118 rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
1119 ceph-dencoder type inode_backtrace_t import out.bin decode dump_json
1121 { "ino": 1099511627778,
1129 :param pool: name of pool to read backtrace from. If omitted, FS must have only
1130 one data pool and that will be used.
1132 return self
._read
_data
_xattr
(ino_no
, "parent", "inode_backtrace_t", pool
)
1134 def read_layout(self
, ino_no
, pool
=None):
1136 Read 'layout' xattr of an inode and parse the result, returning a dict like:
1139 "stripe_unit": 4194304,
1141 "object_size": 4194304,
1146 :param pool: name of pool to read backtrace from. If omitted, FS must have only
1147 one data pool and that will be used.
1149 return self
._read
_data
_xattr
(ino_no
, "layout", "file_layout_t", pool
)
1151 def _enumerate_data_objects(self
, ino
, size
):
1153 Get the list of expected data objects for a range, and the list of objects
1156 :return a tuple of two lists of strings (expected, actual)
1158 stripe_size
= 1024 * 1024 * 4
1160 size
= max(stripe_size
, size
)
1163 "{0:x}.{1:08x}".format(ino
, n
)
1164 for n
in range(0, ((size
- 1) / stripe_size
) + 1)
1167 exist_objects
= self
.rados(["ls"], pool
=self
.get_data_pool_name()).split("\n")
1169 return want_objects
, exist_objects
1171 def data_objects_present(self
, ino
, size
):
1173 Check that *all* the expected data objects for an inode are present in the data pool
1176 want_objects
, exist_objects
= self
._enumerate
_data
_objects
(ino
, size
)
1177 missing
= set(want_objects
) - set(exist_objects
)
1180 log
.info("Objects missing (ino {0}, size {1}): {2}".format(
1185 log
.info("All objects for ino {0} size {1} found".format(ino
, size
))
1188 def data_objects_absent(self
, ino
, size
):
1189 want_objects
, exist_objects
= self
._enumerate
_data
_objects
(ino
, size
)
1190 present
= set(want_objects
) & set(exist_objects
)
1193 log
.info("Objects not absent (ino {0}, size {1}): {2}".format(
1198 log
.info("All objects for ino {0} size {1} are absent".format(ino
, size
))
1201 def dirfrag_exists(self
, ino
, frag
):
1203 self
.rados(["stat", "{0:x}.{1:08x}".format(ino
, frag
)])
1204 except CommandFailedError
as e
:
1209 def rados(self
, args
, pool
=None, namespace
=None, stdin_data
=None,
1212 Call into the `rados` CLI from an MDS
1216 pool
= self
.get_metadata_pool_name()
1218 # Doesn't matter which MDS we use to run rados commands, they all
1219 # have access to the pools
1220 mds_id
= self
.mds_ids
[0]
1221 remote
= self
.mds_daemons
[mds_id
].remote
1223 # NB we could alternatively use librados pybindings for this, but it's a one-liner
1224 # using the `rados` CLI
1225 args
= ([os
.path
.join(self
._prefix
, "rados"), "-p", pool
] +
1226 (["--namespace", namespace
] if namespace
else []) +
1229 if stdin_file
is not None:
1230 args
= ["bash", "-c", "cat " + stdin_file
+ " | " + " ".join(args
)]
1236 return p
.stdout
.getvalue().strip()
1238 def list_dirfrag(self
, dir_ino
):
1240 Read the named object and return the list of omap keys
1242 :return a list of 0 or more strings
1245 dirfrag_obj_name
= "{0:x}.00000000".format(dir_ino
)
1248 key_list_str
= self
.rados(["listomapkeys", dirfrag_obj_name
])
1249 except CommandFailedError
as e
:
1250 log
.error(e
.__str
__())
1251 raise ObjectNotFound(dirfrag_obj_name
)
1253 return key_list_str
.split("\n") if key_list_str
else []
1255 def erase_metadata_objects(self
, prefix
):
1257 For all objects in the metadata pool matching the prefix,
1260 This O(N) with the number of objects in the pool, so only suitable
1261 for use on toy test filesystems.
1263 all_objects
= self
.rados(["ls"]).split("\n")
1264 matching_objects
= [o
for o
in all_objects
if o
.startswith(prefix
)]
1265 for o
in matching_objects
:
1266 self
.rados(["rm", o
])
1268 def erase_mds_objects(self
, rank
):
1270 Erase all the per-MDS objects for a particular rank. This includes
1271 inotable, sessiontable, journal
1274 def obj_prefix(multiplier
):
1276 MDS object naming conventions like rank 1's
1277 journal is at 201.***
1279 return "%x." % (multiplier
* 0x100 + rank
)
1281 # MDS_INO_LOG_OFFSET
1282 self
.erase_metadata_objects(obj_prefix(2))
1283 # MDS_INO_LOG_BACKUP_OFFSET
1284 self
.erase_metadata_objects(obj_prefix(3))
1285 # MDS_INO_LOG_POINTER_OFFSET
1286 self
.erase_metadata_objects(obj_prefix(4))
1287 # MDSTables & SessionMap
1288 self
.erase_metadata_objects("mds{rank:d}_".format(rank
=rank
))
1293 Override this to set a different
1297 def _make_rank(self
, rank
):
1298 return "{}:{}".format(self
.name
, rank
)
1300 def _run_tool(self
, tool
, args
, rank
=None, quiet
=False):
1301 # Tests frequently have [client] configuration that jacks up
1302 # the objecter log level (unlikely to be interesting here)
1303 # and does not set the mds log level (very interesting here)
1305 base_args
= [os
.path
.join(self
._prefix
, tool
), '--debug-mds=1', '--debug-objecter=1']
1307 base_args
= [os
.path
.join(self
._prefix
, tool
), '--debug-mds=4', '--debug-objecter=1']
1309 if rank
is not None:
1310 base_args
.extend(["--rank", "%s" % str(rank
)])
1312 t1
= datetime
.datetime
.now()
1313 r
= self
.tool_remote
.run(
1314 args
=base_args
+ args
,
1315 stdout
=StringIO()).stdout
.getvalue().strip()
1316 duration
= datetime
.datetime
.now() - t1
1317 log
.info("Ran {0} in time {1}, result:\n{2}".format(
1318 base_args
+ args
, duration
, r
1323 def tool_remote(self
):
1325 An arbitrary remote to use when invoking recovery tools. Use an MDS host because
1326 it'll definitely have keys with perms to access cephfs metadata pool. This is public
1327 so that tests can use this remote to go get locally written output files from the tools.
1329 mds_id
= self
.mds_ids
[0]
1330 return self
.mds_daemons
[mds_id
].remote
1332 def journal_tool(self
, args
, rank
, quiet
=False):
1334 Invoke cephfs-journal-tool with the passed arguments for a rank, and return its stdout
1336 fs_rank
= self
._make
_rank
(rank
)
1337 return self
._run
_tool
("cephfs-journal-tool", args
, fs_rank
, quiet
)
1339 def table_tool(self
, args
, quiet
=False):
1341 Invoke cephfs-table-tool with the passed arguments, and return its stdout
1343 return self
._run
_tool
("cephfs-table-tool", args
, None, quiet
)
1345 def data_scan(self
, args
, quiet
=False, worker_count
=1):
1347 Invoke cephfs-data-scan with the passed arguments, and return its stdout
1349 :param worker_count: if greater than 1, multiple workers will be run
1350 in parallel and the return value will be None
1355 for n
in range(0, worker_count
):
1356 if worker_count
> 1:
1357 # data-scan args first token is a command, followed by args to it.
1358 # insert worker arguments after the command.
1360 worker_args
= [cmd
] + ["--worker_n", n
.__str
__(), "--worker_m", worker_count
.__str
__()] + args
[1:]
1364 workers
.append(Greenlet
.spawn(lambda wargs
=worker_args
:
1365 self
._run
_tool
("cephfs-data-scan", wargs
, None, quiet
)))
1370 if worker_count
== 1:
1371 return workers
[0].value
1376 return self
.is_pool_full(self
.get_data_pool_name())