
from StringIO import StringIO
import json
import logging
from gevent import Greenlet
import os
import time
import datetime
import re
import errno

from teuthology.exceptions import CommandFailedError
from teuthology import misc
from teuthology.nuke import clear_firewall
from teuthology.parallel import parallel
from tasks.ceph_manager import write_conf
from tasks import ceph_manager


log = logging.getLogger(__name__)


DAEMON_WAIT_TIMEOUT = 120
ROOT_INO = 1


class ObjectNotFound(Exception):
    def __init__(self, object_name):
        self._object_name = object_name

    def __str__(self):
        return "Object not found: '{0}'".format(self._object_name)

class FSStatus(object):
    """
    Operations on a snapshot of the FSMap.
    """
    def __init__(self, mon_manager):
        self.mon = mon_manager
        self.map = json.loads(self.mon.raw_cluster_cmd("fs", "dump", "--format=json"))

    def __str__(self):
        return json.dumps(self.map, indent = 2, sort_keys = True)

    # Expose the fsmap for manual inspection.
    def __getitem__(self, key):
        """
        Get a field from the fsmap.
        """
        return self.map[key]

    def get_filesystems(self):
        """
        Iterator for all filesystems.
        """
        for fs in self.map['filesystems']:
            yield fs

    def get_all(self):
        """
        Iterator for all the mds_info components in the FSMap.
        """
        for info in self.get_standbys():
            yield info
        for fs in self.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                yield info

    def get_standbys(self):
        """
        Iterator for all standbys.
        """
        for info in self.map['standbys']:
            yield info

    def get_fsmap(self, fscid):
        """
        Get the fsmap for the given FSCID.
        """
        for fs in self.map['filesystems']:
            if fscid is None or fs['id'] == fscid:
                return fs
        raise RuntimeError("FSCID {0} not in map".format(fscid))

    def get_fsmap_byname(self, name):
        """
        Get the fsmap for the given file system name.
        """
        for fs in self.map['filesystems']:
            if name is None or fs['mdsmap']['fs_name'] == name:
                return fs
        raise RuntimeError("FS {0} not in map".format(name))

    def get_replays(self, fscid):
        """
        Iterate over the standby-replay MDS daemons for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['state'] == 'up:standby-replay':
                yield info

    def get_ranks(self, fscid):
        """
        Get the ranks for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['rank'] >= 0:
                yield info

    def get_rank(self, fscid, rank):
        """
        Get the rank for the given FSCID.
        """
        for info in self.get_ranks(fscid):
            if info['rank'] == rank:
                return info
        raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))

    def get_mds(self, name):
        """
        Get the info for the given MDS name.
        """
        for info in self.get_all():
            if info['name'] == name:
                return info
        return None

    def get_mds_addr(self, name):
        """
        Return the instance addr as a string, like "10.214.133.138:6807\/10825"
        """
        info = self.get_mds(name)
        if info:
            return info['addr']
        else:
            log.warn(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
            raise RuntimeError("MDS id '{0}' not found in map".format(name))

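# Illustrative sketch (not part of the original module): how a test might use
# FSStatus to inspect the ranks of a filesystem. Assumes `fs` is a Filesystem
# instance (defined below) whose cluster already has active MDS daemons.
#
#   status = FSStatus(fs.mon_manager)
#   for info in status.get_ranks(fs.id):
#       log.info("rank %d held by %s in state %s",
#                info['rank'], info['name'], info['state'])
#   addr = status.get_mds_addr(status.get_rank(fs.id, 0)['name'])
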
class CephCluster(object):
    @property
    def admin_remote(self):
        first_mon = misc.get_first_mon(self._ctx, None)
        (result,) = self._ctx.cluster.only(first_mon).remotes.iterkeys()
        return result

    def __init__(self, ctx):
        self._ctx = ctx
        self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))

    def get_config(self, key, service_type=None):
        """
        Get config from mon by default, or a specific service if caller asks for it
        """
        if service_type is None:
            service_type = 'mon'

        service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def set_ceph_conf(self, subsys, key, value):
        if subsys not in self._ctx.ceph['ceph'].conf:
            self._ctx.ceph['ceph'].conf[subsys] = {}
        self._ctx.ceph['ceph'].conf[subsys][key] = value
        write_conf(self._ctx)  # XXX because we don't have the ceph task's config object, if they
                               # used a different config path this won't work.

    def clear_ceph_conf(self, subsys, key):
        del self._ctx.ceph['ceph'].conf[subsys][key]
        write_conf(self._ctx)

    def json_asok(self, command, service_type, service_id):
        proc = self.mon_manager.admin_socket(service_type, service_id, command)
        response_data = proc.stdout.getvalue()
        log.info("_json_asok output: {0}".format(response_data))
        if response_data.strip():
            return json.loads(response_data)
        else:
            return None

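# Illustrative sketch (not part of the original module): json_asok() wraps an
# admin socket command ("ceph daemon <type>.<id> <command>") and decodes the
# JSON reply. Assumes `cluster` is a CephCluster (or subclass) instance and
# that an mds with id "a" exists in the run.
#
#   mds_version = cluster.json_asok(["version"], "mds", "a")
#   pg_warn_min = cluster.get_config("mon_pg_warn_min_per_osd")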

class MDSCluster(CephCluster):
    """
    Collective operations on all the MDS daemons in the Ceph cluster. These
    daemons may be in use by various Filesystems.

    For the benefit of pre-multi-filesystem tests, this class is also
    a parent of Filesystem. The correct way to use MDSCluster going forward is
    as a separate instance outside of your (multiple) Filesystem instances.
    """
    def __init__(self, ctx):
        super(MDSCluster, self).__init__(ctx)

        self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))

        if len(self.mds_ids) == 0:
            raise RuntimeError("This task requires at least one MDS")

        if hasattr(self._ctx, "daemons"):
            # Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
            self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])

    def _one_or_all(self, mds_id, cb, in_parallel=True):
        """
        Call a callback for a single named MDS, or for all.

        Note that the parallelism here isn't for performance: it is there to avoid
        being overly kind to the cluster, either by waiting a graceful ssh-latency
        of time between operations or by executing them in a predictable order.
        However, some actions don't cope with being done in parallel, so it's
        optional (`in_parallel`).

        :param mds_id: MDS daemon name, or None
        :param cb: Callback taking single argument of MDS daemon name
        :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
        """
        if mds_id is None:
            if in_parallel:
                with parallel() as p:
                    for mds_id in self.mds_ids:
                        p.spawn(cb, mds_id)
            else:
                for mds_id in self.mds_ids:
                    cb(mds_id)
        else:
            cb(mds_id)

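    # Illustrative sketch (not part of the original module): _one_or_all() is the
    # fan-out primitive used by the operations below, e.g. mds_stop() boils down to:
    #
    #   self._one_or_all(None, lambda id_: self.mds_daemons[id_].stop())  # stop every MDS
    #   self._one_or_all("a", lambda id_: self.mds_daemons[id_].stop())   # stop only mds.a
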
    def mds_stop(self, mds_id=None):
        """
        Stop the MDS daemon process(es). If it held a rank, that rank
        will eventually go laggy.
        """
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())

    def mds_fail(self, mds_id=None):
        """
        Inform MDSMonitor of the death of the daemon process(es). If it held
        a rank, that rank will be relinquished.
        """
        self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))

    def mds_restart(self, mds_id=None):
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())

    def mds_fail_restart(self, mds_id=None):
        """
        Variation on restart that includes marking MDSs as failed, so that doing this
        operation followed by waiting for healthy daemon states guarantees that they
        have gone down and come up, rather than potentially seeing the healthy states
        that existed before the restart.
        """
        def _fail_restart(id_):
            self.mds_daemons[id_].stop()
            self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
            self.mds_daemons[id_].restart()

        self._one_or_all(mds_id, _fail_restart)

    def newfs(self, name):
        return Filesystem(self._ctx, create=name)

    def status(self):
        return FSStatus(self.mon_manager)

    def delete_all_filesystems(self):
        """
        Remove all filesystems that exist, and any pools in use by them.
        """
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        pool_id_name = {}
        for pool in pools:
            pool_id_name[pool['pool']] = pool['pool_name']

        # mark cluster down for each fs to prevent churn during deletion
        status = self.status()
        for fs in status.get_filesystems():
            self.mon_manager.raw_cluster_cmd("fs", "set", fs['mdsmap']['fs_name'], "cluster_down", "true")

        # get a new copy as actives may have since changed
        status = self.status()
        for fs in status.get_filesystems():
            mdsmap = fs['mdsmap']
            metadata_pool = pool_id_name[mdsmap['metadata_pool']]

            for gid in mdsmap['up'].values():
                self.mon_manager.raw_cluster_cmd('mds', 'fail', gid.__str__())

            self.mon_manager.raw_cluster_cmd('fs', 'rm', mdsmap['fs_name'], '--yes-i-really-mean-it')
            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
                                             metadata_pool, metadata_pool,
                                             '--yes-i-really-really-mean-it')
            for data_pool in mdsmap['data_pools']:
                data_pool = pool_id_name[data_pool]
                try:
                    self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
                                                     data_pool, data_pool,
                                                     '--yes-i-really-really-mean-it')
                except CommandFailedError as e:
                    if e.exitstatus == 16:
                        # EBUSY: this data pool is shared with another filesystem;
                        # let the later pass delete it.
                        pass
                    else:
                        raise

    def get_standby_daemons(self):
        return set([s['name'] for s in self.status().get_standbys()])

    def get_mds_hostnames(self):
        result = set()
        for mds_id in self.mds_ids:
            mds_remote = self.mon_manager.find_remote('mds', mds_id)
            result.add(mds_remote.hostname)

        return list(result)

    def set_clients_block(self, blocked, mds_id=None):
        """
        Block (using iptables) client communications to this MDS. Be careful: if
        other services are running on this MDS, or other MDSs try to talk to this
        MDS, their communications may also be blocked as collateral damage.

        :param mds_id: Optional ID of MDS to block, default to all
        :return:
        """
        da_flag = "-A" if blocked else "-D"

        def set_block(_mds_id):
            remote = self.mon_manager.find_remote('mds', _mds_id)
            status = self.status()

            addr = status.get_mds_addr(_mds_id)
            ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()

            remote.run(
                args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])
            remote.run(
                args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])

        self._one_or_all(mds_id, set_block, in_parallel=False)

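    # Illustrative sketch (not part of the original module): for an MDS whose
    # addr is "10.214.133.138:6807/10825", set_clients_block(True, "a") runs,
    # on the MDS host, roughly:
    #
    #   sudo iptables -A OUTPUT -p tcp --sport 6807 -j REJECT -m comment --comment teuthology
    #   sudo iptables -A INPUT  -p tcp --dport 6807 -j REJECT -m comment --comment teuthology
    #
    # and set_clients_block(False, "a") replaces -A with -D to remove the rules.
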
    def clear_firewall(self):
        clear_firewall(self._ctx)

    def get_mds_info(self, mds_id):
        return FSStatus(self.mon_manager).get_mds(mds_id)

    def is_full(self):
        flags = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['flags']
        return 'full' in flags

    def is_pool_full(self, pool_name):
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        for pool in pools:
            if pool['pool_name'] == pool_name:
                return 'full' in pool['flags_names'].split(",")

        raise RuntimeError("Pool not found '{0}'".format(pool_name))

class Filesystem(MDSCluster):
    """
    This object is for driving a CephFS filesystem. The MDS daemons driven by
    MDSCluster may be shared with other Filesystems.
    """
    def __init__(self, ctx, fscid=None, create=None):
        super(Filesystem, self).__init__(ctx)

        self.id = None
        self.name = None
        self.metadata_pool_name = None
        self.data_pools = None

        client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
        self.client_id = client_list[0]
        self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]

        if create is not None:
            if fscid is not None:
                raise RuntimeError("cannot specify fscid when creating fs")
            if create is True:
                self.name = 'cephfs'
            else:
                self.name = create
            if not self.legacy_configured():
                self.create()
        elif fscid is not None:
            self.id = fscid
        self.getinfo(refresh = True)

        # Stash a reference to the first created filesystem on ctx, so
        # that if someone drops to the interactive shell they can easily
        # poke our methods.
        if not hasattr(self._ctx, "filesystem"):
            self._ctx.filesystem = self

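    # Illustrative sketch (not part of the original module): typical ways a test
    # constructs this object ("backup_fs" is just an example name).
    #
    #   fs = Filesystem(ctx)                      # bind to the single existing filesystem
    #   fs = Filesystem(ctx, create=True)         # create a fs named 'cephfs' (unless legacy config exists)
    #   fs = Filesystem(ctx, create="backup_fs")  # create a fs with an explicit name
    #   fs = Filesystem(ctx, fscid=1)             # bind to an existing fs by FSCID
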
    def getinfo(self, refresh = False):
        status = self.status()
        if self.id is not None:
            fsmap = status.get_fsmap(self.id)
        elif self.name is not None:
            fsmap = status.get_fsmap_byname(self.name)
        else:
            fss = [fs for fs in status.get_filesystems()]
            if len(fss) == 1:
                fsmap = fss[0]
            elif len(fss) == 0:
                raise RuntimeError("no file system available")
            else:
                raise RuntimeError("more than one file system available")
        self.id = fsmap['id']
        self.name = fsmap['mdsmap']['fs_name']
        self.get_pool_names(status = status, refresh = refresh)
        return status

    def deactivate(self, rank):
        if rank < 0:
            raise RuntimeError("invalid rank")
        elif rank == 0:
            raise RuntimeError("cannot deactivate rank 0")
        self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))

    def set_max_mds(self, max_mds):
        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "max_mds", "%d" % max_mds)

    def set_allow_dirfrags(self, yes):
        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "allow_dirfrags", str(yes).lower(), '--yes-i-really-mean-it')

    def set_allow_multimds(self, yes):
        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "allow_multimds", str(yes).lower(), '--yes-i-really-mean-it')

    def get_pgs_per_fs_pool(self):
        """
        Calculate how many PGs to use when creating a pool, in order to avoid raising any
        health warnings about mon_pg_warn_min_per_osd

        :return: an integer number of PGs
        """
        pg_warn_min_per_osd = int(self.get_config('mon_pg_warn_min_per_osd'))
        osd_count = len(list(misc.all_roles_of_type(self._ctx.cluster, 'osd')))
        return pg_warn_min_per_osd * osd_count

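    # Illustrative sketch (not part of the original module): e.g. with
    # mon_pg_warn_min_per_osd = 30 and a 3-OSD test cluster, get_pgs_per_fs_pool()
    # returns 30 * 3 = 90, so each pool created below gets 90 PGs.
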
    def create(self):
        if self.name is None:
            self.name = "cephfs"
        if self.metadata_pool_name is None:
            self.metadata_pool_name = "{0}_metadata".format(self.name)
        data_pool_name = "{0}_data".format(self.name)

        log.info("Creating filesystem '{0}'".format(self.name))

        pgs_per_fs_pool = self.get_pgs_per_fs_pool()

        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                         self.metadata_pool_name, pgs_per_fs_pool.__str__())
        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                         data_pool_name, pgs_per_fs_pool.__str__())
        self.mon_manager.raw_cluster_cmd('fs', 'new',
                                         self.name, self.metadata_pool_name, data_pool_name)
        # Turn off spurious standby count warnings from modifying max_mds in tests.
        try:
            self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
        except CommandFailedError as e:
            if e.exitstatus == 22:
                # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
                pass
            else:
                raise

        self.getinfo(refresh = True)

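    # Illustrative sketch (not part of the original module): for self.name == "cephfs"
    # and 90 PGs per pool, create() issues roughly this CLI sequence:
    #
    #   ceph osd pool create cephfs_metadata 90
    #   ceph osd pool create cephfs_data 90
    #   ceph fs new cephfs cephfs_metadata cephfs_data
    #   ceph fs set cephfs standby_count_wanted 0   # tolerated failure (EINVAL) on pre-luminous clusters
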
    def __del__(self):
        if getattr(self._ctx, "filesystem", None) == self:
            delattr(self._ctx, "filesystem")

    def exists(self):
        """
        Whether a filesystem exists in the mon's filesystem list
        """
        fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
        return self.name in [fs['name'] for fs in fs_list]

    def legacy_configured(self):
        """
        Check if a legacy (i.e. pre "fs new") filesystem configuration is present. If this is
        the case, the caller should avoid using Filesystem.create
        """
        try:
            out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
            pools = json.loads(out_text)
            metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
            if metadata_pool_exists:
                self.metadata_pool_name = 'metadata'
        except CommandFailedError as e:
            # For use in upgrade tests, Ceph cuttlefish and earlier don't support
            # structured output (--format) from the CLI.
            if e.exitstatus == 22:
                metadata_pool_exists = True
            else:
                raise

        return metadata_pool_exists

    def _df(self):
        return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))

    def get_mds_map(self):
        return self.status().get_fsmap(self.id)['mdsmap']

    def add_data_pool(self, name):
        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name, self.get_pgs_per_fs_pool().__str__())
        self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
        self.get_pool_names(refresh = True)
        for poolid, fs_name in self.data_pools.items():
            if name == fs_name:
                return poolid
        raise RuntimeError("could not get just created pool '{0}'".format(name))

    def get_pool_names(self, refresh = False, status = None):
        if refresh or self.metadata_pool_name is None or self.data_pools is None:
            if status is None:
                status = self.status()
            fsmap = status.get_fsmap(self.id)

            osd_map = self.mon_manager.get_osd_dump_json()
            id_to_name = {}
            for p in osd_map['pools']:
                id_to_name[p['pool']] = p['pool_name']

            self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
            self.data_pools = {}
            for data_pool in fsmap['mdsmap']['data_pools']:
                self.data_pools[data_pool] = id_to_name[data_pool]

    def get_data_pool_name(self, refresh = False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return self.data_pools.values()[0]

    def get_data_pool_id(self, refresh = False):
        """
        Don't call this if you have multiple data pools
        :return: integer
        """
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return self.data_pools.keys()[0]

    def get_data_pool_names(self, refresh = False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        return self.data_pools.values()

    def get_metadata_pool_name(self):
        return self.metadata_pool_name

    def get_namespace_id(self):
        return self.id

    def get_pool_df(self, pool_name):
        """
        Return a dict like:
        {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
        """
        for pool_df in self._df()['pools']:
            if pool_df['name'] == pool_name:
                return pool_df['stats']

        raise RuntimeError("Pool name '{0}' not found".format(pool_name))

    def get_usage(self):
        return self._df()['stats']['total_used_bytes']

    def are_daemons_healthy(self):
        """
        Return true if all daemons are in one of active, standby, standby-replay, and
        at least max_mds daemons are in 'active'.

        Unlike most of Filesystem, this function is tolerant of new-style `fs`
        commands being missing, because we are part of the ceph installation
        process during upgrade suites, so must fall back to old style commands
        when we get an EINVAL on a new style command.

        :return:
        """

        active_count = 0
        try:
            mds_map = self.get_mds_map()
        except CommandFailedError as cfe:
            # Old version, fall back to non-multi-fs commands
            if cfe.exitstatus == errno.EINVAL:
                mds_map = json.loads(
                    self.mon_manager.raw_cluster_cmd('mds', 'dump', '--format=json'))
            else:
                raise

        log.info("are_daemons_healthy: mds map: {0}".format(mds_map))

        for mds_id, mds_status in mds_map['info'].items():
            if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
                log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
                return False
            elif mds_status['state'] == 'up:active':
                active_count += 1

        log.info("are_daemons_healthy: {0}/{1}".format(
            active_count, mds_map['max_mds']
        ))

        if active_count >= mds_map['max_mds']:
            # The MDSMap says these guys are active, but let's check they really are
            for mds_id, mds_status in mds_map['info'].items():
                if mds_status['state'] == 'up:active':
                    try:
                        daemon_status = self.mds_asok(["status"], mds_id=mds_status['name'])
                    except CommandFailedError as cfe:
                        if cfe.exitstatus == errno.EINVAL:
                            # Old version, can't do this check
                            continue
                        else:
                            # MDS not even running
                            return False

                    if daemon_status['state'] != 'up:active':
                        # MDS hasn't taken the latest map yet
                        return False

            return True
        else:
            return False

    def get_daemon_names(self, state=None):
        """
        Return MDS daemon names of those daemons in the given state
        :param state:
        :return:
        """
        status = self.get_mds_map()
        result = []
        for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
            if mds_status['state'] == state or state is None:
                result.append(mds_status['name'])

        return result

    def get_active_names(self):
        """
        Return MDS daemon names of those daemons holding ranks
        in state up:active

        :return: list of strings like ['a', 'b'], sorted by rank
        """
        return self.get_daemon_names("up:active")

    def get_all_mds_rank(self):
        status = self.get_mds_map()
        result = []
        for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['rank'])

        return result

    def get_rank_names(self):
        """
        Return MDS daemon names of those daemons holding a rank,
        sorted by rank. This includes e.g. up:replay/reconnect
        as well as active, but does not include standby or
        standby-replay.
        """
        status = self.get_mds_map()
        result = []
        for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['name'])

        return result

    def wait_for_daemons(self, timeout=None):
        """
        Wait until all daemons are healthy
        :return:
        """

        if timeout is None:
            timeout = DAEMON_WAIT_TIMEOUT

        elapsed = 0
        while True:
            if self.are_daemons_healthy():
                return
            else:
                time.sleep(1)
                elapsed += 1

            if elapsed > timeout:
                raise RuntimeError("Timed out waiting for MDS daemons to become healthy")

    def get_lone_mds_id(self):
        """
        Get a single MDS ID: the only one if there is only one
        configured, else the only one currently holding a rank,
        else raise an error.
        """
        if len(self.mds_ids) != 1:
            alive = self.get_rank_names()
            if len(alive) == 1:
                return alive[0]
            else:
                raise ValueError("Explicit MDS argument required when multiple MDSs in use")
        else:
            return self.mds_ids[0]

    def recreate(self):
        log.info("Creating new filesystem")
        self.delete_all_filesystems()
        self.id = None
        self.create()

    def put_metadata_object_raw(self, object_id, infile):
        """
        Save an object to the metadata pool
        """
        temp_bin_path = infile
        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'put', object_id, temp_bin_path
        ])

    def get_metadata_object_raw(self, object_id):
        """
        Retrieve an object from the metadata pool and store it in a file.
        """
        temp_bin_path = '/tmp/' + object_id + '.bin'

        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
        ])

        return temp_bin_path

    def get_metadata_object(self, object_type, object_id):
        """
        Retrieve an object from the metadata pool, pass it through
        ceph-dencoder to dump it to JSON, and return the decoded object.
        """
        temp_bin_path = '/tmp/out.bin'

        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
        ])

        stdout = StringIO()
        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'ceph-dencoder'), 'type', object_type, 'import', temp_bin_path, 'decode', 'dump_json'
        ], stdout=stdout)
        dump_json = stdout.getvalue().strip()
        try:
            dump = json.loads(dump_json)
        except (TypeError, ValueError):
            log.error("Failed to decode JSON: '{0}'".format(dump_json))
            raise

        return dump

    def get_journal_version(self):
        """
        Read the JournalPointer and Journal::Header objects to learn the version of
        encoding in use.
        """
        journal_pointer_object = '400.00000000'
        journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
        journal_ino = journal_pointer_dump['journal_pointer']['front']

        journal_header_object = "{0:x}.00000000".format(journal_ino)
        journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)

        version = journal_header_dump['journal_header']['stream_format']
        log.info("Read journal version {0}".format(version))

        return version

    def mds_asok(self, command, mds_id=None):
        if mds_id is None:
            mds_id = self.get_lone_mds_id()

        return self.json_asok(command, 'mds', mds_id)

    def read_cache(self, path, depth=None):
        cmd = ["dump", "tree", path]
        if depth is not None:
            cmd.append(depth.__str__())
        result = self.mds_asok(cmd)
        if len(result) == 0:
            raise RuntimeError("Path not found in cache: {0}".format(path))

        return result

    def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
        """
        Block until the MDS reaches a particular state, or a failure condition
        is met.

        When there are multiple MDSs, succeed when exactly one MDS is in the
        goal state, or fail when any MDS is in the reject state.

        :param goal_state: Return once the MDS is in this state
        :param reject: Fail if the MDS enters this state before the goal state
        :param timeout: Fail if this many seconds pass before reaching goal
        :return: number of seconds waited, rounded down to integer
        """

        started_at = time.time()
        while True:
            status = self.status()
            if rank is not None:
                mds_info = status.get_rank(self.id, rank)
                current_state = mds_info['state'] if mds_info else None
                log.info("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
            elif mds_id is not None:
                # mds_info is None if no daemon with this ID exists in the map
                mds_info = status.get_mds(mds_id)
                current_state = mds_info['state'] if mds_info else None
                log.info("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
            else:
                # In general, look for a single MDS
                states = [m['state'] for m in status.get_ranks(self.id)]
                if [s for s in states if s == goal_state] == [goal_state]:
                    current_state = goal_state
                elif reject in states:
                    current_state = reject
                else:
                    current_state = None
                log.info("mapped states {0} to {1}".format(states, current_state))

            elapsed = time.time() - started_at
            if current_state == goal_state:
                log.info("reached state '{0}' in {1}s".format(current_state, elapsed))
                return elapsed
            elif reject is not None and current_state == reject:
                raise RuntimeError("MDS in reject state {0}".format(current_state))
            elif timeout is not None and elapsed > timeout:
                log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
                raise RuntimeError(
                    "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
                        elapsed, goal_state, current_state
                    ))
            else:
                time.sleep(1)

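    # Illustrative sketch (not part of the original module): typical uses of
    # wait_for_state() in tests.
    #
    #   fs.mds_fail_restart()
    #   fs.wait_for_state('up:active', timeout=60)            # exactly one active MDS
    #   fs.wait_for_state('up:standby-replay', mds_id='b')    # state of a named daemon
    #   fs.wait_for_state('up:active', rank=1, timeout=60)    # state of a specific rank
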
    def _read_data_xattr(self, ino_no, xattr_name, type, pool):
        mds_id = self.mds_ids[0]
        remote = self.mds_daemons[mds_id].remote
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)

        args = [
            os.path.join(self._prefix, "rados"), "-p", pool, "getxattr", obj_name, xattr_name
        ]
        try:
            proc = remote.run(
                args=args,
                stdout=StringIO())
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(obj_name)

        data = proc.stdout.getvalue()

        p = remote.run(
            args=[os.path.join(self._prefix, "ceph-dencoder"), "type", type, "import", "-", "decode", "dump_json"],
            stdout=StringIO(),
            stdin=data
        )

        return json.loads(p.stdout.getvalue().strip())

    def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
        """
        Write to an xattr of the 0th data object of an inode. Will
        succeed whether the object and/or xattr already exist or not.

        :param ino_no: integer inode number
        :param xattr_name: string name of the xattr
        :param data: byte array data to write to the xattr
        :param pool: name of data pool or None to use primary data pool
        :return: None
        """
        remote = self.mds_daemons[self.mds_ids[0]].remote
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)
        args = [
            os.path.join(self._prefix, "rados"), "-p", pool, "setxattr",
            obj_name, xattr_name, data
        ]
        remote.run(
            args=args,
            stdout=StringIO())

    def read_backtrace(self, ino_no, pool=None):
        """
        Read the backtrace from the data pool, return a dict in the format
        given by inode_backtrace_t::dump, which is something like:

        ::

            rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
            ceph-dencoder type inode_backtrace_t import out.bin decode dump_json

            { "ino": 1099511627778,
              "ancestors": [
                    { "dirino": 1,
                      "dname": "blah",
                      "version": 11}],
              "pool": 1,
              "old_pools": []}

        :param pool: name of pool to read backtrace from. If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)

    def read_layout(self, ino_no, pool=None):
        """
        Read 'layout' xattr of an inode and parse the result, returning a dict like:
        ::
            {
                "stripe_unit": 4194304,
                "stripe_count": 1,
                "object_size": 4194304,
                "pool_id": 1,
                "pool_ns": "",
            }

        :param pool: name of pool to read the layout from. If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)

    def _enumerate_data_objects(self, ino, size):
        """
        Get the list of expected data objects for a range, and the list of objects
        that really exist.

        :return a tuple of two lists of strings (expected, actual)
        """
        stripe_size = 1024 * 1024 * 4

        size = max(stripe_size, size)

        want_objects = [
            "{0:x}.{1:08x}".format(ino, n)
            for n in range(0, ((size - 1) / stripe_size) + 1)
        ]

        exist_objects = self.rados(["ls"], pool=self.get_data_pool_name()).split("\n")

        return want_objects, exist_objects

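    # Illustrative sketch (not part of the original module): with the 4MiB stripe
    # size above, an inode 0x10000000002 written out to 9MiB is expected to map to
    # three data objects:
    #
    #   10000000002.00000000, 10000000002.00000001, 10000000002.00000002
    #
    # _enumerate_data_objects() computes that "want" list and pairs it with a
    # "rados ls" of the data pool for the presence/absence checks below.
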
    def data_objects_present(self, ino, size):
        """
        Check that *all* the expected data objects for an inode are present in the data pool
        """

        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        missing = set(want_objects) - set(exist_objects)

        if missing:
            log.info("Objects missing (ino {0}, size {1}): {2}".format(
                ino, size, missing
            ))
            return False
        else:
            log.info("All objects for ino {0} size {1} found".format(ino, size))
            return True

    def data_objects_absent(self, ino, size):
        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        present = set(want_objects) & set(exist_objects)

        if present:
            log.info("Objects not absent (ino {0}, size {1}): {2}".format(
                ino, size, present
            ))
            return False
        else:
            log.info("All objects for ino {0} size {1} are absent".format(ino, size))
            return True

    def dirfrag_exists(self, ino, frag):
        try:
            self.rados(["stat", "{0:x}.{1:08x}".format(ino, frag)])
        except CommandFailedError as e:
            return False
        else:
            return True

    def rados(self, args, pool=None, namespace=None, stdin_data=None):
        """
        Call into the `rados` CLI from an MDS
        """

        if pool is None:
            pool = self.get_metadata_pool_name()

        # Doesn't matter which MDS we use to run rados commands, they all
        # have access to the pools
        mds_id = self.mds_ids[0]
        remote = self.mds_daemons[mds_id].remote

        # NB we could alternatively use librados pybindings for this, but it's a one-liner
        # using the `rados` CLI
        args = ([os.path.join(self._prefix, "rados"), "-p", pool] +
                (["--namespace", namespace] if namespace else []) +
                args)
        p = remote.run(
            args=args,
            stdin=stdin_data,
            stdout=StringIO())
        return p.stdout.getvalue().strip()

    def list_dirfrag(self, dir_ino):
        """
        Read the named object and return the list of omap keys

        :return a list of 0 or more strings
        """

        dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)

        try:
            key_list_str = self.rados(["listomapkeys", dirfrag_obj_name])
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(dirfrag_obj_name)

        return key_list_str.split("\n") if key_list_str else []

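    # Illustrative sketch (not part of the original module): a directory's default
    # dirfrag object is named "<ino-in-hex>.00000000" in the metadata pool, so for
    # the root directory (ROOT_INO == 1):
    #
    #   names = fs.list_dirfrag(ROOT_INO)   # omap keys, e.g. ['subdir_head', 'file_head']
    #
    # where each key is a dentry name plus a suffix for its snapshot version.
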
    def erase_metadata_objects(self, prefix):
        """
        For all objects in the metadata pool matching the prefix,
        erase them.

        This is O(N) with the number of objects in the pool, so only suitable
        for use on toy test filesystems.
        """
        all_objects = self.rados(["ls"]).split("\n")
        matching_objects = [o for o in all_objects if o.startswith(prefix)]
        for o in matching_objects:
            self.rados(["rm", o])

    def erase_mds_objects(self, rank):
        """
        Erase all the per-MDS objects for a particular rank. This includes
        inotable, sessiontable, journal
        """

        def obj_prefix(multiplier):
            """
            MDS object naming convention: e.g. rank 1's journal
            objects live at 201.*
            """
            return "%x." % (multiplier * 0x100 + rank)

        # MDS_INO_LOG_OFFSET
        self.erase_metadata_objects(obj_prefix(2))
        # MDS_INO_LOG_BACKUP_OFFSET
        self.erase_metadata_objects(obj_prefix(3))
        # MDS_INO_LOG_POINTER_OFFSET
        self.erase_metadata_objects(obj_prefix(4))
        # MDSTables & SessionMap
        self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))

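    # Illustrative sketch (not part of the original module): erase_mds_objects(1)
    # removes objects matching these prefixes for rank 1:
    #
    #   "201."    journal          (2 * 0x100 + 1 = 0x201)
    #   "301."    journal backup   (3 * 0x100 + 1 = 0x301)
    #   "401."    journal pointer  (4 * 0x100 + 1 = 0x401)
    #   "mds1_"   MDSTables and SessionMap objects
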
    @property
    def _prefix(self):
        """
        Override this to set a different path prefix for the Ceph binaries
        (by default they are expected to be found on the remote's PATH).
        """
        return ""

    def _run_tool(self, tool, args, rank=None, quiet=False):
        # Tests frequently have [client] configuration that jacks up
        # the objecter log level (unlikely to be interesting here)
        # and does not set the mds log level (very interesting here)
        if quiet:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
        else:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']

        if rank is not None:
            base_args.extend(["--rank", "%d" % rank])

        t1 = datetime.datetime.now()
        r = self.tool_remote.run(
            args=base_args + args,
            stdout=StringIO()).stdout.getvalue().strip()
        duration = datetime.datetime.now() - t1
        log.info("Ran {0} in time {1}, result:\n{2}".format(
            base_args + args, duration, r
        ))
        return r

    @property
    def tool_remote(self):
        """
        An arbitrary remote to use when invoking recovery tools. Use an MDS host because
        it'll definitely have keys with perms to access the cephfs metadata pool. This is
        public so that tests can use this remote to go and fetch locally written output
        files from the tools.
        """
        mds_id = self.mds_ids[0]
        return self.mds_daemons[mds_id].remote

    def journal_tool(self, args, rank=None, quiet=False):
        """
        Invoke cephfs-journal-tool with the passed arguments, and return its stdout
        """
        return self._run_tool("cephfs-journal-tool", args, rank, quiet)

    def table_tool(self, args, quiet=False):
        """
        Invoke cephfs-table-tool with the passed arguments, and return its stdout
        """
        return self._run_tool("cephfs-table-tool", args, None, quiet)

    def data_scan(self, args, quiet=False, worker_count=1):
        """
        Invoke cephfs-data-scan with the passed arguments, and return its stdout

        :param worker_count: if greater than 1, multiple workers will be run
                             in parallel and the return value will be None
        """

        workers = []

        for n in range(0, worker_count):
            if worker_count > 1:
                # The first token of the data-scan args is a command; insert the
                # worker arguments right after it, before the command's own args.
                cmd = args[0]
                worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
            else:
                worker_args = args

            workers.append(Greenlet.spawn(lambda wargs=worker_args:
                                          self._run_tool("cephfs-data-scan", wargs, None, quiet)))

        for w in workers:
            w.get()

        if worker_count == 1:
            return workers[0].value
        else:
            return None
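
    # Illustrative sketch (not part of the original module): data_scan() with
    # worker_count=4 and args=["scan_extents", "cephfs_data"] spawns four greenlets
    # running, respectively:
    #
    #   cephfs-data-scan scan_extents --worker_n 0 --worker_m 4 cephfs_data
    #   cephfs-data-scan scan_extents --worker_n 1 --worker_m 4 cephfs_data
    #   cephfs-data-scan scan_extents --worker_n 2 --worker_m 4 cephfs_data
    #   cephfs-data-scan scan_extents --worker_n 3 --worker_m 4 cephfs_data
    #
    # (each also gets the --debug-mds/--debug-objecter flags added by _run_tool).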