1
2 from StringIO import StringIO
3 import json
4 import logging
5 from gevent import Greenlet
6 import os
7 import time
8 import datetime
9 import re
10 import errno
11 import random
12
13 from teuthology.exceptions import CommandFailedError
14 from teuthology import misc
15 from teuthology.nuke import clear_firewall
16 from teuthology.parallel import parallel
17 from tasks.ceph_manager import write_conf
18 from tasks import ceph_manager
19
20
21 log = logging.getLogger(__name__)
22
23
24 DAEMON_WAIT_TIMEOUT = 120
25 ROOT_INO = 1
26
27
28 class ObjectNotFound(Exception):
29 def __init__(self, object_name):
30 self._object_name = object_name
31
32 def __str__(self):
33 return "Object not found: '{0}'".format(self._object_name)
34
35 class FSStatus(object):
36 """
37 Operations on a snapshot of the FSMap.
38 """
39 def __init__(self, mon_manager):
40 self.mon = mon_manager
41 self.map = json.loads(self.mon.raw_cluster_cmd("fs", "dump", "--format=json"))
42
43 def __str__(self):
44 return json.dumps(self.map, indent = 2, sort_keys = True)
45
46 # Expose the fsmap for manual inspection.
47 def __getitem__(self, key):
48 """
49 Get a field from the fsmap.
50 """
51 return self.map[key]
52
53 def get_filesystems(self):
54 """
55 Iterator for all filesystems.
56 """
57 for fs in self.map['filesystems']:
58 yield fs
59
60 def get_all(self):
61 """
62 Iterator for all the mds_info components in the FSMap.
63 """
64 for info in self.get_standbys():
65 yield info
66 for fs in self.map['filesystems']:
67 for info in fs['mdsmap']['info'].values():
68 yield info
69
70 def get_standbys(self):
71 """
72 Iterator for all standbys.
73 """
74 for info in self.map['standbys']:
75 yield info
76
77 def get_fsmap(self, fscid):
78 """
79 Get the fsmap for the given FSCID.
80 """
81 for fs in self.map['filesystems']:
82 if fscid is None or fs['id'] == fscid:
83 return fs
84 raise RuntimeError("FSCID {0} not in map".format(fscid))
85
86 def get_fsmap_byname(self, name):
87 """
88 Get the fsmap for the given file system name.
89 """
90 for fs in self.map['filesystems']:
91 if name is None or fs['mdsmap']['fs_name'] == name:
92 return fs
93 raise RuntimeError("FS {0} not in map".format(name))
94
95 def get_replays(self, fscid):
96 """
97         Get the standby-replay MDS daemons for the given FSCID.
98 """
99 fs = self.get_fsmap(fscid)
100 for info in fs['mdsmap']['info'].values():
101 if info['state'] == 'up:standby-replay':
102 yield info
103
104 def get_ranks(self, fscid):
105 """
106 Get the ranks for the given FSCID.
107 """
108 fs = self.get_fsmap(fscid)
109 for info in fs['mdsmap']['info'].values():
110 if info['rank'] >= 0:
111 yield info
112
113 def get_rank(self, fscid, rank):
114 """
115 Get the rank for the given FSCID.
116 """
117 for info in self.get_ranks(fscid):
118 if info['rank'] == rank:
119 return info
120 raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))
121
122 def get_mds(self, name):
123 """
124 Get the info for the given MDS name.
125 """
126 for info in self.get_all():
127 if info['name'] == name:
128 return info
129 return None
130
131 def get_mds_addr(self, name):
132 """
133         Return the instance addr as a string, like "10.214.133.138:6807/10825"
134 """
135 info = self.get_mds(name)
136 if info:
137 return info['addr']
138 else:
139 log.warn(json.dumps(list(self.get_all()), indent=2)) # dump for debugging
140 raise RuntimeError("MDS id '{0}' not found in map".format(name))
141
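# A minimal usage sketch for FSStatus (assuming an existing CephManager
# instance `mon_manager` and a known `fscid`); the object is a point-in-time
# snapshot of the FSMap, so construct a fresh one to observe changes:
#
#   status = FSStatus(mon_manager)
#   for info in status.get_ranks(fscid):
#       log.info("rank {0} is {1}".format(info['rank'], info['state']))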
142 class CephCluster(object):
143 @property
144 def admin_remote(self):
145 first_mon = misc.get_first_mon(self._ctx, None)
146 (result,) = self._ctx.cluster.only(first_mon).remotes.iterkeys()
147 return result
148
149 def __init__(self, ctx):
150 self._ctx = ctx
151 self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
152
153 def get_config(self, key, service_type=None):
154 """
155 Get config from mon by default, or a specific service if caller asks for it
156 """
157 if service_type is None:
158 service_type = 'mon'
159
160 service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
161 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
162
163 def set_ceph_conf(self, subsys, key, value):
164 if subsys not in self._ctx.ceph['ceph'].conf:
165 self._ctx.ceph['ceph'].conf[subsys] = {}
166 self._ctx.ceph['ceph'].conf[subsys][key] = value
167 write_conf(self._ctx) # XXX because we don't have the ceph task's config object, if they
168 # used a different config path this won't work.
169
170 def clear_ceph_conf(self, subsys, key):
171 del self._ctx.ceph['ceph'].conf[subsys][key]
172 write_conf(self._ctx)
173
174 def json_asok(self, command, service_type, service_id):
175 proc = self.mon_manager.admin_socket(service_type, service_id, command)
176 response_data = proc.stdout.getvalue()
177 log.info("_json_asok output: {0}".format(response_data))
178 if response_data.strip():
179 return json.loads(response_data)
180 else:
181 return None
182
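# Example sketch of json_asok: send a command to a daemon's admin socket and
# get the decoded JSON back (None is returned if the command printed nothing).
# `cluster` is assumed to be a CephCluster instance and the service id is
# illustrative:
#
#   value = cluster.json_asok(['config', 'get', 'mds_cache_size'], 'mds', 'a')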
183
184 class MDSCluster(CephCluster):
185 """
186 Collective operations on all the MDS daemons in the Ceph cluster. These
187 daemons may be in use by various Filesystems.
188
189 For the benefit of pre-multi-filesystem tests, this class is also
190 a parent of Filesystem. The correct way to use MDSCluster going forward is
191 as a separate instance outside of your (multiple) Filesystem instances.
192 """
193 def __init__(self, ctx):
194 super(MDSCluster, self).__init__(ctx)
195
196 self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))
197
198 if len(self.mds_ids) == 0:
199 raise RuntimeError("This task requires at least one MDS")
200
201 if hasattr(self._ctx, "daemons"):
202 # Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
203 self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
204
205 def _one_or_all(self, mds_id, cb, in_parallel=True):
206 """
207 Call a callback for a single named MDS, or for all.
208
209         Note that the parallelism here isn't for performance: it is there so that we don't
210         go easy on the cluster by pausing an ssh-latency's worth of time between operations,
211         or by always executing them in a particular order. However, some actions don't cope
212         with being run in parallel, so it is optional (`in_parallel`).
213
214 :param mds_id: MDS daemon name, or None
215 :param cb: Callback taking single argument of MDS daemon name
216 :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
217 """
218 if mds_id is None:
219 if in_parallel:
220 with parallel() as p:
221 for mds_id in self.mds_ids:
222 p.spawn(cb, mds_id)
223 else:
224 for mds_id in self.mds_ids:
225 cb(mds_id)
226 else:
227 cb(mds_id)
228
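# Example sketch: _one_or_all drives the mds_stop/mds_fail/mds_restart helpers
# below; e.g. restarting every MDS serially rather than in parallel would look
# like this (illustrative only):
#
#   self._one_or_all(None, lambda id_: self.mds_daemons[id_].restart(),
#                    in_parallel=False)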
229 def get_config(self, key, service_type=None):
230 """
231         get_config specialization for service_type="mds"
232 """
233 if service_type != "mds":
234 return super(MDSCluster, self).get_config(key, service_type)
235
236 # Some tests stop MDS daemons, don't send commands to a dead one:
237 service_id = random.sample(filter(lambda i: self.mds_daemons[i].running(), self.mds_daemons), 1)[0]
238 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
239
240 def mds_stop(self, mds_id=None):
241 """
242         Stop the MDS daemon process(es). If it held a rank, that rank
243 will eventually go laggy.
244 """
245 self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())
246
247 def mds_fail(self, mds_id=None):
248 """
249 Inform MDSMonitor of the death of the daemon process(es). If it held
250 a rank, that rank will be relinquished.
251 """
252 self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))
253
254 def mds_restart(self, mds_id=None):
255 self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())
256
257 def mds_fail_restart(self, mds_id=None):
258 """
259 Variation on restart that includes marking MDSs as failed, so that doing this
260 operation followed by waiting for healthy daemon states guarantees that they
261 have gone down and come up, rather than potentially seeing the healthy states
262 that existed before the restart.
263 """
264 def _fail_restart(id_):
265 self.mds_daemons[id_].stop()
266 self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
267 self.mds_daemons[id_].restart()
268
269 self._one_or_all(mds_id, _fail_restart)
270
271 def mds_signal(self, mds_id, sig, silent=False):
272 """
273         Signal an MDS daemon.
274 """
275         self.mds_daemons[mds_id].signal(sig, silent)
276
277 def newfs(self, name='cephfs', create=True):
278 return Filesystem(self._ctx, name=name, create=create)
279
280 def status(self):
281 return FSStatus(self.mon_manager)
282
283 def delete_all_filesystems(self):
284 """
285 Remove all filesystems that exist, and any pools in use by them.
286 """
287 pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
288 pool_id_name = {}
289 for pool in pools:
290 pool_id_name[pool['pool']] = pool['pool_name']
291
292 # mark cluster down for each fs to prevent churn during deletion
293 status = self.status()
294 for fs in status.get_filesystems():
295 self.mon_manager.raw_cluster_cmd("fs", "set", fs['mdsmap']['fs_name'], "cluster_down", "true")
296
297 # get a new copy as actives may have since changed
298 status = self.status()
299 for fs in status.get_filesystems():
300 mdsmap = fs['mdsmap']
301 metadata_pool = pool_id_name[mdsmap['metadata_pool']]
302
303 for gid in mdsmap['up'].values():
304 self.mon_manager.raw_cluster_cmd('mds', 'fail', gid.__str__())
305
306 self.mon_manager.raw_cluster_cmd('fs', 'rm', mdsmap['fs_name'], '--yes-i-really-mean-it')
307 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
308 metadata_pool, metadata_pool,
309 '--yes-i-really-really-mean-it')
310 for data_pool in mdsmap['data_pools']:
311 data_pool = pool_id_name[data_pool]
312 try:
313 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
314 data_pool, data_pool,
315 '--yes-i-really-really-mean-it')
316 except CommandFailedError as e:
317                     if e.exitstatus == 16: # EBUSY: the data pool is shared with another
318                         pass                # filesystem; let the pass that handles
319                     else:                   # that filesystem delete it.
320 raise
321
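# Example sketch of a typical teardown/recreate cycle in a test (assuming an
# MDSCluster instance `mds_cluster`):
#
#   mds_cluster.delete_all_filesystems()
#   fs = mds_cluster.newfs(name='cephfs')
#   fs.wait_for_daemons()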
322 def get_standby_daemons(self):
323 return set([s['name'] for s in self.status().get_standbys()])
324
325 def get_mds_hostnames(self):
326 result = set()
327 for mds_id in self.mds_ids:
328 mds_remote = self.mon_manager.find_remote('mds', mds_id)
329 result.add(mds_remote.hostname)
330
331 return list(result)
332
333 def set_clients_block(self, blocked, mds_id=None):
334 """
335 Block (using iptables) client communications to this MDS. Be careful: if
336 other services are running on this MDS, or other MDSs try to talk to this
337         MDS, their communications may also be blocked as collateral damage.
338
339 :param mds_id: Optional ID of MDS to block, default to all
340 :return:
341 """
342 da_flag = "-A" if blocked else "-D"
343
344 def set_block(_mds_id):
345 remote = self.mon_manager.find_remote('mds', _mds_id)
346 status = self.status()
347
348 addr = status.get_mds_addr(_mds_id)
349 ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()
350
351 remote.run(
352 args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
353 "comment", "--comment", "teuthology"])
354 remote.run(
355 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
356 "comment", "--comment", "teuthology"])
357
358 self._one_or_all(mds_id, set_block, in_parallel=False)
359
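# Example sketch: simulate a network partition for one MDS and undo it again
# (the daemon name 'a' is illustrative):
#
#   mds_cluster.set_clients_block(True, mds_id='a')    # add REJECT rules
#   # ... exercise clients while mds.a is unreachable ...
#   mds_cluster.set_clients_block(False, mds_id='a')   # drop the rules
#   mds_cluster.clear_firewall()                       # catch-all cleanup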
360 def clear_firewall(self):
361 clear_firewall(self._ctx)
362
363 def get_mds_info(self, mds_id):
364 return FSStatus(self.mon_manager).get_mds(mds_id)
365
366 def is_pool_full(self, pool_name):
367 pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
368 for pool in pools:
369 if pool['pool_name'] == pool_name:
370 return 'full' in pool['flags_names'].split(",")
371
372 raise RuntimeError("Pool not found '{0}'".format(pool_name))
373
374 class Filesystem(MDSCluster):
375 """
376 This object is for driving a CephFS filesystem. The MDS daemons driven by
377 MDSCluster may be shared with other Filesystems.
378 """
379 def __init__(self, ctx, fscid=None, name=None, create=False,
380 ec_profile=None):
381 super(Filesystem, self).__init__(ctx)
382
383 self.name = name
384 self.ec_profile = ec_profile
385 self.id = None
386 self.metadata_pool_name = None
387 self.metadata_overlay = False
388 self.data_pool_name = None
389 self.data_pools = None
390
391 client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
392 self.client_id = client_list[0]
393 self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
394
395 if name is not None:
396 if fscid is not None:
397 raise RuntimeError("cannot specify fscid when creating fs")
398 if create and not self.legacy_configured():
399 self.create()
400 else:
401 if fscid is not None:
402 self.id = fscid
403 self.getinfo(refresh = True)
404
405 # Stash a reference to the first created filesystem on ctx, so
406 # that if someone drops to the interactive shell they can easily
407 # poke our methods.
408 if not hasattr(self._ctx, "filesystem"):
409 self._ctx.filesystem = self
410
411 def getinfo(self, refresh = False):
412 status = self.status()
413 if self.id is not None:
414 fsmap = status.get_fsmap(self.id)
415 elif self.name is not None:
416 fsmap = status.get_fsmap_byname(self.name)
417 else:
418 fss = [fs for fs in status.get_filesystems()]
419 if len(fss) == 1:
420 fsmap = fss[0]
421 elif len(fss) == 0:
422 raise RuntimeError("no file system available")
423 else:
424 raise RuntimeError("more than one file system available")
425 self.id = fsmap['id']
426 self.name = fsmap['mdsmap']['fs_name']
427 self.get_pool_names(status = status, refresh = refresh)
428 return status
429
430 def set_metadata_overlay(self, overlay):
431 if self.id is not None:
432 raise RuntimeError("cannot specify fscid when configuring overlay")
433 self.metadata_overlay = overlay
434
435 def deactivate(self, rank):
436 if rank < 0:
437 raise RuntimeError("invalid rank")
438 elif rank == 0:
439 raise RuntimeError("cannot deactivate rank 0")
440 self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))
441
442 def set_var(self, var, *args):
443 a = map(str, args)
444 self.mon_manager.raw_cluster_cmd("fs", "set", self.name, var, *a)
445
446 def set_max_mds(self, max_mds):
447 self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "max_mds", "%d" % max_mds)
448
449 def set_allow_dirfrags(self, yes):
450 self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "allow_dirfrags", str(yes).lower(), '--yes-i-really-mean-it')
451
452 def get_pgs_per_fs_pool(self):
453 """
454 Calculate how many PGs to use when creating a pool, in order to avoid raising any
455 health warnings about mon_pg_warn_min_per_osd
456
457 :return: an integer number of PGs
458 """
459 pg_warn_min_per_osd = int(self.get_config('mon_pg_warn_min_per_osd'))
460 osd_count = len(list(misc.all_roles_of_type(self._ctx.cluster, 'osd')))
461 return pg_warn_min_per_osd * osd_count
462
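# Worked example (numbers illustrative): with mon_pg_warn_min_per_osd set to 30
# and three OSD roles in the run, get_pgs_per_fs_pool() returns 30 * 3 = 90 PGs
# per pool, keeping the per-OSD PG count above the warning threshold.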
463 def create(self):
464 if self.name is None:
465 self.name = "cephfs"
466 if self.metadata_pool_name is None:
467 self.metadata_pool_name = "{0}_metadata".format(self.name)
468 if self.data_pool_name is None:
469 data_pool_name = "{0}_data".format(self.name)
470 else:
471 data_pool_name = self.data_pool_name
472
473 log.info("Creating filesystem '{0}'".format(self.name))
474
475 pgs_per_fs_pool = self.get_pgs_per_fs_pool()
476
477 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
478 self.metadata_pool_name, pgs_per_fs_pool.__str__())
479 if self.metadata_overlay:
480 self.mon_manager.raw_cluster_cmd('fs', 'new',
481 self.name, self.metadata_pool_name, data_pool_name,
482 '--allow-dangerous-metadata-overlay')
483 else:
484 if self.ec_profile and 'disabled' not in self.ec_profile:
485 log.info("EC profile is %s", self.ec_profile)
486 cmd = ['osd', 'erasure-code-profile', 'set', data_pool_name]
487 cmd.extend(self.ec_profile)
488 self.mon_manager.raw_cluster_cmd(*cmd)
489 self.mon_manager.raw_cluster_cmd(
490 'osd', 'pool', 'create',
491 data_pool_name, pgs_per_fs_pool.__str__(), 'erasure',
492 data_pool_name)
493 self.mon_manager.raw_cluster_cmd(
494 'osd', 'pool', 'set',
495 data_pool_name, 'allow_ec_overwrites', 'true')
496 else:
497 self.mon_manager.raw_cluster_cmd(
498 'osd', 'pool', 'create',
499 data_pool_name, pgs_per_fs_pool.__str__())
500 self.mon_manager.raw_cluster_cmd('fs', 'new',
501 self.name, self.metadata_pool_name, data_pool_name)
502 self.check_pool_application(self.metadata_pool_name)
503 self.check_pool_application(data_pool_name)
504 # Turn off spurious standby count warnings from modifying max_mds in tests.
505 try:
506 self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
507 except CommandFailedError as e:
508 if e.exitstatus == 22:
509 # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
510 pass
511 else:
512 raise
513
514 self.getinfo(refresh = True)
515
516
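# Example sketch: creating a filesystem with an erasure-coded data pool,
# assuming a teuthology `ctx`; the EC profile arguments shown are illustrative:
#
#   fs = Filesystem(ctx, name='cephfs_ec', create=False,
#                   ec_profile=['k=2', 'm=1', 'crush-failure-domain=osd'])
#   fs.create()
#   fs.wait_for_daemons()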
517 def check_pool_application(self, pool_name):
518 osd_map = self.mon_manager.get_osd_dump_json()
519 for pool in osd_map['pools']:
520 if pool['pool_name'] == pool_name:
521 if "application_metadata" in pool:
522                 if "cephfs" not in pool['application_metadata']:
523                     raise RuntimeError("Pool {0} does not name cephfs as application!".\
524                         format(pool_name))
525
526
527 def __del__(self):
528 if getattr(self._ctx, "filesystem", None) == self:
529 delattr(self._ctx, "filesystem")
530
531 def exists(self):
532 """
533 Whether a filesystem exists in the mon's filesystem list
534 """
535 fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
536 return self.name in [fs['name'] for fs in fs_list]
537
538 def legacy_configured(self):
539 """
540 Check if a legacy (i.e. pre "fs new") filesystem configuration is present. If this is
541 the case, the caller should avoid using Filesystem.create
542 """
543 try:
544 out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
545 pools = json.loads(out_text)
546 metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
547 if metadata_pool_exists:
548 self.metadata_pool_name = 'metadata'
549 except CommandFailedError as e:
550 # For use in upgrade tests, Ceph cuttlefish and earlier don't support
551 # structured output (--format) from the CLI.
552 if e.exitstatus == 22:
553 metadata_pool_exists = True
554 else:
555 raise
556
557 return metadata_pool_exists
558
559 def _df(self):
560 return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))
561
562 def get_mds_map(self):
563 return self.status().get_fsmap(self.id)['mdsmap']
564
565 def get_var(self, var):
566 return self.status().get_fsmap(self.id)['mdsmap'][var]
567
568 def add_data_pool(self, name):
569 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name, self.get_pgs_per_fs_pool().__str__())
570 self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
571 self.get_pool_names(refresh = True)
572 for poolid, fs_name in self.data_pools.items():
573 if name == fs_name:
574 return poolid
575 raise RuntimeError("could not get just created pool '{0}'".format(name))
576
577 def get_pool_names(self, refresh = False, status = None):
578 if refresh or self.metadata_pool_name is None or self.data_pools is None:
579 if status is None:
580 status = self.status()
581 fsmap = status.get_fsmap(self.id)
582
583 osd_map = self.mon_manager.get_osd_dump_json()
584 id_to_name = {}
585 for p in osd_map['pools']:
586 id_to_name[p['pool']] = p['pool_name']
587
588 self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
589 self.data_pools = {}
590 for data_pool in fsmap['mdsmap']['data_pools']:
591 self.data_pools[data_pool] = id_to_name[data_pool]
592
593 def get_data_pool_name(self, refresh = False):
594 if refresh or self.data_pools is None:
595 self.get_pool_names(refresh = True)
596 assert(len(self.data_pools) == 1)
597 return self.data_pools.values()[0]
598
599 def get_data_pool_id(self, refresh = False):
600 """
601 Don't call this if you have multiple data pools
602 :return: integer
603 """
604 if refresh or self.data_pools is None:
605 self.get_pool_names(refresh = True)
606 assert(len(self.data_pools) == 1)
607 return self.data_pools.keys()[0]
608
609 def get_data_pool_names(self, refresh = False):
610 if refresh or self.data_pools is None:
611 self.get_pool_names(refresh = True)
612 return self.data_pools.values()
613
614 def get_metadata_pool_name(self):
615 return self.metadata_pool_name
616
617 def set_data_pool_name(self, name):
618 if self.id is not None:
619             raise RuntimeError("can't set data pool name if the filesystem's fscid is set")
620 self.data_pool_name = name
621
622 def get_namespace_id(self):
623 return self.id
624
625 def get_pool_df(self, pool_name):
626 """
627 Return a dict like:
628 {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
629 """
630 for pool_df in self._df()['pools']:
631 if pool_df['name'] == pool_name:
632 return pool_df['stats']
633
634 raise RuntimeError("Pool name '{0}' not found".format(pool_name))
635
636 def get_usage(self):
637 return self._df()['stats']['total_used_bytes']
638
639 def are_daemons_healthy(self):
640 """
641 Return true if all daemons are in one of active, standby, standby-replay, and
642 at least max_mds daemons are in 'active'.
643
644 Unlike most of Filesystem, this function is tolerant of new-style `fs`
645 commands being missing, because we are part of the ceph installation
646 process during upgrade suites, so must fall back to old style commands
647 when we get an EINVAL on a new style command.
648
649 :return:
650 """
651
652 active_count = 0
653 try:
654 mds_map = self.get_mds_map()
655 except CommandFailedError as cfe:
656 # Old version, fall back to non-multi-fs commands
657 if cfe.exitstatus == errno.EINVAL:
658 mds_map = json.loads(
659 self.mon_manager.raw_cluster_cmd('mds', 'dump', '--format=json'))
660 else:
661 raise
662
663 log.info("are_daemons_healthy: mds map: {0}".format(mds_map))
664
665 for mds_id, mds_status in mds_map['info'].items():
666 if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
667 log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
668 return False
669 elif mds_status['state'] == 'up:active':
670 active_count += 1
671
672 log.info("are_daemons_healthy: {0}/{1}".format(
673 active_count, mds_map['max_mds']
674 ))
675
676 if active_count >= mds_map['max_mds']:
677 # The MDSMap says these guys are active, but let's check they really are
678 for mds_id, mds_status in mds_map['info'].items():
679 if mds_status['state'] == 'up:active':
680 try:
681 daemon_status = self.mds_asok(["status"], mds_id=mds_status['name'])
682 except CommandFailedError as cfe:
683 if cfe.exitstatus == errno.EINVAL:
684 # Old version, can't do this check
685 continue
686 else:
687 # MDS not even running
688 return False
689
690 if daemon_status['state'] != 'up:active':
691 # MDS hasn't taken the latest map yet
692 return False
693
694 return True
695 else:
696 return False
697
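# Example sketch: assuming a Filesystem instance `fs`, health can be polled
# directly instead of via wait_for_daemons() (which wraps this loop with a
# timeout):
#
#   while not fs.are_daemons_healthy():
#       time.sleep(1)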
698 def get_daemon_names(self, state=None):
699 """
700 Return MDS daemon names of those daemons in the given state
701 :param state:
702 :return:
703 """
704 status = self.get_mds_map()
705 result = []
706 for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
707 if mds_status['state'] == state or state is None:
708 result.append(mds_status['name'])
709
710 return result
711
712 def get_active_names(self):
713 """
714 Return MDS daemon names of those daemons holding ranks
715 in state up:active
716
717 :return: list of strings like ['a', 'b'], sorted by rank
718 """
719 return self.get_daemon_names("up:active")
720
721 def get_all_mds_rank(self):
722 status = self.get_mds_map()
723 result = []
724 for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
725 if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
726 result.append(mds_status['rank'])
727
728 return result
729
730 def get_rank(self, rank=0, status=None):
731 if status is None:
732 status = self.getinfo()
733 return status.get_rank(self.id, rank)
734
735 def get_ranks(self, status=None):
736 if status is None:
737 status = self.getinfo()
738 return status.get_ranks(self.id)
739
740 def get_rank_names(self, status=None):
741 """
742 Return MDS daemon names of those daemons holding a rank,
743 sorted by rank. This includes e.g. up:replay/reconnect
744 as well as active, but does not include standby or
745 standby-replay.
746 """
747 status = self.get_mds_map()
748 result = []
749 for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
750 if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
751 result.append(mds_status['name'])
752
753 return result
754
755 def wait_for_daemons(self, timeout=None):
756 """
757 Wait until all daemons are healthy
758 :return:
759 """
760
761 if timeout is None:
762 timeout = DAEMON_WAIT_TIMEOUT
763
764 elapsed = 0
765 while True:
766 if self.are_daemons_healthy():
767 return
768 else:
769 time.sleep(1)
770 elapsed += 1
771
772 if elapsed > timeout:
773 raise RuntimeError("Timed out waiting for MDS daemons to become healthy")
774
775 def get_lone_mds_id(self):
776 """
777 Get a single MDS ID: the only one if there is only one
778 configured, else the only one currently holding a rank,
779 else raise an error.
780 """
781 if len(self.mds_ids) != 1:
782 alive = self.get_rank_names()
783 if len(alive) == 1:
784 return alive[0]
785 else:
786 raise ValueError("Explicit MDS argument required when multiple MDSs in use")
787 else:
788 return self.mds_ids[0]
789
790 def recreate(self):
791 log.info("Creating new filesystem")
792 self.delete_all_filesystems()
793 self.id = None
794 self.create()
795
796 def put_metadata_object_raw(self, object_id, infile):
797 """
798 Save an object to the metadata pool
799 """
800 temp_bin_path = infile
801 self.client_remote.run(args=[
802 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'put', object_id, temp_bin_path
803 ])
804
805 def get_metadata_object_raw(self, object_id):
806 """
807 Retrieve an object from the metadata pool and store it in a file.
808 """
809 temp_bin_path = '/tmp/' + object_id + '.bin'
810
811 self.client_remote.run(args=[
812 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
813 ])
814
815 return temp_bin_path
816
817 def get_metadata_object(self, object_type, object_id):
818 """
819 Retrieve an object from the metadata pool, pass it through
820 ceph-dencoder to dump it to JSON, and return the decoded object.
821 """
822 temp_bin_path = '/tmp/out.bin'
823
824 self.client_remote.run(args=[
825 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
826 ])
827
828 stdout = StringIO()
829 self.client_remote.run(args=[
830 'sudo', os.path.join(self._prefix, 'ceph-dencoder'), 'type', object_type, 'import', temp_bin_path, 'decode', 'dump_json'
831 ], stdout=stdout)
832 dump_json = stdout.getvalue().strip()
833 try:
834 dump = json.loads(dump_json)
835 except (TypeError, ValueError):
836 log.error("Failed to decode JSON: '{0}'".format(dump_json))
837 raise
838
839 return dump
840
841 def get_journal_version(self):
842 """
843 Read the JournalPointer and Journal::Header objects to learn the version of
844 encoding in use.
845 """
846 journal_pointer_object = '400.00000000'
847 journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
848 journal_ino = journal_pointer_dump['journal_pointer']['front']
849
850 journal_header_object = "{0:x}.00000000".format(journal_ino)
851 journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)
852
853 version = journal_header_dump['journal_header']['stream_format']
854 log.info("Read journal version {0}".format(version))
855
856 return version
857
858 def mds_asok(self, command, mds_id=None):
859 if mds_id is None:
860 mds_id = self.get_lone_mds_id()
861
862 return self.json_asok(command, 'mds', mds_id)
863
864 def rank_asok(self, command, rank=0):
865 info = self.get_rank(rank=rank)
866 return self.json_asok(command, 'mds', info['name'])
867
868 def read_cache(self, path, depth=None):
869 cmd = ["dump", "tree", path]
870 if depth is not None:
871 cmd.append(depth.__str__())
872 result = self.mds_asok(cmd)
873 if len(result) == 0:
874 raise RuntimeError("Path not found in cache: {0}".format(path))
875
876 return result
877
878 def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
879 """
880 Block until the MDS reaches a particular state, or a failure condition
881 is met.
882
883         When there are multiple MDSs, succeed when exactly one MDS is in the
884 goal state, or fail when any MDS is in the reject state.
885
886 :param goal_state: Return once the MDS is in this state
887 :param reject: Fail if the MDS enters this state before the goal state
888 :param timeout: Fail if this many seconds pass before reaching goal
889 :return: number of seconds waited, rounded down to integer
890 """
891
892 started_at = time.time()
893 while True:
894 status = self.status()
895 if rank is not None:
896 mds_info = status.get_rank(self.id, rank)
897 current_state = mds_info['state'] if mds_info else None
898 log.info("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
899 elif mds_id is not None:
900 # mds_info is None if no daemon with this ID exists in the map
901 mds_info = status.get_mds(mds_id)
902 current_state = mds_info['state'] if mds_info else None
903 log.info("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
904 else:
905 # In general, look for a single MDS
906 states = [m['state'] for m in status.get_ranks(self.id)]
907 if [s for s in states if s == goal_state] == [goal_state]:
908 current_state = goal_state
909 elif reject in states:
910 current_state = reject
911 else:
912 current_state = None
913 log.info("mapped states {0} to {1}".format(states, current_state))
914
915 elapsed = time.time() - started_at
916 if current_state == goal_state:
917 log.info("reached state '{0}' in {1}s".format(current_state, elapsed))
918 return elapsed
919 elif reject is not None and current_state == reject:
920 raise RuntimeError("MDS in reject state {0}".format(current_state))
921 elif timeout is not None and elapsed > timeout:
922 log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
923 raise RuntimeError(
924 "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
925 elapsed, goal_state, current_state
926 ))
927 else:
928 time.sleep(1)
929
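# Example sketch: assuming a Filesystem instance `fs`, restart the MDSs and
# wait up to 60 seconds for rank 0 to return to active (timeout illustrative):
#
#   fs.mds_fail_restart()
#   fs.wait_for_state('up:active', rank=0, timeout=60)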
930 def _read_data_xattr(self, ino_no, xattr_name, type, pool):
931 mds_id = self.mds_ids[0]
932 remote = self.mds_daemons[mds_id].remote
933 if pool is None:
934 pool = self.get_data_pool_name()
935
936 obj_name = "{0:x}.00000000".format(ino_no)
937
938 args = [
939 os.path.join(self._prefix, "rados"), "-p", pool, "getxattr", obj_name, xattr_name
940 ]
941 try:
942 proc = remote.run(
943 args=args,
944 stdout=StringIO())
945 except CommandFailedError as e:
946 log.error(e.__str__())
947 raise ObjectNotFound(obj_name)
948
949 data = proc.stdout.getvalue()
950
951 p = remote.run(
952 args=[os.path.join(self._prefix, "ceph-dencoder"), "type", type, "import", "-", "decode", "dump_json"],
953 stdout=StringIO(),
954 stdin=data
955 )
956
957 return json.loads(p.stdout.getvalue().strip())
958
959 def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
960 """
961 Write to an xattr of the 0th data object of an inode. Will
962 succeed whether the object and/or xattr already exist or not.
963
964 :param ino_no: integer inode number
965 :param xattr_name: string name of the xattr
966 :param data: byte array data to write to the xattr
967 :param pool: name of data pool or None to use primary data pool
968 :return: None
969 """
970 remote = self.mds_daemons[self.mds_ids[0]].remote
971 if pool is None:
972 pool = self.get_data_pool_name()
973
974 obj_name = "{0:x}.00000000".format(ino_no)
975 args = [
976 os.path.join(self._prefix, "rados"), "-p", pool, "setxattr",
977 obj_name, xattr_name, data
978 ]
979 remote.run(
980 args=args,
981 stdout=StringIO())
982
983 def read_backtrace(self, ino_no, pool=None):
984 """
985 Read the backtrace from the data pool, return a dict in the format
986 given by inode_backtrace_t::dump, which is something like:
987
988 ::
989
990 rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
991 ceph-dencoder type inode_backtrace_t import out.bin decode dump_json
992
993 { "ino": 1099511627778,
994 "ancestors": [
995 { "dirino": 1,
996 "dname": "blah",
997 "version": 11}],
998 "pool": 1,
999 "old_pools": []}
1000
1001 :param pool: name of pool to read backtrace from. If omitted, FS must have only
1002 one data pool and that will be used.
1003 """
1004 return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)
1005
1006 def read_layout(self, ino_no, pool=None):
1007 """
1008 Read 'layout' xattr of an inode and parse the result, returning a dict like:
1009 ::
1010 {
1011 "stripe_unit": 4194304,
1012 "stripe_count": 1,
1013 "object_size": 4194304,
1014 "pool_id": 1,
1015 "pool_ns": "",
1016 }
1017
1018         :param pool: name of pool to read layout from. If omitted, FS must have only
1019 one data pool and that will be used.
1020 """
1021 return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)
1022
1023 def _enumerate_data_objects(self, ino, size):
1024 """
1025 Get the list of expected data objects for a range, and the list of objects
1026 that really exist.
1027
1028 :return a tuple of two lists of strings (expected, actual)
1029 """
1030 stripe_size = 1024 * 1024 * 4
1031
1032 size = max(stripe_size, size)
1033
1034 want_objects = [
1035 "{0:x}.{1:08x}".format(ino, n)
1036 for n in range(0, ((size - 1) / stripe_size) + 1)
1037 ]
1038
1039 exist_objects = self.rados(["ls"], pool=self.get_data_pool_name()).split("\n")
1040
1041 return want_objects, exist_objects
1042
1043 def data_objects_present(self, ino, size):
1044 """
1045 Check that *all* the expected data objects for an inode are present in the data pool
1046 """
1047
1048 want_objects, exist_objects = self._enumerate_data_objects(ino, size)
1049 missing = set(want_objects) - set(exist_objects)
1050
1051 if missing:
1052 log.info("Objects missing (ino {0}, size {1}): {2}".format(
1053 ino, size, missing
1054 ))
1055 return False
1056 else:
1057 log.info("All objects for ino {0} size {1} found".format(ino, size))
1058 return True
1059
1060 def data_objects_absent(self, ino, size):
1061 want_objects, exist_objects = self._enumerate_data_objects(ino, size)
1062 present = set(want_objects) & set(exist_objects)
1063
1064 if present:
1065 log.info("Objects not absent (ino {0}, size {1}): {2}".format(
1066 ino, size, present
1067 ))
1068 return False
1069 else:
1070 log.info("All objects for ino {0} size {1} are absent".format(ino, size))
1071 return True
1072
1073 def dirfrag_exists(self, ino, frag):
1074 try:
1075 self.rados(["stat", "{0:x}.{1:08x}".format(ino, frag)])
1076 except CommandFailedError as e:
1077 return False
1078 else:
1079 return True
1080
1081 def rados(self, args, pool=None, namespace=None, stdin_data=None,
1082 stdin_file=None):
1083 """
1084 Call into the `rados` CLI from an MDS
1085 """
1086
1087 if pool is None:
1088 pool = self.get_metadata_pool_name()
1089
1090 # Doesn't matter which MDS we use to run rados commands, they all
1091 # have access to the pools
1092 mds_id = self.mds_ids[0]
1093 remote = self.mds_daemons[mds_id].remote
1094
1095 # NB we could alternatively use librados pybindings for this, but it's a one-liner
1096 # using the `rados` CLI
1097 args = ([os.path.join(self._prefix, "rados"), "-p", pool] +
1098 (["--namespace", namespace] if namespace else []) +
1099 args)
1100
1101 if stdin_file is not None:
1102 args = ["bash", "-c", "cat " + stdin_file + " | " + " ".join(args)]
1103
1104 p = remote.run(
1105 args=args,
1106 stdin=stdin_data,
1107 stdout=StringIO())
1108 return p.stdout.getvalue().strip()
1109
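# Example sketch: the rados() helper defaults to the metadata pool; assuming a
# Filesystem instance `fs`, one can stat the journal pointer object or list
# objects in the data pool:
#
#   fs.rados(["stat", "400.00000000"])
#   fs.rados(["ls"], pool=fs.get_data_pool_name())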
1110 def list_dirfrag(self, dir_ino):
1111 """
1112 Read the named object and return the list of omap keys
1113
1114 :return a list of 0 or more strings
1115 """
1116
1117 dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
1118
1119 try:
1120 key_list_str = self.rados(["listomapkeys", dirfrag_obj_name])
1121 except CommandFailedError as e:
1122 log.error(e.__str__())
1123 raise ObjectNotFound(dirfrag_obj_name)
1124
1125 return key_list_str.split("\n") if key_list_str else []
1126
1127 def erase_metadata_objects(self, prefix):
1128 """
1129 For all objects in the metadata pool matching the prefix,
1130 erase them.
1131
1132         This is O(N) in the number of objects in the pool, so it is only suitable
1133 for use on toy test filesystems.
1134 """
1135 all_objects = self.rados(["ls"]).split("\n")
1136 matching_objects = [o for o in all_objects if o.startswith(prefix)]
1137 for o in matching_objects:
1138 self.rados(["rm", o])
1139
1140 def erase_mds_objects(self, rank):
1141 """
1142 Erase all the per-MDS objects for a particular rank. This includes
1143 inotable, sessiontable, journal
1144 """
1145
1146 def obj_prefix(multiplier):
1147 """
1148             Return the object name prefix for this rank, following the MDS naming
1149             convention, e.g. rank 1's journal objects live at 201.*
1150 """
1151 return "%x." % (multiplier * 0x100 + rank)
1152
1153 # MDS_INO_LOG_OFFSET
1154 self.erase_metadata_objects(obj_prefix(2))
1155 # MDS_INO_LOG_BACKUP_OFFSET
1156 self.erase_metadata_objects(obj_prefix(3))
1157 # MDS_INO_LOG_POINTER_OFFSET
1158 self.erase_metadata_objects(obj_prefix(4))
1159 # MDSTables & SessionMap
1160 self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))
1161
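# For example, for rank 1 this erases objects prefixed "201." (journal),
# "301." (journal backup) and "401." (journal pointer), plus the "mds1_"
# table and sessionmap objects.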
1162 @property
1163 def _prefix(self):
1164 """
1165         Override this to set a different path prefix for the Ceph binaries.
1166 """
1167 return ""
1168
1169 def _run_tool(self, tool, args, rank=None, quiet=False):
1170 # Tests frequently have [client] configuration that jacks up
1171 # the objecter log level (unlikely to be interesting here)
1172 # and does not set the mds log level (very interesting here)
1173 if quiet:
1174 base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
1175 else:
1176 base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']
1177
1178 if rank is not None:
1179 base_args.extend(["--rank", "%d" % rank])
1180
1181 t1 = datetime.datetime.now()
1182 r = self.tool_remote.run(
1183 args=base_args + args,
1184 stdout=StringIO()).stdout.getvalue().strip()
1185 duration = datetime.datetime.now() - t1
1186 log.info("Ran {0} in time {1}, result:\n{2}".format(
1187 base_args + args, duration, r
1188 ))
1189 return r
1190
1191 @property
1192 def tool_remote(self):
1193 """
1194 An arbitrary remote to use when invoking recovery tools. Use an MDS host because
1195 it'll definitely have keys with perms to access cephfs metadata pool. This is public
1196 so that tests can use this remote to go get locally written output files from the tools.
1197 """
1198 mds_id = self.mds_ids[0]
1199 return self.mds_daemons[mds_id].remote
1200
1201 def journal_tool(self, args, rank=None, quiet=False):
1202 """
1203 Invoke cephfs-journal-tool with the passed arguments, and return its stdout
1204 """
1205 return self._run_tool("cephfs-journal-tool", args, rank, quiet)
1206
1207 def table_tool(self, args, quiet=False):
1208 """
1209 Invoke cephfs-table-tool with the passed arguments, and return its stdout
1210 """
1211 return self._run_tool("cephfs-table-tool", args, None, quiet)
1212
1213 def data_scan(self, args, quiet=False, worker_count=1):
1214 """
1215 Invoke cephfs-data-scan with the passed arguments, and return its stdout
1216
1217 :param worker_count: if greater than 1, multiple workers will be run
1218 in parallel and the return value will be None
1219 """
1220
1221 workers = []
1222
1223 for n in range(0, worker_count):
1224 if worker_count > 1:
1225 # data-scan args first token is a command, followed by args to it.
1226 # insert worker arguments after the command.
1227 cmd = args[0]
1228 worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
1229 else:
1230 worker_args = args
1231
1232 workers.append(Greenlet.spawn(lambda wargs=worker_args:
1233 self._run_tool("cephfs-data-scan", wargs, None, quiet)))
1234
1235 for w in workers:
1236 w.get()
1237
1238 if worker_count == 1:
1239 return workers[0].value
1240 else:
1241 return None
1242
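# Example sketch: assuming a Filesystem instance `fs`, run the scan_extents
# phase across four parallel workers; --worker_n/--worker_m are inserted after
# the subcommand for each worker (pool argument illustrative):
#
#   fs.data_scan(["scan_extents", fs.get_data_pool_name()], worker_count=4)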
1243 def is_full(self):
1244 return self.is_pool_full(self.get_data_pool_name())