import json
import logging
from gevent import Greenlet
import os
import time
import datetime
import re
import errno
import random
import traceback

from io import BytesIO
from six import StringIO

from teuthology.exceptions import CommandFailedError
from teuthology import misc
from teuthology.nuke import clear_firewall
from teuthology.parallel import parallel
from tasks.ceph_manager import write_conf
from tasks import ceph_manager


log = logging.getLogger(__name__)


DAEMON_WAIT_TIMEOUT = 120
ROOT_INO = 1

class FileLayout(object):
    def __init__(self, pool=None, pool_namespace=None, stripe_unit=None, stripe_count=None, object_size=None):
        self.pool = pool
        self.pool_namespace = pool_namespace
        self.stripe_unit = stripe_unit
        self.stripe_count = stripe_count
        self.object_size = object_size

    @classmethod
    def load_from_ceph(cls, layout_str):
        # TODO
        pass

    def items(self):
        if self.pool is not None:
            yield ("pool", self.pool)
        if self.pool_namespace:
            yield ("pool_namespace", self.pool_namespace)
        if self.stripe_unit is not None:
            yield ("stripe_unit", self.stripe_unit)
        if self.stripe_count is not None:
            yield ("stripe_count", self.stripe_count)
        if self.object_size is not None:
            yield ("object_size", self.object_size)

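# Illustrative sketch (not part of the original tests): the (key, value) pairs
# yielded by FileLayout.items() map onto "ceph.dir.layout.<key>" xattrs, which is
# how Filesystem.set_dir_layout() further below applies them, e.g.:
#
#   layout = FileLayout(pool="cephfs_data", stripe_count=2)
#   fs.set_dir_layout(mount, "subdir", layout)  # runs setfattr -n ceph.dir.layout.<key> -v <value> subdir
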
class ObjectNotFound(Exception):
    def __init__(self, object_name):
        self._object_name = object_name

    def __str__(self):
        return "Object not found: '{0}'".format(self._object_name)

class FSStatus(object):
    """
    Operations on a snapshot of the FSMap.
    """
    def __init__(self, mon_manager):
        self.mon = mon_manager
        self.map = json.loads(self.mon.raw_cluster_cmd("fs", "dump", "--format=json"))

    def __str__(self):
        return json.dumps(self.map, indent = 2, sort_keys = True)

    # Expose the fsmap for manual inspection.
    def __getitem__(self, key):
        """
        Get a field from the fsmap.
        """
        return self.map[key]

    def get_filesystems(self):
        """
        Iterator for all filesystems.
        """
        for fs in self.map['filesystems']:
            yield fs

    def get_all(self):
        """
        Iterator for all the mds_info components in the FSMap.
        """
        for info in self.map['standbys']:
            yield info
        for fs in self.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                yield info

    def get_standbys(self):
        """
        Iterator for all standbys.
        """
        for info in self.map['standbys']:
            yield info

    def get_fsmap(self, fscid):
        """
        Get the fsmap for the given FSCID.
        """
        for fs in self.map['filesystems']:
            if fscid is None or fs['id'] == fscid:
                return fs
        raise RuntimeError("FSCID {0} not in map".format(fscid))

    def get_fsmap_byname(self, name):
        """
        Get the fsmap for the given file system name.
        """
        for fs in self.map['filesystems']:
            if name is None or fs['mdsmap']['fs_name'] == name:
                return fs
        raise RuntimeError("FS {0} not in map".format(name))

    def get_replays(self, fscid):
        """
        Get the standby:replay MDS for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['state'] == 'up:standby-replay':
                yield info

    def get_ranks(self, fscid):
        """
        Get the ranks for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['rank'] >= 0 and info['state'] != 'up:standby-replay':
                yield info

    def get_rank(self, fscid, rank):
        """
        Get the rank for the given FSCID.
        """
        for info in self.get_ranks(fscid):
            if info['rank'] == rank:
                return info
        raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))

    def get_mds(self, name):
        """
        Get the info for the given MDS name.
        """
        for info in self.get_all():
            if info['name'] == name:
                return info
        return None

    def get_mds_addr(self, name):
        """
        Return the instance addr as a string, like "10.214.133.138:6807\/10825"
        """
        info = self.get_mds(name)
        if info:
            return info['addr']
        else:
            log.warning(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
            raise RuntimeError("MDS id '{0}' not found in map".format(name))

    def get_mds_gid(self, gid):
        """
        Get the info for the given MDS gid.
        """
        for info in self.get_all():
            if info['gid'] == gid:
                return info
        return None

    def hadfailover(self, status):
        """
        Compares two statuses for mds failovers.
        Returns True if there is a failover.
        """
        for fs in status.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                oldinfo = self.get_mds_gid(info['gid'])
                if oldinfo is None or oldinfo['incarnation'] != info['incarnation']:
                    return True
        # all matching
        return False

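# Illustrative usage of FSStatus (assumes an existing Filesystem instance `fs`):
#
#   status = fs.status()                          # snapshot of the FSMap
#   rank0 = status.get_rank(fs.id, 0)             # info dict for rank 0 of this fs
#   standby_names = [i['name'] for i in status.get_standbys()]
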
class CephCluster(object):
    @property
    def admin_remote(self):
        first_mon = misc.get_first_mon(self._ctx, None)
        (result,) = self._ctx.cluster.only(first_mon).remotes.keys()
        return result

    def __init__(self, ctx):
        self._ctx = ctx
        self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))

    def get_config(self, key, service_type=None):
        """
        Get config from mon by default, or a specific service if caller asks for it
        """
        if service_type is None:
            service_type = 'mon'

        service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def set_ceph_conf(self, subsys, key, value):
        if subsys not in self._ctx.ceph['ceph'].conf:
            self._ctx.ceph['ceph'].conf[subsys] = {}
        self._ctx.ceph['ceph'].conf[subsys][key] = value
        write_conf(self._ctx)  # XXX because we don't have the ceph task's config object, if they
                               # used a different config path this won't work.

    def clear_ceph_conf(self, subsys, key):
        del self._ctx.ceph['ceph'].conf[subsys][key]
        write_conf(self._ctx)

    def json_asok(self, command, service_type, service_id, timeout=None):
        if timeout is None:
            timeout = 15*60
        proc = self.mon_manager.admin_socket(service_type, service_id, command, timeout=timeout)
        response_data = proc.stdout.getvalue()
        log.info("_json_asok output: {0}".format(response_data))
        if response_data.strip():
            return json.loads(response_data)
        else:
            return None

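# Note (illustrative): json_asok() drives the daemon admin socket, roughly
# equivalent to running "ceph daemon <service_type>.<service_id> <command...>"
# on that daemon's host and decoding the JSON output; Filesystem.mds_asok()
# further down is a thin wrapper around it for MDS daemons.
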
class MDSCluster(CephCluster):
    """
    Collective operations on all the MDS daemons in the Ceph cluster. These
    daemons may be in use by various Filesystems.

    For the benefit of pre-multi-filesystem tests, this class is also
    a parent of Filesystem. The correct way to use MDSCluster going forward is
    as a separate instance outside of your (multiple) Filesystem instances.
    """
    def __init__(self, ctx):
        super(MDSCluster, self).__init__(ctx)

        self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))

        if len(self.mds_ids) == 0:
            raise RuntimeError("This task requires at least one MDS")

        if hasattr(self._ctx, "daemons"):
            # Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
            self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])

    def _one_or_all(self, mds_id, cb, in_parallel=True):
        """
        Call a callback for a single named MDS, or for all.

        Note that the parallelism here isn't for performance, it's to avoid being overly kind
        to the cluster by waiting a graceful ssh-latency of time between doing things, and to
        avoid being overly kind by executing them in a particular order. However, some actions
        don't cope with being done in parallel, so it's optional (`in_parallel`).

        :param mds_id: MDS daemon name, or None
        :param cb: Callback taking single argument of MDS daemon name
        :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
        """
        if mds_id is None:
            if in_parallel:
                with parallel() as p:
                    for mds_id in self.mds_ids:
                        p.spawn(cb, mds_id)
            else:
                for mds_id in self.mds_ids:
                    cb(mds_id)
        else:
            cb(mds_id)

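    # Illustrative use of _one_or_all (see mds_stop/mds_fail below for the real
    # callers): self._one_or_all(None, lambda id_: self.mds_daemons[id_].stop())
    # stops every MDS in parallel, while passing a specific mds_id stops only that one.
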
    def get_config(self, key, service_type=None):
        """
        get_config specialization of service_type="mds"
        """
        if service_type != "mds":
            return super(MDSCluster, self).get_config(key, service_type)

        # Some tests stop MDS daemons, don't send commands to a dead one:
        running_daemons = [i for i, mds in self.mds_daemons.items() if mds.running()]
        service_id = random.sample(running_daemons, 1)[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def mds_stop(self, mds_id=None):
        """
        Stop the MDS daemon process(es). If it held a rank, that rank
        will eventually go laggy.
        """
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())

    def mds_fail(self, mds_id=None):
        """
        Inform MDSMonitor of the death of the daemon process(es). If it held
        a rank, that rank will be relinquished.
        """
        self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))

    def mds_restart(self, mds_id=None):
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())

    def mds_fail_restart(self, mds_id=None):
        """
        Variation on restart that includes marking MDSs as failed, so that doing this
        operation followed by waiting for healthy daemon states guarantees that they
        have gone down and come up, rather than potentially seeing the healthy states
        that existed before the restart.
        """
        def _fail_restart(id_):
            self.mds_daemons[id_].stop()
            self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
            self.mds_daemons[id_].restart()

        self._one_or_all(mds_id, _fail_restart)

    def mds_signal(self, mds_id, sig, silent=False):
        """
        Signal an MDS daemon.
        """
        self.mds_daemons[mds_id].signal(sig, silent)

    def newfs(self, name='cephfs', create=True):
        return Filesystem(self._ctx, name=name, create=create)

    def status(self):
        return FSStatus(self.mon_manager)

    def delete_all_filesystems(self):
        """
        Remove all filesystems that exist, and any pools in use by them.
        """
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        pool_id_name = {}
        for pool in pools:
            pool_id_name[pool['pool']] = pool['pool_name']

        # mark cluster down for each fs to prevent churn during deletion
        status = self.status()
        for fs in status.get_filesystems():
            self.mon_manager.raw_cluster_cmd("fs", "fail", str(fs['mdsmap']['fs_name']))

        # get a new copy as actives may have since changed
        status = self.status()
        for fs in status.get_filesystems():
            mdsmap = fs['mdsmap']
            metadata_pool = pool_id_name[mdsmap['metadata_pool']]

            self.mon_manager.raw_cluster_cmd('fs', 'rm', mdsmap['fs_name'], '--yes-i-really-mean-it')
            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
                                             metadata_pool, metadata_pool,
                                             '--yes-i-really-really-mean-it')
            for data_pool in mdsmap['data_pools']:
                data_pool = pool_id_name[data_pool]
                try:
                    self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
                                                     data_pool, data_pool,
                                                     '--yes-i-really-really-mean-it')
                except CommandFailedError as e:
                    if e.exitstatus == 16:
                        # EBUSY: this data pool is used by two metadata pools;
                        # let the second pass delete it
                        pass
                    else:
                        raise

    def get_standby_daemons(self):
        return set([s['name'] for s in self.status().get_standbys()])

    def get_mds_hostnames(self):
        result = set()
        for mds_id in self.mds_ids:
            mds_remote = self.mon_manager.find_remote('mds', mds_id)
            result.add(mds_remote.hostname)

        return list(result)

    def set_clients_block(self, blocked, mds_id=None):
        """
        Block (using iptables) client communications to this MDS. Be careful: if
        other services are running on this MDS, or other MDSs try to talk to this
        MDS, their communications may also be blocked as collateral damage.

        :param mds_id: Optional ID of MDS to block, default to all
        :return:
        """
        da_flag = "-A" if blocked else "-D"

        def set_block(_mds_id):
            remote = self.mon_manager.find_remote('mds', _mds_id)
            status = self.status()

            addr = status.get_mds_addr(_mds_id)
            ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()

            remote.run(
                args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])
            remote.run(
                args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])

        self._one_or_all(mds_id, set_block, in_parallel=False)

    def clear_firewall(self):
        clear_firewall(self._ctx)

    def get_mds_info(self, mds_id):
        return FSStatus(self.mon_manager).get_mds(mds_id)

    def is_pool_full(self, pool_name):
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        for pool in pools:
            if pool['pool_name'] == pool_name:
                return 'full' in pool['flags_names'].split(",")

        raise RuntimeError("Pool not found '{0}'".format(pool_name))

class Filesystem(MDSCluster):
    """
    This object is for driving a CephFS filesystem. The MDS daemons driven by
    MDSCluster may be shared with other Filesystems.
    """
    def __init__(self, ctx, fscid=None, name=None, create=False,
                 ec_profile=None):
        super(Filesystem, self).__init__(ctx)

        self.name = name
        self.ec_profile = ec_profile
        self.id = None
        self.metadata_pool_name = None
        self.metadata_overlay = False
        self.data_pool_name = None
        self.data_pools = None

        client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
        self.client_id = client_list[0]
        self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]

        if name is not None:
            if fscid is not None:
                raise RuntimeError("cannot specify fscid when creating fs")
            if create and not self.legacy_configured():
                self.create()
        else:
            if fscid is not None:
                self.id = fscid
            self.getinfo(refresh = True)

        # Stash a reference to the first created filesystem on ctx, so
        # that if someone drops to the interactive shell they can easily
        # poke our methods.
        if not hasattr(self._ctx, "filesystem"):
            self._ctx.filesystem = self

    def get_task_status(self, status_key):
        return self.mon_manager.get_service_task_status("mds", status_key)

    def getinfo(self, refresh = False):
        status = self.status()
        if self.id is not None:
            fsmap = status.get_fsmap(self.id)
        elif self.name is not None:
            fsmap = status.get_fsmap_byname(self.name)
        else:
            fss = [fs for fs in status.get_filesystems()]
            if len(fss) == 1:
                fsmap = fss[0]
            elif len(fss) == 0:
                raise RuntimeError("no file system available")
            else:
                raise RuntimeError("more than one file system available")
        self.id = fsmap['id']
        self.name = fsmap['mdsmap']['fs_name']
        self.get_pool_names(status = status, refresh = refresh)
        return status

    def set_metadata_overlay(self, overlay):
        if self.id is not None:
            raise RuntimeError("cannot specify fscid when configuring overlay")
        self.metadata_overlay = overlay

    def deactivate(self, rank):
        if rank < 0:
            raise RuntimeError("invalid rank")
        elif rank == 0:
            raise RuntimeError("cannot deactivate rank 0")
        self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))

    def reach_max_mds(self):
        # Try to reach rank count == max_mds, up or down (UPGRADE SENSITIVE!)
        status = self.getinfo()
        mds_map = self.get_mds_map(status=status)
        max_mds = mds_map['max_mds']

        count = len(list(self.get_ranks(status=status)))
        if count > max_mds:
            try:
                # deactivate mds in descending order
                status = self.wait_for_daemons(status=status, skip_max_mds_check=True)
                while count > max_mds:
                    targets = sorted(self.get_ranks(status=status), key=lambda r: r['rank'], reverse=True)
                    target = targets[0]
                    log.info("deactivating rank %d" % target['rank'])
                    self.deactivate(target['rank'])
                    status = self.wait_for_daemons(skip_max_mds_check=True)
                    count = len(list(self.get_ranks(status=status)))
            except:
                # In Mimic, deactivation is done automatically:
                log.info("Error:\n{}".format(traceback.format_exc()))
                status = self.wait_for_daemons()
        else:
            status = self.wait_for_daemons()

        mds_map = self.get_mds_map(status=status)
        assert(mds_map['max_mds'] == max_mds)
        assert(mds_map['in'] == list(range(0, max_mds)))

    def fail(self):
        self.mon_manager.raw_cluster_cmd("fs", "fail", str(self.name))

    def set_flag(self, var, *args):
        a = map(lambda x: str(x).lower(), args)
        self.mon_manager.raw_cluster_cmd("fs", "flag", "set", var, *a)

    def set_allow_multifs(self, yes=True):
        self.set_flag("enable_multiple", yes)

    def set_var(self, var, *args):
        a = map(lambda x: str(x).lower(), args)
        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, var, *a)

    def set_down(self, down=True):
        self.set_var("down", str(down).lower())

    def set_joinable(self, joinable=True):
        self.set_var("joinable", joinable)

    def set_max_mds(self, max_mds):
        self.set_var("max_mds", "%d" % max_mds)

    def set_allow_standby_replay(self, yes):
        self.set_var("allow_standby_replay", yes)

    def set_allow_new_snaps(self, yes):
        self.set_var("allow_new_snaps", yes, '--yes-i-really-mean-it')

    # In Octopus+, the PG count can be omitted to use the default. We keep the
    # hard-coded value for deployments of Mimic/Nautilus.
    pgs_per_fs_pool = 8

    def create(self):
        if self.name is None:
            self.name = "cephfs"
        if self.metadata_pool_name is None:
            self.metadata_pool_name = "{0}_metadata".format(self.name)
        if self.data_pool_name is None:
            data_pool_name = "{0}_data".format(self.name)
        else:
            data_pool_name = self.data_pool_name

        log.info("Creating filesystem '{0}'".format(self.name))

        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                         self.metadata_pool_name, self.pgs_per_fs_pool.__str__())
        if self.metadata_overlay:
            self.mon_manager.raw_cluster_cmd('fs', 'new',
                                             self.name, self.metadata_pool_name, data_pool_name,
                                             '--allow-dangerous-metadata-overlay')
        else:
            if self.ec_profile and 'disabled' not in self.ec_profile:
                log.info("EC profile is %s", self.ec_profile)
                cmd = ['osd', 'erasure-code-profile', 'set', data_pool_name]
                cmd.extend(self.ec_profile)
                self.mon_manager.raw_cluster_cmd(*cmd)
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'create',
                    data_pool_name, self.pgs_per_fs_pool.__str__(), 'erasure',
                    data_pool_name)
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'set',
                    data_pool_name, 'allow_ec_overwrites', 'true')
            else:
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'create',
                    data_pool_name, self.pgs_per_fs_pool.__str__())
            self.mon_manager.raw_cluster_cmd('fs', 'new',
                                             self.name,
                                             self.metadata_pool_name,
                                             data_pool_name,
                                             "--force")
        self.check_pool_application(self.metadata_pool_name)
        self.check_pool_application(data_pool_name)
        # Turn off spurious standby count warnings from modifying max_mds in tests.
        try:
            self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
        except CommandFailedError as e:
            if e.exitstatus == 22:
                # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
                pass
            else:
                raise

        self.getinfo(refresh = True)

    def check_pool_application(self, pool_name):
        osd_map = self.mon_manager.get_osd_dump_json()
        for pool in osd_map['pools']:
            if pool['pool_name'] == pool_name:
                if "application_metadata" in pool:
                    if not "cephfs" in pool['application_metadata']:
                        raise RuntimeError("Pool {pool_name} does not name cephfs as application!".\
                            format(pool_name=pool_name))

    def __del__(self):
        if getattr(self._ctx, "filesystem", None) == self:
            delattr(self._ctx, "filesystem")

    def exists(self):
        """
        Whether a filesystem exists in the mon's filesystem list
        """
        fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
        return self.name in [fs['name'] for fs in fs_list]

    def legacy_configured(self):
        """
        Check if a legacy (i.e. pre "fs new") filesystem configuration is present. If this is
        the case, the caller should avoid using Filesystem.create
        """
        try:
            out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
            pools = json.loads(out_text)
            metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
            if metadata_pool_exists:
                self.metadata_pool_name = 'metadata'
        except CommandFailedError as e:
            # For use in upgrade tests, Ceph cuttlefish and earlier don't support
            # structured output (--format) from the CLI.
            if e.exitstatus == 22:
                metadata_pool_exists = True
            else:
                raise

        return metadata_pool_exists

    def _df(self):
        return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))

    def get_mds_map(self, status=None):
        if status is None:
            status = self.status()
        return status.get_fsmap(self.id)['mdsmap']

    def get_var(self, var, status=None):
        return self.get_mds_map(status=status)[var]

    def set_dir_layout(self, mount, path, layout):
        for name, value in layout.items():
            mount.run_shell(args=["setfattr", "-n", "ceph.dir.layout."+name, "-v", str(value), path])

    def add_data_pool(self, name, create=True):
        if create:
            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name, self.pgs_per_fs_pool.__str__())
        self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
        self.get_pool_names(refresh = True)
        for poolid, fs_name in self.data_pools.items():
            if name == fs_name:
                return poolid
        raise RuntimeError("could not get just created pool '{0}'".format(name))

    def get_pool_names(self, refresh = False, status = None):
        if refresh or self.metadata_pool_name is None or self.data_pools is None:
            if status is None:
                status = self.status()
            fsmap = status.get_fsmap(self.id)

            osd_map = self.mon_manager.get_osd_dump_json()
            id_to_name = {}
            for p in osd_map['pools']:
                id_to_name[p['pool']] = p['pool_name']

            self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
            self.data_pools = {}
            for data_pool in fsmap['mdsmap']['data_pools']:
                self.data_pools[data_pool] = id_to_name[data_pool]

    def get_data_pool_name(self, refresh = False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return next(iter(self.data_pools.values()))

    def get_data_pool_id(self, refresh = False):
        """
        Don't call this if you have multiple data pools
        :return: integer
        """
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return next(iter(self.data_pools.keys()))

    def get_data_pool_names(self, refresh = False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        return list(self.data_pools.values())

    def get_metadata_pool_name(self):
        return self.metadata_pool_name

    def set_data_pool_name(self, name):
        if self.id is not None:
            raise RuntimeError("can't set data pool name if the filesystem's fscid is already set")
        self.data_pool_name = name

    def get_namespace_id(self):
        return self.id

    def get_pool_df(self, pool_name):
        """
        Return a dict like:
            {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
        """
        for pool_df in self._df()['pools']:
            if pool_df['name'] == pool_name:
                return pool_df['stats']

        raise RuntimeError("Pool name '{0}' not found".format(pool_name))

    def get_usage(self):
        return self._df()['stats']['total_used_bytes']

    def are_daemons_healthy(self, status=None, skip_max_mds_check=False):
        """
        Return true if all daemons are in one of active, standby, standby-replay, and
        at least max_mds daemons are in 'active'.

        Unlike most of Filesystem, this function is tolerant of new-style `fs`
        commands being missing, because we are part of the ceph installation
        process during upgrade suites, so must fall back to old style commands
        when we get an EINVAL on a new style command.

        :return:
        """
        # First, check to see that processes haven't exited with an error code
        for mds in self._ctx.daemons.iter_daemons_of_role('mds'):
            mds.check_status()

        active_count = 0
        try:
            mds_map = self.get_mds_map(status=status)
        except CommandFailedError as cfe:
            # Old version, fall back to non-multi-fs commands
            if cfe.exitstatus == errno.EINVAL:
                mds_map = json.loads(
                        self.mon_manager.raw_cluster_cmd('mds', 'dump', '--format=json'))
            else:
                raise

        log.info("are_daemons_healthy: mds map: {0}".format(mds_map))

        for mds_id, mds_status in mds_map['info'].items():
            if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
                log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
                return False
            elif mds_status['state'] == 'up:active':
                active_count += 1

        log.info("are_daemons_healthy: {0}/{1}".format(
            active_count, mds_map['max_mds']
        ))

        if not skip_max_mds_check:
            if active_count > mds_map['max_mds']:
                log.info("are_daemons_healthy: number of actives is greater than max_mds: {0}".format(mds_map))
                return False
            elif active_count == mds_map['max_mds']:
                # The MDSMap says these guys are active, but let's check they really are
                for mds_id, mds_status in mds_map['info'].items():
                    if mds_status['state'] == 'up:active':
                        try:
                            daemon_status = self.mds_asok(["status"], mds_id=mds_status['name'])
                        except CommandFailedError as cfe:
                            if cfe.exitstatus == errno.EINVAL:
                                # Old version, can't do this check
                                continue
                            else:
                                # MDS not even running
                                return False

                        if daemon_status['state'] != 'up:active':
                            # MDS hasn't taken the latest map yet
                            return False

                return True
            else:
                return False
        else:
            log.info("are_daemons_healthy: skipping max_mds check")
            return True

    def get_daemon_names(self, state=None, status=None):
        """
        Return MDS daemon names of those daemons in the given state
        :param state:
        :return:
        """
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['state'] == state or state is None:
                result.append(mds_status['name'])

        return result

    def get_active_names(self, status=None):
        """
        Return MDS daemon names of those daemons holding ranks
        in state up:active

        :return: list of strings like ['a', 'b'], sorted by rank
        """
        return self.get_daemon_names("up:active", status=status)

    def get_all_mds_rank(self, status=None):
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['rank'])

        return result

    def get_rank(self, rank=0, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_rank(self.id, rank)

    def rank_restart(self, rank=0, status=None):
        name = self.get_rank(rank=rank, status=status)['name']
        self.mds_restart(mds_id=name)

    def rank_signal(self, signal, rank=0, status=None):
        name = self.get_rank(rank=rank, status=status)['name']
        self.mds_signal(name, signal)

    def rank_freeze(self, yes, rank=0):
        self.mon_manager.raw_cluster_cmd("mds", "freeze", "{}:{}".format(self.id, rank), str(yes).lower())

    def rank_fail(self, rank=0):
        self.mon_manager.raw_cluster_cmd("mds", "fail", "{}:{}".format(self.id, rank))

    def get_ranks(self, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_ranks(self.id)

    def get_replays(self, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_replays(self.id)

    def get_replay(self, rank=0, status=None):
        for replay in self.get_replays(status=status):
            if replay['rank'] == rank:
                return replay
        return None

    def get_rank_names(self, status=None):
        """
        Return MDS daemon names of those daemons holding a rank,
        sorted by rank. This includes e.g. up:replay/reconnect
        as well as active, but does not include standby or
        standby-replay.
        """
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['name'])

        return result

    def wait_for_daemons(self, timeout=None, skip_max_mds_check=False, status=None):
        """
        Wait until all daemons are healthy
        :return:
        """

        if timeout is None:
            timeout = DAEMON_WAIT_TIMEOUT

        if status is None:
            status = self.status()

        elapsed = 0
        while True:
            if self.are_daemons_healthy(status=status, skip_max_mds_check=skip_max_mds_check):
                return status
            else:
                time.sleep(1)
                elapsed += 1

            if elapsed > timeout:
                log.info("status = {0}".format(status))
                raise RuntimeError("Timed out waiting for MDS daemons to become healthy")

            status = self.status()

    def get_lone_mds_id(self):
        """
        Get a single MDS ID: the only one if there is only one
        configured, else the only one currently holding a rank,
        else raise an error.
        """
        if len(self.mds_ids) != 1:
            alive = self.get_rank_names()
            if len(alive) == 1:
                return alive[0]
            else:
                raise ValueError("Explicit MDS argument required when multiple MDSs in use")
        else:
            return self.mds_ids[0]

    def recreate(self):
        log.info("Creating new filesystem")
        self.delete_all_filesystems()
        self.id = None
        self.create()

    def put_metadata_object_raw(self, object_id, infile):
        """
        Save an object to the metadata pool
        """
        temp_bin_path = infile
        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'put', object_id, temp_bin_path
        ])

    def get_metadata_object_raw(self, object_id):
        """
        Retrieve an object from the metadata pool and store it in a file.
        """
        temp_bin_path = '/tmp/' + object_id + '.bin'

        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
        ])

        return temp_bin_path

    def get_metadata_object(self, object_type, object_id):
        """
        Retrieve an object from the metadata pool, pass it through
        ceph-dencoder to dump it to JSON, and return the decoded object.
        """
        temp_bin_path = '/tmp/out.bin'

        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
        ])

        dump_json = self.client_remote.sh([
            'sudo', os.path.join(self._prefix, 'ceph-dencoder'), 'type', object_type, 'import', temp_bin_path, 'decode', 'dump_json'
        ]).strip()
        try:
            dump = json.loads(dump_json)
        except (TypeError, ValueError):
            log.error("Failed to decode JSON: '{0}'".format(dump_json))
            raise

        return dump

    def get_journal_version(self):
        """
        Read the JournalPointer and Journal::Header objects to learn the version of
        encoding in use.
        """
        journal_pointer_object = '400.00000000'
        journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
        journal_ino = journal_pointer_dump['journal_pointer']['front']

        journal_header_object = "{0:x}.00000000".format(journal_ino)
        journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)

        version = journal_header_dump['journal_header']['stream_format']
        log.info("Read journal version {0}".format(version))

        return version

    def mds_asok(self, command, mds_id=None, timeout=None):
        if mds_id is None:
            mds_id = self.get_lone_mds_id()

        return self.json_asok(command, 'mds', mds_id, timeout=timeout)

    def rank_asok(self, command, rank=0, status=None, timeout=None):
        info = self.get_rank(rank=rank, status=status)
        return self.json_asok(command, 'mds', info['name'], timeout=timeout)

    def rank_tell(self, command, rank=0, status=None):
        info = self.get_rank(rank=rank, status=status)
        return json.loads(self.mon_manager.raw_cluster_cmd("tell", 'mds.{0}'.format(info['name']), *command))

    def read_cache(self, path, depth=None):
        cmd = ["dump", "tree", path]
        if depth is not None:
            cmd.append(depth.__str__())
        result = self.mds_asok(cmd)
        if len(result) == 0:
            raise RuntimeError("Path not found in cache: {0}".format(path))

        return result

    def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
        """
        Block until the MDS reaches a particular state, or a failure condition
        is met.

        When there are multiple MDSs, succeed when exactly one MDS is in the
        goal state, or fail when any MDS is in the reject state.

        :param goal_state: Return once the MDS is in this state
        :param reject: Fail if the MDS enters this state before the goal state
        :param timeout: Fail if this many seconds pass before reaching goal
        :return: number of seconds waited, rounded down to integer
        """

        started_at = time.time()
        while True:
            status = self.status()
            if rank is not None:
                try:
                    mds_info = status.get_rank(self.id, rank)
                    current_state = mds_info['state'] if mds_info else None
                    log.info("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
                except:
                    mdsmap = self.get_mds_map(status=status)
                    if rank in mdsmap['failed']:
                        log.info("Waiting for rank {0} to come back.".format(rank))
                        current_state = None
                    else:
                        raise
            elif mds_id is not None:
                # mds_info is None if no daemon with this ID exists in the map
                mds_info = status.get_mds(mds_id)
                current_state = mds_info['state'] if mds_info else None
                log.info("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
            else:
                # In general, look for a single MDS
                states = [m['state'] for m in status.get_ranks(self.id)]
                if [s for s in states if s == goal_state] == [goal_state]:
                    current_state = goal_state
                elif reject in states:
                    current_state = reject
                else:
                    current_state = None
                log.info("mapped states {0} to {1}".format(states, current_state))

            elapsed = time.time() - started_at
            if current_state == goal_state:
                log.info("reached state '{0}' in {1}s".format(current_state, elapsed))
                return elapsed
            elif reject is not None and current_state == reject:
                raise RuntimeError("MDS in reject state {0}".format(current_state))
            elif timeout is not None and elapsed > timeout:
                log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
                raise RuntimeError(
                    "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
                        elapsed, goal_state, current_state
                    ))
            else:
                time.sleep(1)

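    # Illustrative call (values hypothetical): fs.wait_for_state("up:active", rank=0, timeout=60)
    # polls the FSMap until rank 0 reports up:active and returns the seconds waited.
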
    def _read_data_xattr(self, ino_no, xattr_name, type, pool):
        mds_id = self.mds_ids[0]
        remote = self.mds_daemons[mds_id].remote
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)

        args = [
            os.path.join(self._prefix, "rados"), "-p", pool, "getxattr", obj_name, xattr_name
        ]
        try:
            proc = remote.run(args=args, stdout=BytesIO())
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(obj_name)

        data = proc.stdout.getvalue()
        dump = remote.sh(
            [os.path.join(self._prefix, "ceph-dencoder"),
             "type", type,
             "import", "-",
             "decode", "dump_json"],
            stdin=data
        )

        return json.loads(dump.strip())

    def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
        """
        Write to an xattr of the 0th data object of an inode. Will
        succeed whether the object and/or xattr already exist or not.

        :param ino_no: integer inode number
        :param xattr_name: string name of the xattr
        :param data: byte array data to write to the xattr
        :param pool: name of data pool or None to use primary data pool
        :return: None
        """
        remote = self.mds_daemons[self.mds_ids[0]].remote
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)
        args = [
            os.path.join(self._prefix, "rados"), "-p", pool, "setxattr",
            obj_name, xattr_name, data
        ]
        remote.sh(args)

    def read_backtrace(self, ino_no, pool=None):
        """
        Read the backtrace from the data pool, return a dict in the format
        given by inode_backtrace_t::dump, which is something like:

        ::

            rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
            ceph-dencoder type inode_backtrace_t import out.bin decode dump_json

            { "ino": 1099511627778,
              "ancestors": [
                    { "dirino": 1,
                      "dname": "blah",
                      "version": 11}],
              "pool": 1,
              "old_pools": []}

        :param pool: name of pool to read backtrace from. If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)

    def read_layout(self, ino_no, pool=None):
        """
        Read 'layout' xattr of an inode and parse the result, returning a dict like:
        ::
            {
                "stripe_unit": 4194304,
                "stripe_count": 1,
                "object_size": 4194304,
                "pool_id": 1,
                "pool_ns": "",
            }

        :param pool: name of pool to read layout from. If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)

    def _enumerate_data_objects(self, ino, size):
        """
        Get the list of expected data objects for a range, and the list of objects
        that really exist.

        :return a tuple of two lists of strings (expected, actual)
        """
        stripe_size = 1024 * 1024 * 4

        size = max(stripe_size, size)

        want_objects = [
            "{0:x}.{1:08x}".format(ino, n)
            for n in range(0, ((size - 1) // stripe_size) + 1)
        ]

        exist_objects = self.rados(["ls"], pool=self.get_data_pool_name()).split("\n")

        return want_objects, exist_objects

    def data_objects_present(self, ino, size):
        """
        Check that *all* the expected data objects for an inode are present in the data pool
        """

        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        missing = set(want_objects) - set(exist_objects)

        if missing:
            log.info("Objects missing (ino {0}, size {1}): {2}".format(
                ino, size, missing
            ))
            return False
        else:
            log.info("All objects for ino {0} size {1} found".format(ino, size))
            return True

    def data_objects_absent(self, ino, size):
        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        present = set(want_objects) & set(exist_objects)

        if present:
            log.info("Objects not absent (ino {0}, size {1}): {2}".format(
                ino, size, present
            ))
            return False
        else:
            log.info("All objects for ino {0} size {1} are absent".format(ino, size))
            return True

    def dirfrag_exists(self, ino, frag):
        try:
            self.rados(["stat", "{0:x}.{1:08x}".format(ino, frag)])
        except CommandFailedError:
            return False
        else:
            return True

    def rados(self, args, pool=None, namespace=None, stdin_data=None,
              stdin_file=None,
              stdout_data=None):
        """
        Call into the `rados` CLI from an MDS
        """

        if pool is None:
            pool = self.get_metadata_pool_name()

        # Doesn't matter which MDS we use to run rados commands, they all
        # have access to the pools
        mds_id = self.mds_ids[0]
        remote = self.mds_daemons[mds_id].remote

        # NB we could alternatively use librados pybindings for this, but it's a one-liner
        # using the `rados` CLI
        args = ([os.path.join(self._prefix, "rados"), "-p", pool] +
                (["--namespace", namespace] if namespace else []) +
                args)

        if stdin_file is not None:
            args = ["bash", "-c", "cat " + stdin_file + " | " + " ".join(args)]
        if stdout_data is None:
            stdout_data = StringIO()

        p = remote.run(args=args,
                       stdin=stdin_data,
                       stdout=stdout_data)
        return p.stdout.getvalue().strip()

    def list_dirfrag(self, dir_ino):
        """
        Read the named object and return the list of omap keys

        :return a list of 0 or more strings
        """

        dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)

        try:
            key_list_str = self.rados(["listomapkeys", dirfrag_obj_name])
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(dirfrag_obj_name)

        return key_list_str.split("\n") if key_list_str else []

    def erase_metadata_objects(self, prefix):
        """
        For all objects in the metadata pool matching the prefix,
        erase them.

        This is O(N) with the number of objects in the pool, so only suitable
        for use on toy test filesystems.
        """
        all_objects = self.rados(["ls"]).split("\n")
        matching_objects = [o for o in all_objects if o.startswith(prefix)]
        for o in matching_objects:
            self.rados(["rm", o])

    def erase_mds_objects(self, rank):
        """
        Erase all the per-MDS objects for a particular rank. This includes
        inotable, sessiontable, journal
        """

        def obj_prefix(multiplier):
            """
            MDS object naming conventions like rank 1's
            journal is at 201.***
            """
            return "%x." % (multiplier * 0x100 + rank)

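        # Worked example for rank 1: obj_prefix(2) == "%x." % (2 * 0x100 + 1) == "201.",
        # which matches that rank's journal objects such as 201.00000000.
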
        # MDS_INO_LOG_OFFSET
        self.erase_metadata_objects(obj_prefix(2))
        # MDS_INO_LOG_BACKUP_OFFSET
        self.erase_metadata_objects(obj_prefix(3))
        # MDS_INO_LOG_POINTER_OFFSET
        self.erase_metadata_objects(obj_prefix(4))
        # MDSTables & SessionMap
        self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))

    @property
    def _prefix(self):
        """
        Override this to set a different install path prefix for the Ceph binaries.
        """
        return ""

    def _make_rank(self, rank):
        return "{}:{}".format(self.name, rank)

    def _run_tool(self, tool, args, rank=None, quiet=False):
        # Tests frequently have [client] configuration that jacks up
        # the objecter log level (unlikely to be interesting here)
        # and does not set the mds log level (very interesting here)
        if quiet:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
        else:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']

        if rank is not None:
            base_args.extend(["--rank", "%s" % str(rank)])

        t1 = datetime.datetime.now()
        r = self.tool_remote.sh(script=base_args + args, stdout=StringIO()).strip()
        duration = datetime.datetime.now() - t1
        log.info("Ran {0} in time {1}, result:\n{2}".format(
            base_args + args, duration, r
        ))
        return r

    @property
    def tool_remote(self):
        """
        An arbitrary remote to use when invoking recovery tools. Use an MDS host because
        it'll definitely have keys with perms to access cephfs metadata pool. This is public
        so that tests can use this remote to go get locally written output files from the tools.
        """
        mds_id = self.mds_ids[0]
        return self.mds_daemons[mds_id].remote

    def journal_tool(self, args, rank, quiet=False):
        """
        Invoke cephfs-journal-tool with the passed arguments for a rank, and return its stdout
        """
        fs_rank = self._make_rank(rank)
        return self._run_tool("cephfs-journal-tool", args, fs_rank, quiet)

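    # Example (illustrative): fs.journal_tool(["journal", "reset"], 0) runs
    # "cephfs-journal-tool --rank <fs_name>:0 journal reset" (plus the debug
    # flags added by _run_tool) on an MDS host and returns its stdout.
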
    def table_tool(self, args, quiet=False):
        """
        Invoke cephfs-table-tool with the passed arguments, and return its stdout
        """
        return self._run_tool("cephfs-table-tool", args, None, quiet)

    def data_scan(self, args, quiet=False, worker_count=1):
        """
        Invoke cephfs-data-scan with the passed arguments, and return its stdout

        :param worker_count: if greater than 1, multiple workers will be run
                             in parallel and the return value will be None
        """

        workers = []

        for n in range(0, worker_count):
            if worker_count > 1:
                # data-scan args first token is a command, followed by args to it.
                # insert worker arguments after the command.
                cmd = args[0]
                worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
            else:
                worker_args = args

            workers.append(Greenlet.spawn(lambda wargs=worker_args:
                                          self._run_tool("cephfs-data-scan", wargs, None, quiet)))

        for w in workers:
            w.get()

        if worker_count == 1:
            return workers[0].value
        else:
            return None

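    # Example (illustrative, pool name hypothetical):
    #   fs.data_scan(["scan_extents", "cephfs_data"], worker_count=4)
    # launches four cephfs-data-scan workers in parallel, each given matching
    # --worker_n/--worker_m arguments; with worker_count=1 the tool's stdout is returned.
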
    def is_full(self):
        return self.is_pool_full(self.get_data_pool_name())