1
2 import json
3 import logging
4 from gevent import Greenlet
5 import os
6 import time
7 import datetime
8 import re
9 import errno
10 import random
11 import traceback
12
13 from teuthology.exceptions import CommandFailedError
14 from teuthology import misc
15 from teuthology.nuke import clear_firewall
16 from teuthology.parallel import parallel
17 from tasks.ceph_manager import write_conf
18 from tasks import ceph_manager
19
20
21 log = logging.getLogger(__name__)
22
23
24 DAEMON_WAIT_TIMEOUT = 120
25 ROOT_INO = 1
26
27 class FileLayout(object):
28 def __init__(self, pool=None, pool_namespace=None, stripe_unit=None, stripe_count=None, object_size=None):
29 self.pool = pool
30 self.pool_namespace = pool_namespace
31 self.stripe_unit = stripe_unit
32 self.stripe_count = stripe_count
33 self.object_size = object_size
34
35 @classmethod
36 def load_from_ceph(cls, layout_str):
37 # TODO
38 pass
39
40 def items(self):
41 if self.pool is not None:
42 yield ("pool", self.pool)
43 if self.pool_namespace:
44 yield ("pool_namespace", self.pool_namespace)
45 if self.stripe_unit is not None:
46 yield ("stripe_unit", self.stripe_unit)
47 if self.stripe_count is not None:
48 yield ("stripe_count", self.stripe_count)
49 if self.object_size is not None:
50 yield ("object_size", self.object_size)
51
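# Illustrative sketch (not part of the original file): FileLayout.items() yields only
# the fields that were actually set, which is what Filesystem.set_dir_layout() further
# down feeds to setfattr. The pool name below is a placeholder.
#
#   layout = FileLayout(pool="cephfs_data", stripe_count=2)
#   for name, value in layout.items():
#       print("ceph.dir.layout.{0} = {1}".format(name, value))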
52 class ObjectNotFound(Exception):
53 def __init__(self, object_name):
54 self._object_name = object_name
55
56 def __str__(self):
57 return "Object not found: '{0}'".format(self._object_name)
58
59 class FSStatus(object):
60 """
61 Operations on a snapshot of the FSMap.
62 """
63 def __init__(self, mon_manager):
64 self.mon = mon_manager
65 self.map = json.loads(self.mon.raw_cluster_cmd("fs", "dump", "--format=json"))
66
67 def __str__(self):
68 return json.dumps(self.map, indent = 2, sort_keys = True)
69
70 # Expose the fsmap for manual inspection.
71 def __getitem__(self, key):
72 """
73 Get a field from the fsmap.
74 """
75 return self.map[key]
76
77 def get_filesystems(self):
78 """
79 Iterator for all filesystems.
80 """
81 for fs in self.map['filesystems']:
82 yield fs
83
84 def get_all(self):
85 """
86 Iterator for all the mds_info components in the FSMap.
87 """
88 for info in self.map['standbys']:
89 yield info
90 for fs in self.map['filesystems']:
91 for info in fs['mdsmap']['info'].values():
92 yield info
93
94 def get_standbys(self):
95 """
96 Iterator for all standbys.
97 """
98 for info in self.map['standbys']:
99 yield info
100
101 def get_fsmap(self, fscid):
102 """
103 Get the fsmap for the given FSCID.
104 """
105 for fs in self.map['filesystems']:
106 if fscid is None or fs['id'] == fscid:
107 return fs
108 raise RuntimeError("FSCID {0} not in map".format(fscid))
109
110 def get_fsmap_byname(self, name):
111 """
112 Get the fsmap for the given file system name.
113 """
114 for fs in self.map['filesystems']:
115 if name is None or fs['mdsmap']['fs_name'] == name:
116 return fs
117 raise RuntimeError("FS {0} not in map".format(name))
118
119 def get_replays(self, fscid):
120 """
121 Get the standby:replay MDS for the given FSCID.
122 """
123 fs = self.get_fsmap(fscid)
124 for info in fs['mdsmap']['info'].values():
125 if info['state'] == 'up:standby-replay':
126 yield info
127
128 def get_ranks(self, fscid):
129 """
130 Get the ranks for the given FSCID.
131 """
132 fs = self.get_fsmap(fscid)
133 for info in fs['mdsmap']['info'].values():
134 if info['rank'] >= 0 and info['state'] != 'up:standby-replay':
135 yield info
136
137 def get_rank(self, fscid, rank):
138 """
139 Get the rank for the given FSCID.
140 """
141 for info in self.get_ranks(fscid):
142 if info['rank'] == rank:
143 return info
144 raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))
145
146 def get_mds(self, name):
147 """
148 Get the info for the given MDS name.
149 """
150 for info in self.get_all():
151 if info['name'] == name:
152 return info
153 return None
154
155 def get_mds_addr(self, name):
156 """
157 Return the instance addr as a string, like "10.214.133.138:6807\/10825"
158 """
159 info = self.get_mds(name)
160 if info:
161 return info['addr']
162 else:
163 log.warning(json.dumps(list(self.get_all()), indent=2)) # dump for debugging
164 raise RuntimeError("MDS id '{0}' not found in map".format(name))
165
166 def get_mds_gid(self, gid):
167 """
168 Get the info for the given MDS gid.
169 """
170 for info in self.get_all():
171 if info['gid'] == gid:
172 return info
173 return None
174
175 def hadfailover(self, status):
176 """
177 Compares two statuses for mds failovers.
178 Returns True if there is a failover.
179 """
180 for fs in status.map['filesystems']:
181 for info in fs['mdsmap']['info'].values():
182 oldinfo = self.get_mds_gid(info['gid'])
183 if oldinfo is None or oldinfo['incarnation'] != info['incarnation']:
184 return True
185 #all matching
186 return False
187
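# Illustrative sketch (not part of the original file): how a test might walk an
# FSStatus snapshot. mon_manager stands in for the CephManager a CephCluster holds.
#
#   status = FSStatus(mon_manager)
#   for fs in status.get_filesystems():
#       fscid = fs['id']
#       active = [info['name'] for info in status.get_ranks(fscid)]
#       log.info("fs %s actives: %s", fs['mdsmap']['fs_name'], active)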
188 class CephCluster(object):
189 @property
190 def admin_remote(self):
191 first_mon = misc.get_first_mon(self._ctx, None)
192 (result,) = self._ctx.cluster.only(first_mon).remotes.keys()
193 return result
194
195 def __init__(self, ctx):
196 self._ctx = ctx
197 self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
198
199 def get_config(self, key, service_type=None):
200 """
201 Get config from mon by default, or a specific service if caller asks for it
202 """
203 if service_type is None:
204 service_type = 'mon'
205
206 service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
207 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
208
209 def set_ceph_conf(self, subsys, key, value):
210 if subsys not in self._ctx.ceph['ceph'].conf:
211 self._ctx.ceph['ceph'].conf[subsys] = {}
212 self._ctx.ceph['ceph'].conf[subsys][key] = value
213 write_conf(self._ctx) # XXX because we don't have the ceph task's config object, if they
214 # used a different config path this won't work.
215
216 def clear_ceph_conf(self, subsys, key):
217 del self._ctx.ceph['ceph'].conf[subsys][key]
218 write_conf(self._ctx)
219
220 def json_asok(self, command, service_type, service_id, timeout=None):
221 if timeout is None:
222 timeout = 15*60
223 proc = self.mon_manager.admin_socket(service_type, service_id, command, timeout=timeout)
224 response_data = proc.stdout.getvalue()
225 log.info("_json_asok output: {0}".format(response_data))
226 if response_data.strip():
227 return json.loads(response_data)
228 else:
229 return None
230
231
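# Illustrative sketch (not part of the original file): get_config() resolves a setting
# through the admin socket of the first daemon of the requested type, e.g.
#
#   cluster = CephCluster(ctx)   # ctx is the teuthology context passed in by the task
#   grace = cluster.get_config("mds_beacon_grace", service_type="mds")
#
# which is equivalent to json_asok(['config', 'get', 'mds_beacon_grace'], 'mds', <id>).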
232 class MDSCluster(CephCluster):
233 """
234 Collective operations on all the MDS daemons in the Ceph cluster. These
235 daemons may be in use by various Filesystems.
236
237 For the benefit of pre-multi-filesystem tests, this class is also
238 a parent of Filesystem. The correct way to use MDSCluster going forward is
239 as a separate instance outside of your (multiple) Filesystem instances.
240 """
241 def __init__(self, ctx):
242 super(MDSCluster, self).__init__(ctx)
243
244 self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))
245
246 if len(self.mds_ids) == 0:
247 raise RuntimeError("This task requires at least one MDS")
248
249 if hasattr(self._ctx, "daemons"):
250 # Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
251 self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
252
253 def _one_or_all(self, mds_id, cb, in_parallel=True):
254 """
255 Call a callback for a single named MDS, or for all.
256
257 Note that the parallelism here isn't for performance, it's to avoid being overly kind
258 to the cluster by waiting a graceful ssh-latency of time between doing things, and to
259 avoid being overly kind by executing them in a particular order. However, some actions
260 don't cope with being done in parallel, so it's optional (`in_parallel`)
261
262 :param mds_id: MDS daemon name, or None
263 :param cb: Callback taking single argument of MDS daemon name
264 :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
265 """
266 if mds_id is None:
267 if in_parallel:
268 with parallel() as p:
269 for mds_id in self.mds_ids:
270 p.spawn(cb, mds_id)
271 else:
272 for mds_id in self.mds_ids:
273 cb(mds_id)
274 else:
275 cb(mds_id)
276
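# Illustrative note (not part of the original file): the mds_* helpers below all
# funnel through _one_or_all(), so for example
#
#   mdscluster.mds_stop()        # stops every MDS daemon, in parallel
#   mdscluster.mds_stop("a")     # stops only mds.a ("a" is a placeholder name)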
277 def get_config(self, key, service_type=None):
278 """
279 get_config specialization of service_type="mds"
280 """
281 if service_type != "mds":
282 return super(MDSCluster, self).get_config(key, service_type)
283
284 # Some tests stop MDS daemons, don't send commands to a dead one:
285 service_id = random.sample(list(filter(lambda i: self.mds_daemons[i].running(), self.mds_daemons)), 1)[0]
286 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
287
288 def mds_stop(self, mds_id=None):
289 """
290 Stop the MDS daemon process(es). If it held a rank, that rank
291 will eventually go laggy.
292 """
293 self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())
294
295 def mds_fail(self, mds_id=None):
296 """
297 Inform MDSMonitor of the death of the daemon process(es). If it held
298 a rank, that rank will be relinquished.
299 """
300 self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))
301
302 def mds_restart(self, mds_id=None):
303 self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())
304
305 def mds_fail_restart(self, mds_id=None):
306 """
307 Variation on restart that includes marking MDSs as failed, so that doing this
308 operation followed by waiting for healthy daemon states guarantees that they
309 have gone down and come up, rather than potentially seeing the healthy states
310 that existed before the restart.
311 """
312 def _fail_restart(id_):
313 self.mds_daemons[id_].stop()
314 self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
315 self.mds_daemons[id_].restart()
316
317 self._one_or_all(mds_id, _fail_restart)
318
319 def mds_signal(self, mds_id, sig, silent=False):
320 """
321 Signal an MDS daemon.
322 """
323 self.mds_daemons[mds_id].signal(sig, silent)
324
325 def newfs(self, name='cephfs', create=True):
326 return Filesystem(self._ctx, name=name, create=create)
327
328 def status(self):
329 return FSStatus(self.mon_manager)
330
331 def delete_all_filesystems(self):
332 """
333 Remove all filesystems that exist, and any pools in use by them.
334 """
335 pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
336 pool_id_name = {}
337 for pool in pools:
338 pool_id_name[pool['pool']] = pool['pool_name']
339
340 # mark cluster down for each fs to prevent churn during deletion
341 status = self.status()
342 for fs in status.get_filesystems():
343 self.mon_manager.raw_cluster_cmd("fs", "fail", str(fs['mdsmap']['fs_name']))
344
345 # get a new copy as actives may have since changed
346 status = self.status()
347 for fs in status.get_filesystems():
348 mdsmap = fs['mdsmap']
349 metadata_pool = pool_id_name[mdsmap['metadata_pool']]
350
351 self.mon_manager.raw_cluster_cmd('fs', 'rm', mdsmap['fs_name'], '--yes-i-really-mean-it')
352 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
353 metadata_pool, metadata_pool,
354 '--yes-i-really-really-mean-it')
355 for data_pool in mdsmap['data_pools']:
356 data_pool = pool_id_name[data_pool]
357 try:
358 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
359 data_pool, data_pool,
360 '--yes-i-really-really-mean-it')
361 except CommandFailedError as e:
362 if e.exitstatus == 16:
363 pass # EBUSY: this data pool is still referenced by another filesystem; let the later pass delete it
364 else:
365 raise
366
367 def get_standby_daemons(self):
368 return set([s['name'] for s in self.status().get_standbys()])
369
370 def get_mds_hostnames(self):
371 result = set()
372 for mds_id in self.mds_ids:
373 mds_remote = self.mon_manager.find_remote('mds', mds_id)
374 result.add(mds_remote.hostname)
375
376 return list(result)
377
378 def set_clients_block(self, blocked, mds_id=None):
379 """
380 Block (using iptables) client communications to this MDS. Be careful: if
381 other services are running on this MDS, or other MDSs try to talk to this
382 MDS, their communications may also be blocked as collateral damage.
383
384 :param mds_id: Optional ID of MDS to block, default to all
385 :return:
386 """
387 da_flag = "-A" if blocked else "-D"
388
389 def set_block(_mds_id):
390 remote = self.mon_manager.find_remote('mds', _mds_id)
391 status = self.status()
392
393 addr = status.get_mds_addr(_mds_id)
394 ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()
395
396 remote.run(
397 args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
398 "comment", "--comment", "teuthology"])
399 remote.run(
400 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
401 "comment", "--comment", "teuthology"])
402
403 self._one_or_all(mds_id, set_block, in_parallel=False)
404
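# Illustrative sketch (not part of the original file): a typical test pattern for
# simulating a network partition with set_clients_block(), remembering to remove
# the iptables rules afterwards:
#
#   mdscluster.set_clients_block(True, mds_id="a")    # drop client traffic to mds.a
#   # ... wait for the client session to go stale ...
#   mdscluster.set_clients_block(False, mds_id="a")   # or clear_firewall() to wipe all rules
#
# ("a" is a placeholder daemon name.)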
405 def clear_firewall(self):
406 clear_firewall(self._ctx)
407
408 def get_mds_info(self, mds_id):
409 return FSStatus(self.mon_manager).get_mds(mds_id)
410
411 def is_pool_full(self, pool_name):
412 pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
413 for pool in pools:
414 if pool['pool_name'] == pool_name:
415 return 'full' in pool['flags_names'].split(",")
416
417 raise RuntimeError("Pool not found '{0}'".format(pool_name))
418
419 class Filesystem(MDSCluster):
420 """
421 This object is for driving a CephFS filesystem. The MDS daemons driven by
422 MDSCluster may be shared with other Filesystems.
423 """
424 def __init__(self, ctx, fscid=None, name=None, create=False,
425 ec_profile=None):
426 super(Filesystem, self).__init__(ctx)
427
428 self.name = name
429 self.ec_profile = ec_profile
430 self.id = None
431 self.metadata_pool_name = None
432 self.metadata_overlay = False
433 self.data_pool_name = None
434 self.data_pools = None
435
436 client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
437 self.client_id = client_list[0]
438 self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
439
440 if name is not None:
441 if fscid is not None:
442 raise RuntimeError("cannot specify fscid when creating fs")
443 if create and not self.legacy_configured():
444 self.create()
445 else:
446 if fscid is not None:
447 self.id = fscid
448 self.getinfo(refresh = True)
449
450 # Stash a reference to the first created filesystem on ctx, so
451 # that if someone drops to the interactive shell they can easily
452 # poke our methods.
453 if not hasattr(self._ctx, "filesystem"):
454 self._ctx.filesystem = self
455
456 def get_task_status(self, status_key):
457 return self.mon_manager.get_service_task_status("mds", status_key)
458
459 def getinfo(self, refresh = False):
460 status = self.status()
461 if self.id is not None:
462 fsmap = status.get_fsmap(self.id)
463 elif self.name is not None:
464 fsmap = status.get_fsmap_byname(self.name)
465 else:
466 fss = [fs for fs in status.get_filesystems()]
467 if len(fss) == 1:
468 fsmap = fss[0]
469 elif len(fss) == 0:
470 raise RuntimeError("no file system available")
471 else:
472 raise RuntimeError("more than one file system available")
473 self.id = fsmap['id']
474 self.name = fsmap['mdsmap']['fs_name']
475 self.get_pool_names(status = status, refresh = refresh)
476 return status
477
478 def set_metadata_overlay(self, overlay):
479 if self.id is not None:
480 raise RuntimeError("cannot specify fscid when configuring overlay")
481 self.metadata_overlay = overlay
482
483 def deactivate(self, rank):
484 if rank < 0:
485 raise RuntimeError("invalid rank")
486 elif rank == 0:
487 raise RuntimeError("cannot deactivate rank 0")
488 self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))
489
490 def reach_max_mds(self):
491 # Try to reach rank count == max_mds, up or down (UPGRADE SENSITIVE!)
492 status = self.getinfo()
493 mds_map = self.get_mds_map(status=status)
494 max_mds = mds_map['max_mds']
495
496 count = len(list(self.get_ranks(status=status)))
497 if count > max_mds:
498 try:
499 # deactivate mds in descending order
500 status = self.wait_for_daemons(status=status, skip_max_mds_check=True)
501 while count > max_mds:
502 targets = sorted(self.get_ranks(status=status), key=lambda r: r['rank'], reverse=True)
503 target = targets[0]
504 log.info("deactivating rank %d" % target['rank'])
505 self.deactivate(target['rank'])
506 status = self.wait_for_daemons(skip_max_mds_check=True)
507 count = len(list(self.get_ranks(status=status)))
508 except:
509 # In Mimic, deactivation is done automatically:
510 log.info("Error:\n{}".format(traceback.format_exc()))
511 status = self.wait_for_daemons()
512 else:
513 status = self.wait_for_daemons()
514
515 mds_map = self.get_mds_map(status=status)
516 assert(mds_map['max_mds'] == max_mds)
517 assert(mds_map['in'] == list(range(0, max_mds)))
518
519 def fail(self):
520 self.mon_manager.raw_cluster_cmd("fs", "fail", str(self.name))
521
522 def set_flag(self, var, *args):
523 a = map(lambda x: str(x).lower(), args)
524 self.mon_manager.raw_cluster_cmd("fs", "flag", "set", var, *a)
525
526 def set_allow_multifs(self, yes=True):
527 self.set_flag("enable_multiple", yes)
528
529 def set_var(self, var, *args):
530 a = map(lambda x: str(x).lower(), args)
531 self.mon_manager.raw_cluster_cmd("fs", "set", self.name, var, *a)
532
533 def set_down(self, down=True):
534 self.set_var("down", str(down).lower())
535
536 def set_joinable(self, joinable=True):
537 self.set_var("joinable", joinable)
538
539 def set_max_mds(self, max_mds):
540 self.set_var("max_mds", "%d" % max_mds)
541
542 def set_allow_standby_replay(self, yes):
543 self.set_var("allow_standby_replay", yes)
544
545 def set_allow_new_snaps(self, yes):
546 self.set_var("allow_new_snaps", yes, '--yes-i-really-mean-it')
547
548 # In Octopus+, the PG count can be omitted to use the default. We keep the
549 # hard-coded value for deployments of Mimic/Nautilus.
550 pgs_per_fs_pool = 8
551
552 def create(self):
553 if self.name is None:
554 self.name = "cephfs"
555 if self.metadata_pool_name is None:
556 self.metadata_pool_name = "{0}_metadata".format(self.name)
557 if self.data_pool_name is None:
558 data_pool_name = "{0}_data".format(self.name)
559 else:
560 data_pool_name = self.data_pool_name
561
562 log.info("Creating filesystem '{0}'".format(self.name))
563
564 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
565 self.metadata_pool_name, self.pgs_per_fs_pool.__str__())
566 if self.metadata_overlay:
567 self.mon_manager.raw_cluster_cmd('fs', 'new',
568 self.name, self.metadata_pool_name, data_pool_name,
569 '--allow-dangerous-metadata-overlay')
570 else:
571 if self.ec_profile and 'disabled' not in self.ec_profile:
572 log.info("EC profile is %s", self.ec_profile)
573 cmd = ['osd', 'erasure-code-profile', 'set', data_pool_name]
574 cmd.extend(self.ec_profile)
575 self.mon_manager.raw_cluster_cmd(*cmd)
576 self.mon_manager.raw_cluster_cmd(
577 'osd', 'pool', 'create',
578 data_pool_name, self.pgs_per_fs_pool.__str__(), 'erasure',
579 data_pool_name)
580 self.mon_manager.raw_cluster_cmd(
581 'osd', 'pool', 'set',
582 data_pool_name, 'allow_ec_overwrites', 'true')
583 else:
584 self.mon_manager.raw_cluster_cmd(
585 'osd', 'pool', 'create',
586 data_pool_name, self.pgs_per_fs_pool.__str__())
587 self.mon_manager.raw_cluster_cmd('fs', 'new',
588 self.name,
589 self.metadata_pool_name,
590 data_pool_name,
591 "--force")
592 self.check_pool_application(self.metadata_pool_name)
593 self.check_pool_application(data_pool_name)
594 # Turn off spurious standby count warnings from modifying max_mds in tests.
595 try:
596 self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
597 except CommandFailedError as e:
598 if e.exitstatus == 22:
599 # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
600 pass
601 else:
602 raise
603
604 self.getinfo(refresh = True)
605
606
607 def check_pool_application(self, pool_name):
608 osd_map = self.mon_manager.get_osd_dump_json()
609 for pool in osd_map['pools']:
610 if pool['pool_name'] == pool_name:
611 if "application_metadata" in pool:
612 if "cephfs" not in pool['application_metadata']:
613 raise RuntimeError("Pool {pool_name} does not name cephfs as application!".\
614 format(pool_name=pool_name))
615
616
617 def __del__(self):
618 if getattr(self._ctx, "filesystem", None) == self:
619 delattr(self._ctx, "filesystem")
620
621 def exists(self):
622 """
623 Whether a filesystem exists in the mon's filesystem list
624 """
625 fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
626 return self.name in [fs['name'] for fs in fs_list]
627
628 def legacy_configured(self):
629 """
630 Check if a legacy (i.e. pre "fs new") filesystem configuration is present. If this is
631 the case, the caller should avoid using Filesystem.create
632 """
633 try:
634 out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
635 pools = json.loads(out_text)
636 metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
637 if metadata_pool_exists:
638 self.metadata_pool_name = 'metadata'
639 except CommandFailedError as e:
640 # For use in upgrade tests, Ceph cuttlefish and earlier don't support
641 # structured output (--format) from the CLI.
642 if e.exitstatus == 22:
643 metadata_pool_exists = True
644 else:
645 raise
646
647 return metadata_pool_exists
648
649 def _df(self):
650 return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))
651
652 def get_mds_map(self, status=None):
653 if status is None:
654 status = self.status()
655 return status.get_fsmap(self.id)['mdsmap']
656
657 def get_var(self, var, status=None):
658 return self.get_mds_map(status=status)[var]
659
660 def set_dir_layout(self, mount, path, layout):
661 for name, value in layout.items():
662 mount.run_shell(args=["setfattr", "-n", "ceph.dir.layout."+name, "-v", str(value), path])
663
664 def add_data_pool(self, name, create=True):
665 if create:
666 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name, self.pgs_per_fs_pool.__str__())
667 self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
668 self.get_pool_names(refresh = True)
669 for poolid, fs_name in self.data_pools.items():
670 if name == fs_name:
671 return poolid
672 raise RuntimeError("could not get just created pool '{0}'".format(name))
673
674 def get_pool_names(self, refresh = False, status = None):
675 if refresh or self.metadata_pool_name is None or self.data_pools is None:
676 if status is None:
677 status = self.status()
678 fsmap = status.get_fsmap(self.id)
679
680 osd_map = self.mon_manager.get_osd_dump_json()
681 id_to_name = {}
682 for p in osd_map['pools']:
683 id_to_name[p['pool']] = p['pool_name']
684
685 self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
686 self.data_pools = {}
687 for data_pool in fsmap['mdsmap']['data_pools']:
688 self.data_pools[data_pool] = id_to_name[data_pool]
689
690 def get_data_pool_name(self, refresh = False):
691 if refresh or self.data_pools is None:
692 self.get_pool_names(refresh = True)
693 assert(len(self.data_pools) == 1)
694 return list(self.data_pools.values())[0]
695
696 def get_data_pool_id(self, refresh = False):
697 """
698 Don't call this if you have multiple data pools
699 :return: integer
700 """
701 if refresh or self.data_pools is None:
702 self.get_pool_names(refresh = True)
703 assert(len(self.data_pools) == 1)
704 return list(self.data_pools.keys())[0]
705
706 def get_data_pool_names(self, refresh = False):
707 if refresh or self.data_pools is None:
708 self.get_pool_names(refresh = True)
709 return list(self.data_pools.values())
710
711 def get_metadata_pool_name(self):
712 return self.metadata_pool_name
713
714 def set_data_pool_name(self, name):
715 if self.id is not None:
716 raise RuntimeError("can't set data pool name if the filesystem's fscid is set")
717 self.data_pool_name = name
718
719 def get_namespace_id(self):
720 return self.id
721
722 def get_pool_df(self, pool_name):
723 """
724 Return a dict like:
725 {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
726 """
727 for pool_df in self._df()['pools']:
728 if pool_df['name'] == pool_name:
729 return pool_df['stats']
730
731 raise RuntimeError("Pool name '{0}' not found".format(pool_name))
732
733 def get_usage(self):
734 return self._df()['stats']['total_used_bytes']
735
736 def are_daemons_healthy(self, status=None, skip_max_mds_check=False):
737 """
738 Return true if all daemons are in one of active, standby, standby-replay, and
739 at least max_mds daemons are in 'active'.
740
741 Unlike most of Filesystem, this function is tolerant of new-style `fs`
742 commands being missing, because we are part of the ceph installation
743 process during upgrade suites, so must fall back to old style commands
744 when we get an EINVAL on a new style command.
745
746 :return:
747 """
748 # First, check to see that processes haven't exited with an error code
749 for mds in self._ctx.daemons.iter_daemons_of_role('mds'):
750 mds.check_status()
751
752 active_count = 0
753 try:
754 mds_map = self.get_mds_map(status=status)
755 except CommandFailedError as cfe:
756 # Old version, fall back to non-multi-fs commands
757 if cfe.exitstatus == errno.EINVAL:
758 mds_map = json.loads(
759 self.mon_manager.raw_cluster_cmd('mds', 'dump', '--format=json'))
760 else:
761 raise
762
763 log.info("are_daemons_healthy: mds map: {0}".format(mds_map))
764
765 for mds_id, mds_status in mds_map['info'].items():
766 if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
767 log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
768 return False
769 elif mds_status['state'] == 'up:active':
770 active_count += 1
771
772 log.info("are_daemons_healthy: {0}/{1}".format(
773 active_count, mds_map['max_mds']
774 ))
775
776 if not skip_max_mds_check:
777 if active_count > mds_map['max_mds']:
778 log.info("are_daemons_healthy: number of actives is greater than max_mds: {0}".format(mds_map))
779 return False
780 elif active_count == mds_map['max_mds']:
781 # The MDSMap says these guys are active, but let's check they really are
782 for mds_id, mds_status in mds_map['info'].items():
783 if mds_status['state'] == 'up:active':
784 try:
785 daemon_status = self.mds_asok(["status"], mds_id=mds_status['name'])
786 except CommandFailedError as cfe:
787 if cfe.exitstatus == errno.EINVAL:
788 # Old version, can't do this check
789 continue
790 else:
791 # MDS not even running
792 return False
793
794 if daemon_status['state'] != 'up:active':
795 # MDS hasn't taken the latest map yet
796 return False
797
798 return True
799 else:
800 return False
801 else:
802 log.info("are_daemons_healthy: skipping max_mds check")
803 return True
804
805 def get_daemon_names(self, state=None, status=None):
806 """
807 Return MDS daemon names of those daemons in the given state
808 :param state:
809 :return:
810 """
811 mdsmap = self.get_mds_map(status)
812 result = []
813 for mds_status in sorted(mdsmap['info'].values(),
814 key=lambda _: _['rank']):
815 if mds_status['state'] == state or state is None:
816 result.append(mds_status['name'])
817
818 return result
819
820 def get_active_names(self, status=None):
821 """
822 Return MDS daemon names of those daemons holding ranks
823 in state up:active
824
825 :return: list of strings like ['a', 'b'], sorted by rank
826 """
827 return self.get_daemon_names("up:active", status=status)
828
829 def get_all_mds_rank(self, status=None):
830 mdsmap = self.get_mds_map(status)
831 result = []
832 for mds_status in sorted(mdsmap['info'].values(),
833 key=lambda _: _['rank']):
834 if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
835 result.append(mds_status['rank'])
836
837 return result
838
839 def get_rank(self, rank=0, status=None):
840 if status is None:
841 status = self.getinfo()
842 return status.get_rank(self.id, rank)
843
844 def rank_restart(self, rank=0, status=None):
845 name = self.get_rank(rank=rank, status=status)['name']
846 self.mds_restart(mds_id=name)
847
848 def rank_signal(self, signal, rank=0, status=None):
849 name = self.get_rank(rank=rank, status=status)['name']
850 self.mds_signal(name, signal)
851
852 def rank_freeze(self, yes, rank=0):
853 self.mon_manager.raw_cluster_cmd("mds", "freeze", "{}:{}".format(self.id, rank), str(yes).lower())
854
855 def rank_fail(self, rank=0):
856 self.mon_manager.raw_cluster_cmd("mds", "fail", "{}:{}".format(self.id, rank))
857
858 def get_ranks(self, status=None):
859 if status is None:
860 status = self.getinfo()
861 return status.get_ranks(self.id)
862
863 def get_replays(self, status=None):
864 if status is None:
865 status = self.getinfo()
866 return status.get_replays(self.id)
867
868 def get_replay(self, rank=0, status=None):
869 for replay in self.get_replays(status=status):
870 if replay['rank'] == rank:
871 return replay
872 return None
873
874 def get_rank_names(self, status=None):
875 """
876 Return MDS daemon names of those daemons holding a rank,
877 sorted by rank. This includes e.g. up:replay/reconnect
878 as well as active, but does not include standby or
879 standby-replay.
880 """
881 mdsmap = self.get_mds_map(status)
882 result = []
883 for mds_status in sorted(mdsmap['info'].values(),
884 key=lambda _: _['rank']):
885 if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
886 result.append(mds_status['name'])
887
888 return result
889
890 def wait_for_daemons(self, timeout=None, skip_max_mds_check=False, status=None):
891 """
892 Wait until all daemons are healthy
893 :return:
894 """
895
896 if timeout is None:
897 timeout = DAEMON_WAIT_TIMEOUT
898
899 if status is None:
900 status = self.status()
901
902 elapsed = 0
903 while True:
904 if self.are_daemons_healthy(status=status, skip_max_mds_check=skip_max_mds_check):
905 return status
906 else:
907 time.sleep(1)
908 elapsed += 1
909
910 if elapsed > timeout:
911 log.info("status = {0}".format(status))
912 raise RuntimeError("Timed out waiting for MDS daemons to become healthy")
913
914 status = self.status()
915
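# Illustrative sketch (not part of the original file): the usual restart-and-settle
# pattern used by tests, combining the MDSCluster and Filesystem helpers above:
#
#   fs.mds_fail_restart()      # mark the daemons failed and restart them
#   fs.wait_for_daemons()      # block until ranks are active again (or time out)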
916 def get_lone_mds_id(self):
917 """
918 Get a single MDS ID: the only one if there is only one
919 configured, else the only one currently holding a rank,
920 else raise an error.
921 """
922 if len(self.mds_ids) != 1:
923 alive = self.get_rank_names()
924 if len(alive) == 1:
925 return alive[0]
926 else:
927 raise ValueError("Explicit MDS argument required when multiple MDSs in use")
928 else:
929 return self.mds_ids[0]
930
931 def recreate(self):
932 log.info("Creating new filesystem")
933 self.delete_all_filesystems()
934 self.id = None
935 self.create()
936
937 def put_metadata_object_raw(self, object_id, infile):
938 """
939 Save an object to the metadata pool
940 """
941 temp_bin_path = infile
942 self.client_remote.run(args=[
943 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'put', object_id, temp_bin_path
944 ])
945
946 def get_metadata_object_raw(self, object_id):
947 """
948 Retrieve an object from the metadata pool and store it in a file.
949 """
950 temp_bin_path = '/tmp/' + object_id + '.bin'
951
952 self.client_remote.run(args=[
953 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
954 ])
955
956 return temp_bin_path
957
958 def get_metadata_object(self, object_type, object_id):
959 """
960 Retrieve an object from the metadata pool, pass it through
961 ceph-dencoder to dump it to JSON, and return the decoded object.
962 """
963 temp_bin_path = '/tmp/out.bin'
964
965 self.client_remote.run(args=[
966 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path
967 ])
968
969 dump_json = self.client_remote.sh([
970 'sudo', os.path.join(self._prefix, 'ceph-dencoder'), 'type', object_type, 'import', temp_bin_path, 'decode', 'dump_json'
971 ]).strip()
972 try:
973 dump = json.loads(dump_json)
974 except (TypeError, ValueError):
975 log.error("Failed to decode JSON: '{0}'".format(dump_json))
976 raise
977
978 return dump
979
980 def get_journal_version(self):
981 """
982 Read the JournalPointer and Journal::Header objects to learn the version of
983 encoding in use.
984 """
985 journal_pointer_object = '400.00000000'
986 journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
987 journal_ino = journal_pointer_dump['journal_pointer']['front']
988
989 journal_header_object = "{0:x}.00000000".format(journal_ino)
990 journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)
991
992 version = journal_header_dump['journal_header']['stream_format']
993 log.info("Read journal version {0}".format(version))
994
995 return version
996
997 def mds_asok(self, command, mds_id=None, timeout=None):
998 if mds_id is None:
999 mds_id = self.get_lone_mds_id()
1000
1001 return self.json_asok(command, 'mds', mds_id, timeout=timeout)
1002
1003 def rank_asok(self, command, rank=0, status=None, timeout=None):
1004 info = self.get_rank(rank=rank, status=status)
1005 return self.json_asok(command, 'mds', info['name'], timeout=timeout)
1006
1007 def rank_tell(self, command, rank=0, status=None):
1008 info = self.get_rank(rank=rank, status=status)
1009 return json.loads(self.mon_manager.raw_cluster_cmd("tell", 'mds.{0}'.format(info['name']), *command))
1010
1011 def read_cache(self, path, depth=None):
1012 cmd = ["dump", "tree", path]
1013 if depth is not None:
1014 cmd.append(depth.__str__())
1015 result = self.mds_asok(cmd)
1016 if len(result) == 0:
1017 raise RuntimeError("Path not found in cache: {0}".format(path))
1018
1019 return result
1020
1021 def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
1022 """
1023 Block until the MDS reaches a particular state, or a failure condition
1024 is met.
1025
1026 When there are multiple MDSs, succeed when exactly one MDS is in the
1027 goal state, or fail when any MDS is in the reject state.
1028
1029 :param goal_state: Return once the MDS is in this state
1030 :param reject: Fail if the MDS enters this state before the goal state
1031 :param timeout: Fail if this many seconds pass before reaching goal
1032 :return: number of seconds waited, rounded down to integer
1033 """
1034
1035 started_at = time.time()
1036 while True:
1037 status = self.status()
1038 if rank is not None:
1039 try:
1040 mds_info = status.get_rank(self.id, rank)
1041 current_state = mds_info['state'] if mds_info else None
1042 log.info("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
1043 except:
1044 mdsmap = self.get_mds_map(status=status)
1045 if rank in mdsmap['failed']:
1046 log.info("Waiting for rank {0} to come back.".format(rank))
1047 current_state = None
1048 else:
1049 raise
1050 elif mds_id is not None:
1051 # mds_info is None if no daemon with this ID exists in the map
1052 mds_info = status.get_mds(mds_id)
1053 current_state = mds_info['state'] if mds_info else None
1054 log.info("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
1055 else:
1056 # In general, look for a single MDS
1057 states = [m['state'] for m in status.get_ranks(self.id)]
1058 if [s for s in states if s == goal_state] == [goal_state]:
1059 current_state = goal_state
1060 elif reject in states:
1061 current_state = reject
1062 else:
1063 current_state = None
1064 log.info("mapped states {0} to {1}".format(states, current_state))
1065
1066 elapsed = time.time() - started_at
1067 if current_state == goal_state:
1068 log.info("reached state '{0}' in {1}s".format(current_state, elapsed))
1069 return elapsed
1070 elif reject is not None and current_state == reject:
1071 raise RuntimeError("MDS in reject state {0}".format(current_state))
1072 elif timeout is not None and elapsed > timeout:
1073 log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
1074 raise RuntimeError(
1075 "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
1076 elapsed, goal_state, current_state
1077 ))
1078 else:
1079 time.sleep(1)
1080
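# Illustrative sketch (not part of the original file): waiting for a daemon or rank
# to reach a particular state, e.g. after forcing a failover ("a" is a placeholder):
#
#   fs.mds_fail(mds_id="a")
#   fs.wait_for_state("up:active", rank=0, timeout=60)        # replacement takes over rank 0
#   fs.wait_for_state("up:standby", mds_id="a", timeout=60)   # old daemon rejoins as a standby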
1081 def _read_data_xattr(self, ino_no, xattr_name, type, pool):
1082 mds_id = self.mds_ids[0]
1083 remote = self.mds_daemons[mds_id].remote
1084 if pool is None:
1085 pool = self.get_data_pool_name()
1086
1087 obj_name = "{0:x}.00000000".format(ino_no)
1088
1089 args = [
1090 os.path.join(self._prefix, "rados"), "-p", pool, "getxattr", obj_name, xattr_name
1091 ]
1092 try:
1093 data = remote.sh(args)
1094 except CommandFailedError as e:
1095 log.error(e.__str__())
1096 raise ObjectNotFound(obj_name)
1097
1098 dump = remote.sh(
1099 [os.path.join(self._prefix, "ceph-dencoder"),
1100 "type", type,
1101 "import", "-",
1102 "decode", "dump_json"],
1103 stdin=data
1104 )
1105
1106 return json.loads(dump.strip())
1107
1108 def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
1109 """
1110 Write to an xattr of the 0th data object of an inode. Will
1111 succeed whether the object and/or xattr already exist or not.
1112
1113 :param ino_no: integer inode number
1114 :param xattr_name: string name of the xattr
1115 :param data: byte array data to write to the xattr
1116 :param pool: name of data pool or None to use primary data pool
1117 :return: None
1118 """
1119 remote = self.mds_daemons[self.mds_ids[0]].remote
1120 if pool is None:
1121 pool = self.get_data_pool_name()
1122
1123 obj_name = "{0:x}.00000000".format(ino_no)
1124 args = [
1125 os.path.join(self._prefix, "rados"), "-p", pool, "setxattr",
1126 obj_name, xattr_name, data
1127 ]
1128 remote.sh(args)
1129
1130 def read_backtrace(self, ino_no, pool=None):
1131 """
1132 Read the backtrace from the data pool, return a dict in the format
1133 given by inode_backtrace_t::dump, which is something like:
1134
1135 ::
1136
1137 rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
1138 ceph-dencoder type inode_backtrace_t import out.bin decode dump_json
1139
1140 { "ino": 1099511627778,
1141 "ancestors": [
1142 { "dirino": 1,
1143 "dname": "blah",
1144 "version": 11}],
1145 "pool": 1,
1146 "old_pools": []}
1147
1148 :param pool: name of pool to read backtrace from. If omitted, FS must have only
1149 one data pool and that will be used.
1150 """
1151 return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)
1152
1153 def read_layout(self, ino_no, pool=None):
1154 """
1155 Read 'layout' xattr of an inode and parse the result, returning a dict like:
1156 ::
1157 {
1158 "stripe_unit": 4194304,
1159 "stripe_count": 1,
1160 "object_size": 4194304,
1161 "pool_id": 1,
1162 "pool_ns": "",
1163 }
1164
1165 :param pool: name of pool to read backtrace from. If omitted, FS must have only
1166 one data pool and that will be used.
1167 """
1168 return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)
1169
1170 def _enumerate_data_objects(self, ino, size):
1171 """
1172 Get the list of expected data objects for a range, and the list of objects
1173 that really exist.
1174
1175 :return a tuple of two lists of strings (expected, actual)
1176 """
1177 stripe_size = 1024 * 1024 * 4
1178
1179 size = max(stripe_size, size)
1180
1181 want_objects = [
1182 "{0:x}.{1:08x}".format(ino, n)
1183 for n in range(0, ((size - 1) // stripe_size) + 1)
1184 ]
1185
1186 exist_objects = self.rados(["ls"], pool=self.get_data_pool_name()).split("\n")
1187
1188 return want_objects, exist_objects
1189
1190 def data_objects_present(self, ino, size):
1191 """
1192 Check that *all* the expected data objects for an inode are present in the data pool
1193 """
1194
1195 want_objects, exist_objects = self._enumerate_data_objects(ino, size)
1196 missing = set(want_objects) - set(exist_objects)
1197
1198 if missing:
1199 log.info("Objects missing (ino {0}, size {1}): {2}".format(
1200 ino, size, missing
1201 ))
1202 return False
1203 else:
1204 log.info("All objects for ino {0} size {1} found".format(ino, size))
1205 return True
1206
1207 def data_objects_absent(self, ino, size):
1208 want_objects, exist_objects = self._enumerate_data_objects(ino, size)
1209 present = set(want_objects) & set(exist_objects)
1210
1211 if present:
1212 log.info("Objects not absent (ino {0}, size {1}): {2}".format(
1213 ino, size, present
1214 ))
1215 return False
1216 else:
1217 log.info("All objects for ino {0} size {1} are absent".format(ino, size))
1218 return True
1219
1220 def dirfrag_exists(self, ino, frag):
1221 try:
1222 self.rados(["stat", "{0:x}.{1:08x}".format(ino, frag)])
1223 except CommandFailedError:
1224 return False
1225 else:
1226 return True
1227
1228 def rados(self, args, pool=None, namespace=None, stdin_data=None,
1229 stdin_file=None):
1230 """
1231 Call into the `rados` CLI from an MDS
1232 """
1233
1234 if pool is None:
1235 pool = self.get_metadata_pool_name()
1236
1237 # Doesn't matter which MDS we use to run rados commands, they all
1238 # have access to the pools
1239 mds_id = self.mds_ids[0]
1240 remote = self.mds_daemons[mds_id].remote
1241
1242 # NB we could alternatively use librados pybindings for this, but it's a one-liner
1243 # using the `rados` CLI
1244 args = ([os.path.join(self._prefix, "rados"), "-p", pool] +
1245 (["--namespace", namespace] if namespace else []) +
1246 args)
1247
1248 if stdin_file is not None:
1249 args = ["bash", "-c", "cat " + stdin_file + " | " + " ".join(args)]
1250
1251 output = remote.sh(args, stdin=stdin_data)
1252 return output.strip()
1253
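# Illustrative sketch (not part of the original file): rados() defaults to the
# metadata pool, which is what the dirfrag and journal helpers below rely on, e.g.
#
#   fs.rados(["listomapkeys", "1.00000000"])          # dentries of the root dirfrag (ino 1)
#   fs.rados(["ls"], pool=fs.get_data_pool_name())    # objects in the data pool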
1254 def list_dirfrag(self, dir_ino):
1255 """
1256 Read the named object and return the list of omap keys
1257
1258 :return a list of 0 or more strings
1259 """
1260
1261 dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
1262
1263 try:
1264 key_list_str = self.rados(["listomapkeys", dirfrag_obj_name])
1265 except CommandFailedError as e:
1266 log.error(e.__str__())
1267 raise ObjectNotFound(dirfrag_obj_name)
1268
1269 return key_list_str.split("\n") if key_list_str else []
1270
1271 def erase_metadata_objects(self, prefix):
1272 """
1273 For all objects in the metadata pool matching the prefix,
1274 erase them.
1275
1276 This O(N) with the number of objects in the pool, so only suitable
1277 for use on toy test filesystems.
1278 """
1279 all_objects = self.rados(["ls"]).split("\n")
1280 matching_objects = [o for o in all_objects if o.startswith(prefix)]
1281 for o in matching_objects:
1282 self.rados(["rm", o])
1283
1284 def erase_mds_objects(self, rank):
1285 """
1286 Erase all the per-MDS objects for a particular rank. This includes
1287 inotable, sessiontable, journal
1288 """
1289
1290 def obj_prefix(multiplier):
1291 """
1292 MDS object naming conventions like rank 1's
1293 journal is at 201.***
1294 """
1295 return "%x." % (multiplier * 0x100 + rank)
1296
1297 # MDS_INO_LOG_OFFSET
1298 self.erase_metadata_objects(obj_prefix(2))
1299 # MDS_INO_LOG_BACKUP_OFFSET
1300 self.erase_metadata_objects(obj_prefix(3))
1301 # MDS_INO_LOG_POINTER_OFFSET
1302 self.erase_metadata_objects(obj_prefix(4))
1303 # MDSTables & SessionMap
1304 self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))
1305
1306 @property
1307 def _prefix(self):
1308 """
1309 Override this to set a different path prefix for the Ceph binaries.
1310 """
1311 return ""
1312
1313 def _make_rank(self, rank):
1314 return "{}:{}".format(self.name, rank)
1315
1316 def _run_tool(self, tool, args, rank=None, quiet=False):
1317 # Tests frequently have [client] configuration that jacks up
1318 # the objecter log level (unlikely to be interesting here)
1319 # and does not set the mds log level (very interesting here)
1320 if quiet:
1321 base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
1322 else:
1323 base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']
1324
1325 if rank is not None:
1326 base_args.extend(["--rank", "%s" % str(rank)])
1327
1328 t1 = datetime.datetime.now()
1329 r = self.tool_remote.sh(base_args + args).strip()
1330 duration = datetime.datetime.now() - t1
1331 log.info("Ran {0} in time {1}, result:\n{2}".format(
1332 base_args + args, duration, r
1333 ))
1334 return r
1335
1336 @property
1337 def tool_remote(self):
1338 """
1339 An arbitrary remote to use when invoking recovery tools. Use an MDS host because
1340 it'll definitely have keys with perms to access cephfs metadata pool. This is public
1341 so that tests can use this remote to go get locally written output files from the tools.
1342 """
1343 mds_id = self.mds_ids[0]
1344 return self.mds_daemons[mds_id].remote
1345
1346 def journal_tool(self, args, rank, quiet=False):
1347 """
1348 Invoke cephfs-journal-tool with the passed arguments for a rank, and return its stdout
1349 """
1350 fs_rank = self._make_rank(rank)
1351 return self._run_tool("cephfs-journal-tool", args, fs_rank, quiet)
1352
1353 def table_tool(self, args, quiet=False):
1354 """
1355 Invoke cephfs-table-tool with the passed arguments, and return its stdout
1356 """
1357 return self._run_tool("cephfs-table-tool", args, None, quiet)
1358
1359 def data_scan(self, args, quiet=False, worker_count=1):
1360 """
1361 Invoke cephfs-data-scan with the passed arguments, and return its stdout
1362
1363 :param worker_count: if greater than 1, multiple workers will be run
1364 in parallel and the return value will be None
1365 """
1366
1367 workers = []
1368
1369 for n in range(0, worker_count):
1370 if worker_count > 1:
1371 # data-scan args first token is a command, followed by args to it.
1372 # insert worker arguments after the command.
1373 cmd = args[0]
1374 worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
1375 else:
1376 worker_args = args
1377
1378 workers.append(Greenlet.spawn(lambda wargs=worker_args:
1379 self._run_tool("cephfs-data-scan", wargs, None, quiet)))
1380
1381 for w in workers:
1382 w.get()
1383
1384 if worker_count == 1:
1385 return workers[0].value
1386 else:
1387 return None
1388
1389 def is_full(self):
1390 return self.is_pool_full(self.get_data_pool_name())