from StringIO import StringIO
import json
import logging
from gevent import Greenlet
import os
import time
import datetime
import re
import errno

from teuthology.exceptions import CommandFailedError
from teuthology import misc
from teuthology.nuke import clear_firewall
from teuthology.parallel import parallel
from tasks.ceph_manager import write_conf
from tasks import ceph_manager


log = logging.getLogger(__name__)


DAEMON_WAIT_TIMEOUT = 120
ROOT_INO = 1

class ObjectNotFound(Exception):
    def __init__(self, object_name):
        self._object_name = object_name

    def __str__(self):
        return "Object not found: '{0}'".format(self._object_name)

class FSStatus(object):
    """
    Operations on a snapshot of the FSMap.
    """
    def __init__(self, mon_manager):
        self.mon = mon_manager
        self.map = json.loads(self.mon.raw_cluster_cmd("fs", "dump", "--format=json"))

    def __str__(self):
        return json.dumps(self.map, indent = 2, sort_keys = True)

    # Expose the fsmap for manual inspection.
    def __getitem__(self, key):
        """
        Get a field from the fsmap.
        """
        return self.map[key]

    def get_filesystems(self):
        """
        Iterator for all filesystems.
        """
        for fs in self.map['filesystems']:
            yield fs

    def get_all(self):
        """
        Iterator for all the mds_info components in the FSMap.
        """
        for info in self.get_standbys():
            yield info
        for fs in self.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                yield info

    def get_standbys(self):
        """
        Iterator for all standbys.
        """
        for info in self.map['standbys']:
            yield info

    def get_fsmap(self, fscid):
        """
        Get the fsmap for the given FSCID.
        """
        for fs in self.map['filesystems']:
            if fscid is None or fs['id'] == fscid:
                return fs
        raise RuntimeError("FSCID {0} not in map".format(fscid))

    def get_fsmap_byname(self, name):
        """
        Get the fsmap for the given file system name.
        """
        for fs in self.map['filesystems']:
            if name is None or fs['mdsmap']['fs_name'] == name:
                return fs
        raise RuntimeError("FS {0} not in map".format(name))

    def get_replays(self, fscid):
        """
        Iterator for the standby-replay MDS daemons of the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['state'] == 'up:standby-replay':
                yield info

    def get_ranks(self, fscid):
        """
        Iterator for the ranks of the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['rank'] >= 0:
                yield info

    def get_rank(self, fscid, rank):
        """
        Get the info for the given rank of the given FSCID.
        """
        for info in self.get_ranks(fscid):
            if info['rank'] == rank:
                return info
        raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))

    def get_mds(self, name):
        """
        Get the info for the given MDS name.
        """
        for info in self.get_all():
            if info['name'] == name:
                return info
        return None

    def get_mds_addr(self, name):
        """
        Return the instance addr as a string, like "10.214.133.138:6807\/10825"
        """
        info = self.get_mds(name)
        if info:
            return info['addr']
        else:
            log.warn(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
            raise RuntimeError("MDS id '{0}' not found in map".format(name))

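# Illustrative sketch for FSStatus above (not used by the tests themselves):
# listing the active daemons of the first filesystem in the map might look like
#
#   status = FSStatus(mon_manager)    # assumes an existing CephManager 'mon_manager'
#   fs = next(status.get_filesystems())
#   active = [info['name'] for info in status.get_ranks(fs['id'])
#             if info['state'] == 'up:active']
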
class CephCluster(object):
    @property
    def admin_remote(self):
        first_mon = misc.get_first_mon(self._ctx, None)
        (result,) = self._ctx.cluster.only(first_mon).remotes.iterkeys()
        return result

    def __init__(self, ctx):
        self._ctx = ctx
        self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))

    def get_config(self, key, service_type=None):
        """
        Get config from mon by default, or a specific service if the caller asks for it.
        """
        if service_type is None:
            service_type = 'mon'

        service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def set_ceph_conf(self, subsys, key, value):
        if subsys not in self._ctx.ceph['ceph'].conf:
            self._ctx.ceph['ceph'].conf[subsys] = {}
        self._ctx.ceph['ceph'].conf[subsys][key] = value
        write_conf(self._ctx)  # XXX because we don't have the ceph task's config object,
                               # if they used a different config path this won't work.

    def clear_ceph_conf(self, subsys, key):
        del self._ctx.ceph['ceph'].conf[subsys][key]
        write_conf(self._ctx)

    def json_asok(self, command, service_type, service_id):
        proc = self.mon_manager.admin_socket(service_type, service_id, command)
        response_data = proc.stdout.getvalue()
        log.info("json_asok output: {0}".format(response_data))
        if response_data.strip():
            return json.loads(response_data)
        else:
            return None


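# Illustrative sketch for CephCluster.get_config() above: the key is fetched over
# the admin socket of the first daemon of the requested type, e.g.
#
#   cluster = CephCluster(ctx)
#   cluster.get_config('mon_pg_warn_min_per_osd')                # asks a mon
#   cluster.get_config('mds_cache_size', service_type='mds')     # asks an mds
#
# ('mds_cache_size' here is just an example option name.)
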
class MDSCluster(CephCluster):
    """
    Collective operations on all the MDS daemons in the Ceph cluster. These
    daemons may be in use by various Filesystems.

    For the benefit of pre-multi-filesystem tests, this class is also
    a parent of Filesystem. The correct way to use MDSCluster going forward is
    as a separate instance outside of your (multiple) Filesystem instances.
    """
    def __init__(self, ctx):
        super(MDSCluster, self).__init__(ctx)

        self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))

        if len(self.mds_ids) == 0:
            raise RuntimeError("This task requires at least one MDS")

        if hasattr(self._ctx, "daemons"):
            # Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
            self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])

    def _one_or_all(self, mds_id, cb, in_parallel=True):
        """
        Call a callback for a single named MDS, or for all.

        Note that the parallelism here isn't for performance, it's to avoid being overly kind
        to the cluster by waiting a graceful ssh-latency of time between doing things, and to
        avoid being overly kind by executing them in a particular order. However, some actions
        don't cope with being done in parallel, so it's optional (`in_parallel`).

        :param mds_id: MDS daemon name, or None
        :param cb: Callback taking single argument of MDS daemon name
        :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
        """
        if mds_id is None:
            if in_parallel:
                with parallel() as p:
                    for mds_id in self.mds_ids:
                        p.spawn(cb, mds_id)
            else:
                for mds_id in self.mds_ids:
                    cb(mds_id)
        else:
            cb(mds_id)

    def mds_stop(self, mds_id=None):
        """
        Stop the MDS daemon process(es). If it held a rank, that rank
        will eventually go laggy.
        """
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())

    def mds_fail(self, mds_id=None):
        """
        Inform MDSMonitor of the death of the daemon process(es). If it held
        a rank, that rank will be relinquished.
        """
        self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))

    def mds_restart(self, mds_id=None):
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())

    def mds_fail_restart(self, mds_id=None):
        """
        Variation on restart that includes marking MDSs as failed, so that doing this
        operation followed by waiting for healthy daemon states guarantees that they
        have gone down and come up, rather than potentially seeing the healthy states
        that existed before the restart.
        """
        def _fail_restart(id_):
            self.mds_daemons[id_].stop()
            self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
            self.mds_daemons[id_].restart()

        self._one_or_all(mds_id, _fail_restart)

    def newfs(self, name):
        return Filesystem(self._ctx, create=name)

    def status(self):
        return FSStatus(self.mon_manager)

    def delete_all_filesystems(self):
        """
        Remove all filesystems that exist, and any pools in use by them.
        """
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        pool_id_name = {}
        for pool in pools:
            pool_id_name[pool['pool']] = pool['pool_name']

        # mark cluster down for each fs to prevent churn during deletion
        status = self.status()
        for fs in status.get_filesystems():
            self.mon_manager.raw_cluster_cmd("fs", "set", fs['mdsmap']['fs_name'], "cluster_down", "true")

        # get a new copy as actives may have since changed
        status = self.status()
        for fs in status.get_filesystems():
            mdsmap = fs['mdsmap']
            metadata_pool = pool_id_name[mdsmap['metadata_pool']]

            for gid in mdsmap['up'].values():
                self.mon_manager.raw_cluster_cmd('mds', 'fail', gid.__str__())

            self.mon_manager.raw_cluster_cmd('fs', 'rm', mdsmap['fs_name'], '--yes-i-really-mean-it')
            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
                                             metadata_pool, metadata_pool,
                                             '--yes-i-really-really-mean-it')
            for data_pool in mdsmap['data_pools']:
                data_pool = pool_id_name[data_pool]
                try:
                    self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
                                                     data_pool, data_pool,
                                                     '--yes-i-really-really-mean-it')
                except CommandFailedError as e:
                    # EBUSY: the data pool is shared with another filesystem,
                    # so let the later pass delete it.
                    if e.exitstatus == 16:
                        pass
                    else:
                        raise

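    # Illustrative sketch for the mds_* helpers above: they accept either a single
    # daemon name or None meaning "all MDS daemons", e.g.
    #
    #   mds_cluster.mds_fail_restart('a')   # fail and restart just mds.a ('a' is an example name)
    #   mds_cluster.mds_fail_restart()      # fail and restart every MDS, in parallel
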
    def get_standby_daemons(self):
        return set([s['name'] for s in self.status().get_standbys()])

    def get_mds_hostnames(self):
        result = set()
        for mds_id in self.mds_ids:
            mds_remote = self.mon_manager.find_remote('mds', mds_id)
            result.add(mds_remote.hostname)

        return list(result)

    def set_clients_block(self, blocked, mds_id=None):
        """
        Block (using iptables) client communications to this MDS. Be careful: if
        other services are running on this MDS, or other MDSs try to talk to this
        MDS, their communications may also be blocked as collateral damage.

        :param mds_id: Optional ID of MDS to block, default to all
        :return:
        """
        da_flag = "-A" if blocked else "-D"

        def set_block(_mds_id):
            remote = self.mon_manager.find_remote('mds', _mds_id)
            status = self.status()

            addr = status.get_mds_addr(_mds_id)
            ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()

            remote.run(
                args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])
            remote.run(
                args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])

        self._one_or_all(mds_id, set_block, in_parallel=False)

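    # Illustrative sketch for set_clients_block() above: block traffic to one MDS,
    # exercise client behaviour, then undo it (the daemon name is an example):
    #
    #   mds_cluster.set_clients_block(True, mds_id='a')
    #   # ... provoke and observe client reconnect handling ...
    #   mds_cluster.set_clients_block(False, mds_id='a')
    #   mds_cluster.clear_firewall()    # or drop all teuthology iptables rules at once
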
    def clear_firewall(self):
        clear_firewall(self._ctx)

    def get_mds_info(self, mds_id):
        return FSStatus(self.mon_manager).get_mds(mds_id)

    def is_full(self):
        flags = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['flags']
        return 'full' in flags

    def is_pool_full(self, pool_name):
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        for pool in pools:
            if pool['pool_name'] == pool_name:
                return 'full' in pool['flags_names'].split(",")

        raise RuntimeError("Pool not found '{0}'".format(pool_name))

class Filesystem(MDSCluster):
    """
    This object is for driving a CephFS filesystem. The MDS daemons driven by
    MDSCluster may be shared with other Filesystems.
    """
    def __init__(self, ctx, fscid=None, create=None):
        super(Filesystem, self).__init__(ctx)

        self.id = None
        self.name = None
        self.metadata_pool_name = None
        self.data_pools = None

        client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
        self.client_id = client_list[0]
        self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]

        if create is not None:
            if fscid is not None:
                raise RuntimeError("cannot specify fscid when creating fs")
            if create is True:
                self.name = 'cephfs'
            else:
                self.name = create
            if not self.legacy_configured():
                self.create()
        elif fscid is not None:
            self.id = fscid
        self.getinfo(refresh = True)

        # Stash a reference to the first created filesystem on ctx, so
        # that if someone drops to the interactive shell they can easily
        # poke our methods.
        if not hasattr(self._ctx, "filesystem"):
            self._ctx.filesystem = self

    def getinfo(self, refresh = False):
        status = self.status()
        if self.id is not None:
            fsmap = status.get_fsmap(self.id)
        elif self.name is not None:
            fsmap = status.get_fsmap_byname(self.name)
        else:
            fss = [fs for fs in status.get_filesystems()]
            if len(fss) == 1:
                fsmap = fss[0]
            elif len(fss) == 0:
                raise RuntimeError("no file system available")
            else:
                raise RuntimeError("more than one file system available")
        self.id = fsmap['id']
        self.name = fsmap['mdsmap']['fs_name']
        self.get_pool_names(status = status, refresh = refresh)
        return status

    def deactivate(self, rank):
        if rank < 0:
            raise RuntimeError("invalid rank")
        elif rank == 0:
            raise RuntimeError("cannot deactivate rank 0")
        self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))

    def set_max_mds(self, max_mds):
        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "max_mds", "%d" % max_mds)

    def set_allow_dirfrags(self, yes):
        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "allow_dirfrags", str(yes).lower(), '--yes-i-really-mean-it')

    def set_allow_multimds(self, yes):
        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, "allow_multimds", str(yes).lower(), '--yes-i-really-mean-it')

    def get_pgs_per_fs_pool(self):
        """
        Calculate how many PGs to use when creating a pool, in order to avoid raising any
        health warnings about mon_pg_warn_min_per_osd

        :return: an integer number of PGs
        """
        pg_warn_min_per_osd = int(self.get_config('mon_pg_warn_min_per_osd'))
        osd_count = len(list(misc.all_roles_of_type(self._ctx.cluster, 'osd')))
        return pg_warn_min_per_osd * osd_count

    def create(self):
        if self.name is None:
            self.name = "cephfs"
        if self.metadata_pool_name is None:
            self.metadata_pool_name = "{0}_metadata".format(self.name)
        data_pool_name = "{0}_data".format(self.name)

        log.info("Creating filesystem '{0}'".format(self.name))

        pgs_per_fs_pool = self.get_pgs_per_fs_pool()

        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                         self.metadata_pool_name, pgs_per_fs_pool.__str__())
        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                         data_pool_name, pgs_per_fs_pool.__str__())
        self.mon_manager.raw_cluster_cmd('fs', 'new',
                                         self.name, self.metadata_pool_name, data_pool_name)
        # Turn off spurious standby count warnings from modifying max_mds in tests.
        self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')

        self.getinfo(refresh = True)

    def __del__(self):
        if getattr(self._ctx, "filesystem", None) == self:
            delattr(self._ctx, "filesystem")

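    # Illustrative sketch for create() and the fs settings above: a test that wants
    # a fresh multi-active filesystem might do (names and values are examples):
    #
    #   fs = mds_cluster.newfs("testfs")
    #   fs.set_allow_multimds(True)
    #   fs.set_max_mds(2)
    #   fs.wait_for_daemons()
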
    def exists(self):
        """
        Whether a filesystem exists in the mon's filesystem list
        """
        fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
        return self.name in [fs['name'] for fs in fs_list]

    def legacy_configured(self):
        """
        Check if a legacy (i.e. pre "fs new") filesystem configuration is present. If this is
        the case, the caller should avoid using Filesystem.create
        """
        try:
            out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
            pools = json.loads(out_text)
            metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
            if metadata_pool_exists:
                self.metadata_pool_name = 'metadata'
        except CommandFailedError as e:
            # For use in upgrade tests, Ceph cuttlefish and earlier don't support
            # structured output (--format) from the CLI.
            if e.exitstatus == 22:
                metadata_pool_exists = True
            else:
                raise

        return metadata_pool_exists

    def _df(self):
        return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))

    def get_mds_map(self):
        return self.status().get_fsmap(self.id)['mdsmap']

    def add_data_pool(self, name):
        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name, self.get_pgs_per_fs_pool().__str__())
        self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
        self.get_pool_names(refresh = True)
        for poolid, fs_name in self.data_pools.items():
            if name == fs_name:
                return poolid
        raise RuntimeError("could not get just created pool '{0}'".format(name))

    def get_pool_names(self, refresh = False, status = None):
        if refresh or self.metadata_pool_name is None or self.data_pools is None:
            if status is None:
                status = self.status()
            fsmap = status.get_fsmap(self.id)

            osd_map = self.mon_manager.get_osd_dump_json()
            id_to_name = {}
            for p in osd_map['pools']:
                id_to_name[p['pool']] = p['pool_name']

            self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
            self.data_pools = {}
            for data_pool in fsmap['mdsmap']['data_pools']:
                self.data_pools[data_pool] = id_to_name[data_pool]

    def get_data_pool_name(self, refresh = False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return self.data_pools.values()[0]

    def get_data_pool_id(self, refresh = False):
        """
        Don't call this if you have multiple data pools
        :return: integer
        """
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return self.data_pools.keys()[0]

    def get_data_pool_names(self, refresh = False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        return self.data_pools.values()

    def get_metadata_pool_name(self):
        return self.metadata_pool_name

    def get_namespace_id(self):
        return self.id

    def get_pool_df(self, pool_name):
        """
        Return a dict like:
            {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
        """
        for pool_df in self._df()['pools']:
            if pool_df['name'] == pool_name:
                return pool_df['stats']

        raise RuntimeError("Pool name '{0}' not found".format(pool_name))

    def get_usage(self):
        return self._df()['stats']['total_used_bytes']

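    # Illustrative sketch for the pool helpers above: checking space usage of the
    # data pool and of the cluster as a whole:
    #
    #   data_pool = fs.get_data_pool_name()
    #   pool_used = fs.get_pool_df(data_pool)['bytes_used']
    #   cluster_used = fs.get_usage()
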
    def are_daemons_healthy(self):
        """
        Return True if all daemons are in one of the states active, standby or
        standby-replay, and at least max_mds daemons are active.

        Unlike most of Filesystem, this function is tolerant of new-style `fs`
        commands being missing, because we are part of the ceph installation
        process during upgrade suites, so must fall back to old style commands
        when we get an EINVAL on a new style command.

        :return:
        """

        active_count = 0
        try:
            mds_map = self.get_mds_map()
        except CommandFailedError as cfe:
            # Old version, fall back to non-multi-fs commands
            if cfe.exitstatus == errno.EINVAL:
                mds_map = json.loads(
                    self.mon_manager.raw_cluster_cmd('mds', 'dump', '--format=json'))
            else:
                raise

        log.info("are_daemons_healthy: mds map: {0}".format(mds_map))

        for mds_id, mds_status in mds_map['info'].items():
            if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
                log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
                return False
            elif mds_status['state'] == 'up:active':
                active_count += 1

        log.info("are_daemons_healthy: {0}/{1}".format(
            active_count, mds_map['max_mds']
        ))

        if active_count >= mds_map['max_mds']:
            # The MDSMap says these daemons are active, but let's check they really are
            for mds_id, mds_status in mds_map['info'].items():
                if mds_status['state'] == 'up:active':
                    try:
                        daemon_status = self.mds_asok(["status"], mds_id=mds_status['name'])
                    except CommandFailedError as cfe:
                        if cfe.exitstatus == errno.EINVAL:
                            # Old version, can't do this check
                            continue
                        else:
                            # MDS not even running
                            return False

                    if daemon_status['state'] != 'up:active':
                        # MDS hasn't taken the latest map yet
                        return False

            return True
        else:
            return False

    def get_daemon_names(self, state=None):
        """
        Return MDS daemon names of those daemons in the given state
        :param state:
        :return:
        """
        status = self.get_mds_map()
        result = []
        for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
            if mds_status['state'] == state or state is None:
                result.append(mds_status['name'])

        return result

    def get_active_names(self):
        """
        Return MDS daemon names of those daemons holding ranks
        in state up:active

        :return: list of strings like ['a', 'b'], sorted by rank
        """
        return self.get_daemon_names("up:active")

    def get_all_mds_rank(self):
        status = self.get_mds_map()
        result = []
        for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['rank'])

        return result

    def get_rank_names(self):
        """
        Return MDS daemon names of those daemons holding a rank,
        sorted by rank. This includes e.g. up:replay/reconnect
        as well as active, but does not include standby or
        standby-replay.
        """
        status = self.get_mds_map()
        result = []
        for mds_status in sorted(status['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['name'])

        return result

    def wait_for_daemons(self, timeout=None):
        """
        Wait until all daemons are healthy
        :return:
        """

        if timeout is None:
            timeout = DAEMON_WAIT_TIMEOUT

        elapsed = 0
        while True:
            if self.are_daemons_healthy():
                return
            else:
                time.sleep(1)
                elapsed += 1

            if elapsed > timeout:
                raise RuntimeError("Timed out waiting for MDS daemons to become healthy")

    def get_lone_mds_id(self):
        """
        Get a single MDS ID: the only one if there is only one
        configured, else the only one currently holding a rank,
        else raise an error.
        """
        if len(self.mds_ids) != 1:
            alive = self.get_rank_names()
            if len(alive) == 1:
                return alive[0]
            else:
                raise ValueError("Explicit MDS argument required when multiple MDSs in use")
        else:
            return self.mds_ids[0]

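    # Illustrative sketch for the helpers above: a test typically waits for health
    # and then addresses daemons in rank order:
    #
    #   fs.wait_for_daemons()
    #   active = fs.get_active_names()    # e.g. ['a', 'b'], sorted by rank
    #   fs.mds_asok(['flush', 'journal'], mds_id=active[0])
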
    def recreate(self):
        log.info("Creating new filesystem")
        self.delete_all_filesystems()
        self.id = None
        self.create()

    def put_metadata_object_raw(self, object_id, infile):
        """
        Save an object to the metadata pool
        """
        temp_bin_path = infile
        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'put', object_id, temp_bin_path
        ])

    def get_metadata_object_raw(self, object_id):
        """
        Retrieve an object from the metadata pool and store it in a file.
        """
        temp_bin_path = '/tmp/' + object_id + '.bin'

        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
        ])

        return temp_bin_path

    def get_metadata_object(self, object_type, object_id):
        """
        Retrieve an object from the metadata pool, pass it through
        ceph-dencoder to dump it to JSON, and return the decoded object.
        """
        temp_bin_path = '/tmp/out.bin'

        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
        ])

        stdout = StringIO()
        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'ceph-dencoder'), 'type', object_type, 'import', temp_bin_path, 'decode', 'dump_json'
        ], stdout=stdout)
        dump_json = stdout.getvalue().strip()
        try:
            dump = json.loads(dump_json)
        except (TypeError, ValueError):
            log.error("Failed to decode JSON: '{0}'".format(dump_json))
            raise

        return dump

    def get_journal_version(self):
        """
        Read the JournalPointer and Journaler::Header objects to learn the version of
        encoding in use.
        """
        journal_pointer_object = '400.00000000'
        journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
        journal_ino = journal_pointer_dump['journal_pointer']['front']

        journal_header_object = "{0:x}.00000000".format(journal_ino)
        journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)

        version = journal_header_dump['journal_header']['stream_format']
        log.info("Read journal version {0}".format(version))

        return version

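    # Worked example of the object naming used in get_journal_version() above:
    # rank 0's journal pointer lives in object 400.00000000; if its 'front' field
    # is inode 0x200, the Journaler::Header is then read from object 200.00000000
    # (i.e. "{0:x}.00000000".format(0x200)).
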
    def mds_asok(self, command, mds_id=None):
        if mds_id is None:
            mds_id = self.get_lone_mds_id()

        return self.json_asok(command, 'mds', mds_id)

    def read_cache(self, path, depth=None):
        cmd = ["dump", "tree", path]
        if depth is not None:
            cmd.append(depth.__str__())
        result = self.mds_asok(cmd)
        if len(result) == 0:
            raise RuntimeError("Path not found in cache: {0}".format(path))

        return result

    def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
        """
        Block until the MDS reaches a particular state, or a failure condition
        is met.

        When there are multiple MDSs, succeed when exactly one MDS is in the
        goal state, or fail when any MDS is in the reject state.

        :param goal_state: Return once the MDS is in this state
        :param reject: Fail if the MDS enters this state before the goal state
        :param timeout: Fail if this many seconds pass before reaching goal
        :return: number of seconds waited, rounded down to integer
        """

        started_at = time.time()
        while True:
            status = self.status()
            if rank is not None:
                mds_info = status.get_rank(self.id, rank)
                current_state = mds_info['state'] if mds_info else None
                log.info("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
            elif mds_id is not None:
                # mds_info is None if no daemon with this ID exists in the map
                mds_info = status.get_mds(mds_id)
                current_state = mds_info['state'] if mds_info else None
                log.info("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
            else:
                # In general, look for a single MDS
                states = [m['state'] for m in status.get_ranks(self.id)]
                if [s for s in states if s == goal_state] == [goal_state]:
                    current_state = goal_state
                elif reject in states:
                    current_state = reject
                else:
                    current_state = None
                log.info("mapped states {0} to {1}".format(states, current_state))

            elapsed = time.time() - started_at
            if current_state == goal_state:
                log.info("reached state '{0}' in {1}s".format(current_state, elapsed))
                return elapsed
            elif reject is not None and current_state == reject:
                raise RuntimeError("MDS in reject state {0}".format(current_state))
            elif timeout is not None and elapsed > timeout:
                log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
                raise RuntimeError(
                    "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
                        elapsed, goal_state, current_state
                    ))
            else:
                time.sleep(1)

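    # Illustrative sketch for wait_for_state() above (values are examples only):
    #
    #   fs.wait_for_state('up:active', timeout=60, rank=0)
    #   fs.wait_for_state('up:standby-replay', reject='up:active', timeout=60, mds_id='b')
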
    def _read_data_xattr(self, ino_no, xattr_name, type, pool):
        mds_id = self.mds_ids[0]
        remote = self.mds_daemons[mds_id].remote
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)

        args = [
            os.path.join(self._prefix, "rados"), "-p", pool, "getxattr", obj_name, xattr_name
        ]
        try:
            proc = remote.run(
                args=args,
                stdout=StringIO())
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(obj_name)

        data = proc.stdout.getvalue()

        p = remote.run(
            args=[os.path.join(self._prefix, "ceph-dencoder"), "type", type, "import", "-", "decode", "dump_json"],
            stdout=StringIO(),
            stdin=data
        )

        return json.loads(p.stdout.getvalue().strip())

    def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
        """
        Write to an xattr of the 0th data object of an inode. Will
        succeed whether the object and/or xattr already exist or not.

        :param ino_no: integer inode number
        :param xattr_name: string name of the xattr
        :param data: byte array data to write to the xattr
        :param pool: name of data pool or None to use primary data pool
        :return: None
        """
        remote = self.mds_daemons[self.mds_ids[0]].remote
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)
        args = [
            os.path.join(self._prefix, "rados"), "-p", pool, "setxattr",
            obj_name, xattr_name, data
        ]
        remote.run(
            args=args,
            stdout=StringIO())

    def read_backtrace(self, ino_no, pool=None):
        """
        Read the backtrace from the data pool, return a dict in the format
        given by inode_backtrace_t::dump, which is something like:

        ::

            rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
            ceph-dencoder type inode_backtrace_t import out.bin decode dump_json

            { "ino": 1099511627778,
              "ancestors": [
                    { "dirino": 1,
                      "dname": "blah",
                      "version": 11}],
              "pool": 1,
              "old_pools": []}

        :param pool: name of pool to read backtrace from. If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)

    def read_layout(self, ino_no, pool=None):
        """
        Read the 'layout' xattr of an inode and parse the result, returning a dict like:
        ::
            {
                "stripe_unit": 4194304,
                "stripe_count": 1,
                "object_size": 4194304,
                "pool_id": 1,
                "pool_ns": "",
            }

        :param pool: name of pool to read the layout from. If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)

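    # Illustrative sketch for read_backtrace() above: recovering an inode's
    # ancestry from its backtrace xattr (the inode number is an example):
    #
    #   bt = fs.read_backtrace(0x10000000002)
    #   dnames = [ancestor['dname'] for ancestor in bt['ancestors']]
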
    def _enumerate_data_objects(self, ino, size):
        """
        Get the list of expected data objects for a range, and the list of objects
        that really exist.

        :return a tuple of two lists of strings (expected, actual)
        """
        stripe_size = 1024 * 1024 * 4

        size = max(stripe_size, size)

        want_objects = [
            "{0:x}.{1:08x}".format(ino, n)
            for n in range(0, ((size - 1) / stripe_size) + 1)
        ]

        exist_objects = self.rados(["ls"], pool=self.get_data_pool_name()).split("\n")

        return want_objects, exist_objects

    def data_objects_present(self, ino, size):
        """
        Check that *all* the expected data objects for an inode are present in the data pool
        """

        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        missing = set(want_objects) - set(exist_objects)

        if missing:
            log.info("Objects missing (ino {0}, size {1}): {2}".format(
                ino, size, missing
            ))
            return False
        else:
            log.info("All objects for ino {0} size {1} found".format(ino, size))
            return True

    def data_objects_absent(self, ino, size):
        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        present = set(want_objects) & set(exist_objects)

        if present:
            log.info("Objects not absent (ino {0}, size {1}): {2}".format(
                ino, size, present
            ))
            return False
        else:
            log.info("All objects for ino {0} size {1} are absent".format(ino, size))
            return True

    def dirfrag_exists(self, ino, frag):
        try:
            self.rados(["stat", "{0:x}.{1:08x}".format(ino, frag)])
        except CommandFailedError as e:
            return False
        else:
            return True

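    # Worked example of the enumeration above: with the 4 MiB stripe_size, an inode
    # 0x10000000003 of size 9 MiB spans ((9 MiB - 1) / 4 MiB) + 1 = 3 objects:
    # 10000000003.00000000, 10000000003.00000001 and 10000000003.00000002.
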
    def rados(self, args, pool=None, namespace=None, stdin_data=None):
        """
        Call into the `rados` CLI from an MDS
        """

        if pool is None:
            pool = self.get_metadata_pool_name()

        # Doesn't matter which MDS we use to run rados commands, they all
        # have access to the pools
        mds_id = self.mds_ids[0]
        remote = self.mds_daemons[mds_id].remote

        # NB we could alternatively use librados pybindings for this, but it's a one-liner
        # using the `rados` CLI
        args = ([os.path.join(self._prefix, "rados"), "-p", pool] +
                (["--namespace", namespace] if namespace else []) +
                args)
        p = remote.run(
            args=args,
            stdin=stdin_data,
            stdout=StringIO())
        return p.stdout.getvalue().strip()

    def list_dirfrag(self, dir_ino):
        """
        Read the named object and return the list of omap keys

        :return a list of 0 or more strings
        """

        dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)

        try:
            key_list_str = self.rados(["listomapkeys", dirfrag_obj_name])
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(dirfrag_obj_name)

        return key_list_str.split("\n") if key_list_str else []

    def erase_metadata_objects(self, prefix):
        """
        For all objects in the metadata pool matching the prefix,
        erase them.

        This is O(N) in the number of objects in the pool, so only suitable
        for use on toy test filesystems.
        """
        all_objects = self.rados(["ls"]).split("\n")
        matching_objects = [o for o in all_objects if o.startswith(prefix)]
        for o in matching_objects:
            self.rados(["rm", o])

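    # Illustrative sketch for the rados()/list_dirfrag() helpers above:
    #
    #   keys = fs.list_dirfrag(ROOT_INO)    # root dirfrag is object 1.00000000
    #   fs.rados(["rm", "some_object"])     # object name is just a placeholder
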
    def erase_mds_objects(self, rank):
        """
        Erase all the per-MDS objects for a particular rank. This includes
        inotable, sessiontable, journal
        """

        def obj_prefix(multiplier):
            """
            MDS object naming conventions like rank 1's
            journal is at 201.***
            """
            return "%x." % (multiplier * 0x100 + rank)

        # MDS_INO_LOG_OFFSET
        self.erase_metadata_objects(obj_prefix(2))
        # MDS_INO_LOG_BACKUP_OFFSET
        self.erase_metadata_objects(obj_prefix(3))
        # MDS_INO_LOG_POINTER_OFFSET
        self.erase_metadata_objects(obj_prefix(4))
        # MDSTables & SessionMap
        self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))

    @property
    def _prefix(self):
        """
        Override this to set a different path prefix for the Ceph binaries
        (the default of "" resolves them via the remote's PATH).
        """
        return ""

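    # Worked example of obj_prefix() above: for rank 0 the journal objects start
    # with "200.", the journal backup with "300." and the journal pointer with
    # "400."; for rank 1 these become "201.", "301." and "401.".
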
    def _run_tool(self, tool, args, rank=None, quiet=False):
        # Tests frequently have [client] configuration that jacks up
        # the objecter log level (unlikely to be interesting here)
        # and does not set the mds log level (very interesting here)
        if quiet:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
        else:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']

        if rank is not None:
            base_args.extend(["--rank", "%d" % rank])

        t1 = datetime.datetime.now()
        r = self.tool_remote.run(
            args=base_args + args,
            stdout=StringIO()).stdout.getvalue().strip()
        duration = datetime.datetime.now() - t1
        log.info("Ran {0} in time {1}, result:\n{2}".format(
            base_args + args, duration, r
        ))
        return r

    @property
    def tool_remote(self):
        """
        An arbitrary remote to use when invoking recovery tools. Use an MDS host because
        it'll definitely have keys with perms to access cephfs metadata pool. This is public
        so that tests can use this remote to go get locally written output files from the tools.
        """
        mds_id = self.mds_ids[0]
        return self.mds_daemons[mds_id].remote

    def journal_tool(self, args, rank=None, quiet=False):
        """
        Invoke cephfs-journal-tool with the passed arguments, and return its stdout
        """
        return self._run_tool("cephfs-journal-tool", args, rank, quiet)

    def table_tool(self, args, quiet=False):
        """
        Invoke cephfs-table-tool with the passed arguments, and return its stdout
        """
        return self._run_tool("cephfs-table-tool", args, None, quiet)

    def data_scan(self, args, quiet=False, worker_count=1):
        """
        Invoke cephfs-data-scan with the passed arguments, and return its stdout

        :param worker_count: if greater than 1, multiple workers will be run
                             in parallel and the return value will be None
        """

        workers = []

        for n in range(0, worker_count):
            if worker_count > 1:
                # data-scan args first token is a command, followed by args to it.
                # insert worker arguments after the command.
                cmd = args[0]
                worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
            else:
                worker_args = args

            workers.append(Greenlet.spawn(lambda wargs=worker_args:
                                          self._run_tool("cephfs-data-scan", wargs, None, quiet)))

        for w in workers:
            w.get()

        if worker_count == 1:
            return workers[0].value
        else:
            return None
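
    # Illustrative sketch for the tool wrappers above (arguments are examples only):
    #
    #   fs.journal_tool(["journal", "inspect"], rank=0)
    #   fs.table_tool(["all", "reset", "session"])
    #   fs.data_scan(["scan_extents", fs.get_data_pool_name()], worker_count=4)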