1
2 from StringIO import StringIO
3 import json
4 import logging
5 from gevent import Greenlet
6 import os
7 import time
8 import datetime
9 import re
10 import errno
11 import random
12
13 from teuthology.exceptions import CommandFailedError
14 from teuthology import misc
15 from teuthology.nuke import clear_firewall
16 from teuthology.parallel import parallel
17 from tasks.ceph_manager import write_conf
18 from tasks import ceph_manager
19
20
21 log = logging.getLogger(__name__)
22
23
24 DAEMON_WAIT_TIMEOUT = 120
25 ROOT_INO = 1
26
27
28 class ObjectNotFound(Exception):
29 def __init__(self, object_name):
30 self._object_name = object_name
31
32 def __str__(self):
33 return "Object not found: '{0}'".format(self._object_name)
34
35 class FSStatus(object):
36 """
37 Operations on a snapshot of the FSMap.
38 """
39 def __init__(self, mon_manager):
40 self.mon = mon_manager
41 self.map = json.loads(self.mon.raw_cluster_cmd("fs", "dump", "--format=json"))
42
43 def __str__(self):
44 return json.dumps(self.map, indent = 2, sort_keys = True)
45
46 # Expose the fsmap for manual inspection.
47 def __getitem__(self, key):
48 """
49 Get a field from the fsmap.
50 """
51 return self.map[key]
52
53 def get_filesystems(self):
54 """
55 Iterator for all filesystems.
56 """
57 for fs in self.map['filesystems']:
58 yield fs
59
60 def get_all(self):
61 """
62 Iterator for all the mds_info components in the FSMap.
63 """
64 for info in self.get_standbys():
65 yield info
66 for fs in self.map['filesystems']:
67 for info in fs['mdsmap']['info'].values():
68 yield info
69
70 def get_standbys(self):
71 """
72 Iterator for all standbys.
73 """
74 for info in self.map['standbys']:
75 yield info
76
77 def get_fsmap(self, fscid):
78 """
79 Get the fsmap for the given FSCID.
80 """
81 for fs in self.map['filesystems']:
82 if fscid is None or fs['id'] == fscid:
83 return fs
84 raise RuntimeError("FSCID {0} not in map".format(fscid))
85
86 def get_fsmap_byname(self, name):
87 """
88 Get the fsmap for the given file system name.
89 """
90 for fs in self.map['filesystems']:
91 if name is None or fs['mdsmap']['fs_name'] == name:
92 return fs
93 raise RuntimeError("FS {0} not in map".format(name))
94
95 def get_replays(self, fscid):
96 """
97 Iterate over the standby-replay MDS daemons for the given FSCID.
98 """
99 fs = self.get_fsmap(fscid)
100 for info in fs['mdsmap']['info'].values():
101 if info['state'] == 'up:standby-replay':
102 yield info
103
104 def get_ranks(self, fscid):
105 """
106 Get the ranks for the given FSCID.
107 """
108 fs = self.get_fsmap(fscid)
109 for info in fs['mdsmap']['info'].values():
110 if info['rank'] >= 0:
111 yield info
112
113 def get_rank(self, fscid, rank):
114 """
115 Get the MDS info for the given rank of the given FSCID.
116 """
117 for info in self.get_ranks(fscid):
118 if info['rank'] == rank:
119 return info
120 raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))
121
122 def get_mds(self, name):
123 """
124 Get the info for the given MDS name.
125 """
126 for info in self.get_all():
127 if info['name'] == name:
128 return info
129 return None
130
131 def get_mds_addr(self, name):
132 """
133 Return the instance addr as a string, like "10.214.133.138:6807\/10825"
134 """
135 info = self.get_mds(name)
136 if info:
137 return info['addr']
138 else:
139 log.warn(json.dumps(list(self.get_all()), indent=2)) # dump for debugging
140 raise RuntimeError("MDS id '{0}' not found in map".format(name))
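# Hypothetical usage sketch of FSStatus (mon_manager and fscid stand in for
# whatever the surrounding test has built):
#
#   status = FSStatus(mon_manager)
#   for info in status.get_ranks(fscid):
#       log.info("rank {0} is {1} ({2})".format(info['rank'], info['name'], info['state']))
#
# Because the FSMap is snapshotted once in __init__, repeated queries on the
# same FSStatus object do not hit the mons again.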
141
142 class CephCluster(object):
143 @property
144 def admin_remote(self):
145 first_mon = misc.get_first_mon(self._ctx, None)
146 (result,) = self._ctx.cluster.only(first_mon).remotes.iterkeys()
147 return result
148
149 def __init__(self, ctx):
150 self._ctx = ctx
151 self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
152
153 def get_config(self, key, service_type=None):
154 """
155 Get config from mon by default, or a specific service if caller asks for it
156 """
157 if service_type is None:
158 service_type = 'mon'
159
160 service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
161 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
162
163 def set_ceph_conf(self, subsys, key, value):
164 if subsys not in self._ctx.ceph['ceph'].conf:
165 self._ctx.ceph['ceph'].conf[subsys] = {}
166 self._ctx.ceph['ceph'].conf[subsys][key] = value
167 write_conf(self._ctx) # XXX because we don't have the ceph task's config object, if they
168 # used a different config path this won't work.
169
170 def clear_ceph_conf(self, subsys, key):
171 del self._ctx.ceph['ceph'].conf[subsys][key]
172 write_conf(self._ctx)
173
174 def json_asok(self, command, service_type, service_id):
175 proc = self.mon_manager.admin_socket(service_type, service_id, command)
176 response_data = proc.stdout.getvalue()
177 log.info("_json_asok output: {0}".format(response_data))
178 if response_data.strip():
179 return json.loads(response_data)
180 else:
181 return None
182
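# Hypothetical examples of the admin-socket helper above; `cluster`, "a" and
# mds_id are placeholders for whatever roles the teuthology ctx defines:
#
#   version = cluster.json_asok(["version"], "mon", "a")
#   perf = cluster.json_asok(["perf", "dump"], "mds", mds_id)
#
# Both return the parsed JSON reply, or None if the daemon produced no output.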
183
184 class MDSCluster(CephCluster):
185 """
186 Collective operations on all the MDS daemons in the Ceph cluster. These
187 daemons may be in use by various Filesystems.
188
189 For the benefit of pre-multi-filesystem tests, this class is also
190 a parent of Filesystem. The correct way to use MDSCluster going forward is
191 as a separate instance outside of your (multiple) Filesystem instances.
192 """
193 def __init__(self, ctx):
194 super(MDSCluster, self).__init__(ctx)
195
196 self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))
197
198 if len(self.mds_ids) == 0:
199 raise RuntimeError("This task requires at least one MDS")
200
201 if hasattr(self._ctx, "daemons"):
202 # Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
203 self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
204
205 def _one_or_all(self, mds_id, cb, in_parallel=True):
206 """
207 Call a callback for a single named MDS, or for all.
208
209 Note that the parallelism here isn't for performance: it's to avoid being overly kind
210 to the cluster by leaving a graceful ssh-latency gap between operations, and to avoid
211 being overly kind by executing them in a predictable order. However, some actions
212 don't cope with being done in parallel, so it's optional (`in_parallel`).
213
214 :param mds_id: MDS daemon name, or None
215 :param cb: Callback taking single argument of MDS daemon name
216 :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
217 """
218 if mds_id is None:
219 if in_parallel:
220 with parallel() as p:
221 for mds_id in self.mds_ids:
222 p.spawn(cb, mds_id)
223 else:
224 for mds_id in self.mds_ids:
225 cb(mds_id)
226 else:
227 cb(mds_id)
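# Illustration of the callback pattern: mds_restart() below is effectively
#
#   self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())
#
# i.e. restart one named daemon, or all of them in parallel when mds_id is None.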
228
229 def get_config(self, key, service_type=None):
230 """
231 get_config specialization of service_type="mds"
232 """
233 if service_type != "mds":
234 return super(MDSCluster, self).get_config(key, service_type)
235
236 # Some tests stop MDS daemons; don't send commands to a dead one:
237 service_id = random.sample(filter(lambda i: self.mds_daemons[i].running(), self.mds_daemons), 1)[0]
238 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
239
240 def mds_stop(self, mds_id=None):
241 """
242 Stop the MDS daemon process(es). If it held a rank, that rank
243 will eventually go laggy.
244 """
245 self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())
246
247 def mds_fail(self, mds_id=None):
248 """
249 Inform MDSMonitor of the death of the daemon process(es). If it held
250 a rank, that rank will be relinquished.
251 """
252 self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))
253
254 def mds_restart(self, mds_id=None):
255 self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())
256
257 def mds_fail_restart(self, mds_id=None):
258 """
259 Variation on restart that includes marking MDSs as failed, so that doing this
260 operation followed by waiting for healthy daemon states guarantees that they
261 have gone down and come up, rather than potentially seeing the healthy states
262 that existed before the restart.
263 """
264 def _fail_restart(id_):
265 self.mds_daemons[id_].stop()
266 self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
267 self.mds_daemons[id_].restart()
268
269 self._one_or_all(mds_id, _fail_restart)
270
271 def newfs(self, name='cephfs', create=True):
272 return Filesystem(self._ctx, name=name, create=create)
273
274 def status(self):
275 return FSStatus(self.mon_manager)
276
277 def delete_all_filesystems(self):
278 """
279 Remove all filesystems that exist, and any pools in use by them.
280 """
281 pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
282 pool_id_name = {}
283 for pool in pools:
284 pool_id_name[pool['pool']] = pool['pool_name']
285
286 # mark cluster down for each fs to prevent churn during deletion
287 status = self.status()
288 for fs in status.get_filesystems():
289 self.mon_manager.raw_cluster_cmd("fs", "set", fs['mdsmap']['fs_name'], "cluster_down", "true")
290
291 # get a new copy as actives may have since changed
292 status = self.status()
293 for fs in status.get_filesystems():
294 mdsmap = fs['mdsmap']
295 metadata_pool = pool_id_name[mdsmap['metadata_pool']]
296
297 for gid in mdsmap['up'].values():
298 self.mon_manager.raw_cluster_cmd('mds', 'fail', gid.__str__())
299
300 self.mon_manager.raw_cluster_cmd('fs', 'rm', mdsmap['fs_name'], '--yes-i-really-mean-it')
301 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
302 metadata_pool, metadata_pool,
303 '--yes-i-really-really-mean-it')
304 for data_pool in mdsmap['data_pools']:
305 data_pool = pool_id_name[data_pool]
306 try:
307 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
308 data_pool, data_pool,
309 '--yes-i-really-really-mean-it')
310 except CommandFailedError as e:
311 if e.exitstatus == 16: # EBUSY: this data pool is used
312 pass # by two filesystems; let the 2nd
313 else: # pass delete it
314 raise
315
316 def get_standby_daemons(self):
317 return set([s['name'] for s in self.status().get_standbys()])
318
319 def get_mds_hostnames(self):
320 result = set()
321 for mds_id in self.mds_ids:
322 mds_remote = self.mon_manager.find_remote('mds', mds_id)
323 result.add(mds_remote.hostname)
324
325 return list(result)
326
327 def set_clients_block(self, blocked, mds_id=None):
328 """
329 Block (using iptables) client communications to this MDS. Be careful: if
330 other services are running on this MDS, or other MDSs try to talk to this
331 MDS, their communications may also be blocked as collateral damage.
332
333 :param mds_id: Optional ID of MDS to block, default to all
334 :return:
335 """
336 da_flag = "-A" if blocked else "-D"
337
338 def set_block(_mds_id):
339 remote = self.mon_manager.find_remote('mds', _mds_id)
340 status = self.status()
341
342 addr = status.get_mds_addr(_mds_id)
343 ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()
344
345 remote.run(
346 args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
347 "comment", "--comment", "teuthology"])
348 remote.run(
349 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
350 "comment", "--comment", "teuthology"])
351
352 self._one_or_all(mds_id, set_block, in_parallel=False)
353
354 def clear_firewall(self):
355 clear_firewall(self._ctx)
356
357 def get_mds_info(self, mds_id):
358 return FSStatus(self.mon_manager).get_mds(mds_id)
359
360 def is_pool_full(self, pool_name):
361 pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
362 for pool in pools:
363 if pool['pool_name'] == pool_name:
364 return 'full' in pool['flags_names'].split(",")
365
366 raise RuntimeError("Pool not found '{0}'".format(pool_name))
367
368 class Filesystem(MDSCluster):
369 """
370 This object is for driving a CephFS filesystem. The MDS daemons driven by
371 MDSCluster may be shared with other Filesystems.
372 """
373 def __init__(self, ctx, fscid=None, name=None, create=False,
374 ec_profile=None):
375 super(Filesystem, self).__init__(ctx)
376
377 self.name = name
378 self.ec_profile = ec_profile
379 self.id = None
380 self.metadata_pool_name = None
381 self.metadata_overlay = False
382 self.data_pool_name = None
383 self.data_pools = None
384
385 client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
386 self.client_id = client_list[0]
387 self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
388
389 if name is not None:
390 if fscid is not None:
391 raise RuntimeError("cannot specify fscid when creating fs")
392 if create and not self.legacy_configured():
393 self.create()
394 else:
395 if fscid is not None:
396 self.id = fscid
397 self.getinfo(refresh = True)
398
399 # Stash a reference to the first created filesystem on ctx, so
400 # that if someone drops to the interactive shell they can easily
401 # poke our methods.
402 if not hasattr(self._ctx, "filesystem"):
403 self._ctx.filesystem = self
404
405 def getinfo(self, refresh = False):
406 status = self.status()
407 if self.id is not None:
408 fsmap = status.get_fsmap(self.id)
409 elif self.name is not None:
410 fsmap = status.get_fsmap_byname(self.name)
411 else:
412 fss = [fs for fs in status.get_filesystems()]
413 if len(fss) == 1:
414 fsmap = fss[0]
415 elif len(fss) == 0:
416 raise RuntimeError("no file system available")
417 else:
418 raise RuntimeError("more than one file system available")
419 self.id = fsmap['id']
420 self.name = fsmap['mdsmap']['fs_name']
421 self.get_pool_names(status = status, refresh = refresh)
422 return status
423
424 def set_metadata_overlay(self, overlay):
425 if self.id is not None:
426 raise RuntimeError("cannot specify fscid when configuring overlay")
427 self.metadata_overlay = overlay
428
429 def deactivate(self, rank):
430 if rank < 0:
431 raise RuntimeError("invalid rank")
432 elif rank == 0:
433 raise RuntimeError("cannot deactivate rank 0")
434 self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))
435
436 def set_max_mds(self, max_mds):
437 self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "max_mds", "%d" % max_mds)
438
439 def set_allow_dirfrags(self, yes):
440 self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "allow_dirfrags", str(yes).lower(), '--yes-i-really-mean-it')
441
442 def get_pgs_per_fs_pool(self):
443 """
444 Calculate how many PGs to use when creating a pool, in order to avoid raising any
445 health warnings about mon_pg_warn_min_per_osd
446
447 :return: an integer number of PGs
448 """
449 pg_warn_min_per_osd = int(self.get_config('mon_pg_warn_min_per_osd'))
450 osd_count = len(list(misc.all_roles_of_type(self._ctx.cluster, 'osd')))
451 return pg_warn_min_per_osd * osd_count
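# Worked example with assumed values: if mon_pg_warn_min_per_osd is 3 and the
# test cluster has 4 OSDs, each filesystem pool is created with 3 * 4 = 12 PGs,
# keeping the per-OSD PG count at or above the warning threshold.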
452
453 def create(self):
454 if self.name is None:
455 self.name = "cephfs"
456 if self.metadata_pool_name is None:
457 self.metadata_pool_name = "{0}_metadata".format(self.name)
458 if self.data_pool_name is None:
459 data_pool_name = "{0}_data".format(self.name)
460 else:
461 data_pool_name = self.data_pool_name
462
463 log.info("Creating filesystem '{0}'".format(self.name))
464
465 pgs_per_fs_pool = self.get_pgs_per_fs_pool()
466
467 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
468 self.metadata_pool_name, pgs_per_fs_pool.__str__())
469 if self.metadata_overlay:
470 self.mon_manager.raw_cluster_cmd('fs', 'new',
471 self.name, self.metadata_pool_name, data_pool_name,
472 '--allow-dangerous-metadata-overlay')
473 else:
474 if self.ec_profile and 'disabled' not in self.ec_profile:
475 log.info("EC profile is %s", self.ec_profile)
476 cmd = ['osd', 'erasure-code-profile', 'set', data_pool_name]
477 cmd.extend(self.ec_profile)
478 self.mon_manager.raw_cluster_cmd(*cmd)
479 self.mon_manager.raw_cluster_cmd(
480 'osd', 'pool', 'create',
481 data_pool_name, pgs_per_fs_pool.__str__(), 'erasure',
482 data_pool_name)
483 self.mon_manager.raw_cluster_cmd(
484 'osd', 'pool', 'set',
485 data_pool_name, 'allow_ec_overwrites', 'true')
486 else:
487 self.mon_manager.raw_cluster_cmd(
488 'osd', 'pool', 'create',
489 data_pool_name, pgs_per_fs_pool.__str__())
490 self.mon_manager.raw_cluster_cmd('fs', 'new',
491 self.name, self.metadata_pool_name, data_pool_name)
492 self.check_pool_application(self.metadata_pool_name)
493 self.check_pool_application(data_pool_name)
494 # Turn off spurious standby count warnings from modifying max_mds in tests.
495 try:
496 self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
497 except CommandFailedError as e:
498 if e.exitstatus == 22:
499 # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
500 pass
501 else:
502 raise
503
504 self.getinfo(refresh = True)
505
506
507 def check_pool_application(self, pool_name):
508 osd_map = self.mon_manager.get_osd_dump_json()
509 for pool in osd_map['pools']:
510 if pool['pool_name'] == pool_name:
511 if "application_metadata" in pool:
512 if not "cephfs" in pool['application_metadata']:
513 raise RuntimeError("Pool %p does not name cephfs as application!".\
514 format(pool_name))
515
516
517 def __del__(self):
518 if getattr(self._ctx, "filesystem", None) == self:
519 delattr(self._ctx, "filesystem")
520
521 def exists(self):
522 """
523 Whether a filesystem exists in the mon's filesystem list
524 """
525 fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
526 return self.name in [fs['name'] for fs in fs_list]
527
528 def legacy_configured(self):
529 """
530 Check if a legacy (i.e. pre "fs new") filesystem configuration is present. If this is
531 the case, the caller should avoid using Filesystem.create
532 """
533 try:
534 out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
535 pools = json.loads(out_text)
536 metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
537 if metadata_pool_exists:
538 self.metadata_pool_name = 'metadata'
539 except CommandFailedError as e:
540 # For use in upgrade tests: Ceph cuttlefish and earlier don't support
541 # structured output (--format) from the CLI.
542 if e.exitstatus == 22:
543 metadata_pool_exists = True
544 else:
545 raise
546
547 return metadata_pool_exists
548
549 def _df(self):
550 return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))
551
552 def get_mds_map(self):
553 return self.status().get_fsmap(self.id)['mdsmap']
554
555 def add_data_pool(self, name):
556 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name, self.get_pgs_per_fs_pool().__str__())
557 self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
558 self.get_pool_names(refresh = True)
559 for poolid, fs_name in self.data_pools.items():
560 if name == fs_name:
561 return poolid
562 raise RuntimeError("could not get just created pool '{0}'".format(name))
563
564 def get_pool_names(self, refresh = False, status = None):
565 if refresh or self.metadata_pool_name is None or self.data_pools is None:
566 if status is None:
567 status = self.status()
568 fsmap = status.get_fsmap(self.id)
569
570 osd_map = self.mon_manager.get_osd_dump_json()
571 id_to_name = {}
572 for p in osd_map['pools']:
573 id_to_name[p['pool']] = p['pool_name']
574
575 self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
576 self.data_pools = {}
577 for data_pool in fsmap['mdsmap']['data_pools']:
578 self.data_pools[data_pool] = id_to_name[data_pool]
579
580 def get_data_pool_name(self, refresh = False):
581 if refresh or self.data_pools is None:
582 self.get_pool_names(refresh = True)
583 assert(len(self.data_pools) == 1)
584 return self.data_pools.values()[0]
585
586 def get_data_pool_id(self, refresh = False):
587 """
588 Don't call this if you have multiple data pools
589 :return: integer
590 """
591 if refresh or self.data_pools is None:
592 self.get_pool_names(refresh = True)
593 assert(len(self.data_pools) == 1)
594 return self.data_pools.keys()[0]
595
596 def get_data_pool_names(self, refresh = False):
597 if refresh or self.data_pools is None:
598 self.get_pool_names(refresh = True)
599 return self.data_pools.values()
600
601 def get_metadata_pool_name(self):
602 return self.metadata_pool_name
603
604 def set_data_pool_name(self, name):
605 if self.id is not None:
606 raise RuntimeError("can't set filesystem name if its fscid is set")
607 self.data_pool_name = name
608
609 def get_namespace_id(self):
610 return self.id
611
612 def get_pool_df(self, pool_name):
613 """
614 Return a dict like:
615 {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
616 """
617 for pool_df in self._df()['pools']:
618 if pool_df['name'] == pool_name:
619 return pool_df['stats']
620
621 raise RuntimeError("Pool name '{0}' not found".format(pool_name))
622
623 def get_usage(self):
624 return self._df()['stats']['total_used_bytes']
625
626 def are_daemons_healthy(self):
627 """
628 Return true if all daemons are in one of the states active, standby or standby-replay,
629 and at least max_mds daemons are in 'active'.
630
631 Unlike most of Filesystem, this function is tolerant of new-style `fs`
632 commands being missing, because we are part of the ceph installation
633 process during upgrade suites, so must fall back to old style commands
634 when we get an EINVAL on a new style command.
635
636 :return:
637 """
638
639 active_count = 0
640 try:
641 mds_map = self.get_mds_map()
642 except CommandFailedError as cfe:
643 # Old version, fall back to non-multi-fs commands
644 if cfe.exitstatus == errno.EINVAL:
645 mds_map = json.loads(
646 self.mon_manager.raw_cluster_cmd('mds', 'dump', '--format=json'))
647 else:
648 raise
649
650 log.info("are_daemons_healthy: mds map: {0}".format(mds_map))
651
652 for mds_id, mds_status in mds_map['info'].items():
653 if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
654 log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
655 return False
656 elif mds_status['state'] == 'up:active':
657 active_count += 1
658
659 log.info("are_daemons_healthy: {0}/{1}".format(
660 active_count, mds_map['max_mds']
661 ))
662
663 if active_count >= mds_map['max_mds']:
664 # The MDSMap says these guys are active, but let's check they really are
665 for mds_id, mds_status in mds_map['info'].items():
666 if mds_status['state'] == 'up:active':
667 try:
668 daemon_status = self.mds_asok(["status"], mds_id=mds_status['name'])
669 except CommandFailedError as cfe:
670 if cfe.exitstatus == errno.EINVAL:
671 # Old version, can't do this check
672 continue
673 else:
674 # MDS not even running
675 return False
676
677 if daemon_status['state'] != 'up:active':
678 # MDS hasn't taken the latest map yet
679 return False
680
681 return True
682 else:
683 return False
684
685 def get_daemon_names(self, state=None):
686 """
687 Return MDS daemon names of those daemons in the given state
688 :param state:
689 :return:
690 """
691 status = self.get_mds_map()
692 result = []
693 for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
694 if mds_status['state'] == state or state is None:
695 result.append(mds_status['name'])
696
697 return result
698
699 def get_active_names(self):
700 """
701 Return MDS daemon names of those daemons holding ranks
702 in state up:active
703
704 :return: list of strings like ['a', 'b'], sorted by rank
705 """
706 return self.get_daemon_names("up:active")
707
708 def get_all_mds_rank(self):
709 status = self.get_mds_map()
710 result = []
711 for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
712 if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
713 result.append(mds_status['rank'])
714
715 return result
716
717 def get_rank(self, rank=0, status=None):
718 if status is None:
719 status = self.getinfo()
720 return status.get_rank(self.id, rank)
721
722 def get_ranks(self, status=None):
723 if status is None:
724 status = self.getinfo()
725 return status.get_ranks(self.id)
726
727 def get_rank_names(self, status=None):
728 """
729 Return MDS daemon names of those daemons holding a rank,
730 sorted by rank. This includes e.g. up:replay/reconnect
731 as well as active, but does not include standby or
732 standby-replay.
733 """
734 status = self.get_mds_map()
735 result = []
736 for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
737 if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
738 result.append(mds_status['name'])
739
740 return result
741
742 def wait_for_daemons(self, timeout=None):
743 """
744 Wait until all daemons are healthy
745 :return:
746 """
747
748 if timeout is None:
749 timeout = DAEMON_WAIT_TIMEOUT
750
751 elapsed = 0
752 while True:
753 if self.are_daemons_healthy():
754 return
755 else:
756 time.sleep(1)
757 elapsed += 1
758
759 if elapsed > timeout:
760 raise RuntimeError("Timed out waiting for MDS daemons to become healthy")
761
762 def get_lone_mds_id(self):
763 """
764 Get a single MDS ID: the only one if there is only one
765 configured, else the only one currently holding a rank,
766 else raise an error.
767 """
768 if len(self.mds_ids) != 1:
769 alive = self.get_rank_names()
770 if len(alive) == 1:
771 return alive[0]
772 else:
773 raise ValueError("Explicit MDS argument required when multiple MDSs in use")
774 else:
775 return self.mds_ids[0]
776
777 def recreate(self):
778 log.info("Creating new filesystem")
779 self.delete_all_filesystems()
780 self.id = None
781 self.create()
782
783 def put_metadata_object_raw(self, object_id, infile):
784 """
785 Save an object to the metadata pool
786 """
787 temp_bin_path = infile
788 self.client_remote.run(args=[
789 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'put', object_id, temp_bin_path
790 ])
791
792 def get_metadata_object_raw(self, object_id):
793 """
794 Retrieve an object from the metadata pool and store it in a file.
795 """
796 temp_bin_path = '/tmp/' + object_id + '.bin'
797
798 self.client_remote.run(args=[
799 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
800 ])
801
802 return temp_bin_path
803
804 def get_metadata_object(self, object_type, object_id):
805 """
806 Retrieve an object from the metadata pool, pass it through
807 ceph-dencoder to dump it to JSON, and return the decoded object.
808 """
809 temp_bin_path = '/tmp/out.bin'
810
811 self.client_remote.run(args=[
812 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
813 ])
814
815 stdout = StringIO()
816 self.client_remote.run(args=[
817 'sudo', os.path.join(self._prefix, 'ceph-dencoder'), 'type', object_type, 'import', temp_bin_path, 'decode', 'dump_json'
818 ], stdout=stdout)
819 dump_json = stdout.getvalue().strip()
820 try:
821 dump = json.loads(dump_json)
822 except (TypeError, ValueError):
823 log.error("Failed to decode JSON: '{0}'".format(dump_json))
824 raise
825
826 return dump
827
828 def get_journal_version(self):
829 """
830 Read the JournalPointer and Journal::Header objects to learn the version of
831 encoding in use.
832 """
833 journal_pointer_object = '400.00000000'
834 journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
835 journal_ino = journal_pointer_dump['journal_pointer']['front']
836
837 journal_header_object = "{0:x}.00000000".format(journal_ino)
838 journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)
839
840 version = journal_header_dump['journal_header']['stream_format']
841 log.info("Read journal version {0}".format(version))
842
843 return version
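# Example of the object naming used above: rank 0's JournalPointer is object
# 400.00000000; if it points at journal inode 0x200 (the usual value for rank
# 0), the header read here is object 200.00000000.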
844
845 def mds_asok(self, command, mds_id=None):
846 if mds_id is None:
847 mds_id = self.get_lone_mds_id()
848
849 return self.json_asok(command, 'mds', mds_id)
850
851 def rank_asok(self, command, rank=0):
852 info = self.get_rank(rank=rank)
853 return self.json_asok(command, 'mds', info['name'])
854
855 def read_cache(self, path, depth=None):
856 cmd = ["dump", "tree", path]
857 if depth is not None:
858 cmd.append(depth.__str__())
859 result = self.mds_asok(cmd)
860 if len(result) == 0:
861 raise RuntimeError("Path not found in cache: {0}".format(path))
862
863 return result
864
865 def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
866 """
867 Block until the MDS reaches a particular state, or a failure condition
868 is met.
869
870 When there are multiple MDSs, succeed when exactly one MDS is in the
871 goal state, or fail when any MDS is in the reject state.
872
873 :param goal_state: Return once the MDS is in this state
874 :param reject: Fail if the MDS enters this state before the goal state
875 :param timeout: Fail if this many seconds pass before reaching goal
876 :return: number of seconds waited, rounded down to integer
877 """
878
879 started_at = time.time()
880 while True:
881 status = self.status()
882 if rank is not None:
883 mds_info = status.get_rank(self.id, rank)
884 current_state = mds_info['state'] if mds_info else None
885 log.info("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
886 elif mds_id is not None:
887 # mds_info is None if no daemon with this ID exists in the map
888 mds_info = status.get_mds(mds_id)
889 current_state = mds_info['state'] if mds_info else None
890 log.info("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
891 else:
892 # In general, look for a single MDS
893 states = [m['state'] for m in status.get_ranks(self.id)]
894 if [s for s in states if s == goal_state] == [goal_state]:
895 current_state = goal_state
896 elif reject in states:
897 current_state = reject
898 else:
899 current_state = None
900 log.info("mapped states {0} to {1}".format(states, current_state))
901
902 elapsed = time.time() - started_at
903 if current_state == goal_state:
904 log.info("reached state '{0}' in {1}s".format(current_state, elapsed))
905 return elapsed
906 elif reject is not None and current_state == reject:
907 raise RuntimeError("MDS in reject state {0}".format(current_state))
908 elif timeout is not None and elapsed > timeout:
909 log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
910 raise RuntimeError(
911 "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
912 elapsed, goal_state, current_state
913 ))
914 else:
915 time.sleep(1)
916
917 def _read_data_xattr(self, ino_no, xattr_name, type, pool):
918 mds_id = self.mds_ids[0]
919 remote = self.mds_daemons[mds_id].remote
920 if pool is None:
921 pool = self.get_data_pool_name()
922
923 obj_name = "{0:x}.00000000".format(ino_no)
924
925 args = [
926 os.path.join(self._prefix, "rados"), "-p", pool, "getxattr", obj_name, xattr_name
927 ]
928 try:
929 proc = remote.run(
930 args=args,
931 stdout=StringIO())
932 except CommandFailedError as e:
933 log.error(e.__str__())
934 raise ObjectNotFound(obj_name)
935
936 data = proc.stdout.getvalue()
937
938 p = remote.run(
939 args=[os.path.join(self._prefix, "ceph-dencoder"), "type", type, "import", "-", "decode", "dump_json"],
940 stdout=StringIO(),
941 stdin=data
942 )
943
944 return json.loads(p.stdout.getvalue().strip())
945
946 def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
947 """
948 Write to an xattr of the 0th data object of an inode. Will
949 succeed whether the object and/or xattr already exist or not.
950
951 :param ino_no: integer inode number
952 :param xattr_name: string name of the xattr
953 :param data: byte array data to write to the xattr
954 :param pool: name of data pool or None to use primary data pool
955 :return: None
956 """
957 remote = self.mds_daemons[self.mds_ids[0]].remote
958 if pool is None:
959 pool = self.get_data_pool_name()
960
961 obj_name = "{0:x}.00000000".format(ino_no)
962 args = [
963 os.path.join(self._prefix, "rados"), "-p", pool, "setxattr",
964 obj_name, xattr_name, data
965 ]
966 remote.run(
967 args=args,
968 stdout=StringIO())
969
970 def read_backtrace(self, ino_no, pool=None):
971 """
972 Read the backtrace from the data pool, return a dict in the format
973 given by inode_backtrace_t::dump, which is something like:
974
975 ::
976
977 rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
978 ceph-dencoder type inode_backtrace_t import out.bin decode dump_json
979
980 { "ino": 1099511627778,
981 "ancestors": [
982 { "dirino": 1,
983 "dname": "blah",
984 "version": 11}],
985 "pool": 1,
986 "old_pools": []}
987
988 :param pool: name of pool to read backtrace from. If omitted, FS must have only
989 one data pool and that will be used.
990 """
991 return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)
992
993 def read_layout(self, ino_no, pool=None):
994 """
995 Read 'layout' xattr of an inode and parse the result, returning a dict like:
996 ::
997 {
998 "stripe_unit": 4194304,
999 "stripe_count": 1,
1000 "object_size": 4194304,
1001 "pool_id": 1,
1002 "pool_ns": "",
1003 }
1004
1005 :param pool: name of pool to read backtrace from. If omitted, FS must have only
1006 one data pool and that will be used.
1007 """
1008 return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)
1009
1010 def _enumerate_data_objects(self, ino, size):
1011 """
1012 Get the list of expected data objects for a range, and the list of objects
1013 that really exist.
1014
1015 :return a tuple of two lists of strings (expected, actual)
1016 """
1017 stripe_size = 1024 * 1024 * 4
1018
1019 size = max(stripe_size, size)
1020
1021 want_objects = [
1022 "{0:x}.{1:08x}".format(ino, n)
1023 for n in range(0, ((size - 1) / stripe_size) + 1)
1024 ]
1025
1026 exist_objects = self.rados(["ls"], pool=self.get_data_pool_name()).split("\n")
1027
1028 return want_objects, exist_objects
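# Worked example with assumed values: for ino 0x10000000002 and size 9 MiB,
# with the 4 MiB stripe size above, ((size - 1) / stripe_size) + 1 == 3, so the
# expected objects are 10000000002.00000000, 10000000002.00000001 and
# 10000000002.00000002.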
1029
1030 def data_objects_present(self, ino, size):
1031 """
1032 Check that *all* the expected data objects for an inode are present in the data pool
1033 """
1034
1035 want_objects, exist_objects = self._enumerate_data_objects(ino, size)
1036 missing = set(want_objects) - set(exist_objects)
1037
1038 if missing:
1039 log.info("Objects missing (ino {0}, size {1}): {2}".format(
1040 ino, size, missing
1041 ))
1042 return False
1043 else:
1044 log.info("All objects for ino {0} size {1} found".format(ino, size))
1045 return True
1046
1047 def data_objects_absent(self, ino, size):
1048 want_objects, exist_objects = self._enumerate_data_objects(ino, size)
1049 present = set(want_objects) & set(exist_objects)
1050
1051 if present:
1052 log.info("Objects not absent (ino {0}, size {1}): {2}".format(
1053 ino, size, present
1054 ))
1055 return False
1056 else:
1057 log.info("All objects for ino {0} size {1} are absent".format(ino, size))
1058 return True
1059
1060 def dirfrag_exists(self, ino, frag):
1061 try:
1062 self.rados(["stat", "{0:x}.{1:08x}".format(ino, frag)])
1063 except CommandFailedError as e:
1064 return False
1065 else:
1066 return True
1067
1068 def rados(self, args, pool=None, namespace=None, stdin_data=None,
1069 stdin_file=None):
1070 """
1071 Call into the `rados` CLI from an MDS
1072 """
1073
1074 if pool is None:
1075 pool = self.get_metadata_pool_name()
1076
1077 # It doesn't matter which MDS we use to run rados commands; they all
1078 # have access to the pools
1079 mds_id = self.mds_ids[0]
1080 remote = self.mds_daemons[mds_id].remote
1081
1082 # NB we could alternatively use librados pybindings for this, but it's a one-liner
1083 # using the `rados` CLI
1084 args = ([os.path.join(self._prefix, "rados"), "-p", pool] +
1085 (["--namespace", namespace] if namespace else []) +
1086 args)
1087
1088 if stdin_file is not None:
1089 args = ["bash", "-c", "cat " + stdin_file + " | " + " ".join(args)]
1090
1091 p = remote.run(
1092 args=args,
1093 stdin=stdin_data,
1094 stdout=StringIO())
1095 return p.stdout.getvalue().strip()
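# Hypothetical usage sketches for the helper above (pool defaults to the
# metadata pool when omitted; "500.00000000" is just an illustrative object):
#
#   fs.rados(["ls"])                               # list metadata objects
#   fs.rados(["rm", "500.00000000"])               # remove a single object
#   fs.rados(["ls"], pool=fs.get_data_pool_name()) # list data objects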
1096
1097 def list_dirfrag(self, dir_ino):
1098 """
1099 Read the named object and return the list of omap keys
1100
1101 :return a list of 0 or more strings
1102 """
1103
1104 dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
1105
1106 try:
1107 key_list_str = self.rados(["listomapkeys", dirfrag_obj_name])
1108 except CommandFailedError as e:
1109 log.error(e.__str__())
1110 raise ObjectNotFound(dirfrag_obj_name)
1111
1112 return key_list_str.split("\n") if key_list_str else []
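# Hypothetical example: for a directory with inode 0x10000000000, frag 0 lives
# in object 10000000000.00000000 and the omap keys are dentry names (normally
# suffixed with "_head"), so this might return ["file1_head", "file2_head"].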
1113
1114 def erase_metadata_objects(self, prefix):
1115 """
1116 For all objects in the metadata pool matching the prefix,
1117 erase them.
1118
1119 This is O(N) in the number of objects in the pool, so it is only suitable
1120 for use on toy test filesystems.
1121 """
1122 all_objects = self.rados(["ls"]).split("\n")
1123 matching_objects = [o for o in all_objects if o.startswith(prefix)]
1124 for o in matching_objects:
1125 self.rados(["rm", o])
1126
1127 def erase_mds_objects(self, rank):
1128 """
1129 Erase all the per-MDS objects for a particular rank. This includes
1130 inotable, sessiontable, journal
1131 """
1132
1133 def obj_prefix(multiplier):
1134 """
1135 Per-MDS object naming convention: e.g. rank 1's
1136 journal objects live at 201.*
1137 """
1138 return "%x." % (multiplier * 0x100 + rank)
1139
1140 # MDS_INO_LOG_OFFSET
1141 self.erase_metadata_objects(obj_prefix(2))
1142 # MDS_INO_LOG_BACKUP_OFFSET
1143 self.erase_metadata_objects(obj_prefix(3))
1144 # MDS_INO_LOG_POINTER_OFFSET
1145 self.erase_metadata_objects(obj_prefix(4))
1146 # MDSTables & SessionMap
1147 self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))
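# Example for rank 1, following the multipliers above: objects prefixed
# 201. (journal), 301. (journal backup) and 401. (journal pointer) are erased,
# along with mds1_* (tables and sessionmap).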
1148
1149 @property
1150 def _prefix(self):
1151 """
1152 Override this to set a different install prefix for the Ceph binaries.
1153 """
1154 return ""
1155
1156 def _run_tool(self, tool, args, rank=None, quiet=False):
1157 # Tests frequently have [client] configuration that jacks up
1158 # the objecter log level (unlikely to be interesting here)
1159 # and does not set the mds log level (very interesting here)
1160 if quiet:
1161 base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
1162 else:
1163 base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']
1164
1165 if rank is not None:
1166 base_args.extend(["--rank", "%d" % rank])
1167
1168 t1 = datetime.datetime.now()
1169 r = self.tool_remote.run(
1170 args=base_args + args,
1171 stdout=StringIO()).stdout.getvalue().strip()
1172 duration = datetime.datetime.now() - t1
1173 log.info("Ran {0} in time {1}, result:\n{2}".format(
1174 base_args + args, duration, r
1175 ))
1176 return r
1177
1178 @property
1179 def tool_remote(self):
1180 """
1181 An arbitrary remote to use when invoking recovery tools. Use an MDS host because
1182 it'll definitely have keys with perms to access the cephfs metadata pool. This is public
1183 so that tests can use this remote to fetch locally written output files from the tools.
1184 """
1185 mds_id = self.mds_ids[0]
1186 return self.mds_daemons[mds_id].remote
1187
1188 def journal_tool(self, args, rank=None, quiet=False):
1189 """
1190 Invoke cephfs-journal-tool with the passed arguments, and return its stdout
1191 """
1192 return self._run_tool("cephfs-journal-tool", args, rank, quiet)
1193
1194 def table_tool(self, args, quiet=False):
1195 """
1196 Invoke cephfs-table-tool with the passed arguments, and return its stdout
1197 """
1198 return self._run_tool("cephfs-table-tool", args, None, quiet)
1199
1200 def data_scan(self, args, quiet=False, worker_count=1):
1201 """
1202 Invoke cephfs-data-scan with the passed arguments, and return its stdout
1203
1204 :param worker_count: if greater than 1, multiple workers will be run
1205 in parallel and the return value will be None
1206 """
1207
1208 workers = []
1209
1210 for n in range(0, worker_count):
1211 if worker_count > 1:
1212 # The first token of the data-scan args is a command, followed by its arguments.
1213 # insert worker arguments after the command.
1214 cmd = args[0]
1215 worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
1216 else:
1217 worker_args = args
1218
1219 workers.append(Greenlet.spawn(lambda wargs=worker_args:
1220 self._run_tool("cephfs-data-scan", wargs, None, quiet)))
1221
1222 for w in workers:
1223 w.get()
1224
1225 if worker_count == 1:
1226 return workers[0].value
1227 else:
1228 return None
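# Hypothetical example: data_scan(["scan_inodes", data_pool], worker_count=4)
# launches four cephfs-data-scan processes in parallel, each invoked as
# "scan_inodes --worker_n <n> --worker_m 4 <data_pool>", so the pool is scanned
# in disjoint shards and the combined return value is None.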
1229
1230 def is_full(self):
1231 return self.is_pool_full(self.get_data_pool_name())