import json
import logging
from gevent import Greenlet
import os
import time
import datetime
import re
import errno
import random
import traceback

from io import BytesIO
from six import StringIO

from teuthology.exceptions import CommandFailedError
from teuthology import misc
from teuthology.nuke import clear_firewall
from teuthology.parallel import parallel
from tasks.ceph_manager import write_conf
from tasks import ceph_manager


log = logging.getLogger(__name__)


DAEMON_WAIT_TIMEOUT = 120
ROOT_INO = 1

class FileLayout(object):
    def __init__(self, pool=None, pool_namespace=None, stripe_unit=None, stripe_count=None, object_size=None):
        self.pool = pool
        self.pool_namespace = pool_namespace
        self.stripe_unit = stripe_unit
        self.stripe_count = stripe_count
        self.object_size = object_size

    @classmethod
    def load_from_ceph(cls, layout_str):
        # TODO
        pass

    def items(self):
        if self.pool is not None:
            yield ("pool", self.pool)
        if self.pool_namespace:
            yield ("pool_namespace", self.pool_namespace)
        if self.stripe_unit is not None:
            yield ("stripe_unit", self.stripe_unit)
        if self.stripe_count is not None:
            yield ("stripe_count", self.stripe_count)
        if self.object_size is not None:
            yield ("object_size", self.object_size)
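

# Usage sketch (hedged; the pool name and values below are illustrative, not
# taken from a real run): a FileLayout expands into the key/value pairs that
# Filesystem.set_dir_layout() further down applies as "ceph.dir.layout.*" xattrs.
#
#   layout = FileLayout(pool="cephfs_data", stripe_unit=1048576, stripe_count=2)
#   dict(layout.items())
#   # -> {'pool': 'cephfs_data', 'stripe_unit': 1048576, 'stripe_count': 2}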

class ObjectNotFound(Exception):
    def __init__(self, object_name):
        self._object_name = object_name

    def __str__(self):
        return "Object not found: '{0}'".format(self._object_name)

class FSStatus(object):
    """
    Operations on a snapshot of the FSMap.
    """
    def __init__(self, mon_manager):
        self.mon = mon_manager
        self.map = json.loads(self.mon.raw_cluster_cmd("fs", "dump", "--format=json"))

    def __str__(self):
        return json.dumps(self.map, indent = 2, sort_keys = True)

    # Expose the fsmap for manual inspection.
    def __getitem__(self, key):
        """
        Get a field from the fsmap.
        """
        return self.map[key]

    def get_filesystems(self):
        """
        Iterator for all filesystems.
        """
        for fs in self.map['filesystems']:
            yield fs

    def get_all(self):
        """
        Iterator for all the mds_info components in the FSMap.
        """
        for info in self.map['standbys']:
            yield info
        for fs in self.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                yield info

    def get_standbys(self):
        """
        Iterator for all standbys.
        """
        for info in self.map['standbys']:
            yield info

    def get_fsmap(self, fscid):
        """
        Get the fsmap for the given FSCID.
        """
        for fs in self.map['filesystems']:
            if fscid is None or fs['id'] == fscid:
                return fs
        raise RuntimeError("FSCID {0} not in map".format(fscid))

    def get_fsmap_byname(self, name):
        """
        Get the fsmap for the given file system name.
        """
        for fs in self.map['filesystems']:
            if name is None or fs['mdsmap']['fs_name'] == name:
                return fs
        raise RuntimeError("FS {0} not in map".format(name))

    def get_replays(self, fscid):
        """
        Iterator for the standby-replay MDS daemons of the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['state'] == 'up:standby-replay':
                yield info

    def get_ranks(self, fscid):
        """
        Get the ranks for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['rank'] >= 0 and info['state'] != 'up:standby-replay':
                yield info

    def get_rank(self, fscid, rank):
        """
        Get the info for the given rank of the given FSCID.
        """
        for info in self.get_ranks(fscid):
            if info['rank'] == rank:
                return info
        raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))

    def get_mds(self, name):
        """
        Get the info for the given MDS name.
        """
        for info in self.get_all():
            if info['name'] == name:
                return info
        return None

    def get_mds_addr(self, name):
        """
        Return the instance addr as a string, like "10.214.133.138:6807/10825"
        """
        info = self.get_mds(name)
        if info:
            return info['addr']
        else:
            log.warning(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
            raise RuntimeError("MDS id '{0}' not found in map".format(name))

    def get_mds_gid(self, gid):
        """
        Get the info for the given MDS gid.
        """
        for info in self.get_all():
            if info['gid'] == gid:
                return info
        return None

    def hadfailover(self, status):
        """
        Compare two statuses for MDS failovers.
        Returns True if there was a failover.
        """
        for fs in status.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                oldinfo = self.get_mds_gid(info['gid'])
                if oldinfo is None or oldinfo['incarnation'] != info['incarnation']:
                    return True
        # all matching
        return False

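# Usage sketch (hedged; `fs` is assumed to be a Filesystem instance defined
# later in this module): an FSStatus is a point-in-time snapshot of the FSMap,
# so failover detection compares two snapshots taken at different times.
#
#   before = fs.status()                 # Filesystem.status() returns an FSStatus
#   # ... restart or fail an MDS here ...
#   before.hadfailover(fs.status())      # True if any daemon's incarnation changed
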
class CephCluster(object):
    @property
    def admin_remote(self):
        first_mon = misc.get_first_mon(self._ctx, None)
        (result,) = self._ctx.cluster.only(first_mon).remotes.keys()
        return result

    def __init__(self, ctx):
        self._ctx = ctx
        self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))

    def get_config(self, key, service_type=None):
        """
        Get config from mon by default, or a specific service if caller asks for it
        """
        if service_type is None:
            service_type = 'mon'

        service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def set_ceph_conf(self, subsys, key, value):
        if subsys not in self._ctx.ceph['ceph'].conf:
            self._ctx.ceph['ceph'].conf[subsys] = {}
        self._ctx.ceph['ceph'].conf[subsys][key] = value
        write_conf(self._ctx)  # XXX because we don't have the ceph task's config object, if they
                               # used a different config path this won't work.

    def clear_ceph_conf(self, subsys, key):
        del self._ctx.ceph['ceph'].conf[subsys][key]
        write_conf(self._ctx)

    def json_asok(self, command, service_type, service_id, timeout=None):
        if timeout is None:
            timeout = 15*60
        command.insert(0, '--format=json')
        proc = self.mon_manager.admin_socket(service_type, service_id, command, timeout=timeout)
        response_data = proc.stdout.getvalue().strip()
        if len(response_data) > 0:
            j = json.loads(response_data)
            pretty = json.dumps(j, sort_keys=True, indent=2)
            log.debug(f"json_asok output\n{pretty}")
            return j
        else:
            log.debug("json_asok output empty")
            return None

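# Usage sketch (hedged; the daemon ID and config key are illustrative):
# json_asok() drives the admin socket of an arbitrary daemon and returns the
# decoded JSON, while get_config() asks the first mon by default.
#
#   cluster = CephCluster(ctx)                      # ctx from the teuthology task
#   cluster.json_asok(['perf', 'dump'], 'mds', 'a')
#   cluster.get_config('mon_max_pg_per_osd')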

class MDSCluster(CephCluster):
    """
    Collective operations on all the MDS daemons in the Ceph cluster. These
    daemons may be in use by various Filesystems.

    For the benefit of pre-multi-filesystem tests, this class is also
    a parent of Filesystem. The correct way to use MDSCluster going forward is
    as a separate instance outside of your (multiple) Filesystem instances.
    """
    def __init__(self, ctx):
        super(MDSCluster, self).__init__(ctx)

        self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))

        if len(self.mds_ids) == 0:
            raise RuntimeError("This task requires at least one MDS")

        if hasattr(self._ctx, "daemons"):
            # Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
            self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])

    def _one_or_all(self, mds_id, cb, in_parallel=True):
        """
        Call a callback for a single named MDS, or for all.

        Note that the parallelism here isn't for performance: it's so that tests
        neither wait an ssh latency between operating on each daemon, nor come to
        rely on the daemons being handled in any particular order. However, some
        actions don't cope with being done in parallel, so it's optional (`in_parallel`).

        :param mds_id: MDS daemon name, or None
        :param cb: Callback taking single argument of MDS daemon name
        :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
        """
        if mds_id is None:
            if in_parallel:
                with parallel() as p:
                    for mds_id in self.mds_ids:
                        p.spawn(cb, mds_id)
            else:
                for mds_id in self.mds_ids:
                    cb(mds_id)
        else:
            cb(mds_id)

    def get_config(self, key, service_type=None):
        """
        get_config specialization of service_type="mds"
        """
        if service_type != "mds":
            return super(MDSCluster, self).get_config(key, service_type)

        # Some tests stop MDS daemons, don't send commands to a dead one:
        running_daemons = [i for i, mds in self.mds_daemons.items() if mds.running()]
        service_id = random.sample(running_daemons, 1)[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def mds_stop(self, mds_id=None):
        """
        Stop the MDS daemon process(es). If it held a rank, that rank
        will eventually go laggy.
        """
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())

    def mds_fail(self, mds_id=None):
        """
        Inform MDSMonitor of the death of the daemon process(es). If it held
        a rank, that rank will be relinquished.
        """
        self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))

    def mds_restart(self, mds_id=None):
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())

    def mds_fail_restart(self, mds_id=None):
        """
        Variation on restart that includes marking MDSs as failed, so that doing this
        operation followed by waiting for healthy daemon states guarantees that they
        have gone down and come up, rather than potentially seeing the healthy states
        that existed before the restart.
        """
        def _fail_restart(id_):
            self.mds_daemons[id_].stop()
            self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
            self.mds_daemons[id_].restart()

        self._one_or_all(mds_id, _fail_restart)

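    # Usage sketch (hedged; a common test pattern rather than a required
    # sequence, where `mds_cluster`/`fs` are MDSCluster / Filesystem instances):
    # bounce every MDS, then block until they are healthy again, so later
    # assertions run against daemons that really restarted.
    #
    #   mds_cluster.mds_fail_restart()
    #   fs.wait_for_daemons()
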
    def mds_signal(self, mds_id, sig, silent=False):
        """
        Signal an MDS daemon
        """
        self.mds_daemons[mds_id].signal(sig, silent)

    def newfs(self, name='cephfs', create=True):
        return Filesystem(self._ctx, name=name, create=create)

    def status(self):
        return FSStatus(self.mon_manager)

    def delete_all_filesystems(self):
        """
        Remove all filesystems that exist, and any pools in use by them.
        """
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        pool_id_name = {}
        for pool in pools:
            pool_id_name[pool['pool']] = pool['pool_name']

        # mark cluster down for each fs to prevent churn during deletion
        status = self.status()
        for fs in status.get_filesystems():
            self.mon_manager.raw_cluster_cmd("fs", "fail", str(fs['mdsmap']['fs_name']))

        # get a new copy as actives may have since changed
        status = self.status()
        for fs in status.get_filesystems():
            mdsmap = fs['mdsmap']
            metadata_pool = pool_id_name[mdsmap['metadata_pool']]

            self.mon_manager.raw_cluster_cmd('fs', 'rm', mdsmap['fs_name'], '--yes-i-really-mean-it')
            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
                                             metadata_pool, metadata_pool,
                                             '--yes-i-really-really-mean-it')
            for data_pool in mdsmap['data_pools']:
                data_pool = pool_id_name[data_pool]
                try:
                    self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
                                                     data_pool, data_pool,
                                                     '--yes-i-really-really-mean-it')
                except CommandFailedError as e:
                    if e.exitstatus == 16:  # EBUSY: this data pool is used by two
                        pass                # filesystems; let the 2nd pass delete it
                    else:
                        raise

    def get_standby_daemons(self):
        return set([s['name'] for s in self.status().get_standbys()])

    def get_mds_hostnames(self):
        result = set()
        for mds_id in self.mds_ids:
            mds_remote = self.mon_manager.find_remote('mds', mds_id)
            result.add(mds_remote.hostname)

        return list(result)

    def set_clients_block(self, blocked, mds_id=None):
        """
        Block (using iptables) client communications to this MDS. Be careful: if
        other services are running on this MDS, or other MDSs try to talk to this
        MDS, their communications may also be blocked as collateral damage.

        :param mds_id: Optional ID of MDS to block, default to all
        :return:
        """
        da_flag = "-A" if blocked else "-D"

        def set_block(_mds_id):
            remote = self.mon_manager.find_remote('mds', _mds_id)
            status = self.status()

            addr = status.get_mds_addr(_mds_id)
            ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()

            remote.run(
                args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])
            remote.run(
                args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])

        self._one_or_all(mds_id, set_block, in_parallel=False)

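    # Usage sketch (hedged; the MDS ID 'a' is illustrative): tests typically
    # wrap a fault-injection window like
    #
    #   mds_cluster.set_clients_block(True, mds_id='a')
    #   # ... exercise client behaviour while the MDS is unreachable ...
    #   mds_cluster.set_clients_block(False, mds_id='a')
    #
    # or call clear_firewall() afterwards to drop all teuthology iptables rules.
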
    def clear_firewall(self):
        clear_firewall(self._ctx)

    def get_mds_info(self, mds_id):
        return FSStatus(self.mon_manager).get_mds(mds_id)

    def is_pool_full(self, pool_name):
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        for pool in pools:
            if pool['pool_name'] == pool_name:
                return 'full' in pool['flags_names'].split(",")

        raise RuntimeError("Pool not found '{0}'".format(pool_name))

class Filesystem(MDSCluster):
    """
    This object is for driving a CephFS filesystem. The MDS daemons driven by
    MDSCluster may be shared with other Filesystems.
    """
    def __init__(self, ctx, fscid=None, name=None, create=False,
                 ec_profile=None):
        super(Filesystem, self).__init__(ctx)

        self.name = name
        self.ec_profile = ec_profile
        self.id = None
        self.metadata_pool_name = None
        self.metadata_overlay = False
        self.data_pool_name = None
        self.data_pools = None

        client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
        self.client_id = client_list[0]
        self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]

        if name is not None:
            if fscid is not None:
                raise RuntimeError("cannot specify fscid when creating fs")
            if create and not self.legacy_configured():
                self.create()
        else:
            if fscid is not None:
                self.id = fscid
            self.getinfo(refresh = True)

        # Stash a reference to the first created filesystem on ctx, so
        # that if someone drops to the interactive shell they can easily
        # poke our methods.
        if not hasattr(self._ctx, "filesystem"):
            self._ctx.filesystem = self

    def get_task_status(self, status_key):
        return self.mon_manager.get_service_task_status("mds", status_key)

    def getinfo(self, refresh = False):
        status = self.status()
        if self.id is not None:
            fsmap = status.get_fsmap(self.id)
        elif self.name is not None:
            fsmap = status.get_fsmap_byname(self.name)
        else:
            fss = [fs for fs in status.get_filesystems()]
            if len(fss) == 1:
                fsmap = fss[0]
            elif len(fss) == 0:
                raise RuntimeError("no file system available")
            else:
                raise RuntimeError("more than one file system available")
        self.id = fsmap['id']
        self.name = fsmap['mdsmap']['fs_name']
        self.get_pool_names(status = status, refresh = refresh)
        return status

    def set_metadata_overlay(self, overlay):
        if self.id is not None:
            raise RuntimeError("cannot specify fscid when configuring overlay")
        self.metadata_overlay = overlay

    def deactivate(self, rank):
        if rank < 0:
            raise RuntimeError("invalid rank")
        elif rank == 0:
            raise RuntimeError("cannot deactivate rank 0")
        self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))

    def reach_max_mds(self):
        # Try to reach rank count == max_mds, up or down (UPGRADE SENSITIVE!)
        status = self.getinfo()
        mds_map = self.get_mds_map(status=status)
        max_mds = mds_map['max_mds']

        count = len(list(self.get_ranks(status=status)))
        if count > max_mds:
            try:
                # deactivate mds in descending order
                status = self.wait_for_daemons(status=status, skip_max_mds_check=True)
                while count > max_mds:
                    targets = sorted(self.get_ranks(status=status), key=lambda r: r['rank'], reverse=True)
                    target = targets[0]
                    log.info("deactivating rank %d" % target['rank'])
                    self.deactivate(target['rank'])
                    status = self.wait_for_daemons(skip_max_mds_check=True)
                    count = len(list(self.get_ranks(status=status)))
            except:
                # In Mimic, deactivation is done automatically:
                log.info("Error:\n{}".format(traceback.format_exc()))
                status = self.wait_for_daemons()
        else:
            status = self.wait_for_daemons()

        mds_map = self.get_mds_map(status=status)
        assert(mds_map['max_mds'] == max_mds)
        assert(mds_map['in'] == list(range(0, max_mds)))

    def fail(self):
        self.mon_manager.raw_cluster_cmd("fs", "fail", str(self.name))

    def set_flag(self, var, *args):
        a = map(lambda x: str(x).lower(), args)
        self.mon_manager.raw_cluster_cmd("fs", "flag", "set", var, *a)

    def set_allow_multifs(self, yes=True):
        self.set_flag("enable_multiple", yes)

    def set_var(self, var, *args):
        a = map(lambda x: str(x).lower(), args)
        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, var, *a)

    def set_down(self, down=True):
        self.set_var("down", str(down).lower())

    def set_joinable(self, joinable=True):
        self.set_var("joinable", joinable)

    def set_max_mds(self, max_mds):
        self.set_var("max_mds", "%d" % max_mds)

    def set_allow_standby_replay(self, yes):
        self.set_var("allow_standby_replay", yes)

    def set_allow_new_snaps(self, yes):
        self.set_var("allow_new_snaps", yes, '--yes-i-really-mean-it')

    # In Octopus+, the PG count can be omitted to use the default. We keep the
    # hard-coded value for deployments of Mimic/Nautilus.
    pgs_per_fs_pool = 8

    def create(self):
        if self.name is None:
            self.name = "cephfs"
        if self.metadata_pool_name is None:
            self.metadata_pool_name = "{0}_metadata".format(self.name)
        if self.data_pool_name is None:
            data_pool_name = "{0}_data".format(self.name)
        else:
            data_pool_name = self.data_pool_name

        log.info("Creating filesystem '{0}'".format(self.name))

        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                         self.metadata_pool_name, self.pgs_per_fs_pool.__str__())
        if self.metadata_overlay:
            self.mon_manager.raw_cluster_cmd('fs', 'new',
                                             self.name, self.metadata_pool_name, data_pool_name,
                                             '--allow-dangerous-metadata-overlay')
        else:
            if self.ec_profile and 'disabled' not in self.ec_profile:
                log.info("EC profile is %s", self.ec_profile)
                cmd = ['osd', 'erasure-code-profile', 'set', data_pool_name]
                cmd.extend(self.ec_profile)
                self.mon_manager.raw_cluster_cmd(*cmd)
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'create',
                    data_pool_name, self.pgs_per_fs_pool.__str__(), 'erasure',
                    data_pool_name)
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'set',
                    data_pool_name, 'allow_ec_overwrites', 'true')
            else:
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'create',
                    data_pool_name, self.pgs_per_fs_pool.__str__())
            self.mon_manager.raw_cluster_cmd('fs', 'new',
                                             self.name,
                                             self.metadata_pool_name,
                                             data_pool_name,
                                             "--force")
        self.check_pool_application(self.metadata_pool_name)
        self.check_pool_application(data_pool_name)
        # Turn off spurious standby count warnings from modifying max_mds in tests.
        try:
            self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
        except CommandFailedError as e:
            if e.exitstatus == 22:
                # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
                pass
            else:
                raise

        self.getinfo(refresh = True)

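    # Usage sketch (hedged; the pool names follow the "<fs>_metadata" /
    # "<fs>_data" convention that create() uses, but the exact flow depends
    # on the test's yaml configuration):
    #
    #   fs = MDSCluster(ctx).newfs(name='cephfs', create=True)  # pools + "fs new"
    #   fs.set_max_mds(2)
    #   fs.wait_for_daemons()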

    def check_pool_application(self, pool_name):
        osd_map = self.mon_manager.get_osd_dump_json()
        for pool in osd_map['pools']:
            if pool['pool_name'] == pool_name:
                if "application_metadata" in pool:
                    if "cephfs" not in pool['application_metadata']:
                        raise RuntimeError("Pool {pool_name} does not name cephfs as application!".format(
                            pool_name=pool_name))

    def __del__(self):
        if getattr(self._ctx, "filesystem", None) == self:
            delattr(self._ctx, "filesystem")

    def exists(self):
        """
        Whether a filesystem exists in the mon's filesystem list
        """
        fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
        return self.name in [fs['name'] for fs in fs_list]

    def legacy_configured(self):
        """
        Check if a legacy (i.e. pre "fs new") filesystem configuration is present. If this is
        the case, the caller should avoid using Filesystem.create
        """
        try:
            out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
            pools = json.loads(out_text)
            metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
            if metadata_pool_exists:
                self.metadata_pool_name = 'metadata'
        except CommandFailedError as e:
            # For use in upgrade tests, Ceph cuttlefish and earlier don't support
            # structured output (--format) from the CLI.
            if e.exitstatus == 22:
                metadata_pool_exists = True
            else:
                raise

        return metadata_pool_exists

    def _df(self):
        return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))

    def get_mds_map(self, status=None):
        if status is None:
            status = self.status()
        return status.get_fsmap(self.id)['mdsmap']

    def get_var(self, var, status=None):
        return self.get_mds_map(status=status)[var]

    def set_dir_layout(self, mount, path, layout):
        for name, value in layout.items():
            mount.run_shell(args=["setfattr", "-n", "ceph.dir.layout."+name, "-v", str(value), path])

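    # Usage sketch (hedged; `mount` is assumed to be a CephFS mount helper from
    # the test, and the layout values are illustrative):
    #
    #   layout = FileLayout(pool="cephfs_data", stripe_unit=1048576)
    #   fs.set_dir_layout(mount, "subdir", layout)
    #   # runs: setfattr -n ceph.dir.layout.pool -v cephfs_data subdir, etc.
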
    def add_data_pool(self, name, create=True):
        if create:
            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name, self.pgs_per_fs_pool.__str__())
        self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
        self.get_pool_names(refresh = True)
        for poolid, fs_name in self.data_pools.items():
            if name == fs_name:
                return poolid
        raise RuntimeError("could not get just created pool '{0}'".format(name))

    def get_pool_names(self, refresh = False, status = None):
        if refresh or self.metadata_pool_name is None or self.data_pools is None:
            if status is None:
                status = self.status()
            fsmap = status.get_fsmap(self.id)

            osd_map = self.mon_manager.get_osd_dump_json()
            id_to_name = {}
            for p in osd_map['pools']:
                id_to_name[p['pool']] = p['pool_name']

            self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
            self.data_pools = {}
            for data_pool in fsmap['mdsmap']['data_pools']:
                self.data_pools[data_pool] = id_to_name[data_pool]

    def get_data_pool_name(self, refresh = False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return next(iter(self.data_pools.values()))

    def get_data_pool_id(self, refresh = False):
        """
        Don't call this if you have multiple data pools
        :return: integer
        """
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return next(iter(self.data_pools.keys()))

    def get_data_pool_names(self, refresh = False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        return list(self.data_pools.values())

    def get_metadata_pool_name(self):
        return self.metadata_pool_name

    def set_data_pool_name(self, name):
        if self.id is not None:
            raise RuntimeError("can't set data pool name once the filesystem's fscid is set")
        self.data_pool_name = name

    def get_namespace_id(self):
        return self.id

    def get_pool_df(self, pool_name):
        """
        Return a dict like:
        {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
        """
        for pool_df in self._df()['pools']:
            if pool_df['name'] == pool_name:
                return pool_df['stats']

        raise RuntimeError("Pool name '{0}' not found".format(pool_name))

    def get_usage(self):
        return self._df()['stats']['total_used_bytes']

    def are_daemons_healthy(self, status=None, skip_max_mds_check=False):
        """
        Return true if all daemons are in one of active, standby, standby-replay, and
        at least max_mds daemons are in 'active'.

        Unlike most of Filesystem, this function is tolerant of new-style `fs`
        commands being missing, because we are part of the ceph installation
        process during upgrade suites, so must fall back to old style commands
        when we get an EINVAL on a new style command.

        :return:
        """
        # First, check to see that processes haven't exited with an error code
        for mds in self._ctx.daemons.iter_daemons_of_role('mds'):
            mds.check_status()

        active_count = 0
        try:
            mds_map = self.get_mds_map(status=status)
        except CommandFailedError as cfe:
            # Old version, fall back to non-multi-fs commands
            if cfe.exitstatus == errno.EINVAL:
                mds_map = json.loads(
                    self.mon_manager.raw_cluster_cmd('mds', 'dump', '--format=json'))
            else:
                raise

        log.info("are_daemons_healthy: mds map: {0}".format(mds_map))

        for mds_id, mds_status in mds_map['info'].items():
            if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
                log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
                return False
            elif mds_status['state'] == 'up:active':
                active_count += 1

        log.info("are_daemons_healthy: {0}/{1}".format(
            active_count, mds_map['max_mds']
        ))

        if not skip_max_mds_check:
            if active_count > mds_map['max_mds']:
                log.info("are_daemons_healthy: number of actives is greater than max_mds: {0}".format(mds_map))
                return False
            elif active_count == mds_map['max_mds']:
                # The MDSMap says these guys are active, but let's check they really are
                for mds_id, mds_status in mds_map['info'].items():
                    if mds_status['state'] == 'up:active':
                        try:
                            daemon_status = self.mds_asok(["status"], mds_id=mds_status['name'])
                        except CommandFailedError as cfe:
                            if cfe.exitstatus == errno.EINVAL:
                                # Old version, can't do this check
                                continue
                            else:
                                # MDS not even running
                                return False

                        if daemon_status['state'] != 'up:active':
                            # MDS hasn't taken the latest map yet
                            return False

                return True
            else:
                return False
        else:
            log.info("are_daemons_healthy: skipping max_mds check")
            return True

    def get_daemon_names(self, state=None, status=None):
        """
        Return MDS daemon names of those daemons in the given state
        :param state:
        :return:
        """
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['state'] == state or state is None:
                result.append(mds_status['name'])

        return result

    def get_active_names(self, status=None):
        """
        Return MDS daemon names of those daemons holding ranks
        in state up:active

        :return: list of strings like ['a', 'b'], sorted by rank
        """
        return self.get_daemon_names("up:active", status=status)

    def get_all_mds_rank(self, status=None):
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['rank'])

        return result

    def get_rank(self, rank=None, status=None):
        if status is None:
            status = self.getinfo()
        if rank is None:
            rank = 0
        return status.get_rank(self.id, rank)

    def rank_restart(self, rank=0, status=None):
        name = self.get_rank(rank=rank, status=status)['name']
        self.mds_restart(mds_id=name)

    def rank_signal(self, signal, rank=0, status=None):
        name = self.get_rank(rank=rank, status=status)['name']
        self.mds_signal(name, signal)

    def rank_freeze(self, yes, rank=0):
        self.mon_manager.raw_cluster_cmd("mds", "freeze", "{}:{}".format(self.id, rank), str(yes).lower())

    def rank_fail(self, rank=0):
        self.mon_manager.raw_cluster_cmd("mds", "fail", "{}:{}".format(self.id, rank))

    def get_ranks(self, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_ranks(self.id)

    def get_replays(self, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_replays(self.id)

    def get_replay(self, rank=0, status=None):
        for replay in self.get_replays(status=status):
            if replay['rank'] == rank:
                return replay
        return None

    def get_rank_names(self, status=None):
        """
        Return MDS daemon names of those daemons holding a rank,
        sorted by rank. This includes e.g. up:replay/reconnect
        as well as active, but does not include standby or
        standby-replay.
        """
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['name'])

        return result

    def wait_for_daemons(self, timeout=None, skip_max_mds_check=False, status=None):
        """
        Wait until all daemons are healthy
        :return:
        """

        if timeout is None:
            timeout = DAEMON_WAIT_TIMEOUT

        if status is None:
            status = self.status()

        elapsed = 0
        while True:
            if self.are_daemons_healthy(status=status, skip_max_mds_check=skip_max_mds_check):
                return status
            else:
                time.sleep(1)
                elapsed += 1

            if elapsed > timeout:
                log.info("status = {0}".format(status))
                raise RuntimeError("Timed out waiting for MDS daemons to become healthy")

            status = self.status()

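    # Usage sketch (hedged; a typical polling pattern in the cephfs suites):
    # after changing cluster state, block on health and then inspect the
    # resulting map, e.g.
    #
    #   status = fs.wait_for_daemons(timeout=180)
    #   assert len(list(fs.get_ranks(status=status))) == fs.get_var('max_mds')
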
    def get_lone_mds_id(self):
        """
        Get a single MDS ID: the only one if there is only one
        configured, else the only one currently holding a rank,
        else raise an error.
        """
        if len(self.mds_ids) != 1:
            alive = self.get_rank_names()
            if len(alive) == 1:
                return alive[0]
            else:
                raise ValueError("Explicit MDS argument required when multiple MDSs in use")
        else:
            return self.mds_ids[0]

    def recreate(self):
        log.info("Creating new filesystem")
        self.delete_all_filesystems()
        self.id = None
        self.create()

    def put_metadata_object_raw(self, object_id, infile):
        """
        Save an object to the metadata pool
        """
        temp_bin_path = infile
        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'put', object_id, temp_bin_path
        ])

    def get_metadata_object_raw(self, object_id):
        """
        Retrieve an object from the metadata pool and store it in a file.
        """
        temp_bin_path = '/tmp/' + object_id + '.bin'

        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
        ])

        return temp_bin_path

    def get_metadata_object(self, object_type, object_id):
        """
        Retrieve an object from the metadata pool, pass it through
        ceph-dencoder to dump it to JSON, and return the decoded object.
        """
        temp_bin_path = '/tmp/out.bin'

        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
        ])

        dump_json = self.client_remote.sh([
            'sudo', os.path.join(self._prefix, 'ceph-dencoder'), 'type', object_type, 'import', temp_bin_path, 'decode', 'dump_json'
        ]).strip()
        try:
            dump = json.loads(dump_json)
        except (TypeError, ValueError):
            log.error("Failed to decode JSON: '{0}'".format(dump_json))
            raise

        return dump

    def get_journal_version(self):
        """
        Read the JournalPointer and Journal::Header objects to learn the version of
        encoding in use.
        """
        journal_pointer_object = '400.00000000'
        journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
        journal_ino = journal_pointer_dump['journal_pointer']['front']

        journal_header_object = "{0:x}.00000000".format(journal_ino)
        journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)

        version = journal_header_dump['journal_header']['stream_format']
        log.info("Read journal version {0}".format(version))

        return version

    def mds_asok(self, command, mds_id=None, timeout=None):
        if mds_id is None:
            mds_id = self.get_lone_mds_id()

        return self.json_asok(command, 'mds', mds_id, timeout=timeout)

    def rank_asok(self, command, rank=0, status=None, timeout=None):
        info = self.get_rank(rank=rank, status=status)
        return self.json_asok(command, 'mds', info['name'], timeout=timeout)

    def rank_tell(self, command, rank=0, status=None):
        info = self.get_rank(rank=rank, status=status)
        return json.loads(self.mon_manager.raw_cluster_cmd("tell", 'mds.{0}'.format(info['name']), *command))

    def ranks_tell(self, command, status=None):
        if status is None:
            status = self.status()
        out = []
        for r in status.get_ranks(self.id):
            result = self.rank_tell(command, rank=r['rank'], status=status)
            out.append((r['rank'], result))
        return sorted(out)

    def ranks_perf(self, f, status=None):
        perf = self.ranks_tell(["perf", "dump"], status=status)
        out = []
        for rank, perf_dump in perf:
            out.append((rank, f(perf_dump)))
        return out

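    # Usage sketch (hedged; perf counter names vary by release, the one below
    # is only an example): ranks_perf() maps a function over each rank's perf
    # dump, e.g. to collect per-rank client request counts:
    #
    #   fs.ranks_perf(lambda p: p['mds_server']['handle_client_request'])
    #   # -> [(0, 1234), (1, 56)]   (values illustrative)
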
    def read_cache(self, path, depth=None):
        cmd = ["dump", "tree", path]
        if depth is not None:
            cmd.append(depth.__str__())
        result = self.mds_asok(cmd)
        if len(result) == 0:
            raise RuntimeError("Path not found in cache: {0}".format(path))

        return result

    def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
        """
        Block until the MDS reaches a particular state, or a failure condition
        is met.

        When there are multiple MDSs, succeed when exactly one MDS is in the
        goal state, or fail when any MDS is in the reject state.

        :param goal_state: Return once the MDS is in this state
        :param reject: Fail if the MDS enters this state before the goal state
        :param timeout: Fail if this many seconds pass before reaching goal
        :return: number of seconds waited, rounded down to integer
        """

        started_at = time.time()
        while True:
            status = self.status()
            if rank is not None:
                try:
                    mds_info = status.get_rank(self.id, rank)
                    current_state = mds_info['state'] if mds_info else None
                    log.info("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
                except:
                    mdsmap = self.get_mds_map(status=status)
                    if rank in mdsmap['failed']:
                        log.info("Waiting for rank {0} to come back.".format(rank))
                        current_state = None
                    else:
                        raise
            elif mds_id is not None:
                # mds_info is None if no daemon with this ID exists in the map
                mds_info = status.get_mds(mds_id)
                current_state = mds_info['state'] if mds_info else None
                log.info("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
            else:
                # In general, look for a single MDS
                states = [m['state'] for m in status.get_ranks(self.id)]
                if [s for s in states if s == goal_state] == [goal_state]:
                    current_state = goal_state
                elif reject in states:
                    current_state = reject
                else:
                    current_state = None
                log.info("mapped states {0} to {1}".format(states, current_state))

            elapsed = time.time() - started_at
            if current_state == goal_state:
                log.info("reached state '{0}' in {1}s".format(current_state, elapsed))
                return elapsed
            elif reject is not None and current_state == reject:
                raise RuntimeError("MDS in reject state {0}".format(current_state))
            elif timeout is not None and elapsed > timeout:
                log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
                raise RuntimeError(
                    "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
                        elapsed, goal_state, current_state
                    ))
            else:
                time.sleep(1)

    def _read_data_xattr(self, ino_no, xattr_name, type, pool):
        mds_id = self.mds_ids[0]
        remote = self.mds_daemons[mds_id].remote
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)

        args = [
            os.path.join(self._prefix, "rados"), "-p", pool, "getxattr", obj_name, xattr_name
        ]
        try:
            proc = remote.run(args=args, stdout=BytesIO())
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(obj_name)

        data = proc.stdout.getvalue()
        dump = remote.sh(
            [os.path.join(self._prefix, "ceph-dencoder"),
             "type", type,
             "import", "-",
             "decode", "dump_json"],
            stdin=data
        )

        return json.loads(dump.strip())

    def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
        """
        Write to an xattr of the 0th data object of an inode. Will
        succeed whether the object and/or xattr already exist or not.

        :param ino_no: integer inode number
        :param xattr_name: string name of the xattr
        :param data: byte array data to write to the xattr
        :param pool: name of data pool or None to use primary data pool
        :return: None
        """
        remote = self.mds_daemons[self.mds_ids[0]].remote
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)
        args = [
            os.path.join(self._prefix, "rados"), "-p", pool, "setxattr",
            obj_name, xattr_name, data
        ]
        remote.sh(args)

    def read_backtrace(self, ino_no, pool=None):
        """
        Read the backtrace from the data pool, return a dict in the format
        given by inode_backtrace_t::dump, which is something like:

        ::

            rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
            ceph-dencoder type inode_backtrace_t import out.bin decode dump_json

            { "ino": 1099511627778,
              "ancestors": [
                    { "dirino": 1,
                      "dname": "blah",
                      "version": 11}],
              "pool": 1,
              "old_pools": []}

        :param pool: name of pool to read backtrace from. If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)

    def read_layout(self, ino_no, pool=None):
        """
        Read 'layout' xattr of an inode and parse the result, returning a dict like:
        ::
            {
                "stripe_unit": 4194304,
                "stripe_count": 1,
                "object_size": 4194304,
                "pool_id": 1,
                "pool_ns": "",
            }

        :param pool: name of pool to read the layout from. If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)

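    # Usage sketch (hedged; the helper names on the mount object are assumptions
    # about the test harness, and the file name is illustrative): these methods
    # read the "parent" / "layout" xattrs of an inode's first data object, so the
    # file must already have been flushed out to RADOS.
    #
    #   ino = mount.path_to_ino("testfile")              # assumes a CephFS mount helper
    #   fs.read_backtrace(ino)['ancestors'][0]['dname']  # -> 'testfile'
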
    def _enumerate_data_objects(self, ino, size):
        """
        Get the list of expected data objects for a range, and the list of objects
        that really exist.

        :return a tuple of two lists of strings (expected, actual)
        """
        stripe_size = 1024 * 1024 * 4

        size = max(stripe_size, size)

        want_objects = [
            "{0:x}.{1:08x}".format(ino, n)
            for n in range(0, ((size - 1) // stripe_size) + 1)
        ]

        exist_objects = self.rados(["ls"], pool=self.get_data_pool_name()).split("\n")

        return want_objects, exist_objects

    def data_objects_present(self, ino, size):
        """
        Check that *all* the expected data objects for an inode are present in the data pool
        """

        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        missing = set(want_objects) - set(exist_objects)

        if missing:
            log.info("Objects missing (ino {0}, size {1}): {2}".format(
                ino, size, missing
            ))
            return False
        else:
            log.info("All objects for ino {0} size {1} found".format(ino, size))
            return True

    def data_objects_absent(self, ino, size):
        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        present = set(want_objects) & set(exist_objects)

        if present:
            log.info("Objects not absent (ino {0}, size {1}): {2}".format(
                ino, size, present
            ))
            return False
        else:
            log.info("All objects for ino {0} size {1} are absent".format(ino, size))
            return True

    def dirfrag_exists(self, ino, frag):
        try:
            self.rados(["stat", "{0:x}.{1:08x}".format(ino, frag)])
        except CommandFailedError:
            return False
        else:
            return True

    def rados(self, args, pool=None, namespace=None, stdin_data=None,
              stdin_file=None,
              stdout_data=None):
        """
        Call into the `rados` CLI from an MDS
        """

        if pool is None:
            pool = self.get_metadata_pool_name()

        # Doesn't matter which MDS we use to run rados commands, they all
        # have access to the pools
        mds_id = self.mds_ids[0]
        remote = self.mds_daemons[mds_id].remote

        # NB we could alternatively use librados pybindings for this, but it's a one-liner
        # using the `rados` CLI
        args = ([os.path.join(self._prefix, "rados"), "-p", pool] +
                (["--namespace", namespace] if namespace else []) +
                args)

        if stdin_file is not None:
            args = ["bash", "-c", "cat " + stdin_file + " | " + " ".join(args)]
        if stdout_data is None:
            stdout_data = StringIO()

        p = remote.run(args=args,
                       stdin=stdin_data,
                       stdout=stdout_data)
        return p.stdout.getvalue().strip()

    def list_dirfrag(self, dir_ino):
        """
        Read the named object and return the list of omap keys

        :return a list of 0 or more strings
        """

        dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)

        try:
            key_list_str = self.rados(["listomapkeys", dirfrag_obj_name])
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(dirfrag_obj_name)

        return key_list_str.split("\n") if key_list_str else []

    def erase_metadata_objects(self, prefix):
        """
        For all objects in the metadata pool matching the prefix,
        erase them.

        This is O(N) in the number of objects in the pool, so only suitable
        for use on toy test filesystems.
        """
        all_objects = self.rados(["ls"]).split("\n")
        matching_objects = [o for o in all_objects if o.startswith(prefix)]
        for o in matching_objects:
            self.rados(["rm", o])

    def erase_mds_objects(self, rank):
        """
        Erase all the per-MDS objects for a particular rank. This includes
        inotable, sessiontable, journal
        """

        def obj_prefix(multiplier):
            """
            MDS object naming conventions like rank 1's
            journal is at 201.***
            """
            return "%x." % (multiplier * 0x100 + rank)

        # MDS_INO_LOG_OFFSET
        self.erase_metadata_objects(obj_prefix(2))
        # MDS_INO_LOG_BACKUP_OFFSET
        self.erase_metadata_objects(obj_prefix(3))
        # MDS_INO_LOG_POINTER_OFFSET
        self.erase_metadata_objects(obj_prefix(4))
        # MDSTables & SessionMap
        self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))

    @property
    def _prefix(self):
        """
        Override this to set a different path prefix for the Ceph binaries.
        """
        return ""

    def _make_rank(self, rank):
        return "{}:{}".format(self.name, rank)

    def _run_tool(self, tool, args, rank=None, quiet=False):
        # Tests frequently have [client] configuration that jacks up
        # the objecter log level (unlikely to be interesting here)
        # and does not set the mds log level (very interesting here)
        if quiet:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
        else:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']

        if rank is not None:
            base_args.extend(["--rank", "%s" % str(rank)])

        t1 = datetime.datetime.now()
        r = self.tool_remote.sh(script=base_args + args, stdout=StringIO()).strip()
        duration = datetime.datetime.now() - t1
        log.info("Ran {0} in time {1}, result:\n{2}".format(
            base_args + args, duration, r
        ))
        return r

    @property
    def tool_remote(self):
        """
        An arbitrary remote to use when invoking recovery tools. Use an MDS host because
        it'll definitely have keys with perms to access cephfs metadata pool. This is public
        so that tests can use this remote to go get locally written output files from the tools.
        """
        mds_id = self.mds_ids[0]
        return self.mds_daemons[mds_id].remote

    def journal_tool(self, args, rank, quiet=False):
        """
        Invoke cephfs-journal-tool with the passed arguments for a rank, and return its stdout
        """
        fs_rank = self._make_rank(rank)
        return self._run_tool("cephfs-journal-tool", args, fs_rank, quiet)
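
    # Usage sketch (hedged; the arguments mirror the cephfs-journal-tool CLI,
    # and the subcommands below are only examples):
    #
    #   fs.journal_tool(["journal", "inspect"], rank=0)
    #   fs.journal_tool(["event", "recover_dentries", "summary"], rank=0)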
7c673cae
FG
1384
1385 def table_tool(self, args, quiet=False):
1386 """
1387 Invoke cephfs-table-tool with the passed arguments, and return its stdout
1388 """
1389 return self._run_tool("cephfs-table-tool", args, None, quiet)
1390
1391 def data_scan(self, args, quiet=False, worker_count=1):
1392 """
1393 Invoke cephfs-data-scan with the passed arguments, and return its stdout
1394
1395 :param worker_count: if greater than 1, multiple workers will be run
1396 in parallel and the return value will be None
1397 """
1398
1399 workers = []
1400
1401 for n in range(0, worker_count):
1402 if worker_count > 1:
1403 # data-scan args first token is a command, followed by args to it.
1404 # insert worker arguments after the command.
1405 cmd = args[0]
1406 worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
1407 else:
1408 worker_args = args
1409
1410 workers.append(Greenlet.spawn(lambda wargs=worker_args:
1411 self._run_tool("cephfs-data-scan", wargs, None, quiet)))
1412
1413 for w in workers:
1414 w.get()
1415
1416 if worker_count == 1:
1417 return workers[0].value
1418 else:
1419 return None
b32b8144
FG
1420
1421 def is_full(self):
1422 return self.is_pool_full(self.get_data_pool_name())