1
2 import json
3 import logging
4 from gevent import Greenlet
5 import os
6 import time
7 import datetime
8 import re
9 import errno
10 import random
11
12 from io import BytesIO, StringIO
13 from errno import EBUSY
14
15 from teuthology.exceptions import CommandFailedError
16 from teuthology import misc
17 from teuthology.nuke import clear_firewall
18 from teuthology.parallel import parallel
19 from teuthology import contextutil
20 from tasks.ceph_manager import write_conf
21 from tasks import ceph_manager
22
23
24 log = logging.getLogger(__name__)
25
26
27 DAEMON_WAIT_TIMEOUT = 120
28 ROOT_INO = 1
29
30 class FileLayout(object):
31 def __init__(self, pool=None, pool_namespace=None, stripe_unit=None, stripe_count=None, object_size=None):
32 self.pool = pool
33 self.pool_namespace = pool_namespace
34 self.stripe_unit = stripe_unit
35 self.stripe_count = stripe_count
36 self.object_size = object_size
37
38 @classmethod
39 def load_from_ceph(cls, layout_str):
40 # TODO
41 pass
42
43 def items(self):
44 if self.pool is not None:
45 yield ("pool", self.pool)
46 if self.pool_namespace:
47 yield ("pool_namespace", self.pool_namespace)
48 if self.stripe_unit is not None:
49 yield ("stripe_unit", self.stripe_unit)
50 if self.stripe_count is not None:
51 yield ("stripe_count", self.stripe_count)
52 if self.object_size is not None:
53 yield ("object_size", self.object_size)
54
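# A minimal usage sketch (assuming `fs` is a Filesystem instance and `mount`
# a mounted client from a test, neither defined here): FileLayout.items()
# yields only the fields that were set, which set_dir_layout() below turns
# into ceph.dir.layout.* xattrs:
#
#   layout = FileLayout(pool="cephfs_data", stripe_count=2)
#   fs.set_dir_layout(mount, "mydir", layout)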
55 class ObjectNotFound(Exception):
56 def __init__(self, object_name):
57 self._object_name = object_name
58
59 def __str__(self):
60 return "Object not found: '{0}'".format(self._object_name)
61
62 class FSMissing(Exception):
63 def __init__(self, ident):
64 self.ident = ident
65
66 def __str__(self):
67 return f"File system {self.ident} does not exist in the map"
68
69 class FSStatus(object):
70 """
71 Operations on a snapshot of the FSMap.
72 """
73 def __init__(self, mon_manager, epoch=None):
74 self.mon = mon_manager
75 cmd = ["fs", "dump", "--format=json"]
76 if epoch is not None:
77 cmd.append(str(epoch))
78 self.map = json.loads(self.mon.raw_cluster_cmd(*cmd))
79
80 def __str__(self):
81 return json.dumps(self.map, indent = 2, sort_keys = True)
82
83 # Expose the fsmap for manual inspection.
84 def __getitem__(self, key):
85 """
86 Get a field from the fsmap.
87 """
88 return self.map[key]
89
90 def get_filesystems(self):
91 """
92 Iterator for all filesystems.
93 """
94 for fs in self.map['filesystems']:
95 yield fs
96
97 def get_all(self):
98 """
99 Iterator for all the mds_info components in the FSMap.
100 """
101 for info in self.map['standbys']:
102 yield info
103 for fs in self.map['filesystems']:
104 for info in fs['mdsmap']['info'].values():
105 yield info
106
107 def get_standbys(self):
108 """
109 Iterator for all standbys.
110 """
111 for info in self.map['standbys']:
112 yield info
113
114 def get_fsmap(self, fscid):
115 """
116 Get the fsmap for the given FSCID.
117 """
118 for fs in self.map['filesystems']:
119 if fscid is None or fs['id'] == fscid:
120 return fs
121 raise FSMissing(fscid)
122
123 def get_fsmap_byname(self, name):
124 """
125 Get the fsmap for the given file system name.
126 """
127 for fs in self.map['filesystems']:
128 if name is None or fs['mdsmap']['fs_name'] == name:
129 return fs
130 raise FSMissing(name)
131
132 def get_replays(self, fscid):
133 """
134 Get the standby:replay MDS for the given FSCID.
135 """
136 fs = self.get_fsmap(fscid)
137 for info in fs['mdsmap']['info'].values():
138 if info['state'] == 'up:standby-replay':
139 yield info
140
141 def get_ranks(self, fscid):
142 """
143 Get the ranks for the given FSCID.
144 """
145 fs = self.get_fsmap(fscid)
146 for info in fs['mdsmap']['info'].values():
147 if info['rank'] >= 0 and info['state'] != 'up:standby-replay':
148 yield info
149
150 def get_damaged(self, fscid):
151 """
152 Get the damaged ranks for the given FSCID.
153 """
154 fs = self.get_fsmap(fscid)
155 return fs['mdsmap']['damaged']
156
157 def get_rank(self, fscid, rank):
158 """
159 Get the rank for the given FSCID.
160 """
161 for info in self.get_ranks(fscid):
162 if info['rank'] == rank:
163 return info
164 raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))
165
166 def get_mds(self, name):
167 """
168 Get the info for the given MDS name.
169 """
170 for info in self.get_all():
171 if info['name'] == name:
172 return info
173 return None
174
175 def get_mds_addr(self, name):
176 """
177 Return the instance addr as a string, like "10.214.133.138:6807\/10825"
178 """
179 info = self.get_mds(name)
180 if info:
181 return info['addr']
182 else:
183 log.warning(json.dumps(list(self.get_all()), indent=2)) # dump for debugging
184 raise RuntimeError("MDS id '{0}' not found in map".format(name))
185
186 def get_mds_addrs(self, name):
187 """
188 Return the instance addr as a string, like "[10.214.133.138:6807 10.214.133.138:6808]"
189 """
190 info = self.get_mds(name)
191 if info:
192 return [e['addr'] for e in info['addrs']['addrvec']]
193 else:
194 log.warning(json.dumps(list(self.get_all()), indent=2)) # dump for debugging
195 raise RuntimeError("MDS id '{0}' not found in map".format(name))
196
197 def get_mds_gid(self, gid):
198 """
199 Get the info for the given MDS gid.
200 """
201 for info in self.get_all():
202 if info['gid'] == gid:
203 return info
204 return None
205
206 def hadfailover(self, status):
207 """
208 Compares two statuses for mds failovers.
209 Returns True if there is a failover.
210 """
211 for fs in status.map['filesystems']:
212 for info in fs['mdsmap']['info'].values():
213 oldinfo = self.get_mds_gid(info['gid'])
214 if oldinfo is None or oldinfo['incarnation'] != info['incarnation']:
215 return True
216 #all matching
217 return False
218
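# A minimal failover-detection sketch (assuming `mdscluster` is an MDSCluster
# instance, defined further below): take a snapshot before and after an
# operation and compare incarnations with hadfailover():
#
#   before = mdscluster.status()
#   # ... do something that may restart or fail an MDS ...
#   after = mdscluster.status()
#   if before.hadfailover(after):
#       log.info("at least one MDS changed incarnation")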
219 class CephCluster(object):
220 @property
221 def admin_remote(self):
222 first_mon = misc.get_first_mon(self._ctx, None)
223 (result,) = self._ctx.cluster.only(first_mon).remotes.keys()
224 return result
225
226 def __init__(self, ctx) -> None:
227 self._ctx = ctx
228 self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
229
230 def get_config(self, key, service_type=None):
231 """
232 Get config from mon by default, or a specific service if caller asks for it
233 """
234 if service_type is None:
235 service_type = 'mon'
236
237 service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
238 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
239
240 def set_ceph_conf(self, subsys, key, value):
241 if subsys not in self._ctx.ceph['ceph'].conf:
242 self._ctx.ceph['ceph'].conf[subsys] = {}
243 self._ctx.ceph['ceph'].conf[subsys][key] = value
244 write_conf(self._ctx) # XXX because we don't have the ceph task's config object, if they
245 # used a different config path this won't work.
246
247 def clear_ceph_conf(self, subsys, key):
248 del self._ctx.ceph['ceph'].conf[subsys][key]
249 write_conf(self._ctx)
250
251 def json_asok(self, command, service_type, service_id, timeout=None):
252 if timeout is None:
253 timeout = 15*60
254 command.insert(0, '--format=json')
255 proc = self.mon_manager.admin_socket(service_type, service_id, command, timeout=timeout)
256 response_data = proc.stdout.getvalue().strip()
257 if len(response_data) > 0:
258 j = json.loads(response_data)
259 pretty = json.dumps(j, sort_keys=True, indent=2)
260 log.debug(f"_json_asok output\n{pretty}")
261 return j
262 else:
263 log.debug("_json_asok output empty")
264 return None
265
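# A minimal admin-socket sketch (assuming `cluster` is a CephCluster and "a"
# is a running MDS id): json_asok() prepends --format=json, parses the reply,
# and returns None for empty output:
#
#   perf = cluster.json_asok(["perf", "dump"], "mds", "a")
#   if perf is not None:
#       log.info(list(perf.keys()))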
266 def is_addr_blocklisted(self, addr):
267 blocklist = json.loads(self.mon_manager.raw_cluster_cmd(
268 "osd", "dump", "--format=json"))['blocklist']
269 if addr in blocklist:
270 return True
271 log.warning(f'The address {addr} is not blocklisted')
272 return False
273
274
275 class MDSCluster(CephCluster):
276 """
277 Collective operations on all the MDS daemons in the Ceph cluster. These
278 daemons may be in use by various Filesystems.
279
280 For the benefit of pre-multi-filesystem tests, this class is also
281 a parent of Filesystem. The correct way to use MDSCluster going forward is
282 as a separate instance outside of your (multiple) Filesystem instances.
283 """
284
285 def __init__(self, ctx):
286 super(MDSCluster, self).__init__(ctx)
287
288 @property
289 def mds_ids(self):
290 # do this dynamically because the list of ids may change periodically with cephadm
291 return list(misc.all_roles_of_type(self._ctx.cluster, 'mds'))
292
293 @property
294 def mds_daemons(self):
295 return dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
296
297 def _one_or_all(self, mds_id, cb, in_parallel=True):
298 """
299 Call a callback for a single named MDS, or for all.
300
301 Note that the parallelism here isn't for performance, it's to avoid being overly kind
302 to the cluster by waiting a graceful ssh-latency of time between doing things, and to
303 avoid being overly kind by executing them in a particular order. However, some actions
304 don't cope with being done in parallel, so it's optional (`in_parallel`)
305
306 :param mds_id: MDS daemon name, or None
307 :param cb: Callback taking single argument of MDS daemon name
308 :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
309 """
310
311 if mds_id is None:
312 if in_parallel:
313 with parallel() as p:
314 for mds_id in self.mds_ids:
315 p.spawn(cb, mds_id)
316 else:
317 for mds_id in self.mds_ids:
318 cb(mds_id)
319 else:
320 cb(mds_id)
321
322 def get_config(self, key, service_type=None):
323 """
324 get_config specialization of service_type="mds"
325 """
326 if service_type != "mds":
327 return super(MDSCluster, self).get_config(key, service_type)
328
329 # Some tests stop MDS daemons, don't send commands to a dead one:
330 running_daemons = [i for i, mds in self.mds_daemons.items() if mds.running()]
331 service_id = random.sample(running_daemons, 1)[0]
332 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
333
334 def mds_stop(self, mds_id=None):
335 """
336 Stop the MDS daemon process(es). If it held a rank, that rank
337 will eventually go laggy.
338 """
339 self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())
340
341 def mds_fail(self, mds_id=None):
342 """
343 Inform MDSMonitor of the death of the daemon process(es). If it held
344 a rank, that rank will be relinquished.
345 """
346 self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))
347
348 def mds_restart(self, mds_id=None):
349 self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())
350
351 def mds_fail_restart(self, mds_id=None):
352 """
353 Variation on restart that includes marking MDSs as failed, so that doing this
354 operation followed by waiting for healthy daemon states guarantees that they
355 have gone down and come up, rather than potentially seeing the healthy states
356 that existed before the restart.
357 """
358 def _fail_restart(id_):
359 self.mds_daemons[id_].stop()
360 self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
361 self.mds_daemons[id_].restart()
362
363 self._one_or_all(mds_id, _fail_restart)
364
365 def mds_signal(self, mds_id, sig, silent=False):
366 """
367 Signal an MDS daemon.
368 """
369 self.mds_daemons[mds_id].signal(sig, silent)
370
371 def newfs(self, name='cephfs', create=True):
372 return Filesystem(self._ctx, name=name, create=create)
373
374 def status(self, epoch=None):
375 return FSStatus(self.mon_manager, epoch)
376
377 def get_standby_daemons(self):
378 return set([s['name'] for s in self.status().get_standbys()])
379
380 def get_mds_hostnames(self):
381 result = set()
382 for mds_id in self.mds_ids:
383 mds_remote = self.mon_manager.find_remote('mds', mds_id)
384 result.add(mds_remote.hostname)
385
386 return list(result)
387
388 def set_clients_block(self, blocked, mds_id=None):
389 """
390 Block (using iptables) client communications to this MDS. Be careful: if
391 other services are running on this MDS, or other MDSs try to talk to this
392 MDS, their communications may also be blocked as collateral damage.
393
394 :param mds_id: Optional ID of MDS to block, default to all
395 :return:
396 """
397 da_flag = "-A" if blocked else "-D"
398
399 def set_block(_mds_id):
400 remote = self.mon_manager.find_remote('mds', _mds_id)
401 status = self.status()
402
403 addr = status.get_mds_addr(_mds_id)
404 ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()
405
406 remote.run(
407 args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
408 "comment", "--comment", "teuthology"])
409 remote.run(
410 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
411 "comment", "--comment", "teuthology"])
412
413 self._one_or_all(mds_id, set_block, in_parallel=False)
414
415 def set_inter_mds_block(self, blocked, mds_rank_1, mds_rank_2):
416 """
417 Block (using iptables) communications from a provided MDS to other MDSs.
418 Block all ports that an MDS uses for communication.
419
420 :param blocked: True to block the MDS, False otherwise
421 :param mds_rank_1: MDS rank
422 :param mds_rank_2: MDS rank
423 :return:
424 """
425 da_flag = "-A" if blocked else "-D"
426
427 def set_block(mds_ids):
428 status = self.status()
429
430 mds = mds_ids[0]
431 remote = self.mon_manager.find_remote('mds', mds)
432 addrs = status.get_mds_addrs(mds)
433 for addr in addrs:
434 ip_str, port_str = re.match("(.+):(.+)", addr).groups()
435 remote.run(
436 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
437 "comment", "--comment", "teuthology"])
438
439
440 mds = mds_ids[1]
441 remote = self.mon_manager.find_remote('mds', mds)
442 addrs = status.get_mds_addrs(mds)
443 for addr in addrs:
444 ip_str, port_str = re.match("(.+):(.+)", addr).groups()
445 remote.run(
446 args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
447 "comment", "--comment", "teuthology"])
448 remote.run(
449 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
450 "comment", "--comment", "teuthology"])
451
452 self._one_or_all((mds_rank_1, mds_rank_2), set_block, in_parallel=False)
453
454 def clear_firewall(self):
455 clear_firewall(self._ctx)
456
457 def get_mds_info(self, mds_id):
458 return FSStatus(self.mon_manager).get_mds(mds_id)
459
460 def is_pool_full(self, pool_name):
461 pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
462 for pool in pools:
463 if pool['pool_name'] == pool_name:
464 return 'full' in pool['flags_names'].split(",")
465
466 raise RuntimeError("Pool not found '{0}'".format(pool_name))
467
468 def delete_all_filesystems(self):
469 """
470 Remove all filesystems that exist, and any pools in use by them.
471 """
472 for fs in self.status().get_filesystems():
473 Filesystem(ctx=self._ctx, fscid=fs['id']).destroy()
474
475 @property
476 def beacon_timeout(self):
477 """
478 Generate an acceptable timeout for the mons to drive some MDSMap change
479 because of missed beacons from some MDS. This involves looking up the
480 grace period in use by the mons and adding an acceptable buffer.
481 """
482
483 grace = float(self.get_config("mds_beacon_grace", service_type="mon"))
484 return grace*2+15
485
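# For example, with the default mds_beacon_grace of 15 seconds this property
# evaluates to 15*2+15 = 45 seconds of slack before a test gives up waiting
# for the mons to react to missed beacons.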
486
487 class Filesystem(MDSCluster):
488 """
489 This object is for driving a CephFS filesystem. The MDS daemons driven by
490 MDSCluster may be shared with other Filesystems.
491 """
492 def __init__(self, ctx, fs_config={}, fscid=None, name=None, create=False):
493 super(Filesystem, self).__init__(ctx)
494
495 self.name = name
496 self.id = None
497 self.metadata_pool_name = None
498 self.metadata_overlay = False
499 self.data_pool_name = None
500 self.data_pools = None
501 self.fs_config = fs_config
502 self.ec_profile = fs_config.get('ec_profile')
503
504 client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
505 self.client_id = client_list[0]
506 self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
507
508 if name is not None:
509 if fscid is not None:
510 raise RuntimeError("cannot specify fscid when creating fs")
511 if create and not self.legacy_configured():
512 self.create()
513 else:
514 if fscid is not None:
515 self.id = fscid
516 self.getinfo(refresh = True)
517
518 # Stash a reference to the first created filesystem on ctx, so
519 # that if someone drops to the interactive shell they can easily
520 # poke our methods.
521 if not hasattr(self._ctx, "filesystem"):
522 self._ctx.filesystem = self
523
524 def dead(self):
525 try:
526 return not bool(self.get_mds_map())
527 except FSMissing:
528 return True
529
530 def get_task_status(self, status_key):
531 return self.mon_manager.get_service_task_status("mds", status_key)
532
533 def getinfo(self, refresh = False):
534 status = self.status()
535 if self.id is not None:
536 fsmap = status.get_fsmap(self.id)
537 elif self.name is not None:
538 fsmap = status.get_fsmap_byname(self.name)
539 else:
540 fss = [fs for fs in status.get_filesystems()]
541 if len(fss) == 1:
542 fsmap = fss[0]
543 elif len(fss) == 0:
544 raise RuntimeError("no file system available")
545 else:
546 raise RuntimeError("more than one file system available")
547 self.id = fsmap['id']
548 self.name = fsmap['mdsmap']['fs_name']
549 self.get_pool_names(status = status, refresh = refresh)
550 return status
551
552 def set_metadata_overlay(self, overlay):
553 if self.id is not None:
554 raise RuntimeError("cannot specify fscid when configuring overlay")
555 self.metadata_overlay = overlay
556
557 def reach_max_mds(self):
558 status = self.wait_for_daemons()
559 mds_map = self.get_mds_map(status=status)
560 assert(mds_map['in'] == list(range(0, mds_map['max_mds'])))
561
562 def reset(self):
563 self.mon_manager.raw_cluster_cmd("fs", "reset", str(self.name), '--yes-i-really-mean-it')
564
565 def fail(self):
566 self.mon_manager.raw_cluster_cmd("fs", "fail", str(self.name))
567
568 def set_flag(self, var, *args):
569 a = map(lambda x: str(x).lower(), args)
570 self.mon_manager.raw_cluster_cmd("fs", "flag", "set", var, *a)
571
572 def set_allow_multifs(self, yes=True):
573 self.set_flag("enable_multiple", yes)
574
575 def set_var(self, var, *args):
576 a = map(lambda x: str(x).lower(), args)
577 self.mon_manager.raw_cluster_cmd("fs", "set", self.name, var, *a)
578
579 def set_down(self, down=True):
580 self.set_var("down", str(down).lower())
581
582 def set_joinable(self, joinable=True):
583 self.set_var("joinable", joinable)
584
585 def set_max_mds(self, max_mds):
586 self.set_var("max_mds", "%d" % max_mds)
587
588 def set_session_timeout(self, timeout):
589 self.set_var("session_timeout", "%d" % timeout)
590
591 def set_allow_standby_replay(self, yes):
592 self.set_var("allow_standby_replay", yes)
593
594 def set_allow_new_snaps(self, yes):
595 self.set_var("allow_new_snaps", yes, '--yes-i-really-mean-it')
596
597 def compat(self, *args):
598 a = map(lambda x: str(x).lower(), args)
599 self.mon_manager.raw_cluster_cmd("fs", "compat", self.name, *a)
600
601 def add_compat(self, *args):
602 self.compat("add_compat", *args)
603
604 def add_incompat(self, *args):
605 self.compat("add_incompat", *args)
606
607 def rm_compat(self, *args):
608 self.compat("rm_compat", *args)
609
610 def rm_incompat(self, *args):
611 self.compat("rm_incompat", *args)
612
613 def required_client_features(self, *args, **kwargs):
614 c = ["fs", "required_client_features", self.name, *args]
615 return self.mon_manager.run_cluster_cmd(args=c, **kwargs)
616
617 # Since v15.1.0 the pg autoscale mode has been enabled by default,
618 # so let the autoscaler calculate the pg_num as needed.
619 # We set pg_num_min to 64 to make sure that the autoscaler
620 # won't set the pg_num too low (see Tracker #45434).
621 pg_num = 64
622 pg_num_min = 64
623 target_size_ratio = 0.9
624 target_size_ratio_ec = 0.9
625
626 def create(self):
627 if self.name is None:
628 self.name = "cephfs"
629 if self.metadata_pool_name is None:
630 self.metadata_pool_name = "{0}_metadata".format(self.name)
631 if self.data_pool_name is None:
632 data_pool_name = "{0}_data".format(self.name)
633 else:
634 data_pool_name = self.data_pool_name
635
636 # The EC pool will store the file data, while a small amount of
637 # metadata for every file still goes to the primary data pool.
638 if not self.metadata_overlay and self.ec_profile and 'disabled' not in self.ec_profile:
639 self.target_size_ratio = 0.05
640
641 log.debug("Creating filesystem '{0}'".format(self.name))
642
643 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
644 self.metadata_pool_name,
645 '--pg_num_min', str(self.pg_num_min))
646
647 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
648 data_pool_name, str(self.pg_num),
649 '--pg_num_min', str(self.pg_num_min),
650 '--target_size_ratio',
651 str(self.target_size_ratio))
652
653 if self.metadata_overlay:
654 self.mon_manager.raw_cluster_cmd('fs', 'new',
655 self.name, self.metadata_pool_name, data_pool_name,
656 '--allow-dangerous-metadata-overlay')
657 else:
658 self.mon_manager.raw_cluster_cmd('fs', 'new',
659 self.name,
660 self.metadata_pool_name,
661 data_pool_name)
662
663 if self.ec_profile and 'disabled' not in self.ec_profile:
664 ec_data_pool_name = data_pool_name + "_ec"
665 log.debug("EC profile is %s", self.ec_profile)
666 cmd = ['osd', 'erasure-code-profile', 'set', ec_data_pool_name]
667 cmd.extend(self.ec_profile)
668 self.mon_manager.raw_cluster_cmd(*cmd)
669 self.mon_manager.raw_cluster_cmd(
670 'osd', 'pool', 'create', ec_data_pool_name,
671 'erasure', ec_data_pool_name,
672 '--pg_num_min', str(self.pg_num_min),
673 '--target_size_ratio', str(self.target_size_ratio_ec))
674 self.mon_manager.raw_cluster_cmd(
675 'osd', 'pool', 'set',
676 ec_data_pool_name, 'allow_ec_overwrites', 'true')
677 self.add_data_pool(ec_data_pool_name, create=False)
678 self.check_pool_application(ec_data_pool_name)
679
680 self.run_client_payload(f"setfattr -n ceph.dir.layout.pool -v {ec_data_pool_name} . && getfattr -n ceph.dir.layout .")
681
682 self.check_pool_application(self.metadata_pool_name)
683 self.check_pool_application(data_pool_name)
684
685 # Turn off spurious standby count warnings from modifying max_mds in tests.
686 try:
687 self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
688 except CommandFailedError as e:
689 if e.exitstatus == 22:
690 # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
691 pass
692 else:
693 raise
694
695 if self.fs_config is not None:
696 max_mds = self.fs_config.get('max_mds', 1)
697 if max_mds > 1:
698 self.set_max_mds(max_mds)
699
700 standby_replay = self.fs_config.get('standby_replay', False)
701 self.set_allow_standby_replay(standby_replay)
702
703 # If absent, the default value (60 seconds) is used
704 session_timeout = self.fs_config.get('session_timeout', 60)
705 if session_timeout != 60:
706 self.set_session_timeout(session_timeout)
707
708 self.getinfo(refresh = True)
709
710 # wait for PGs to be clean
711 self.mon_manager.wait_for_clean()
712
713 def run_client_payload(self, cmd):
714 # avoid circular dep by importing here:
715 from tasks.cephfs.fuse_mount import FuseMount
716
717 # Wait for the MDS daemons to be ready before mounting the
718 # ceph-fuse client in run_client_payload()
719 self.wait_for_daemons()
720
721 d = misc.get_testdir(self._ctx)
722 m = FuseMount(self._ctx, d, "admin", self.client_remote, cephfs_name=self.name)
723 m.mount_wait()
724 m.run_shell_payload(cmd)
725 m.umount_wait(require_clean=True)
726
727 def _remove_pool(self, name, **kwargs):
728 c = f'osd pool rm {name} {name} --yes-i-really-really-mean-it'
729 return self.mon_manager.ceph(c, **kwargs)
730
731 def rm(self, **kwargs):
732 c = f'fs rm {self.name} --yes-i-really-mean-it'
733 return self.mon_manager.ceph(c, **kwargs)
734
735 def remove_pools(self, data_pools):
736 self._remove_pool(self.get_metadata_pool_name())
737 for poolname in data_pools:
738 try:
739 self._remove_pool(poolname)
740 except CommandFailedError as e:
741 # EBUSY, this data pool is used by two metadata pools, let the
742 # 2nd pass delete it
743 if e.exitstatus == EBUSY:
744 pass
745 else:
746 raise
747
748 def destroy(self, reset_obj_attrs=True):
749 log.info(f'Destroying file system {self.name} and related pools')
750
751 if self.dead():
752 log.debug('already dead...')
753 return
754
755 data_pools = self.get_data_pool_names(refresh=True)
756
757 # make sure no MDSs are attached to the given FS.
758 self.fail()
759 self.rm()
760
761 self.remove_pools(data_pools)
762
763 if reset_obj_attrs:
764 self.id = None
765 self.name = None
766 self.metadata_pool_name = None
767 self.data_pool_name = None
768 self.data_pools = None
769
770 def recreate(self):
771 self.destroy()
772
773 self.create()
774 self.getinfo(refresh=True)
775
776 def check_pool_application(self, pool_name):
777 osd_map = self.mon_manager.get_osd_dump_json()
778 for pool in osd_map['pools']:
779 if pool['pool_name'] == pool_name:
780 if "application_metadata" in pool:
781 if not "cephfs" in pool['application_metadata']:
782 raise RuntimeError("Pool {pool_name} does not name cephfs as application!".\
783 format(pool_name=pool_name))
784
785 def __del__(self):
786 if getattr(self._ctx, "filesystem", None) == self:
787 delattr(self._ctx, "filesystem")
788
789 def exists(self):
790 """
791 Whether a filesystem exists in the mon's filesystem list
792 """
793 fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
794 return self.name in [fs['name'] for fs in fs_list]
795
796 def legacy_configured(self):
797 """
798 Check if a legacy (i.e. pre "fs new") filesystem configuration is present. If this is
799 the case, the caller should avoid using Filesystem.create
800 """
801 try:
802 out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
803 pools = json.loads(out_text)
804 metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
805 if metadata_pool_exists:
806 self.metadata_pool_name = 'metadata'
807 except CommandFailedError as e:
808 # For use in upgrade tests, Ceph cuttlefish and earlier don't support
809 # structured output (--format) from the CLI.
810 if e.exitstatus == 22:
811 metadata_pool_exists = True
812 else:
813 raise
814
815 return metadata_pool_exists
816
817 def _df(self):
818 return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))
819
820 # may raise FSMissing
821 def get_mds_map(self, status=None):
822 if status is None:
823 status = self.status()
824 return status.get_fsmap(self.id)['mdsmap']
825
826 def get_var(self, var, status=None):
827 return self.get_mds_map(status=status)[var]
828
829 def set_dir_layout(self, mount, path, layout):
830 for name, value in layout.items():
831 mount.run_shell(args=["setfattr", "-n", "ceph.dir.layout."+name, "-v", str(value), path])
832
833 def add_data_pool(self, name, create=True):
834 if create:
835 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name,
836 '--pg_num_min', str(self.pg_num_min))
837 self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
838 self.get_pool_names(refresh = True)
839 for poolid, fs_name in self.data_pools.items():
840 if name == fs_name:
841 return poolid
842 raise RuntimeError("could not get just created pool '{0}'".format(name))
843
844 def get_pool_names(self, refresh = False, status = None):
845 if refresh or self.metadata_pool_name is None or self.data_pools is None:
846 if status is None:
847 status = self.status()
848 fsmap = status.get_fsmap(self.id)
849
850 osd_map = self.mon_manager.get_osd_dump_json()
851 id_to_name = {}
852 for p in osd_map['pools']:
853 id_to_name[p['pool']] = p['pool_name']
854
855 self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
856 self.data_pools = {}
857 for data_pool in fsmap['mdsmap']['data_pools']:
858 self.data_pools[data_pool] = id_to_name[data_pool]
859
860 def get_data_pool_name(self, refresh = False):
861 if refresh or self.data_pools is None:
862 self.get_pool_names(refresh = True)
863 assert(len(self.data_pools) == 1)
864 return next(iter(self.data_pools.values()))
865
866 def get_data_pool_id(self, refresh = False):
867 """
868 Don't call this if you have multiple data pools
869 :return: integer
870 """
871 if refresh or self.data_pools is None:
872 self.get_pool_names(refresh = True)
873 assert(len(self.data_pools) == 1)
874 return next(iter(self.data_pools.keys()))
875
876 def get_data_pool_names(self, refresh = False):
877 if refresh or self.data_pools is None:
878 self.get_pool_names(refresh = True)
879 return list(self.data_pools.values())
880
881 def get_metadata_pool_name(self):
882 return self.metadata_pool_name
883
884 def set_data_pool_name(self, name):
885 if self.id is not None:
886 raise RuntimeError("can't set data pool name if the filesystem's fscid is set")
887 self.data_pool_name = name
888
889 def get_pool_pg_num(self, pool_name):
890 pgs = json.loads(self.mon_manager.raw_cluster_cmd('osd', 'pool', 'get',
891 pool_name, 'pg_num',
892 '--format=json-pretty'))
893 return int(pgs['pg_num'])
894
895 def get_namespace_id(self):
896 return self.id
897
898 def get_pool_df(self, pool_name):
899 """
900 Return a dict like:
901 {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
902 """
903 for pool_df in self._df()['pools']:
904 if pool_df['name'] == pool_name:
905 return pool_df['stats']
906
907 raise RuntimeError("Pool name '{0}' not found".format(pool_name))
908
909 def get_usage(self):
910 return self._df()['stats']['total_used_bytes']
911
912 def are_daemons_healthy(self, status=None, skip_max_mds_check=False):
913 """
914 Return true if all daemons are in one of active, standby, standby-replay, and
915 exactly max_mds daemons are in 'active'.
916
917 Unlike most of Filesystem, this function is tolerant of new-style `fs`
918 commands being missing, because we are part of the ceph installation
919 process during upgrade suites, so must fall back to old style commands
920 when we get an EINVAL on a new style command.
921
922 :return:
923 """
924 # First, check to see that processes haven't exited with an error code
925 for mds in self._ctx.daemons.iter_daemons_of_role('mds'):
926 mds.check_status()
927
928 active_count = 0
929 mds_map = self.get_mds_map(status=status)
930
931 log.debug("are_daemons_healthy: mds map: {0}".format(mds_map))
932
933 for mds_id, mds_status in mds_map['info'].items():
934 if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
935 log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
936 return False
937 elif mds_status['state'] == 'up:active':
938 active_count += 1
939
940 log.debug("are_daemons_healthy: {0}/{1}".format(
941 active_count, mds_map['max_mds']
942 ))
943
944 if not skip_max_mds_check:
945 if active_count > mds_map['max_mds']:
946 log.debug("are_daemons_healthy: number of actives is greater than max_mds: {0}".format(mds_map))
947 return False
948 elif active_count == mds_map['max_mds']:
949 # The MDSMap says these guys are active, but let's check they really are
950 for mds_id, mds_status in mds_map['info'].items():
951 if mds_status['state'] == 'up:active':
952 try:
953 daemon_status = self.mds_tell(["status"], mds_id=mds_status['name'])
954 except CommandFailedError as cfe:
955 if cfe.exitstatus == errno.EINVAL:
956 # Old version, can't do this check
957 continue
958 else:
959 # MDS not even running
960 return False
961
962 if daemon_status['state'] != 'up:active':
963 # MDS hasn't taken the latest map yet
964 return False
965
966 return True
967 else:
968 return False
969 else:
970 log.debug("are_daemons_healthy: skipping max_mds check")
971 return True
972
973 def get_daemon_names(self, state=None, status=None):
974 """
975 Return MDS daemon names of those daemons in the given state
976 :param state:
977 :return:
978 """
979 mdsmap = self.get_mds_map(status)
980 result = []
981 for mds_status in sorted(mdsmap['info'].values(),
982 key=lambda _: _['rank']):
983 if mds_status['state'] == state or state is None:
984 result.append(mds_status['name'])
985
986 return result
987
988 def get_active_names(self, status=None):
989 """
990 Return MDS daemon names of those daemons holding ranks
991 in state up:active
992
993 :return: list of strings like ['a', 'b'], sorted by rank
994 """
995 return self.get_daemon_names("up:active", status=status)
996
997 def get_all_mds_rank(self, status=None):
998 mdsmap = self.get_mds_map(status)
999 result = []
1000 for mds_status in sorted(mdsmap['info'].values(),
1001 key=lambda _: _['rank']):
1002 if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
1003 result.append(mds_status['rank'])
1004
1005 return result
1006
1007 def get_rank(self, rank=None, status=None):
1008 if status is None:
1009 status = self.getinfo()
1010 if rank is None:
1011 rank = 0
1012 return status.get_rank(self.id, rank)
1013
1014 def rank_restart(self, rank=0, status=None):
1015 name = self.get_rank(rank=rank, status=status)['name']
1016 self.mds_restart(mds_id=name)
1017
1018 def rank_signal(self, signal, rank=0, status=None):
1019 name = self.get_rank(rank=rank, status=status)['name']
1020 self.mds_signal(name, signal)
1021
1022 def rank_freeze(self, yes, rank=0):
1023 self.mon_manager.raw_cluster_cmd("mds", "freeze", "{}:{}".format(self.id, rank), str(yes).lower())
1024
1025 def rank_fail(self, rank=0):
1026 self.mon_manager.raw_cluster_cmd("mds", "fail", "{}:{}".format(self.id, rank))
1027
1028 def get_ranks(self, status=None):
1029 if status is None:
1030 status = self.getinfo()
1031 return status.get_ranks(self.id)
1032
1033 def get_damaged(self, status=None):
1034 if status is None:
1035 status = self.getinfo()
1036 return status.get_damaged(self.id)
1037
1038 def get_replays(self, status=None):
1039 if status is None:
1040 status = self.getinfo()
1041 return status.get_replays(self.id)
1042
1043 def get_replay(self, rank=0, status=None):
1044 for replay in self.get_replays(status=status):
1045 if replay['rank'] == rank:
1046 return replay
1047 return None
1048
1049 def get_rank_names(self, status=None):
1050 """
1051 Return MDS daemon names of those daemons holding a rank,
1052 sorted by rank. This includes e.g. up:replay/reconnect
1053 as well as active, but does not include standby or
1054 standby-replay.
1055 """
1056 mdsmap = self.get_mds_map(status)
1057 result = []
1058 for mds_status in sorted(mdsmap['info'].values(),
1059 key=lambda _: _['rank']):
1060 if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
1061 result.append(mds_status['name'])
1062
1063 return result
1064
1065 def wait_for_daemons(self, timeout=None, skip_max_mds_check=False, status=None):
1066 """
1067 Wait until all daemons are healthy
1068 :return:
1069 """
1070
1071 if timeout is None:
1072 timeout = DAEMON_WAIT_TIMEOUT
1073
1074 if status is None:
1075 status = self.status()
1076
1077 elapsed = 0
1078 while True:
1079 if self.are_daemons_healthy(status=status, skip_max_mds_check=skip_max_mds_check):
1080 return status
1081 else:
1082 time.sleep(1)
1083 elapsed += 1
1084
1085 if elapsed > timeout:
1086 log.debug("status = {0}".format(status))
1087 raise RuntimeError("Timed out waiting for MDS daemons to become healthy")
1088
1089 status = self.status()
1090
1091 def dencoder(self, obj_type, obj_blob):
1092 args = [os.path.join(self._prefix, "ceph-dencoder"), 'type', obj_type, 'import', '-', 'decode', 'dump_json']
1093 p = self.mon_manager.controller.run(args=args, stdin=BytesIO(obj_blob), stdout=BytesIO())
1094 return p.stdout.getvalue()
1095
1096 def rados(self, *args, **kwargs):
1097 """
1098 Callout to rados CLI.
1099 """
1100
1101 return self.mon_manager.do_rados(*args, **kwargs)
1102
1103 def radosm(self, *args, **kwargs):
1104 """
1105 Interact with the metadata pool via rados CLI.
1106 """
1107
1108 return self.rados(*args, **kwargs, pool=self.get_metadata_pool_name())
1109
1110 def radosmo(self, *args, stdout=BytesIO(), **kwargs):
1111 """
1112 Interact with the metadata pool via rados CLI. Get the stdout.
1113 """
1114
1115 return self.radosm(*args, **kwargs, stdout=stdout).stdout.getvalue()
1116
1117 def get_metadata_object(self, object_type, object_id):
1118 """
1119 Retrieve an object from the metadata pool, pass it through
1120 ceph-dencoder to dump it to JSON, and return the decoded object.
1121 """
1122
1123 o = self.radosmo(['get', object_id, '-'])
1124 j = self.dencoder(object_type, o)
1125 try:
1126 return json.loads(j)
1127 except (TypeError, ValueError):
1128 log.error("Failed to decode JSON: '{0}'".format(j))
1129 raise
1130
1131 def get_journal_version(self):
1132 """
1133 Read the JournalPointer and Journal::Header objects to learn the version of
1134 encoding in use.
1135 """
1136 journal_pointer_object = '400.00000000'
1137 journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
1138 journal_ino = journal_pointer_dump['journal_pointer']['front']
1139
1140 journal_header_object = "{0:x}.00000000".format(journal_ino)
1141 journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)
1142
1143 version = journal_header_dump['journal_header']['stream_format']
1144 log.debug("Read journal version {0}".format(version))
1145
1146 return version
1147
1148 def mds_asok(self, command, mds_id=None, timeout=None):
1149 if mds_id is None:
1150 return self.rank_asok(command, timeout=timeout)
1151
1152 return self.json_asok(command, 'mds', mds_id, timeout=timeout)
1153
1154 def mds_tell(self, command, mds_id=None):
1155 if mds_id is None:
1156 return self.rank_tell(command)
1157
1158 return json.loads(self.mon_manager.raw_cluster_cmd("tell", f"mds.{mds_id}", *command))
1159
1160 def rank_asok(self, command, rank=0, status=None, timeout=None):
1161 info = self.get_rank(rank=rank, status=status)
1162 return self.json_asok(command, 'mds', info['name'], timeout=timeout)
1163
1164 def rank_tell(self, command, rank=0, status=None):
1165 try:
1166 out = self.mon_manager.raw_cluster_cmd("tell", f"mds.{self.id}:{rank}", *command)
1167 return json.loads(out)
1168 except json.decoder.JSONDecodeError:
1169 log.error("could not decode: {}".format(out))
1170 raise
1171
1172 def ranks_tell(self, command, status=None):
1173 if status is None:
1174 status = self.status()
1175 out = []
1176 for r in status.get_ranks(self.id):
1177 result = self.rank_tell(command, rank=r['rank'], status=status)
1178 out.append((r['rank'], result))
1179 return sorted(out)
1180
1181 def ranks_perf(self, f, status=None):
1182 perf = self.ranks_tell(["perf", "dump"], status=status)
1183 out = []
1184 for rank, perf in perf:
1185 out.append((rank, f(perf)))
1186 return out
1187
1188 def read_cache(self, path, depth=None):
1189 cmd = ["dump", "tree", path]
1190 if depth is not None:
1191 cmd.append(depth.__str__())
1192 result = self.mds_asok(cmd)
1193 if len(result) == 0:
1194 raise RuntimeError("Path not found in cache: {0}".format(path))
1195
1196 return result
1197
1198 def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
1199 """
1200 Block until the MDS reaches a particular state, or a failure condition
1201 is met.
1202
1203 When there are multiple MDSs, succeed when exactly one MDS is in the
1204 goal state, or fail when any MDS is in the reject state.
1205
1206 :param goal_state: Return once the MDS is in this state
1207 :param reject: Fail if the MDS enters this state before the goal state
1208 :param timeout: Fail if this many seconds pass before reaching goal
1209 :return: number of seconds waited, rounded down to integer
1210 """
1211
1212 started_at = time.time()
1213 while True:
1214 status = self.status()
1215 if rank is not None:
1216 try:
1217 mds_info = status.get_rank(self.id, rank)
1218 current_state = mds_info['state'] if mds_info else None
1219 log.debug("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
1220 except:
1221 mdsmap = self.get_mds_map(status=status)
1222 if rank in mdsmap['failed']:
1223 log.debug("Waiting for rank {0} to come back.".format(rank))
1224 current_state = None
1225 else:
1226 raise
1227 elif mds_id is not None:
1228 # mds_info is None if no daemon with this ID exists in the map
1229 mds_info = status.get_mds(mds_id)
1230 current_state = mds_info['state'] if mds_info else None
1231 log.debug("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
1232 else:
1233 # In general, look for a single MDS
1234 states = [m['state'] for m in status.get_ranks(self.id)]
1235 if [s for s in states if s == goal_state] == [goal_state]:
1236 current_state = goal_state
1237 elif reject in states:
1238 current_state = reject
1239 else:
1240 current_state = None
1241 log.debug("mapped states {0} to {1}".format(states, current_state))
1242
1243 elapsed = time.time() - started_at
1244 if current_state == goal_state:
1245 log.debug("reached state '{0}' in {1}s".format(current_state, elapsed))
1246 return elapsed
1247 elif reject is not None and current_state == reject:
1248 raise RuntimeError("MDS in reject state {0}".format(current_state))
1249 elif timeout is not None and elapsed > timeout:
1250 log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
1251 raise RuntimeError(
1252 "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
1253 elapsed, goal_state, current_state
1254 ))
1255 else:
1256 time.sleep(1)
1257
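# Minimal wait_for_state() sketches (assuming `fs` is a Filesystem instance):
#
#   fs.wait_for_state("up:active", rank=0, timeout=60)
#   fs.wait_for_state("up:standby", mds_id="a", reject="up:active")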
1258 def _read_data_xattr(self, ino_no, xattr_name, obj_type, pool):
1259 if pool is None:
1260 pool = self.get_data_pool_name()
1261
1262 obj_name = "{0:x}.00000000".format(ino_no)
1263
1264 args = ["getxattr", obj_name, xattr_name]
1265 try:
1266 proc = self.rados(args, pool=pool, stdout=BytesIO())
1267 except CommandFailedError as e:
1268 log.error(e.__str__())
1269 raise ObjectNotFound(obj_name)
1270
1271 obj_blob = proc.stdout.getvalue()
1272 return json.loads(self.dencoder(obj_type, obj_blob).strip())
1273
1274 def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
1275 """
1276 Write to an xattr of the 0th data object of an inode. Will
1277 succeed whether the object and/or xattr already exist or not.
1278
1279 :param ino_no: integer inode number
1280 :param xattr_name: string name of the xattr
1281 :param data: byte array data to write to the xattr
1282 :param pool: name of data pool or None to use primary data pool
1283 :return: None
1284 """
1285 if pool is None:
1286 pool = self.get_data_pool_name()
1287
1288 obj_name = "{0:x}.00000000".format(ino_no)
1289 args = ["setxattr", obj_name, xattr_name, data]
1290 self.rados(args, pool=pool)
1291
1292 def read_symlink(self, ino_no, pool=None):
1293 return self._read_data_xattr(ino_no, "symlink", "string_wrapper", pool)
1294
1295 def read_backtrace(self, ino_no, pool=None):
1296 """
1297 Read the backtrace from the data pool, return a dict in the format
1298 given by inode_backtrace_t::dump, which is something like:
1299
1300 ::
1301
1302 rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
1303 ceph-dencoder type inode_backtrace_t import out.bin decode dump_json
1304
1305 { "ino": 1099511627778,
1306 "ancestors": [
1307 { "dirino": 1,
1308 "dname": "blah",
1309 "version": 11}],
1310 "pool": 1,
1311 "old_pools": []}
1312
1313 :param pool: name of pool to read backtrace from. If omitted, FS must have only
1314 one data pool and that will be used.
1315 """
1316 return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)
1317
1318 def read_layout(self, ino_no, pool=None):
1319 """
1320 Read 'layout' xattr of an inode and parse the result, returning a dict like:
1321 ::
1322 {
1323 "stripe_unit": 4194304,
1324 "stripe_count": 1,
1325 "object_size": 4194304,
1326 "pool_id": 1,
1327 "pool_ns": "",
1328 }
1329
1330 :param pool: name of pool to read backtrace from. If omitted, FS must have only
1331 one data pool and that will be used.
1332 """
1333 return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)
1334
1335 def _enumerate_data_objects(self, ino, size):
1336 """
1337 Get the list of expected data objects for a range, and the list of objects
1338 that really exist.
1339
1340 :return a tuple of two lists of strings (expected, actual)
1341 """
1342 stripe_size = 1024 * 1024 * 4
1343
1344 size = max(stripe_size, size)
1345
1346 want_objects = [
1347 "{0:x}.{1:08x}".format(ino, n)
1348 for n in range(0, ((size - 1) // stripe_size) + 1)
1349 ]
1350
1351 exist_objects = self.rados(["ls"], pool=self.get_data_pool_name(), stdout=StringIO()).stdout.getvalue().split("\n")
1352
1353 return want_objects, exist_objects
1354
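# Worked example of the naming above: for ino 0x10000000000 and a 9 MiB file
# (with the default 4 MiB object size), the expected data objects are
# 10000000000.00000000, 10000000000.00000001 and 10000000000.00000002.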
1355 def data_objects_present(self, ino, size):
1356 """
1357 Check that *all* the expected data objects for an inode are present in the data pool
1358 """
1359
1360 want_objects, exist_objects = self._enumerate_data_objects(ino, size)
1361 missing = set(want_objects) - set(exist_objects)
1362
1363 if missing:
1364 log.debug("Objects missing (ino {0}, size {1}): {2}".format(
1365 ino, size, missing
1366 ))
1367 return False
1368 else:
1369 log.debug("All objects for ino {0} size {1} found".format(ino, size))
1370 return True
1371
1372 def data_objects_absent(self, ino, size):
1373 want_objects, exist_objects = self._enumerate_data_objects(ino, size)
1374 present = set(want_objects) & set(exist_objects)
1375
1376 if present:
1377 log.debug("Objects not absent (ino {0}, size {1}): {2}".format(
1378 ino, size, present
1379 ))
1380 return False
1381 else:
1382 log.debug("All objects for ino {0} size {1} are absent".format(ino, size))
1383 return True
1384
1385 def dirfrag_exists(self, ino, frag):
1386 try:
1387 self.radosm(["stat", "{0:x}.{1:08x}".format(ino, frag)])
1388 except CommandFailedError:
1389 return False
1390 else:
1391 return True
1392
1393 def list_dirfrag(self, dir_ino):
1394 """
1395 Read the named object and return the list of omap keys
1396
1397 :return a list of 0 or more strings
1398 """
1399
1400 dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
1401
1402 try:
1403 key_list_str = self.radosmo(["listomapkeys", dirfrag_obj_name], stdout=StringIO())
1404 except CommandFailedError as e:
1405 log.error(e.__str__())
1406 raise ObjectNotFound(dirfrag_obj_name)
1407
1408 return key_list_str.strip().split("\n") if key_list_str else []
1409
1410 def get_meta_of_fs_file(self, dir_ino, obj_name, out):
1411 """
1412 Get metadata from the parent dirfrag to verify the correctness of the data format encoded by the cephfs-meta-injection tool.
1413 Warning: directory fragmentation (splitting) is not considered here.
1414 """
1415
1416 dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
1417 try:
1418 self.radosm(["getomapval", dirfrag_obj_name, obj_name+"_head", out])
1419 except CommandFailedError as e:
1420 log.error(e.__str__())
1421 raise ObjectNotFound(dir_ino)
1422
1423 def erase_metadata_objects(self, prefix):
1424 """
1425 For all objects in the metadata pool matching the prefix,
1426 erase them.
1427
1428 This is O(N) in the number of objects in the pool, so it is only suitable
1429 for use on toy test filesystems.
1430 """
1431 all_objects = self.radosmo(["ls"], stdout=StringIO()).strip().split("\n")
1432 matching_objects = [o for o in all_objects if o.startswith(prefix)]
1433 for o in matching_objects:
1434 self.radosm(["rm", o])
1435
1436 def erase_mds_objects(self, rank):
1437 """
1438 Erase all the per-MDS objects for a particular rank. This includes
1439 inotable, sessiontable, journal
1440 """
1441
1442 def obj_prefix(multiplier):
1443 """
1444 MDS object naming convention: e.g. rank 1's
1445 journal objects live at 201.*
1446 """
1447 return "%x." % (multiplier * 0x100 + rank)
1448
1449 # MDS_INO_LOG_OFFSET
1450 self.erase_metadata_objects(obj_prefix(2))
1451 # MDS_INO_LOG_BACKUP_OFFSET
1452 self.erase_metadata_objects(obj_prefix(3))
1453 # MDS_INO_LOG_POINTER_OFFSET
1454 self.erase_metadata_objects(obj_prefix(4))
1455 # MDSTables & SessionMap
1456 self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))
1457
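# Worked example of the prefixes above for rank 1: journal objects are 201.*,
# journal backup objects 301.*, the journal pointer 401.*, and the table and
# sessionmap objects mds1_*.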
1458 @property
1459 def _prefix(self):
1460 """
1461 Override this to set a different path prefix for the tool binaries.
1462 """
1463 return ""
1464
1465 def _make_rank(self, rank):
1466 return "{}:{}".format(self.name, rank)
1467
1468 def _run_tool(self, tool, args, rank=None, quiet=False):
1469 # Tests frequently have [client] configuration that jacks up
1470 # the objecter log level (unlikely to be interesting here)
1471 # and does not set the mds log level (very interesting here)
1472 if quiet:
1473 base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
1474 else:
1475 base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']
1476
1477 if rank is not None:
1478 base_args.extend(["--rank", "%s" % str(rank)])
1479
1480 t1 = datetime.datetime.now()
1481 r = self.tool_remote.sh(script=base_args + args, stdout=StringIO()).strip()
1482 duration = datetime.datetime.now() - t1
1483 log.debug("Ran {0} in time {1}, result:\n{2}".format(
1484 base_args + args, duration, r
1485 ))
1486 return r
1487
1488 @property
1489 def tool_remote(self):
1490 """
1491 An arbitrary remote to use when invoking recovery tools. Use an MDS host because
1492 it'll definitely have keys with perms to access cephfs metadata pool. This is public
1493 so that tests can use this remote to go get locally written output files from the tools.
1494 """
1495 return self.mon_manager.controller
1496
1497 def journal_tool(self, args, rank, quiet=False):
1498 """
1499 Invoke cephfs-journal-tool with the passed arguments for a rank, and return its stdout
1500 """
1501 fs_rank = self._make_rank(rank)
1502 return self._run_tool("cephfs-journal-tool", args, fs_rank, quiet)
1503
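# A minimal sketch (assuming `fs` is a Filesystem instance); "journal inspect"
# is a standard cephfs-journal-tool sub-command:
#
#   out = fs.journal_tool(["journal", "inspect"], rank=0)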
1504 def meta_tool(self, args, rank, quiet=False):
1505 """
1506 Invoke cephfs-meta-injection with the passed arguments for a rank, and return its stdout
1507 """
1508 fs_rank = self._make_rank(rank)
1509 return self._run_tool("cephfs-meta-injection", args, fs_rank, quiet)
1510
1511 def table_tool(self, args, quiet=False):
1512 """
1513 Invoke cephfs-table-tool with the passed arguments, and return its stdout
1514 """
1515 return self._run_tool("cephfs-table-tool", args, None, quiet)
1516
1517 def data_scan(self, args, quiet=False, worker_count=1):
1518 """
1519 Invoke cephfs-data-scan with the passed arguments, and return its stdout
1520
1521 :param worker_count: if greater than 1, multiple workers will be run
1522 in parallel and the return value will be None
1523 """
1524
1525 workers = []
1526
1527 for n in range(0, worker_count):
1528 if worker_count > 1:
1529 # data-scan args first token is a command, followed by args to it.
1530 # insert worker arguments after the command.
1531 cmd = args[0]
1532 worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
1533 else:
1534 worker_args = args
1535
1536 workers.append(Greenlet.spawn(lambda wargs=worker_args:
1537 self._run_tool("cephfs-data-scan", wargs, None, quiet)))
1538
1539 for w in workers:
1540 w.get()
1541
1542 if worker_count == 1:
1543 return workers[0].value
1544 else:
1545 return None
1546
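# A minimal parallel recovery sketch (assuming `fs` is a Filesystem instance);
# scan_extents and scan_inodes are standard cephfs-data-scan phases. With
# worker_count > 1 the return value is None:
#
#   pool = fs.get_data_pool_name()
#   fs.data_scan(["scan_extents", pool], worker_count=4)
#   fs.data_scan(["scan_inodes", pool], worker_count=4)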
1547 def is_full(self):
1548 return self.is_pool_full(self.get_data_pool_name())
1549
1550 def authorize(self, client_id, caps=('/', 'rw')):
1551 """
1552 Run "ceph fs authorize" and return the keyring that it
1553 produces.
1554
1555 client_id: client id that will be authorized
1556 caps: tuple containing the path and permission (can be r or rw)
1557 respectively.
1558 """
1559 client_name = 'client.' + client_id
1560 return self.mon_manager.raw_cluster_cmd('fs', 'authorize', self.name,
1561 client_name, *caps)
1562
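# A minimal sketch (hypothetical client id "foo"): grant read access to the
# root and read/write access to /bar, and capture the resulting keyring:
#
#   keyring = fs.authorize("foo", ("/", "r", "/bar", "rw"))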
1563 def grow(self, new_max_mds, status=None):
1564 oldmax = self.get_var('max_mds', status=status)
1565 assert(new_max_mds > oldmax)
1566 self.set_max_mds(new_max_mds)
1567 return self.wait_for_daemons()
1568
1569 def shrink(self, new_max_mds, status=None):
1570 oldmax = self.get_var('max_mds', status=status)
1571 assert(new_max_mds < oldmax)
1572 self.set_max_mds(new_max_mds)
1573 return self.wait_for_daemons()
1574
1575 def run_scrub(self, cmd, rank=0):
1576 return self.rank_tell(["scrub"] + cmd, rank)
1577
1578 def get_scrub_status(self, rank=0):
1579 return self.run_scrub(["status"], rank)
1580
1581 def wait_until_scrub_complete(self, result=None, tag=None, rank=0, sleep=30,
1582 timeout=300, reverse=False):
1583 # time out after "timeout" seconds and assume the scrub is done
1584 if result is None:
1585 result = "no active scrubs running"
1586 with contextutil.safe_while(sleep=sleep, tries=timeout//sleep) as proceed:
1587 while proceed():
1588 out_json = self.rank_tell(["scrub", "status"], rank=rank)
1589 assert out_json is not None
1590 if not reverse:
1591 if result in out_json['status']:
1592 log.info("all active scrubs completed")
1593 return True
1594 else:
1595 if result not in out_json['status']:
1596 log.info("all active scrubs completed")
1597 return True
1598
1599 if tag is not None:
1600 status = out_json['scrubs'][tag]
1601 if status is not None:
1602 log.info(f"scrub status for tag:{tag} - {status}")
1603 else:
1604 log.info(f"scrub has completed for tag:{tag}")
1605 return True
1606
1607 # timed out waiting for scrub to complete
1608 return False