ceph/qa/tasks/cephfs/filesystem.py
1
2 from StringIO import StringIO
3 import json
4 import logging
5 from gevent import Greenlet
6 import os
7 import time
8 import datetime
9 import re
10 import errno
11 import random
12
13 from teuthology.exceptions import CommandFailedError
14 from teuthology import misc
15 from teuthology.nuke import clear_firewall
16 from teuthology.parallel import parallel
17 from tasks.ceph_manager import write_conf
18 from tasks import ceph_manager
19
20
21 log = logging.getLogger(__name__)
22
23
24 DAEMON_WAIT_TIMEOUT = 120
25 ROOT_INO = 1
26
27
28 class ObjectNotFound(Exception):
29 def __init__(self, object_name):
30 self._object_name = object_name
31
32 def __str__(self):
33 return "Object not found: '{0}'".format(self._object_name)
34
35 class FSStatus(object):
36 """
37 Operations on a snapshot of the FSMap.
38 """
39 def __init__(self, mon_manager):
40 self.mon = mon_manager
41 self.map = json.loads(self.mon.raw_cluster_cmd("fs", "dump", "--format=json"))
42
43 def __str__(self):
44 return json.dumps(self.map, indent = 2, sort_keys = True)
45
46 # Expose the fsmap for manual inspection.
47 def __getitem__(self, key):
48 """
49 Get a field from the fsmap.
50 """
51 return self.map[key]
52
53 def get_filesystems(self):
54 """
55 Iterator for all filesystems.
56 """
57 for fs in self.map['filesystems']:
58 yield fs
59
60 def get_all(self):
61 """
62 Iterator for all the mds_info components in the FSMap.
63 """
64 for info in self.get_standbys():
65 yield info
66 for fs in self.map['filesystems']:
67 for info in fs['mdsmap']['info'].values():
68 yield info
69
70 def get_standbys(self):
71 """
72 Iterator for all standbys.
73 """
74 for info in self.map['standbys']:
75 yield info
76
77 def get_fsmap(self, fscid):
78 """
79 Get the fsmap for the given FSCID.
80 """
81 for fs in self.map['filesystems']:
82 if fscid is None or fs['id'] == fscid:
83 return fs
84 raise RuntimeError("FSCID {0} not in map".format(fscid))
85
86 def get_fsmap_byname(self, name):
87 """
88 Get the fsmap for the given file system name.
89 """
90 for fs in self.map['filesystems']:
91 if name is None or fs['mdsmap']['fs_name'] == name:
92 return fs
93 raise RuntimeError("FS {0} not in map".format(name))
94
95 def get_replays(self, fscid):
96 """
97         Get the standby-replay MDS daemons for the given FSCID.
98 """
99 fs = self.get_fsmap(fscid)
100 for info in fs['mdsmap']['info'].values():
101 if info['state'] == 'up:standby-replay':
102 yield info
103
104 def get_ranks(self, fscid):
105 """
106 Get the ranks for the given FSCID.
107 """
108 fs = self.get_fsmap(fscid)
109 for info in fs['mdsmap']['info'].values():
110 if info['rank'] >= 0:
111 yield info
112
113 def get_rank(self, fscid, rank):
114 """
115 Get the rank for the given FSCID.
116 """
117 for info in self.get_ranks(fscid):
118 if info['rank'] == rank:
119 return info
120 raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))
121
122 def get_mds(self, name):
123 """
124 Get the info for the given MDS name.
125 """
126 for info in self.get_all():
127 if info['name'] == name:
128 return info
129 return None
130
131 def get_mds_addr(self, name):
132 """
133         Return the instance addr as a string, like "10.214.133.138:6807/10825"
134 """
135 info = self.get_mds(name)
136 if info:
137 return info['addr']
138 else:
139 log.warn(json.dumps(list(self.get_all()), indent=2)) # dump for debugging
140 raise RuntimeError("MDS id '{0}' not found in map".format(name))
141
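# Illustrative use of FSStatus above (a sketch only, not part of the original
# module): given a mon_manager, one could snapshot the FSMap and log the
# active ranks of a filesystem roughly like this ("fscid" is assumed to be
# the id of the filesystem under test):
#
#   status = FSStatus(mon_manager)
#   for info in status.get_ranks(fscid):
#       log.info("rank {0}: {1} ({2})".format(
#           info['rank'], info['name'], info['state']))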
142 class CephCluster(object):
143 @property
144 def admin_remote(self):
145 first_mon = misc.get_first_mon(self._ctx, None)
146 (result,) = self._ctx.cluster.only(first_mon).remotes.iterkeys()
147 return result
148
149 def __init__(self, ctx):
150 self._ctx = ctx
151 self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
152
153 def get_config(self, key, service_type=None):
154 """
155 Get config from mon by default, or a specific service if caller asks for it
156 """
157 if service_type is None:
158 service_type = 'mon'
159
160 service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
161 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
162
163 def set_ceph_conf(self, subsys, key, value):
164 if subsys not in self._ctx.ceph['ceph'].conf:
165 self._ctx.ceph['ceph'].conf[subsys] = {}
166 self._ctx.ceph['ceph'].conf[subsys][key] = value
167 write_conf(self._ctx) # XXX because we don't have the ceph task's config object, if they
168 # used a different config path this won't work.
169
170 def clear_ceph_conf(self, subsys, key):
171 del self._ctx.ceph['ceph'].conf[subsys][key]
172 write_conf(self._ctx)
173
174 def json_asok(self, command, service_type, service_id):
175 proc = self.mon_manager.admin_socket(service_type, service_id, command)
176 response_data = proc.stdout.getvalue()
177 log.info("_json_asok output: {0}".format(response_data))
178 if response_data.strip():
179 return json.loads(response_data)
180 else:
181 return None
182
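# Illustrative use of CephCluster.get_config()/json_asok() above (a sketch;
# the config key and daemon id are only examples):
#
#   cluster = CephCluster(ctx)
#   size = cluster.get_config("osd_pool_default_size")      # asks a mon via its admin socket
#   perf = cluster.json_asok(["perf", "dump"], "mon", "a")  # any admin socket command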
183
184 class MDSCluster(CephCluster):
185 """
186 Collective operations on all the MDS daemons in the Ceph cluster. These
187 daemons may be in use by various Filesystems.
188
189 For the benefit of pre-multi-filesystem tests, this class is also
190 a parent of Filesystem. The correct way to use MDSCluster going forward is
191 as a separate instance outside of your (multiple) Filesystem instances.
192 """
193 def __init__(self, ctx):
194 super(MDSCluster, self).__init__(ctx)
195
196 self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))
197
198 if len(self.mds_ids) == 0:
199 raise RuntimeError("This task requires at least one MDS")
200
201 if hasattr(self._ctx, "daemons"):
202 # Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
203 self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
204
205 def _one_or_all(self, mds_id, cb, in_parallel=True):
206 """
207 Call a callback for a single named MDS, or for all.
208
209         Note that the parallelism here isn't for performance: it's to avoid being overly kind
210         to the cluster by pausing an ssh-latency of time between operations, and to avoid
211         always executing them in the same order. However, some actions don't cope with
212         being done in parallel, so parallelism is optional (`in_parallel`).
213
214 :param mds_id: MDS daemon name, or None
215 :param cb: Callback taking single argument of MDS daemon name
216 :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
217 """
218 if mds_id is None:
219 if in_parallel:
220 with parallel() as p:
221 for mds_id in self.mds_ids:
222 p.spawn(cb, mds_id)
223 else:
224 for mds_id in self.mds_ids:
225 cb(mds_id)
226 else:
227 cb(mds_id)
228
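# Illustrative use of _one_or_all() above (a sketch): passing mds_id=None fans
# the callback out to every MDS (in parallel by default), while a concrete id
# targets a single daemon:
#
#   self._one_or_all(None, lambda id_: self.mds_daemons[id_].stop())   # all MDSs
#   self._one_or_all("a", lambda id_: self.mds_daemons[id_].stop())    # just mds.a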
229 def get_config(self, key, service_type=None):
230 """
231 get_config specialization of service_type="mds"
232 """
233 if service_type != "mds":
234 return super(MDSCluster, self).get_config(key, service_type)
235
236 # Some tests stop MDS daemons, don't send commands to a dead one:
237 service_id = random.sample(filter(lambda i: self.mds_daemons[i].running(), self.mds_daemons), 1)[0]
238 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
239
240 def mds_stop(self, mds_id=None):
241 """
242         Stop the MDS daemon process(es). If it held a rank, that rank
243 will eventually go laggy.
244 """
245 self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())
246
247 def mds_fail(self, mds_id=None):
248 """
249 Inform MDSMonitor of the death of the daemon process(es). If it held
250 a rank, that rank will be relinquished.
251 """
252 self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))
253
254 def mds_restart(self, mds_id=None):
255 self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())
256
257 def mds_fail_restart(self, mds_id=None):
258 """
259 Variation on restart that includes marking MDSs as failed, so that doing this
260 operation followed by waiting for healthy daemon states guarantees that they
261 have gone down and come up, rather than potentially seeing the healthy states
262 that existed before the restart.
263 """
264 def _fail_restart(id_):
265 self.mds_daemons[id_].stop()
266 self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
267 self.mds_daemons[id_].restart()
268
269 self._one_or_all(mds_id, _fail_restart)
270
271 def newfs(self, name='cephfs', create=True):
272 return Filesystem(self._ctx, name=name, create=create)
273
274 def status(self):
275 return FSStatus(self.mon_manager)
276
277 def delete_all_filesystems(self):
278 """
279 Remove all filesystems that exist, and any pools in use by them.
280 """
281 pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
282 pool_id_name = {}
283 for pool in pools:
284 pool_id_name[pool['pool']] = pool['pool_name']
285
286 # mark cluster down for each fs to prevent churn during deletion
287 status = self.status()
288 for fs in status.get_filesystems():
289 self.mon_manager.raw_cluster_cmd("fs", "set", fs['mdsmap']['fs_name'], "cluster_down", "true")
290
291 # get a new copy as actives may have since changed
292 status = self.status()
293 for fs in status.get_filesystems():
294 mdsmap = fs['mdsmap']
295 metadata_pool = pool_id_name[mdsmap['metadata_pool']]
296
297 for gid in mdsmap['up'].values():
298 self.mon_manager.raw_cluster_cmd('mds', 'fail', gid.__str__())
299
300 self.mon_manager.raw_cluster_cmd('fs', 'rm', mdsmap['fs_name'], '--yes-i-really-mean-it')
301 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
302 metadata_pool, metadata_pool,
303 '--yes-i-really-really-mean-it')
304 for data_pool in mdsmap['data_pools']:
305 data_pool = pool_id_name[data_pool]
306 try:
307 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
308 data_pool, data_pool,
309 '--yes-i-really-really-mean-it')
310                 except CommandFailedError as e:
311                     if e.exitstatus == 16: # EBUSY: this data pool is shared with
312                         pass               # another filesystem; the later pass
313                     else:                  # over that filesystem will delete it
314                         raise
315
316 def get_standby_daemons(self):
317 return set([s['name'] for s in self.status().get_standbys()])
318
319 def get_mds_hostnames(self):
320 result = set()
321 for mds_id in self.mds_ids:
322 mds_remote = self.mon_manager.find_remote('mds', mds_id)
323 result.add(mds_remote.hostname)
324
325 return list(result)
326
327 def set_clients_block(self, blocked, mds_id=None):
328 """
329 Block (using iptables) client communications to this MDS. Be careful: if
330 other services are running on this MDS, or other MDSs try to talk to this
331         MDS, their communications may also be blocked as collateral damage.
332
333 :param mds_id: Optional ID of MDS to block, default to all
334 :return:
335 """
336 da_flag = "-A" if blocked else "-D"
337
338 def set_block(_mds_id):
339 remote = self.mon_manager.find_remote('mds', _mds_id)
340 status = self.status()
341
342 addr = status.get_mds_addr(_mds_id)
343 ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()
344
345 remote.run(
346 args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
347 "comment", "--comment", "teuthology"])
348 remote.run(
349 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
350 "comment", "--comment", "teuthology"])
351
352 self._one_or_all(mds_id, set_block, in_parallel=False)
353
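# Illustrative use of set_clients_block() above (a sketch): tests typically
# block traffic to one MDS, exercise client recovery, then clean up:
#
#   self.set_clients_block(True, mds_id="a")    # add REJECT rules for mds.a's port
#   ...                                         # provoke/observe client behaviour
#   self.set_clients_block(False, mds_id="a")   # remove the same rules (-D)
#   self.clear_firewall()                       # or flush everything via teuthology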
354 def clear_firewall(self):
355 clear_firewall(self._ctx)
356
357 def get_mds_info(self, mds_id):
358 return FSStatus(self.mon_manager).get_mds(mds_id)
359
360 def is_full(self):
361 flags = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['flags']
362 return 'full' in flags
363
364 def is_pool_full(self, pool_name):
365 pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
366 for pool in pools:
367 if pool['pool_name'] == pool_name:
368 return 'full' in pool['flags_names'].split(",")
369
370 raise RuntimeError("Pool not found '{0}'".format(pool_name))
371
372 class Filesystem(MDSCluster):
373 """
374 This object is for driving a CephFS filesystem. The MDS daemons driven by
375 MDSCluster may be shared with other Filesystems.
376 """
377 def __init__(self, ctx, fscid=None, name=None, create=False,
378 ec_profile=None):
379 super(Filesystem, self).__init__(ctx)
380
381 self.name = name
382 self.ec_profile = ec_profile
383 self.id = None
384 self.metadata_pool_name = None
385 self.metadata_overlay = False
386 self.data_pool_name = None
387 self.data_pools = None
388
389 client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
390 self.client_id = client_list[0]
391 self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
392
393 if name is not None:
394 if fscid is not None:
395 raise RuntimeError("cannot specify fscid when creating fs")
396 if create and not self.legacy_configured():
397 self.create()
398 else:
399 if fscid is not None:
400 self.id = fscid
401 self.getinfo(refresh = True)
402
403 # Stash a reference to the first created filesystem on ctx, so
404 # that if someone drops to the interactive shell they can easily
405 # poke our methods.
406 if not hasattr(self._ctx, "filesystem"):
407 self._ctx.filesystem = self
408
409 def getinfo(self, refresh = False):
410 status = self.status()
411 if self.id is not None:
412 fsmap = status.get_fsmap(self.id)
413 elif self.name is not None:
414 fsmap = status.get_fsmap_byname(self.name)
415 else:
416 fss = [fs for fs in status.get_filesystems()]
417 if len(fss) == 1:
418 fsmap = fss[0]
419 elif len(fss) == 0:
420 raise RuntimeError("no file system available")
421 else:
422 raise RuntimeError("more than one file system available")
423 self.id = fsmap['id']
424 self.name = fsmap['mdsmap']['fs_name']
425 self.get_pool_names(status = status, refresh = refresh)
426 return status
427
428 def set_metadata_overlay(self, overlay):
429 if self.id is not None:
430 raise RuntimeError("cannot specify fscid when configuring overlay")
431 self.metadata_overlay = overlay
432
433 def deactivate(self, rank):
434 if rank < 0:
435 raise RuntimeError("invalid rank")
436 elif rank == 0:
437 raise RuntimeError("cannot deactivate rank 0")
438 self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))
439
440 def set_max_mds(self, max_mds):
441 self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "max_mds", "%d" % max_mds)
442
443 def set_allow_dirfrags(self, yes):
444 self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "allow_dirfrags", str(yes).lower(), '--yes-i-really-mean-it')
445
446 def get_pgs_per_fs_pool(self):
447 """
448 Calculate how many PGs to use when creating a pool, in order to avoid raising any
449 health warnings about mon_pg_warn_min_per_osd
450
451 :return: an integer number of PGs
452 """
453 pg_warn_min_per_osd = int(self.get_config('mon_pg_warn_min_per_osd'))
454 osd_count = len(list(misc.all_roles_of_type(self._ctx.cluster, 'osd')))
455 return pg_warn_min_per_osd * osd_count
456
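# Worked example for get_pgs_per_fs_pool() above (illustrative numbers only):
# if mon_pg_warn_min_per_osd were 30 and the run had 3 OSD roles, each new
# pool would be created with 30 * 3 = 90 PGs, which keeps the per-OSD PG
# count above the warning threshold.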
457 def create(self):
458 if self.name is None:
459 self.name = "cephfs"
460 if self.metadata_pool_name is None:
461 self.metadata_pool_name = "{0}_metadata".format(self.name)
462 if self.data_pool_name is None:
463 data_pool_name = "{0}_data".format(self.name)
464 else:
465 data_pool_name = self.data_pool_name
466
467 log.info("Creating filesystem '{0}'".format(self.name))
468
469 pgs_per_fs_pool = self.get_pgs_per_fs_pool()
470
471 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
472 self.metadata_pool_name, pgs_per_fs_pool.__str__())
473 if self.metadata_overlay:
474 self.mon_manager.raw_cluster_cmd('fs', 'new',
475 self.name, self.metadata_pool_name, data_pool_name,
476 '--allow-dangerous-metadata-overlay')
477 else:
478 if self.ec_profile:
479 log.info("EC profile is %s", self.ec_profile)
480 cmd = ['osd', 'erasure-code-profile', 'set', data_pool_name]
481 cmd.extend(self.ec_profile)
482 self.mon_manager.raw_cluster_cmd(*cmd)
483 self.mon_manager.raw_cluster_cmd(
484 'osd', 'pool', 'create',
485 data_pool_name, pgs_per_fs_pool.__str__(), 'erasure',
486 data_pool_name)
487 self.mon_manager.raw_cluster_cmd(
488 'osd', 'pool', 'set',
489 data_pool_name, 'allow_ec_overwrites', 'true')
490 else:
491 self.mon_manager.raw_cluster_cmd(
492 'osd', 'pool', 'create',
493 data_pool_name, pgs_per_fs_pool.__str__())
494 self.mon_manager.raw_cluster_cmd('fs', 'new',
495 self.name, self.metadata_pool_name, data_pool_name)
496 self.check_pool_application(self.metadata_pool_name)
497 self.check_pool_application(data_pool_name)
498 # Turn off spurious standby count warnings from modifying max_mds in tests.
499 try:
500 self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
501 except CommandFailedError as e:
502 if e.exitstatus == 22:
503 # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
504 pass
505 else:
506 raise
507
508 self.getinfo(refresh = True)
509
510
511 def check_pool_application(self, pool_name):
512 osd_map = self.mon_manager.get_osd_dump_json()
513 for pool in osd_map['pools']:
514 if pool['pool_name'] == pool_name:
515 if "application_metadata" in pool:
516 if not "cephfs" in pool['application_metadata']:
517                     raise RuntimeError("Pool {0} does not name cephfs as application!".\
518                         format(pool_name))
519
520
521 def __del__(self):
522 if getattr(self._ctx, "filesystem", None) == self:
523 delattr(self._ctx, "filesystem")
524
525 def exists(self):
526 """
527 Whether a filesystem exists in the mon's filesystem list
528 """
529 fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
530 return self.name in [fs['name'] for fs in fs_list]
531
532 def legacy_configured(self):
533 """
534 Check if a legacy (i.e. pre "fs new") filesystem configuration is present. If this is
535 the case, the caller should avoid using Filesystem.create
536 """
537 try:
538 out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
539 pools = json.loads(out_text)
540 metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
541 if metadata_pool_exists:
542 self.metadata_pool_name = 'metadata'
543 except CommandFailedError as e:
544 # For use in upgrade tests, Ceph cuttlefish and earlier don't support
545 # structured output (--format) from the CLI.
546 if e.exitstatus == 22:
547 metadata_pool_exists = True
548 else:
549 raise
550
551 return metadata_pool_exists
552
553 def _df(self):
554 return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))
555
556 def get_mds_map(self):
557 return self.status().get_fsmap(self.id)['mdsmap']
558
559 def add_data_pool(self, name):
560 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name, self.get_pgs_per_fs_pool().__str__())
561 self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
562 self.get_pool_names(refresh = True)
563 for poolid, fs_name in self.data_pools.items():
564 if name == fs_name:
565 return poolid
566 raise RuntimeError("could not get just created pool '{0}'".format(name))
567
568 def get_pool_names(self, refresh = False, status = None):
569 if refresh or self.metadata_pool_name is None or self.data_pools is None:
570 if status is None:
571 status = self.status()
572 fsmap = status.get_fsmap(self.id)
573
574 osd_map = self.mon_manager.get_osd_dump_json()
575 id_to_name = {}
576 for p in osd_map['pools']:
577 id_to_name[p['pool']] = p['pool_name']
578
579 self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
580 self.data_pools = {}
581 for data_pool in fsmap['mdsmap']['data_pools']:
582 self.data_pools[data_pool] = id_to_name[data_pool]
583
584 def get_data_pool_name(self, refresh = False):
585 if refresh or self.data_pools is None:
586 self.get_pool_names(refresh = True)
587 assert(len(self.data_pools) == 1)
588 return self.data_pools.values()[0]
589
590 def get_data_pool_id(self, refresh = False):
591 """
592 Don't call this if you have multiple data pools
593 :return: integer
594 """
595 if refresh or self.data_pools is None:
596 self.get_pool_names(refresh = True)
597 assert(len(self.data_pools) == 1)
598 return self.data_pools.keys()[0]
599
600 def get_data_pool_names(self, refresh = False):
601 if refresh or self.data_pools is None:
602 self.get_pool_names(refresh = True)
603 return self.data_pools.values()
604
605 def get_metadata_pool_name(self):
606 return self.metadata_pool_name
607
608 def set_data_pool_name(self, name):
609 if self.id is not None:
610 raise RuntimeError("can't set filesystem name if its fscid is set")
611 self.data_pool_name = name
612
613 def get_namespace_id(self):
614 return self.id
615
616 def get_pool_df(self, pool_name):
617 """
618 Return a dict like:
619 {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
620 """
621 for pool_df in self._df()['pools']:
622 if pool_df['name'] == pool_name:
623 return pool_df['stats']
624
625 raise RuntimeError("Pool name '{0}' not found".format(pool_name))
626
627 def get_usage(self):
628 return self._df()['stats']['total_used_bytes']
629
630 def are_daemons_healthy(self):
631 """
632         Return true if all daemons are in one of the states up:active, up:standby or
633         up:standby-replay, and at least max_mds daemons are in up:active.
634
635 Unlike most of Filesystem, this function is tolerant of new-style `fs`
636         commands being missing, because it is used as part of the ceph installation
637         process during upgrade suites, and so must fall back to old-style commands
638         when a new-style command returns EINVAL.
639
640 :return:
641 """
642
643 active_count = 0
644 try:
645 mds_map = self.get_mds_map()
646 except CommandFailedError as cfe:
647 # Old version, fall back to non-multi-fs commands
648 if cfe.exitstatus == errno.EINVAL:
649 mds_map = json.loads(
650 self.mon_manager.raw_cluster_cmd('mds', 'dump', '--format=json'))
651 else:
652 raise
653
654 log.info("are_daemons_healthy: mds map: {0}".format(mds_map))
655
656 for mds_id, mds_status in mds_map['info'].items():
657 if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
658 log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
659 return False
660 elif mds_status['state'] == 'up:active':
661 active_count += 1
662
663 log.info("are_daemons_healthy: {0}/{1}".format(
664 active_count, mds_map['max_mds']
665 ))
666
667 if active_count >= mds_map['max_mds']:
668 # The MDSMap says these guys are active, but let's check they really are
669 for mds_id, mds_status in mds_map['info'].items():
670 if mds_status['state'] == 'up:active':
671 try:
672 daemon_status = self.mds_asok(["status"], mds_id=mds_status['name'])
673 except CommandFailedError as cfe:
674 if cfe.exitstatus == errno.EINVAL:
675 # Old version, can't do this check
676 continue
677 else:
678 # MDS not even running
679 return False
680
681 if daemon_status['state'] != 'up:active':
682 # MDS hasn't taken the latest map yet
683 return False
684
685 return True
686 else:
687 return False
688
689 def get_daemon_names(self, state=None):
690 """
691 Return MDS daemon names of those daemons in the given state
692 :param state:
693 :return:
694 """
695 status = self.get_mds_map()
696 result = []
697 for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
698 if mds_status['state'] == state or state is None:
699 result.append(mds_status['name'])
700
701 return result
702
703 def get_active_names(self):
704 """
705 Return MDS daemon names of those daemons holding ranks
706 in state up:active
707
708 :return: list of strings like ['a', 'b'], sorted by rank
709 """
710 return self.get_daemon_names("up:active")
711
712 def get_all_mds_rank(self):
713 status = self.get_mds_map()
714 result = []
715 for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
716 if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
717 result.append(mds_status['rank'])
718
719 return result
720
721 def get_rank_names(self):
722 """
723 Return MDS daemon names of those daemons holding a rank,
724 sorted by rank. This includes e.g. up:replay/reconnect
725 as well as active, but does not include standby or
726 standby-replay.
727 """
728 status = self.get_mds_map()
729 result = []
730 for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
731 if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
732 result.append(mds_status['name'])
733
734 return result
735
736 def wait_for_daemons(self, timeout=None):
737 """
738 Wait until all daemons are healthy
739 :return:
740 """
741
742 if timeout is None:
743 timeout = DAEMON_WAIT_TIMEOUT
744
745 elapsed = 0
746 while True:
747 if self.are_daemons_healthy():
748 return
749 else:
750 time.sleep(1)
751 elapsed += 1
752
753 if elapsed > timeout:
754 raise RuntimeError("Timed out waiting for MDS daemons to become healthy")
755
756 def get_lone_mds_id(self):
757 """
758 Get a single MDS ID: the only one if there is only one
759 configured, else the only one currently holding a rank,
760 else raise an error.
761 """
762 if len(self.mds_ids) != 1:
763 alive = self.get_rank_names()
764 if len(alive) == 1:
765 return alive[0]
766 else:
767 raise ValueError("Explicit MDS argument required when multiple MDSs in use")
768 else:
769 return self.mds_ids[0]
770
771 def recreate(self):
772 log.info("Creating new filesystem")
773 self.delete_all_filesystems()
774 self.id = None
775 self.create()
776
777 def put_metadata_object_raw(self, object_id, infile):
778 """
779 Save an object to the metadata pool
780 """
781 temp_bin_path = infile
782 self.client_remote.run(args=[
783 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'put', object_id, temp_bin_path
784 ])
785
786 def get_metadata_object_raw(self, object_id):
787 """
788 Retrieve an object from the metadata pool and store it in a file.
789 """
790 temp_bin_path = '/tmp/' + object_id + '.bin'
791
792 self.client_remote.run(args=[
793 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
794 ])
795
796 return temp_bin_path
797
798 def get_metadata_object(self, object_type, object_id):
799 """
800 Retrieve an object from the metadata pool, pass it through
801 ceph-dencoder to dump it to JSON, and return the decoded object.
802 """
803 temp_bin_path = '/tmp/out.bin'
804
805 self.client_remote.run(args=[
806 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
807 ])
808
809 stdout = StringIO()
810 self.client_remote.run(args=[
811 'sudo', os.path.join(self._prefix, 'ceph-dencoder'), 'type', object_type, 'import', temp_bin_path, 'decode', 'dump_json'
812 ], stdout=stdout)
813 dump_json = stdout.getvalue().strip()
814 try:
815 dump = json.loads(dump_json)
816 except (TypeError, ValueError):
817 log.error("Failed to decode JSON: '{0}'".format(dump_json))
818 raise
819
820 return dump
821
822 def get_journal_version(self):
823 """
824         Read the JournalPointer and Journaler::Header objects to learn the version of
825 encoding in use.
826 """
827 journal_pointer_object = '400.00000000'
828 journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
829 journal_ino = journal_pointer_dump['journal_pointer']['front']
830
831 journal_header_object = "{0:x}.00000000".format(journal_ino)
832 journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)
833
834 version = journal_header_dump['journal_header']['stream_format']
835 log.info("Read journal version {0}".format(version))
836
837 return version
838
839 def mds_asok(self, command, mds_id=None):
840 if mds_id is None:
841 mds_id = self.get_lone_mds_id()
842
843 return self.json_asok(command, 'mds', mds_id)
844
845 def read_cache(self, path, depth=None):
846 cmd = ["dump", "tree", path]
847 if depth is not None:
848 cmd.append(depth.__str__())
849 result = self.mds_asok(cmd)
850 if len(result) == 0:
851 raise RuntimeError("Path not found in cache: {0}".format(path))
852
853 return result
854
855 def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
856 """
857 Block until the MDS reaches a particular state, or a failure condition
858 is met.
859
860         When there are multiple MDSs, succeed when exactly one MDS is in the
861 goal state, or fail when any MDS is in the reject state.
862
863 :param goal_state: Return once the MDS is in this state
864 :param reject: Fail if the MDS enters this state before the goal state
865 :param timeout: Fail if this many seconds pass before reaching goal
866 :return: number of seconds waited, rounded down to integer
867 """
868
869 started_at = time.time()
870 while True:
871 status = self.status()
872 if rank is not None:
873 mds_info = status.get_rank(self.id, rank)
874 current_state = mds_info['state'] if mds_info else None
875 log.info("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
876 elif mds_id is not None:
877 # mds_info is None if no daemon with this ID exists in the map
878 mds_info = status.get_mds(mds_id)
879 current_state = mds_info['state'] if mds_info else None
880 log.info("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
881 else:
882 # In general, look for a single MDS
883 states = [m['state'] for m in status.get_ranks(self.id)]
884 if [s for s in states if s == goal_state] == [goal_state]:
885 current_state = goal_state
886 elif reject in states:
887 current_state = reject
888 else:
889 current_state = None
890 log.info("mapped states {0} to {1}".format(states, current_state))
891
892 elapsed = time.time() - started_at
893 if current_state == goal_state:
894 log.info("reached state '{0}' in {1}s".format(current_state, elapsed))
895 return elapsed
896 elif reject is not None and current_state == reject:
897 raise RuntimeError("MDS in reject state {0}".format(current_state))
898 elif timeout is not None and elapsed > timeout:
899 log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
900 raise RuntimeError(
901 "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
902 elapsed, goal_state, current_state
903 ))
904 else:
905 time.sleep(1)
906
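# Illustrative use of wait_for_state() above (a sketch; the timeouts are
# arbitrary and the daemon name is an example):
#
#   fs.wait_for_state("up:replay", rank=0, timeout=30)               # by rank
#   fs.wait_for_state("up:standby-replay", mds_id="b", timeout=30)   # by daemon name
#   fs.wait_for_state("up:active", timeout=60)                       # lone-MDS / single-rank form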
907 def _read_data_xattr(self, ino_no, xattr_name, type, pool):
908 mds_id = self.mds_ids[0]
909 remote = self.mds_daemons[mds_id].remote
910 if pool is None:
911 pool = self.get_data_pool_name()
912
913 obj_name = "{0:x}.00000000".format(ino_no)
914
915 args = [
916 os.path.join(self._prefix, "rados"), "-p", pool, "getxattr", obj_name, xattr_name
917 ]
918 try:
919 proc = remote.run(
920 args=args,
921 stdout=StringIO())
922 except CommandFailedError as e:
923 log.error(e.__str__())
924 raise ObjectNotFound(obj_name)
925
926 data = proc.stdout.getvalue()
927
928 p = remote.run(
929 args=[os.path.join(self._prefix, "ceph-dencoder"), "type", type, "import", "-", "decode", "dump_json"],
930 stdout=StringIO(),
931 stdin=data
932 )
933
934 return json.loads(p.stdout.getvalue().strip())
935
936 def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
937 """
938 Write to an xattr of the 0th data object of an inode. Will
939 succeed whether the object and/or xattr already exist or not.
940
941 :param ino_no: integer inode number
942 :param xattr_name: string name of the xattr
943 :param data: byte array data to write to the xattr
944 :param pool: name of data pool or None to use primary data pool
945 :return: None
946 """
947 remote = self.mds_daemons[self.mds_ids[0]].remote
948 if pool is None:
949 pool = self.get_data_pool_name()
950
951 obj_name = "{0:x}.00000000".format(ino_no)
952 args = [
953 os.path.join(self._prefix, "rados"), "-p", pool, "setxattr",
954 obj_name, xattr_name, data
955 ]
956 remote.run(
957 args=args,
958 stdout=StringIO())
959
960 def read_backtrace(self, ino_no, pool=None):
961 """
962 Read the backtrace from the data pool, return a dict in the format
963 given by inode_backtrace_t::dump, which is something like:
964
965 ::
966
967 rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
968 ceph-dencoder type inode_backtrace_t import out.bin decode dump_json
969
970 { "ino": 1099511627778,
971 "ancestors": [
972 { "dirino": 1,
973 "dname": "blah",
974 "version": 11}],
975 "pool": 1,
976 "old_pools": []}
977
978 :param pool: name of pool to read backtrace from. If omitted, FS must have only
979 one data pool and that will be used.
980 """
981 return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)
982
983 def read_layout(self, ino_no, pool=None):
984 """
985 Read 'layout' xattr of an inode and parse the result, returning a dict like:
986 ::
987 {
988 "stripe_unit": 4194304,
989 "stripe_count": 1,
990 "object_size": 4194304,
991 "pool_id": 1,
992 "pool_ns": "",
993 }
994
995         :param pool: name of pool to read the layout from. If omitted, FS must have only
996 one data pool and that will be used.
997 """
998 return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)
999
1000 def _enumerate_data_objects(self, ino, size):
1001 """
1002 Get the list of expected data objects for a range, and the list of objects
1003 that really exist.
1004
1005 :return a tuple of two lists of strings (expected, actual)
1006 """
1007 stripe_size = 1024 * 1024 * 4
1008
1009 size = max(stripe_size, size)
1010
1011 want_objects = [
1012 "{0:x}.{1:08x}".format(ino, n)
1013 for n in range(0, ((size - 1) / stripe_size) + 1)
1014 ]
1015
1016 exist_objects = self.rados(["ls"], pool=self.get_data_pool_name()).split("\n")
1017
1018 return want_objects, exist_objects
1019
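# Worked example for _enumerate_data_objects() above: with the 4 MiB stripe
# size assumed by the helper, an inode 0x10000000002 of size 10 MiB is
# expected to map to three objects in the data pool:
#
#   10000000002.00000000, 10000000002.00000001, 10000000002.00000002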
1020 def data_objects_present(self, ino, size):
1021 """
1022 Check that *all* the expected data objects for an inode are present in the data pool
1023 """
1024
1025 want_objects, exist_objects = self._enumerate_data_objects(ino, size)
1026 missing = set(want_objects) - set(exist_objects)
1027
1028 if missing:
1029 log.info("Objects missing (ino {0}, size {1}): {2}".format(
1030 ino, size, missing
1031 ))
1032 return False
1033 else:
1034 log.info("All objects for ino {0} size {1} found".format(ino, size))
1035 return True
1036
1037 def data_objects_absent(self, ino, size):
1038 want_objects, exist_objects = self._enumerate_data_objects(ino, size)
1039 present = set(want_objects) & set(exist_objects)
1040
1041 if present:
1042 log.info("Objects not absent (ino {0}, size {1}): {2}".format(
1043 ino, size, present
1044 ))
1045 return False
1046 else:
1047 log.info("All objects for ino {0} size {1} are absent".format(ino, size))
1048 return True
1049
1050 def dirfrag_exists(self, ino, frag):
1051 try:
1052 self.rados(["stat", "{0:x}.{1:08x}".format(ino, frag)])
1053 except CommandFailedError as e:
1054 return False
1055 else:
1056 return True
1057
1058 def rados(self, args, pool=None, namespace=None, stdin_data=None):
1059 """
1060 Call into the `rados` CLI from an MDS
1061 """
1062
1063 if pool is None:
1064 pool = self.get_metadata_pool_name()
1065
1066 # Doesn't matter which MDS we use to run rados commands, they all
1067 # have access to the pools
1068 mds_id = self.mds_ids[0]
1069 remote = self.mds_daemons[mds_id].remote
1070
1071 # NB we could alternatively use librados pybindings for this, but it's a one-liner
1072 # using the `rados` CLI
1073 args = ([os.path.join(self._prefix, "rados"), "-p", pool] +
1074 (["--namespace", namespace] if namespace else []) +
1075 args)
1076 p = remote.run(
1077 args=args,
1078 stdin=stdin_data,
1079 stdout=StringIO())
1080 return p.stdout.getvalue().strip()
1081
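# Illustrative use of rados() above (a sketch): the helper defaults to the
# metadata pool, so e.g. the omap keys of the root directory fragment
# (ino 0x1, frag 0) could be listed with:
#
#   keys = self.rados(["listomapkeys", "1.00000000"])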
1082 def list_dirfrag(self, dir_ino):
1083 """
1084 Read the named object and return the list of omap keys
1085
1086 :return a list of 0 or more strings
1087 """
1088
1089 dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
1090
1091 try:
1092 key_list_str = self.rados(["listomapkeys", dirfrag_obj_name])
1093 except CommandFailedError as e:
1094 log.error(e.__str__())
1095 raise ObjectNotFound(dirfrag_obj_name)
1096
1097 return key_list_str.split("\n") if key_list_str else []
1098
1099 def erase_metadata_objects(self, prefix):
1100 """
1101 For all objects in the metadata pool matching the prefix,
1102 erase them.
1103
1104         This is O(N) in the number of objects in the pool, so it is only suitable
1105 for use on toy test filesystems.
1106 """
1107 all_objects = self.rados(["ls"]).split("\n")
1108 matching_objects = [o for o in all_objects if o.startswith(prefix)]
1109 for o in matching_objects:
1110 self.rados(["rm", o])
1111
1112 def erase_mds_objects(self, rank):
1113 """
1114 Erase all the per-MDS objects for a particular rank. This includes
1115 inotable, sessiontable, journal
1116 """
1117
1118 def obj_prefix(multiplier):
1119 """
1120             Build the object name prefix for the given multiplier; by MDS naming
1121             convention, e.g. rank 1's journal objects live at 201.*
1122 """
1123 return "%x." % (multiplier * 0x100 + rank)
1124
1125 # MDS_INO_LOG_OFFSET
1126 self.erase_metadata_objects(obj_prefix(2))
1127 # MDS_INO_LOG_BACKUP_OFFSET
1128 self.erase_metadata_objects(obj_prefix(3))
1129 # MDS_INO_LOG_POINTER_OFFSET
1130 self.erase_metadata_objects(obj_prefix(4))
1131 # MDSTables & SessionMap
1132 self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))
1133
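# Worked example for erase_mds_objects() above: for rank 1 the prefixes
# passed to erase_metadata_objects() are "201." (journal), "301." (journal
# backup), "401." (journal pointer) and "mds1_" (tables and sessionmap).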
1134 @property
1135 def _prefix(self):
1136 """
1137         Override this to set a different path prefix for the Ceph binaries.
1138 """
1139 return ""
1140
1141 def _run_tool(self, tool, args, rank=None, quiet=False):
1142 # Tests frequently have [client] configuration that jacks up
1143 # the objecter log level (unlikely to be interesting here)
1144 # and does not set the mds log level (very interesting here)
1145 if quiet:
1146 base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
1147 else:
1148 base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']
1149
1150 if rank is not None:
1151 base_args.extend(["--rank", "%d" % rank])
1152
1153 t1 = datetime.datetime.now()
1154 r = self.tool_remote.run(
1155 args=base_args + args,
1156 stdout=StringIO()).stdout.getvalue().strip()
1157 duration = datetime.datetime.now() - t1
1158 log.info("Ran {0} in time {1}, result:\n{2}".format(
1159 base_args + args, duration, r
1160 ))
1161 return r
1162
1163 @property
1164 def tool_remote(self):
1165 """
1166 An arbitrary remote to use when invoking recovery tools. Use an MDS host because
1167 it'll definitely have keys with perms to access cephfs metadata pool. This is public
1168 so that tests can use this remote to go get locally written output files from the tools.
1169 """
1170 mds_id = self.mds_ids[0]
1171 return self.mds_daemons[mds_id].remote
1172
1173 def journal_tool(self, args, rank=None, quiet=False):
1174 """
1175 Invoke cephfs-journal-tool with the passed arguments, and return its stdout
1176 """
1177 return self._run_tool("cephfs-journal-tool", args, rank, quiet)
1178
1179 def table_tool(self, args, quiet=False):
1180 """
1181 Invoke cephfs-table-tool with the passed arguments, and return its stdout
1182 """
1183 return self._run_tool("cephfs-table-tool", args, None, quiet)
1184
1185 def data_scan(self, args, quiet=False, worker_count=1):
1186 """
1187 Invoke cephfs-data-scan with the passed arguments, and return its stdout
1188
1189 :param worker_count: if greater than 1, multiple workers will be run
1190 in parallel and the return value will be None
1191 """
1192
1193 workers = []
1194
1195 for n in range(0, worker_count):
1196 if worker_count > 1:
1197                 # The first token of the data-scan args is a subcommand, the rest are its
1198                 # arguments; insert the worker arguments right after the subcommand.
1199 cmd = args[0]
1200 worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
1201 else:
1202 worker_args = args
1203
1204 workers.append(Greenlet.spawn(lambda wargs=worker_args:
1205 self._run_tool("cephfs-data-scan", wargs, None, quiet)))
1206
1207 for w in workers:
1208 w.get()
1209
1210 if worker_count == 1:
1211 return workers[0].value
1212 else:
1213 return None
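# Illustrative use of data_scan() above (a sketch; the subcommand and pool
# name are examples): with worker_count > 1 the worker arguments are spliced
# in right after the subcommand, so
#
#   fs.data_scan(["scan_extents", "cephfs_data"], worker_count=2)
#
# runs two cephfs-data-scan processes in parallel, the first with
# "scan_extents --worker_n 0 --worker_m 2 cephfs_data" and the second with
# "--worker_n 1 ...", and returns None.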