1
2 import json
3 import logging
4 from gevent import Greenlet
5 import os
6 import time
7 import datetime
8 import re
9 import errno
10 import random
11
12 from io import BytesIO, StringIO
13 from errno import EBUSY
14
15 from teuthology.exceptions import CommandFailedError
16 from teuthology import misc
17 from teuthology.nuke import clear_firewall
18 from teuthology.parallel import parallel
19 from teuthology import contextutil
20 from tasks.ceph_manager import write_conf
21 from tasks import ceph_manager
22
23
24 log = logging.getLogger(__name__)
25
26
27 DAEMON_WAIT_TIMEOUT = 120
28 ROOT_INO = 1
29
30 class FileLayout(object):
31 def __init__(self, pool=None, pool_namespace=None, stripe_unit=None, stripe_count=None, object_size=None):
32 self.pool = pool
33 self.pool_namespace = pool_namespace
34 self.stripe_unit = stripe_unit
35 self.stripe_count = stripe_count
36 self.object_size = object_size
37
38 @classmethod
39     def load_from_ceph(cls, layout_str):
40 # TODO
41 pass
42
43 def items(self):
44 if self.pool is not None:
45 yield ("pool", self.pool)
46 if self.pool_namespace:
47 yield ("pool_namespace", self.pool_namespace)
48 if self.stripe_unit is not None:
49 yield ("stripe_unit", self.stripe_unit)
50 if self.stripe_count is not None:
51 yield ("stripe_count", self.stripe_count)
52 if self.object_size is not None:
53 yield ("object_size", self.stripe_size)
54
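# Illustrative sketch (not part of the test suite): FileLayout.items() yields
# only the fields that were actually set, which makes it convenient for
# applying a layout field-by-field on a mount, much like
# Filesystem.set_dir_layout() below. The `mount` object here is hypothetical.
#
#   layout = FileLayout(pool="cephfs_data", stripe_count=2)
#   for name, value in layout.items():
#       mount.run_shell(["setfattr", "-n", "ceph.dir.layout." + name,
#                        "-v", str(value), "mydir"])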
55 class ObjectNotFound(Exception):
56 def __init__(self, object_name):
57 self._object_name = object_name
58
59 def __str__(self):
60 return "Object not found: '{0}'".format(self._object_name)
61
62 class FSMissing(Exception):
63 def __init__(self, ident):
64 self.ident = ident
65
66 def __str__(self):
67 return f"File system {self.ident} does not exist in the map"
68
69 class FSStatus(object):
70 """
71 Operations on a snapshot of the FSMap.
72 """
73 def __init__(self, mon_manager, epoch=None):
74 self.mon = mon_manager
75 cmd = ["fs", "dump", "--format=json"]
76 if epoch is not None:
77 cmd.append(str(epoch))
78 self.map = json.loads(self.mon.raw_cluster_cmd(*cmd))
79
80 def __str__(self):
81 return json.dumps(self.map, indent = 2, sort_keys = True)
82
83 # Expose the fsmap for manual inspection.
84 def __getitem__(self, key):
85 """
86 Get a field from the fsmap.
87 """
88 return self.map[key]
89
90 def get_filesystems(self):
91 """
92 Iterator for all filesystems.
93 """
94 for fs in self.map['filesystems']:
95 yield fs
96
97 def get_all(self):
98 """
99 Iterator for all the mds_info components in the FSMap.
100 """
101 for info in self.map['standbys']:
102 yield info
103 for fs in self.map['filesystems']:
104 for info in fs['mdsmap']['info'].values():
105 yield info
106
107 def get_standbys(self):
108 """
109 Iterator for all standbys.
110 """
111 for info in self.map['standbys']:
112 yield info
113
114 def get_fsmap(self, fscid):
115 """
116 Get the fsmap for the given FSCID.
117 """
118 for fs in self.map['filesystems']:
119 if fscid is None or fs['id'] == fscid:
120 return fs
121 raise FSMissing(fscid)
122
123 def get_fsmap_byname(self, name):
124 """
125 Get the fsmap for the given file system name.
126 """
127 for fs in self.map['filesystems']:
128 if name is None or fs['mdsmap']['fs_name'] == name:
129 return fs
130 raise FSMissing(name)
131
132 def get_replays(self, fscid):
133 """
134 Get the standby:replay MDS for the given FSCID.
135 """
136 fs = self.get_fsmap(fscid)
137 for info in fs['mdsmap']['info'].values():
138 if info['state'] == 'up:standby-replay':
139 yield info
140
141 def get_ranks(self, fscid):
142 """
143 Get the ranks for the given FSCID.
144 """
145 fs = self.get_fsmap(fscid)
146 for info in fs['mdsmap']['info'].values():
147 if info['rank'] >= 0 and info['state'] != 'up:standby-replay':
148 yield info
149
150 def get_damaged(self, fscid):
151 """
152 Get the damaged ranks for the given FSCID.
153 """
154 fs = self.get_fsmap(fscid)
155 return fs['mdsmap']['damaged']
156
157 def get_rank(self, fscid, rank):
158 """
159 Get the rank for the given FSCID.
160 """
161 for info in self.get_ranks(fscid):
162 if info['rank'] == rank:
163 return info
164 raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))
165
166 def get_mds(self, name):
167 """
168 Get the info for the given MDS name.
169 """
170 for info in self.get_all():
171 if info['name'] == name:
172 return info
173 return None
174
175 def get_mds_addr(self, name):
176 """
177         Return the instance addr as a string, like "10.214.133.138:6807/10825"
178 """
179 info = self.get_mds(name)
180 if info:
181 return info['addr']
182 else:
183 log.warning(json.dumps(list(self.get_all()), indent=2)) # dump for debugging
184 raise RuntimeError("MDS id '{0}' not found in map".format(name))
185
186 def get_mds_addrs(self, name):
187 """
188         Return the instance addresses as a list of strings, like ["10.214.133.138:6807", "10.214.133.138:6808"]
189 """
190 info = self.get_mds(name)
191 if info:
192 return [e['addr'] for e in info['addrs']['addrvec']]
193 else:
194             log.warning(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
195 raise RuntimeError("MDS id '{0}' not found in map".format(name))
196
197 def get_mds_gid(self, gid):
198 """
199 Get the info for the given MDS gid.
200 """
201 for info in self.get_all():
202 if info['gid'] == gid:
203 return info
204 return None
205
206 def hadfailover(self, status):
207 """
208 Compares two statuses for mds failovers.
209 Returns True if there is a failover.
210 """
211 for fs in status.map['filesystems']:
212 for info in fs['mdsmap']['info'].values():
213 oldinfo = self.get_mds_gid(info['gid'])
214 if oldinfo is None or oldinfo['incarnation'] != info['incarnation']:
215 return True
216 #all matching
217 return False
218
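# Illustrative sketch, assuming a ceph_manager.CephManager instance is at hand:
# FSStatus is typically used to snapshot the FSMap once and then query ranks
# and standbys from that snapshot rather than re-running "fs dump".
#
#   status = FSStatus(mon_manager)
#   fs = status.get_fsmap_byname("cephfs")
#   for info in status.get_ranks(fs['id']):
#       log.info("rank %d is mds.%s in state %s",
#                info['rank'], info['name'], info['state'])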
219 class CephCluster(object):
220 @property
221 def admin_remote(self):
222 first_mon = misc.get_first_mon(self._ctx, None)
223 (result,) = self._ctx.cluster.only(first_mon).remotes.keys()
224 return result
225
226 def __init__(self, ctx) -> None:
227 self._ctx = ctx
228 self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
229
230 def get_config(self, key, service_type=None):
231 """
232 Get config from mon by default, or a specific service if caller asks for it
233 """
234 if service_type is None:
235 service_type = 'mon'
236
237 service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
238 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
239
240 def set_ceph_conf(self, subsys, key, value):
241 if subsys not in self._ctx.ceph['ceph'].conf:
242 self._ctx.ceph['ceph'].conf[subsys] = {}
243 self._ctx.ceph['ceph'].conf[subsys][key] = value
244 write_conf(self._ctx) # XXX because we don't have the ceph task's config object, if they
245 # used a different config path this won't work.
246
247 def clear_ceph_conf(self, subsys, key):
248 del self._ctx.ceph['ceph'].conf[subsys][key]
249 write_conf(self._ctx)
250
251 def json_asok(self, command, service_type, service_id, timeout=None):
252 if timeout is None:
253 timeout = 300
254 command.insert(0, '--format=json')
255 proc = self.mon_manager.admin_socket(service_type, service_id, command, timeout=timeout)
256 response_data = proc.stdout.getvalue().strip()
257 if len(response_data) > 0:
258
259 def get_nonnumeric_values(value):
260 c = {"NaN": float("nan"), "Infinity": float("inf"),
261 "-Infinity": -float("inf")}
262 return c[value]
263
264 j = json.loads(response_data.replace('inf', 'Infinity'),
265 parse_constant=get_nonnumeric_values)
266 pretty = json.dumps(j, sort_keys=True, indent=2)
267 log.debug(f"_json_asok output\n{pretty}")
268 return j
269 else:
270 log.debug("_json_asok output empty")
271 return None
272
273 def is_addr_blocklisted(self, addr):
274 blocklist = json.loads(self.mon_manager.raw_cluster_cmd(
275 "osd", "dump", "--format=json"))['blocklist']
276 if addr in blocklist:
277 return True
278         log.warning(f'The address {addr} is not blocklisted')
279 return False
280
281
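# Illustrative sketch: json_asok() wraps an admin-socket command and returns
# the parsed JSON (or None for empty output). For example, fetching the perf
# counters of an MDS daemon; the daemon id "a" is hypothetical.
#
#   cluster = CephCluster(ctx)
#   perf = cluster.json_asok(["perf", "dump"], "mds", "a")
#   if perf is not None:
#       log.info("objecter counters: %s", perf.get("objecter", {}))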
282 class MDSCluster(CephCluster):
283 """
284 Collective operations on all the MDS daemons in the Ceph cluster. These
285 daemons may be in use by various Filesystems.
286
287 For the benefit of pre-multi-filesystem tests, this class is also
288 a parent of Filesystem. The correct way to use MDSCluster going forward is
289 as a separate instance outside of your (multiple) Filesystem instances.
290 """
291
292 def __init__(self, ctx):
293 super(MDSCluster, self).__init__(ctx)
294
295 @property
296 def mds_ids(self):
297 # do this dynamically because the list of ids may change periodically with cephadm
298 return list(misc.all_roles_of_type(self._ctx.cluster, 'mds'))
299
300 @property
301 def mds_daemons(self):
302 return dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
303
304 def _one_or_all(self, mds_id, cb, in_parallel=True):
305 """
306 Call a callback for a single named MDS, or for all.
307
308 Note that the parallelism here isn't for performance, it's to avoid being overly kind
309 to the cluster by waiting a graceful ssh-latency of time between doing things, and to
310 avoid being overly kind by executing them in a particular order. However, some actions
311 don't cope with being done in parallel, so it's optional (`in_parallel`)
312
313 :param mds_id: MDS daemon name, or None
314 :param cb: Callback taking single argument of MDS daemon name
315 :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
316 """
317
318 if mds_id is None:
319 if in_parallel:
320 with parallel() as p:
321 for mds_id in self.mds_ids:
322 p.spawn(cb, mds_id)
323 else:
324 for mds_id in self.mds_ids:
325 cb(mds_id)
326 else:
327 cb(mds_id)
328
329 def get_config(self, key, service_type=None):
330 """
331 get_config specialization of service_type="mds"
332 """
333 if service_type != "mds":
334 return super(MDSCluster, self).get_config(key, service_type)
335
336 # Some tests stop MDS daemons, don't send commands to a dead one:
337 running_daemons = [i for i, mds in self.mds_daemons.items() if mds.running()]
338 service_id = random.sample(running_daemons, 1)[0]
339 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
340
341 def mds_stop(self, mds_id=None):
342 """
343         Stop the MDS daemon process(es). If it held a rank, that rank
344 will eventually go laggy.
345 """
346 self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())
347
348 def mds_fail(self, mds_id=None):
349 """
350 Inform MDSMonitor of the death of the daemon process(es). If it held
351 a rank, that rank will be relinquished.
352 """
353 self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))
354
355 def mds_restart(self, mds_id=None):
356 self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())
357
358 def mds_fail_restart(self, mds_id=None):
359 """
360 Variation on restart that includes marking MDSs as failed, so that doing this
361 operation followed by waiting for healthy daemon states guarantees that they
362 have gone down and come up, rather than potentially seeing the healthy states
363 that existed before the restart.
364 """
365 def _fail_restart(id_):
366 self.mds_daemons[id_].stop()
367 self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
368 self.mds_daemons[id_].restart()
369
370 self._one_or_all(mds_id, _fail_restart)
371
372 def mds_signal(self, mds_id, sig, silent=False):
373 """
374         Signal an MDS daemon.
375         """
376         self.mds_daemons[mds_id].signal(sig, silent)
377
378 def mds_is_running(self, mds_id):
379 return self.mds_daemons[mds_id].running()
380
381 def newfs(self, name='cephfs', create=True):
382 return Filesystem(self._ctx, name=name, create=create)
383
384 def status(self, epoch=None):
385 return FSStatus(self.mon_manager, epoch)
386
387 def get_standby_daemons(self):
388 return set([s['name'] for s in self.status().get_standbys()])
389
390 def get_mds_hostnames(self):
391 result = set()
392 for mds_id in self.mds_ids:
393 mds_remote = self.mon_manager.find_remote('mds', mds_id)
394 result.add(mds_remote.hostname)
395
396 return list(result)
397
398 def set_clients_block(self, blocked, mds_id=None):
399 """
400 Block (using iptables) client communications to this MDS. Be careful: if
401 other services are running on this MDS, or other MDSs try to talk to this
402         MDS, their communications may also be blocked as collateral damage.
403
404 :param mds_id: Optional ID of MDS to block, default to all
405 :return:
406 """
407 da_flag = "-A" if blocked else "-D"
408
409 def set_block(_mds_id):
410 remote = self.mon_manager.find_remote('mds', _mds_id)
411 status = self.status()
412
413 addr = status.get_mds_addr(_mds_id)
414 ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()
415
416 remote.run(
417 args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
418 "comment", "--comment", "teuthology"])
419 remote.run(
420 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
421 "comment", "--comment", "teuthology"])
422
423 self._one_or_all(mds_id, set_block, in_parallel=False)
424
425 def set_inter_mds_block(self, blocked, mds_rank_1, mds_rank_2):
426 """
427 Block (using iptables) communications from a provided MDS to other MDSs.
428 Block all ports that an MDS uses for communication.
429
430 :param blocked: True to block the MDS, False otherwise
431 :param mds_rank_1: MDS rank
432 :param mds_rank_2: MDS rank
433 :return:
434 """
435 da_flag = "-A" if blocked else "-D"
436
437 def set_block(mds_ids):
438 status = self.status()
439
440 mds = mds_ids[0]
441 remote = self.mon_manager.find_remote('mds', mds)
442 addrs = status.get_mds_addrs(mds)
443 for addr in addrs:
444 ip_str, port_str = re.match("(.+):(.+)", addr).groups()
445 remote.run(
446 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
447 "comment", "--comment", "teuthology"], omit_sudo=False)
448
449
450 mds = mds_ids[1]
451 remote = self.mon_manager.find_remote('mds', mds)
452 addrs = status.get_mds_addrs(mds)
453 for addr in addrs:
454 ip_str, port_str = re.match("(.+):(.+)", addr).groups()
455 remote.run(
456 args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
457 "comment", "--comment", "teuthology"], omit_sudo=False)
458 remote.run(
459 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
460 "comment", "--comment", "teuthology"], omit_sudo=False)
461
462 self._one_or_all((mds_rank_1, mds_rank_2), set_block, in_parallel=False)
463
464 def clear_firewall(self):
465 clear_firewall(self._ctx)
466
467 def get_mds_info(self, mds_id):
468 return FSStatus(self.mon_manager).get_mds(mds_id)
469
470 def is_pool_full(self, pool_name):
471 pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
472 for pool in pools:
473 if pool['pool_name'] == pool_name:
474 return 'full' in pool['flags_names'].split(",")
475
476 raise RuntimeError("Pool not found '{0}'".format(pool_name))
477
478 def delete_all_filesystems(self):
479 """
480 Remove all filesystems that exist, and any pools in use by them.
481 """
482 for fs in self.status().get_filesystems():
483 Filesystem(ctx=self._ctx, fscid=fs['id']).destroy()
484
485 @property
486 def beacon_timeout(self):
487 """
488 Generate an acceptable timeout for the mons to drive some MDSMap change
489 because of missed beacons from some MDS. This involves looking up the
490 grace period in use by the mons and adding an acceptable buffer.
491 """
492
493 grace = float(self.get_config("mds_beacon_grace", service_type="mon"))
494 return grace*2+15
495
496
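# Illustrative sketch: a common failover pattern with MDSCluster is to
# fail-and-restart the MDS daemons and then give the mons up to
# beacon_timeout seconds to react before asserting on the resulting MDSMap.
# The Filesystem instance `fs` below is hypothetical.
#
#   mdsc = MDSCluster(ctx)
#   mdsc.mds_fail_restart()
#   fs.wait_for_daemons(timeout=mdsc.beacon_timeout)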
497 class Filesystem(MDSCluster):
498     """
499     This object is for driving a CephFS filesystem. The MDS daemons driven by
500     MDSCluster may be shared with other Filesystems.
501     """
502     @classmethod
503     def get_all_fs(cls, ctx):
504         """
505         Generator for all Filesystems in the cluster.
506         """
507         mdsc = MDSCluster(ctx)
508         status = mdsc.status()
509         for fs in status.get_filesystems():
510             yield cls(ctx, fscid=fs['id'])
511 
512 
513     def __init__(self, ctx, fs_config=None, fscid=None, name=None, create=False):
514 super(Filesystem, self).__init__(ctx)
515
516 self.name = name
517 self.id = None
518 self.metadata_pool_name = None
519 self.data_pool_name = None
520 self.data_pools = None
521         self.fs_config = fs_config if fs_config is not None else {}
522         self.ec_profile = self.fs_config.get('ec_profile')
523
524 client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
525 self.client_id = client_list[0]
526 self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
527
528 if name is not None:
529 if fscid is not None:
530 raise RuntimeError("cannot specify fscid when creating fs")
531 if create and not self.legacy_configured():
532 self.create()
533 else:
534 if fscid is not None:
535 self.id = fscid
536 self.getinfo(refresh = True)
537
538 # Stash a reference to the first created filesystem on ctx, so
539 # that if someone drops to the interactive shell they can easily
540 # poke our methods.
541 if not hasattr(self._ctx, "filesystem"):
542 self._ctx.filesystem = self
543
544 def dead(self):
545 try:
546 return not bool(self.get_mds_map())
547 except FSMissing:
548 return True
549
550 def get_task_status(self, status_key):
551 return self.mon_manager.get_service_task_status("mds", status_key)
552
553 def getinfo(self, refresh = False):
554 status = self.status()
555 if self.id is not None:
556 fsmap = status.get_fsmap(self.id)
557 elif self.name is not None:
558 fsmap = status.get_fsmap_byname(self.name)
559 else:
560 fss = [fs for fs in status.get_filesystems()]
561 if len(fss) == 1:
562 fsmap = fss[0]
563 elif len(fss) == 0:
564 raise RuntimeError("no file system available")
565 else:
566 raise RuntimeError("more than one file system available")
567 self.id = fsmap['id']
568 self.name = fsmap['mdsmap']['fs_name']
569 self.get_pool_names(status = status, refresh = refresh)
570 return status
571
572 def reach_max_mds(self):
573 status = self.wait_for_daemons()
574 mds_map = self.get_mds_map(status=status)
575 assert(mds_map['in'] == list(range(0, mds_map['max_mds'])))
576
577 def reset(self):
578 self.mon_manager.raw_cluster_cmd("fs", "reset", str(self.name), '--yes-i-really-mean-it')
579
580 def fail(self):
581 self.mon_manager.raw_cluster_cmd("fs", "fail", str(self.name))
582
583 def set_flag(self, var, *args):
584 a = map(lambda x: str(x).lower(), args)
585 self.mon_manager.raw_cluster_cmd("fs", "flag", "set", var, *a)
586
587 def set_allow_multifs(self, yes=True):
588 self.set_flag("enable_multiple", yes)
589
590 def set_var(self, var, *args):
591 a = map(lambda x: str(x).lower(), args)
592 self.mon_manager.raw_cluster_cmd("fs", "set", self.name, var, *a)
593
594 def set_down(self, down=True):
595 self.set_var("down", str(down).lower())
596
597 def set_joinable(self, joinable=True):
598 self.set_var("joinable", joinable)
599
600 def set_max_mds(self, max_mds):
601 self.set_var("max_mds", "%d" % max_mds)
602
603 def set_session_timeout(self, timeout):
604 self.set_var("session_timeout", "%d" % timeout)
605
606 def set_allow_standby_replay(self, yes):
607 self.set_var("allow_standby_replay", yes)
608
609 def set_allow_new_snaps(self, yes):
610 self.set_var("allow_new_snaps", yes, '--yes-i-really-mean-it')
611
612 def set_bal_rank_mask(self, bal_rank_mask):
613 self.set_var("bal_rank_mask", bal_rank_mask)
614
615 def set_refuse_client_session(self, yes):
616 self.set_var("refuse_client_session", yes)
617
618 def compat(self, *args):
619 a = map(lambda x: str(x).lower(), args)
620 self.mon_manager.raw_cluster_cmd("fs", "compat", self.name, *a)
621
622 def add_compat(self, *args):
623 self.compat("add_compat", *args)
624
625 def add_incompat(self, *args):
626 self.compat("add_incompat", *args)
627
628 def rm_compat(self, *args):
629 self.compat("rm_compat", *args)
630
631 def rm_incompat(self, *args):
632 self.compat("rm_incompat", *args)
633
634 def required_client_features(self, *args, **kwargs):
635 c = ["fs", "required_client_features", self.name, *args]
636 return self.mon_manager.run_cluster_cmd(args=c, **kwargs)
637
638     # Since v15.1.0 the pg autoscale mode has been enabled by default,
639     # so we let it calculate the pg_num as needed.
640     # We set pg_num_min to 64 to make sure that the pg autoscale mode
641     # won't set the pg_num too low (see Tracker #45434).
642 pg_num = 64
643 pg_num_min = 64
644 target_size_ratio = 0.9
645 target_size_ratio_ec = 0.9
646
647 def create(self, recover=False, metadata_overlay=False):
648 if self.name is None:
649 self.name = "cephfs"
650 if self.metadata_pool_name is None:
651 self.metadata_pool_name = "{0}_metadata".format(self.name)
652 if self.data_pool_name is None:
653 data_pool_name = "{0}_data".format(self.name)
654 else:
655 data_pool_name = self.data_pool_name
656
657         # The EC pool will store the data; a small amount of metadata for
658         # all files still goes to the primary data pool.
659 if not metadata_overlay and self.ec_profile and 'disabled' not in self.ec_profile:
660 self.target_size_ratio = 0.05
661
662 log.debug("Creating filesystem '{0}'".format(self.name))
663
664 try:
665 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
666 self.metadata_pool_name,
667 '--pg_num_min', str(self.pg_num_min))
668
669 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
670 data_pool_name, str(self.pg_num),
671 '--pg_num_min', str(self.pg_num_min),
672 '--target_size_ratio',
673 str(self.target_size_ratio))
674 except CommandFailedError as e:
675 if e.exitstatus == 22: # nautilus couldn't specify --pg_num_min option
676 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
677 self.metadata_pool_name,
678 str(self.pg_num_min))
679
680 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
681 data_pool_name, str(self.pg_num),
682 str(self.pg_num_min))
683 else:
684 raise
685
686 args = ["fs", "new", self.name, self.metadata_pool_name, data_pool_name]
687 if recover:
688 args.append('--recover')
689 if metadata_overlay:
690 args.append('--allow-dangerous-metadata-overlay')
691 self.mon_manager.raw_cluster_cmd(*args)
692
693 if not recover:
694 if self.ec_profile and 'disabled' not in self.ec_profile:
695 ec_data_pool_name = data_pool_name + "_ec"
696 log.debug("EC profile is %s", self.ec_profile)
697 cmd = ['osd', 'erasure-code-profile', 'set', ec_data_pool_name]
698 cmd.extend(self.ec_profile)
699 self.mon_manager.raw_cluster_cmd(*cmd)
700 try:
701 self.mon_manager.raw_cluster_cmd(
702 'osd', 'pool', 'create', ec_data_pool_name,
703 'erasure', ec_data_pool_name,
704 '--pg_num_min', str(self.pg_num_min),
705 '--target_size_ratio', str(self.target_size_ratio_ec))
706 except CommandFailedError as e:
707 if e.exitstatus == 22: # nautilus couldn't specify --pg_num_min option
708 self.mon_manager.raw_cluster_cmd(
709 'osd', 'pool', 'create', ec_data_pool_name,
710 str(self.pg_num_min), 'erasure', ec_data_pool_name)
711 else:
712 raise
713 self.mon_manager.raw_cluster_cmd(
714 'osd', 'pool', 'set',
715 ec_data_pool_name, 'allow_ec_overwrites', 'true')
716 self.add_data_pool(ec_data_pool_name, create=False)
717 self.check_pool_application(ec_data_pool_name)
718
719 self.run_client_payload(f"setfattr -n ceph.dir.layout.pool -v {ec_data_pool_name} . && getfattr -n ceph.dir.layout .")
720
721 self.check_pool_application(self.metadata_pool_name)
722 self.check_pool_application(data_pool_name)
723
724 # Turn off spurious standby count warnings from modifying max_mds in tests.
725 try:
726 self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
727 except CommandFailedError as e:
728 if e.exitstatus == 22:
729 # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
730 pass
731 else:
732 raise
733
734 if self.fs_config is not None:
735 log.debug(f"fs_config: {self.fs_config}")
736 max_mds = self.fs_config.get('max_mds', 1)
737 if max_mds > 1:
738 self.set_max_mds(max_mds)
739
740 standby_replay = self.fs_config.get('standby_replay', False)
741 self.set_allow_standby_replay(standby_replay)
742
743             # If absent, the default value (60 seconds) is used
744 session_timeout = self.fs_config.get('session_timeout', 60)
745 if session_timeout != 60:
746 self.set_session_timeout(session_timeout)
747
748 if self.fs_config.get('subvols', None) is not None:
749 log.debug(f"Creating {self.fs_config.get('subvols')} subvols "
750 f"for filesystem '{self.name}'")
751 if not hasattr(self._ctx, "created_subvols"):
752 self._ctx.created_subvols = dict()
753
754 subvols = self.fs_config.get('subvols')
755 assert(isinstance(subvols, dict))
756 assert(isinstance(subvols['create'], int))
757 assert(subvols['create'] > 0)
758
759 for sv in range(0, subvols['create']):
760 sv_name = f'sv_{sv}'
761 self.mon_manager.raw_cluster_cmd(
762 'fs', 'subvolume', 'create', self.name, sv_name,
763 self.fs_config.get('subvol_options', ''))
764
765 if self.name not in self._ctx.created_subvols:
766 self._ctx.created_subvols[self.name] = []
767
768 subvol_path = self.mon_manager.raw_cluster_cmd(
769 'fs', 'subvolume', 'getpath', self.name, sv_name)
770 subvol_path = subvol_path.strip()
771 self._ctx.created_subvols[self.name].append(subvol_path)
772 else:
773 log.debug(f"Not Creating any subvols for filesystem '{self.name}'")
774
775
776 self.getinfo(refresh = True)
777
778         # wait for PGs to be clean
779 self.mon_manager.wait_for_clean()
780
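    # Illustrative sketch of the fs_config dict that create() understands.
    # All keys are optional; the values below are examples only.
    #
    #   fs = Filesystem(ctx, fs_config={
    #       'max_mds': 2,                 # raise max_mds after "fs new"
    #       'standby_replay': True,       # allow_standby_replay
    #       'session_timeout': 120,       # only applied when != 60
    #       'subvols': {'create': 3},     # pre-create subvolumes sv_0..sv_2
    #       'ec_profile': ['k=2', 'm=1'], # EC data pool, or ['disabled']
    #   }, name='cephfs', create=True)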
781 def run_client_payload(self, cmd):
782 # avoid circular dep by importing here:
783 from tasks.cephfs.fuse_mount import FuseMount
784
785         # Wait for the MDS daemons to be ready before mounting the
786         # ceph-fuse client in run_client_payload()
787 self.wait_for_daemons()
788
789 d = misc.get_testdir(self._ctx)
790 m = FuseMount(self._ctx, d, "admin", self.client_remote, cephfs_name=self.name)
791 m.mount_wait()
792 m.run_shell_payload(cmd)
793 m.umount_wait(require_clean=True)
794
795 def _remove_pool(self, name, **kwargs):
796 c = f'osd pool rm {name} {name} --yes-i-really-really-mean-it'
797 return self.mon_manager.ceph(c, **kwargs)
798
799 def rm(self, **kwargs):
800 c = f'fs rm {self.name} --yes-i-really-mean-it'
801 return self.mon_manager.ceph(c, **kwargs)
802
803 def remove_pools(self, data_pools):
804 self._remove_pool(self.get_metadata_pool_name())
805 for poolname in data_pools:
806 try:
807 self._remove_pool(poolname)
808 except CommandFailedError as e:
809 # EBUSY, this data pool is used by two metadata pools, let the
810 # 2nd pass delete it
811 if e.exitstatus == EBUSY:
812 pass
813 else:
814 raise
815
816 def destroy(self, reset_obj_attrs=True):
817 log.info(f'Destroying file system {self.name} and related pools')
818
819 if self.dead():
820 log.debug('already dead...')
821 return
822
823 data_pools = self.get_data_pool_names(refresh=True)
824
825 # make sure no MDSs are attached to given FS.
826 self.fail()
827 self.rm()
828
829 self.remove_pools(data_pools)
830
831 if reset_obj_attrs:
832 self.id = None
833 self.name = None
834 self.metadata_pool_name = None
835 self.data_pool_name = None
836 self.data_pools = None
837
838 def recreate(self):
839 self.destroy()
840
841 self.create()
842 self.getinfo(refresh=True)
843
844 def check_pool_application(self, pool_name):
845 osd_map = self.mon_manager.get_osd_dump_json()
846 for pool in osd_map['pools']:
847 if pool['pool_name'] == pool_name:
848 if "application_metadata" in pool:
849 if not "cephfs" in pool['application_metadata']:
850 raise RuntimeError("Pool {pool_name} does not name cephfs as application!".\
851 format(pool_name=pool_name))
852
853 def __del__(self):
854 if getattr(self._ctx, "filesystem", None) == self:
855 delattr(self._ctx, "filesystem")
856
857 def exists(self):
858 """
859 Whether a filesystem exists in the mon's filesystem list
860 """
861 fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
862 return self.name in [fs['name'] for fs in fs_list]
863
864 def legacy_configured(self):
865 """
866 Check if a legacy (i.e. pre "fs new") filesystem configuration is present. If this is
867 the case, the caller should avoid using Filesystem.create
868 """
869 try:
870 out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
871 pools = json.loads(out_text)
872 metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
873 if metadata_pool_exists:
874 self.metadata_pool_name = 'metadata'
875 except CommandFailedError as e:
876 # For use in upgrade tests, Ceph cuttlefish and earlier don't support
877 # structured output (--format) from the CLI.
878 if e.exitstatus == 22:
879 metadata_pool_exists = True
880 else:
881 raise
882
883 return metadata_pool_exists
884
885 def _df(self):
886 return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))
887
888 # may raise FSMissing
889 def get_mds_map(self, status=None):
890 if status is None:
891 status = self.status()
892 return status.get_fsmap(self.id)['mdsmap']
893
894 def get_var(self, var, status=None):
895 return self.get_mds_map(status=status)[var]
896
897 def set_dir_layout(self, mount, path, layout):
898 for name, value in layout.items():
899 mount.run_shell(args=["setfattr", "-n", "ceph.dir.layout."+name, "-v", str(value), path])
900
901 def add_data_pool(self, name, create=True):
902 if create:
903 try:
904 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name,
905 '--pg_num_min', str(self.pg_num_min))
906 except CommandFailedError as e:
907 if e.exitstatus == 22: # nautilus couldn't specify --pg_num_min option
908 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name,
909 str(self.pg_num_min))
910 else:
911 raise
912 self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
913 self.get_pool_names(refresh = True)
914 for poolid, fs_name in self.data_pools.items():
915 if name == fs_name:
916 return poolid
917 raise RuntimeError("could not get just created pool '{0}'".format(name))
918
919 def get_pool_names(self, refresh = False, status = None):
920 if refresh or self.metadata_pool_name is None or self.data_pools is None:
921 if status is None:
922 status = self.status()
923 fsmap = status.get_fsmap(self.id)
924
925 osd_map = self.mon_manager.get_osd_dump_json()
926 id_to_name = {}
927 for p in osd_map['pools']:
928 id_to_name[p['pool']] = p['pool_name']
929
930 self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
931 self.data_pools = {}
932 for data_pool in fsmap['mdsmap']['data_pools']:
933 self.data_pools[data_pool] = id_to_name[data_pool]
934
935 def get_data_pool_name(self, refresh = False):
936 if refresh or self.data_pools is None:
937 self.get_pool_names(refresh = True)
938 assert(len(self.data_pools) == 1)
939 return next(iter(self.data_pools.values()))
940
941 def get_data_pool_id(self, refresh = False):
942 """
943 Don't call this if you have multiple data pools
944 :return: integer
945 """
946 if refresh or self.data_pools is None:
947 self.get_pool_names(refresh = True)
948 assert(len(self.data_pools) == 1)
949 return next(iter(self.data_pools.keys()))
950
951 def get_data_pool_names(self, refresh = False):
952 if refresh or self.data_pools is None:
953 self.get_pool_names(refresh = True)
954 return list(self.data_pools.values())
955
956 def get_metadata_pool_name(self):
957 return self.metadata_pool_name
958
959 def set_data_pool_name(self, name):
960 if self.id is not None:
961 raise RuntimeError("can't set filesystem name if its fscid is set")
962 self.data_pool_name = name
963
964 def get_pool_pg_num(self, pool_name):
965 pgs = json.loads(self.mon_manager.raw_cluster_cmd('osd', 'pool', 'get',
966 pool_name, 'pg_num',
967 '--format=json-pretty'))
968 return int(pgs['pg_num'])
969
970 def get_namespace_id(self):
971 return self.id
972
973 def get_pool_df(self, pool_name):
974 """
975 Return a dict like:
976 {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
977 """
978 for pool_df in self._df()['pools']:
979 if pool_df['name'] == pool_name:
980 return pool_df['stats']
981
982 raise RuntimeError("Pool name '{0}' not found".format(pool_name))
983
984 def get_usage(self):
985 return self._df()['stats']['total_used_bytes']
986
987 def are_daemons_healthy(self, status=None, skip_max_mds_check=False):
988 """
989         Return true if all daemons are in one of the states active, standby or
990         standby-replay, and at least max_mds daemons are in 'active'.
991
992 Unlike most of Filesystem, this function is tolerant of new-style `fs`
993 commands being missing, because we are part of the ceph installation
994 process during upgrade suites, so must fall back to old style commands
995 when we get an EINVAL on a new style command.
996
997 :return:
998 """
999 # First, check to see that processes haven't exited with an error code
1000 for mds in self._ctx.daemons.iter_daemons_of_role('mds'):
1001 mds.check_status()
1002
1003 active_count = 0
1004 mds_map = self.get_mds_map(status=status)
1005
1006 log.debug("are_daemons_healthy: mds map: {0}".format(mds_map))
1007
1008 for mds_id, mds_status in mds_map['info'].items():
1009 if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
1010 log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
1011 return False
1012 elif mds_status['state'] == 'up:active':
1013 active_count += 1
1014
1015 log.debug("are_daemons_healthy: {0}/{1}".format(
1016 active_count, mds_map['max_mds']
1017 ))
1018
1019 if not skip_max_mds_check:
1020 if active_count > mds_map['max_mds']:
1021 log.debug("are_daemons_healthy: number of actives is greater than max_mds: {0}".format(mds_map))
1022 return False
1023 elif active_count == mds_map['max_mds']:
1024 # The MDSMap says these guys are active, but let's check they really are
1025 for mds_id, mds_status in mds_map['info'].items():
1026 if mds_status['state'] == 'up:active':
1027 try:
1028 daemon_status = self.mds_tell(["status"], mds_id=mds_status['name'])
1029 except CommandFailedError as cfe:
1030 if cfe.exitstatus == errno.EINVAL:
1031 # Old version, can't do this check
1032 continue
1033 else:
1034 # MDS not even running
1035 return False
1036
1037 if daemon_status['state'] != 'up:active':
1038 # MDS hasn't taken the latest map yet
1039 return False
1040
1041 return True
1042 else:
1043 return False
1044 else:
1045 log.debug("are_daemons_healthy: skipping max_mds check")
1046 return True
1047
1048 def get_daemon_names(self, state=None, status=None):
1049 """
1050 Return MDS daemon names of those daemons in the given state
1051 :param state:
1052 :return:
1053 """
1054 mdsmap = self.get_mds_map(status)
1055 result = []
1056 for mds_status in sorted(mdsmap['info'].values(),
1057 key=lambda _: _['rank']):
1058 if mds_status['state'] == state or state is None:
1059 result.append(mds_status['name'])
1060
1061 return result
1062
1063 def get_active_names(self, status=None):
1064 """
1065 Return MDS daemon names of those daemons holding ranks
1066 in state up:active
1067
1068 :return: list of strings like ['a', 'b'], sorted by rank
1069 """
1070 return self.get_daemon_names("up:active", status=status)
1071
1072 def get_all_mds_rank(self, status=None):
1073 mdsmap = self.get_mds_map(status)
1074 result = []
1075 for mds_status in sorted(mdsmap['info'].values(),
1076 key=lambda _: _['rank']):
1077 if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
1078 result.append(mds_status['rank'])
1079
1080 return result
1081
1082 def get_rank(self, rank=None, status=None):
1083 if status is None:
1084 status = self.getinfo()
1085 if rank is None:
1086 rank = 0
1087 return status.get_rank(self.id, rank)
1088
1089 def rank_restart(self, rank=0, status=None):
1090 name = self.get_rank(rank=rank, status=status)['name']
1091 self.mds_restart(mds_id=name)
1092
1093 def rank_signal(self, signal, rank=0, status=None):
1094 name = self.get_rank(rank=rank, status=status)['name']
1095 self.mds_signal(name, signal)
1096
1097 def rank_freeze(self, yes, rank=0):
1098 self.mon_manager.raw_cluster_cmd("mds", "freeze", "{}:{}".format(self.id, rank), str(yes).lower())
1099
1100 def rank_repaired(self, rank):
1101 self.mon_manager.raw_cluster_cmd("mds", "repaired", "{}:{}".format(self.id, rank))
1102
1103 def rank_fail(self, rank=0):
1104 self.mon_manager.raw_cluster_cmd("mds", "fail", "{}:{}".format(self.id, rank))
1105
1106 def rank_is_running(self, rank=0, status=None):
1107 name = self.get_rank(rank=rank, status=status)['name']
1108 return self.mds_is_running(name)
1109
1110 def get_ranks(self, status=None):
1111 if status is None:
1112 status = self.getinfo()
1113 return status.get_ranks(self.id)
1114
1115 def get_damaged(self, status=None):
1116 if status is None:
1117 status = self.getinfo()
1118 return status.get_damaged(self.id)
1119
1120 def get_replays(self, status=None):
1121 if status is None:
1122 status = self.getinfo()
1123 return status.get_replays(self.id)
1124
1125 def get_replay(self, rank=0, status=None):
1126 for replay in self.get_replays(status=status):
1127 if replay['rank'] == rank:
1128 return replay
1129 return None
1130
1131 def get_rank_names(self, status=None):
1132 """
1133 Return MDS daemon names of those daemons holding a rank,
1134 sorted by rank. This includes e.g. up:replay/reconnect
1135 as well as active, but does not include standby or
1136 standby-replay.
1137 """
1138 mdsmap = self.get_mds_map(status)
1139 result = []
1140 for mds_status in sorted(mdsmap['info'].values(),
1141 key=lambda _: _['rank']):
1142 if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
1143 result.append(mds_status['name'])
1144
1145 return result
1146
1147 def wait_for_daemons(self, timeout=None, skip_max_mds_check=False, status=None):
1148 """
1149 Wait until all daemons are healthy
1150 :return:
1151 """
1152
1153 if timeout is None:
1154 timeout = DAEMON_WAIT_TIMEOUT
1155
1156 if self.id is None:
1157 status = self.getinfo(refresh=True)
1158
1159 if status is None:
1160 status = self.status()
1161
1162 elapsed = 0
1163 while True:
1164 if self.are_daemons_healthy(status=status, skip_max_mds_check=skip_max_mds_check):
1165 return status
1166 else:
1167 time.sleep(1)
1168 elapsed += 1
1169
1170 if elapsed > timeout:
1171 log.debug("status = {0}".format(status))
1172 raise RuntimeError("Timed out waiting for MDS daemons to become healthy")
1173
1174 status = self.status()
1175
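    # Illustrative sketch: tests typically pair a disruptive action with
    # wait_for_daemons() so that later assertions run against a settled MDSMap.
    #
    #   fs.mds_fail_restart()
    #   status = fs.wait_for_daemons(timeout=fs.beacon_timeout)
    #   assert fs.are_daemons_healthy(status=status)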
1176 def dencoder(self, obj_type, obj_blob):
1177 args = [os.path.join(self._prefix, "ceph-dencoder"), 'type', obj_type, 'import', '-', 'decode', 'dump_json']
1178 p = self.mon_manager.controller.run(args=args, stdin=BytesIO(obj_blob), stdout=BytesIO())
1179 return p.stdout.getvalue()
1180
1181 def rados(self, *args, **kwargs):
1182 """
1183 Callout to rados CLI.
1184 """
1185
1186 return self.mon_manager.do_rados(*args, **kwargs)
1187
1188 def radosm(self, *args, **kwargs):
1189 """
1190 Interact with the metadata pool via rados CLI.
1191 """
1192
1193 return self.rados(*args, **kwargs, pool=self.get_metadata_pool_name())
1194
1195 def radosmo(self, *args, stdout=BytesIO(), **kwargs):
1196 """
1197 Interact with the metadata pool via rados CLI. Get the stdout.
1198 """
1199
1200 return self.radosm(*args, **kwargs, stdout=stdout).stdout.getvalue()
1201
1202 def get_metadata_object(self, object_type, object_id):
1203 """
1204 Retrieve an object from the metadata pool, pass it through
1205 ceph-dencoder to dump it to JSON, and return the decoded object.
1206 """
1207
1208 o = self.radosmo(['get', object_id, '-'])
1209 j = self.dencoder(object_type, o)
1210 try:
1211 return json.loads(j)
1212 except (TypeError, ValueError):
1213 log.error("Failed to decode JSON: '{0}'".format(j))
1214 raise
1215
1216 def get_journal_version(self):
1217 """
1218 Read the JournalPointer and Journal::Header objects to learn the version of
1219 encoding in use.
1220 """
1221 journal_pointer_object = '400.00000000'
1222 journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
1223 journal_ino = journal_pointer_dump['journal_pointer']['front']
1224
1225 journal_header_object = "{0:x}.00000000".format(journal_ino)
1226 journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)
1227
1228 version = journal_header_dump['journal_header']['stream_format']
1229 log.debug("Read journal version {0}".format(version))
1230
1231 return version
1232
1233 def mds_asok(self, command, mds_id=None, timeout=None):
1234 if mds_id is None:
1235 return self.rank_asok(command, timeout=timeout)
1236
1237 return self.json_asok(command, 'mds', mds_id, timeout=timeout)
1238
1239 def mds_tell(self, command, mds_id=None):
1240 if mds_id is None:
1241 return self.rank_tell(command)
1242
1243 return json.loads(self.mon_manager.raw_cluster_cmd("tell", f"mds.{mds_id}", *command))
1244
1245 def rank_asok(self, command, rank=0, status=None, timeout=None):
1246 info = self.get_rank(rank=rank, status=status)
1247 return self.json_asok(command, 'mds', info['name'], timeout=timeout)
1248
1249 def rank_tell(self, command, rank=0, status=None):
1250 try:
1251 out = self.mon_manager.raw_cluster_cmd("tell", f"mds.{self.id}:{rank}", *command)
1252 return json.loads(out)
1253 except json.decoder.JSONDecodeError:
1254 log.error("could not decode: {}".format(out))
1255 raise
1256
1257 def ranks_tell(self, command, status=None):
1258 if status is None:
1259 status = self.status()
1260 out = []
1261 for r in status.get_ranks(self.id):
1262 result = self.rank_tell(command, rank=r['rank'], status=status)
1263 out.append((r['rank'], result))
1264 return sorted(out)
1265
1266 def ranks_perf(self, f, status=None):
1267 perf = self.ranks_tell(["perf", "dump"], status=status)
1268 out = []
1269 for rank, perf in perf:
1270 out.append((rank, f(perf)))
1271 return out
1272
1273 def read_cache(self, path, depth=None, rank=None):
1274 cmd = ["dump", "tree", path]
1275 if depth is not None:
1276 cmd.append(depth.__str__())
1277 result = self.rank_asok(cmd, rank=rank)
1278 if result is None or len(result) == 0:
1279 raise RuntimeError("Path not found in cache: {0}".format(path))
1280
1281 return result
1282
1283 def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
1284 """
1285 Block until the MDS reaches a particular state, or a failure condition
1286 is met.
1287
1288         When there are multiple MDSs, succeed when exactly one MDS is in the
1289 goal state, or fail when any MDS is in the reject state.
1290
1291 :param goal_state: Return once the MDS is in this state
1292 :param reject: Fail if the MDS enters this state before the goal state
1293 :param timeout: Fail if this many seconds pass before reaching goal
1294 :return: number of seconds waited, rounded down to integer
1295 """
1296
1297 started_at = time.time()
1298 while True:
1299 status = self.status()
1300 if rank is not None:
1301 try:
1302 mds_info = status.get_rank(self.id, rank)
1303 current_state = mds_info['state'] if mds_info else None
1304 log.debug("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
1305 except:
1306 mdsmap = self.get_mds_map(status=status)
1307 if rank in mdsmap['failed']:
1308 log.debug("Waiting for rank {0} to come back.".format(rank))
1309 current_state = None
1310 else:
1311 raise
1312 elif mds_id is not None:
1313 # mds_info is None if no daemon with this ID exists in the map
1314 mds_info = status.get_mds(mds_id)
1315 current_state = mds_info['state'] if mds_info else None
1316 log.debug("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
1317 else:
1318 # In general, look for a single MDS
1319 states = [m['state'] for m in status.get_ranks(self.id)]
1320 if [s for s in states if s == goal_state] == [goal_state]:
1321 current_state = goal_state
1322 elif reject in states:
1323 current_state = reject
1324 else:
1325 current_state = None
1326 log.debug("mapped states {0} to {1}".format(states, current_state))
1327
1328 elapsed = time.time() - started_at
1329 if current_state == goal_state:
1330 log.debug("reached state '{0}' in {1}s".format(current_state, elapsed))
1331 return elapsed
1332 elif reject is not None and current_state == reject:
1333 raise RuntimeError("MDS in reject state {0}".format(current_state))
1334 elif timeout is not None and elapsed > timeout:
1335 log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
1336 raise RuntimeError(
1337 "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
1338 elapsed, goal_state, current_state
1339 ))
1340 else:
1341 time.sleep(1)
1342
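    # Illustrative sketch: waiting for a named daemon to enter standby-replay
    # after enabling allow_standby_replay (the daemon name 'b' is hypothetical).
    #
    #   fs.set_allow_standby_replay(True)
    #   fs.wait_for_state('up:standby-replay', timeout=60, mds_id='b')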
1343 def _read_data_xattr(self, ino_no, xattr_name, obj_type, pool):
1344 if pool is None:
1345 pool = self.get_data_pool_name()
1346
1347 obj_name = "{0:x}.00000000".format(ino_no)
1348
1349 args = ["getxattr", obj_name, xattr_name]
1350 try:
1351 proc = self.rados(args, pool=pool, stdout=BytesIO())
1352 except CommandFailedError as e:
1353 log.error(e.__str__())
1354 raise ObjectNotFound(obj_name)
1355
1356 obj_blob = proc.stdout.getvalue()
1357 return json.loads(self.dencoder(obj_type, obj_blob).strip())
1358
1359 def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
1360 """
1361 Write to an xattr of the 0th data object of an inode. Will
1362 succeed whether the object and/or xattr already exist or not.
1363
1364 :param ino_no: integer inode number
1365 :param xattr_name: string name of the xattr
1366 :param data: byte array data to write to the xattr
1367 :param pool: name of data pool or None to use primary data pool
1368 :return: None
1369 """
1370 if pool is None:
1371 pool = self.get_data_pool_name()
1372
1373 obj_name = "{0:x}.00000000".format(ino_no)
1374 args = ["setxattr", obj_name, xattr_name, data]
1375 self.rados(args, pool=pool)
1376
1377 def read_symlink(self, ino_no, pool=None):
1378 return self._read_data_xattr(ino_no, "symlink", "string_wrapper", pool)
1379
1380 def read_backtrace(self, ino_no, pool=None):
1381 """
1382 Read the backtrace from the data pool, return a dict in the format
1383 given by inode_backtrace_t::dump, which is something like:
1384
1385 ::
1386
1387 rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
1388 ceph-dencoder type inode_backtrace_t import out.bin decode dump_json
1389
1390 { "ino": 1099511627778,
1391 "ancestors": [
1392 { "dirino": 1,
1393 "dname": "blah",
1394 "version": 11}],
1395 "pool": 1,
1396 "old_pools": []}
1397
1398 :param pool: name of pool to read backtrace from. If omitted, FS must have only
1399 one data pool and that will be used.
1400 """
1401 return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)
1402
1403 def read_layout(self, ino_no, pool=None):
1404 """
1405 Read 'layout' xattr of an inode and parse the result, returning a dict like:
1406 ::
1407 {
1408 "stripe_unit": 4194304,
1409 "stripe_count": 1,
1410 "object_size": 4194304,
1411 "pool_id": 1,
1412 "pool_ns": "",
1413 }
1414
1415 :param pool: name of pool to read backtrace from. If omitted, FS must have only
1416 one data pool and that will be used.
1417 """
1418 return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)
1419
1420 def _enumerate_data_objects(self, ino, size):
1421 """
1422 Get the list of expected data objects for a range, and the list of objects
1423 that really exist.
1424
1425 :return a tuple of two lists of strings (expected, actual)
1426 """
1427 stripe_size = 1024 * 1024 * 4
1428
1429 size = max(stripe_size, size)
1430
1431 want_objects = [
1432 "{0:x}.{1:08x}".format(ino, n)
1433 for n in range(0, ((size - 1) // stripe_size) + 1)
1434 ]
1435
1436 exist_objects = self.rados(["ls"], pool=self.get_data_pool_name(), stdout=StringIO()).stdout.getvalue().split("\n")
1437
1438 return want_objects, exist_objects
1439
1440 def data_objects_present(self, ino, size):
1441 """
1442 Check that *all* the expected data objects for an inode are present in the data pool
1443 """
1444
1445 want_objects, exist_objects = self._enumerate_data_objects(ino, size)
1446 missing = set(want_objects) - set(exist_objects)
1447
1448 if missing:
1449 log.debug("Objects missing (ino {0}, size {1}): {2}".format(
1450 ino, size, missing
1451 ))
1452 return False
1453 else:
1454 log.debug("All objects for ino {0} size {1} found".format(ino, size))
1455 return True
1456
1457 def data_objects_absent(self, ino, size):
1458 want_objects, exist_objects = self._enumerate_data_objects(ino, size)
1459 present = set(want_objects) & set(exist_objects)
1460
1461 if present:
1462 log.debug("Objects not absent (ino {0}, size {1}): {2}".format(
1463 ino, size, present
1464 ))
1465 return False
1466 else:
1467 log.debug("All objects for ino {0} size {1} are absent".format(ino, size))
1468 return True
1469
1470 def dirfrag_exists(self, ino, frag):
1471 try:
1472 self.radosm(["stat", "{0:x}.{1:08x}".format(ino, frag)])
1473 except CommandFailedError:
1474 return False
1475 else:
1476 return True
1477
1478 def list_dirfrag(self, dir_ino):
1479 """
1480 Read the named object and return the list of omap keys
1481
1482 :return a list of 0 or more strings
1483 """
1484
1485 dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
1486
1487 try:
1488 key_list_str = self.radosmo(["listomapkeys", dirfrag_obj_name], stdout=StringIO())
1489 except CommandFailedError as e:
1490 log.error(e.__str__())
1491 raise ObjectNotFound(dirfrag_obj_name)
1492
1493 return key_list_str.strip().split("\n") if key_list_str else []
1494
1495 def get_meta_of_fs_file(self, dir_ino, obj_name, out):
1496 """
1497         Get metadata from the parent dirfrag to verify the correctness of the data format encoded by the cephfs-meta-injection tool.
1498         Warning: directory splitting (fragmentation) is not considered here.
1499 """
1500
1501 dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
1502 try:
1503 self.radosm(["getomapval", dirfrag_obj_name, obj_name+"_head", out])
1504 except CommandFailedError as e:
1505 log.error(e.__str__())
1506 raise ObjectNotFound(dir_ino)
1507
1508 def erase_metadata_objects(self, prefix):
1509 """
1510 For all objects in the metadata pool matching the prefix,
1511 erase them.
1512
1513 This O(N) with the number of objects in the pool, so only suitable
1514 for use on toy test filesystems.
1515 """
1516 all_objects = self.radosmo(["ls"], stdout=StringIO()).strip().split("\n")
1517 matching_objects = [o for o in all_objects if o.startswith(prefix)]
1518 for o in matching_objects:
1519 self.radosm(["rm", o])
1520
1521 def erase_mds_objects(self, rank):
1522 """
1523 Erase all the per-MDS objects for a particular rank. This includes
1524 inotable, sessiontable, journal
1525 """
1526
1527 def obj_prefix(multiplier):
1528 """
1529 MDS object naming conventions like rank 1's
1530 journal is at 201.***
1531 """
1532 return "%x." % (multiplier * 0x100 + rank)
1533
1534 # MDS_INO_LOG_OFFSET
1535 self.erase_metadata_objects(obj_prefix(2))
1536 # MDS_INO_LOG_BACKUP_OFFSET
1537 self.erase_metadata_objects(obj_prefix(3))
1538 # MDS_INO_LOG_POINTER_OFFSET
1539 self.erase_metadata_objects(obj_prefix(4))
1540 # MDSTables & SessionMap
1541 self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))
1542
1543 @property
1544 def _prefix(self):
1545 """
1546         Override this to set a different path prefix for locating the Ceph/CephFS tool binaries.
1547 """
1548 return ""
1549
1550 def _make_rank(self, rank):
1551 return "{}:{}".format(self.name, rank)
1552
1553 def _run_tool(self, tool, args, rank=None, quiet=False):
1554 # Tests frequently have [client] configuration that jacks up
1555 # the objecter log level (unlikely to be interesting here)
1556 # and does not set the mds log level (very interesting here)
1557 if quiet:
1558 base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
1559 else:
1560 base_args = [os.path.join(self._prefix, tool), '--debug-mds=20', '--debug-ms=1', '--debug-objecter=1']
1561
1562 if rank is not None:
1563 base_args.extend(["--rank", "%s" % str(rank)])
1564
1565 t1 = datetime.datetime.now()
1566 r = self.tool_remote.sh(script=base_args + args, stdout=StringIO()).strip()
1567 duration = datetime.datetime.now() - t1
1568 log.debug("Ran {0} in time {1}, result:\n{2}".format(
1569 base_args + args, duration, r
1570 ))
1571 return r
1572
1573 @property
1574 def tool_remote(self):
1575 """
1576 An arbitrary remote to use when invoking recovery tools. Use an MDS host because
1577 it'll definitely have keys with perms to access cephfs metadata pool. This is public
1578 so that tests can use this remote to go get locally written output files from the tools.
1579 """
1580 return self.mon_manager.controller
1581
1582 def journal_tool(self, args, rank, quiet=False):
1583 """
1584 Invoke cephfs-journal-tool with the passed arguments for a rank, and return its stdout
1585 """
1586 fs_rank = self._make_rank(rank)
1587 return self._run_tool("cephfs-journal-tool", args, fs_rank, quiet)
1588
1589 def meta_tool(self, args, rank, quiet=False):
1590 """
1591 Invoke cephfs-meta-injection with the passed arguments for a rank, and return its stdout
1592 """
1593 fs_rank = self._make_rank(rank)
1594 return self._run_tool("cephfs-meta-injection", args, fs_rank, quiet)
1595
1596 def table_tool(self, args, quiet=False):
1597 """
1598 Invoke cephfs-table-tool with the passed arguments, and return its stdout
1599 """
1600 return self._run_tool("cephfs-table-tool", args, None, quiet)
1601
1602 def data_scan(self, args, quiet=False, worker_count=1):
1603 """
1604 Invoke cephfs-data-scan with the passed arguments, and return its stdout
1605
1606 :param worker_count: if greater than 1, multiple workers will be run
1607 in parallel and the return value will be None
1608 """
1609
1610 workers = []
1611
1612 for n in range(0, worker_count):
1613 if worker_count > 1:
1614 # data-scan args first token is a command, followed by args to it.
1615 # insert worker arguments after the command.
1616 cmd = args[0]
1617 worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
1618 else:
1619 worker_args = args
1620
1621 workers.append(Greenlet.spawn(lambda wargs=worker_args:
1622 self._run_tool("cephfs-data-scan", wargs, None, quiet)))
1623
1624 for w in workers:
1625 w.get()
1626
1627 if worker_count == 1:
1628 return workers[0].value
1629 else:
1630 return None
1631
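    # Illustrative sketch of the disaster-recovery tool wrappers: each maps to
    # the corresponding cephfs-* CLI. The arguments shown are indicative only;
    # data_scan() with worker_count > 1 returns None, so its output is not
    # checked here.
    #
    #   fs.journal_tool(["journal", "reset"], rank=0)
    #   fs.table_tool(["all", "reset", "session"])
    #   fs.data_scan(["scan_extents", fs.get_data_pool_name()], worker_count=4)
    #   fs.data_scan(["scan_inodes", fs.get_data_pool_name()], worker_count=4)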
1632 def is_full(self):
1633 return self.is_pool_full(self.get_data_pool_name())
1634
1635 def authorize(self, client_id, caps=('/', 'rw')):
1636 """
1637 Run "ceph fs authorize" and run "ceph auth get" to get and returnt the
1638 keyring.
1639
1640 client_id: client id that will be authorized
1641 caps: tuple containing the path and permission (can be r or rw)
1642 respectively.
1643 """
1644 if isinstance(caps[0], (tuple, list)):
1645 x = []
1646 for c in caps:
1647 x.extend(c)
1648 caps = tuple(x)
1649
1650 client_name = 'client.' + client_id
1651 return self.mon_manager.raw_cluster_cmd('fs', 'authorize', self.name,
1652 client_name, *caps)
1653
1654 def grow(self, new_max_mds, status=None):
1655 oldmax = self.get_var('max_mds', status=status)
1656 assert(new_max_mds > oldmax)
1657 self.set_max_mds(new_max_mds)
1658 return self.wait_for_daemons()
1659
1660 def shrink(self, new_max_mds, status=None):
1661 oldmax = self.get_var('max_mds', status=status)
1662 assert(new_max_mds < oldmax)
1663 self.set_max_mds(new_max_mds)
1664 return self.wait_for_daemons()
1665
1666 def run_scrub(self, cmd, rank=0):
1667 return self.rank_tell(["scrub"] + cmd, rank)
1668
1669 def get_scrub_status(self, rank=0):
1670 return self.run_scrub(["status"], rank)
1671
1672 def flush(self, rank=0):
1673 return self.rank_tell(["flush", "journal"], rank=rank)
1674
1675 def wait_until_scrub_complete(self, result=None, tag=None, rank=0, sleep=30,
1676 timeout=300, reverse=False):
1677         # time out after "timeout" seconds and assume the scrub is done
1678 if result is None:
1679 result = "no active scrubs running"
1680 with contextutil.safe_while(sleep=sleep, tries=timeout//sleep) as proceed:
1681 while proceed():
1682 out_json = self.rank_tell(["scrub", "status"], rank=rank)
1683 assert out_json is not None
1684 if not reverse:
1685 if result in out_json['status']:
1686 log.info("all active scrubs completed")
1687 return True
1688 else:
1689 if result not in out_json['status']:
1690 log.info("all active scrubs completed")
1691 return True
1692
1693 if tag is not None:
1694 status = out_json['scrubs'][tag]
1695 if status is not None:
1696 log.info(f"scrub status for tag:{tag} - {status}")
1697 else:
1698 log.info(f"scrub has completed for tag:{tag}")
1699 return True
1700
1701 # timed out waiting for scrub to complete
1702 return False
1703
1704 def get_damage(self, rank=None):
1705 if rank is None:
1706 result = {}
1707 for info in self.get_ranks():
1708 rank = info['rank']
1709 result[rank] = self.get_damage(rank=rank)
1710 return result
1711 else:
1712 return self.rank_tell(['damage', 'ls'], rank=rank)
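
    # Illustrative sketch: a forward-scrub pass over the whole tree followed by
    # a check that no damage was recorded (scrub tag handling omitted).
    #
    #   fs.run_scrub(["start", "/", "recursive"])
    #   assert fs.wait_until_scrub_complete()
    #   for rank, entries in fs.get_damage().items():
    #       assert entries == [], f"rank {rank} reported damage: {entries}"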