from StringIO import StringIO
import json
import logging
from gevent import Greenlet
import os
import time
import datetime
import re
import errno
import random

from teuthology.exceptions import CommandFailedError
from teuthology import misc
from teuthology.nuke import clear_firewall
from teuthology.parallel import parallel
from tasks.ceph_manager import write_conf
from tasks import ceph_manager


log = logging.getLogger(__name__)


DAEMON_WAIT_TIMEOUT = 120
ROOT_INO = 1


class ObjectNotFound(Exception):
    def __init__(self, object_name):
        self._object_name = object_name

    def __str__(self):
        return "Object not found: '{0}'".format(self._object_name)


class FSStatus(object):
    """
    Operations on a snapshot of the FSMap.
    """
    def __init__(self, mon_manager):
        self.mon = mon_manager
        self.map = json.loads(self.mon.raw_cluster_cmd("fs", "dump", "--format=json"))

    def __str__(self):
        return json.dumps(self.map, indent = 2, sort_keys = True)

    # Expose the fsmap for manual inspection.
    def __getitem__(self, key):
        """
        Get a field from the fsmap.
        """
        return self.map[key]

    def get_filesystems(self):
        """
        Iterator for all filesystems.
        """
        for fs in self.map['filesystems']:
            yield fs

    def get_all(self):
        """
        Iterator for all the mds_info components in the FSMap.
        """
        for info in self.get_standbys():
            yield info
        for fs in self.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                yield info

    def get_standbys(self):
        """
        Iterator for all standbys.
        """
        for info in self.map['standbys']:
            yield info

    def get_fsmap(self, fscid):
        """
        Get the fsmap for the given FSCID.
        """
        for fs in self.map['filesystems']:
            if fscid is None or fs['id'] == fscid:
                return fs
        raise RuntimeError("FSCID {0} not in map".format(fscid))

    def get_fsmap_byname(self, name):
        """
        Get the fsmap for the given file system name.
        """
        for fs in self.map['filesystems']:
            if name is None or fs['mdsmap']['fs_name'] == name:
                return fs
        raise RuntimeError("FS {0} not in map".format(name))

    def get_replays(self, fscid):
        """
        Get the standby:replay MDS for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['state'] == 'up:standby-replay':
                yield info

    def get_ranks(self, fscid):
        """
        Get the ranks for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['rank'] >= 0:
                yield info

    def get_rank(self, fscid, rank):
        """
        Get the rank for the given FSCID.
        """
        for info in self.get_ranks(fscid):
            if info['rank'] == rank:
                return info
        raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))

    def get_mds(self, name):
        """
        Get the info for the given MDS name.
        """
        for info in self.get_all():
            if info['name'] == name:
                return info
        return None

    def get_mds_addr(self, name):
        """
        Return the instance addr as a string, like "10.214.133.138:6807\/10825"
        """
        info = self.get_mds(name)
        if info:
            return info['addr']
        else:
            log.warn(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
            raise RuntimeError("MDS id '{0}' not found in map".format(name))


class CephCluster(object):
    @property
    def admin_remote(self):
        first_mon = misc.get_first_mon(self._ctx, None)
        (result,) = self._ctx.cluster.only(first_mon).remotes.iterkeys()
        return result

    def __init__(self, ctx):
        self._ctx = ctx
        self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))

    def get_config(self, key, service_type=None):
        """
        Get config from mon by default, or a specific service if caller asks for it
        """
        if service_type is None:
            service_type = 'mon'

        service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def set_ceph_conf(self, subsys, key, value):
        if subsys not in self._ctx.ceph['ceph'].conf:
            self._ctx.ceph['ceph'].conf[subsys] = {}
        self._ctx.ceph['ceph'].conf[subsys][key] = value
        write_conf(self._ctx)  # XXX because we don't have the ceph task's config object, if they
                               # used a different config path this won't work.

    def clear_ceph_conf(self, subsys, key):
        del self._ctx.ceph['ceph'].conf[subsys][key]
        write_conf(self._ctx)

    def json_asok(self, command, service_type, service_id):
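        # Run a command over the daemon's admin socket and decode the JSON reply,
        # e.g. (illustrative) json_asok(["config", "get", "mds_cache_size"], "mds", "a").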
        proc = self.mon_manager.admin_socket(service_type, service_id, command)
        response_data = proc.stdout.getvalue()
        log.info("_json_asok output: {0}".format(response_data))
        if response_data.strip():
            return json.loads(response_data)
        else:
            return None


class MDSCluster(CephCluster):
    """
    Collective operations on all the MDS daemons in the Ceph cluster. These
    daemons may be in use by various Filesystems.

    For the benefit of pre-multi-filesystem tests, this class is also
    a parent of Filesystem. The correct way to use MDSCluster going forward is
    as a separate instance outside of your (multiple) Filesystem instances.
    """
    def __init__(self, ctx):
        super(MDSCluster, self).__init__(ctx)

        self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))

        if len(self.mds_ids) == 0:
            raise RuntimeError("This task requires at least one MDS")

        if hasattr(self._ctx, "daemons"):
            # Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
            self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])

    def _one_or_all(self, mds_id, cb, in_parallel=True):
        """
        Call a callback for a single named MDS, or for all.

        Note that the parallelism here isn't for performance: it's to avoid being
        overly kind to the cluster by waiting a graceful ssh-latency between
        operations, and to avoid being overly kind by always executing them in a
        particular order. However, some actions don't cope with being done in
        parallel, so it's optional (`in_parallel`).

        :param mds_id: MDS daemon name, or None
        :param cb: Callback taking single argument of MDS daemon name
        :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
        """
        if mds_id is None:
            if in_parallel:
                with parallel() as p:
                    for mds_id in self.mds_ids:
                        p.spawn(cb, mds_id)
            else:
                for mds_id in self.mds_ids:
                    cb(mds_id)
        else:
            cb(mds_id)

    def get_config(self, key, service_type=None):
        """
        get_config specialization of service_type="mds"
        """
        if service_type != "mds":
            return super(MDSCluster, self).get_config(key, service_type)

        # Some tests stop MDS daemons, don't send commands to a dead one:
        service_id = random.sample(filter(lambda i: self.mds_daemons[i].running(), self.mds_daemons), 1)[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def mds_stop(self, mds_id=None):
        """
        Stop the MDS daemon process(es). If it held a rank, that rank
        will eventually go laggy.
        """
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())

    def mds_fail(self, mds_id=None):
        """
        Inform MDSMonitor of the death of the daemon process(es). If it held
        a rank, that rank will be relinquished.
        """
        self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))

    def mds_restart(self, mds_id=None):
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())

    def mds_fail_restart(self, mds_id=None):
        """
        Variation on restart that includes marking MDSs as failed, so that doing this
        operation followed by waiting for healthy daemon states guarantees that they
        have gone down and come up, rather than potentially seeing the healthy states
        that existed before the restart.
        """
        def _fail_restart(id_):
            self.mds_daemons[id_].stop()
            self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
            self.mds_daemons[id_].restart()

        self._one_or_all(mds_id, _fail_restart)

    def newfs(self, name='cephfs', create=True):
        return Filesystem(self._ctx, name=name, create=create)

    def status(self):
        return FSStatus(self.mon_manager)

    def delete_all_filesystems(self):
        """
        Remove all filesystems that exist, and any pools in use by them.
        """
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        pool_id_name = {}
        for pool in pools:
            pool_id_name[pool['pool']] = pool['pool_name']

        # mark cluster down for each fs to prevent churn during deletion
        status = self.status()
        for fs in status.get_filesystems():
            self.mon_manager.raw_cluster_cmd("fs", "set", fs['mdsmap']['fs_name'], "cluster_down", "true")

        # get a new copy as actives may have since changed
        status = self.status()
        for fs in status.get_filesystems():
            mdsmap = fs['mdsmap']
            metadata_pool = pool_id_name[mdsmap['metadata_pool']]

            for gid in mdsmap['up'].values():
                self.mon_manager.raw_cluster_cmd('mds', 'fail', gid.__str__())

            self.mon_manager.raw_cluster_cmd('fs', 'rm', mdsmap['fs_name'], '--yes-i-really-mean-it')
            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
                                             metadata_pool, metadata_pool,
                                             '--yes-i-really-really-mean-it')
            for data_pool in mdsmap['data_pools']:
                data_pool = pool_id_name[data_pool]
                try:
                    self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
                                                     data_pool, data_pool,
                                                     '--yes-i-really-really-mean-it')
                except CommandFailedError as e:
                    # EBUSY: this data pool is shared with another filesystem;
                    # let the later filesystem's deletion pass remove it.
                    if e.exitstatus == 16:
                        pass
                    else:
                        raise

    def get_standby_daemons(self):
        return set([s['name'] for s in self.status().get_standbys()])

    def get_mds_hostnames(self):
        result = set()
        for mds_id in self.mds_ids:
            mds_remote = self.mon_manager.find_remote('mds', mds_id)
            result.add(mds_remote.hostname)

        return list(result)

    def set_clients_block(self, blocked, mds_id=None):
        """
        Block (using iptables) client communications to this MDS. Be careful: if
        other services are running on this MDS, or other MDSs try to talk to this
        MDS, their communications may also be blocked as collateral damage.

        :param mds_id: Optional ID of MDS to block, default to all
        :return:
        """
        da_flag = "-A" if blocked else "-D"

        def set_block(_mds_id):
            remote = self.mon_manager.find_remote('mds', _mds_id)
            status = self.status()

            addr = status.get_mds_addr(_mds_id)
            ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()
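            # e.g. (illustrative) an addr of "10.214.133.138:6807/10825" yields
            # ip "10.214.133.138", port "6807" and instance "10825".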

            remote.run(
                args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])
            remote.run(
                args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])

        self._one_or_all(mds_id, set_block, in_parallel=False)

    def clear_firewall(self):
        clear_firewall(self._ctx)

    def get_mds_info(self, mds_id):
        return FSStatus(self.mon_manager).get_mds(mds_id)

    def is_full(self):
        flags = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['flags']
        return 'full' in flags

    def is_pool_full(self, pool_name):
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        for pool in pools:
            if pool['pool_name'] == pool_name:
                return 'full' in pool['flags_names'].split(",")

        raise RuntimeError("Pool not found '{0}'".format(pool_name))


class Filesystem(MDSCluster):
    """
    This object is for driving a CephFS filesystem. The MDS daemons driven by
    MDSCluster may be shared with other Filesystems.
    """
    def __init__(self, ctx, fscid=None, name=None, create=False):
        super(Filesystem, self).__init__(ctx)

        self.name = name
        self.id = None
        self.metadata_pool_name = None
        self.metadata_overlay = False
        self.data_pool_name = None
        self.data_pools = None

        client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
        self.client_id = client_list[0]
        self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]

        if name is not None:
            if fscid is not None:
                raise RuntimeError("cannot specify fscid when creating fs")
            if create and not self.legacy_configured():
                self.create()
        else:
            if fscid is not None:
                self.id = fscid
            self.getinfo(refresh = True)

        # Stash a reference to the first created filesystem on ctx, so
        # that if someone drops to the interactive shell they can easily
        # poke our methods.
        if not hasattr(self._ctx, "filesystem"):
            self._ctx.filesystem = self

    def getinfo(self, refresh = False):
        status = self.status()
        if self.id is not None:
            fsmap = status.get_fsmap(self.id)
        elif self.name is not None:
            fsmap = status.get_fsmap_byname(self.name)
        else:
            fss = [fs for fs in status.get_filesystems()]
            if len(fss) == 1:
                fsmap = fss[0]
            elif len(fss) == 0:
                raise RuntimeError("no file system available")
            else:
                raise RuntimeError("more than one file system available")
        self.id = fsmap['id']
        self.name = fsmap['mdsmap']['fs_name']
        self.get_pool_names(status = status, refresh = refresh)
        return status

    def set_metadata_overlay(self, overlay):
        if self.id is not None:
            raise RuntimeError("cannot specify fscid when configuring overlay")
        self.metadata_overlay = overlay

    def deactivate(self, rank):
        if rank < 0:
            raise RuntimeError("invalid rank")
        elif rank == 0:
            raise RuntimeError("cannot deactivate rank 0")
        self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))

    def set_max_mds(self, max_mds):
        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "max_mds", "%d" % max_mds)

    def set_allow_dirfrags(self, yes):
        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "allow_dirfrags", str(yes).lower(), '--yes-i-really-mean-it')

    def get_pgs_per_fs_pool(self):
        """
        Calculate how many PGs to use when creating a pool, in order to avoid raising any
        health warnings about mon_pg_warn_min_per_osd

        :return: an integer number of PGs
        """
        pg_warn_min_per_osd = int(self.get_config('mon_pg_warn_min_per_osd'))
        osd_count = len(list(misc.all_roles_of_type(self._ctx.cluster, 'osd')))
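        # Illustrative arithmetic (values assumed): with mon_pg_warn_min_per_osd=30
        # and 3 OSD roles in the run, this returns 30 * 3 = 90 PGs for the pool.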
        return pg_warn_min_per_osd * osd_count

    def create(self):
        if self.name is None:
            self.name = "cephfs"
        if self.metadata_pool_name is None:
            self.metadata_pool_name = "{0}_metadata".format(self.name)
        if self.data_pool_name is None:
            data_pool_name = "{0}_data".format(self.name)
        else:
            data_pool_name = self.data_pool_name

        log.info("Creating filesystem '{0}'".format(self.name))

        pgs_per_fs_pool = self.get_pgs_per_fs_pool()

        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                         self.metadata_pool_name, pgs_per_fs_pool.__str__())
        if self.metadata_overlay:
            self.mon_manager.raw_cluster_cmd('fs', 'new',
                                             self.name, self.metadata_pool_name, data_pool_name,
                                             '--allow-dangerous-metadata-overlay')
        else:
            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                             data_pool_name, pgs_per_fs_pool.__str__())
            self.mon_manager.raw_cluster_cmd('fs', 'new',
                                             self.name, self.metadata_pool_name, data_pool_name)
        self.check_pool_application(self.metadata_pool_name)
        self.check_pool_application(data_pool_name)
        # Turn off spurious standby count warnings from modifying max_mds in tests.
        try:
            self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
        except CommandFailedError as e:
            if e.exitstatus == 22:
                # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
                pass
            else:
                raise

        self.getinfo(refresh = True)

    def check_pool_application(self, pool_name):
        osd_map = self.mon_manager.get_osd_dump_json()
        for pool in osd_map['pools']:
            if pool['pool_name'] == pool_name:
                if "application_metadata" in pool:
                    if not "cephfs" in pool['application_metadata']:
                        raise RuntimeError("Pool {0} does not name cephfs as application!".format(pool_name))

    def __del__(self):
        if getattr(self._ctx, "filesystem", None) == self:
            delattr(self._ctx, "filesystem")

    def exists(self):
        """
        Whether a filesystem exists in the mon's filesystem list
        """
        fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
        return self.name in [fs['name'] for fs in fs_list]

    def legacy_configured(self):
        """
        Check if a legacy (i.e. pre "fs new") filesystem configuration is present. If this is
        the case, the caller should avoid using Filesystem.create
        """
        try:
            out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
            pools = json.loads(out_text)
            metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
            if metadata_pool_exists:
                self.metadata_pool_name = 'metadata'
        except CommandFailedError as e:
            # For use in upgrade tests, Ceph cuttlefish and earlier don't support
            # structured output (--format) from the CLI.
            if e.exitstatus == 22:
                metadata_pool_exists = True
            else:
                raise

        return metadata_pool_exists

    def _df(self):
        return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))

    def get_mds_map(self):
        return self.status().get_fsmap(self.id)['mdsmap']

    def add_data_pool(self, name):
        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name, self.get_pgs_per_fs_pool().__str__())
        self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
        self.get_pool_names(refresh = True)
        for poolid, fs_name in self.data_pools.items():
            if name == fs_name:
                return poolid
        raise RuntimeError("could not get just created pool '{0}'".format(name))

    def get_pool_names(self, refresh = False, status = None):
        if refresh or self.metadata_pool_name is None or self.data_pools is None:
            if status is None:
                status = self.status()
            fsmap = status.get_fsmap(self.id)

            osd_map = self.mon_manager.get_osd_dump_json()
            id_to_name = {}
            for p in osd_map['pools']:
                id_to_name[p['pool']] = p['pool_name']

            self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
            self.data_pools = {}
            for data_pool in fsmap['mdsmap']['data_pools']:
                self.data_pools[data_pool] = id_to_name[data_pool]

    def get_data_pool_name(self, refresh = False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return self.data_pools.values()[0]

    def get_data_pool_id(self, refresh = False):
        """
        Don't call this if you have multiple data pools
        :return: integer
        """
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return self.data_pools.keys()[0]

    def get_data_pool_names(self, refresh = False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        return self.data_pools.values()

    def get_metadata_pool_name(self):
        return self.metadata_pool_name

    def set_data_pool_name(self, name):
        if self.id is not None:
            raise RuntimeError("can't set data pool name if the filesystem's fscid is already set")
        self.data_pool_name = name

    def get_namespace_id(self):
        return self.id

    def get_pool_df(self, pool_name):
        """
        Return a dict like:
            {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
        """
        for pool_df in self._df()['pools']:
            if pool_df['name'] == pool_name:
                return pool_df['stats']

        raise RuntimeError("Pool name '{0}' not found".format(pool_name))

    def get_usage(self):
        return self._df()['stats']['total_used_bytes']

    def are_daemons_healthy(self):
        """
        Return true if all daemons are in one of active, standby, standby-replay, and
        at least max_mds daemons are in 'active'.

        Unlike most of Filesystem, this function is tolerant of new-style `fs`
        commands being missing, because we are part of the ceph installation
        process during upgrade suites, so must fall back to old style commands
        when we get an EINVAL on a new style command.

        :return:
        """

        active_count = 0
        try:
            mds_map = self.get_mds_map()
        except CommandFailedError as cfe:
            # Old version, fall back to non-multi-fs commands
            if cfe.exitstatus == errno.EINVAL:
                mds_map = json.loads(
                    self.mon_manager.raw_cluster_cmd('mds', 'dump', '--format=json'))
            else:
                raise

        log.info("are_daemons_healthy: mds map: {0}".format(mds_map))

        for mds_id, mds_status in mds_map['info'].items():
            if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
                log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
                return False
            elif mds_status['state'] == 'up:active':
                active_count += 1

        log.info("are_daemons_healthy: {0}/{1}".format(
            active_count, mds_map['max_mds']
        ))

        if active_count >= mds_map['max_mds']:
            # The MDSMap says these guys are active, but let's check they really are
            for mds_id, mds_status in mds_map['info'].items():
                if mds_status['state'] == 'up:active':
                    try:
                        daemon_status = self.mds_asok(["status"], mds_id=mds_status['name'])
                    except CommandFailedError as cfe:
                        if cfe.exitstatus == errno.EINVAL:
                            # Old version, can't do this check
                            continue
                        else:
                            # MDS not even running
                            return False

                    if daemon_status['state'] != 'up:active':
                        # MDS hasn't taken the latest map yet
                        return False

            return True
        else:
            return False

    def get_daemon_names(self, state=None):
        """
        Return MDS daemon names of those daemons in the given state
        :param state:
        :return:
        """
        status = self.get_mds_map()
        result = []
        for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
            if mds_status['state'] == state or state is None:
                result.append(mds_status['name'])

        return result

    def get_active_names(self):
        """
        Return MDS daemon names of those daemons holding ranks
        in state up:active

        :return: list of strings like ['a', 'b'], sorted by rank
        """
        return self.get_daemon_names("up:active")

    def get_all_mds_rank(self):
        status = self.get_mds_map()
        result = []
        for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['rank'])

        return result

    def get_rank_names(self):
        """
        Return MDS daemon names of those daemons holding a rank,
        sorted by rank. This includes e.g. up:replay/reconnect
        as well as active, but does not include standby or
        standby-replay.
        """
        status = self.get_mds_map()
        result = []
        for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['name'])

        return result

    def wait_for_daemons(self, timeout=None):
        """
        Wait until all daemons are healthy
        :return:
        """

        if timeout is None:
            timeout = DAEMON_WAIT_TIMEOUT

        elapsed = 0
        while True:
            if self.are_daemons_healthy():
                return
            else:
                time.sleep(1)
                elapsed += 1

            if elapsed > timeout:
                raise RuntimeError("Timed out waiting for MDS daemons to become healthy")

    def get_lone_mds_id(self):
        """
        Get a single MDS ID: the only one if there is only one
        configured, else the only one currently holding a rank,
        else raise an error.
        """
        if len(self.mds_ids) != 1:
            alive = self.get_rank_names()
            if len(alive) == 1:
                return alive[0]
            else:
                raise ValueError("Explicit MDS argument required when multiple MDSs in use")
        else:
            return self.mds_ids[0]

    def recreate(self):
        log.info("Creating new filesystem")
        self.delete_all_filesystems()
        self.id = None
        self.create()

    def put_metadata_object_raw(self, object_id, infile):
        """
        Save an object to the metadata pool
        """
        temp_bin_path = infile
        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'put', object_id, temp_bin_path
        ])

    def get_metadata_object_raw(self, object_id):
        """
        Retrieve an object from the metadata pool and store it in a file.
        """
        temp_bin_path = '/tmp/' + object_id + '.bin'

        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
        ])

        return temp_bin_path

    def get_metadata_object(self, object_type, object_id):
        """
        Retrieve an object from the metadata pool, pass it through
        ceph-dencoder to dump it to JSON, and return the decoded object.
        """
        temp_bin_path = '/tmp/out.bin'

        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
        ])

        stdout = StringIO()
        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'ceph-dencoder'), 'type', object_type, 'import', temp_bin_path, 'decode', 'dump_json'
        ], stdout=stdout)
        dump_json = stdout.getvalue().strip()
        try:
            dump = json.loads(dump_json)
        except (TypeError, ValueError):
            log.error("Failed to decode JSON: '{0}'".format(dump_json))
            raise

        return dump

    def get_journal_version(self):
        """
        Read the JournalPointer and Journal::Header objects to learn the version of
        encoding in use.
        """
        journal_pointer_object = '400.00000000'
        journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
        journal_ino = journal_pointer_dump['journal_pointer']['front']

        journal_header_object = "{0:x}.00000000".format(journal_ino)
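        # e.g. (illustrative) if the pointer's 'front' inode is 0x200 (rank 0's
        # journal, per MDS_INO_LOG_OFFSET), the header object is "200.00000000".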
        journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)

        version = journal_header_dump['journal_header']['stream_format']
        log.info("Read journal version {0}".format(version))

        return version

    def mds_asok(self, command, mds_id=None):
        if mds_id is None:
            mds_id = self.get_lone_mds_id()

        return self.json_asok(command, 'mds', mds_id)

    def read_cache(self, path, depth=None):
        cmd = ["dump", "tree", path]
        if depth is not None:
            cmd.append(depth.__str__())
        result = self.mds_asok(cmd)
        if len(result) == 0:
            raise RuntimeError("Path not found in cache: {0}".format(path))

        return result

    def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
        """
        Block until the MDS reaches a particular state, or a failure condition
        is met.

        When there are multiple MDSs, succeed when exactly one MDS is in the
        goal state, or fail when any MDS is in the reject state.

        :param goal_state: Return once the MDS is in this state
        :param reject: Fail if the MDS enters this state before the goal state
        :param timeout: Fail if this many seconds pass before reaching goal
        :return: number of seconds waited, rounded down to integer
        """

        started_at = time.time()
        while True:
            status = self.status()
            if rank is not None:
                mds_info = status.get_rank(self.id, rank)
                current_state = mds_info['state'] if mds_info else None
                log.info("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
            elif mds_id is not None:
                # mds_info is None if no daemon with this ID exists in the map
                mds_info = status.get_mds(mds_id)
                current_state = mds_info['state'] if mds_info else None
                log.info("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
            else:
                # In general, look for a single MDS
                states = [m['state'] for m in status.get_ranks(self.id)]
                if [s for s in states if s == goal_state] == [goal_state]:
                    current_state = goal_state
                elif reject in states:
                    current_state = reject
                else:
                    current_state = None
                log.info("mapped states {0} to {1}".format(states, current_state))

            elapsed = time.time() - started_at
            if current_state == goal_state:
                log.info("reached state '{0}' in {1}s".format(current_state, elapsed))
                return elapsed
            elif reject is not None and current_state == reject:
                raise RuntimeError("MDS in reject state {0}".format(current_state))
            elif timeout is not None and elapsed > timeout:
                log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
                raise RuntimeError(
                    "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
                        elapsed, goal_state, current_state
                    ))
            else:
                time.sleep(1)

    def _read_data_xattr(self, ino_no, xattr_name, type, pool):
        mds_id = self.mds_ids[0]
        remote = self.mds_daemons[mds_id].remote
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)

        args = [
            os.path.join(self._prefix, "rados"), "-p", pool, "getxattr", obj_name, xattr_name
        ]
        try:
            proc = remote.run(
                args=args,
                stdout=StringIO())
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(obj_name)

        data = proc.stdout.getvalue()

        p = remote.run(
            args=[os.path.join(self._prefix, "ceph-dencoder"), "type", type, "import", "-", "decode", "dump_json"],
            stdout=StringIO(),
            stdin=data
        )

        return json.loads(p.stdout.getvalue().strip())

    def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
        """
        Write to an xattr of the 0th data object of an inode. Will
        succeed whether the object and/or xattr already exist or not.

        :param ino_no: integer inode number
        :param xattr_name: string name of the xattr
        :param data: byte array data to write to the xattr
        :param pool: name of data pool or None to use primary data pool
        :return: None
        """
        remote = self.mds_daemons[self.mds_ids[0]].remote
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)
        args = [
            os.path.join(self._prefix, "rados"), "-p", pool, "setxattr",
            obj_name, xattr_name, data
        ]
        remote.run(
            args=args,
            stdout=StringIO())

    def read_backtrace(self, ino_no, pool=None):
        """
        Read the backtrace from the data pool, return a dict in the format
        given by inode_backtrace_t::dump, which is something like:

        ::

            rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
            ceph-dencoder type inode_backtrace_t import out.bin decode dump_json

            { "ino": 1099511627778,
              "ancestors": [
                    { "dirino": 1,
                      "dname": "blah",
                      "version": 11}],
              "pool": 1,
              "old_pools": []}

        :param pool: name of pool to read backtrace from. If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)

    def read_layout(self, ino_no, pool=None):
        """
        Read 'layout' xattr of an inode and parse the result, returning a dict like:
        ::
            {
                "stripe_unit": 4194304,
                "stripe_count": 1,
                "object_size": 4194304,
                "pool_id": 1,
                "pool_ns": "",
            }

        :param pool: name of pool to read backtrace from. If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)

    def _enumerate_data_objects(self, ino, size):
        """
        Get the list of expected data objects for a range, and the list of objects
        that really exist.

        :return a tuple of two lists of strings (expected, actual)
        """
        stripe_size = 1024 * 1024 * 4

        size = max(stripe_size, size)

        want_objects = [
            "{0:x}.{1:08x}".format(ino, n)
            for n in range(0, ((size - 1) / stripe_size) + 1)
        ]
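        # Illustrative example: ino=0x10000000002 and size=8 MiB with the 4 MiB
        # stripe above gives ["10000000002.00000000", "10000000002.00000001"].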

        exist_objects = self.rados(["ls"], pool=self.get_data_pool_name()).split("\n")

        return want_objects, exist_objects

    def data_objects_present(self, ino, size):
        """
        Check that *all* the expected data objects for an inode are present in the data pool
        """

        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        missing = set(want_objects) - set(exist_objects)

        if missing:
            log.info("Objects missing (ino {0}, size {1}): {2}".format(
                ino, size, missing
            ))
            return False
        else:
            log.info("All objects for ino {0} size {1} found".format(ino, size))
            return True

    def data_objects_absent(self, ino, size):
        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        present = set(want_objects) & set(exist_objects)

        if present:
            log.info("Objects not absent (ino {0}, size {1}): {2}".format(
                ino, size, present
            ))
            return False
        else:
            log.info("All objects for ino {0} size {1} are absent".format(ino, size))
            return True

    def dirfrag_exists(self, ino, frag):
        try:
            self.rados(["stat", "{0:x}.{1:08x}".format(ino, frag)])
        except CommandFailedError as e:
            return False
        else:
            return True

    def rados(self, args, pool=None, namespace=None, stdin_data=None):
        """
        Call into the `rados` CLI from an MDS
        """

        if pool is None:
            pool = self.get_metadata_pool_name()

        # Doesn't matter which MDS we use to run rados commands, they all
        # have access to the pools
        mds_id = self.mds_ids[0]
        remote = self.mds_daemons[mds_id].remote

        # NB we could alternatively use librados pybindings for this, but it's a one-liner
        # using the `rados` CLI
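        # Illustrative call: self.rados(["listomapkeys", "1.00000000"]) lists the
        # omap keys of the root directory's dirfrag object in the metadata pool.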
        args = ([os.path.join(self._prefix, "rados"), "-p", pool] +
                (["--namespace", namespace] if namespace else []) +
                args)
        p = remote.run(
            args=args,
            stdin=stdin_data,
            stdout=StringIO())
        return p.stdout.getvalue().strip()

    def list_dirfrag(self, dir_ino):
        """
        Read the named object and return the list of omap keys

        :return a list of 0 or more strings
        """

        dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)

        try:
            key_list_str = self.rados(["listomapkeys", dirfrag_obj_name])
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(dirfrag_obj_name)

        return key_list_str.split("\n") if key_list_str else []

    def erase_metadata_objects(self, prefix):
        """
        For all objects in the metadata pool matching the prefix,
        erase them.

        This is O(N) with the number of objects in the pool, so only suitable
        for use on toy test filesystems.
        """
        all_objects = self.rados(["ls"]).split("\n")
        matching_objects = [o for o in all_objects if o.startswith(prefix)]
        for o in matching_objects:
            self.rados(["rm", o])

    def erase_mds_objects(self, rank):
        """
        Erase all the per-MDS objects for a particular rank. This includes
        inotable, sessiontable, journal
        """

        def obj_prefix(multiplier):
            """
            MDS object naming conventions like rank 1's
            journal is at 201.***
            """
            return "%x." % (multiplier * 0x100 + rank)

        # MDS_INO_LOG_OFFSET
        self.erase_metadata_objects(obj_prefix(2))
        # MDS_INO_LOG_BACKUP_OFFSET
        self.erase_metadata_objects(obj_prefix(3))
        # MDS_INO_LOG_POINTER_OFFSET
        self.erase_metadata_objects(obj_prefix(4))
        # MDSTables & SessionMap
        self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))

    @property
    def _prefix(self):
        """
        Override this to set a different path prefix for the Ceph binaries
        (rados, ceph-dencoder, cephfs-*-tool) invoked by this class.
        """
        return ""

    def _run_tool(self, tool, args, rank=None, quiet=False):
        # Tests frequently have [client] configuration that jacks up
        # the objecter log level (unlikely to be interesting here)
        # and does not set the mds log level (very interesting here)
        if quiet:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
        else:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']

        if rank is not None:
            base_args.extend(["--rank", "%d" % rank])
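        # e.g. (illustrative) journal_tool(["journal", "inspect"], rank=0) ends up
        # running: cephfs-journal-tool --debug-mds=4 --debug-objecter=1 --rank 0 journal inspect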

        t1 = datetime.datetime.now()
        r = self.tool_remote.run(
            args=base_args + args,
            stdout=StringIO()).stdout.getvalue().strip()
        duration = datetime.datetime.now() - t1
        log.info("Ran {0} in time {1}, result:\n{2}".format(
            base_args + args, duration, r
        ))
        return r

    @property
    def tool_remote(self):
        """
        An arbitrary remote to use when invoking recovery tools. Use an MDS host because
        it'll definitely have keys with perms to access cephfs metadata pool. This is public
        so that tests can use this remote to go get locally written output files from the tools.
        """
        mds_id = self.mds_ids[0]
        return self.mds_daemons[mds_id].remote

    def journal_tool(self, args, rank=None, quiet=False):
        """
        Invoke cephfs-journal-tool with the passed arguments, and return its stdout
        """
        return self._run_tool("cephfs-journal-tool", args, rank, quiet)

    def table_tool(self, args, quiet=False):
        """
        Invoke cephfs-table-tool with the passed arguments, and return its stdout
        """
        return self._run_tool("cephfs-table-tool", args, None, quiet)

    def data_scan(self, args, quiet=False, worker_count=1):
        """
        Invoke cephfs-data-scan with the passed arguments, and return its stdout

        :param worker_count: if greater than 1, multiple workers will be run
                             in parallel and the return value will be None
        """

        workers = []

        for n in range(0, worker_count):
            if worker_count > 1:
                # data-scan args first token is a command, followed by args to it.
                # insert worker arguments after the command.
                cmd = args[0]
                worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
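                # e.g. (illustrative) args=["scan_extents", "mypool"] with worker_count=2
                # gives worker 0: ["scan_extents", "--worker_n", "0", "--worker_m", "2", "mypool"]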
            else:
                worker_args = args

            workers.append(Greenlet.spawn(lambda wargs=worker_args:
                           self._run_tool("cephfs-data-scan", wargs, None, quiet)))

        for w in workers:
            w.get()

        if worker_count == 1:
            return workers[0].value
        else:
            return None