ceph/qa/tasks/cephfs/filesystem.py
1
2 import json
3 import logging
4 from gevent import Greenlet
5 import os
6 import time
7 import datetime
8 import re
9 import errno
10 import random
11 import traceback
12
13 from io import BytesIO
14 from six import StringIO
15
16 from teuthology.exceptions import CommandFailedError
17 from teuthology import misc
18 from teuthology.nuke import clear_firewall
19 from teuthology.parallel import parallel
20 from tasks.ceph_manager import write_conf
21 from tasks import ceph_manager
22
23
24 log = logging.getLogger(__name__)
25
26
27 DAEMON_WAIT_TIMEOUT = 120
28 ROOT_INO = 1
29
30 class FileLayout(object):
31 def __init__(self, pool=None, pool_namespace=None, stripe_unit=None, stripe_count=None, object_size=None):
32 self.pool = pool
33 self.pool_namespace = pool_namespace
34 self.stripe_unit = stripe_unit
35 self.stripe_count = stripe_count
36 self.object_size = object_size
37
38 @classmethod
39     def load_from_ceph(cls, layout_str):
40 # TODO
41 pass
42
43 def items(self):
44 if self.pool is not None:
45 yield ("pool", self.pool)
46 if self.pool_namespace:
47 yield ("pool_namespace", self.pool_namespace)
48 if self.stripe_unit is not None:
49 yield ("stripe_unit", self.stripe_unit)
50 if self.stripe_count is not None:
51 yield ("stripe_count", self.stripe_count)
52 if self.object_size is not None:
53             yield ("object_size", self.object_size)
54
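# Illustrative usage (a sketch, not from the original file; `fs` and `mount` are
# assumed to be a Filesystem instance and a mount object from a test):
#
#   layout = FileLayout(pool="cephfs_data", stripe_count=2)
#   fs.set_dir_layout(mount, "subdir", layout)
#
# set_dir_layout() (defined further below) iterates layout.items() and runs
# `setfattr -n ceph.dir.layout.<field> -v <value> subdir` for each field set.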
55 class ObjectNotFound(Exception):
56 def __init__(self, object_name):
57 self._object_name = object_name
58
59 def __str__(self):
60 return "Object not found: '{0}'".format(self._object_name)
61
62 class FSStatus(object):
63 """
64 Operations on a snapshot of the FSMap.
65 """
66 def __init__(self, mon_manager):
67 self.mon = mon_manager
68 self.map = json.loads(self.mon.raw_cluster_cmd("fs", "dump", "--format=json"))
69
70 def __str__(self):
71 return json.dumps(self.map, indent = 2, sort_keys = True)
72
73 # Expose the fsmap for manual inspection.
74 def __getitem__(self, key):
75 """
76 Get a field from the fsmap.
77 """
78 return self.map[key]
79
80 def get_filesystems(self):
81 """
82 Iterator for all filesystems.
83 """
84 for fs in self.map['filesystems']:
85 yield fs
86
87 def get_all(self):
88 """
89 Iterator for all the mds_info components in the FSMap.
90 """
91 for info in self.map['standbys']:
92 yield info
93 for fs in self.map['filesystems']:
94 for info in fs['mdsmap']['info'].values():
95 yield info
96
97 def get_standbys(self):
98 """
99 Iterator for all standbys.
100 """
101 for info in self.map['standbys']:
102 yield info
103
104 def get_fsmap(self, fscid):
105 """
106 Get the fsmap for the given FSCID.
107 """
108 for fs in self.map['filesystems']:
109 if fscid is None or fs['id'] == fscid:
110 return fs
111 raise RuntimeError("FSCID {0} not in map".format(fscid))
112
113 def get_fsmap_byname(self, name):
114 """
115 Get the fsmap for the given file system name.
116 """
117 for fs in self.map['filesystems']:
118 if name is None or fs['mdsmap']['fs_name'] == name:
119 return fs
120 raise RuntimeError("FS {0} not in map".format(name))
121
122 def get_replays(self, fscid):
123 """
124 Get the standby:replay MDS for the given FSCID.
125 """
126 fs = self.get_fsmap(fscid)
127 for info in fs['mdsmap']['info'].values():
128 if info['state'] == 'up:standby-replay':
129 yield info
130
131 def get_ranks(self, fscid):
132 """
133 Get the ranks for the given FSCID.
134 """
135 fs = self.get_fsmap(fscid)
136 for info in fs['mdsmap']['info'].values():
137 if info['rank'] >= 0 and info['state'] != 'up:standby-replay':
138 yield info
139
140 def get_rank(self, fscid, rank):
141 """
142 Get the rank for the given FSCID.
143 """
144 for info in self.get_ranks(fscid):
145 if info['rank'] == rank:
146 return info
147 raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))
148
149 def get_mds(self, name):
150 """
151 Get the info for the given MDS name.
152 """
153 for info in self.get_all():
154 if info['name'] == name:
155 return info
156 return None
157
158 def get_mds_addr(self, name):
159 """
160         Return the instance addr as a string, like "10.214.133.138:6807/10825"
161 """
162 info = self.get_mds(name)
163 if info:
164 return info['addr']
165 else:
166 log.warning(json.dumps(list(self.get_all()), indent=2)) # dump for debugging
167 raise RuntimeError("MDS id '{0}' not found in map".format(name))
168
169 def get_mds_gid(self, gid):
170 """
171 Get the info for the given MDS gid.
172 """
173 for info in self.get_all():
174 if info['gid'] == gid:
175 return info
176 return None
177
178 def hadfailover(self, status):
179 """
180 Compares two statuses for mds failovers.
181 Returns True if there is a failover.
182 """
183 for fs in status.map['filesystems']:
184 for info in fs['mdsmap']['info'].values():
185 oldinfo = self.get_mds_gid(info['gid'])
186 if oldinfo is None or oldinfo['incarnation'] != info['incarnation']:
187 return True
188 #all matching
189 return False
190
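# Illustrative usage (a sketch; `fs` is assumed to be a Filesystem instance):
#
#   status = fs.status()                  # snapshot of the current FSMap
#   rank0 = status.get_rank(fs.id, 0)     # info dict for rank 0 of that fs
#   log.info("rank 0 is %s (%s)" % (rank0['name'], rank0['state']))
#   standbys = [info['name'] for info in status.get_standbys()]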
191 class CephCluster(object):
192 @property
193 def admin_remote(self):
194 first_mon = misc.get_first_mon(self._ctx, None)
195 (result,) = self._ctx.cluster.only(first_mon).remotes.keys()
196 return result
197
198 def __init__(self, ctx):
199 self._ctx = ctx
200 self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
201
202 def get_config(self, key, service_type=None):
203 """
204 Get config from mon by default, or a specific service if caller asks for it
205 """
206 if service_type is None:
207 service_type = 'mon'
208
209 service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
210 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
211
212 def set_ceph_conf(self, subsys, key, value):
213 if subsys not in self._ctx.ceph['ceph'].conf:
214 self._ctx.ceph['ceph'].conf[subsys] = {}
215 self._ctx.ceph['ceph'].conf[subsys][key] = value
216 write_conf(self._ctx) # XXX because we don't have the ceph task's config object, if they
217 # used a different config path this won't work.
218
219 def clear_ceph_conf(self, subsys, key):
220 del self._ctx.ceph['ceph'].conf[subsys][key]
221 write_conf(self._ctx)
222
223 def json_asok(self, command, service_type, service_id, timeout=None):
224 if timeout is None:
225 timeout = 15*60
226 command.insert(0, '--format=json')
227 proc = self.mon_manager.admin_socket(service_type, service_id, command, timeout=timeout)
228 response_data = proc.stdout.getvalue().strip()
229 if len(response_data) > 0:
230 j = json.loads(response_data)
231 pretty = json.dumps(j, sort_keys=True, indent=2)
232 log.debug(f"_json_asok output\n{pretty}")
233 return j
234 else:
235 log.debug("_json_asok output empty")
236 return None
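# Illustrative usage (a sketch; the daemon id "a" is hypothetical): json_asok()
# prepends --format=json and decodes the admin socket response, so a perf dump
# can be read directly as a dict:
#
#   perf = self.json_asok(["perf", "dump"], "mds", "a")
#   if perf is not None:
#       log.info(perf["mds"]["request"])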
237
238
239 class MDSCluster(CephCluster):
240 """
241 Collective operations on all the MDS daemons in the Ceph cluster. These
242 daemons may be in use by various Filesystems.
243
244 For the benefit of pre-multi-filesystem tests, this class is also
245 a parent of Filesystem. The correct way to use MDSCluster going forward is
246 as a separate instance outside of your (multiple) Filesystem instances.
247 """
248 def __init__(self, ctx):
249 super(MDSCluster, self).__init__(ctx)
250
251 self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))
252
253 if len(self.mds_ids) == 0:
254 raise RuntimeError("This task requires at least one MDS")
255
256 if hasattr(self._ctx, "daemons"):
257 # Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
258 self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
259
260 def _one_or_all(self, mds_id, cb, in_parallel=True):
261 """
262 Call a callback for a single named MDS, or for all.
263
264 Note that the parallelism here isn't for performance, it's to avoid being overly kind
265 to the cluster by waiting a graceful ssh-latency of time between doing things, and to
266 avoid being overly kind by executing them in a particular order. However, some actions
267 don't cope with being done in parallel, so it's optional (`in_parallel`)
268
269 :param mds_id: MDS daemon name, or None
270 :param cb: Callback taking single argument of MDS daemon name
271 :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
272 """
273 if mds_id is None:
274 if in_parallel:
275 with parallel() as p:
276 for mds_id in self.mds_ids:
277 p.spawn(cb, mds_id)
278 else:
279 for mds_id in self.mds_ids:
280 cb(mds_id)
281 else:
282 cb(mds_id)
283
284 def get_config(self, key, service_type=None):
285 """
286 get_config specialization of service_type="mds"
287 """
288 if service_type != "mds":
289 return super(MDSCluster, self).get_config(key, service_type)
290
291 # Some tests stop MDS daemons, don't send commands to a dead one:
292 running_daemons = [i for i, mds in self.mds_daemons.items() if mds.running()]
293 service_id = random.sample(running_daemons, 1)[0]
294 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
295
296 def mds_stop(self, mds_id=None):
297 """
298         Stop the MDS daemon process(es). If it held a rank, that rank
299 will eventually go laggy.
300 """
301 self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())
302
303 def mds_fail(self, mds_id=None):
304 """
305 Inform MDSMonitor of the death of the daemon process(es). If it held
306 a rank, that rank will be relinquished.
307 """
308 self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))
309
310 def mds_restart(self, mds_id=None):
311 self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())
312
313 def mds_fail_restart(self, mds_id=None):
314 """
315 Variation on restart that includes marking MDSs as failed, so that doing this
316 operation followed by waiting for healthy daemon states guarantees that they
317 have gone down and come up, rather than potentially seeing the healthy states
318 that existed before the restart.
319 """
320 def _fail_restart(id_):
321 self.mds_daemons[id_].stop()
322 self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
323 self.mds_daemons[id_].restart()
324
325 self._one_or_all(mds_id, _fail_restart)
326
327 def mds_signal(self, mds_id, sig, silent=False):
328 """
329         Signal an MDS daemon.
330         """
331         self.mds_daemons[mds_id].signal(sig, silent)
332
333 def newfs(self, name='cephfs', create=True):
334 return Filesystem(self._ctx, name=name, create=create)
335
336 def status(self):
337 return FSStatus(self.mon_manager)
338
339 def delete_all_filesystems(self):
340 """
341 Remove all filesystems that exist, and any pools in use by them.
342 """
343 pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
344 pool_id_name = {}
345 for pool in pools:
346 pool_id_name[pool['pool']] = pool['pool_name']
347
348 # mark cluster down for each fs to prevent churn during deletion
349 status = self.status()
350 for fs in status.get_filesystems():
351 self.mon_manager.raw_cluster_cmd("fs", "fail", str(fs['mdsmap']['fs_name']))
352
353 # get a new copy as actives may have since changed
354 status = self.status()
355 for fs in status.get_filesystems():
356 mdsmap = fs['mdsmap']
357 metadata_pool = pool_id_name[mdsmap['metadata_pool']]
358
359 self.mon_manager.raw_cluster_cmd('fs', 'rm', mdsmap['fs_name'], '--yes-i-really-mean-it')
360 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
361 metadata_pool, metadata_pool,
362 '--yes-i-really-really-mean-it')
363 for data_pool in mdsmap['data_pools']:
364 data_pool = pool_id_name[data_pool]
365 try:
366 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
367 data_pool, data_pool,
368 '--yes-i-really-really-mean-it')
369 except CommandFailedError as e:
370                     if e.exitstatus == 16: # EBUSY: this data pool is shared by
371                         pass                # two filesystems; a later iteration
372                     else:                   # will delete it
373                         raise
374
375 def get_standby_daemons(self):
376 return set([s['name'] for s in self.status().get_standbys()])
377
378 def get_mds_hostnames(self):
379 result = set()
380 for mds_id in self.mds_ids:
381 mds_remote = self.mon_manager.find_remote('mds', mds_id)
382 result.add(mds_remote.hostname)
383
384 return list(result)
385
386 def set_clients_block(self, blocked, mds_id=None):
387 """
388 Block (using iptables) client communications to this MDS. Be careful: if
389 other services are running on this MDS, or other MDSs try to talk to this
390         MDS, their communications may also be blocked as collateral damage.
391
392 :param mds_id: Optional ID of MDS to block, default to all
393 :return:
394 """
395 da_flag = "-A" if blocked else "-D"
396
397 def set_block(_mds_id):
398 remote = self.mon_manager.find_remote('mds', _mds_id)
399 status = self.status()
400
401 addr = status.get_mds_addr(_mds_id)
402 ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()
403
404 remote.run(
405 args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
406 "comment", "--comment", "teuthology"])
407 remote.run(
408 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
409 "comment", "--comment", "teuthology"])
410
411 self._one_or_all(mds_id, set_block, in_parallel=False)
412
413 def clear_firewall(self):
414 clear_firewall(self._ctx)
415
416 def get_mds_info(self, mds_id):
417 return FSStatus(self.mon_manager).get_mds(mds_id)
418
419 def is_pool_full(self, pool_name):
420 pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
421 for pool in pools:
422 if pool['pool_name'] == pool_name:
423 return 'full' in pool['flags_names'].split(",")
424
425 raise RuntimeError("Pool not found '{0}'".format(pool_name))
426
427 class Filesystem(MDSCluster):
428 """
429 This object is for driving a CephFS filesystem. The MDS daemons driven by
430 MDSCluster may be shared with other Filesystems.
431 """
432 def __init__(self, ctx, fs_config=None, fscid=None, name=None, create=False,
433 ec_profile=None):
434 super(Filesystem, self).__init__(ctx)
435
436 self.name = name
437 self.ec_profile = ec_profile
438 self.id = None
439 self.metadata_pool_name = None
440 self.metadata_overlay = False
441 self.data_pool_name = None
442 self.data_pools = None
443 self.fs_config = fs_config
444
445 client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
446 self.client_id = client_list[0]
447 self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
448
449 if name is not None:
450 if fscid is not None:
451 raise RuntimeError("cannot specify fscid when creating fs")
452 if create and not self.legacy_configured():
453 self.create()
454 else:
455 if fscid is not None:
456 self.id = fscid
457 self.getinfo(refresh = True)
458
459 # Stash a reference to the first created filesystem on ctx, so
460 # that if someone drops to the interactive shell they can easily
461 # poke our methods.
462 if not hasattr(self._ctx, "filesystem"):
463 self._ctx.filesystem = self
464
465 def get_task_status(self, status_key):
466 return self.mon_manager.get_service_task_status("mds", status_key)
467
468 def getinfo(self, refresh = False):
469 status = self.status()
470 if self.id is not None:
471 fsmap = status.get_fsmap(self.id)
472 elif self.name is not None:
473 fsmap = status.get_fsmap_byname(self.name)
474 else:
475 fss = [fs for fs in status.get_filesystems()]
476 if len(fss) == 1:
477 fsmap = fss[0]
478 elif len(fss) == 0:
479 raise RuntimeError("no file system available")
480 else:
481 raise RuntimeError("more than one file system available")
482 self.id = fsmap['id']
483 self.name = fsmap['mdsmap']['fs_name']
484 self.get_pool_names(status = status, refresh = refresh)
485 return status
486
487 def set_metadata_overlay(self, overlay):
488 if self.id is not None:
489 raise RuntimeError("cannot specify fscid when configuring overlay")
490 self.metadata_overlay = overlay
491
492 def deactivate(self, rank):
493 if rank < 0:
494 raise RuntimeError("invalid rank")
495 elif rank == 0:
496 raise RuntimeError("cannot deactivate rank 0")
497 self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))
498
499 def reach_max_mds(self):
500 # Try to reach rank count == max_mds, up or down (UPGRADE SENSITIVE!)
501 status = self.getinfo()
502 mds_map = self.get_mds_map(status=status)
503 max_mds = mds_map['max_mds']
504
505 count = len(list(self.get_ranks(status=status)))
506 if count > max_mds:
507 try:
508                 # deactivate mds in descending order
509 status = self.wait_for_daemons(status=status, skip_max_mds_check=True)
510 while count > max_mds:
511 targets = sorted(self.get_ranks(status=status), key=lambda r: r['rank'], reverse=True)
512 target = targets[0]
513 log.info("deactivating rank %d" % target['rank'])
514 self.deactivate(target['rank'])
515 status = self.wait_for_daemons(skip_max_mds_check=True)
516 count = len(list(self.get_ranks(status=status)))
517 except:
518 # In Mimic, deactivation is done automatically:
519 log.info("Error:\n{}".format(traceback.format_exc()))
520 status = self.wait_for_daemons()
521 else:
522 status = self.wait_for_daemons()
523
524 mds_map = self.get_mds_map(status=status)
525 assert(mds_map['max_mds'] == max_mds)
526 assert(mds_map['in'] == list(range(0, max_mds)))
527
528 def fail(self):
529 self.mon_manager.raw_cluster_cmd("fs", "fail", str(self.name))
530
531 def set_flag(self, var, *args):
532 a = map(lambda x: str(x).lower(), args)
533 self.mon_manager.raw_cluster_cmd("fs", "flag", "set", var, *a)
534
535 def set_allow_multifs(self, yes=True):
536 self.set_flag("enable_multiple", yes)
537
538 def set_var(self, var, *args):
539 a = map(lambda x: str(x).lower(), args)
540 self.mon_manager.raw_cluster_cmd("fs", "set", self.name, var, *a)
541
542 def set_down(self, down=True):
543 self.set_var("down", str(down).lower())
544
545 def set_joinable(self, joinable=True):
546 self.set_var("joinable", joinable)
547
548 def set_max_mds(self, max_mds):
549 self.set_var("max_mds", "%d" % max_mds)
550
551 def set_session_timeout(self, timeout):
552 self.set_var("session_timeout", "%d" % timeout)
553
554 def set_allow_standby_replay(self, yes):
555 self.set_var("allow_standby_replay", yes)
556
557 def set_allow_new_snaps(self, yes):
558 self.set_var("allow_new_snaps", yes, '--yes-i-really-mean-it')
559
560 # In Octopus+, the PG count can be omitted to use the default. We keep the
561 # hard-coded value for deployments of Mimic/Nautilus.
562 pgs_per_fs_pool = 8
563
564 def create(self):
565 if self.name is None:
566 self.name = "cephfs"
567 if self.metadata_pool_name is None:
568 self.metadata_pool_name = "{0}_metadata".format(self.name)
569 if self.data_pool_name is None:
570 data_pool_name = "{0}_data".format(self.name)
571 else:
572 data_pool_name = self.data_pool_name
573
574 log.info("Creating filesystem '{0}'".format(self.name))
575
576 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
577 self.metadata_pool_name, self.pgs_per_fs_pool.__str__())
578 if self.metadata_overlay:
579 self.mon_manager.raw_cluster_cmd('fs', 'new',
580 self.name, self.metadata_pool_name, data_pool_name,
581 '--allow-dangerous-metadata-overlay')
582 else:
583 if self.ec_profile and 'disabled' not in self.ec_profile:
584 log.info("EC profile is %s", self.ec_profile)
585 cmd = ['osd', 'erasure-code-profile', 'set', data_pool_name]
586 cmd.extend(self.ec_profile)
587 self.mon_manager.raw_cluster_cmd(*cmd)
588 self.mon_manager.raw_cluster_cmd(
589 'osd', 'pool', 'create',
590 data_pool_name, self.pgs_per_fs_pool.__str__(), 'erasure',
591 data_pool_name)
592 self.mon_manager.raw_cluster_cmd(
593 'osd', 'pool', 'set',
594 data_pool_name, 'allow_ec_overwrites', 'true')
595 else:
596 self.mon_manager.raw_cluster_cmd(
597 'osd', 'pool', 'create',
598 data_pool_name, self.pgs_per_fs_pool.__str__())
599 self.mon_manager.raw_cluster_cmd('fs', 'new',
600 self.name,
601 self.metadata_pool_name,
602 data_pool_name,
603 "--force")
604 self.check_pool_application(self.metadata_pool_name)
605 self.check_pool_application(data_pool_name)
606 # Turn off spurious standby count warnings from modifying max_mds in tests.
607 try:
608 self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
609 except CommandFailedError as e:
610 if e.exitstatus == 22:
611 # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
612 pass
613 else:
614 raise
615
616 if self.fs_config is not None:
617 max_mds = self.fs_config.get('max_mds', 1)
618 if max_mds > 1:
619 self.set_max_mds(max_mds)
620
621 # If absent will use the default value (60 seconds)
622 session_timeout = self.fs_config.get('session_timeout', 60)
623 if session_timeout != 60:
624 self.set_session_timeout(session_timeout)
625
626 self.getinfo(refresh = True)
627
628
629 def check_pool_application(self, pool_name):
630 osd_map = self.mon_manager.get_osd_dump_json()
631 for pool in osd_map['pools']:
632 if pool['pool_name'] == pool_name:
633 if "application_metadata" in pool:
634                 if "cephfs" not in pool['application_metadata']:
635 raise RuntimeError("Pool {pool_name} does not name cephfs as application!".\
636 format(pool_name=pool_name))
637
638
639 def __del__(self):
640 if getattr(self._ctx, "filesystem", None) == self:
641 delattr(self._ctx, "filesystem")
642
643 def exists(self):
644 """
645 Whether a filesystem exists in the mon's filesystem list
646 """
647 fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
648 return self.name in [fs['name'] for fs in fs_list]
649
650 def legacy_configured(self):
651 """
652 Check if a legacy (i.e. pre "fs new") filesystem configuration is present. If this is
653 the case, the caller should avoid using Filesystem.create
654 """
655 try:
656 out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
657 pools = json.loads(out_text)
658 metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
659 if metadata_pool_exists:
660 self.metadata_pool_name = 'metadata'
661 except CommandFailedError as e:
662 # For use in upgrade tests, Ceph cuttlefish and earlier don't support
663 # structured output (--format) from the CLI.
664 if e.exitstatus == 22:
665 metadata_pool_exists = True
666 else:
667 raise
668
669 return metadata_pool_exists
670
671 def _df(self):
672 return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))
673
674 def get_mds_map(self, status=None):
675 if status is None:
676 status = self.status()
677 return status.get_fsmap(self.id)['mdsmap']
678
679 def get_var(self, var, status=None):
680 return self.get_mds_map(status=status)[var]
681
682 def set_dir_layout(self, mount, path, layout):
683 for name, value in layout.items():
684 mount.run_shell(args=["setfattr", "-n", "ceph.dir.layout."+name, "-v", str(value), path])
685
686 def add_data_pool(self, name, create=True):
687 if create:
688 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name, self.pgs_per_fs_pool.__str__())
689 self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
690 self.get_pool_names(refresh = True)
691 for poolid, fs_name in self.data_pools.items():
692 if name == fs_name:
693 return poolid
694 raise RuntimeError("could not get just created pool '{0}'".format(name))
695
696 def get_pool_names(self, refresh = False, status = None):
697 if refresh or self.metadata_pool_name is None or self.data_pools is None:
698 if status is None:
699 status = self.status()
700 fsmap = status.get_fsmap(self.id)
701
702 osd_map = self.mon_manager.get_osd_dump_json()
703 id_to_name = {}
704 for p in osd_map['pools']:
705 id_to_name[p['pool']] = p['pool_name']
706
707 self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
708 self.data_pools = {}
709 for data_pool in fsmap['mdsmap']['data_pools']:
710 self.data_pools[data_pool] = id_to_name[data_pool]
711
712 def get_data_pool_name(self, refresh = False):
713 if refresh or self.data_pools is None:
714 self.get_pool_names(refresh = True)
715 assert(len(self.data_pools) == 1)
716 return next(iter(self.data_pools.values()))
717
718 def get_data_pool_id(self, refresh = False):
719 """
720 Don't call this if you have multiple data pools
721 :return: integer
722 """
723 if refresh or self.data_pools is None:
724 self.get_pool_names(refresh = True)
725 assert(len(self.data_pools) == 1)
726 return next(iter(self.data_pools.keys()))
727
728 def get_data_pool_names(self, refresh = False):
729 if refresh or self.data_pools is None:
730 self.get_pool_names(refresh = True)
731 return list(self.data_pools.values())
732
733 def get_metadata_pool_name(self):
734 return self.metadata_pool_name
735
736 def set_data_pool_name(self, name):
737 if self.id is not None:
738             raise RuntimeError("can't set data pool name if the filesystem's fscid is set")
739 self.data_pool_name = name
740
741 def get_namespace_id(self):
742 return self.id
743
744 def get_pool_df(self, pool_name):
745 """
746 Return a dict like:
747 {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
748 """
749 for pool_df in self._df()['pools']:
750 if pool_df['name'] == pool_name:
751 return pool_df['stats']
752
753 raise RuntimeError("Pool name '{0}' not found".format(pool_name))
754
755 def get_usage(self):
756 return self._df()['stats']['total_used_bytes']
757
758 def are_daemons_healthy(self, status=None, skip_max_mds_check=False):
759 """
760         Return true if all daemons are in one of the states up:active, up:standby or
761         up:standby-replay, and exactly max_mds daemons are up:active.
762
763 Unlike most of Filesystem, this function is tolerant of new-style `fs`
764 commands being missing, because we are part of the ceph installation
765 process during upgrade suites, so must fall back to old style commands
766 when we get an EINVAL on a new style command.
767
768 :return:
769 """
770 # First, check to see that processes haven't exited with an error code
771 for mds in self._ctx.daemons.iter_daemons_of_role('mds'):
772 mds.check_status()
773
774 active_count = 0
775 try:
776 mds_map = self.get_mds_map(status=status)
777 except CommandFailedError as cfe:
778 # Old version, fall back to non-multi-fs commands
779 if cfe.exitstatus == errno.EINVAL:
780 mds_map = json.loads(
781 self.mon_manager.raw_cluster_cmd('mds', 'dump', '--format=json'))
782 else:
783 raise
784
785 log.info("are_daemons_healthy: mds map: {0}".format(mds_map))
786
787 for mds_id, mds_status in mds_map['info'].items():
788 if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
789 log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
790 return False
791 elif mds_status['state'] == 'up:active':
792 active_count += 1
793
794 log.info("are_daemons_healthy: {0}/{1}".format(
795 active_count, mds_map['max_mds']
796 ))
797
798 if not skip_max_mds_check:
799 if active_count > mds_map['max_mds']:
800 log.info("are_daemons_healthy: number of actives is greater than max_mds: {0}".format(mds_map))
801 return False
802 elif active_count == mds_map['max_mds']:
803 # The MDSMap says these guys are active, but let's check they really are
804 for mds_id, mds_status in mds_map['info'].items():
805 if mds_status['state'] == 'up:active':
806 try:
807 daemon_status = self.mds_asok(["status"], mds_id=mds_status['name'])
808 except CommandFailedError as cfe:
809 if cfe.exitstatus == errno.EINVAL:
810 # Old version, can't do this check
811 continue
812 else:
813 # MDS not even running
814 return False
815
816 if daemon_status['state'] != 'up:active':
817 # MDS hasn't taken the latest map yet
818 return False
819
820 return True
821 else:
822 return False
823 else:
824 log.info("are_daemons_healthy: skipping max_mds check")
825 return True
826
827 def get_daemon_names(self, state=None, status=None):
828 """
829 Return MDS daemon names of those daemons in the given state
830 :param state:
831 :return:
832 """
833 mdsmap = self.get_mds_map(status)
834 result = []
835 for mds_status in sorted(mdsmap['info'].values(),
836 key=lambda _: _['rank']):
837 if mds_status['state'] == state or state is None:
838 result.append(mds_status['name'])
839
840 return result
841
842 def get_active_names(self, status=None):
843 """
844 Return MDS daemon names of those daemons holding ranks
845 in state up:active
846
847 :return: list of strings like ['a', 'b'], sorted by rank
848 """
849 return self.get_daemon_names("up:active", status=status)
850
851 def get_all_mds_rank(self, status=None):
852 mdsmap = self.get_mds_map(status)
853 result = []
854 for mds_status in sorted(mdsmap['info'].values(),
855 key=lambda _: _['rank']):
856 if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
857 result.append(mds_status['rank'])
858
859 return result
860
861 def get_rank(self, rank=None, status=None):
862 if status is None:
863 status = self.getinfo()
864 if rank is None:
865 rank = 0
866 return status.get_rank(self.id, rank)
867
868 def rank_restart(self, rank=0, status=None):
869 name = self.get_rank(rank=rank, status=status)['name']
870 self.mds_restart(mds_id=name)
871
872 def rank_signal(self, signal, rank=0, status=None):
873 name = self.get_rank(rank=rank, status=status)['name']
874 self.mds_signal(name, signal)
875
876 def rank_freeze(self, yes, rank=0):
877 self.mon_manager.raw_cluster_cmd("mds", "freeze", "{}:{}".format(self.id, rank), str(yes).lower())
878
879 def rank_fail(self, rank=0):
880 self.mon_manager.raw_cluster_cmd("mds", "fail", "{}:{}".format(self.id, rank))
881
882 def get_ranks(self, status=None):
883 if status is None:
884 status = self.getinfo()
885 return status.get_ranks(self.id)
886
887 def get_replays(self, status=None):
888 if status is None:
889 status = self.getinfo()
890 return status.get_replays(self.id)
891
892 def get_replay(self, rank=0, status=None):
893 for replay in self.get_replays(status=status):
894 if replay['rank'] == rank:
895 return replay
896 return None
897
898 def get_rank_names(self, status=None):
899 """
900 Return MDS daemon names of those daemons holding a rank,
901 sorted by rank. This includes e.g. up:replay/reconnect
902 as well as active, but does not include standby or
903 standby-replay.
904 """
905 mdsmap = self.get_mds_map(status)
906 result = []
907 for mds_status in sorted(mdsmap['info'].values(),
908 key=lambda _: _['rank']):
909 if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
910 result.append(mds_status['name'])
911
912 return result
913
914 def wait_for_daemons(self, timeout=None, skip_max_mds_check=False, status=None):
915 """
916 Wait until all daemons are healthy
917 :return:
918 """
919
920 if timeout is None:
921 timeout = DAEMON_WAIT_TIMEOUT
922
923 if status is None:
924 status = self.status()
925
926 elapsed = 0
927 while True:
928 if self.are_daemons_healthy(status=status, skip_max_mds_check=skip_max_mds_check):
929 return status
930 else:
931 time.sleep(1)
932 elapsed += 1
933
934 if elapsed > timeout:
935 log.info("status = {0}".format(status))
936 raise RuntimeError("Timed out waiting for MDS daemons to become healthy")
937
938 status = self.status()
939
940 def get_lone_mds_id(self):
941 """
942 Get a single MDS ID: the only one if there is only one
943 configured, else the only one currently holding a rank,
944 else raise an error.
945 """
946 if len(self.mds_ids) != 1:
947 alive = self.get_rank_names()
948 if len(alive) == 1:
949 return alive[0]
950 else:
951 raise ValueError("Explicit MDS argument required when multiple MDSs in use")
952 else:
953 return self.mds_ids[0]
954
955 def recreate(self):
956 log.info("Creating new filesystem")
957 self.delete_all_filesystems()
958 self.id = None
959 self.create()
960
961 def put_metadata_object_raw(self, object_id, infile):
962 """
963 Save an object to the metadata pool
964 """
965 temp_bin_path = infile
966 self.client_remote.run(args=[
967 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'put', object_id, temp_bin_path
968 ])
969
970 def get_metadata_object_raw(self, object_id):
971 """
972 Retrieve an object from the metadata pool and store it in a file.
973 """
974 temp_bin_path = '/tmp/' + object_id + '.bin'
975
976 self.client_remote.run(args=[
977 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
978 ])
979
980 return temp_bin_path
981
982 def get_metadata_object(self, object_type, object_id):
983 """
984 Retrieve an object from the metadata pool, pass it through
985 ceph-dencoder to dump it to JSON, and return the decoded object.
986 """
987 temp_bin_path = '/tmp/out.bin'
988
989 self.client_remote.run(args=[
990 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
991 ])
992
993 dump_json = self.client_remote.sh([
994 'sudo', os.path.join(self._prefix, 'ceph-dencoder'), 'type', object_type, 'import', temp_bin_path, 'decode', 'dump_json'
995 ]).strip()
996 try:
997 dump = json.loads(dump_json)
998 except (TypeError, ValueError):
999 log.error("Failed to decode JSON: '{0}'".format(dump_json))
1000 raise
1001
1002 return dump
1003
1004 def get_journal_version(self):
1005 """
1006 Read the JournalPointer and Journal::Header objects to learn the version of
1007 encoding in use.
1008 """
1009 journal_pointer_object = '400.00000000'
1010 journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
1011 journal_ino = journal_pointer_dump['journal_pointer']['front']
1012
1013 journal_header_object = "{0:x}.00000000".format(journal_ino)
1014 journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)
1015
1016 version = journal_header_dump['journal_header']['stream_format']
1017 log.info("Read journal version {0}".format(version))
1018
1019 return version
1020
1021 def mds_asok(self, command, mds_id=None, timeout=None):
1022 if mds_id is None:
1023 mds_id = self.get_lone_mds_id()
1024
1025 return self.json_asok(command, 'mds', mds_id, timeout=timeout)
1026
1027 def rank_asok(self, command, rank=0, status=None, timeout=None):
1028 info = self.get_rank(rank=rank, status=status)
1029 return self.json_asok(command, 'mds', info['name'], timeout=timeout)
1030
1031 def rank_tell(self, command, rank=0, status=None):
1032 info = self.get_rank(rank=rank, status=status)
1033 return json.loads(self.mon_manager.raw_cluster_cmd("tell", 'mds.{0}'.format(info['name']), *command))
1034
1035 def ranks_tell(self, command, status=None):
1036 if status is None:
1037 status = self.status()
1038 out = []
1039 for r in status.get_ranks(self.id):
1040 result = self.rank_tell(command, rank=r['rank'], status=status)
1041 out.append((r['rank'], result))
1042 return sorted(out)
1043
1044 def ranks_perf(self, f, status=None):
1045 perf = self.ranks_tell(["perf", "dump"], status=status)
1046 out = []
1047 for rank, perf in perf:
1048 out.append((rank, f(perf)))
1049 return out
1050
1051 def read_cache(self, path, depth=None):
1052 cmd = ["dump", "tree", path]
1053 if depth is not None:
1054 cmd.append(depth.__str__())
1055 result = self.mds_asok(cmd)
1056 if len(result) == 0:
1057 raise RuntimeError("Path not found in cache: {0}".format(path))
1058
1059 return result
1060
1061 def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
1062 """
1063 Block until the MDS reaches a particular state, or a failure condition
1064 is met.
1065
1066         When there are multiple MDSs, succeed when exactly one MDS is in the
1067 goal state, or fail when any MDS is in the reject state.
1068
1069 :param goal_state: Return once the MDS is in this state
1070 :param reject: Fail if the MDS enters this state before the goal state
1071 :param timeout: Fail if this many seconds pass before reaching goal
1072 :return: number of seconds waited, rounded down to integer
1073 """
1074
1075 started_at = time.time()
1076 while True:
1077 status = self.status()
1078 if rank is not None:
1079 try:
1080 mds_info = status.get_rank(self.id, rank)
1081 current_state = mds_info['state'] if mds_info else None
1082 log.info("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
1083 except:
1084 mdsmap = self.get_mds_map(status=status)
1085 if rank in mdsmap['failed']:
1086 log.info("Waiting for rank {0} to come back.".format(rank))
1087 current_state = None
1088 else:
1089 raise
1090 elif mds_id is not None:
1091 # mds_info is None if no daemon with this ID exists in the map
1092 mds_info = status.get_mds(mds_id)
1093 current_state = mds_info['state'] if mds_info else None
1094 log.info("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
1095 else:
1096 # In general, look for a single MDS
1097 states = [m['state'] for m in status.get_ranks(self.id)]
1098 if [s for s in states if s == goal_state] == [goal_state]:
1099 current_state = goal_state
1100 elif reject in states:
1101 current_state = reject
1102 else:
1103 current_state = None
1104 log.info("mapped states {0} to {1}".format(states, current_state))
1105
1106 elapsed = time.time() - started_at
1107 if current_state == goal_state:
1108 log.info("reached state '{0}' in {1}s".format(current_state, elapsed))
1109 return elapsed
1110 elif reject is not None and current_state == reject:
1111 raise RuntimeError("MDS in reject state {0}".format(current_state))
1112 elif timeout is not None and elapsed > timeout:
1113 log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
1114 raise RuntimeError(
1115 "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
1116 elapsed, goal_state, current_state
1117 ))
1118 else:
1119 time.sleep(1)
1120
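# Illustrative usage (a sketch; `fs` is a Filesystem instance and mds_id "a" is
# hypothetical):
#
#   fs.wait_for_state("up:active", rank=0, timeout=DAEMON_WAIT_TIMEOUT)
#   # or by daemon name, failing early if the daemon becomes active instead:
#   fs.wait_for_state("up:standby", mds_id="a", reject="up:active")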
1121 def _read_data_xattr(self, ino_no, xattr_name, type, pool):
1122 mds_id = self.mds_ids[0]
1123 remote = self.mds_daemons[mds_id].remote
1124 if pool is None:
1125 pool = self.get_data_pool_name()
1126
1127 obj_name = "{0:x}.00000000".format(ino_no)
1128
1129 args = [
1130 os.path.join(self._prefix, "rados"), "-p", pool, "getxattr", obj_name, xattr_name
1131 ]
1132 try:
1133 proc = remote.run(args=args, stdout=BytesIO())
1134 except CommandFailedError as e:
1135 log.error(e.__str__())
1136 raise ObjectNotFound(obj_name)
1137
1138 data = proc.stdout.getvalue()
1139 dump = remote.sh(
1140 [os.path.join(self._prefix, "ceph-dencoder"),
1141 "type", type,
1142 "import", "-",
1143 "decode", "dump_json"],
1144 stdin=data
1145 )
1146
1147 return json.loads(dump.strip())
1148
1149 def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
1150 """
1151 Write to an xattr of the 0th data object of an inode. Will
1152 succeed whether the object and/or xattr already exist or not.
1153
1154 :param ino_no: integer inode number
1155 :param xattr_name: string name of the xattr
1156 :param data: byte array data to write to the xattr
1157 :param pool: name of data pool or None to use primary data pool
1158 :return: None
1159 """
1160 remote = self.mds_daemons[self.mds_ids[0]].remote
1161 if pool is None:
1162 pool = self.get_data_pool_name()
1163
1164 obj_name = "{0:x}.00000000".format(ino_no)
1165 args = [
1166 os.path.join(self._prefix, "rados"), "-p", pool, "setxattr",
1167 obj_name, xattr_name, data
1168 ]
1169 remote.sh(args)
1170
1171 def read_backtrace(self, ino_no, pool=None):
1172 """
1173 Read the backtrace from the data pool, return a dict in the format
1174 given by inode_backtrace_t::dump, which is something like:
1175
1176 ::
1177
1178 rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
1179 ceph-dencoder type inode_backtrace_t import out.bin decode dump_json
1180
1181 { "ino": 1099511627778,
1182 "ancestors": [
1183 { "dirino": 1,
1184 "dname": "blah",
1185 "version": 11}],
1186 "pool": 1,
1187 "old_pools": []}
1188
1189 :param pool: name of pool to read backtrace from. If omitted, FS must have only
1190 one data pool and that will be used.
1191 """
1192 return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)
1193
1194 def read_layout(self, ino_no, pool=None):
1195 """
1196 Read 'layout' xattr of an inode and parse the result, returning a dict like:
1197 ::
1198 {
1199 "stripe_unit": 4194304,
1200 "stripe_count": 1,
1201 "object_size": 4194304,
1202 "pool_id": 1,
1203 "pool_ns": "",
1204 }
1205
1206 :param pool: name of pool to read backtrace from. If omitted, FS must have only
1207 one data pool and that will be used.
1208 """
1209 return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)
1210
1211 def _enumerate_data_objects(self, ino, size):
1212 """
1213 Get the list of expected data objects for a range, and the list of objects
1214 that really exist.
1215
1216 :return a tuple of two lists of strings (expected, actual)
1217 """
1218 stripe_size = 1024 * 1024 * 4
1219
1220 size = max(stripe_size, size)
1221
1222 want_objects = [
1223 "{0:x}.{1:08x}".format(ino, n)
1224 for n in range(0, ((size - 1) // stripe_size) + 1)
1225 ]
1226
1227 exist_objects = self.rados(["ls"], pool=self.get_data_pool_name()).split("\n")
1228
1229 return want_objects, exist_objects
1230
1231 def data_objects_present(self, ino, size):
1232 """
1233 Check that *all* the expected data objects for an inode are present in the data pool
1234 """
1235
1236 want_objects, exist_objects = self._enumerate_data_objects(ino, size)
1237 missing = set(want_objects) - set(exist_objects)
1238
1239 if missing:
1240 log.info("Objects missing (ino {0}, size {1}): {2}".format(
1241 ino, size, missing
1242 ))
1243 return False
1244 else:
1245 log.info("All objects for ino {0} size {1} found".format(ino, size))
1246 return True
1247
1248 def data_objects_absent(self, ino, size):
1249 want_objects, exist_objects = self._enumerate_data_objects(ino, size)
1250 present = set(want_objects) & set(exist_objects)
1251
1252 if present:
1253 log.info("Objects not absent (ino {0}, size {1}): {2}".format(
1254 ino, size, present
1255 ))
1256 return False
1257 else:
1258 log.info("All objects for ino {0} size {1} are absent".format(ino, size))
1259 return True
1260
1261 def dirfrag_exists(self, ino, frag):
1262 try:
1263 self.rados(["stat", "{0:x}.{1:08x}".format(ino, frag)])
1264 except CommandFailedError:
1265 return False
1266 else:
1267 return True
1268
1269 def rados(self, args, pool=None, namespace=None, stdin_data=None,
1270 stdin_file=None,
1271 stdout_data=None):
1272 """
1273 Call into the `rados` CLI from an MDS
1274 """
1275
1276 if pool is None:
1277 pool = self.get_metadata_pool_name()
1278
1279 # Doesn't matter which MDS we use to run rados commands, they all
1280 # have access to the pools
1281 mds_id = self.mds_ids[0]
1282 remote = self.mds_daemons[mds_id].remote
1283
1284 # NB we could alternatively use librados pybindings for this, but it's a one-liner
1285 # using the `rados` CLI
1286 args = ([os.path.join(self._prefix, "rados"), "-p", pool] +
1287 (["--namespace", namespace] if namespace else []) +
1288 args)
1289
1290 if stdin_file is not None:
1291 args = ["bash", "-c", "cat " + stdin_file + " | " + " ".join(args)]
1292 if stdout_data is None:
1293 stdout_data = StringIO()
1294
1295 p = remote.run(args=args,
1296 stdin=stdin_data,
1297 stdout=stdout_data)
1298 return p.stdout.getvalue().strip()
1299
1300 def list_dirfrag(self, dir_ino):
1301 """
1302 Read the named object and return the list of omap keys
1303
1304 :return a list of 0 or more strings
1305 """
1306
1307 dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
1308
1309 try:
1310 key_list_str = self.rados(["listomapkeys", dirfrag_obj_name])
1311 except CommandFailedError as e:
1312 log.error(e.__str__())
1313 raise ObjectNotFound(dirfrag_obj_name)
1314
1315 return key_list_str.split("\n") if key_list_str else []
1316
1317 def erase_metadata_objects(self, prefix):
1318 """
1319 For all objects in the metadata pool matching the prefix,
1320 erase them.
1321
1322         This is O(N) in the number of objects in the pool, so only suitable
1323 for use on toy test filesystems.
1324 """
1325 all_objects = self.rados(["ls"]).split("\n")
1326 matching_objects = [o for o in all_objects if o.startswith(prefix)]
1327 for o in matching_objects:
1328 self.rados(["rm", o])
1329
1330 def erase_mds_objects(self, rank):
1331 """
1332 Erase all the per-MDS objects for a particular rank. This includes
1333 inotable, sessiontable, journal
1334 """
1335
1336 def obj_prefix(multiplier):
1337 """
1338 MDS object naming conventions like rank 1's
1339 journal is at 201.***
1340 """
1341 return "%x." % (multiplier * 0x100 + rank)
1342
1343 # MDS_INO_LOG_OFFSET
1344 self.erase_metadata_objects(obj_prefix(2))
1345 # MDS_INO_LOG_BACKUP_OFFSET
1346 self.erase_metadata_objects(obj_prefix(3))
1347 # MDS_INO_LOG_POINTER_OFFSET
1348 self.erase_metadata_objects(obj_prefix(4))
1349 # MDSTables & SessionMap
1350 self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))
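        # For illustration (names follow the offsets above; exact suffixes vary):
        # for rank 1 the objects removed are named like 201.00000000, 201.00000001,
        # ... (journal), 301.* (journal backup), 401.00000000 (journal pointer),
        # and mds1_inotable / mds1_sessionmap (per-rank tables and session map).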
1351
1352 @property
1353 def _prefix(self):
1354 """
1355 Override this to set a different
1356 """
1357 return ""
1358
1359 def _make_rank(self, rank):
1360 return "{}:{}".format(self.name, rank)
1361
1362 def _run_tool(self, tool, args, rank=None, quiet=False):
1363 # Tests frequently have [client] configuration that jacks up
1364 # the objecter log level (unlikely to be interesting here)
1365 # and does not set the mds log level (very interesting here)
1366 if quiet:
1367 base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
1368 else:
1369 base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']
1370
1371 if rank is not None:
1372 base_args.extend(["--rank", "%s" % str(rank)])
1373
1374 t1 = datetime.datetime.now()
1375 r = self.tool_remote.sh(script=base_args + args, stdout=StringIO()).strip()
1376 duration = datetime.datetime.now() - t1
1377 log.info("Ran {0} in time {1}, result:\n{2}".format(
1378 base_args + args, duration, r
1379 ))
1380 return r
1381
1382 @property
1383 def tool_remote(self):
1384 """
1385 An arbitrary remote to use when invoking recovery tools. Use an MDS host because
1386 it'll definitely have keys with perms to access cephfs metadata pool. This is public
1387 so that tests can use this remote to go get locally written output files from the tools.
1388 """
1389 mds_id = self.mds_ids[0]
1390 return self.mds_daemons[mds_id].remote
1391
1392 def journal_tool(self, args, rank, quiet=False):
1393 """
1394 Invoke cephfs-journal-tool with the passed arguments for a rank, and return its stdout
1395 """
1396 fs_rank = self._make_rank(rank)
1397 return self._run_tool("cephfs-journal-tool", args, fs_rank, quiet)
1398
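# Illustrative usage (a sketch; the subcommands belong to cephfs-journal-tool):
#
#   fs.journal_tool(["journal", "inspect"], 0)
#   fs.journal_tool(["event", "recover_dentries", "summary"], 0)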
1399 def table_tool(self, args, quiet=False):
1400 """
1401 Invoke cephfs-table-tool with the passed arguments, and return its stdout
1402 """
1403 return self._run_tool("cephfs-table-tool", args, None, quiet)
1404
1405 def data_scan(self, args, quiet=False, worker_count=1):
1406 """
1407 Invoke cephfs-data-scan with the passed arguments, and return its stdout
1408
1409 :param worker_count: if greater than 1, multiple workers will be run
1410 in parallel and the return value will be None
1411 """
1412
1413 workers = []
1414
1415 for n in range(0, worker_count):
1416 if worker_count > 1:
1417 # data-scan args first token is a command, followed by args to it.
1418 # insert worker arguments after the command.
1419 cmd = args[0]
1420 worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
1421 else:
1422 worker_args = args
1423
1424 workers.append(Greenlet.spawn(lambda wargs=worker_args:
1425 self._run_tool("cephfs-data-scan", wargs, None, quiet)))
1426
1427 for w in workers:
1428 w.get()
1429
1430 if worker_count == 1:
1431 return workers[0].value
1432 else:
1433 return None
1434
1435 def is_full(self):
1436 return self.is_pool_full(self.get_data_pool_name())