from StringIO import StringIO
import json
import logging
from gevent import Greenlet
import os
import time
import datetime
import re
import errno
import random
import traceback

from teuthology.exceptions import CommandFailedError
from teuthology import misc
from teuthology.nuke import clear_firewall
from teuthology.parallel import parallel
from tasks.ceph_manager import write_conf
from tasks import ceph_manager


log = logging.getLogger(__name__)


DAEMON_WAIT_TIMEOUT = 120
ROOT_INO = 1

class FileLayout(object):
    def __init__(self, pool=None, pool_namespace=None, stripe_unit=None, stripe_count=None, object_size=None):
        self.pool = pool
        self.pool_namespace = pool_namespace
        self.stripe_unit = stripe_unit
        self.stripe_count = stripe_count
        self.object_size = object_size

    @classmethod
    def load_from_ceph(cls, layout_str):
        # TODO
        pass

    def items(self):
        if self.pool is not None:
            yield ("pool", self.pool)
        if self.pool_namespace:
            yield ("pool_namespace", self.pool_namespace)
        if self.stripe_unit is not None:
            yield ("stripe_unit", self.stripe_unit)
        if self.stripe_count is not None:
            yield ("stripe_count", self.stripe_count)
        if self.object_size is not None:
            yield ("object_size", self.object_size)

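# A rough usage sketch (not part of the upstream file): FileLayout is consumed
# by Filesystem.set_dir_layout() below, which turns each (key, value) pair from
# items() into a "ceph.dir.layout.<key>" setfattr call on a mounted client.
# Assuming `fs` is a Filesystem and `mount` is a mounted client object that
# provides run_shell():
#
#     layout = FileLayout(pool="cephfs_data", stripe_count=2)
#     fs.set_dir_layout(mount, "mydir", layout)
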
class ObjectNotFound(Exception):
    def __init__(self, object_name):
        self._object_name = object_name

    def __str__(self):
        return "Object not found: '{0}'".format(self._object_name)

class FSStatus(object):
    """
    Operations on a snapshot of the FSMap.
    """
    def __init__(self, mon_manager):
        self.mon = mon_manager
        self.map = json.loads(self.mon.raw_cluster_cmd("fs", "dump", "--format=json"))

    def __str__(self):
        return json.dumps(self.map, indent = 2, sort_keys = True)

    # Expose the fsmap for manual inspection.
    def __getitem__(self, key):
        """
        Get a field from the fsmap.
        """
        return self.map[key]

    def get_filesystems(self):
        """
        Iterator for all filesystems.
        """
        for fs in self.map['filesystems']:
            yield fs

    def get_all(self):
        """
        Iterator for all the mds_info components in the FSMap.
        """
        for info in self.get_standbys():
            yield info
        for fs in self.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                yield info

    def get_standbys(self):
        """
        Iterator for all standbys.
        """
        for info in self.map['standbys']:
            yield info

    def get_fsmap(self, fscid):
        """
        Get the fsmap for the given FSCID.
        """
        for fs in self.map['filesystems']:
            if fscid is None or fs['id'] == fscid:
                return fs
        raise RuntimeError("FSCID {0} not in map".format(fscid))

    def get_fsmap_byname(self, name):
        """
        Get the fsmap for the given file system name.
        """
        for fs in self.map['filesystems']:
            if name is None or fs['mdsmap']['fs_name'] == name:
                return fs
        raise RuntimeError("FS {0} not in map".format(name))

    def get_replays(self, fscid):
        """
        Get the standby:replay MDS for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['state'] == 'up:standby-replay':
                yield info

    def get_ranks(self, fscid):
        """
        Get the ranks for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['rank'] >= 0 and info['state'] != 'up:standby-replay':
                yield info

    def get_rank(self, fscid, rank):
        """
        Get the rank for the given FSCID.
        """
        for info in self.get_ranks(fscid):
            if info['rank'] == rank:
                return info
        raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))

    def get_mds(self, name):
        """
        Get the info for the given MDS name.
        """
        for info in self.get_all():
            if info['name'] == name:
                return info
        return None

    def get_mds_addr(self, name):
        """
        Return the instance addr as a string, like "10.214.133.138:6807\/10825"
        """
        info = self.get_mds(name)
        if info:
            return info['addr']
        else:
            log.warn(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
            raise RuntimeError("MDS id '{0}' not found in map".format(name))

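# Illustrative sketch only: FSStatus wraps a single `fs dump` snapshot, so
# repeated queries against the same instance see the same map. Assuming
# `mon_manager` is a CephManager instance:
#
#     status = FSStatus(mon_manager)
#     for fs in status.get_filesystems():
#         log.info("fs %s has ranks %s", fs['mdsmap']['fs_name'],
#                  [i['rank'] for i in status.get_ranks(fs['id'])])
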
class CephCluster(object):
    @property
    def admin_remote(self):
        first_mon = misc.get_first_mon(self._ctx, None)
        (result,) = self._ctx.cluster.only(first_mon).remotes.iterkeys()
        return result

    def __init__(self, ctx):
        self._ctx = ctx
        self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))

    def get_config(self, key, service_type=None):
        """
        Get config from mon by default, or a specific service if caller asks for it
        """
        if service_type is None:
            service_type = 'mon'

        service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def set_ceph_conf(self, subsys, key, value):
        if subsys not in self._ctx.ceph['ceph'].conf:
            self._ctx.ceph['ceph'].conf[subsys] = {}
        self._ctx.ceph['ceph'].conf[subsys][key] = value
        write_conf(self._ctx)  # XXX because we don't have the ceph task's config object, if they
                               # used a different config path this won't work.

    def clear_ceph_conf(self, subsys, key):
        del self._ctx.ceph['ceph'].conf[subsys][key]
        write_conf(self._ctx)

    def json_asok(self, command, service_type, service_id, timeout=None):
        if timeout is None:
            timeout = 15*60
        proc = self.mon_manager.admin_socket(service_type, service_id, command, timeout=timeout)
        response_data = proc.stdout.getvalue()
        log.info("_json_asok output: {0}".format(response_data))
        if response_data.strip():
            return json.loads(response_data)
        else:
            return None


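# Hedged example of json_asok(): it drives a daemon's admin socket and parses
# the JSON reply (or returns None for commands that print nothing). For an MDS
# named "a" this might look like:
#
#     cluster = CephCluster(ctx)              # assumes a teuthology ctx
#     st = cluster.json_asok(["status"], "mds", "a")
#     log.info("mds.a state: %s", st["state"] if st else "no output")
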
class MDSCluster(CephCluster):
    """
    Collective operations on all the MDS daemons in the Ceph cluster. These
    daemons may be in use by various Filesystems.

    For the benefit of pre-multi-filesystem tests, this class is also
    a parent of Filesystem. The correct way to use MDSCluster going forward is
    as a separate instance outside of your (multiple) Filesystem instances.
    """
    def __init__(self, ctx):
        super(MDSCluster, self).__init__(ctx)

        self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))

        if len(self.mds_ids) == 0:
            raise RuntimeError("This task requires at least one MDS")

        if hasattr(self._ctx, "daemons"):
            # Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
            self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])

    def _one_or_all(self, mds_id, cb, in_parallel=True):
        """
        Call a callback for a single named MDS, or for all.

        Note that the parallelism here isn't for performance, it's to avoid being overly kind
        to the cluster by waiting a graceful ssh-latency of time between doing things, and to
        avoid being overly kind by executing them in a particular order. However, some actions
        don't cope with being done in parallel, so it's optional (`in_parallel`)

        :param mds_id: MDS daemon name, or None
        :param cb: Callback taking single argument of MDS daemon name
        :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
        """
        if mds_id is None:
            if in_parallel:
                with parallel() as p:
                    for mds_id in self.mds_ids:
                        p.spawn(cb, mds_id)
            else:
                for mds_id in self.mds_ids:
                    cb(mds_id)
        else:
            cb(mds_id)

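    # Sketch of the fan-out convention used throughout this class: passing
    # mds_id=None applies an operation to every MDS, while a concrete name
    # applies it to just that daemon. For example (assuming daemons "a" and "b"):
    #
    #     mdsc = MDSCluster(ctx)
    #     mdsc.mds_stop()        # stops mds.a and mds.b in parallel
    #     mdsc.mds_stop("a")     # stops only mds.a
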
    def get_config(self, key, service_type=None):
        """
        get_config specialization of service_type="mds"
        """
        if service_type != "mds":
            return super(MDSCluster, self).get_config(key, service_type)

        # Some tests stop MDS daemons, don't send commands to a dead one:
        service_id = random.sample(filter(lambda i: self.mds_daemons[i].running(), self.mds_daemons), 1)[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def mds_stop(self, mds_id=None):
        """
        Stop the MDS daemon process(es). If it held a rank, that rank
        will eventually go laggy.
        """
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())

    def mds_fail(self, mds_id=None):
        """
        Inform MDSMonitor of the death of the daemon process(es). If it held
        a rank, that rank will be relinquished.
        """
        self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))

    def mds_restart(self, mds_id=None):
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())

    def mds_fail_restart(self, mds_id=None):
        """
        Variation on restart that includes marking MDSs as failed, so that doing this
        operation followed by waiting for healthy daemon states guarantees that they
        have gone down and come up, rather than potentially seeing the healthy states
        that existed before the restart.
        """
        def _fail_restart(id_):
            self.mds_daemons[id_].stop()
            self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
            self.mds_daemons[id_].restart()

        self._one_or_all(mds_id, _fail_restart)

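    # Hedged usage note: mds_fail_restart() is typically paired with a wait on
    # the Filesystem so a test only proceeds once replacement daemons are back
    # up. Assuming `fs` is a Filesystem built on these MDS daemons:
    #
    #     fs.mds_fail_restart()
    #     fs.wait_for_daemons()
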
    def mds_signal(self, mds_id, sig, silent=False):
        """
        Signal an MDS daemon
        """
        self.mds_daemons[mds_id].signal(sig, silent)

    def newfs(self, name='cephfs', create=True):
        return Filesystem(self._ctx, name=name, create=create)

    def status(self):
        return FSStatus(self.mon_manager)

    def delete_all_filesystems(self):
        """
        Remove all filesystems that exist, and any pools in use by them.
        """
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        pool_id_name = {}
        for pool in pools:
            pool_id_name[pool['pool']] = pool['pool_name']

        # mark cluster down for each fs to prevent churn during deletion
        status = self.status()
        for fs in status.get_filesystems():
            self.mon_manager.raw_cluster_cmd("fs", "fail", str(fs['mdsmap']['fs_name']))

        # get a new copy as actives may have since changed
        status = self.status()
        for fs in status.get_filesystems():
            mdsmap = fs['mdsmap']
            metadata_pool = pool_id_name[mdsmap['metadata_pool']]

            self.mon_manager.raw_cluster_cmd('fs', 'rm', mdsmap['fs_name'], '--yes-i-really-mean-it')
            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
                                             metadata_pool, metadata_pool,
                                             '--yes-i-really-really-mean-it')
            for data_pool in mdsmap['data_pools']:
                data_pool = pool_id_name[data_pool]
                try:
                    self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
                                                     data_pool, data_pool,
                                                     '--yes-i-really-really-mean-it')
                except CommandFailedError as e:
                    if e.exitstatus == 16:  # EBUSY: this data pool is shared by
                        pass                # another filesystem; let that one delete it
                    else:
                        raise

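    # Rough rebuild sketch (assumes `mdsc` is an MDSCluster for this run):
    #
    #     mdsc.delete_all_filesystems()   # removes each fs and its pools
    #     fs = mdsc.newfs(name="cephfs")  # creates fresh pools and a new fs
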
    def get_standby_daemons(self):
        return set([s['name'] for s in self.status().get_standbys()])

    def get_mds_hostnames(self):
        result = set()
        for mds_id in self.mds_ids:
            mds_remote = self.mon_manager.find_remote('mds', mds_id)
            result.add(mds_remote.hostname)

        return list(result)

    def set_clients_block(self, blocked, mds_id=None):
        """
        Block (using iptables) client communications to this MDS. Be careful: if
        other services are running on this MDS, or other MDSs try to talk to this
        MDS, their communications may also be blocked as collateral damage.

        :param mds_id: Optional ID of MDS to block, defaults to all MDSs
        :return:
        """
        da_flag = "-A" if blocked else "-D"

        def set_block(_mds_id):
            remote = self.mon_manager.find_remote('mds', _mds_id)
            status = self.status()

            addr = status.get_mds_addr(_mds_id)
            ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()

            remote.run(
                args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])
            remote.run(
                args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])

        self._one_or_all(mds_id, set_block, in_parallel=False)

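    # Illustrative only: set_clients_block() adds iptables REJECT rules tagged
    # "teuthology", and clear_firewall() below removes them again, e.g.
    #
    #     mdsc.set_clients_block(True, mds_id="a")   # cut clients off from mds.a
    #     ...                                        # exercise the failure path
    #     mdsc.set_clients_block(False, mds_id="a")  # or mdsc.clear_firewall()
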
    def clear_firewall(self):
        clear_firewall(self._ctx)

    def get_mds_info(self, mds_id):
        return FSStatus(self.mon_manager).get_mds(mds_id)

    def is_pool_full(self, pool_name):
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        for pool in pools:
            if pool['pool_name'] == pool_name:
                return 'full' in pool['flags_names'].split(",")

        raise RuntimeError("Pool not found '{0}'".format(pool_name))

class Filesystem(MDSCluster):
    """
    This object is for driving a CephFS filesystem. The MDS daemons driven by
    MDSCluster may be shared with other Filesystems.
    """
    def __init__(self, ctx, fscid=None, name=None, create=False,
                 ec_profile=None):
        super(Filesystem, self).__init__(ctx)

        self.name = name
        self.ec_profile = ec_profile
        self.id = None
        self.metadata_pool_name = None
        self.metadata_overlay = False
        self.data_pool_name = None
        self.data_pools = None

        client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
        self.client_id = client_list[0]
        self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]

        if name is not None:
            if fscid is not None:
                raise RuntimeError("cannot specify fscid when creating fs")
            if create and not self.legacy_configured():
                self.create()
        else:
            if fscid is not None:
                self.id = fscid
            self.getinfo(refresh = True)

        # Stash a reference to the first created filesystem on ctx, so
        # that if someone drops to the interactive shell they can easily
        # poke our methods.
        if not hasattr(self._ctx, "filesystem"):
            self._ctx.filesystem = self

    def getinfo(self, refresh = False):
        status = self.status()
        if self.id is not None:
            fsmap = status.get_fsmap(self.id)
        elif self.name is not None:
            fsmap = status.get_fsmap_byname(self.name)
        else:
            fss = [fs for fs in status.get_filesystems()]
            if len(fss) == 1:
                fsmap = fss[0]
            elif len(fss) == 0:
                raise RuntimeError("no file system available")
            else:
                raise RuntimeError("more than one file system available")
        self.id = fsmap['id']
        self.name = fsmap['mdsmap']['fs_name']
        self.get_pool_names(status = status, refresh = refresh)
        return status

    def set_metadata_overlay(self, overlay):
        if self.id is not None:
            raise RuntimeError("cannot specify fscid when configuring overlay")
        self.metadata_overlay = overlay

    def deactivate(self, rank):
        if rank < 0:
            raise RuntimeError("invalid rank")
        elif rank == 0:
            raise RuntimeError("cannot deactivate rank 0")
        self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))

    def reach_max_mds(self):
        # Try to reach rank count == max_mds, up or down (UPGRADE SENSITIVE!)
        status = self.getinfo()
        mds_map = self.get_mds_map(status=status)
        max_mds = mds_map['max_mds']

        count = len(list(self.get_ranks(status=status)))
        if count > max_mds:
            try:
                # deactivate mds in descending order
                status = self.wait_for_daemons(status=status, skip_max_mds_check=True)
                while count > max_mds:
                    targets = sorted(self.get_ranks(status=status), key=lambda r: r['rank'], reverse=True)
                    target = targets[0]
                    log.info("deactivating rank %d" % target['rank'])
                    self.deactivate(target['rank'])
                    status = self.wait_for_daemons(skip_max_mds_check=True)
                    count = len(list(self.get_ranks(status=status)))
            except:
                # In Mimic, deactivation is done automatically:
                log.info("Error:\n{}".format(traceback.format_exc()))
                status = self.wait_for_daemons()
        else:
            status = self.wait_for_daemons()

        mds_map = self.get_mds_map(status=status)
        assert(mds_map['max_mds'] == max_mds)
        assert(mds_map['in'] == list(range(0, max_mds)))

    def fail(self):
        self.mon_manager.raw_cluster_cmd("fs", "fail", str(self.name))

    def set_var(self, var, *args):
        a = map(str, args)
        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, var, *a)

    def set_down(self, down=True):
        self.set_var("down", str(down).lower())

    def set_joinable(self, joinable=True):
        self.set_var("joinable", str(joinable).lower())

    def set_max_mds(self, max_mds):
        self.set_var("max_mds", "%d" % max_mds)

    def set_allow_standby_replay(self, yes):
        self.set_var("allow_standby_replay", str(yes).lower())

    def set_allow_new_snaps(self, yes):
        self.set_var("allow_new_snaps", str(yes).lower(), '--yes-i-really-mean-it')

    def get_pgs_per_fs_pool(self):
        """
        Calculate how many PGs to use when creating a pool, in order to avoid raising any
        health warnings about mon_pg_warn_min_per_osd

        :return: an integer number of PGs
        """
        pg_warn_min_per_osd = int(self.get_config('mon_pg_warn_min_per_osd'))
        osd_count = len(list(misc.all_roles_of_type(self._ctx.cluster, 'osd')))
        return pg_warn_min_per_osd * osd_count

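    # Worked example of the PG calculation above (numbers are hypothetical):
    # with mon_pg_warn_min_per_osd = 4 and 3 OSD roles in the run, each pool is
    # created with 4 * 3 = 12 PGs, which keeps the per-OSD PG count at or above
    # the warning threshold.
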
    def create(self):
        if self.name is None:
            self.name = "cephfs"
        if self.metadata_pool_name is None:
            self.metadata_pool_name = "{0}_metadata".format(self.name)
        if self.data_pool_name is None:
            data_pool_name = "{0}_data".format(self.name)
        else:
            data_pool_name = self.data_pool_name

        log.info("Creating filesystem '{0}'".format(self.name))

        pgs_per_fs_pool = self.get_pgs_per_fs_pool()

        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                         self.metadata_pool_name, pgs_per_fs_pool.__str__())
        if self.metadata_overlay:
            self.mon_manager.raw_cluster_cmd('fs', 'new',
                                             self.name, self.metadata_pool_name, data_pool_name,
                                             '--allow-dangerous-metadata-overlay')
        else:
            if self.ec_profile and 'disabled' not in self.ec_profile:
                log.info("EC profile is %s", self.ec_profile)
                cmd = ['osd', 'erasure-code-profile', 'set', data_pool_name]
                cmd.extend(self.ec_profile)
                self.mon_manager.raw_cluster_cmd(*cmd)
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'create',
                    data_pool_name, pgs_per_fs_pool.__str__(), 'erasure',
                    data_pool_name)
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'set',
                    data_pool_name, 'allow_ec_overwrites', 'true')
            else:
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'create',
                    data_pool_name, pgs_per_fs_pool.__str__())
            self.mon_manager.raw_cluster_cmd('fs', 'new',
                                             self.name,
                                             self.metadata_pool_name,
                                             data_pool_name,
                                             "--force")
        self.check_pool_application(self.metadata_pool_name)
        self.check_pool_application(data_pool_name)
        # Turn off spurious standby count warnings from modifying max_mds in tests.
        try:
            self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
        except CommandFailedError as e:
            if e.exitstatus == 22:
                # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
                pass
            else:
                raise

        self.getinfo(refresh = True)

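    # Hedged sketch of driving create() with an erasure-coded data pool: the
    # ec_profile list is passed straight to `osd erasure-code-profile set`, so
    # the values below are placeholders, not recommendations:
    #
    #     fs = Filesystem(ctx, name="cephfs", create=False,
    #                     ec_profile=["m=2", "k=2", "crush-failure-domain=osd"])
    #     fs.create()
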
    def check_pool_application(self, pool_name):
        osd_map = self.mon_manager.get_osd_dump_json()
        for pool in osd_map['pools']:
            if pool['pool_name'] == pool_name:
                if "application_metadata" in pool:
                    if "cephfs" not in pool['application_metadata']:
                        raise RuntimeError("Pool {0} does not name cephfs as application!".format(pool_name))

    def __del__(self):
        if getattr(self._ctx, "filesystem", None) == self:
            delattr(self._ctx, "filesystem")

    def exists(self):
        """
        Whether a filesystem exists in the mon's filesystem list
        """
        fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
        return self.name in [fs['name'] for fs in fs_list]

    def legacy_configured(self):
        """
        Check if a legacy (i.e. pre "fs new") filesystem configuration is present. If this is
        the case, the caller should avoid using Filesystem.create
        """
        try:
            out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
            pools = json.loads(out_text)
            metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
            if metadata_pool_exists:
                self.metadata_pool_name = 'metadata'
        except CommandFailedError as e:
            # For use in upgrade tests, Ceph cuttlefish and earlier don't support
            # structured output (--format) from the CLI.
            if e.exitstatus == 22:
                metadata_pool_exists = True
            else:
                raise

        return metadata_pool_exists

    def _df(self):
        return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))

    def get_mds_map(self, status=None):
        if status is None:
            status = self.status()
        return status.get_fsmap(self.id)['mdsmap']

    def get_var(self, var, status=None):
        return self.get_mds_map(status=status)[var]

    def set_dir_layout(self, mount, path, layout):
        for name, value in layout.items():
            mount.run_shell(args=["setfattr", "-n", "ceph.dir.layout."+name, "-v", str(value), path])

    def add_data_pool(self, name, create=True):
        if create:
            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name, self.get_pgs_per_fs_pool().__str__())
        self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
        self.get_pool_names(refresh = True)
        for poolid, fs_name in self.data_pools.items():
            if name == fs_name:
                return poolid
        raise RuntimeError("could not get just created pool '{0}'".format(name))

    def get_pool_names(self, refresh = False, status = None):
        if refresh or self.metadata_pool_name is None or self.data_pools is None:
            if status is None:
                status = self.status()
            fsmap = status.get_fsmap(self.id)

            osd_map = self.mon_manager.get_osd_dump_json()
            id_to_name = {}
            for p in osd_map['pools']:
                id_to_name[p['pool']] = p['pool_name']

            self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
            self.data_pools = {}
            for data_pool in fsmap['mdsmap']['data_pools']:
                self.data_pools[data_pool] = id_to_name[data_pool]

    def get_data_pool_name(self, refresh = False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return self.data_pools.values()[0]

    def get_data_pool_id(self, refresh = False):
        """
        Don't call this if you have multiple data pools
        :return: integer
        """
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return self.data_pools.keys()[0]

    def get_data_pool_names(self, refresh = False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        return self.data_pools.values()

    def get_metadata_pool_name(self):
        return self.metadata_pool_name

    def set_data_pool_name(self, name):
        if self.id is not None:
            raise RuntimeError("can't set data pool name if the filesystem's fscid is already set")
        self.data_pool_name = name

    def get_namespace_id(self):
        return self.id

    def get_pool_df(self, pool_name):
        """
        Return a dict like:
        {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
        """
        for pool_df in self._df()['pools']:
            if pool_df['name'] == pool_name:
                return pool_df['stats']

        raise RuntimeError("Pool name '{0}' not found".format(pool_name))

    def get_usage(self):
        return self._df()['stats']['total_used_bytes']

    def are_daemons_healthy(self, status=None, skip_max_mds_check=False):
        """
        Return true if all daemons are in one of active, standby, standby-replay, and
        at least max_mds daemons are in 'active'.

        Unlike most of Filesystem, this function is tolerant of new-style `fs`
        commands being missing, because we are part of the ceph installation
        process during upgrade suites, so must fall back to old style commands
        when we get an EINVAL on a new style command.

        :return:
        """
        # First, check to see that processes haven't exited with an error code
        for mds in self._ctx.daemons.iter_daemons_of_role('mds'):
            mds.check_status()

        active_count = 0
        try:
            mds_map = self.get_mds_map(status=status)
        except CommandFailedError as cfe:
            # Old version, fall back to non-multi-fs commands
            if cfe.exitstatus == errno.EINVAL:
                mds_map = json.loads(
                        self.mon_manager.raw_cluster_cmd('mds', 'dump', '--format=json'))
            else:
                raise

        log.info("are_daemons_healthy: mds map: {0}".format(mds_map))

        for mds_id, mds_status in mds_map['info'].items():
            if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
                log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
                return False
            elif mds_status['state'] == 'up:active':
                active_count += 1

        log.info("are_daemons_healthy: {0}/{1}".format(
            active_count, mds_map['max_mds']
        ))

        if not skip_max_mds_check:
            if active_count > mds_map['max_mds']:
                log.info("are_daemons_healthy: number of actives is greater than max_mds: {0}".format(mds_map))
                return False
            elif active_count == mds_map['max_mds']:
                # The MDSMap says these guys are active, but let's check they really are
                for mds_id, mds_status in mds_map['info'].items():
                    if mds_status['state'] == 'up:active':
                        try:
                            daemon_status = self.mds_asok(["status"], mds_id=mds_status['name'])
                        except CommandFailedError as cfe:
                            if cfe.exitstatus == errno.EINVAL:
                                # Old version, can't do this check
                                continue
                            else:
                                # MDS not even running
                                return False

                        if daemon_status['state'] != 'up:active':
                            # MDS hasn't taken the latest map yet
                            return False

                return True
            else:
                return False
        else:
            log.info("are_daemons_healthy: skipping max_mds check")
            return True

    def get_daemon_names(self, state=None, status=None):
        """
        Return MDS daemon names of those daemons in the given state
        :param state:
        :return:
        """
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
            if mds_status['state'] == state or state is None:
                result.append(mds_status['name'])

        return result

    def get_active_names(self):
        """
        Return MDS daemon names of those daemons holding ranks
        in state up:active

        :return: list of strings like ['a', 'b'], sorted by rank
        """
        return self.get_daemon_names("up:active")

    def get_all_mds_rank(self, status=None):
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['rank'])

        return result

    def get_rank(self, rank=0, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_rank(self.id, rank)

    def rank_restart(self, rank=0, status=None):
        name = self.get_rank(rank=rank, status=status)['name']
        self.mds_restart(mds_id=name)

    def rank_signal(self, signal, rank=0, status=None):
        name = self.get_rank(rank=rank, status=status)['name']
        self.mds_signal(name, signal)

    def rank_freeze(self, yes, rank=0):
        self.mon_manager.raw_cluster_cmd("mds", "freeze", "{}:{}".format(self.id, rank), str(yes).lower())

    def rank_fail(self, rank=0):
        self.mon_manager.raw_cluster_cmd("mds", "fail", "{}:{}".format(self.id, rank))

    def get_ranks(self, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_ranks(self.id)

    def get_replays(self, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_replays(self.id)

    def get_replay(self, rank=0, status=None):
        for replay in self.get_replays(status=status):
            if replay['rank'] == rank:
                return replay
        return None

    def get_rank_names(self, status=None):
        """
        Return MDS daemon names of those daemons holding a rank,
        sorted by rank. This includes e.g. up:replay/reconnect
        as well as active, but does not include standby or
        standby-replay.
        """
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['name'])

        return result

    def wait_for_daemons(self, timeout=None, skip_max_mds_check=False, status=None):
        """
        Wait until all daemons are healthy
        :return:
        """

        if timeout is None:
            timeout = DAEMON_WAIT_TIMEOUT

        if status is None:
            status = self.status()

        elapsed = 0
        while True:
            if self.are_daemons_healthy(status=status, skip_max_mds_check=skip_max_mds_check):
                return status
            else:
                time.sleep(1)
                elapsed += 1

            if elapsed > timeout:
                log.info("status = {0}".format(status))
                raise RuntimeError("Timed out waiting for MDS daemons to become healthy")

            status = self.status()

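    # Usage sketch: wait_for_daemons() polls are_daemons_healthy() once per
    # second and returns the final FSStatus, so callers can chain it, e.g.
    #
    #     status = fs.wait_for_daemons(timeout=300)
    #     active = fs.get_active_names()   # names of ranks now in up:active
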
    def get_lone_mds_id(self):
        """
        Get a single MDS ID: the only one if there is only one
        configured, else the only one currently holding a rank,
        else raise an error.
        """
        if len(self.mds_ids) != 1:
            alive = self.get_rank_names()
            if len(alive) == 1:
                return alive[0]
            else:
                raise ValueError("Explicit MDS argument required when multiple MDSs in use")
        else:
            return self.mds_ids[0]

    def recreate(self):
        log.info("Creating new filesystem")
        self.delete_all_filesystems()
        self.id = None
        self.create()

    def put_metadata_object_raw(self, object_id, infile):
        """
        Save an object to the metadata pool
        """
        temp_bin_path = infile
        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'put', object_id, temp_bin_path
        ])

    def get_metadata_object_raw(self, object_id):
        """
        Retrieve an object from the metadata pool and store it in a file.
        """
        temp_bin_path = '/tmp/' + object_id + '.bin'

        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
        ])

        return temp_bin_path

    def get_metadata_object(self, object_type, object_id):
        """
        Retrieve an object from the metadata pool, pass it through
        ceph-dencoder to dump it to JSON, and return the decoded object.
        """
        temp_bin_path = '/tmp/out.bin'

        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
        ])

        stdout = StringIO()
        self.client_remote.run(args=[
            'sudo', os.path.join(self._prefix, 'ceph-dencoder'), 'type', object_type, 'import', temp_bin_path, 'decode', 'dump_json'
        ], stdout=stdout)
        dump_json = stdout.getvalue().strip()
        try:
            dump = json.loads(dump_json)
        except (TypeError, ValueError):
            log.error("Failed to decode JSON: '{0}'".format(dump_json))
            raise

        return dump

    def get_journal_version(self):
        """
        Read the JournalPointer and Journal::Header objects to learn the version of
        encoding in use.
        """
        journal_pointer_object = '400.00000000'
        journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
        journal_ino = journal_pointer_dump['journal_pointer']['front']

        journal_header_object = "{0:x}.00000000".format(journal_ino)
        journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)

        version = journal_header_dump['journal_header']['stream_format']
        log.info("Read journal version {0}".format(version))

        return version

    def mds_asok(self, command, mds_id=None, timeout=None):
        if mds_id is None:
            mds_id = self.get_lone_mds_id()

        return self.json_asok(command, 'mds', mds_id, timeout=timeout)

    def rank_asok(self, command, rank=0, status=None, timeout=None):
        info = self.get_rank(rank=rank, status=status)
        return self.json_asok(command, 'mds', info['name'], timeout=timeout)

    def rank_tell(self, command, rank=0, status=None):
        info = self.get_rank(rank=rank, status=status)
        return json.loads(self.mon_manager.raw_cluster_cmd("tell", 'mds.{0}'.format(info['name']), *command))

    def read_cache(self, path, depth=None):
        cmd = ["dump", "tree", path]
        if depth is not None:
            cmd.append(depth.__str__())
        result = self.mds_asok(cmd)
        if len(result) == 0:
            raise RuntimeError("Path not found in cache: {0}".format(path))

        return result

    def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
        """
        Block until the MDS reaches a particular state, or a failure condition
        is met.

        When there are multiple MDSs, succeed when exactly one MDS is in the
        goal state, or fail when any MDS is in the reject state.

        :param goal_state: Return once the MDS is in this state
        :param reject: Fail if the MDS enters this state before the goal state
        :param timeout: Fail if this many seconds pass before reaching goal
        :return: number of seconds waited, rounded down to integer
        """

        started_at = time.time()
        while True:
            status = self.status()
            if rank is not None:
                try:
                    mds_info = status.get_rank(self.id, rank)
                    current_state = mds_info['state'] if mds_info else None
                    log.info("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
                except:
                    mdsmap = self.get_mds_map(status=status)
                    if rank in mdsmap['failed']:
                        log.info("Waiting for rank {0} to come back.".format(rank))
                        current_state = None
                    else:
                        raise
            elif mds_id is not None:
                # mds_info is None if no daemon with this ID exists in the map
                mds_info = status.get_mds(mds_id)
                current_state = mds_info['state'] if mds_info else None
                log.info("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
            else:
                # In general, look for a single MDS
                states = [m['state'] for m in status.get_ranks(self.id)]
                if [s for s in states if s == goal_state] == [goal_state]:
                    current_state = goal_state
                elif reject in states:
                    current_state = reject
                else:
                    current_state = None
                log.info("mapped states {0} to {1}".format(states, current_state))

            elapsed = time.time() - started_at
            if current_state == goal_state:
                log.info("reached state '{0}' in {1}s".format(current_state, elapsed))
                return elapsed
            elif reject is not None and current_state == reject:
                raise RuntimeError("MDS in reject state {0}".format(current_state))
            elif timeout is not None and elapsed > timeout:
                log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
                raise RuntimeError(
                    "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
                        elapsed, goal_state, current_state
                    ))
            else:
                time.sleep(1)

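    # Illustrative calls (the state names are standard MDS states, the timeout
    # values are arbitrary): wait for the lone MDS to reach up:active, or watch
    # a specific rank come back after a failure:
    #
    #     fs.wait_for_state("up:active", timeout=120)
    #     fs.wait_for_state("up:active", rank=1, timeout=120)
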
    def _read_data_xattr(self, ino_no, xattr_name, type, pool):
        mds_id = self.mds_ids[0]
        remote = self.mds_daemons[mds_id].remote
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)

        args = [
            os.path.join(self._prefix, "rados"), "-p", pool, "getxattr", obj_name, xattr_name
        ]
        try:
            proc = remote.run(
                args=args,
                stdout=StringIO())
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(obj_name)

        data = proc.stdout.getvalue()

        p = remote.run(
            args=[os.path.join(self._prefix, "ceph-dencoder"), "type", type, "import", "-", "decode", "dump_json"],
            stdout=StringIO(),
            stdin=data
        )

        return json.loads(p.stdout.getvalue().strip())

    def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
        """
        Write to an xattr of the 0th data object of an inode.  Will
        succeed whether the object and/or xattr already exist or not.

        :param ino_no: integer inode number
        :param xattr_name: string name of the xattr
        :param data: byte array data to write to the xattr
        :param pool: name of data pool or None to use primary data pool
        :return: None
        """
        remote = self.mds_daemons[self.mds_ids[0]].remote
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)
        args = [
            os.path.join(self._prefix, "rados"), "-p", pool, "setxattr",
            obj_name, xattr_name, data
        ]
        remote.run(
            args=args,
            stdout=StringIO())

    def read_backtrace(self, ino_no, pool=None):
        """
        Read the backtrace from the data pool, return a dict in the format
        given by inode_backtrace_t::dump, which is something like:

        ::

            rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
            ceph-dencoder type inode_backtrace_t import out.bin decode dump_json

            { "ino": 1099511627778,
              "ancestors": [
                    { "dirino": 1,
                      "dname": "blah",
                      "version": 11}],
              "pool": 1,
              "old_pools": []}

        :param pool: name of pool to read backtrace from.  If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)

    def read_layout(self, ino_no, pool=None):
        """
        Read 'layout' xattr of an inode and parse the result, returning a dict like:
        ::
            {
                "stripe_unit": 4194304,
                "stripe_count": 1,
                "object_size": 4194304,
                "pool_id": 1,
                "pool_ns": "",
            }

        :param pool: name of pool to read the layout from.  If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)

    def _enumerate_data_objects(self, ino, size):
        """
        Get the list of expected data objects for a range, and the list of objects
        that really exist.

        :return a tuple of two lists of strings (expected, actual)
        """
        stripe_size = 1024 * 1024 * 4

        size = max(stripe_size, size)

        want_objects = [
            "{0:x}.{1:08x}".format(ino, n)
            for n in range(0, ((size - 1) / stripe_size) + 1)
        ]

        exist_objects = self.rados(["ls"], pool=self.get_data_pool_name()).split("\n")

        return want_objects, exist_objects

    def data_objects_present(self, ino, size):
        """
        Check that *all* the expected data objects for an inode are present in the data pool
        """

        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        missing = set(want_objects) - set(exist_objects)

        if missing:
            log.info("Objects missing (ino {0}, size {1}): {2}".format(
                ino, size, missing
            ))
            return False
        else:
            log.info("All objects for ino {0} size {1} found".format(ino, size))
            return True

    def data_objects_absent(self, ino, size):
        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        present = set(want_objects) & set(exist_objects)

        if present:
            log.info("Objects not absent (ino {0}, size {1}): {2}".format(
                ino, size, present
            ))
            return False
        else:
            log.info("All objects for ino {0} size {1} are absent".format(ino, size))
            return True

    def dirfrag_exists(self, ino, frag):
        try:
            self.rados(["stat", "{0:x}.{1:08x}".format(ino, frag)])
        except CommandFailedError:
            return False
        else:
            return True

    def rados(self, args, pool=None, namespace=None, stdin_data=None,
              stdin_file=None):
        """
        Call into the `rados` CLI from an MDS
        """

        if pool is None:
            pool = self.get_metadata_pool_name()

        # Doesn't matter which MDS we use to run rados commands, they all
        # have access to the pools
        mds_id = self.mds_ids[0]
        remote = self.mds_daemons[mds_id].remote

        # NB we could alternatively use librados pybindings for this, but it's a one-liner
        # using the `rados` CLI
        args = ([os.path.join(self._prefix, "rados"), "-p", pool] +
                (["--namespace", namespace] if namespace else []) +
                args)

        if stdin_file is not None:
            args = ["bash", "-c", "cat " + stdin_file + " | " + " ".join(args)]

        p = remote.run(
            args=args,
            stdin=stdin_data,
            stdout=StringIO())
        return p.stdout.getvalue().strip()

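    # Hedged examples of the rados() helper:
    #
    #     fs.rados(["ls"])                                    # metadata pool by default
    #     fs.rados(["ls"], pool=fs.get_data_pool_name())      # or any other pool
    #     fs.rados(["put", "foo", "-"], stdin_data="payload") # object name is hypothetical
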
    def list_dirfrag(self, dir_ino):
        """
        Read the named object and return the list of omap keys

        :return a list of 0 or more strings
        """

        dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)

        try:
            key_list_str = self.rados(["listomapkeys", dirfrag_obj_name])
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(dirfrag_obj_name)

        return key_list_str.split("\n") if key_list_str else []

    def erase_metadata_objects(self, prefix):
        """
        For all objects in the metadata pool matching the prefix,
        erase them.

        This is O(N) with the number of objects in the pool, so only suitable
        for use on toy test filesystems.
        """
        all_objects = self.rados(["ls"]).split("\n")
        matching_objects = [o for o in all_objects if o.startswith(prefix)]
        for o in matching_objects:
            self.rados(["rm", o])

    def erase_mds_objects(self, rank):
        """
        Erase all the per-MDS objects for a particular rank.  This includes
        inotable, sessiontable, journal
        """

        def obj_prefix(multiplier):
            """
            MDS object naming conventions like rank 1's
            journal is at 201.***
            """
            return "%x." % (multiplier * 0x100 + rank)

        # MDS_INO_LOG_OFFSET
        self.erase_metadata_objects(obj_prefix(2))
        # MDS_INO_LOG_BACKUP_OFFSET
        self.erase_metadata_objects(obj_prefix(3))
        # MDS_INO_LOG_POINTER_OFFSET
        self.erase_metadata_objects(obj_prefix(4))
        # MDSTables & SessionMap
        self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))

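    # Worked example of obj_prefix() above for rank 1: multiplier 2 gives
    # 0x200 + 1 = 0x201, so the journal objects are "201.<seq>"; multiplier 3
    # gives "301.*" (journal backup) and multiplier 4 gives "401.*" (pointer).
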
    @property
    def _prefix(self):
        """
        Override this to set a different path prefix for the Ceph binaries.
        """
        return ""

    def _make_rank(self, rank):
        return "{}:{}".format(self.name, rank)

    def _run_tool(self, tool, args, rank=None, quiet=False):
        # Tests frequently have [client] configuration that jacks up
        # the objecter log level (unlikely to be interesting here)
        # and does not set the mds log level (very interesting here)
        if quiet:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
        else:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']

        if rank is not None:
            base_args.extend(["--rank", "%s" % str(rank)])

        t1 = datetime.datetime.now()
        r = self.tool_remote.run(
            args=base_args + args,
            stdout=StringIO()).stdout.getvalue().strip()
        duration = datetime.datetime.now() - t1
        log.info("Ran {0} in time {1}, result:\n{2}".format(
            base_args + args, duration, r
        ))
        return r

    @property
    def tool_remote(self):
        """
        An arbitrary remote to use when invoking recovery tools.  Use an MDS host because
        it'll definitely have keys with perms to access cephfs metadata pool.  This is public
        so that tests can use this remote to go get locally written output files from the tools.
        """
        mds_id = self.mds_ids[0]
        return self.mds_daemons[mds_id].remote

    def journal_tool(self, args, rank, quiet=False):
        """
        Invoke cephfs-journal-tool with the passed arguments for a rank, and return its stdout
        """
        fs_rank = self._make_rank(rank)
        return self._run_tool("cephfs-journal-tool", args, fs_rank, quiet)

    def table_tool(self, args, quiet=False):
        """
        Invoke cephfs-table-tool with the passed arguments, and return its stdout
        """
        return self._run_tool("cephfs-table-tool", args, None, quiet)

1344
1345 def data_scan(self, args, quiet=False, worker_count=1):
1346 """
1347 Invoke cephfs-data-scan with the passed arguments, and return its stdout
1348
1349 :param worker_count: if greater than 1, multiple workers will be run
1350 in parallel and the return value will be None
1351 """
1352
1353 workers = []
1354
1355 for n in range(0, worker_count):
1356 if worker_count > 1:
1357 # data-scan args first token is a command, followed by args to it.
1358 # insert worker arguments after the command.
1359 cmd = args[0]
1360 worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
1361 else:
1362 worker_args = args
1363
1364 workers.append(Greenlet.spawn(lambda wargs=worker_args:
1365 self._run_tool("cephfs-data-scan", wargs, None, quiet)))
1366
1367 for w in workers:
1368 w.get()
1369
1370 if worker_count == 1:
1371 return workers[0].value
1372 else:
1373 return None
b32b8144
FG
1374
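    # Hedged example: running cephfs-data-scan's scan_extents pass with several
    # parallel workers (the worker count here is arbitrary). With worker_count > 1
    # the return value is None, so output must be gathered from logs instead:
    #
    #     fs.data_scan(["scan_extents", fs.get_data_pool_name()], worker_count=4)
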
    def is_full(self):
        return self.is_pool_full(self.get_data_pool_name())