# ceph/qa/tasks/cephfs/filesystem.py (Ceph Pacific 16.2.2)
1
2 import json
3 import logging
4 from gevent import Greenlet
5 import os
6 import time
7 import datetime
8 import re
9 import errno
10 import random
11 import traceback
12
13 from io import BytesIO, StringIO
14 from errno import EBUSY
15
16 from teuthology.exceptions import CommandFailedError
17 from teuthology import misc
18 from teuthology.nuke import clear_firewall
19 from teuthology.parallel import parallel
20 from teuthology import contextutil
21 from tasks.ceph_manager import write_conf
22 from tasks import ceph_manager
23
24
25 log = logging.getLogger(__name__)
26
27
28 DAEMON_WAIT_TIMEOUT = 120
29 ROOT_INO = 1
30
31 class FileLayout(object):
32 def __init__(self, pool=None, pool_namespace=None, stripe_unit=None, stripe_count=None, object_size=None):
33 self.pool = pool
34 self.pool_namespace = pool_namespace
35 self.stripe_unit = stripe_unit
36 self.stripe_count = stripe_count
37 self.object_size = object_size
38
39 @classmethod
40 def load_from_ceph(cls, layout_str):
41 # TODO
42 pass
43
44 def items(self):
45 if self.pool is not None:
46 yield ("pool", self.pool)
47 if self.pool_namespace:
48 yield ("pool_namespace", self.pool_namespace)
49 if self.stripe_unit is not None:
50 yield ("stripe_unit", self.stripe_unit)
51 if self.stripe_count is not None:
52 yield ("stripe_count", self.stripe_count)
53 if self.object_size is not None:
54 yield ("object_size", self.object_size)
55
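# Illustrative sketch (hypothetical helper, not referenced elsewhere): how a
# caller can turn a FileLayout into the key/value pairs that get applied as
# "ceph.dir.layout.*" attributes. Only the fields that were set are yielded.
def _example_file_layout_items():
    layout = FileLayout(pool="cephfs_data", stripe_unit=4194304, object_size=4194304)
    # -> {"pool": "cephfs_data", "stripe_unit": 4194304, "object_size": 4194304}
    return dict(layout.items())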
56 class ObjectNotFound(Exception):
57 def __init__(self, object_name):
58 self._object_name = object_name
59
60 def __str__(self):
61 return "Object not found: '{0}'".format(self._object_name)
62
63 class FSMissing(Exception):
64 def __init__(self, ident):
65 self.ident = ident
66
67 def __str__(self):
68 return f"File system {self.ident} does not exist in the map"
69
70 class FSStatus(object):
71 """
72 Operations on a snapshot of the FSMap.
73 """
74 def __init__(self, mon_manager):
75 self.mon = mon_manager
76 self.map = json.loads(self.mon.raw_cluster_cmd("fs", "dump", "--format=json"))
77
78 def __str__(self):
79 return json.dumps(self.map, indent = 2, sort_keys = True)
80
81 # Expose the fsmap for manual inspection.
82 def __getitem__(self, key):
83 """
84 Get a field from the fsmap.
85 """
86 return self.map[key]
87
88 def get_filesystems(self):
89 """
90 Iterator for all filesystems.
91 """
92 for fs in self.map['filesystems']:
93 yield fs
94
95 def get_all(self):
96 """
97 Iterator for all the mds_info components in the FSMap.
98 """
99 for info in self.map['standbys']:
100 yield info
101 for fs in self.map['filesystems']:
102 for info in fs['mdsmap']['info'].values():
103 yield info
104
105 def get_standbys(self):
106 """
107 Iterator for all standbys.
108 """
109 for info in self.map['standbys']:
110 yield info
111
112 def get_fsmap(self, fscid):
113 """
114 Get the fsmap for the given FSCID.
115 """
116 for fs in self.map['filesystems']:
117 if fscid is None or fs['id'] == fscid:
118 return fs
119 raise FSMissing(fscid)
120
121 def get_fsmap_byname(self, name):
122 """
123 Get the fsmap for the given file system name.
124 """
125 for fs in self.map['filesystems']:
126 if name is None or fs['mdsmap']['fs_name'] == name:
127 return fs
128 raise FSMissing(name)
129
130 def get_replays(self, fscid):
131 """
132 Get the standby:replay MDS for the given FSCID.
133 """
134 fs = self.get_fsmap(fscid)
135 for info in fs['mdsmap']['info'].values():
136 if info['state'] == 'up:standby-replay':
137 yield info
138
139 def get_ranks(self, fscid):
140 """
141 Get the ranks for the given FSCID.
142 """
143 fs = self.get_fsmap(fscid)
144 for info in fs['mdsmap']['info'].values():
145 if info['rank'] >= 0 and info['state'] != 'up:standby-replay':
146 yield info
147
148 def get_rank(self, fscid, rank):
149 """
150 Get the rank for the given FSCID.
151 """
152 for info in self.get_ranks(fscid):
153 if info['rank'] == rank:
154 return info
155 raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))
156
157 def get_mds(self, name):
158 """
159 Get the info for the given MDS name.
160 """
161 for info in self.get_all():
162 if info['name'] == name:
163 return info
164 return None
165
166 def get_mds_addr(self, name):
167 """
168 Return the instance addr as a string, like "10.214.133.138:6807/10825"
169 """
170 info = self.get_mds(name)
171 if info:
172 return info['addr']
173 else:
174 log.warning(json.dumps(list(self.get_all()), indent=2)) # dump for debugging
175 raise RuntimeError("MDS id '{0}' not found in map".format(name))
176
177 def get_mds_addrs(self, name):
178 """
179 Return the instance addresses as a list of strings, like ["10.214.133.138:6807", "10.214.133.138:6808"]
180 """
181 info = self.get_mds(name)
182 if info:
183 return [e['addr'] for e in info['addrs']['addrvec']]
184 else:
185 log.warning(json.dumps(list(self.get_all()), indent=2)) # dump for debugging
186 raise RuntimeError("MDS id '{0}' not found in map".format(name))
187
188 def get_mds_gid(self, gid):
189 """
190 Get the info for the given MDS gid.
191 """
192 for info in self.get_all():
193 if info['gid'] == gid:
194 return info
195 return None
196
197 def hadfailover(self, status):
198 """
199 Compares two statuses for mds failovers.
200 Returns True if there is a failover.
201 """
202 for fs in status.map['filesystems']:
203 for info in fs['mdsmap']['info'].values():
204 oldinfo = self.get_mds_gid(info['gid'])
205 if oldinfo is None or oldinfo['incarnation'] != info['incarnation']:
206 return True
207 #all matching
208 return False
209
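# Illustrative sketch (hypothetical helper, not referenced elsewhere): FSStatus
# snapshots the FSMap once at construction, so repeated queries against the same
# instance see a consistent view. A typical pattern is to take one snapshot and
# then walk the ranks of a single file system.
def _example_walk_ranks(mon_manager, fscid):
    status = FSStatus(mon_manager)
    for info in status.get_ranks(fscid):
        log.info("rank %d is mds.%s in state %s", info['rank'], info['name'], info['state'])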
210 class CephCluster(object):
211 @property
212 def admin_remote(self):
213 first_mon = misc.get_first_mon(self._ctx, None)
214 (result,) = self._ctx.cluster.only(first_mon).remotes.keys()
215 return result
216
217 def __init__(self, ctx) -> None:
218 self._ctx = ctx
219 self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
220
221 def get_config(self, key, service_type=None):
222 """
223 Get config from mon by default, or a specific service if caller asks for it
224 """
225 if service_type is None:
226 service_type = 'mon'
227
228 service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
229 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
230
231 def set_ceph_conf(self, subsys, key, value):
232 if subsys not in self._ctx.ceph['ceph'].conf:
233 self._ctx.ceph['ceph'].conf[subsys] = {}
234 self._ctx.ceph['ceph'].conf[subsys][key] = value
235 write_conf(self._ctx) # XXX because we don't have the ceph task's config object, if they
236 # used a different config path this won't work.
237
238 def clear_ceph_conf(self, subsys, key):
239 del self._ctx.ceph['ceph'].conf[subsys][key]
240 write_conf(self._ctx)
241
242 def json_asok(self, command, service_type, service_id, timeout=None):
243 if timeout is None:
244 timeout = 15*60
245 command.insert(0, '--format=json')
246 proc = self.mon_manager.admin_socket(service_type, service_id, command, timeout=timeout)
247 response_data = proc.stdout.getvalue().strip()
248 if len(response_data) > 0:
249 j = json.loads(response_data)
250 pretty = json.dumps(j, sort_keys=True, indent=2)
251 log.debug(f"_json_asok output\n{pretty}")
252 return j
253 else:
254 log.debug("_json_asok output empty")
255 return None
256
257 def is_addr_blocklisted(self, addr=None):
258 if addr is None:
259 log.warning("Couldn't get the client address, so the blocklist "
260 "status is undetermined")
261 return False
262
263 blocklist = json.loads(self.mon_manager.run_cluster_cmd(
264 args=["osd", "blocklist", "ls", "--format=json"],
265 stdout=StringIO()).stdout.getvalue())
266 for b in blocklist:
267 if addr == b["addr"]:
268 return True
269 return False
270
271
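# Illustrative sketch (hypothetical helper, not referenced elsewhere; assumes a
# teuthology ctx with at least one mon role): get_config() reads a value from
# the mon's admin socket by default, via json_asok().
def _example_read_mon_config(ctx):
    cluster = CephCluster(ctx)
    value = cluster.get_config("mon_allow_pool_delete")
    log.info("mon_allow_pool_delete = %s", value)
    return value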
272 class MDSCluster(CephCluster):
273 """
274 Collective operations on all the MDS daemons in the Ceph cluster. These
275 daemons may be in use by various Filesystems.
276
277 For the benefit of pre-multi-filesystem tests, this class is also
278 a parent of Filesystem. The correct way to use MDSCluster going forward is
279 as a separate instance outside of your (multiple) Filesystem instances.
280 """
281
282 def __init__(self, ctx):
283 super(MDSCluster, self).__init__(ctx)
284
285 @property
286 def mds_ids(self):
287 # do this dynamically because the list of ids may change periodically with cephadm
288 return list(misc.all_roles_of_type(self._ctx.cluster, 'mds'))
289
290 @property
291 def mds_daemons(self):
292 return dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
293
294 def _one_or_all(self, mds_id, cb, in_parallel=True):
295 """
296 Call a callback for a single named MDS, or for all.
297
298 Note that the parallelism here isn't for performance: it's to avoid being overly kind
299 to the cluster by waiting a graceful ssh-latency of time between doing things, and to
300 avoid being overly kind by always executing them in the same order. However, some actions
301 don't cope with being done in parallel, so it's optional (`in_parallel`).
302
303 :param mds_id: MDS daemon name, or None
304 :param cb: Callback taking single argument of MDS daemon name
305 :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
306 """
307
308 if mds_id is None:
309 if in_parallel:
310 with parallel() as p:
311 for mds_id in self.mds_ids:
312 p.spawn(cb, mds_id)
313 else:
314 for mds_id in self.mds_ids:
315 cb(mds_id)
316 else:
317 cb(mds_id)
318
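# Example (illustrative): _one_or_all is the fan-out primitive used by the
# mds_* helpers below. A callback taking a single daemon name can be applied
# to every MDS, or to just one:
#
#     mds_cluster._one_or_all(None, lambda id_: log.info("would act on mds.%s", id_))
#     mds_cluster._one_or_all("a", lambda id_: log.info("would act on mds.%s", id_))
#
# Pass in_parallel=False for actions that must not overlap, such as the
# iptables manipulation in set_clients_block().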
319 def get_config(self, key, service_type=None):
320 """
321 get_config specialization of service_type="mds"
322 """
323 if service_type != "mds":
324 return super(MDSCluster, self).get_config(key, service_type)
325
326 # Some tests stop MDS daemons, don't send commands to a dead one:
327 running_daemons = [i for i, mds in self.mds_daemons.items() if mds.running()]
328 service_id = random.sample(running_daemons, 1)[0]
329 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
330
331 def mds_stop(self, mds_id=None):
332 """
333 Stop the MDS daemon process(es). If it held a rank, that rank
334 will eventually go laggy.
335 """
336 self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())
337
338 def mds_fail(self, mds_id=None):
339 """
340 Inform MDSMonitor of the death of the daemon process(es). If it held
341 a rank, that rank will be relinquished.
342 """
343 self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))
344
345 def mds_restart(self, mds_id=None):
346 self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())
347
348 def mds_fail_restart(self, mds_id=None):
349 """
350 Variation on restart that includes marking MDSs as failed, so that doing this
351 operation followed by waiting for healthy daemon states guarantees that they
352 have gone down and come up, rather than potentially seeing the healthy states
353 that existed before the restart.
354 """
355 def _fail_restart(id_):
356 self.mds_daemons[id_].stop()
357 self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
358 self.mds_daemons[id_].restart()
359
360 self._one_or_all(mds_id, _fail_restart)
361
362 def mds_signal(self, mds_id, sig, silent=False):
363 """
364 Signal an MDS daemon.
365 """
366 self.mds_daemons[mds_id].signal(sig, silent)
367
368 def newfs(self, name='cephfs', create=True):
369 return Filesystem(self._ctx, name=name, create=create)
370
371 def status(self):
372 return FSStatus(self.mon_manager)
373
374 def get_standby_daemons(self):
375 return set([s['name'] for s in self.status().get_standbys()])
376
377 def get_mds_hostnames(self):
378 result = set()
379 for mds_id in self.mds_ids:
380 mds_remote = self.mon_manager.find_remote('mds', mds_id)
381 result.add(mds_remote.hostname)
382
383 return list(result)
384
385 def set_clients_block(self, blocked, mds_id=None):
386 """
387 Block (using iptables) client communications to this MDS. Be careful: if
388 other services are running on this MDS, or other MDSs try to talk to this
389 MDS, their communications may also be blocked as collateral damage.
390
391 :param mds_id: Optional ID of MDS to block; defaults to all
392 :return:
393 """
394 da_flag = "-A" if blocked else "-D"
395
396 def set_block(_mds_id):
397 remote = self.mon_manager.find_remote('mds', _mds_id)
398 status = self.status()
399
400 addr = status.get_mds_addr(_mds_id)
401 ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()
402
403 remote.run(
404 args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
405 "comment", "--comment", "teuthology"])
406 remote.run(
407 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
408 "comment", "--comment", "teuthology"])
409
410 self._one_or_all(mds_id, set_block, in_parallel=False)
411
412 def set_inter_mds_block(self, blocked, mds_rank_1, mds_rank_2):
413 """
414 Block (using iptables) communications from a provided MDS to other MDSs.
415 Block all ports that an MDS uses for communication.
416
417 :param blocked: True to block the MDS, False otherwise
418 :param mds_rank_1: MDS rank
419 :param mds_rank_2: MDS rank
420 :return:
421 """
422 da_flag = "-A" if blocked else "-D"
423
424 def set_block(mds_ids):
425 status = self.status()
426
427 mds = mds_ids[0]
428 remote = self.mon_manager.find_remote('mds', mds)
429 addrs = status.get_mds_addrs(mds)
430 for addr in addrs:
431 ip_str, port_str = re.match("(.+):(.+)", addr).groups()
432 remote.run(
433 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
434 "comment", "--comment", "teuthology"])
435
436
437 mds = mds_ids[1]
438 remote = self.mon_manager.find_remote('mds', mds)
439 addrs = status.get_mds_addrs(mds)
440 for addr in addrs:
441 ip_str, port_str = re.match("(.+):(.+)", addr).groups()
442 remote.run(
443 args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
444 "comment", "--comment", "teuthology"])
445 remote.run(
446 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
447 "comment", "--comment", "teuthology"])
448
449 self._one_or_all((mds_rank_1, mds_rank_2), set_block, in_parallel=False)
450
451 def clear_firewall(self):
452 clear_firewall(self._ctx)
453
454 def get_mds_info(self, mds_id):
455 return FSStatus(self.mon_manager).get_mds(mds_id)
456
457 def is_pool_full(self, pool_name):
458 pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
459 for pool in pools:
460 if pool['pool_name'] == pool_name:
461 return 'full' in pool['flags_names'].split(",")
462
463 raise RuntimeError("Pool not found '{0}'".format(pool_name))
464
465 def delete_all_filesystems(self):
466 """
467 Remove all filesystems that exist, and any pools in use by them.
468 """
469 for fs in self.status().get_filesystems():
470 Filesystem(ctx=self._ctx, fscid=fs['id']).destroy()
471
472
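# Illustrative sketch (hypothetical helper, not referenced elsewhere): a simple
# failover exercise with MDSCluster. Note that hadfailover() compares MDS
# incarnations, so in practice a test would wait for the daemon to rejoin
# before taking the second snapshot.
def _example_failover_roundtrip(ctx, mds_id):
    mds_cluster = MDSCluster(ctx)
    before = mds_cluster.status()
    mds_cluster.mds_fail_restart(mds_id)
    after = mds_cluster.status()
    return before.hadfailover(after)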
473 class Filesystem(MDSCluster):
474 """
475 This object is for driving a CephFS filesystem. The MDS daemons driven by
476 MDSCluster may be shared with other Filesystems.
477 """
478 def __init__(self, ctx, fs_config={}, fscid=None, name=None, create=False):
479 super(Filesystem, self).__init__(ctx)
480
481 self.name = name
482 self.id = None
483 self.metadata_pool_name = None
484 self.metadata_overlay = False
485 self.data_pool_name = None
486 self.data_pools = None
487 self.fs_config = fs_config
488 self.ec_profile = fs_config.get('ec_profile')
489
490 client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
491 self.client_id = client_list[0]
492 self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
493
494 if name is not None:
495 if fscid is not None:
496 raise RuntimeError("cannot specify fscid when creating fs")
497 if create and not self.legacy_configured():
498 self.create()
499 else:
500 if fscid is not None:
501 self.id = fscid
502 self.getinfo(refresh = True)
503
504 # Stash a reference to the first created filesystem on ctx, so
505 # that if someone drops to the interactive shell they can easily
506 # poke our methods.
507 if not hasattr(self._ctx, "filesystem"):
508 self._ctx.filesystem = self
509
510 def dead(self):
511 try:
512 return not bool(self.get_mds_map())
513 except FSMissing:
514 return True
515
516 def get_task_status(self, status_key):
517 return self.mon_manager.get_service_task_status("mds", status_key)
518
519 def getinfo(self, refresh = False):
520 status = self.status()
521 if self.id is not None:
522 fsmap = status.get_fsmap(self.id)
523 elif self.name is not None:
524 fsmap = status.get_fsmap_byname(self.name)
525 else:
526 fss = [fs for fs in status.get_filesystems()]
527 if len(fss) == 1:
528 fsmap = fss[0]
529 elif len(fss) == 0:
530 raise RuntimeError("no file system available")
531 else:
532 raise RuntimeError("more than one file system available")
533 self.id = fsmap['id']
534 self.name = fsmap['mdsmap']['fs_name']
535 self.get_pool_names(status = status, refresh = refresh)
536 return status
537
538 def set_metadata_overlay(self, overlay):
539 if self.id is not None:
540 raise RuntimeError("cannot specify fscid when configuring overlay")
541 self.metadata_overlay = overlay
542
543 def deactivate(self, rank):
544 if rank < 0:
545 raise RuntimeError("invalid rank")
546 elif rank == 0:
547 raise RuntimeError("cannot deactivate rank 0")
548 self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))
549
550 def reach_max_mds(self):
551 # Try to reach rank count == max_mds, up or down (UPGRADE SENSITIVE!)
552 status = self.getinfo()
553 mds_map = self.get_mds_map(status=status)
554 max_mds = mds_map['max_mds']
555
556 count = len(list(self.get_ranks(status=status)))
557 if count > max_mds:
558 try:
559 # deactivate mds in descending order
560 status = self.wait_for_daemons(status=status, skip_max_mds_check=True)
561 while count > max_mds:
562 targets = sorted(self.get_ranks(status=status), key=lambda r: r['rank'], reverse=True)
563 target = targets[0]
564 log.debug("deactivating rank %d" % target['rank'])
565 self.deactivate(target['rank'])
566 status = self.wait_for_daemons(skip_max_mds_check=True)
567 count = len(list(self.get_ranks(status=status)))
568 except:
569 # In Mimic, deactivation is done automatically:
570 log.info("Error:\n{}".format(traceback.format_exc()))
571 status = self.wait_for_daemons()
572 else:
573 status = self.wait_for_daemons()
574
575 mds_map = self.get_mds_map(status=status)
576 assert(mds_map['max_mds'] == max_mds)
577 assert(mds_map['in'] == list(range(0, max_mds)))
578
579 def reset(self):
580 self.mon_manager.raw_cluster_cmd("fs", "reset", str(self.name), '--yes-i-really-mean-it')
581
582 def fail(self):
583 self.mon_manager.raw_cluster_cmd("fs", "fail", str(self.name))
584
585 def set_flag(self, var, *args):
586 a = map(lambda x: str(x).lower(), args)
587 self.mon_manager.raw_cluster_cmd("fs", "flag", "set", var, *a)
588
589 def set_allow_multifs(self, yes=True):
590 self.set_flag("enable_multiple", yes)
591
592 def set_var(self, var, *args):
593 a = map(lambda x: str(x).lower(), args)
594 self.mon_manager.raw_cluster_cmd("fs", "set", self.name, var, *a)
595
596 def set_down(self, down=True):
597 self.set_var("down", str(down).lower())
598
599 def set_joinable(self, joinable=True):
600 self.set_var("joinable", joinable)
601
602 def set_max_mds(self, max_mds):
603 self.set_var("max_mds", "%d" % max_mds)
604
605 def set_session_timeout(self, timeout):
606 self.set_var("session_timeout", "%d" % timeout)
607
608 def set_allow_standby_replay(self, yes):
609 self.set_var("allow_standby_replay", yes)
610
611 def set_allow_new_snaps(self, yes):
612 self.set_var("allow_new_snaps", yes, '--yes-i-really-mean-it')
613
614 def required_client_features(self, *args, **kwargs):
615 c = ["fs", "required_client_features", self.name, *args]
616 return self.mon_manager.run_cluster_cmd(args=c, **kwargs)
617
618 # In Octopus+, the PG count can be omitted to use the default. We keep the
619 # hard-coded value for deployments of Mimic/Nautilus.
620 pgs_per_fs_pool = 8
621
622 def create(self):
623 if self.name is None:
624 self.name = "cephfs"
625 if self.metadata_pool_name is None:
626 self.metadata_pool_name = "{0}_metadata".format(self.name)
627 if self.data_pool_name is None:
628 data_pool_name = "{0}_data".format(self.name)
629 else:
630 data_pool_name = self.data_pool_name
631
632 log.debug("Creating filesystem '{0}'".format(self.name))
633
634 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
635 self.metadata_pool_name, self.pgs_per_fs_pool.__str__())
636
637 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
638 data_pool_name, self.pgs_per_fs_pool.__str__())
639
640 if self.metadata_overlay:
641 self.mon_manager.raw_cluster_cmd('fs', 'new',
642 self.name, self.metadata_pool_name, data_pool_name,
643 '--allow-dangerous-metadata-overlay')
644 else:
645 self.mon_manager.raw_cluster_cmd('fs', 'new',
646 self.name,
647 self.metadata_pool_name,
648 data_pool_name)
649
650 if self.ec_profile and 'disabled' not in self.ec_profile:
651 ec_data_pool_name = data_pool_name + "_ec"
652 log.debug("EC profile is %s", self.ec_profile)
653 cmd = ['osd', 'erasure-code-profile', 'set', ec_data_pool_name]
654 cmd.extend(self.ec_profile)
655 self.mon_manager.raw_cluster_cmd(*cmd)
656 self.mon_manager.raw_cluster_cmd(
657 'osd', 'pool', 'create',
658 ec_data_pool_name, self.pgs_per_fs_pool.__str__(), 'erasure',
659 ec_data_pool_name)
660 self.mon_manager.raw_cluster_cmd(
661 'osd', 'pool', 'set',
662 ec_data_pool_name, 'allow_ec_overwrites', 'true')
663 self.add_data_pool(ec_data_pool_name, create=False)
664 self.check_pool_application(ec_data_pool_name)
665
666 self.run_client_payload(f"setfattr -n ceph.dir.layout.pool -v {ec_data_pool_name} . && getfattr -n ceph.dir.layout .")
667
668 self.check_pool_application(self.metadata_pool_name)
669 self.check_pool_application(data_pool_name)
670
671 # Turn off spurious standby count warnings from modifying max_mds in tests.
672 try:
673 self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
674 except CommandFailedError as e:
675 if e.exitstatus == 22:
676 # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
677 pass
678 else:
679 raise
680
681 if self.fs_config is not None:
682 max_mds = self.fs_config.get('max_mds', 1)
683 if max_mds > 1:
684 self.set_max_mds(max_mds)
685
686 standby_replay = self.fs_config.get('standby_replay', False)
687 self.set_allow_standby_replay(standby_replay)
688
689 # If absent will use the default value (60 seconds)
690 session_timeout = self.fs_config.get('session_timeout', 60)
691 if session_timeout != 60:
692 self.set_session_timeout(session_timeout)
693
694 self.getinfo(refresh = True)
695
696 def run_client_payload(self, cmd):
697 # avoid circular dep by importing here:
698 from tasks.cephfs.fuse_mount import FuseMount
699 d = misc.get_testdir(self._ctx)
700 m = FuseMount(self._ctx, {}, d, "admin", self.client_remote, cephfs_name=self.name)
701 m.mount()
702 m.run_shell_payload(cmd)
703 m.umount_wait(require_clean=True)
704
705 def _remove_pool(self, name, **kwargs):
706 c = f'osd pool rm {name} {name} --yes-i-really-really-mean-it'
707 return self.mon_manager.ceph(c, **kwargs)
708
709 def rm(self, **kwargs):
710 c = f'fs rm {self.name} --yes-i-really-mean-it'
711 return self.mon_manager.ceph(c, **kwargs)
712
713 def remove_pools(self, data_pools):
714 self._remove_pool(self.get_metadata_pool_name())
715 for poolname in data_pools:
716 try:
717 self._remove_pool(poolname)
718 except CommandFailedError as e:
719 # EBUSY, this data pool is used by two metadata pools, let the
720 # 2nd pass delete it
721 if e.exitstatus == EBUSY:
722 pass
723 else:
724 raise
725
726 def destroy(self, reset_obj_attrs=True):
727 log.info(f'Destroying file system {self.name} and related pools')
728
729 if self.dead():
730 log.debug('already dead...')
731 return
732
733 data_pools = self.get_data_pool_names(refresh=True)
734
735 # make sure no MDSs are attached to given FS.
736 self.fail()
737 self.rm()
738
739 self.remove_pools(data_pools)
740
741 if reset_obj_attrs:
742 self.id = None
743 self.name = None
744 self.metadata_pool_name = None
745 self.data_pool_name = None
746 self.data_pools = None
747
748 def recreate(self):
749 self.destroy()
750
751 self.create()
752 self.getinfo(refresh=True)
753
754 def check_pool_application(self, pool_name):
755 osd_map = self.mon_manager.get_osd_dump_json()
756 for pool in osd_map['pools']:
757 if pool['pool_name'] == pool_name:
758 if "application_metadata" in pool:
759 if not "cephfs" in pool['application_metadata']:
760 raise RuntimeError("Pool {pool_name} does not name cephfs as application!".\
761 format(pool_name=pool_name))
762
763 def __del__(self):
764 if getattr(self._ctx, "filesystem", None) == self:
765 delattr(self._ctx, "filesystem")
766
767 def exists(self):
768 """
769 Whether a filesystem exists in the mon's filesystem list
770 """
771 fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
772 return self.name in [fs['name'] for fs in fs_list]
773
774 def legacy_configured(self):
775 """
776 Check if a legacy (i.e. pre "fs new") filesystem configuration is present. If this is
777 the case, the caller should avoid using Filesystem.create
778 """
779 try:
780 out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
781 pools = json.loads(out_text)
782 metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
783 if metadata_pool_exists:
784 self.metadata_pool_name = 'metadata'
785 except CommandFailedError as e:
786 # For use in upgrade tests, Ceph cuttlefish and earlier don't support
787 # structured output (--format) from the CLI.
788 if e.exitstatus == 22:
789 metadata_pool_exists = True
790 else:
791 raise
792
793 return metadata_pool_exists
794
795 def _df(self):
796 return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))
797
798 # may raise FSMissing
799 def get_mds_map(self, status=None):
800 if status is None:
801 status = self.status()
802 return status.get_fsmap(self.id)['mdsmap']
803
804 def get_var(self, var, status=None):
805 return self.get_mds_map(status=status)[var]
806
807 def set_dir_layout(self, mount, path, layout):
808 for name, value in layout.items():
809 mount.run_shell(args=["setfattr", "-n", "ceph.dir.layout."+name, "-v", str(value), path])
810
811 def add_data_pool(self, name, create=True):
812 if create:
813 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name, self.pgs_per_fs_pool.__str__())
814 self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
815 self.get_pool_names(refresh = True)
816 for poolid, fs_name in self.data_pools.items():
817 if name == fs_name:
818 return poolid
819 raise RuntimeError("could not get just created pool '{0}'".format(name))
820
821 def get_pool_names(self, refresh = False, status = None):
822 if refresh or self.metadata_pool_name is None or self.data_pools is None:
823 if status is None:
824 status = self.status()
825 fsmap = status.get_fsmap(self.id)
826
827 osd_map = self.mon_manager.get_osd_dump_json()
828 id_to_name = {}
829 for p in osd_map['pools']:
830 id_to_name[p['pool']] = p['pool_name']
831
832 self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
833 self.data_pools = {}
834 for data_pool in fsmap['mdsmap']['data_pools']:
835 self.data_pools[data_pool] = id_to_name[data_pool]
836
837 def get_data_pool_name(self, refresh = False):
838 if refresh or self.data_pools is None:
839 self.get_pool_names(refresh = True)
840 assert(len(self.data_pools) == 1)
841 return next(iter(self.data_pools.values()))
842
843 def get_data_pool_id(self, refresh = False):
844 """
845 Don't call this if you have multiple data pools
846 :return: integer
847 """
848 if refresh or self.data_pools is None:
849 self.get_pool_names(refresh = True)
850 assert(len(self.data_pools) == 1)
851 return next(iter(self.data_pools.keys()))
852
853 def get_data_pool_names(self, refresh = False):
854 if refresh or self.data_pools is None:
855 self.get_pool_names(refresh = True)
856 return list(self.data_pools.values())
857
858 def get_metadata_pool_name(self):
859 return self.metadata_pool_name
860
861 def set_data_pool_name(self, name):
862 if self.id is not None:
863 raise RuntimeError("can't set data pool name if the fscid is set")
864 self.data_pool_name = name
865
866 def get_namespace_id(self):
867 return self.id
868
869 def get_pool_df(self, pool_name):
870 """
871 Return a dict like:
872 {'bytes_used': 0, 'max_avail': 83848701, 'objects': 0, 'kb_used': 0}
873 """
874 for pool_df in self._df()['pools']:
875 if pool_df['name'] == pool_name:
876 return pool_df['stats']
877
878 raise RuntimeError("Pool name '{0}' not found".format(pool_name))
879
880 def get_usage(self):
881 return self._df()['stats']['total_used_bytes']
882
883 def are_daemons_healthy(self, status=None, skip_max_mds_check=False):
884 """
885 Return true if all daemons are in one of active, standby, standby-replay, and
886 at least max_mds daemons are in 'active'.
887
888 Unlike most of Filesystem, this function is tolerant of new-style `fs`
889 commands being missing, because we are part of the ceph installation
890 process during upgrade suites, so must fall back to old style commands
891 when we get an EINVAL on a new style command.
892
893 :return:
894 """
895 # First, check to see that processes haven't exited with an error code
896 for mds in self._ctx.daemons.iter_daemons_of_role('mds'):
897 mds.check_status()
898
899 active_count = 0
900 mds_map = self.get_mds_map(status=status)
901
902 log.debug("are_daemons_healthy: mds map: {0}".format(mds_map))
903
904 for mds_id, mds_status in mds_map['info'].items():
905 if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
906 log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
907 return False
908 elif mds_status['state'] == 'up:active':
909 active_count += 1
910
911 log.debug("are_daemons_healthy: {0}/{1}".format(
912 active_count, mds_map['max_mds']
913 ))
914
915 if not skip_max_mds_check:
916 if active_count > mds_map['max_mds']:
917 log.debug("are_daemons_healthy: number of actives is greater than max_mds: {0}".format(mds_map))
918 return False
919 elif active_count == mds_map['max_mds']:
920 # The MDSMap says these guys are active, but let's check they really are
921 for mds_id, mds_status in mds_map['info'].items():
922 if mds_status['state'] == 'up:active':
923 try:
924 daemon_status = self.mds_tell(["status"], mds_id=mds_status['name'])
925 except CommandFailedError as cfe:
926 if cfe.exitstatus == errno.EINVAL:
927 # Old version, can't do this check
928 continue
929 else:
930 # MDS not even running
931 return False
932
933 if daemon_status['state'] != 'up:active':
934 # MDS hasn't taken the latest map yet
935 return False
936
937 return True
938 else:
939 return False
940 else:
941 log.debug("are_daemons_healthy: skipping max_mds check")
942 return True
943
944 def get_daemon_names(self, state=None, status=None):
945 """
946 Return MDS daemon names of those daemons in the given state
947 :param state:
948 :return:
949 """
950 mdsmap = self.get_mds_map(status)
951 result = []
952 for mds_status in sorted(mdsmap['info'].values(),
953 key=lambda _: _['rank']):
954 if mds_status['state'] == state or state is None:
955 result.append(mds_status['name'])
956
957 return result
958
959 def get_active_names(self, status=None):
960 """
961 Return MDS daemon names of those daemons holding ranks
962 in state up:active
963
964 :return: list of strings like ['a', 'b'], sorted by rank
965 """
966 return self.get_daemon_names("up:active", status=status)
967
968 def get_all_mds_rank(self, status=None):
969 mdsmap = self.get_mds_map(status)
970 result = []
971 for mds_status in sorted(mdsmap['info'].values(),
972 key=lambda _: _['rank']):
973 if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
974 result.append(mds_status['rank'])
975
976 return result
977
978 def get_rank(self, rank=None, status=None):
979 if status is None:
980 status = self.getinfo()
981 if rank is None:
982 rank = 0
983 return status.get_rank(self.id, rank)
984
985 def rank_restart(self, rank=0, status=None):
986 name = self.get_rank(rank=rank, status=status)['name']
987 self.mds_restart(mds_id=name)
988
989 def rank_signal(self, signal, rank=0, status=None):
990 name = self.get_rank(rank=rank, status=status)['name']
991 self.mds_signal(name, signal)
992
993 def rank_freeze(self, yes, rank=0):
994 self.mon_manager.raw_cluster_cmd("mds", "freeze", "{}:{}".format(self.id, rank), str(yes).lower())
995
996 def rank_fail(self, rank=0):
997 self.mon_manager.raw_cluster_cmd("mds", "fail", "{}:{}".format(self.id, rank))
998
999 def get_ranks(self, status=None):
1000 if status is None:
1001 status = self.getinfo()
1002 return status.get_ranks(self.id)
1003
1004 def get_replays(self, status=None):
1005 if status is None:
1006 status = self.getinfo()
1007 return status.get_replays(self.id)
1008
1009 def get_replay(self, rank=0, status=None):
1010 for replay in self.get_replays(status=status):
1011 if replay['rank'] == rank:
1012 return replay
1013 return None
1014
1015 def get_rank_names(self, status=None):
1016 """
1017 Return MDS daemon names of those daemons holding a rank,
1018 sorted by rank. This includes e.g. up:replay/reconnect
1019 as well as active, but does not include standby or
1020 standby-replay.
1021 """
1022 mdsmap = self.get_mds_map(status)
1023 result = []
1024 for mds_status in sorted(mdsmap['info'].values(),
1025 key=lambda _: _['rank']):
1026 if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
1027 result.append(mds_status['name'])
1028
1029 return result
1030
1031 def wait_for_daemons(self, timeout=None, skip_max_mds_check=False, status=None):
1032 """
1033 Wait until all daemons are healthy
1034 :return:
1035 """
1036
1037 if timeout is None:
1038 timeout = DAEMON_WAIT_TIMEOUT
1039
1040 if status is None:
1041 status = self.status()
1042
1043 elapsed = 0
1044 while True:
1045 if self.are_daemons_healthy(status=status, skip_max_mds_check=skip_max_mds_check):
1046 return status
1047 else:
1048 time.sleep(1)
1049 elapsed += 1
1050
1051 if elapsed > timeout:
1052 log.debug("status = {0}".format(status))
1053 raise RuntimeError("Timed out waiting for MDS daemons to become healthy")
1054
1055 status = self.status()
1056
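# Example (illustrative): a common test pattern is to change max_mds and then
# block until the ranks have settled:
#
#     fs.set_max_mds(2)
#     status = fs.wait_for_daemons()
#     assert len(list(fs.get_ranks(status=status))) == 2
#
# wait_for_daemons() re-polls the FSMap every second and raises RuntimeError
# after DAEMON_WAIT_TIMEOUT (or the given timeout) seconds.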
1057 def dencoder(self, obj_type, obj_blob):
1058 args = [os.path.join(self._prefix, "ceph-dencoder"), 'type', obj_type, 'import', '-', 'decode', 'dump_json']
1059 p = self.mon_manager.controller.run(args=args, stdin=BytesIO(obj_blob), stdout=BytesIO())
1060 return p.stdout.getvalue()
1061
1062 def rados(self, *args, **kwargs):
1063 """
1064 Callout to rados CLI.
1065 """
1066
1067 return self.mon_manager.do_rados(*args, **kwargs)
1068
1069 def radosm(self, *args, **kwargs):
1070 """
1071 Interact with the metadata pool via rados CLI.
1072 """
1073
1074 return self.rados(*args, **kwargs, pool=self.get_metadata_pool_name())
1075
1076 def radosmo(self, *args, stdout=BytesIO(), **kwargs):
1077 """
1078 Interact with the metadata pool via rados CLI. Get the stdout.
1079 """
1080
1081 return self.radosm(*args, **kwargs, stdout=stdout).stdout.getvalue()
1082
1083 def get_metadata_object(self, object_type, object_id):
1084 """
1085 Retrieve an object from the metadata pool, pass it through
1086 ceph-dencoder to dump it to JSON, and return the decoded object.
1087 """
1088
1089 o = self.radosmo(['get', object_id, '-'])
1090 j = self.dencoder(object_type, o)
1091 try:
1092 return json.loads(j)
1093 except (TypeError, ValueError):
1094 log.error("Failed to decode JSON: '{0}'".format(j))
1095 raise
1096
1097 def get_journal_version(self):
1098 """
1099 Read the JournalPointer and Journal::Header objects to learn the version of
1100 encoding in use.
1101 """
1102 journal_pointer_object = '400.00000000'
1103 journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
1104 journal_ino = journal_pointer_dump['journal_pointer']['front']
1105
1106 journal_header_object = "{0:x}.00000000".format(journal_ino)
1107 journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)
1108
1109 version = journal_header_dump['journal_header']['stream_format']
1110 log.debug("Read journal version {0}".format(version))
1111
1112 return version
1113
1114 def mds_asok(self, command, mds_id=None, timeout=None):
1115 if mds_id is None:
1116 return self.rank_asok(command, timeout=timeout)
1117
1118 return self.json_asok(command, 'mds', mds_id, timeout=timeout)
1119
1120 def mds_tell(self, command, mds_id=None):
1121 if mds_id is None:
1122 return self.rank_tell(command)
1123
1124 return json.loads(self.mon_manager.raw_cluster_cmd("tell", f"mds.{mds_id}", *command))
1125
1126 def rank_asok(self, command, rank=0, status=None, timeout=None):
1127 info = self.get_rank(rank=rank, status=status)
1128 return self.json_asok(command, 'mds', info['name'], timeout=timeout)
1129
1130 def rank_tell(self, command, rank=0, status=None):
1131 return json.loads(self.mon_manager.raw_cluster_cmd("tell", f"mds.{self.id}:{rank}", *command))
1132
1133 def ranks_tell(self, command, status=None):
1134 if status is None:
1135 status = self.status()
1136 out = []
1137 for r in status.get_ranks(self.id):
1138 result = self.rank_tell(command, rank=r['rank'], status=status)
1139 out.append((r['rank'], result))
1140 return sorted(out)
1141
1142 def ranks_perf(self, f, status=None):
1143 perf = self.ranks_tell(["perf", "dump"], status=status)
1144 out = []
1145 for rank, perf in perf:
1146 out.append((rank, f(perf)))
1147 return out
1148
1149 def read_cache(self, path, depth=None):
1150 cmd = ["dump", "tree", path]
1151 if depth is not None:
1152 cmd.append(depth.__str__())
1153 result = self.mds_asok(cmd)
1154 if len(result) == 0:
1155 raise RuntimeError("Path not found in cache: {0}".format(path))
1156
1157 return result
1158
1159 def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
1160 """
1161 Block until the MDS reaches a particular state, or a failure condition
1162 is met.
1163
1164 When there are multiple MDSs, succeed when exactly one MDS is in the
1165 goal state, or fail when any MDS is in the reject state.
1166
1167 :param goal_state: Return once the MDS is in this state
1168 :param reject: Fail if the MDS enters this state before the goal state
1169 :param timeout: Fail if this many seconds pass before reaching goal
1170 :return: number of seconds waited, rounded down to integer
1171 """
1172
1173 started_at = time.time()
1174 while True:
1175 status = self.status()
1176 if rank is not None:
1177 try:
1178 mds_info = status.get_rank(self.id, rank)
1179 current_state = mds_info['state'] if mds_info else None
1180 log.debug("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
1181 except:
1182 mdsmap = self.get_mds_map(status=status)
1183 if rank in mdsmap['failed']:
1184 log.debug("Waiting for rank {0} to come back.".format(rank))
1185 current_state = None
1186 else:
1187 raise
1188 elif mds_id is not None:
1189 # mds_info is None if no daemon with this ID exists in the map
1190 mds_info = status.get_mds(mds_id)
1191 current_state = mds_info['state'] if mds_info else None
1192 log.debug("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
1193 else:
1194 # In general, look for a single MDS
1195 states = [m['state'] for m in status.get_ranks(self.id)]
1196 if [s for s in states if s == goal_state] == [goal_state]:
1197 current_state = goal_state
1198 elif reject in states:
1199 current_state = reject
1200 else:
1201 current_state = None
1202 log.debug("mapped states {0} to {1}".format(states, current_state))
1203
1204 elapsed = time.time() - started_at
1205 if current_state == goal_state:
1206 log.debug("reached state '{0}' in {1}s".format(current_state, elapsed))
1207 return elapsed
1208 elif reject is not None and current_state == reject:
1209 raise RuntimeError("MDS in reject state {0}".format(current_state))
1210 elif timeout is not None and elapsed > timeout:
1211 log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
1212 raise RuntimeError(
1213 "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
1214 elapsed, goal_state, current_state
1215 ))
1216 else:
1217 time.sleep(1)
1218
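# Example (illustrative): waiting for a rank to become active again after it
# has been failed, bounding how long the test is willing to wait:
#
#     fs.rank_fail(rank=0)
#     fs.wait_for_state('up:active', rank=0, timeout=60)
#
# The call returns the number of seconds waited, or raises RuntimeError on
# timeout (or if the optional reject state is reached first).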
1219 def _read_data_xattr(self, ino_no, xattr_name, obj_type, pool):
1220 if pool is None:
1221 pool = self.get_data_pool_name()
1222
1223 obj_name = "{0:x}.00000000".format(ino_no)
1224
1225 args = ["getxattr", obj_name, xattr_name]
1226 try:
1227 proc = self.rados(args, pool=pool, stdout=BytesIO())
1228 except CommandFailedError as e:
1229 log.error(e.__str__())
1230 raise ObjectNotFound(obj_name)
1231
1232 obj_blob = proc.stdout.getvalue()
1233 return json.loads(self.dencoder(obj_type, obj_blob).strip())
1234
1235 def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
1236 """
1237 Write to an xattr of the 0th data object of an inode. Will
1238 succeed whether the object and/or xattr already exist or not.
1239
1240 :param ino_no: integer inode number
1241 :param xattr_name: string name of the xattr
1242 :param data: byte array data to write to the xattr
1243 :param pool: name of data pool or None to use primary data pool
1244 :return: None
1245 """
1246 if pool is None:
1247 pool = self.get_data_pool_name()
1248
1249 obj_name = "{0:x}.00000000".format(ino_no)
1250 args = ["setxattr", obj_name, xattr_name, data]
1251 self.rados(args, pool=pool)
1252
1253 def read_backtrace(self, ino_no, pool=None):
1254 """
1255 Read the backtrace from the data pool, return a dict in the format
1256 given by inode_backtrace_t::dump, which is something like:
1257
1258 ::
1259
1260 rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
1261 ceph-dencoder type inode_backtrace_t import out.bin decode dump_json
1262
1263 { "ino": 1099511627778,
1264 "ancestors": [
1265 { "dirino": 1,
1266 "dname": "blah",
1267 "version": 11}],
1268 "pool": 1,
1269 "old_pools": []}
1270
1271 :param pool: name of pool to read backtrace from. If omitted, FS must have only
1272 one data pool and that will be used.
1273 """
1274 return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)
1275
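# Example (illustrative): resolving an inode's ancestry from its backtrace
# xattr, e.g. to check that a rename was persisted to the data pool:
#
#     backtrace = fs.read_backtrace(0x10000000002)
#     ancestors = [a['dname'] for a in backtrace['ancestors']]
#
# Both read_backtrace() and read_layout() go through _read_data_xattr(), so
# they only work once the inode's 0th object exists in the data pool.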
1276 def read_layout(self, ino_no, pool=None):
1277 """
1278 Read 'layout' xattr of an inode and parse the result, returning a dict like:
1279 ::
1280 {
1281 "stripe_unit": 4194304,
1282 "stripe_count": 1,
1283 "object_size": 4194304,
1284 "pool_id": 1,
1285 "pool_ns": "",
1286 }
1287
1288 :param pool: name of pool to read the layout from. If omitted, FS must have only
1289 one data pool and that will be used.
1290 """
1291 return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)
1292
1293 def _enumerate_data_objects(self, ino, size):
1294 """
1295 Get the list of expected data objects for a range, and the list of objects
1296 that really exist.
1297
1298 :return a tuple of two lists of strings (expected, actual)
1299 """
1300 stripe_size = 1024 * 1024 * 4
1301
1302 size = max(stripe_size, size)
1303
1304 want_objects = [
1305 "{0:x}.{1:08x}".format(ino, n)
1306 for n in range(0, ((size - 1) // stripe_size) + 1)
1307 ]
1308
1309 exist_objects = self.rados(["ls"], pool=self.get_data_pool_name(), stdout=StringIO()).stdout.getvalue().split("\n")
1310
1311 return want_objects, exist_objects
1312
1313 def data_objects_present(self, ino, size):
1314 """
1315 Check that *all* the expected data objects for an inode are present in the data pool
1316 """
1317
1318 want_objects, exist_objects = self._enumerate_data_objects(ino, size)
1319 missing = set(want_objects) - set(exist_objects)
1320
1321 if missing:
1322 log.debug("Objects missing (ino {0}, size {1}): {2}".format(
1323 ino, size, missing
1324 ))
1325 return False
1326 else:
1327 log.debug("All objects for ino {0} size {1} found".format(ino, size))
1328 return True
1329
1330 def data_objects_absent(self, ino, size):
1331 want_objects, exist_objects = self._enumerate_data_objects(ino, size)
1332 present = set(want_objects) & set(exist_objects)
1333
1334 if present:
1335 log.debug("Objects not absent (ino {0}, size {1}): {2}".format(
1336 ino, size, present
1337 ))
1338 return False
1339 else:
1340 log.debug("All objects for ino {0} size {1} are absent".format(ino, size))
1341 return True
1342
1343 def dirfrag_exists(self, ino, frag):
1344 try:
1345 self.radosm(["stat", "{0:x}.{1:08x}".format(ino, frag)])
1346 except CommandFailedError:
1347 return False
1348 else:
1349 return True
1350
1351 def list_dirfrag(self, dir_ino):
1352 """
1353 Read the named object and return the list of omap keys
1354
1355 :return a list of 0 or more strings
1356 """
1357
1358 dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
1359
1360 try:
1361 key_list_str = self.radosmo(["listomapkeys", dirfrag_obj_name], stdout=StringIO())
1362 except CommandFailedError as e:
1363 log.error(e.__str__())
1364 raise ObjectNotFound(dirfrag_obj_name)
1365
1366 return key_list_str.strip().split("\n") if key_list_str else []
1367
1368 def get_meta_of_fs_file(self, dir_ino, obj_name, out):
1369 """
1370 Get metadata from the parent dirfrag to verify the correctness of the data format encoded by the cephfs-meta-injection tool.
1371 Warning: directory fragmentation (splitting) is not considered here.
1372 """
1373
1374 dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
1375 try:
1376 self.radosm(["getomapval", dirfrag_obj_name, obj_name+"_head", out])
1377 except CommandFailedError as e:
1378 log.error(e.__str__())
1379 raise ObjectNotFound(dir_ino)
1380
1381 def erase_metadata_objects(self, prefix):
1382 """
1383 For all objects in the metadata pool matching the prefix,
1384 erase them.
1385
1386 This is O(N) in the number of objects in the pool, so it is only suitable
1387 for use on toy test filesystems.
1388 """
1389 all_objects = self.radosmo(["ls"], stdout=StringIO()).strip().split("\n")
1390 matching_objects = [o for o in all_objects if o.startswith(prefix)]
1391 for o in matching_objects:
1392 self.radosm(["rm", o])
1393
1394 def erase_mds_objects(self, rank):
1395 """
1396 Erase all the per-MDS objects for a particular rank. This includes
1397 inotable, sessiontable, journal
1398 """
1399
1400 def obj_prefix(multiplier):
1401 """
1402 MDS object naming conventions like rank 1's
1403 journal is at 201.***
1404 """
1405 return "%x." % (multiplier * 0x100 + rank)
1406
1407 # MDS_INO_LOG_OFFSET
1408 self.erase_metadata_objects(obj_prefix(2))
1409 # MDS_INO_LOG_BACKUP_OFFSET
1410 self.erase_metadata_objects(obj_prefix(3))
1411 # MDS_INO_LOG_POINTER_OFFSET
1412 self.erase_metadata_objects(obj_prefix(4))
1413 # MDSTables & SessionMap
1414 self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))
1415
1416 @property
1417 def _prefix(self):
1418 """
1419 Override this to set a different path prefix for the ceph tool binaries.
1420 """
1421 return ""
1422
1423 def _make_rank(self, rank):
1424 return "{}:{}".format(self.name, rank)
1425
1426 def _run_tool(self, tool, args, rank=None, quiet=False):
1427 # Tests frequently have [client] configuration that jacks up
1428 # the objecter log level (unlikely to be interesting here)
1429 # and does not set the mds log level (very interesting here)
1430 if quiet:
1431 base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
1432 else:
1433 base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']
1434
1435 if rank is not None:
1436 base_args.extend(["--rank", "%s" % str(rank)])
1437
1438 t1 = datetime.datetime.now()
1439 r = self.tool_remote.sh(script=base_args + args, stdout=StringIO()).strip()
1440 duration = datetime.datetime.now() - t1
1441 log.debug("Ran {0} in time {1}, result:\n{2}".format(
1442 base_args + args, duration, r
1443 ))
1444 return r
1445
1446 @property
1447 def tool_remote(self):
1448 """
1449 An arbitrary remote to use when invoking recovery tools. Use an MDS host because
1450 it'll definitely have keys with perms to access cephfs metadata pool. This is public
1451 so that tests can use this remote to go get locally written output files from the tools.
1452 """
1453 return self.mon_manager.controller
1454
1455 def journal_tool(self, args, rank, quiet=False):
1456 """
1457 Invoke cephfs-journal-tool with the passed arguments for a rank, and return its stdout
1458 """
1459 fs_rank = self._make_rank(rank)
1460 return self._run_tool("cephfs-journal-tool", args, fs_rank, quiet)
1461
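# Example (illustrative): inspecting and exporting rank 0's journal with the
# offline tool. The rank is combined with the fs name via _make_rank(), so
# callers pass the bare rank number (the output path here is hypothetical):
#
#     fs.journal_tool(["journal", "inspect"], 0)
#     fs.journal_tool(["journal", "export", "/tmp/journal.bin"], 0)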
1462 def meta_tool(self, args, rank, quiet=False):
1463 """
1464 Invoke cephfs-meta-injection with the passed arguments for a rank, and return its stdout
1465 """
1466 fs_rank = self._make_rank(rank)
1467 return self._run_tool("cephfs-meta-injection", args, fs_rank, quiet)
1468
1469 def table_tool(self, args, quiet=False):
1470 """
1471 Invoke cephfs-table-tool with the passed arguments, and return its stdout
1472 """
1473 return self._run_tool("cephfs-table-tool", args, None, quiet)
1474
1475 def data_scan(self, args, quiet=False, worker_count=1):
1476 """
1477 Invoke cephfs-data-scan with the passed arguments, and return its stdout
1478
1479 :param worker_count: if greater than 1, multiple workers will be run
1480 in parallel and the return value will be None
1481 """
1482
1483 workers = []
1484
1485 for n in range(0, worker_count):
1486 if worker_count > 1:
1487 # data-scan args first token is a command, followed by args to it.
1488 # insert worker arguments after the command.
1489 cmd = args[0]
1490 worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
1491 else:
1492 worker_args = args
1493
1494 workers.append(Greenlet.spawn(lambda wargs=worker_args:
1495 self._run_tool("cephfs-data-scan", wargs, None, quiet)))
1496
1497 for w in workers:
1498 w.get()
1499
1500 if worker_count == 1:
1501 return workers[0].value
1502 else:
1503 return None
1504
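# Example (illustrative): the disaster-recovery workflow drives cephfs-data-scan
# over the data pool, optionally sharded across workers:
#
#     fs.data_scan(["scan_extents", fs.get_data_pool_name()], worker_count=4)
#     fs.data_scan(["scan_inodes", fs.get_data_pool_name()], worker_count=4)
#
# With worker_count > 1 the helper injects --worker_n/--worker_m for each
# greenlet and returns None instead of the tool's stdout.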
1505 def is_full(self):
1506 return self.is_pool_full(self.get_data_pool_name())
1507
1508 def authorize(self, client_id, caps=('/', 'rw')):
1509 """
1510 Run "ceph fs authorize" and return the resulting client
1511 keyring.
1512
1513 client_id: client id that will be authorized
1514 caps: tuple containing the path and permission (can be r or rw)
1515 respectively.
1516 """
1517 client_name = 'client.' + client_id
1518 return self.mon_manager.raw_cluster_cmd('fs', 'authorize', self.name,
1519 client_name, *caps)
1520
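# Example (illustrative): granting a client read-only access to the root and
# read/write access to a subdirectory; caps are passed straight through to
# "ceph fs authorize":
#
#     keyring = fs.authorize("foo", ("/", "r", "/subdir", "rw"))
#
# The returned keyring text can then be written out for the client mount.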
1521 def grow(self, new_max_mds, status=None):
1522 oldmax = self.get_var('max_mds', status=status)
1523 assert(new_max_mds > oldmax)
1524 self.set_max_mds(new_max_mds)
1525 return self.wait_for_daemons()
1526
1527 def shrink(self, new_max_mds, status=None):
1528 oldmax = self.get_var('max_mds', status=status)
1529 assert(new_max_mds < oldmax)
1530 self.set_max_mds(new_max_mds)
1531 return self.wait_for_daemons()
1532
1533 def run_scrub(self, cmd, rank=0):
1534 return self.rank_tell(["scrub"] + cmd, rank)
1535
1536 def get_scrub_status(self, rank=0):
1537 return self.run_scrub(["status"], rank)
1538
1539 def wait_until_scrub_complete(self, result=None, tag=None, rank=0, sleep=30,
1540 timeout=300, reverse=False):
1541 # time out after "timeout" seconds and assume the scrub is done
1542 if result is None:
1543 result = "no active scrubs running"
1544 with contextutil.safe_while(sleep=sleep, tries=timeout//sleep) as proceed:
1545 while proceed():
1546 out_json = self.rank_tell(["scrub", "status"], rank=rank)
1547 assert out_json is not None
1548 if not reverse:
1549 if result in out_json['status']:
1550 log.info("all active scrubs completed")
1551 return True
1552 else:
1553 if result not in out_json['status']:
1554 log.info("all active scrubs completed")
1555 return True
1556
1557 if tag is not None:
1558 status = out_json['scrubs'][tag]
1559 if status is not None:
1560 log.info(f"scrub status for tag:{tag} - {status}")
1561 else:
1562 log.info(f"scrub has completed for tag:{tag}")
1563 return True
1564
1565 # timed out waiting for scrub to complete
1566 return False
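# Illustrative sketch (hypothetical helper, not referenced elsewhere; assumes a
# teuthology ctx with mon/mds/client roles deployed and no file system created
# yet): putting the pieces together the way a typical cephfs qa task does.
def _example_scrub_roundtrip(ctx):
    fs = Filesystem(ctx, name="cephfs", create=True)
    fs.wait_for_daemons()
    fs.run_scrub(["start", "/", "recursive"])
    assert fs.wait_until_scrub_complete(timeout=300)
    return fs.status()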