1
2 from StringIO import StringIO
3 import json
4 import logging
5 from gevent import Greenlet
6 import os
7 import time
8 import datetime
9 import re
10 import errno
11 import random
12
13 from teuthology.exceptions import CommandFailedError
14 from teuthology import misc
15 from teuthology.nuke import clear_firewall
16 from teuthology.parallel import parallel
17 from tasks.ceph_manager import write_conf
18 from tasks import ceph_manager
19
20
21 log = logging.getLogger(__name__)
22
23
24 DAEMON_WAIT_TIMEOUT = 120
25 ROOT_INO = 1
26
27
28 class ObjectNotFound(Exception):
29 def __init__(self, object_name):
30 self._object_name = object_name
31
32 def __str__(self):
33 return "Object not found: '{0}'".format(self._object_name)
34
35 class FSStatus(object):
36 """
37 Operations on a snapshot of the FSMap.
38 """
39 def __init__(self, mon_manager):
40 self.mon = mon_manager
41 self.map = json.loads(self.mon.raw_cluster_cmd("fs", "dump", "--format=json"))
42
43 def __str__(self):
44 return json.dumps(self.map, indent = 2, sort_keys = True)
45
46 # Expose the fsmap for manual inspection.
47 def __getitem__(self, key):
48 """
49 Get a field from the fsmap.
50 """
51 return self.map[key]
52
53 def get_filesystems(self):
54 """
55 Iterator for all filesystems.
56 """
57 for fs in self.map['filesystems']:
58 yield fs
59
60 def get_all(self):
61 """
62 Iterator for all the mds_info components in the FSMap.
63 """
64 for info in self.get_standbys():
65 yield info
66 for fs in self.map['filesystems']:
67 for info in fs['mdsmap']['info'].values():
68 yield info
69
70 def get_standbys(self):
71 """
72 Iterator for all standbys.
73 """
74 for info in self.map['standbys']:
75 yield info
76
77 def get_fsmap(self, fscid):
78 """
79 Get the fsmap for the given FSCID.
80 """
81 for fs in self.map['filesystems']:
82 if fscid is None or fs['id'] == fscid:
83 return fs
84 raise RuntimeError("FSCID {0} not in map".format(fscid))
85
86 def get_fsmap_byname(self, name):
87 """
88 Get the fsmap for the given file system name.
89 """
90 for fs in self.map['filesystems']:
91 if name is None or fs['mdsmap']['fs_name'] == name:
92 return fs
93 raise RuntimeError("FS {0} not in map".format(name))
94
95 def get_replays(self, fscid):
96 """
97 Yield the standby-replay MDS daemons for the given FSCID.
98 """
99 fs = self.get_fsmap(fscid)
100 for info in fs['mdsmap']['info'].values():
101 if info['state'] == 'up:standby-replay':
102 yield info
103
104 def get_ranks(self, fscid):
105 """
106 Get the ranks for the given FSCID.
107 """
108 fs = self.get_fsmap(fscid)
109 for info in fs['mdsmap']['info'].values():
110 if info['rank'] >= 0:
111 yield info
112
113 def get_rank(self, fscid, rank):
114 """
115 Get the info for the given rank of the given FSCID.
116 """
117 for info in self.get_ranks(fscid):
118 if info['rank'] == rank:
119 return info
120 raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))
121
122 def get_mds(self, name):
123 """
124 Get the info for the given MDS name.
125 """
126 for info in self.get_all():
127 if info['name'] == name:
128 return info
129 return None
130
131 def get_mds_addr(self, name):
132 """
133 Return the instance addr as a string, like "10.214.133.138:6807/10825"
134 """
135 info = self.get_mds(name)
136 if info:
137 return info['addr']
138 else:
139 log.warn(json.dumps(list(self.get_all()), indent=2)) # dump for debugging
140 raise RuntimeError("MDS id '{0}' not found in map".format(name))
141
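# Illustrative sketch (not part of the original module): how a test might poke at an
# FSStatus snapshot. `fs` is assumed to be a Filesystem instance from this module;
# the field names ('rank', 'name', 'state', 'addr') match what the iterators above yield.
#
#   status = FSStatus(fs.mon_manager)
#   for info in status.get_ranks(fs.id):
#       log.info("rank %d held by %s in state %s"
#                % (info['rank'], info['name'], info['state']))
#   addr = status.get_mds_addr(info['name'])   # "ip:port/nonce" string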
142 class CephCluster(object):
143 @property
144 def admin_remote(self):
145 first_mon = misc.get_first_mon(self._ctx, None)
146 (result,) = self._ctx.cluster.only(first_mon).remotes.iterkeys()
147 return result
148
149 def __init__(self, ctx):
150 self._ctx = ctx
151 self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
152
153 def get_config(self, key, service_type=None):
154 """
155 Get config from mon by default, or a specific service if caller asks for it
156 """
157 if service_type is None:
158 service_type = 'mon'
159
160 service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
161 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
162
163 def set_ceph_conf(self, subsys, key, value):
164 if subsys not in self._ctx.ceph['ceph'].conf:
165 self._ctx.ceph['ceph'].conf[subsys] = {}
166 self._ctx.ceph['ceph'].conf[subsys][key] = value
167 write_conf(self._ctx) # XXX because we don't have the ceph task's config object, if they
168 # used a different config path this won't work.
169
170 def clear_ceph_conf(self, subsys, key):
171 del self._ctx.ceph['ceph'].conf[subsys][key]
172 write_conf(self._ctx)
173
174 def json_asok(self, command, service_type, service_id, timeout=None):
175 if timeout is None:
176 timeout = 15*60
177 proc = self.mon_manager.admin_socket(service_type, service_id, command, timeout=timeout)
178 response_data = proc.stdout.getvalue()
179 log.info("_json_asok output: {0}".format(response_data))
180 if response_data.strip():
181 return json.loads(response_data)
182 else:
183 return None
184
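# Illustrative sketch (not part of the original module): json_asok() issues an admin
# socket command to the named daemon and decodes the JSON reply. The daemon id 'a'
# and the config key are hypothetical example values; the call shape mirrors
# get_config() above.
#
#   cluster = CephCluster(ctx)
#   value = cluster.json_asok(['config', 'get', 'mds_cache_memory_limit'],
#                             'mds', 'a')['mds_cache_memory_limit']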
185
186 class MDSCluster(CephCluster):
187 """
188 Collective operations on all the MDS daemons in the Ceph cluster. These
189 daemons may be in use by various Filesystems.
190
191 For the benefit of pre-multi-filesystem tests, this class is also
192 a parent of Filesystem. The correct way to use MDSCluster going forward is
193 as a separate instance outside of your (multiple) Filesystem instances.
194 """
195 def __init__(self, ctx):
196 super(MDSCluster, self).__init__(ctx)
197
198 self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))
199
200 if len(self.mds_ids) == 0:
201 raise RuntimeError("This task requires at least one MDS")
202
203 if hasattr(self._ctx, "daemons"):
204 # Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
205 self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
206
207 def _one_or_all(self, mds_id, cb, in_parallel=True):
208 """
209 Call a callback for a single named MDS, or for all.
210
211 Note that the parallelism here isn't for performance: it's to avoid being overly kind
212 to the cluster by waiting an ssh-latency of time between operations, and to avoid
213 being overly kind by always executing them in the same order. However, some actions
214 don't cope with being done in parallel, so it's optional (`in_parallel`).
215
216 :param mds_id: MDS daemon name, or None
217 :param cb: Callback taking single argument of MDS daemon name
218 :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
219 """
220 if mds_id is None:
221 if in_parallel:
222 with parallel() as p:
223 for mds_id in self.mds_ids:
224 p.spawn(cb, mds_id)
225 else:
226 for mds_id in self.mds_ids:
227 cb(mds_id)
228 else:
229 cb(mds_id)
230
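# Illustrative usage sketch (not part of the original module): _one_or_all() is what
# gives the helpers below their "one MDS or every MDS" behaviour. `mdscluster` is a
# hypothetical MDSCluster instance name.
#
#   mdscluster.mds_stop()       # stop every MDS, callbacks run in parallel
#   mdscluster.mds_stop('a')    # stop only mds.a
#   mdscluster.mds_fail('a')    # tell the MDSMonitor that mds.a is dead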
231 def get_config(self, key, service_type=None):
232 """
233 get_config specialization for service_type="mds": query a running MDS daemon.
234 """
235 if service_type != "mds":
236 return super(MDSCluster, self).get_config(key, service_type)
237
238 # Some tests stop MDS daemons, don't send commands to a dead one:
239 service_id = random.sample(filter(lambda i: self.mds_daemons[i].running(), self.mds_daemons), 1)[0]
240 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
241
242 def mds_stop(self, mds_id=None):
243 """
244 Stop the MDS daemon process(es). If it held a rank, that rank
245 will eventually go laggy.
246 """
247 self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())
248
249 def mds_fail(self, mds_id=None):
250 """
251 Inform MDSMonitor of the death of the daemon process(es). If it held
252 a rank, that rank will be relinquished.
253 """
254 self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))
255
256 def mds_restart(self, mds_id=None):
257 self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())
258
259 def mds_fail_restart(self, mds_id=None):
260 """
261 Variation on restart that includes marking MDSs as failed, so that doing this
262 operation followed by waiting for healthy daemon states guarantees that they
263 have gone down and come up, rather than potentially seeing the healthy states
264 that existed before the restart.
265 """
266 def _fail_restart(id_):
267 self.mds_daemons[id_].stop()
268 self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
269 self.mds_daemons[id_].restart()
270
271 self._one_or_all(mds_id, _fail_restart)
272
273 def mds_signal(self, mds_id, sig, silent=False):
274 """
275 Send a signal to an MDS daemon.
276 """
277 self.mds_daemons[mds_id].signal(sig, silent)
278
279 def newfs(self, name='cephfs', create=True):
280 return Filesystem(self._ctx, name=name, create=create)
281
282 def status(self):
283 return FSStatus(self.mon_manager)
284
285 def delete_all_filesystems(self):
286 """
287 Remove all filesystems that exist, and any pools in use by them.
288 """
289 pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
290 pool_id_name = {}
291 for pool in pools:
292 pool_id_name[pool['pool']] = pool['pool_name']
293
294 # mark cluster down for each fs to prevent churn during deletion
295 status = self.status()
296 for fs in status.get_filesystems():
297 self.mon_manager.raw_cluster_cmd("fs", "set", fs['mdsmap']['fs_name'], "cluster_down", "true")
298
299 # get a new copy as actives may have since changed
300 status = self.status()
301 for fs in status.get_filesystems():
302 mdsmap = fs['mdsmap']
303 metadata_pool = pool_id_name[mdsmap['metadata_pool']]
304
305 for gid in mdsmap['up'].values():
306 self.mon_manager.raw_cluster_cmd('mds', 'fail', gid.__str__())
307
308 self.mon_manager.raw_cluster_cmd('fs', 'rm', mdsmap['fs_name'], '--yes-i-really-mean-it')
309 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
310 metadata_pool, metadata_pool,
311 '--yes-i-really-really-mean-it')
312 for data_pool in mdsmap['data_pools']:
313 data_pool = pool_id_name[data_pool]
314 try:
315 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
316 data_pool, data_pool,
317 '--yes-i-really-really-mean-it')
318 except CommandFailedError as e:
319 if e.exitstatus == 16: # EBUSY: this data pool is shared with
320 pass # another filesystem; the iteration that
321 else: # removes that filesystem deletes the pool
322 raise
323
324 def get_standby_daemons(self):
325 return set([s['name'] for s in self.status().get_standbys()])
326
327 def get_mds_hostnames(self):
328 result = set()
329 for mds_id in self.mds_ids:
330 mds_remote = self.mon_manager.find_remote('mds', mds_id)
331 result.add(mds_remote.hostname)
332
333 return list(result)
334
335 def set_clients_block(self, blocked, mds_id=None):
336 """
337 Block (using iptables) client communications to this MDS. Be careful: if
338 other services are running on this MDS, or other MDSs try to talk to this
339 MDS, their communications may also be blocked as collateral damage.
340
341 :param mds_id: Optional ID of the MDS to block; defaults to all
342 :return:
343 """
344 da_flag = "-A" if blocked else "-D"
345
346 def set_block(_mds_id):
347 remote = self.mon_manager.find_remote('mds', _mds_id)
348 status = self.status()
349
350 addr = status.get_mds_addr(_mds_id)
351 ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()
352
353 remote.run(
354 args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
355 "comment", "--comment", "teuthology"])
356 remote.run(
357 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
358 "comment", "--comment", "teuthology"])
359
360 self._one_or_all(mds_id, set_block, in_parallel=False)
361
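# Illustrative sketch (not part of the original module): a typical block/unblock
# cycle in a test. The port is parsed from the MDS addr string ("ip:port/nonce")
# as shown in set_block() above; `fs` is an assumed Filesystem/MDSCluster instance.
#
#   fs.set_clients_block(True, mds_id='a')    # add REJECT rules for mds.a's port
#   # ... exercise client behaviour while mds.a is unreachable ...
#   fs.set_clients_block(False, mds_id='a')   # remove the rules again
#   fs.clear_firewall()                       # or flush everything teuthology added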
362 def clear_firewall(self):
363 clear_firewall(self._ctx)
364
365 def get_mds_info(self, mds_id):
366 return FSStatus(self.mon_manager).get_mds(mds_id)
367
368 def is_pool_full(self, pool_name):
369 pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
370 for pool in pools:
371 if pool['pool_name'] == pool_name:
372 return 'full' in pool['flags_names'].split(",")
373
374 raise RuntimeError("Pool not found '{0}'".format(pool_name))
375
376 class Filesystem(MDSCluster):
377 """
378 This object is for driving a CephFS filesystem. The MDS daemons driven by
379 MDSCluster may be shared with other Filesystems.
380 """
381 def __init__(self, ctx, fscid=None, name=None, create=False,
382 ec_profile=None):
383 super(Filesystem, self).__init__(ctx)
384
385 self.name = name
386 self.ec_profile = ec_profile
387 self.id = None
388 self.metadata_pool_name = None
389 self.metadata_overlay = False
390 self.data_pool_name = None
391 self.data_pools = None
392
393 client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
394 self.client_id = client_list[0]
395 self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
396
397 if name is not None:
398 if fscid is not None:
399 raise RuntimeError("cannot specify fscid when creating fs")
400 if create and not self.legacy_configured():
401 self.create()
402 else:
403 if fscid is not None:
404 self.id = fscid
405 self.getinfo(refresh = True)
406
407 # Stash a reference to the first created filesystem on ctx, so
408 # that if someone drops to the interactive shell they can easily
409 # poke our methods.
410 if not hasattr(self._ctx, "filesystem"):
411 self._ctx.filesystem = self
412
413 def getinfo(self, refresh = False):
414 status = self.status()
415 if self.id is not None:
416 fsmap = status.get_fsmap(self.id)
417 elif self.name is not None:
418 fsmap = status.get_fsmap_byname(self.name)
419 else:
420 fss = [fs for fs in status.get_filesystems()]
421 if len(fss) == 1:
422 fsmap = fss[0]
423 elif len(fss) == 0:
424 raise RuntimeError("no file system available")
425 else:
426 raise RuntimeError("more than one file system available")
427 self.id = fsmap['id']
428 self.name = fsmap['mdsmap']['fs_name']
429 self.get_pool_names(status = status, refresh = refresh)
430 return status
431
432 def set_metadata_overlay(self, overlay):
433 if self.id is not None:
434 raise RuntimeError("cannot specify fscid when configuring overlay")
435 self.metadata_overlay = overlay
436
437 def deactivate(self, rank):
438 if rank < 0:
439 raise RuntimeError("invalid rank")
440 elif rank == 0:
441 raise RuntimeError("cannot deactivate rank 0")
442 self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))
443
444 def set_var(self, var, *args):
445 a = map(str, args)
446 self.mon_manager.raw_cluster_cmd("fs", "set", self.name, var, *a)
447
448 def set_max_mds(self, max_mds):
449 self.set_var("max_mds", "%d" % max_mds)
450
451 def set_allow_dirfrags(self, yes):
452 self.set_var("allow_dirfrags", str(yes).lower(), '--yes-i-really-mean-it')
453
454 def get_pgs_per_fs_pool(self):
455 """
456 Calculate how many PGs to use when creating a pool, in order to avoid raising any
457 health warnings about mon_pg_warn_min_per_osd
458
459 :return: an integer number of PGs
460 """
461 pg_warn_min_per_osd = int(self.get_config('mon_pg_warn_min_per_osd'))
462 osd_count = len(list(misc.all_roles_of_type(self._ctx.cluster, 'osd')))
463 return pg_warn_min_per_osd * osd_count
464
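# Worked example (illustrative numbers, not from the original module): with
# mon_pg_warn_min_per_osd = 3 and 4 OSDs in the run, get_pgs_per_fs_pool()
# returns 3 * 4 = 12, so each filesystem pool is created with 12 PGs -- enough
# to stay above the per-OSD warning threshold.
#
#   pgs = fs.get_pgs_per_fs_pool()    # -> 12 in this example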
465 def create(self):
466 if self.name is None:
467 self.name = "cephfs"
468 if self.metadata_pool_name is None:
469 self.metadata_pool_name = "{0}_metadata".format(self.name)
470 if self.data_pool_name is None:
471 data_pool_name = "{0}_data".format(self.name)
472 else:
473 data_pool_name = self.data_pool_name
474
475 log.info("Creating filesystem '{0}'".format(self.name))
476
477 pgs_per_fs_pool = self.get_pgs_per_fs_pool()
478
479 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
480 self.metadata_pool_name, pgs_per_fs_pool.__str__())
481 if self.metadata_overlay:
482 self.mon_manager.raw_cluster_cmd('fs', 'new',
483 self.name, self.metadata_pool_name, data_pool_name,
484 '--allow-dangerous-metadata-overlay')
485 else:
486 if self.ec_profile and 'disabled' not in self.ec_profile:
487 log.info("EC profile is %s", self.ec_profile)
488 cmd = ['osd', 'erasure-code-profile', 'set', data_pool_name]
489 cmd.extend(self.ec_profile)
490 self.mon_manager.raw_cluster_cmd(*cmd)
491 self.mon_manager.raw_cluster_cmd(
492 'osd', 'pool', 'create',
493 data_pool_name, pgs_per_fs_pool.__str__(), 'erasure',
494 data_pool_name)
495 self.mon_manager.raw_cluster_cmd(
496 'osd', 'pool', 'set',
497 data_pool_name, 'allow_ec_overwrites', 'true')
498 else:
499 self.mon_manager.raw_cluster_cmd(
500 'osd', 'pool', 'create',
501 data_pool_name, pgs_per_fs_pool.__str__())
502 self.mon_manager.raw_cluster_cmd('fs', 'new',
503 self.name, self.metadata_pool_name, data_pool_name)
504 self.check_pool_application(self.metadata_pool_name)
505 self.check_pool_application(data_pool_name)
506 # Turn off spurious standby count warnings from modifying max_mds in tests.
507 try:
508 self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
509 except CommandFailedError as e:
510 if e.exitstatus == 22:
511 # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
512 pass
513 else:
514 raise
515
516 self.getinfo(refresh = True)
517
518
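# Sketch of the CLI sequence that create() drives for a plain (non-EC, non-overlay)
# filesystem named "cephfs"; these are the commands issued through raw_cluster_cmd()
# above, with <pgs> coming from get_pgs_per_fs_pool():
#
#   ceph osd pool create cephfs_metadata <pgs>
#   ceph osd pool create cephfs_data <pgs>
#   ceph fs new cephfs cephfs_metadata cephfs_data
#   ceph fs set cephfs standby_count_wanted 0    # ignored (EINVAL) before luminous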
519 def check_pool_application(self, pool_name):
520 osd_map = self.mon_manager.get_osd_dump_json()
521 for pool in osd_map['pools']:
522 if pool['pool_name'] == pool_name:
523 if "application_metadata" in pool:
524 if "cephfs" not in pool['application_metadata']:
525 raise RuntimeError("Pool {0} does not name cephfs as application!".format(
526 pool_name))
527
528
529 def __del__(self):
530 if getattr(self._ctx, "filesystem", None) == self:
531 delattr(self._ctx, "filesystem")
532
533 def exists(self):
534 """
535 Whether a filesystem exists in the mon's filesystem list
536 """
537 fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
538 return self.name in [fs['name'] for fs in fs_list]
539
540 def legacy_configured(self):
541 """
542 Check if a legacy (i.e. pre "fs new") filesystem configuration is present. If this is
543 the case, the caller should avoid using Filesystem.create
544 """
545 try:
546 out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
547 pools = json.loads(out_text)
548 metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
549 if metadata_pool_exists:
550 self.metadata_pool_name = 'metadata'
551 except CommandFailedError as e:
552 # For use in upgrade tests, Ceph cuttlefish and earlier don't support
553 # structured output (--format) from the CLI.
554 if e.exitstatus == 22:
555 metadata_pool_exists = True
556 else:
557 raise
558
559 return metadata_pool_exists
560
561 def _df(self):
562 return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))
563
564 def get_mds_map(self, status=None):
565 if status is None:
566 status = self.status()
567 return status.get_fsmap(self.id)['mdsmap']
568
569 def get_var(self, var):
570 return self.status().get_fsmap(self.id)['mdsmap'][var]
571
572 def add_data_pool(self, name):
573 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name, self.get_pgs_per_fs_pool().__str__())
574 self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
575 self.get_pool_names(refresh = True)
576 for poolid, fs_name in self.data_pools.items():
577 if name == fs_name:
578 return poolid
579 raise RuntimeError("could not get just created pool '{0}'".format(name))
580
581 def get_pool_names(self, refresh = False, status = None):
582 if refresh or self.metadata_pool_name is None or self.data_pools is None:
583 if status is None:
584 status = self.status()
585 fsmap = status.get_fsmap(self.id)
586
587 osd_map = self.mon_manager.get_osd_dump_json()
588 id_to_name = {}
589 for p in osd_map['pools']:
590 id_to_name[p['pool']] = p['pool_name']
591
592 self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
593 self.data_pools = {}
594 for data_pool in fsmap['mdsmap']['data_pools']:
595 self.data_pools[data_pool] = id_to_name[data_pool]
596
597 def get_data_pool_name(self, refresh = False):
598 if refresh or self.data_pools is None:
599 self.get_pool_names(refresh = True)
600 assert(len(self.data_pools) == 1)
601 return self.data_pools.values()[0]
602
603 def get_data_pool_id(self, refresh = False):
604 """
605 Don't call this if you have multiple data pools
606 :return: integer
607 """
608 if refresh or self.data_pools is None:
609 self.get_pool_names(refresh = True)
610 assert(len(self.data_pools) == 1)
611 return self.data_pools.keys()[0]
612
613 def get_data_pool_names(self, refresh = False):
614 if refresh or self.data_pools is None:
615 self.get_pool_names(refresh = True)
616 return self.data_pools.values()
617
618 def get_metadata_pool_name(self):
619 return self.metadata_pool_name
620
621 def set_data_pool_name(self, name):
622 if self.id is not None:
623 raise RuntimeError("can't set filesystem name if its fscid is set")
624 self.data_pool_name = name
625
626 def get_namespace_id(self):
627 return self.id
628
629 def get_pool_df(self, pool_name):
630 """
631 Return a dict like:
632 {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
633 """
634 for pool_df in self._df()['pools']:
635 if pool_df['name'] == pool_name:
636 return pool_df['stats']
637
638 raise RuntimeError("Pool name '{0}' not found".format(pool_name))
639
640 def get_usage(self):
641 return self._df()['stats']['total_used_bytes']
642
643 def are_daemons_healthy(self):
644 """
645 Return true if all daemons are in one of the states active, standby or
646 standby-replay, and at least max_mds daemons are in 'active'.
647
648 Unlike most of Filesystem, this function is tolerant of new-style `fs`
649 commands being missing, because we are part of the ceph installation
650 process during upgrade suites, so must fall back to old style commands
651 when we get an EINVAL on a new style command.
652
653 :return:
654 """
655
656 active_count = 0
657 try:
658 mds_map = self.get_mds_map()
659 except CommandFailedError as cfe:
660 # Old version, fall back to non-multi-fs commands
661 if cfe.exitstatus == errno.EINVAL:
662 mds_map = json.loads(
663 self.mon_manager.raw_cluster_cmd('mds', 'dump', '--format=json'))
664 else:
665 raise
666
667 log.info("are_daemons_healthy: mds map: {0}".format(mds_map))
668
669 for mds_id, mds_status in mds_map['info'].items():
670 if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
671 log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
672 return False
673 elif mds_status['state'] == 'up:active':
674 active_count += 1
675
676 log.info("are_daemons_healthy: {0}/{1}".format(
677 active_count, mds_map['max_mds']
678 ))
679
680 if active_count >= mds_map['max_mds']:
681 # The MDSMap says these guys are active, but let's check they really are
682 for mds_id, mds_status in mds_map['info'].items():
683 if mds_status['state'] == 'up:active':
684 try:
685 daemon_status = self.mds_asok(["status"], mds_id=mds_status['name'])
686 except CommandFailedError as cfe:
687 if cfe.exitstatus == errno.EINVAL:
688 # Old version, can't do this check
689 continue
690 else:
691 # MDS not even running
692 return False
693
694 if daemon_status['state'] != 'up:active':
695 # MDS hasn't taken the latest map yet
696 return False
697
698 return True
699 else:
700 return False
701
702 def get_daemon_names(self, state=None):
703 """
704 Return MDS daemon names of those daemons in the given state
705 :param state:
706 :return:
707 """
708 status = self.get_mds_map()
709 result = []
710 for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
711 if mds_status['state'] == state or state is None:
712 result.append(mds_status['name'])
713
714 return result
715
716 def get_active_names(self):
717 """
718 Return MDS daemon names of those daemons holding ranks
719 in state up:active
720
721 :return: list of strings like ['a', 'b'], sorted by rank
722 """
723 return self.get_daemon_names("up:active")
724
725 def get_all_mds_rank(self):
726 status = self.get_mds_map()
727 result = []
728 for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
729 if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
730 result.append(mds_status['rank'])
731
732 return result
733
734 def get_rank(self, rank=0, status=None):
735 if status is None:
736 status = self.getinfo()
737 return status.get_rank(self.id, rank)
738
739 def get_ranks(self, status=None):
740 if status is None:
741 status = self.getinfo()
742 return status.get_ranks(self.id)
743
744 def get_rank_names(self, status=None):
745 """
746 Return MDS daemon names of those daemons holding a rank,
747 sorted by rank. This includes e.g. up:replay/reconnect
748 as well as active, but does not include standby or
749 standby-replay.
750 """
751 status = self.get_mds_map()
752 result = []
753 for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
754 if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
755 result.append(mds_status['name'])
756
757 return result
758
759 def wait_for_daemons(self, timeout=None):
760 """
761 Wait until all daemons are healthy
762 :return:
763 """
764
765 if timeout is None:
766 timeout = DAEMON_WAIT_TIMEOUT
767
768 elapsed = 0
769 while True:
770 if self.are_daemons_healthy():
771 return
772 else:
773 time.sleep(1)
774 elapsed += 1
775
776 if elapsed > timeout:
777 raise RuntimeError("Timed out waiting for MDS daemons to become healthy")
778
779 def get_lone_mds_id(self):
780 """
781 Get a single MDS ID: the only one if there is only one
782 configured, else the only one currently holding a rank,
783 else raise an error.
784 """
785 if len(self.mds_ids) != 1:
786 alive = self.get_rank_names()
787 if len(alive) == 1:
788 return alive[0]
789 else:
790 raise ValueError("Explicit MDS argument required when multiple MDSs in use")
791 else:
792 return self.mds_ids[0]
793
794 def recreate(self):
795 log.info("Creating new filesystem")
796 self.delete_all_filesystems()
797 self.id = None
798 self.create()
799
800 def put_metadata_object_raw(self, object_id, infile):
801 """
802 Save an object to the metadata pool
803 """
804 temp_bin_path = infile
805 self.client_remote.run(args=[
806 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'put', object_id, temp_bin_path
807 ])
808
809 def get_metadata_object_raw(self, object_id):
810 """
811 Retrieve an object from the metadata pool and store it in a file.
812 """
813 temp_bin_path = '/tmp/' + object_id + '.bin'
814
815 self.client_remote.run(args=[
816 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
817 ])
818
819 return temp_bin_path
820
821 def get_metadata_object(self, object_type, object_id):
822 """
823 Retrieve an object from the metadata pool, pass it through
824 ceph-dencoder to dump it to JSON, and return the decoded object.
825 """
826 temp_bin_path = '/tmp/out.bin'
827
828 self.client_remote.run(args=[
829 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
830 ])
831
832 stdout = StringIO()
833 self.client_remote.run(args=[
834 'sudo', os.path.join(self._prefix, 'ceph-dencoder'), 'type', object_type, 'import', temp_bin_path, 'decode', 'dump_json'
835 ], stdout=stdout)
836 dump_json = stdout.getvalue().strip()
837 try:
838 dump = json.loads(dump_json)
839 except (TypeError, ValueError):
840 log.error("Failed to decode JSON: '{0}'".format(dump_json))
841 raise
842
843 return dump
844
845 def get_journal_version(self):
846 """
847 Read the JournalPointer and Journal::Header objects to learn the version of
848 encoding in use.
849 """
850 journal_pointer_object = '400.00000000'
851 journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
852 journal_ino = journal_pointer_dump['journal_pointer']['front']
853
854 journal_header_object = "{0:x}.00000000".format(journal_ino)
855 journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)
856
857 version = journal_header_dump['journal_header']['stream_format']
858 log.info("Read journal version {0}".format(version))
859
860 return version
861
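# Worked example (illustrative values, not from the original module): for rank 0 the
# JournalPointer is read from object 400.00000000; its 'front' field names the journal
# inode (typically 0x200 for rank 0, cf. MDS_INO_LOG_OFFSET in erase_mds_objects()
# below), so the header comes from 200.00000000 and its stream_format is returned.
#
#   version = fs.get_journal_version()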
862 def mds_asok(self, command, mds_id=None, timeout=None):
863 if mds_id is None:
864 mds_id = self.get_lone_mds_id()
865
866 return self.json_asok(command, 'mds', mds_id, timeout=timeout)
867
868 def rank_asok(self, command, rank=0, status=None, timeout=None):
869 info = self.get_rank(rank=rank, status=status)
870 return self.json_asok(command, 'mds', info['name'], timeout=timeout)
871
872 def read_cache(self, path, depth=None):
873 cmd = ["dump", "tree", path]
874 if depth is not None:
875 cmd.append(depth.__str__())
876 result = self.mds_asok(cmd)
877 if len(result) == 0:
878 raise RuntimeError("Path not found in cache: {0}".format(path))
879
880 return result
881
882 def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
883 """
884 Block until the MDS reaches a particular state, or a failure condition
885 is met.
886
887 When there are multiple MDSs, succeed when exactly one MDS is in the
888 goal state, or fail when any MDS is in the reject state.
889
890 :param goal_state: Return once the MDS is in this state
891 :param reject: Fail if the MDS enters this state before the goal state
892 :param timeout: Fail if this many seconds pass before reaching goal
893 :return: number of seconds waited
894 """
895
896 started_at = time.time()
897 while True:
898 status = self.status()
899 if rank is not None:
900 try:
901 mds_info = status.get_rank(self.id, rank)
902 current_state = mds_info['state'] if mds_info else None
903 log.info("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
904 except:
905 mdsmap = self.get_mds_map(status=status)
906 if rank in mdsmap['failed']:
907 log.info("Waiting for rank {0} to come back.".format(rank))
908 current_state = None
909 else:
910 raise
911 elif mds_id is not None:
912 # mds_info is None if no daemon with this ID exists in the map
913 mds_info = status.get_mds(mds_id)
914 current_state = mds_info['state'] if mds_info else None
915 log.info("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
916 else:
917 # In general, look for a single MDS
918 states = [m['state'] for m in status.get_ranks(self.id)]
919 if [s for s in states if s == goal_state] == [goal_state]:
920 current_state = goal_state
921 elif reject in states:
922 current_state = reject
923 else:
924 current_state = None
925 log.info("mapped states {0} to {1}".format(states, current_state))
926
927 elapsed = time.time() - started_at
928 if current_state == goal_state:
929 log.info("reached state '{0}' in {1}s".format(current_state, elapsed))
930 return elapsed
931 elif reject is not None and current_state == reject:
932 raise RuntimeError("MDS in reject state {0}".format(current_state))
933 elif timeout is not None and elapsed > timeout:
934 log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
935 raise RuntimeError(
936 "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
937 elapsed, goal_state, current_state
938 ))
939 else:
940 time.sleep(1)
941
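# Illustrative sketch (not part of the original module): typical waits during a
# failover test. The keyword arguments match the signature above; the state names
# and timeout values are example choices.
#
#   fs.wait_for_state('up:active', rank=0, timeout=60)        # rank 0 has recovered
#   fs.wait_for_state('up:standby', mds_id='a', timeout=60)   # daemon a is standing by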
942 def _read_data_xattr(self, ino_no, xattr_name, type, pool):
943 mds_id = self.mds_ids[0]
944 remote = self.mds_daemons[mds_id].remote
945 if pool is None:
946 pool = self.get_data_pool_name()
947
948 obj_name = "{0:x}.00000000".format(ino_no)
949
950 args = [
951 os.path.join(self._prefix, "rados"), "-p", pool, "getxattr", obj_name, xattr_name
952 ]
953 try:
954 proc = remote.run(
955 args=args,
956 stdout=StringIO())
957 except CommandFailedError as e:
958 log.error(e.__str__())
959 raise ObjectNotFound(obj_name)
960
961 data = proc.stdout.getvalue()
962
963 p = remote.run(
964 args=[os.path.join(self._prefix, "ceph-dencoder"), "type", type, "import", "-", "decode", "dump_json"],
965 stdout=StringIO(),
966 stdin=data
967 )
968
969 return json.loads(p.stdout.getvalue().strip())
970
971 def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
972 """
973 Write to an xattr of the 0th data object of an inode. Will
974 succeed whether the object and/or xattr already exist or not.
975
976 :param ino_no: integer inode number
977 :param xattr_name: string name of the xattr
978 :param data: byte array data to write to the xattr
979 :param pool: name of data pool or None to use primary data pool
980 :return: None
981 """
982 remote = self.mds_daemons[self.mds_ids[0]].remote
983 if pool is None:
984 pool = self.get_data_pool_name()
985
986 obj_name = "{0:x}.00000000".format(ino_no)
987 args = [
988 os.path.join(self._prefix, "rados"), "-p", pool, "setxattr",
989 obj_name, xattr_name, data
990 ]
991 remote.run(
992 args=args,
993 stdout=StringIO())
994
995 def read_backtrace(self, ino_no, pool=None):
996 """
997 Read the backtrace from the data pool, return a dict in the format
998 given by inode_backtrace_t::dump, which is something like:
999
1000 ::
1001
1002 rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
1003 ceph-dencoder type inode_backtrace_t import out.bin decode dump_json
1004
1005 { "ino": 1099511627778,
1006 "ancestors": [
1007 { "dirino": 1,
1008 "dname": "blah",
1009 "version": 11}],
1010 "pool": 1,
1011 "old_pools": []}
1012
1013 :param pool: name of pool to read backtrace from. If omitted, FS must have only
1014 one data pool and that will be used.
1015 """
1016 return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)
1017
1018 def read_layout(self, ino_no, pool=None):
1019 """
1020 Read 'layout' xattr of an inode and parse the result, returning a dict like:
1021 ::
1022 {
1023 "stripe_unit": 4194304,
1024 "stripe_count": 1,
1025 "object_size": 4194304,
1026 "pool_id": 1,
1027 "pool_ns": "",
1028 }
1029
1030 :param pool: name of pool to read backtrace from. If omitted, FS must have only
1031 one data pool and that will be used.
1032 """
1033 return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)
1034
1035 def _enumerate_data_objects(self, ino, size):
1036 """
1037 Get the list of expected data objects for a range, and the list of objects
1038 that really exist.
1039
1040 :return a tuple of two lists of strings (expected, actual)
1041 """
1042 stripe_size = 1024 * 1024 * 4
1043
1044 size = max(stripe_size, size)
1045
1046 want_objects = [
1047 "{0:x}.{1:08x}".format(ino, n)
1048 for n in range(0, ((size - 1) / stripe_size) + 1)
1049 ]
1050
1051 exist_objects = self.rados(["ls"], pool=self.get_data_pool_name()).split("\n")
1052
1053 return want_objects, exist_objects
1054
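# Worked example (illustrative numbers): with the 4 MiB stripe size assumed above,
# an inode 0x10000000001 of size 10 MiB is expected to map to three data objects:
#
#   10000000001.00000000, 10000000001.00000001, 10000000001.00000002
#
# since ((10485760 - 1) / 4194304) + 1 == 3.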
1055 def data_objects_present(self, ino, size):
1056 """
1057 Check that *all* the expected data objects for an inode are present in the data pool
1058 """
1059
1060 want_objects, exist_objects = self._enumerate_data_objects(ino, size)
1061 missing = set(want_objects) - set(exist_objects)
1062
1063 if missing:
1064 log.info("Objects missing (ino {0}, size {1}): {2}".format(
1065 ino, size, missing
1066 ))
1067 return False
1068 else:
1069 log.info("All objects for ino {0} size {1} found".format(ino, size))
1070 return True
1071
1072 def data_objects_absent(self, ino, size):
1073 want_objects, exist_objects = self._enumerate_data_objects(ino, size)
1074 present = set(want_objects) & set(exist_objects)
1075
1076 if present:
1077 log.info("Objects not absent (ino {0}, size {1}): {2}".format(
1078 ino, size, present
1079 ))
1080 return False
1081 else:
1082 log.info("All objects for ino {0} size {1} are absent".format(ino, size))
1083 return True
1084
1085 def dirfrag_exists(self, ino, frag):
1086 try:
1087 self.rados(["stat", "{0:x}.{1:08x}".format(ino, frag)])
1088 except CommandFailedError as e:
1089 return False
1090 else:
1091 return True
1092
1093 def rados(self, args, pool=None, namespace=None, stdin_data=None,
1094 stdin_file=None):
1095 """
1096 Call into the `rados` CLI from an MDS
1097 """
1098
1099 if pool is None:
1100 pool = self.get_metadata_pool_name()
1101
1102 # Doesn't matter which MDS we use to run rados commands, they all
1103 # have access to the pools
1104 mds_id = self.mds_ids[0]
1105 remote = self.mds_daemons[mds_id].remote
1106
1107 # NB we could alternatively use librados pybindings for this, but it's a one-liner
1108 # using the `rados` CLI
1109 args = ([os.path.join(self._prefix, "rados"), "-p", pool] +
1110 (["--namespace", namespace] if namespace else []) +
1111 args)
1112
1113 if stdin_file is not None:
1114 args = ["bash", "-c", "cat " + stdin_file + " | " + " ".join(args)]
1115
1116 p = remote.run(
1117 args=args,
1118 stdin=stdin_data,
1119 stdout=StringIO())
1120 return p.stdout.getvalue().strip()
1121
1122 def list_dirfrag(self, dir_ino):
1123 """
1124 Read the named object and return the list of omap keys
1125
1126 :return a list of 0 or more strings
1127 """
1128
1129 dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
1130
1131 try:
1132 key_list_str = self.rados(["listomapkeys", dirfrag_obj_name])
1133 except CommandFailedError as e:
1134 log.error(e.__str__())
1135 raise ObjectNotFound(dirfrag_obj_name)
1136
1137 return key_list_str.split("\n") if key_list_str else []
1138
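# Illustrative sketch (not part of the original module): listing the root directory's
# dirfrag. ROOT_INO is defined at the top of this module; the omap keys correspond to
# the dentries stored in the fragment object.
#
#   keys = fs.list_dirfrag(ROOT_INO)    # reads object "1.00000000"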
1139 def erase_metadata_objects(self, prefix):
1140 """
1141 For all objects in the metadata pool matching the prefix,
1142 erase them.
1143
1144 This is O(N) in the number of objects in the pool, so it is only suitable
1145 for use on toy test filesystems.
1146 """
1147 all_objects = self.rados(["ls"]).split("\n")
1148 matching_objects = [o for o in all_objects if o.startswith(prefix)]
1149 for o in matching_objects:
1150 self.rados(["rm", o])
1151
1152 def erase_mds_objects(self, rank):
1153 """
1154 Erase all the per-MDS objects for a particular rank. This includes
1155 inotable, sessiontable, journal
1156 """
1157
1158 def obj_prefix(multiplier):
1159 """
1160 MDS per-rank object naming convention, e.g. rank 1's
1161 journal objects are named 201.*
1162 """
1163 return "%x." % (multiplier * 0x100 + rank)
1164
1165 # MDS_INO_LOG_OFFSET
1166 self.erase_metadata_objects(obj_prefix(2))
1167 # MDS_INO_LOG_BACKUP_OFFSET
1168 self.erase_metadata_objects(obj_prefix(3))
1169 # MDS_INO_LOG_POINTER_OFFSET
1170 self.erase_metadata_objects(obj_prefix(4))
1171 # MDSTables & SessionMap
1172 self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))
1173
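# Worked example: for rank 1, obj_prefix() and the calls above translate to erasing
# objects with these prefixes (cf. MDS_INO_LOG_OFFSET and friends):
#
#   "201."    journal          (0x200 + 1)
#   "301."    journal backup   (0x300 + 1)
#   "401."    journal pointer  (0x400 + 1)
#   "mds1_"   MDSTables and SessionMap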
1174 @property
1175 def _prefix(self):
1176 """
1177 Override this to set a different path prefix for the Ceph binaries invoked by this class.
1178 """
1179 return ""
1180
1181 def _make_rank(self, rank):
1182 return "{}:{}".format(self.name, rank)
1183
1184 def _run_tool(self, tool, args, rank=None, quiet=False):
1185 # Tests frequently have [client] configuration that jacks up
1186 # the objecter log level (unlikely to be interesting here)
1187 # and does not set the mds log level (very interesting here)
1188 if quiet:
1189 base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
1190 else:
1191 base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']
1192
1193 if rank is not None:
1194 base_args.extend(["--rank", "%s" % str(rank)])
1195
1196 t1 = datetime.datetime.now()
1197 r = self.tool_remote.run(
1198 args=base_args + args,
1199 stdout=StringIO()).stdout.getvalue().strip()
1200 duration = datetime.datetime.now() - t1
1201 log.info("Ran {0} in time {1}, result:\n{2}".format(
1202 base_args + args, duration, r
1203 ))
1204 return r
1205
1206 @property
1207 def tool_remote(self):
1208 """
1209 An arbitrary remote to use when invoking recovery tools. Use an MDS host because
1210 it'll definitely have keys with perms to access cephfs metadata pool. This is public
1211 so that tests can use this remote to go get locally written output files from the tools.
1212 """
1213 mds_id = self.mds_ids[0]
1214 return self.mds_daemons[mds_id].remote
1215
1216 def journal_tool(self, args, rank, quiet=False):
1217 """
1218 Invoke cephfs-journal-tool with the passed arguments for a rank, and return its stdout
1219 """
1220 fs_rank = self._make_rank(rank)
1221 return self._run_tool("cephfs-journal-tool", args, fs_rank, quiet)
1222
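# Illustrative sketch (assumption, not from the original module): inspecting rank 0's
# journal. "journal inspect" and "event get summary" are standard cephfs-journal-tool
# subcommands, though the exact set available depends on the installed Ceph version.
#
#   out = fs.journal_tool(["journal", "inspect"], 0)
#   summary = fs.journal_tool(["event", "get", "summary"], 0)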
1223 def table_tool(self, args, quiet=False):
1224 """
1225 Invoke cephfs-table-tool with the passed arguments, and return its stdout
1226 """
1227 return self._run_tool("cephfs-table-tool", args, None, quiet)
1228
1229 def data_scan(self, args, quiet=False, worker_count=1):
1230 """
1231 Invoke cephfs-data-scan with the passed arguments, and return its stdout
1232
1233 :param worker_count: if greater than 1, multiple workers will be run
1234 in parallel and the return value will be None
1235 """
1236
1237 workers = []
1238
1239 for n in range(0, worker_count):
1240 if worker_count > 1:
1241 # The first token of the data-scan args is the subcommand, followed by its args;
1242 # insert the worker arguments right after the subcommand.
1243 cmd = args[0]
1244 worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
1245 else:
1246 worker_args = args
1247
1248 workers.append(Greenlet.spawn(lambda wargs=worker_args:
1249 self._run_tool("cephfs-data-scan", wargs, None, quiet)))
1250
1251 for w in workers:
1252 w.get()
1253
1254 if worker_count == 1:
1255 return workers[0].value
1256 else:
1257 return None
1258
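# Illustrative sketch (assumption, not from the original module): a parallel recovery
# scan. "scan_extents" and "scan_inodes" are the usual cephfs-data-scan phases; with
# worker_count > 1 each greenlet gets --worker_n/--worker_m as above and the return
# value is None.
#
#   fs.data_scan(["scan_extents", fs.get_data_pool_name()], worker_count=4)
#   fs.data_scan(["scan_inodes", fs.get_data_pool_name()], worker_count=4)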
1259 def is_full(self):
1260 return self.is_pool_full(self.get_data_pool_name())