# ceph/qa/tasks/cephfs/filesystem.py (ceph.git, v12.1.4)
1
2 from StringIO import StringIO
3 import json
4 import logging
5 from gevent import Greenlet
6 import os
7 import time
8 import datetime
9 import re
10 import errno
11
12 from teuthology.exceptions import CommandFailedError
13 from teuthology import misc
14 from teuthology.nuke import clear_firewall
15 from teuthology.parallel import parallel
16 from tasks.ceph_manager import write_conf
17 from tasks import ceph_manager
18
19
20 log = logging.getLogger(__name__)
21
22
23 DAEMON_WAIT_TIMEOUT = 120
24 ROOT_INO = 1
25
26
27 class ObjectNotFound(Exception):
28 def __init__(self, object_name):
29 self._object_name = object_name
30
31 def __str__(self):
32 return "Object not found: '{0}'".format(self._object_name)
33
34 class FSStatus(object):
35 """
36 Operations on a snapshot of the FSMap.
37 """
38 def __init__(self, mon_manager):
39 self.mon = mon_manager
40 self.map = json.loads(self.mon.raw_cluster_cmd("fs", "dump", "--format=json"))
41
42 def __str__(self):
43 return json.dumps(self.map, indent = 2, sort_keys = True)
44
45 # Expose the fsmap for manual inspection.
46 def __getitem__(self, key):
47 """
48 Get a field from the fsmap.
49 """
50 return self.map[key]
51
52 def get_filesystems(self):
53 """
54 Iterator for all filesystems.
55 """
56 for fs in self.map['filesystems']:
57 yield fs
58
59 def get_all(self):
60 """
61 Iterator for all the mds_info components in the FSMap.
62 """
63 for info in self.get_standbys():
64 yield info
65 for fs in self.map['filesystems']:
66 for info in fs['mdsmap']['info'].values():
67 yield info
68
69 def get_standbys(self):
70 """
71 Iterator for all standbys.
72 """
73 for info in self.map['standbys']:
74 yield info
75
76 def get_fsmap(self, fscid):
77 """
78 Get the fsmap for the given FSCID.
79 """
80 for fs in self.map['filesystems']:
81 if fscid is None or fs['id'] == fscid:
82 return fs
83 raise RuntimeError("FSCID {0} not in map".format(fscid))
84
85 def get_fsmap_byname(self, name):
86 """
87 Get the fsmap for the given file system name.
88 """
89 for fs in self.map['filesystems']:
90 if name is None or fs['mdsmap']['fs_name'] == name:
91 return fs
92 raise RuntimeError("FS {0} not in map".format(name))
93
94 def get_replays(self, fscid):
95 """
96 Iterate over the up:standby-replay daemons for the given FSCID.
97 """
98 fs = self.get_fsmap(fscid)
99 for info in fs['mdsmap']['info'].values():
100 if info['state'] == 'up:standby-replay':
101 yield info
102
103 def get_ranks(self, fscid):
104 """
105 Get the ranks for the given FSCID.
106 """
107 fs = self.get_fsmap(fscid)
108 for info in fs['mdsmap']['info'].values():
109 if info['rank'] >= 0:
110 yield info
111
112 def get_rank(self, fscid, rank):
113 """
114 Get the info for the given rank of the given FSCID.
115 """
116 for info in self.get_ranks(fscid):
117 if info['rank'] == rank:
118 return info
119 raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))
120
121 def get_mds(self, name):
122 """
123 Get the info for the given MDS name.
124 """
125 for info in self.get_all():
126 if info['name'] == name:
127 return info
128 return None
129
130 def get_mds_addr(self, name):
131 """
132 Return the instance addr as a string, like "10.214.133.138:6807/10825"
133 """
134 info = self.get_mds(name)
135 if info:
136 return info['addr']
137 else:
138 log.warn(json.dumps(list(self.get_all()), indent=2)) # dump for debugging
139 raise RuntimeError("MDS id '{0}' not found in map".format(name))
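# Example (illustrative sketch; `mds_cluster` stands for an MDSCluster or
# Filesystem instance as constructed elsewhere in this module): snapshot the
# FSMap and walk the active ranks of every filesystem.
#
#     status = mds_cluster.status()
#     for fs in status.get_filesystems():
#         for info in status.get_ranks(fs['id']):
#             log.info("rank {0} held by mds.{1} in state {2}".format(
#                 info['rank'], info['name'], info['state']))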
140
141 class CephCluster(object):
142 @property
143 def admin_remote(self):
144 first_mon = misc.get_first_mon(self._ctx, None)
145 (result,) = self._ctx.cluster.only(first_mon).remotes.iterkeys()
146 return result
147
148 def __init__(self, ctx):
149 self._ctx = ctx
150 self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
151
152 def get_config(self, key, service_type=None):
153 """
154 Get config from mon by default, or a specific service if caller asks for it
155 """
156 if service_type is None:
157 service_type = 'mon'
158
159 service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
160 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
161
162 def set_ceph_conf(self, subsys, key, value):
163 if subsys not in self._ctx.ceph['ceph'].conf:
164 self._ctx.ceph['ceph'].conf[subsys] = {}
165 self._ctx.ceph['ceph'].conf[subsys][key] = value
166 write_conf(self._ctx) # XXX because we don't have the ceph task's config object, if they
167 # used a different config path this won't work.
168
169 def clear_ceph_conf(self, subsys, key):
170 del self._ctx.ceph['ceph'].conf[subsys][key]
171 write_conf(self._ctx)
172
173 def json_asok(self, command, service_type, service_id):
174 proc = self.mon_manager.admin_socket(service_type, service_id, command)
175 response_data = proc.stdout.getvalue()
176 log.info("json_asok output: {0}".format(response_data))
177 if response_data.strip():
178 return json.loads(response_data)
179 else:
180 return None
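# Example (illustrative; mirrors get_config() above -- the daemon id 'a' and
# the config key are placeholders, not values this module defines):
#
#     value = cluster.json_asok(['config', 'get', 'mds_cache_size'],
#                               'mds', 'a')['mds_cache_size']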
181
182
183 class MDSCluster(CephCluster):
184 """
185 Collective operations on all the MDS daemons in the Ceph cluster. These
186 daemons may be in use by various Filesystems.
187
188 For the benefit of pre-multi-filesystem tests, this class is also
189 a parent of Filesystem. The correct way to use MDSCluster going forward is
190 as a separate instance outside of your (multiple) Filesystem instances.
191 """
192 def __init__(self, ctx):
193 super(MDSCluster, self).__init__(ctx)
194
195 self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))
196
197 if len(self.mds_ids) == 0:
198 raise RuntimeError("This task requires at least one MDS")
199
200 if hasattr(self._ctx, "daemons"):
201 # Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
202 self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
203
204 def _one_or_all(self, mds_id, cb, in_parallel=True):
205 """
206 Call a callback for a single named MDS, or for all.
207
208 Note that the parallelism here isn't for performance: it's so that operations hit
209 the cluster without a polite ssh-latency pause between them, and without always
210 arriving in the same order. However, some actions don't cope with being done in
211 parallel, so it's optional (`in_parallel`).
212
213 :param mds_id: MDS daemon name, or None
214 :param cb: Callback taking single argument of MDS daemon name
215 :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
216 """
217 if mds_id is None:
218 if in_parallel:
219 with parallel() as p:
220 for mds_id in self.mds_ids:
221 p.spawn(cb, mds_id)
222 else:
223 for mds_id in self.mds_ids:
224 cb(mds_id)
225 else:
226 cb(mds_id)
227
228 def mds_stop(self, mds_id=None):
229 """
230 Stop the MDS daemon process(es). If it held a rank, that rank
231 will eventually go laggy.
232 """
233 self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())
234
235 def mds_fail(self, mds_id=None):
236 """
237 Inform MDSMonitor of the death of the daemon process(es). If it held
238 a rank, that rank will be relinquished.
239 """
240 self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))
241
242 def mds_restart(self, mds_id=None):
243 self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())
244
245 def mds_fail_restart(self, mds_id=None):
246 """
247 Variation on restart that includes marking MDSs as failed, so that doing this
248 operation followed by waiting for healthy daemon states guarantees that they
249 have gone down and come up, rather than potentially seeing the healthy states
250 that existed before the restart.
251 """
252 def _fail_restart(id_):
253 self.mds_daemons[id_].stop()
254 self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
255 self.mds_daemons[id_].restart()
256
257 self._one_or_all(mds_id, _fail_restart)
258
259 def newfs(self, name):
260 return Filesystem(self._ctx, create=name)
261
262 def status(self):
263 return FSStatus(self.mon_manager)
264
265 def delete_all_filesystems(self):
266 """
267 Remove all filesystems that exist, and any pools in use by them.
268 """
269 pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
270 pool_id_name = {}
271 for pool in pools:
272 pool_id_name[pool['pool']] = pool['pool_name']
273
274 # mark cluster down for each fs to prevent churn during deletion
275 status = self.status()
276 for fs in status.get_filesystems():
277 self.mon_manager.raw_cluster_cmd("fs", "set", fs['mdsmap']['fs_name'], "cluster_down", "true")
278
279 # get a new copy as actives may have since changed
280 status = self.status()
281 for fs in status.get_filesystems():
282 mdsmap = fs['mdsmap']
283 metadata_pool = pool_id_name[mdsmap['metadata_pool']]
284
285 for gid in mdsmap['up'].values():
286 self.mon_manager.raw_cluster_cmd('mds', 'fail', gid.__str__())
287
288 self.mon_manager.raw_cluster_cmd('fs', 'rm', mdsmap['fs_name'], '--yes-i-really-mean-it')
289 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
290 metadata_pool, metadata_pool,
291 '--yes-i-really-really-mean-it')
292 for data_pool in mdsmap['data_pools']:
293 data_pool = pool_id_name[data_pool]
294 try:
295 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
296 data_pool, data_pool,
297 '--yes-i-really-really-mean-it')
298 except CommandFailedError as e:
299 if e.exitstatus == 16: # EBUSY: this data pool is still in use
300 pass # by another filesystem; a later pass
301 else: # through the loop will delete it
302 raise
303
304 def get_standby_daemons(self):
305 return set([s['name'] for s in self.status().get_standbys()])
306
307 def get_mds_hostnames(self):
308 result = set()
309 for mds_id in self.mds_ids:
310 mds_remote = self.mon_manager.find_remote('mds', mds_id)
311 result.add(mds_remote.hostname)
312
313 return list(result)
314
315 def set_clients_block(self, blocked, mds_id=None):
316 """
317 Block (using iptables) client communications to this MDS. Be careful: if
318 other services are running on this MDS, or other MDSs try to talk to this
319 MDS, their communications may also be blocked as collateral damage.
320
321 :param mds_id: Optional ID of MDS to block, default to all
322 :return:
323 """
324 da_flag = "-A" if blocked else "-D"
325
326 def set_block(_mds_id):
327 remote = self.mon_manager.find_remote('mds', _mds_id)
328 status = self.status()
329
330 addr = status.get_mds_addr(_mds_id)
331 ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()
332
333 remote.run(
334 args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
335 "comment", "--comment", "teuthology"])
336 remote.run(
337 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
338 "comment", "--comment", "teuthology"])
339
340 self._one_or_all(mds_id, set_block, in_parallel=False)
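# Example (hypothetical failure-injection sequence; 'a' is a placeholder MDS id):
#
#     mds_cluster.set_clients_block(True, mds_id='a')   # drop client traffic to mds.a
#     # ... exercise failover/eviction behaviour here ...
#     mds_cluster.set_clients_block(False, mds_id='a')  # remove the iptables rules
#     mds_cluster.clear_firewall()                      # or flush everything at once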
341
342 def clear_firewall(self):
343 clear_firewall(self._ctx)
344
345 def get_mds_info(self, mds_id):
346 return FSStatus(self.mon_manager).get_mds(mds_id)
347
348 def is_full(self):
349 flags = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['flags']
350 return 'full' in flags
351
352 def is_pool_full(self, pool_name):
353 pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
354 for pool in pools:
355 if pool['pool_name'] == pool_name:
356 return 'full' in pool['flags_names'].split(",")
357
358 raise RuntimeError("Pool not found '{0}'".format(pool_name))
359
360 class Filesystem(MDSCluster):
361 """
362 This object is for driving a CephFS filesystem. The MDS daemons driven by
363 MDSCluster may be shared with other Filesystems.
364 """
365 def __init__(self, ctx, fscid=None, create=None):
366 super(Filesystem, self).__init__(ctx)
367
368 self.id = None
369 self.name = None
370 self.metadata_pool_name = None
371 self.data_pools = None
372
373 client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
374 self.client_id = client_list[0]
375 self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
376
377 if create is not None:
378 if fscid is not None:
379 raise RuntimeError("cannot specify fscid when creating fs")
380 if create is True:
381 self.name = 'cephfs'
382 else:
383 self.name = create
384 if not self.legacy_configured():
385 self.create()
386 elif fscid is not None:
387 self.id = fscid
388 self.getinfo(refresh = True)
389
390 # Stash a reference to the first created filesystem on ctx, so
391 # that if someone drops to the interactive shell they can easily
392 # poke our methods.
393 if not hasattr(self._ctx, "filesystem"):
394 self._ctx.filesystem = self
395
396 def getinfo(self, refresh = False):
397 status = self.status()
398 if self.id is not None:
399 fsmap = status.get_fsmap(self.id)
400 elif self.name is not None:
401 fsmap = status.get_fsmap_byname(self.name)
402 else:
403 fss = [fs for fs in status.get_filesystems()]
404 if len(fss) == 1:
405 fsmap = fss[0]
406 elif len(fss) == 0:
407 raise RuntimeError("no file system available")
408 else:
409 raise RuntimeError("more than one file system available")
410 self.id = fsmap['id']
411 self.name = fsmap['mdsmap']['fs_name']
412 self.get_pool_names(status = status, refresh = refresh)
413 return status
414
415 def deactivate(self, rank):
416 if rank < 0:
417 raise RuntimeError("invalid rank")
418 elif rank == 0:
419 raise RuntimeError("cannot deactivate rank 0")
420 self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))
421
422 def set_max_mds(self, max_mds):
423 self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "max_mds", "%d" % max_mds)
424
425 def set_allow_dirfrags(self, yes):
426 self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "allow_dirfrags", str(yes).lower(), '--yes-i-really-mean-it')
427
428 def get_pgs_per_fs_pool(self):
429 """
430 Calculate how many PGs to use when creating a pool, in order to avoid raising any
431 health warnings about mon_pg_warn_min_per_osd
432
433 :return: an integer number of PGs
434 """
435 pg_warn_min_per_osd = int(self.get_config('mon_pg_warn_min_per_osd'))
436 osd_count = len(list(misc.all_roles_of_type(self._ctx.cluster, 'osd')))
437 return pg_warn_min_per_osd * osd_count
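# For example, with mon_pg_warn_min_per_osd = 30 (an illustrative value) and a
# three-OSD test cluster, each filesystem pool is created with 30 * 3 = 90 PGs.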
438
439 def create(self):
440 if self.name is None:
441 self.name = "cephfs"
442 if self.metadata_pool_name is None:
443 self.metadata_pool_name = "{0}_metadata".format(self.name)
444 data_pool_name = "{0}_data".format(self.name)
445
446 log.info("Creating filesystem '{0}'".format(self.name))
447
448 pgs_per_fs_pool = self.get_pgs_per_fs_pool()
449
450 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
451 self.metadata_pool_name, pgs_per_fs_pool.__str__())
452 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
453 data_pool_name, pgs_per_fs_pool.__str__())
454 self.mon_manager.raw_cluster_cmd('fs', 'new',
455 self.name, self.metadata_pool_name, data_pool_name)
456 self.check_pool_application(self.metadata_pool_name)
457 self.check_pool_application(data_pool_name)
458 # Turn off spurious standby count warnings from modifying max_mds in tests.
459 try:
460 self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
461 except CommandFailedError as e:
462 if e.exitstatus == 22:
463 # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
464 pass
465 else:
466 raise
467
468 self.getinfo(refresh = True)
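# Example (sketch of how a test typically brings a filesystem up; `ctx` is the
# teuthology context object passed to the task):
#
#     fs = Filesystem(ctx, create=True)   # creates the pools and runs "fs new"
#     fs.wait_for_daemons()
#     assert fs.are_daemons_healthy()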
469
470
471 def check_pool_application(self, pool_name):
472 osd_map = self.mon_manager.get_osd_dump_json()
473 for pool in osd_map['pools']:
474 if pool['pool_name'] == pool_name:
475 if "application_metadata" in pool:
476 if "cephfs" not in pool['application_metadata']:
477 raise RuntimeError("Pool '{0}' does not name cephfs as application!".format(
478 pool_name))
479
480
481 def __del__(self):
482 if getattr(self._ctx, "filesystem", None) == self:
483 delattr(self._ctx, "filesystem")
484
485 def exists(self):
486 """
487 Whether a filesystem exists in the mon's filesystem list
488 """
489 fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
490 return self.name in [fs['name'] for fs in fs_list]
491
492 def legacy_configured(self):
493 """
494 Check if a legacy (i.e. pre "fs new") filesystem configuration is present. If this is
495 the case, the caller should avoid using Filesystem.create
496 """
497 try:
498 out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
499 pools = json.loads(out_text)
500 metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
501 if metadata_pool_exists:
502 self.metadata_pool_name = 'metadata'
503 except CommandFailedError as e:
504 # For use in upgrade tests, Ceph cuttlefish and earlier don't support
505 # structured output (--format) from the CLI.
506 if e.exitstatus == 22:
507 metadata_pool_exists = True
508 else:
509 raise
510
511 return metadata_pool_exists
512
513 def _df(self):
514 return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))
515
516 def get_mds_map(self):
517 return self.status().get_fsmap(self.id)['mdsmap']
518
519 def add_data_pool(self, name):
520 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name, self.get_pgs_per_fs_pool().__str__())
521 self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
522 self.get_pool_names(refresh = True)
523 for poolid, fs_name in self.data_pools.items():
524 if name == fs_name:
525 return poolid
526 raise RuntimeError("could not get just created pool '{0}'".format(name))
527
528 def get_pool_names(self, refresh = False, status = None):
529 if refresh or self.metadata_pool_name is None or self.data_pools is None:
530 if status is None:
531 status = self.status()
532 fsmap = status.get_fsmap(self.id)
533
534 osd_map = self.mon_manager.get_osd_dump_json()
535 id_to_name = {}
536 for p in osd_map['pools']:
537 id_to_name[p['pool']] = p['pool_name']
538
539 self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
540 self.data_pools = {}
541 for data_pool in fsmap['mdsmap']['data_pools']:
542 self.data_pools[data_pool] = id_to_name[data_pool]
543
544 def get_data_pool_name(self, refresh = False):
545 if refresh or self.data_pools is None:
546 self.get_pool_names(refresh = True)
547 assert(len(self.data_pools) == 1)
548 return self.data_pools.values()[0]
549
550 def get_data_pool_id(self, refresh = False):
551 """
552 Don't call this if you have multiple data pools
553 :return: integer
554 """
555 if refresh or self.data_pools is None:
556 self.get_pool_names(refresh = True)
557 assert(len(self.data_pools) == 1)
558 return self.data_pools.keys()[0]
559
560 def get_data_pool_names(self, refresh = False):
561 if refresh or self.data_pools is None:
562 self.get_pool_names(refresh = True)
563 return self.data_pools.values()
564
565 def get_metadata_pool_name(self):
566 return self.metadata_pool_name
567
568 def get_namespace_id(self):
569 return self.id
570
571 def get_pool_df(self, pool_name):
572 """
573 Return a dict like:
574 {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
575 """
576 for pool_df in self._df()['pools']:
577 if pool_df['name'] == pool_name:
578 return pool_df['stats']
579
580 raise RuntimeError("Pool name '{0}' not found".format(pool_name))
581
582 def get_usage(self):
583 return self._df()['stats']['total_used_bytes']
584
585 def are_daemons_healthy(self):
586 """
587 Return true if all daemons are in one of the states active, standby or
588 standby-replay, and at least max_mds daemons are in 'active'.
589
590 Unlike most of Filesystem, this function is tolerant of new-style `fs`
591 commands being missing, because we are part of the ceph installation
592 process during upgrade suites, so must fall back to old style commands
593 when we get an EINVAL on a new style command.
594
595 :return:
596 """
597
598 active_count = 0
599 try:
600 mds_map = self.get_mds_map()
601 except CommandFailedError as cfe:
602 # Old version, fall back to non-multi-fs commands
603 if cfe.exitstatus == errno.EINVAL:
604 mds_map = json.loads(
605 self.mon_manager.raw_cluster_cmd('mds', 'dump', '--format=json'))
606 else:
607 raise
608
609 log.info("are_daemons_healthy: mds map: {0}".format(mds_map))
610
611 for mds_id, mds_status in mds_map['info'].items():
612 if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
613 log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
614 return False
615 elif mds_status['state'] == 'up:active':
616 active_count += 1
617
618 log.info("are_daemons_healthy: {0}/{1}".format(
619 active_count, mds_map['max_mds']
620 ))
621
622 if active_count >= mds_map['max_mds']:
623 # The MDSMap says these guys are active, but let's check they really are
624 for mds_id, mds_status in mds_map['info'].items():
625 if mds_status['state'] == 'up:active':
626 try:
627 daemon_status = self.mds_asok(["status"], mds_id=mds_status['name'])
628 except CommandFailedError as cfe:
629 if cfe.exitstatus == errno.EINVAL:
630 # Old version, can't do this check
631 continue
632 else:
633 # MDS not even running
634 return False
635
636 if daemon_status['state'] != 'up:active':
637 # MDS hasn't taken the latest map yet
638 return False
639
640 return True
641 else:
642 return False
643
644 def get_daemon_names(self, state=None):
645 """
646 Return MDS daemon names of those daemons in the given state.
647 :param state: MDS state to filter on (e.g. "up:active"), or None for all daemons
648 :return: list of daemon name strings
649 """
650 status = self.get_mds_map()
651 result = []
652 for mds_status in sorted(status['info'].values(), key=lambda a: a['rank']):
653 if mds_status['state'] == state or state is None:
654 result.append(mds_status['name'])
655
656 return result
657
658 def get_active_names(self):
659 """
660 Return MDS daemon names of those daemons holding ranks
661 in state up:active
662
663 :return: list of strings like ['a', 'b'], sorted by rank
664 """
665 return self.get_daemon_names("up:active")
666
667 def get_all_mds_rank(self):
668 status = self.get_mds_map()
669 result = []
670 for mds_status in sorted(status['info'].values(), key=lambda a: a['rank']):
671 if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
672 result.append(mds_status['rank'])
673
674 return result
675
676 def get_rank_names(self):
677 """
678 Return MDS daemon names of those daemons holding a rank,
679 sorted by rank. This includes e.g. up:replay/reconnect
680 as well as active, but does not include standby or
681 standby-replay.
682 """
683 status = self.get_mds_map()
684 result = []
685 for mds_status in sorted(status['info'].values(), key=lambda a: a['rank']):
686 if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
687 result.append(mds_status['name'])
688
689 return result
690
691 def wait_for_daemons(self, timeout=None):
692 """
693 Wait until all daemons are healthy
694 :return:
695 """
696
697 if timeout is None:
698 timeout = DAEMON_WAIT_TIMEOUT
699
700 elapsed = 0
701 while True:
702 if self.are_daemons_healthy():
703 return
704 else:
705 time.sleep(1)
706 elapsed += 1
707
708 if elapsed > timeout:
709 raise RuntimeError("Timed out waiting for MDS daemons to become healthy")
710
711 def get_lone_mds_id(self):
712 """
713 Get a single MDS ID: the only one if there is only one
714 configured, else the only one currently holding a rank,
715 else raise an error.
716 """
717 if len(self.mds_ids) != 1:
718 alive = self.get_rank_names()
719 if len(alive) == 1:
720 return alive[0]
721 else:
722 raise ValueError("Explicit MDS argument required when multiple MDSs in use")
723 else:
724 return self.mds_ids[0]
725
726 def recreate(self):
727 log.info("Creating new filesystem")
728 self.delete_all_filesystems()
729 self.id = None
730 self.create()
731
732 def put_metadata_object_raw(self, object_id, infile):
733 """
734 Save an object to the metadata pool
735 """
736 temp_bin_path = infile
737 self.client_remote.run(args=[
738 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'put', object_id, temp_bin_path
739 ])
740
741 def get_metadata_object_raw(self, object_id):
742 """
743 Retrieve an object from the metadata pool and store it in a file.
744 """
745 temp_bin_path = '/tmp/' + object_id + '.bin'
746
747 self.client_remote.run(args=[
748 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
749 ])
750
751 return temp_bin_path
752
753 def get_metadata_object(self, object_type, object_id):
754 """
755 Retrieve an object from the metadata pool, pass it through
756 ceph-dencoder to dump it to JSON, and return the decoded object.
757 """
758 temp_bin_path = '/tmp/out.bin'
759
760 self.client_remote.run(args=[
761 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
762 ])
763
764 stdout = StringIO()
765 self.client_remote.run(args=[
766 'sudo', os.path.join(self._prefix, 'ceph-dencoder'), 'type', object_type, 'import', temp_bin_path, 'decode', 'dump_json'
767 ], stdout=stdout)
768 dump_json = stdout.getvalue().strip()
769 try:
770 dump = json.loads(dump_json)
771 except (TypeError, ValueError):
772 log.error("Failed to decode JSON: '{0}'".format(dump_json))
773 raise
774
775 return dump
776
777 def get_journal_version(self):
778 """
779 Read the JournalPointer and Journal::Header objects to learn the version of
780 encoding in use.
781 """
782 journal_pointer_object = '400.00000000'
783 journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
784 journal_ino = journal_pointer_dump['journal_pointer']['front']
785
786 journal_header_object = "{0:x}.00000000".format(journal_ino)
787 journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)
788
789 version = journal_header_dump['journal_header']['stream_format']
790 log.info("Read journal version {0}".format(version))
791
792 return version
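# Worked example: for rank 0 the JournalPointer object is 400.00000000 and its
# 'front' field normally points at inode 0x200, so the header read above comes
# from object 200.00000000 in the metadata pool.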
793
794 def mds_asok(self, command, mds_id=None):
795 if mds_id is None:
796 mds_id = self.get_lone_mds_id()
797
798 return self.json_asok(command, 'mds', mds_id)
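# Example (illustrative admin socket commands against the lone MDS; `fs` is a
# Filesystem instance):
#
#     fs.mds_asok(['flush', 'journal'])
#     ops = fs.mds_asok(['dump_ops_in_flight'])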
799
800 def read_cache(self, path, depth=None):
801 cmd = ["dump", "tree", path]
802 if depth is not None:
803 cmd.append(depth.__str__())
804 result = self.mds_asok(cmd)
805 if len(result) == 0:
806 raise RuntimeError("Path not found in cache: {0}".format(path))
807
808 return result
809
810 def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
811 """
812 Block until the MDS reaches a particular state, or a failure condition
813 is met.
814
815 When there are multiple MDSs, succeed when exactly one MDS is in the
816 goal state, or fail when any MDS is in the reject state.
817
818 :param goal_state: Return once the MDS is in this state
819 :param reject: Fail if the MDS enters this state before the goal state
820 :param timeout: Fail if this many seconds pass before reaching goal
821 :return: number of seconds waited, rounded down to integer
822 """
823
824 started_at = time.time()
825 while True:
826 status = self.status()
827 if rank is not None:
828 mds_info = status.get_rank(self.id, rank)
829 current_state = mds_info['state'] if mds_info else None
830 log.info("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
831 elif mds_id is not None:
832 # mds_info is None if no daemon with this ID exists in the map
833 mds_info = status.get_mds(mds_id)
834 current_state = mds_info['state'] if mds_info else None
835 log.info("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
836 else:
837 # In general, look for a single MDS
838 states = [m['state'] for m in status.get_ranks(self.id)]
839 if [s for s in states if s == goal_state] == [goal_state]:
840 current_state = goal_state
841 elif reject in states:
842 current_state = reject
843 else:
844 current_state = None
845 log.info("mapped states {0} to {1}".format(states, current_state))
846
847 elapsed = time.time() - started_at
848 if current_state == goal_state:
849 log.info("reached state '{0}' in {1}s".format(current_state, elapsed))
850 return elapsed
851 elif reject is not None and current_state == reject:
852 raise RuntimeError("MDS in reject state {0}".format(current_state))
853 elif timeout is not None and elapsed > timeout:
854 log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
855 raise RuntimeError(
856 "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
857 elapsed, goal_state, current_state
858 ))
859 else:
860 time.sleep(1)
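# Example (illustrative; 'a' is a placeholder daemon name): restart an MDS and
# block until it is active again, failing after two minutes.
#
#     fs.mds_fail_restart('a')
#     fs.wait_for_state('up:active', mds_id='a', timeout=120)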
861
862 def _read_data_xattr(self, ino_no, xattr_name, type, pool):
863 mds_id = self.mds_ids[0]
864 remote = self.mds_daemons[mds_id].remote
865 if pool is None:
866 pool = self.get_data_pool_name()
867
868 obj_name = "{0:x}.00000000".format(ino_no)
869
870 args = [
871 os.path.join(self._prefix, "rados"), "-p", pool, "getxattr", obj_name, xattr_name
872 ]
873 try:
874 proc = remote.run(
875 args=args,
876 stdout=StringIO())
877 except CommandFailedError as e:
878 log.error(e.__str__())
879 raise ObjectNotFound(obj_name)
880
881 data = proc.stdout.getvalue()
882
883 p = remote.run(
884 args=[os.path.join(self._prefix, "ceph-dencoder"), "type", type, "import", "-", "decode", "dump_json"],
885 stdout=StringIO(),
886 stdin=data
887 )
888
889 return json.loads(p.stdout.getvalue().strip())
890
891 def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
892 """
893 Write to an xattr of the 0th data object of an inode. Will
894 succeed whether the object and/or xattr already exist or not.
895
896 :param ino_no: integer inode number
897 :param xattr_name: string name of the xattr
898 :param data: byte array data to write to the xattr
899 :param pool: name of data pool or None to use primary data pool
900 :return: None
901 """
902 remote = self.mds_daemons[self.mds_ids[0]].remote
903 if pool is None:
904 pool = self.get_data_pool_name()
905
906 obj_name = "{0:x}.00000000".format(ino_no)
907 args = [
908 os.path.join(self._prefix, "rados"), "-p", pool, "setxattr",
909 obj_name, xattr_name, data
910 ]
911 remote.run(
912 args=args,
913 stdout=StringIO())
914
915 def read_backtrace(self, ino_no, pool=None):
916 """
917 Read the backtrace from the data pool, return a dict in the format
918 given by inode_backtrace_t::dump, which is something like:
919
920 ::
921
922 rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
923 ceph-dencoder type inode_backtrace_t import out.bin decode dump_json
924
925 { "ino": 1099511627778,
926 "ancestors": [
927 { "dirino": 1,
928 "dname": "blah",
929 "version": 11}],
930 "pool": 1,
931 "old_pools": []}
932
933 :param pool: name of pool to read backtrace from. If omitted, FS must have only
934 one data pool and that will be used.
935 """
936 return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)
937
938 def read_layout(self, ino_no, pool=None):
939 """
940 Read 'layout' xattr of an inode and parse the result, returning a dict like:
941 ::
942 {
943 "stripe_unit": 4194304,
944 "stripe_count": 1,
945 "object_size": 4194304,
946 "pool_id": 1,
947 "pool_ns": "",
948 }
949
950 :param pool: name of pool to read backtrace from. If omitted, FS must have only
951 one data pool and that will be used.
952 """
953 return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)
954
955 def _enumerate_data_objects(self, ino, size):
956 """
957 Get the list of expected data objects for a range, and the list of objects
958 that really exist.
959
960 :return a tuple of two lists of strings (expected, actual)
961 """
962 stripe_size = 1024 * 1024 * 4
963
964 size = max(stripe_size, size)
965
966 want_objects = [
967 "{0:x}.{1:08x}".format(ino, n)
968 for n in range(0, ((size - 1) / stripe_size) + 1)
969 ]
970
971 exist_objects = self.rados(["ls"], pool=self.get_data_pool_name()).split("\n")
972
973 return want_objects, exist_objects
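# Worked example: a 9 MB file whose inode is 0x10000000002 spans three default
# 4 MB objects, so want_objects would be 10000000002.00000000,
# 10000000002.00000001 and 10000000002.00000002.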
974
975 def data_objects_present(self, ino, size):
976 """
977 Check that *all* the expected data objects for an inode are present in the data pool
978 """
979
980 want_objects, exist_objects = self._enumerate_data_objects(ino, size)
981 missing = set(want_objects) - set(exist_objects)
982
983 if missing:
984 log.info("Objects missing (ino {0}, size {1}): {2}".format(
985 ino, size, missing
986 ))
987 return False
988 else:
989 log.info("All objects for ino {0} size {1} found".format(ino, size))
990 return True
991
992 def data_objects_absent(self, ino, size):
993 want_objects, exist_objects = self._enumerate_data_objects(ino, size)
994 present = set(want_objects) & set(exist_objects)
995
996 if present:
997 log.info("Objects not absent (ino {0}, size {1}): {2}".format(
998 ino, size, present
999 ))
1000 return False
1001 else:
1002 log.info("All objects for ino {0} size {1} are absent".format(ino, size))
1003 return True
1004
1005 def dirfrag_exists(self, ino, frag):
1006 try:
1007 self.rados(["stat", "{0:x}.{1:08x}".format(ino, frag)])
1008 except CommandFailedError as e:
1009 return False
1010 else:
1011 return True
1012
1013 def rados(self, args, pool=None, namespace=None, stdin_data=None):
1014 """
1015 Call into the `rados` CLI from an MDS
1016 """
1017
1018 if pool is None:
1019 pool = self.get_metadata_pool_name()
1020
1021 # Doesn't matter which MDS we use to run rados commands, they all
1022 # have access to the pools
1023 mds_id = self.mds_ids[0]
1024 remote = self.mds_daemons[mds_id].remote
1025
1026 # NB we could alternatively use librados pybindings for this, but it's a one-liner
1027 # using the `rados` CLI
1028 args = ([os.path.join(self._prefix, "rados"), "-p", pool] +
1029 (["--namespace", namespace] if namespace else []) +
1030 args)
1031 p = remote.run(
1032 args=args,
1033 stdin=stdin_data,
1034 stdout=StringIO())
1035 return p.stdout.getvalue().strip()
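# Example (illustrative): list objects in the metadata pool (the default), then
# in this filesystem's data pool.
#
#     names = fs.rados(["ls"]).split("\n")
#     data_names = fs.rados(["ls"], pool=fs.get_data_pool_name()).split("\n")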
1036
1037 def list_dirfrag(self, dir_ino):
1038 """
1039 Read the named object and return the list of omap keys
1040
1041 :return a list of 0 or more strings
1042 """
1043
1044 dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
1045
1046 try:
1047 key_list_str = self.rados(["listomapkeys", dirfrag_obj_name])
1048 except CommandFailedError as e:
1049 log.error(e.__str__())
1050 raise ObjectNotFound(dirfrag_obj_name)
1051
1052 return key_list_str.split("\n") if key_list_str else []
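# Example (illustrative): list the dentries of the root directory's first
# dirfrag; the omap keys are typically of the form "<dentry name>_head".
#
#     keys = fs.list_dirfrag(ROOT_INO)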
1053
1054 def erase_metadata_objects(self, prefix):
1055 """
1056 For all objects in the metadata pool matching the prefix,
1057 erase them.
1058
1059 This is O(N) in the number of objects in the pool, so it is only suitable
1060 for use on toy test filesystems.
1061 """
1062 all_objects = self.rados(["ls"]).split("\n")
1063 matching_objects = [o for o in all_objects if o.startswith(prefix)]
1064 for o in matching_objects:
1065 self.rados(["rm", o])
1066
1067 def erase_mds_objects(self, rank):
1068 """
1069 Erase all the per-MDS objects for a particular rank. This includes
1070 inotable, sessiontable, journal
1071 """
1072
1073 def obj_prefix(multiplier):
1074 """
1075 Build an object name prefix from the MDS naming convention,
1076 e.g. rank 1's journal objects are named 201.*
1077 """
1078 return "%x." % (multiplier * 0x100 + rank)
1079
1080 # MDS_INO_LOG_OFFSET
1081 self.erase_metadata_objects(obj_prefix(2))
1082 # MDS_INO_LOG_BACKUP_OFFSET
1083 self.erase_metadata_objects(obj_prefix(3))
1084 # MDS_INO_LOG_POINTER_OFFSET
1085 self.erase_metadata_objects(obj_prefix(4))
1086 # MDSTables & SessionMap
1087 self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))
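# Worked example: for rank 1 this removes 201.* (journal), 301.* (journal
# backup), 401.* (journal pointer) and the "mds1_" table objects such as
# mds1_inotable and mds1_sessionmap.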
1088
1089 @property
1090 def _prefix(self):
1091 """
1092 Override this to set a different path prefix for the Ceph binaries.
1093 """
1094 return ""
1095
1096 def _run_tool(self, tool, args, rank=None, quiet=False):
1097 # Tests frequently have [client] configuration that jacks up
1098 # the objecter log level (unlikely to be interesting here)
1099 # and does not set the mds log level (very interesting here)
1100 if quiet:
1101 base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
1102 else:
1103 base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']
1104
1105 if rank is not None:
1106 base_args.extend(["--rank", "%d" % rank])
1107
1108 t1 = datetime.datetime.now()
1109 r = self.tool_remote.run(
1110 args=base_args + args,
1111 stdout=StringIO()).stdout.getvalue().strip()
1112 duration = datetime.datetime.now() - t1
1113 log.info("Ran {0} in time {1}, result:\n{2}".format(
1114 base_args + args, duration, r
1115 ))
1116 return r
1117
1118 @property
1119 def tool_remote(self):
1120 """
1121 An arbitrary remote to use when invoking recovery tools. Use an MDS host because
1122 it'll definitely have keys with perms to access cephfs metadata pool. This is public
1123 so that tests can use this remote to go get locally written output files from the tools.
1124 """
1125 mds_id = self.mds_ids[0]
1126 return self.mds_daemons[mds_id].remote
1127
1128 def journal_tool(self, args, rank=None, quiet=False):
1129 """
1130 Invoke cephfs-journal-tool with the passed arguments, and return its stdout
1131 """
1132 return self._run_tool("cephfs-journal-tool", args, rank, quiet)
1133
1134 def table_tool(self, args, quiet=False):
1135 """
1136 Invoke cephfs-table-tool with the passed arguments, and return its stdout
1137 """
1138 return self._run_tool("cephfs-table-tool", args, None, quiet)
1139
1140 def data_scan(self, args, quiet=False, worker_count=1):
1141 """
1142 Invoke cephfs-data-scan with the passed arguments, and return its stdout
1143
1144 :param worker_count: if greater than 1, multiple workers will be run
1145 in parallel and the return value will be None
1146 """
1147
1148 workers = []
1149
1150 for n in range(0, worker_count):
1151 if worker_count > 1:
1152 # data-scan args first token is a command, followed by args to it.
1153 # insert worker arguments after the command.
1154 cmd = args[0]
1155 worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
1156 else:
1157 worker_args = args
1158
1159 workers.append(Greenlet.spawn(lambda wargs=worker_args:
1160 self._run_tool("cephfs-data-scan", wargs, None, quiet)))
1161
1162 for w in workers:
1163 w.get()
1164
1165 if worker_count == 1:
1166 return workers[0].value
1167 else:
1168 return None
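# Example (illustrative recovery sequence, with subcommand arguments as the qa
# suites use them): split scan_extents across four workers, then run scan_inodes.
#
#     fs.data_scan(["scan_extents", fs.get_data_pool_name()], worker_count=4)
#     fs.data_scan(["scan_inodes", fs.get_data_pool_name()])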