# ceph/qa/tasks/cephfs/filesystem.py (ceph.git, v16.2.7)
1
2 import json
3 import logging
4 from gevent import Greenlet
5 import os
6 import time
7 import datetime
8 import re
9 import errno
10 import random
11 import traceback
12
13 from io import BytesIO, StringIO
14 from errno import EBUSY
15
16 from teuthology.exceptions import CommandFailedError
17 from teuthology import misc
18 from teuthology.nuke import clear_firewall
19 from teuthology.parallel import parallel
20 from teuthology import contextutil
21 from tasks.ceph_manager import write_conf
22 from tasks import ceph_manager
23
24
25 log = logging.getLogger(__name__)
26
27
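# DAEMON_WAIT_TIMEOUT: default number of seconds wait_for_daemons() polls before giving up.
# ROOT_INO: inode number of the CephFS root directory ("/").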
28 DAEMON_WAIT_TIMEOUT = 120
29 ROOT_INO = 1
30
31 class FileLayout(object):
32 def __init__(self, pool=None, pool_namespace=None, stripe_unit=None, stripe_count=None, object_size=None):
33 self.pool = pool
34 self.pool_namespace = pool_namespace
35 self.stripe_unit = stripe_unit
36 self.stripe_count = stripe_count
37 self.object_size = object_size
38
39 @classmethod
40     def load_from_ceph(cls, layout_str):
41 # TODO
42 pass
43
44 def items(self):
45 if self.pool is not None:
46 yield ("pool", self.pool)
47 if self.pool_namespace:
48 yield ("pool_namespace", self.pool_namespace)
49 if self.stripe_unit is not None:
50 yield ("stripe_unit", self.stripe_unit)
51 if self.stripe_count is not None:
52 yield ("stripe_count", self.stripe_count)
53 if self.object_size is not None:
54 yield ("object_size", self.stripe_size)
55
56 class ObjectNotFound(Exception):
57 def __init__(self, object_name):
58 self._object_name = object_name
59
60 def __str__(self):
61 return "Object not found: '{0}'".format(self._object_name)
62
63 class FSMissing(Exception):
64 def __init__(self, ident):
65 self.ident = ident
66
67 def __str__(self):
68 return f"File system {self.ident} does not exist in the map"
69
70 class FSStatus(object):
71 """
72 Operations on a snapshot of the FSMap.
73 """
74 def __init__(self, mon_manager, epoch=None):
75 self.mon = mon_manager
76 cmd = ["fs", "dump", "--format=json"]
77 if epoch is not None:
78 cmd.append(str(epoch))
79 self.map = json.loads(self.mon.raw_cluster_cmd(*cmd))
80
81 def __str__(self):
82 return json.dumps(self.map, indent = 2, sort_keys = True)
83
84 # Expose the fsmap for manual inspection.
85 def __getitem__(self, key):
86 """
87 Get a field from the fsmap.
88 """
89 return self.map[key]
90
91 def get_filesystems(self):
92 """
93 Iterator for all filesystems.
94 """
95 for fs in self.map['filesystems']:
96 yield fs
97
98 def get_all(self):
99 """
100 Iterator for all the mds_info components in the FSMap.
101 """
102 for info in self.map['standbys']:
103 yield info
104 for fs in self.map['filesystems']:
105 for info in fs['mdsmap']['info'].values():
106 yield info
107
108 def get_standbys(self):
109 """
110 Iterator for all standbys.
111 """
112 for info in self.map['standbys']:
113 yield info
114
115 def get_fsmap(self, fscid):
116 """
117 Get the fsmap for the given FSCID.
118 """
119 for fs in self.map['filesystems']:
120 if fscid is None or fs['id'] == fscid:
121 return fs
122 raise FSMissing(fscid)
123
124 def get_fsmap_byname(self, name):
125 """
126 Get the fsmap for the given file system name.
127 """
128 for fs in self.map['filesystems']:
129 if name is None or fs['mdsmap']['fs_name'] == name:
130 return fs
131 raise FSMissing(name)
132
133 def get_replays(self, fscid):
134 """
135 Get the standby:replay MDS for the given FSCID.
136 """
137 fs = self.get_fsmap(fscid)
138 for info in fs['mdsmap']['info'].values():
139 if info['state'] == 'up:standby-replay':
140 yield info
141
142 def get_ranks(self, fscid):
143 """
144 Get the ranks for the given FSCID.
145 """
146 fs = self.get_fsmap(fscid)
147 for info in fs['mdsmap']['info'].values():
148 if info['rank'] >= 0 and info['state'] != 'up:standby-replay':
149 yield info
150
151 def get_damaged(self, fscid):
152 """
153 Get the damaged ranks for the given FSCID.
154 """
155 fs = self.get_fsmap(fscid)
156 return fs['mdsmap']['damaged']
157
158 def get_rank(self, fscid, rank):
159 """
160 Get the rank for the given FSCID.
161 """
162 for info in self.get_ranks(fscid):
163 if info['rank'] == rank:
164 return info
165 raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))
166
167 def get_mds(self, name):
168 """
169 Get the info for the given MDS name.
170 """
171 for info in self.get_all():
172 if info['name'] == name:
173 return info
174 return None
175
176 def get_mds_addr(self, name):
177 """
178 Return the instance addr as a string, like "10.214.133.138:6807\/10825"
179 """
180 info = self.get_mds(name)
181 if info:
182 return info['addr']
183 else:
184 log.warning(json.dumps(list(self.get_all()), indent=2)) # dump for debugging
185 raise RuntimeError("MDS id '{0}' not found in map".format(name))
186
187 def get_mds_addrs(self, name):
188 """
189         Return the instance addresses as a list of strings, like ["10.214.133.138:6807", "10.214.133.138:6808"]
190 """
191 info = self.get_mds(name)
192 if info:
193 return [e['addr'] for e in info['addrs']['addrvec']]
194 else:
195             log.warning(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
196 raise RuntimeError("MDS id '{0}' not found in map".format(name))
197
198 def get_mds_gid(self, gid):
199 """
200 Get the info for the given MDS gid.
201 """
202 for info in self.get_all():
203 if info['gid'] == gid:
204 return info
205 return None
206
207 def hadfailover(self, status):
208 """
209 Compares two statuses for mds failovers.
210 Returns True if there is a failover.
211 """
212 for fs in status.map['filesystems']:
213 for info in fs['mdsmap']['info'].values():
214 oldinfo = self.get_mds_gid(info['gid'])
215 if oldinfo is None or oldinfo['incarnation'] != info['incarnation']:
216 return True
217         # all matching
218 return False
219
220 class CephCluster(object):
221 @property
222 def admin_remote(self):
223 first_mon = misc.get_first_mon(self._ctx, None)
224 (result,) = self._ctx.cluster.only(first_mon).remotes.keys()
225 return result
226
227 def __init__(self, ctx) -> None:
228 self._ctx = ctx
229 self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
230
231 def get_config(self, key, service_type=None):
232 """
233 Get config from mon by default, or a specific service if caller asks for it
234 """
235 if service_type is None:
236 service_type = 'mon'
237
238 service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
239 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
240
241 def set_ceph_conf(self, subsys, key, value):
242 if subsys not in self._ctx.ceph['ceph'].conf:
243 self._ctx.ceph['ceph'].conf[subsys] = {}
244 self._ctx.ceph['ceph'].conf[subsys][key] = value
245 write_conf(self._ctx) # XXX because we don't have the ceph task's config object, if they
246 # used a different config path this won't work.
247
248 def clear_ceph_conf(self, subsys, key):
249 del self._ctx.ceph['ceph'].conf[subsys][key]
250 write_conf(self._ctx)
251
252 def json_asok(self, command, service_type, service_id, timeout=None):
253 if timeout is None:
254 timeout = 15*60
255 command.insert(0, '--format=json')
256 proc = self.mon_manager.admin_socket(service_type, service_id, command, timeout=timeout)
257 response_data = proc.stdout.getvalue().strip()
258 if len(response_data) > 0:
259 j = json.loads(response_data)
260 pretty = json.dumps(j, sort_keys=True, indent=2)
261 log.debug(f"_json_asok output\n{pretty}")
262 return j
263 else:
264 log.debug("_json_asok output empty")
265 return None
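# Illustrative example: json_asok(['perf', 'dump'], 'mds', 'a') sends
# "perf dump" to mds.a's admin socket and returns the parsed JSON
# (or None when the daemon produced no output).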
266
267 def is_addr_blocklisted(self, addr=None):
268 if addr is None:
269 log.warn("Couldn't get the client address, so the blocklisted "
270 "status undetermined")
271 return False
272
273 blocklist = json.loads(self.mon_manager.run_cluster_cmd(
274 args=["osd", "blocklist", "ls", "--format=json"],
275 stdout=StringIO()).stdout.getvalue())
276 for b in blocklist:
277 if addr == b["addr"]:
278 return True
279 return False
280
281
282 class MDSCluster(CephCluster):
283 """
284 Collective operations on all the MDS daemons in the Ceph cluster. These
285 daemons may be in use by various Filesystems.
286
287 For the benefit of pre-multi-filesystem tests, this class is also
288 a parent of Filesystem. The correct way to use MDSCluster going forward is
289 as a separate instance outside of your (multiple) Filesystem instances.
290 """
291
292 def __init__(self, ctx):
293 super(MDSCluster, self).__init__(ctx)
294
295 @property
296 def mds_ids(self):
297 # do this dynamically because the list of ids may change periodically with cephadm
298 return list(misc.all_roles_of_type(self._ctx.cluster, 'mds'))
299
300 @property
301 def mds_daemons(self):
302 return dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
303
304 def _one_or_all(self, mds_id, cb, in_parallel=True):
305 """
306 Call a callback for a single named MDS, or for all.
307
308         Note that the parallelism here isn't for performance: it's to avoid being overly kind
309         to the cluster by pausing an ssh-latency of time between operations, and to avoid
310         being overly kind by always executing them in the same order. However, some actions
311         don't cope with being done in parallel, so it's optional (`in_parallel`).
312
313 :param mds_id: MDS daemon name, or None
314 :param cb: Callback taking single argument of MDS daemon name
315 :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
316 """
317
318 if mds_id is None:
319 if in_parallel:
320 with parallel() as p:
321 for mds_id in self.mds_ids:
322 p.spawn(cb, mds_id)
323 else:
324 for mds_id in self.mds_ids:
325 cb(mds_id)
326 else:
327 cb(mds_id)
328
329 def get_config(self, key, service_type=None):
330 """
331 get_config specialization of service_type="mds"
332 """
333 if service_type != "mds":
334 return super(MDSCluster, self).get_config(key, service_type)
335
336 # Some tests stop MDS daemons, don't send commands to a dead one:
337 running_daemons = [i for i, mds in self.mds_daemons.items() if mds.running()]
338 service_id = random.sample(running_daemons, 1)[0]
339 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
340
341 def mds_stop(self, mds_id=None):
342 """
343         Stop the MDS daemon process(es). If it held a rank, that rank
344 will eventually go laggy.
345 """
346 self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())
347
348 def mds_fail(self, mds_id=None):
349 """
350 Inform MDSMonitor of the death of the daemon process(es). If it held
351 a rank, that rank will be relinquished.
352 """
353 self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))
354
355 def mds_restart(self, mds_id=None):
356 self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())
357
358 def mds_fail_restart(self, mds_id=None):
359 """
360 Variation on restart that includes marking MDSs as failed, so that doing this
361 operation followed by waiting for healthy daemon states guarantees that they
362 have gone down and come up, rather than potentially seeing the healthy states
363 that existed before the restart.
364 """
365 def _fail_restart(id_):
366 self.mds_daemons[id_].stop()
367 self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
368 self.mds_daemons[id_].restart()
369
370 self._one_or_all(mds_id, _fail_restart)
371
372 def mds_signal(self, mds_id, sig, silent=False):
373 """
374 signal a MDS daemon
375 """
376         self.mds_daemons[mds_id].signal(sig, silent)
377
378 def newfs(self, name='cephfs', create=True):
379 return Filesystem(self._ctx, name=name, create=create)
380
381 def status(self, epoch=None):
382 return FSStatus(self.mon_manager, epoch)
383
384 def get_standby_daemons(self):
385 return set([s['name'] for s in self.status().get_standbys()])
386
387 def get_mds_hostnames(self):
388 result = set()
389 for mds_id in self.mds_ids:
390 mds_remote = self.mon_manager.find_remote('mds', mds_id)
391 result.add(mds_remote.hostname)
392
393 return list(result)
394
395 def set_clients_block(self, blocked, mds_id=None):
396 """
397 Block (using iptables) client communications to this MDS. Be careful: if
398 other services are running on this MDS, or other MDSs try to talk to this
399         MDS, their communications may also be blocked as collateral damage.
400
401         :param mds_id: Optional ID of MDS to block; defaults to all
402 :return:
403 """
404 da_flag = "-A" if blocked else "-D"
405
406 def set_block(_mds_id):
407 remote = self.mon_manager.find_remote('mds', _mds_id)
408 status = self.status()
409
410 addr = status.get_mds_addr(_mds_id)
411 ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()
412
413 remote.run(
414 args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
415 "comment", "--comment", "teuthology"])
416 remote.run(
417 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
418 "comment", "--comment", "teuthology"])
419
420 self._one_or_all(mds_id, set_block, in_parallel=False)
421
422 def set_inter_mds_block(self, blocked, mds_rank_1, mds_rank_2):
423 """
424 Block (using iptables) communications from a provided MDS to other MDSs.
425 Block all ports that an MDS uses for communication.
426
427 :param blocked: True to block the MDS, False otherwise
428 :param mds_rank_1: MDS rank
429 :param mds_rank_2: MDS rank
430 :return:
431 """
432 da_flag = "-A" if blocked else "-D"
433
434 def set_block(mds_ids):
435 status = self.status()
436
437 mds = mds_ids[0]
438 remote = self.mon_manager.find_remote('mds', mds)
439 addrs = status.get_mds_addrs(mds)
440 for addr in addrs:
441 ip_str, port_str = re.match("(.+):(.+)", addr).groups()
442 remote.run(
443 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
444 "comment", "--comment", "teuthology"])
445
446
447 mds = mds_ids[1]
448 remote = self.mon_manager.find_remote('mds', mds)
449 addrs = status.get_mds_addrs(mds)
450 for addr in addrs:
451 ip_str, port_str = re.match("(.+):(.+)", addr).groups()
452 remote.run(
453 args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
454 "comment", "--comment", "teuthology"])
455 remote.run(
456 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
457 "comment", "--comment", "teuthology"])
458
459 self._one_or_all((mds_rank_1, mds_rank_2), set_block, in_parallel=False)
460
461 def clear_firewall(self):
462 clear_firewall(self._ctx)
463
464 def get_mds_info(self, mds_id):
465 return FSStatus(self.mon_manager).get_mds(mds_id)
466
467 def is_pool_full(self, pool_name):
468 pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
469 for pool in pools:
470 if pool['pool_name'] == pool_name:
471 return 'full' in pool['flags_names'].split(",")
472
473 raise RuntimeError("Pool not found '{0}'".format(pool_name))
474
475 def delete_all_filesystems(self):
476 """
477 Remove all filesystems that exist, and any pools in use by them.
478 """
479 for fs in self.status().get_filesystems():
480 Filesystem(ctx=self._ctx, fscid=fs['id']).destroy()
481
482
483 class Filesystem(MDSCluster):
484 """
485 This object is for driving a CephFS filesystem. The MDS daemons driven by
486 MDSCluster may be shared with other Filesystems.
487 """
488 def __init__(self, ctx, fs_config={}, fscid=None, name=None, create=False):
489 super(Filesystem, self).__init__(ctx)
490
491 self.name = name
492 self.id = None
493 self.metadata_pool_name = None
494 self.metadata_overlay = False
495 self.data_pool_name = None
496 self.data_pools = None
497 self.fs_config = fs_config
498 self.ec_profile = fs_config.get('ec_profile')
499
500 client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
501 self.client_id = client_list[0]
502 self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
503
504 if name is not None:
505 if fscid is not None:
506 raise RuntimeError("cannot specify fscid when creating fs")
507 if create and not self.legacy_configured():
508 self.create()
509 else:
510 if fscid is not None:
511 self.id = fscid
512 self.getinfo(refresh = True)
513
514 # Stash a reference to the first created filesystem on ctx, so
515 # that if someone drops to the interactive shell they can easily
516 # poke our methods.
517 if not hasattr(self._ctx, "filesystem"):
518 self._ctx.filesystem = self
519
520 def dead(self):
521 try:
522 return not bool(self.get_mds_map())
523 except FSMissing:
524 return True
525
526 def get_task_status(self, status_key):
527 return self.mon_manager.get_service_task_status("mds", status_key)
528
529 def getinfo(self, refresh = False):
530 status = self.status()
531 if self.id is not None:
532 fsmap = status.get_fsmap(self.id)
533 elif self.name is not None:
534 fsmap = status.get_fsmap_byname(self.name)
535 else:
536 fss = [fs for fs in status.get_filesystems()]
537 if len(fss) == 1:
538 fsmap = fss[0]
539 elif len(fss) == 0:
540 raise RuntimeError("no file system available")
541 else:
542 raise RuntimeError("more than one file system available")
543 self.id = fsmap['id']
544 self.name = fsmap['mdsmap']['fs_name']
545 self.get_pool_names(status = status, refresh = refresh)
546 return status
547
548 def set_metadata_overlay(self, overlay):
549 if self.id is not None:
550 raise RuntimeError("cannot specify fscid when configuring overlay")
551 self.metadata_overlay = overlay
552
553 def deactivate(self, rank):
554 if rank < 0:
555 raise RuntimeError("invalid rank")
556 elif rank == 0:
557 raise RuntimeError("cannot deactivate rank 0")
558 self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))
559
560 def reach_max_mds(self):
561 # Try to reach rank count == max_mds, up or down (UPGRADE SENSITIVE!)
562 status = self.getinfo()
563 mds_map = self.get_mds_map(status=status)
564 max_mds = mds_map['max_mds']
565
566 count = len(list(self.get_ranks(status=status)))
567 if count > max_mds:
568 try:
569                 # deactivate MDSs in descending rank order
570 status = self.wait_for_daemons(status=status, skip_max_mds_check=True)
571 while count > max_mds:
572 targets = sorted(self.get_ranks(status=status), key=lambda r: r['rank'], reverse=True)
573 target = targets[0]
574 log.debug("deactivating rank %d" % target['rank'])
575 self.deactivate(target['rank'])
576 status = self.wait_for_daemons(skip_max_mds_check=True)
577 count = len(list(self.get_ranks(status=status)))
578 except:
579 # In Mimic, deactivation is done automatically:
580 log.info("Error:\n{}".format(traceback.format_exc()))
581 status = self.wait_for_daemons()
582 else:
583 status = self.wait_for_daemons()
584
585 mds_map = self.get_mds_map(status=status)
586 assert(mds_map['max_mds'] == max_mds)
587 assert(mds_map['in'] == list(range(0, max_mds)))
588
589 def reset(self):
590 self.mon_manager.raw_cluster_cmd("fs", "reset", str(self.name), '--yes-i-really-mean-it')
591
592 def fail(self):
593 self.mon_manager.raw_cluster_cmd("fs", "fail", str(self.name))
594
595 def set_flag(self, var, *args):
596 a = map(lambda x: str(x).lower(), args)
597 self.mon_manager.raw_cluster_cmd("fs", "flag", "set", var, *a)
598
599 def set_allow_multifs(self, yes=True):
600 self.set_flag("enable_multiple", yes)
601
602 def set_var(self, var, *args):
603 a = map(lambda x: str(x).lower(), args)
604 self.mon_manager.raw_cluster_cmd("fs", "set", self.name, var, *a)
605
606 def set_down(self, down=True):
607 self.set_var("down", str(down).lower())
608
609 def set_joinable(self, joinable=True):
610 self.set_var("joinable", joinable)
611
612 def set_max_mds(self, max_mds):
613 self.set_var("max_mds", "%d" % max_mds)
614
615 def set_session_timeout(self, timeout):
616 self.set_var("session_timeout", "%d" % timeout)
617
618 def set_allow_standby_replay(self, yes):
619 self.set_var("allow_standby_replay", yes)
620
621 def set_allow_new_snaps(self, yes):
622 self.set_var("allow_new_snaps", yes, '--yes-i-really-mean-it')
623
624 def compat(self, *args):
625 a = map(lambda x: str(x).lower(), args)
626 self.mon_manager.raw_cluster_cmd("fs", "compat", self.name, *a)
627
628 def add_compat(self, *args):
629 self.compat("add_compat", *args)
630
631 def add_incompat(self, *args):
632 self.compat("add_incompat", *args)
633
634 def rm_compat(self, *args):
635 self.compat("rm_compat", *args)
636
637 def rm_incompat(self, *args):
638 self.compat("rm_incompat", *args)
639
640 def required_client_features(self, *args, **kwargs):
641 c = ["fs", "required_client_features", self.name, *args]
642 return self.mon_manager.run_cluster_cmd(args=c, **kwargs)
643
644     # Since v15.1.0 the pg autoscale mode has been enabled by default,
645     # which lets the pg autoscaler calculate the pg_num as needed.
646     # We set pg_num_min to 64 to make sure the autoscaler
647     # won't set the pg_num too low (see Tracker #45434).
648 pg_num = 64
649 pg_num_min = 64
650 target_size_ratio = 0.9
651 target_size_ratio_ec = 0.9
652
653 def create(self):
654 if self.name is None:
655 self.name = "cephfs"
656 if self.metadata_pool_name is None:
657 self.metadata_pool_name = "{0}_metadata".format(self.name)
658 if self.data_pool_name is None:
659 data_pool_name = "{0}_data".format(self.name)
660 else:
661 data_pool_name = self.data_pool_name
662
663         # The EC pool will store the file data; a small amount of per-file
664         # metadata still goes to the primary data pool.
665 if not self.metadata_overlay and self.ec_profile and 'disabled' not in self.ec_profile:
666 self.target_size_ratio = 0.05
667
668 log.debug("Creating filesystem '{0}'".format(self.name))
669
670 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
671 self.metadata_pool_name, str(self.pg_num),
672 '--pg_num_min', str(self.pg_num_min))
673
674 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
675 data_pool_name, str(self.pg_num),
676 '--pg_num_min', str(self.pg_num_min),
677 '--target_size_ratio',
678 str(self.target_size_ratio))
679
680 if self.metadata_overlay:
681 self.mon_manager.raw_cluster_cmd('fs', 'new',
682 self.name, self.metadata_pool_name, data_pool_name,
683 '--allow-dangerous-metadata-overlay')
684 else:
685 self.mon_manager.raw_cluster_cmd('fs', 'new',
686 self.name,
687 self.metadata_pool_name,
688 data_pool_name)
689
690 if self.ec_profile and 'disabled' not in self.ec_profile:
691 ec_data_pool_name = data_pool_name + "_ec"
692 log.debug("EC profile is %s", self.ec_profile)
693 cmd = ['osd', 'erasure-code-profile', 'set', ec_data_pool_name]
694 cmd.extend(self.ec_profile)
695 self.mon_manager.raw_cluster_cmd(*cmd)
696 self.mon_manager.raw_cluster_cmd(
697 'osd', 'pool', 'create', ec_data_pool_name,
698 'erasure', ec_data_pool_name,
699 '--pg_num_min', str(self.pg_num_min),
700 '--target_size_ratio', str(self.target_size_ratio_ec))
701 self.mon_manager.raw_cluster_cmd(
702 'osd', 'pool', 'set',
703 ec_data_pool_name, 'allow_ec_overwrites', 'true')
704 self.add_data_pool(ec_data_pool_name, create=False)
705 self.check_pool_application(ec_data_pool_name)
706
707 self.run_client_payload(f"setfattr -n ceph.dir.layout.pool -v {ec_data_pool_name} . && getfattr -n ceph.dir.layout .")
708
709 self.check_pool_application(self.metadata_pool_name)
710 self.check_pool_application(data_pool_name)
711
712 # Turn off spurious standby count warnings from modifying max_mds in tests.
713 try:
714 self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
715 except CommandFailedError as e:
716 if e.exitstatus == 22:
717 # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
718 pass
719 else:
720 raise
721
722 if self.fs_config is not None:
723 max_mds = self.fs_config.get('max_mds', 1)
724 if max_mds > 1:
725 self.set_max_mds(max_mds)
726
727 standby_replay = self.fs_config.get('standby_replay', False)
728 self.set_allow_standby_replay(standby_replay)
729
730 # If absent will use the default value (60 seconds)
731 session_timeout = self.fs_config.get('session_timeout', 60)
732 if session_timeout != 60:
733 self.set_session_timeout(session_timeout)
734
735 self.getinfo(refresh = True)
736
737         # wait for PGs to become clean
738 self.mon_manager.wait_for_clean()
739
740 def run_client_payload(self, cmd):
741 # avoid circular dep by importing here:
742 from tasks.cephfs.fuse_mount import FuseMount
743 d = misc.get_testdir(self._ctx)
744 m = FuseMount(self._ctx, {}, d, "admin", self.client_remote, cephfs_name=self.name)
745 m.mount_wait()
746 m.run_shell_payload(cmd)
747 m.umount_wait(require_clean=True)
748
749 def _remove_pool(self, name, **kwargs):
750 c = f'osd pool rm {name} {name} --yes-i-really-really-mean-it'
751 return self.mon_manager.ceph(c, **kwargs)
752
753 def rm(self, **kwargs):
754 c = f'fs rm {self.name} --yes-i-really-mean-it'
755 return self.mon_manager.ceph(c, **kwargs)
756
757 def remove_pools(self, data_pools):
758 self._remove_pool(self.get_metadata_pool_name())
759 for poolname in data_pools:
760 try:
761 self._remove_pool(poolname)
762 except CommandFailedError as e:
763                 # EBUSY: the data pool is still shared with another file system;
764                 # let the second pass delete it
765 if e.exitstatus == EBUSY:
766 pass
767 else:
768 raise
769
770 def destroy(self, reset_obj_attrs=True):
771 log.info(f'Destroying file system {self.name} and related pools')
772
773 if self.dead():
774 log.debug('already dead...')
775 return
776
777 data_pools = self.get_data_pool_names(refresh=True)
778
779 # make sure no MDSs are attached to given FS.
780 self.fail()
781 self.rm()
782
783 self.remove_pools(data_pools)
784
785 if reset_obj_attrs:
786 self.id = None
787 self.name = None
788 self.metadata_pool_name = None
789 self.data_pool_name = None
790 self.data_pools = None
791
792 def recreate(self):
793 self.destroy()
794
795 self.create()
796 self.getinfo(refresh=True)
797
798 def check_pool_application(self, pool_name):
799 osd_map = self.mon_manager.get_osd_dump_json()
800 for pool in osd_map['pools']:
801 if pool['pool_name'] == pool_name:
802 if "application_metadata" in pool:
803 if not "cephfs" in pool['application_metadata']:
804 raise RuntimeError("Pool {pool_name} does not name cephfs as application!".\
805 format(pool_name=pool_name))
806
807 def __del__(self):
808 if getattr(self._ctx, "filesystem", None) == self:
809 delattr(self._ctx, "filesystem")
810
811 def exists(self):
812 """
813 Whether a filesystem exists in the mon's filesystem list
814 """
815 fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
816 return self.name in [fs['name'] for fs in fs_list]
817
818 def legacy_configured(self):
819 """
820 Check if a legacy (i.e. pre "fs new") filesystem configuration is present. If this is
821 the case, the caller should avoid using Filesystem.create
822 """
823 try:
824 out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
825 pools = json.loads(out_text)
826 metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
827 if metadata_pool_exists:
828 self.metadata_pool_name = 'metadata'
829 except CommandFailedError as e:
830 # For use in upgrade tests, Ceph cuttlefish and earlier don't support
831 # structured output (--format) from the CLI.
832 if e.exitstatus == 22:
833 metadata_pool_exists = True
834 else:
835 raise
836
837 return metadata_pool_exists
838
839 def _df(self):
840 return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))
841
842 # may raise FSMissing
843 def get_mds_map(self, status=None):
844 if status is None:
845 status = self.status()
846 return status.get_fsmap(self.id)['mdsmap']
847
848 def get_var(self, var, status=None):
849 return self.get_mds_map(status=status)[var]
850
851 def set_dir_layout(self, mount, path, layout):
852 for name, value in layout.items():
853 mount.run_shell(args=["setfattr", "-n", "ceph.dir.layout."+name, "-v", str(value), path])
854
855 def add_data_pool(self, name, create=True):
856 if create:
857 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name,
858 '--pg_num_min', str(self.pg_num_min))
859 self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
860 self.get_pool_names(refresh = True)
861 for poolid, fs_name in self.data_pools.items():
862 if name == fs_name:
863 return poolid
864 raise RuntimeError("could not get just created pool '{0}'".format(name))
865
866 def get_pool_names(self, refresh = False, status = None):
867 if refresh or self.metadata_pool_name is None or self.data_pools is None:
868 if status is None:
869 status = self.status()
870 fsmap = status.get_fsmap(self.id)
871
872 osd_map = self.mon_manager.get_osd_dump_json()
873 id_to_name = {}
874 for p in osd_map['pools']:
875 id_to_name[p['pool']] = p['pool_name']
876
877 self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
878 self.data_pools = {}
879 for data_pool in fsmap['mdsmap']['data_pools']:
880 self.data_pools[data_pool] = id_to_name[data_pool]
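# self.data_pools maps pool id (int, as listed in the MDSMap) -> pool name (str).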
881
882 def get_data_pool_name(self, refresh = False):
883 if refresh or self.data_pools is None:
884 self.get_pool_names(refresh = True)
885 assert(len(self.data_pools) == 1)
886 return next(iter(self.data_pools.values()))
887
888 def get_data_pool_id(self, refresh = False):
889 """
890 Don't call this if you have multiple data pools
891 :return: integer
892 """
893 if refresh or self.data_pools is None:
894 self.get_pool_names(refresh = True)
895 assert(len(self.data_pools) == 1)
896 return next(iter(self.data_pools.keys()))
897
898 def get_data_pool_names(self, refresh = False):
899 if refresh or self.data_pools is None:
900 self.get_pool_names(refresh = True)
901 return list(self.data_pools.values())
902
903 def get_metadata_pool_name(self):
904 return self.metadata_pool_name
905
906 def set_data_pool_name(self, name):
907 if self.id is not None:
908 raise RuntimeError("can't set filesystem name if its fscid is set")
909 self.data_pool_name = name
910
911 def get_pool_pg_num(self, pool_name):
912 pgs = json.loads(self.mon_manager.raw_cluster_cmd('osd', 'pool', 'get',
913 pool_name, 'pg_num',
914 '--format=json-pretty'))
915 return int(pgs['pg_num'])
916
917 def get_namespace_id(self):
918 return self.id
919
920 def get_pool_df(self, pool_name):
921 """
922 Return a dict like:
923 {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
924 """
925 for pool_df in self._df()['pools']:
926 if pool_df['name'] == pool_name:
927 return pool_df['stats']
928
929 raise RuntimeError("Pool name '{0}' not found".format(pool_name))
930
931 def get_usage(self):
932 return self._df()['stats']['total_used_bytes']
933
934 def are_daemons_healthy(self, status=None, skip_max_mds_check=False):
935 """
936 Return true if all daemons are in one of active, standby, standby-replay, and
937 at least max_mds daemons are in 'active'.
938
939 Unlike most of Filesystem, this function is tolerant of new-style `fs`
940 commands being missing, because we are part of the ceph installation
941 process during upgrade suites, so must fall back to old style commands
942 when we get an EINVAL on a new style command.
943
944 :return:
945 """
946 # First, check to see that processes haven't exited with an error code
947 for mds in self._ctx.daemons.iter_daemons_of_role('mds'):
948 mds.check_status()
949
950 active_count = 0
951 mds_map = self.get_mds_map(status=status)
952
953 log.debug("are_daemons_healthy: mds map: {0}".format(mds_map))
954
955 for mds_id, mds_status in mds_map['info'].items():
956 if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
957 log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
958 return False
959 elif mds_status['state'] == 'up:active':
960 active_count += 1
961
962 log.debug("are_daemons_healthy: {0}/{1}".format(
963 active_count, mds_map['max_mds']
964 ))
965
966 if not skip_max_mds_check:
967 if active_count > mds_map['max_mds']:
968 log.debug("are_daemons_healthy: number of actives is greater than max_mds: {0}".format(mds_map))
969 return False
970 elif active_count == mds_map['max_mds']:
971 # The MDSMap says these guys are active, but let's check they really are
972 for mds_id, mds_status in mds_map['info'].items():
973 if mds_status['state'] == 'up:active':
974 try:
975 daemon_status = self.mds_tell(["status"], mds_id=mds_status['name'])
976 except CommandFailedError as cfe:
977 if cfe.exitstatus == errno.EINVAL:
978 # Old version, can't do this check
979 continue
980 else:
981 # MDS not even running
982 return False
983
984 if daemon_status['state'] != 'up:active':
985 # MDS hasn't taken the latest map yet
986 return False
987
988 return True
989 else:
990 return False
991 else:
992 log.debug("are_daemons_healthy: skipping max_mds check")
993 return True
994
995 def get_daemon_names(self, state=None, status=None):
996 """
997 Return MDS daemon names of those daemons in the given state
998 :param state:
999 :return:
1000 """
1001 mdsmap = self.get_mds_map(status)
1002 result = []
1003 for mds_status in sorted(mdsmap['info'].values(),
1004 key=lambda _: _['rank']):
1005 if mds_status['state'] == state or state is None:
1006 result.append(mds_status['name'])
1007
1008 return result
1009
1010 def get_active_names(self, status=None):
1011 """
1012 Return MDS daemon names of those daemons holding ranks
1013 in state up:active
1014
1015 :return: list of strings like ['a', 'b'], sorted by rank
1016 """
1017 return self.get_daemon_names("up:active", status=status)
1018
1019 def get_all_mds_rank(self, status=None):
1020 mdsmap = self.get_mds_map(status)
1021 result = []
1022 for mds_status in sorted(mdsmap['info'].values(),
1023 key=lambda _: _['rank']):
1024 if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
1025 result.append(mds_status['rank'])
1026
1027 return result
1028
1029 def get_rank(self, rank=None, status=None):
1030 if status is None:
1031 status = self.getinfo()
1032 if rank is None:
1033 rank = 0
1034 return status.get_rank(self.id, rank)
1035
1036 def rank_restart(self, rank=0, status=None):
1037 name = self.get_rank(rank=rank, status=status)['name']
1038 self.mds_restart(mds_id=name)
1039
1040 def rank_signal(self, signal, rank=0, status=None):
1041 name = self.get_rank(rank=rank, status=status)['name']
1042 self.mds_signal(name, signal)
1043
1044 def rank_freeze(self, yes, rank=0):
1045 self.mon_manager.raw_cluster_cmd("mds", "freeze", "{}:{}".format(self.id, rank), str(yes).lower())
1046
1047 def rank_fail(self, rank=0):
1048 self.mon_manager.raw_cluster_cmd("mds", "fail", "{}:{}".format(self.id, rank))
1049
1050 def get_ranks(self, status=None):
1051 if status is None:
1052 status = self.getinfo()
1053 return status.get_ranks(self.id)
1054
1055 def get_damaged(self, status=None):
1056 if status is None:
1057 status = self.getinfo()
1058 return status.get_damaged(self.id)
1059
1060 def get_replays(self, status=None):
1061 if status is None:
1062 status = self.getinfo()
1063 return status.get_replays(self.id)
1064
1065 def get_replay(self, rank=0, status=None):
1066 for replay in self.get_replays(status=status):
1067 if replay['rank'] == rank:
1068 return replay
1069 return None
1070
1071 def get_rank_names(self, status=None):
1072 """
1073 Return MDS daemon names of those daemons holding a rank,
1074 sorted by rank. This includes e.g. up:replay/reconnect
1075 as well as active, but does not include standby or
1076 standby-replay.
1077 """
1078 mdsmap = self.get_mds_map(status)
1079 result = []
1080 for mds_status in sorted(mdsmap['info'].values(),
1081 key=lambda _: _['rank']):
1082 if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
1083 result.append(mds_status['name'])
1084
1085 return result
1086
1087 def wait_for_daemons(self, timeout=None, skip_max_mds_check=False, status=None):
1088 """
1089 Wait until all daemons are healthy
1090 :return:
1091 """
1092
1093 if timeout is None:
1094 timeout = DAEMON_WAIT_TIMEOUT
1095
1096 if status is None:
1097 status = self.status()
1098
1099 elapsed = 0
1100 while True:
1101 if self.are_daemons_healthy(status=status, skip_max_mds_check=skip_max_mds_check):
1102 return status
1103 else:
1104 time.sleep(1)
1105 elapsed += 1
1106
1107 if elapsed > timeout:
1108 log.debug("status = {0}".format(status))
1109 raise RuntimeError("Timed out waiting for MDS daemons to become healthy")
1110
1111 status = self.status()
1112
1113 def dencoder(self, obj_type, obj_blob):
1114 args = [os.path.join(self._prefix, "ceph-dencoder"), 'type', obj_type, 'import', '-', 'decode', 'dump_json']
1115 p = self.mon_manager.controller.run(args=args, stdin=BytesIO(obj_blob), stdout=BytesIO())
1116 return p.stdout.getvalue()
1117
1118 def rados(self, *args, **kwargs):
1119 """
1120 Callout to rados CLI.
1121 """
1122
1123 return self.mon_manager.do_rados(*args, **kwargs)
1124
1125 def radosm(self, *args, **kwargs):
1126 """
1127 Interact with the metadata pool via rados CLI.
1128 """
1129
1130 return self.rados(*args, **kwargs, pool=self.get_metadata_pool_name())
1131
1132 def radosmo(self, *args, stdout=BytesIO(), **kwargs):
1133 """
1134 Interact with the metadata pool via rados CLI. Get the stdout.
1135 """
1136
1137 return self.radosm(*args, **kwargs, stdout=stdout).stdout.getvalue()
1138
1139 def get_metadata_object(self, object_type, object_id):
1140 """
1141 Retrieve an object from the metadata pool, pass it through
1142 ceph-dencoder to dump it to JSON, and return the decoded object.
1143 """
1144
1145 o = self.radosmo(['get', object_id, '-'])
1146 j = self.dencoder(object_type, o)
1147 try:
1148 return json.loads(j)
1149 except (TypeError, ValueError):
1150 log.error("Failed to decode JSON: '{0}'".format(j))
1151 raise
1152
1153 def get_journal_version(self):
1154 """
1155 Read the JournalPointer and Journal::Header objects to learn the version of
1156 encoding in use.
1157 """
1158 journal_pointer_object = '400.00000000'
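# 400.00000000 is rank 0's JournalPointer object (0x400 = 4 * 0x100 + rank;
# compare obj_prefix() in erase_mds_objects() below).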
1159 journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
1160 journal_ino = journal_pointer_dump['journal_pointer']['front']
1161
1162 journal_header_object = "{0:x}.00000000".format(journal_ino)
1163 journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)
1164
1165 version = journal_header_dump['journal_header']['stream_format']
1166 log.debug("Read journal version {0}".format(version))
1167
1168 return version
1169
1170 def mds_asok(self, command, mds_id=None, timeout=None):
1171 if mds_id is None:
1172 return self.rank_asok(command, timeout=timeout)
1173
1174 return self.json_asok(command, 'mds', mds_id, timeout=timeout)
1175
1176 def mds_tell(self, command, mds_id=None):
1177 if mds_id is None:
1178 return self.rank_tell(command)
1179
1180 return json.loads(self.mon_manager.raw_cluster_cmd("tell", f"mds.{mds_id}", *command))
1181
1182 def rank_asok(self, command, rank=0, status=None, timeout=None):
1183 info = self.get_rank(rank=rank, status=status)
1184 return self.json_asok(command, 'mds', info['name'], timeout=timeout)
1185
1186 def rank_tell(self, command, rank=0, status=None):
1187 return json.loads(self.mon_manager.raw_cluster_cmd("tell", f"mds.{self.id}:{rank}", *command))
1188
1189 def ranks_tell(self, command, status=None):
1190 if status is None:
1191 status = self.status()
1192 out = []
1193 for r in status.get_ranks(self.id):
1194 result = self.rank_tell(command, rank=r['rank'], status=status)
1195 out.append((r['rank'], result))
1196 return sorted(out)
1197
1198 def ranks_perf(self, f, status=None):
1199 perf = self.ranks_tell(["perf", "dump"], status=status)
1200 out = []
1201 for rank, perf in perf:
1202 out.append((rank, f(perf)))
1203 return out
1204
1205 def read_cache(self, path, depth=None):
1206 cmd = ["dump", "tree", path]
1207 if depth is not None:
1208 cmd.append(depth.__str__())
1209 result = self.mds_asok(cmd)
1210 if len(result) == 0:
1211 raise RuntimeError("Path not found in cache: {0}".format(path))
1212
1213 return result
1214
1215 def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
1216 """
1217 Block until the MDS reaches a particular state, or a failure condition
1218 is met.
1219
1220         When there are multiple MDSs, succeed when exactly one MDS is in the
1221 goal state, or fail when any MDS is in the reject state.
1222
1223 :param goal_state: Return once the MDS is in this state
1224 :param reject: Fail if the MDS enters this state before the goal state
1225 :param timeout: Fail if this many seconds pass before reaching goal
1226 :return: number of seconds waited, rounded down to integer
1227 """
1228
1229 started_at = time.time()
1230 while True:
1231 status = self.status()
1232 if rank is not None:
1233 try:
1234 mds_info = status.get_rank(self.id, rank)
1235 current_state = mds_info['state'] if mds_info else None
1236 log.debug("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
1237 except:
1238 mdsmap = self.get_mds_map(status=status)
1239 if rank in mdsmap['failed']:
1240 log.debug("Waiting for rank {0} to come back.".format(rank))
1241 current_state = None
1242 else:
1243 raise
1244 elif mds_id is not None:
1245 # mds_info is None if no daemon with this ID exists in the map
1246 mds_info = status.get_mds(mds_id)
1247 current_state = mds_info['state'] if mds_info else None
1248 log.debug("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
1249 else:
1250 # In general, look for a single MDS
1251 states = [m['state'] for m in status.get_ranks(self.id)]
1252 if [s for s in states if s == goal_state] == [goal_state]:
1253 current_state = goal_state
1254 elif reject in states:
1255 current_state = reject
1256 else:
1257 current_state = None
1258 log.debug("mapped states {0} to {1}".format(states, current_state))
1259
1260 elapsed = time.time() - started_at
1261 if current_state == goal_state:
1262 log.debug("reached state '{0}' in {1}s".format(current_state, elapsed))
1263 return elapsed
1264 elif reject is not None and current_state == reject:
1265 raise RuntimeError("MDS in reject state {0}".format(current_state))
1266 elif timeout is not None and elapsed > timeout:
1267 log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
1268 raise RuntimeError(
1269 "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
1270 elapsed, goal_state, current_state
1271 ))
1272 else:
1273 time.sleep(1)
1274
1275 def _read_data_xattr(self, ino_no, xattr_name, obj_type, pool):
1276 if pool is None:
1277 pool = self.get_data_pool_name()
1278
1279 obj_name = "{0:x}.00000000".format(ino_no)
1280
1281 args = ["getxattr", obj_name, xattr_name]
1282 try:
1283 proc = self.rados(args, pool=pool, stdout=BytesIO())
1284 except CommandFailedError as e:
1285 log.error(e.__str__())
1286 raise ObjectNotFound(obj_name)
1287
1288 obj_blob = proc.stdout.getvalue()
1289 return json.loads(self.dencoder(obj_type, obj_blob).strip())
1290
1291 def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
1292 """
1293 Write to an xattr of the 0th data object of an inode. Will
1294 succeed whether the object and/or xattr already exist or not.
1295
1296 :param ino_no: integer inode number
1297 :param xattr_name: string name of the xattr
1298 :param data: byte array data to write to the xattr
1299 :param pool: name of data pool or None to use primary data pool
1300 :return: None
1301 """
1302 if pool is None:
1303 pool = self.get_data_pool_name()
1304
1305 obj_name = "{0:x}.00000000".format(ino_no)
1306 args = ["setxattr", obj_name, xattr_name, data]
1307 self.rados(args, pool=pool)
1308
1309 def read_backtrace(self, ino_no, pool=None):
1310 """
1311 Read the backtrace from the data pool, return a dict in the format
1312 given by inode_backtrace_t::dump, which is something like:
1313
1314 ::
1315
1316 rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
1317 ceph-dencoder type inode_backtrace_t import out.bin decode dump_json
1318
1319 { "ino": 1099511627778,
1320 "ancestors": [
1321 { "dirino": 1,
1322 "dname": "blah",
1323 "version": 11}],
1324 "pool": 1,
1325 "old_pools": []}
1326
1327 :param pool: name of pool to read backtrace from. If omitted, FS must have only
1328 one data pool and that will be used.
1329 """
1330 return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)
1331
1332 def read_layout(self, ino_no, pool=None):
1333 """
1334 Read 'layout' xattr of an inode and parse the result, returning a dict like:
1335 ::
1336 {
1337 "stripe_unit": 4194304,
1338 "stripe_count": 1,
1339 "object_size": 4194304,
1340 "pool_id": 1,
1341 "pool_ns": "",
1342 }
1343
1344 :param pool: name of pool to read backtrace from. If omitted, FS must have only
1345 one data pool and that will be used.
1346 """
1347 return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)
1348
1349 def _enumerate_data_objects(self, ino, size):
1350 """
1351 Get the list of expected data objects for a range, and the list of objects
1352 that really exist.
1353
1354 :return a tuple of two lists of strings (expected, actual)
1355 """
1356 stripe_size = 1024 * 1024 * 4
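# Assumes the default file layout (4 MiB objects): object n of inode `ino` is
# named "<ino hex>.<n as 8 hex digits>", so a file of `size` bytes spans
# ceil(size / 4 MiB) objects.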
1357
1358 size = max(stripe_size, size)
1359
1360 want_objects = [
1361 "{0:x}.{1:08x}".format(ino, n)
1362 for n in range(0, ((size - 1) // stripe_size) + 1)
1363 ]
1364
1365 exist_objects = self.rados(["ls"], pool=self.get_data_pool_name(), stdout=StringIO()).stdout.getvalue().split("\n")
1366
1367 return want_objects, exist_objects
1368
1369 def data_objects_present(self, ino, size):
1370 """
1371 Check that *all* the expected data objects for an inode are present in the data pool
1372 """
1373
1374 want_objects, exist_objects = self._enumerate_data_objects(ino, size)
1375 missing = set(want_objects) - set(exist_objects)
1376
1377 if missing:
1378 log.debug("Objects missing (ino {0}, size {1}): {2}".format(
1379 ino, size, missing
1380 ))
1381 return False
1382 else:
1383 log.debug("All objects for ino {0} size {1} found".format(ino, size))
1384 return True
1385
1386 def data_objects_absent(self, ino, size):
1387 want_objects, exist_objects = self._enumerate_data_objects(ino, size)
1388 present = set(want_objects) & set(exist_objects)
1389
1390 if present:
1391 log.debug("Objects not absent (ino {0}, size {1}): {2}".format(
1392 ino, size, present
1393 ))
1394 return False
1395 else:
1396 log.debug("All objects for ino {0} size {1} are absent".format(ino, size))
1397 return True
1398
1399 def dirfrag_exists(self, ino, frag):
1400 try:
1401 self.radosm(["stat", "{0:x}.{1:08x}".format(ino, frag)])
1402 except CommandFailedError:
1403 return False
1404 else:
1405 return True
1406
1407 def list_dirfrag(self, dir_ino):
1408 """
1409 Read the named object and return the list of omap keys
1410
1411 :return a list of 0 or more strings
1412 """
1413
1414 dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
1415
1416 try:
1417 key_list_str = self.radosmo(["listomapkeys", dirfrag_obj_name], stdout=StringIO())
1418 except CommandFailedError as e:
1419 log.error(e.__str__())
1420 raise ObjectNotFound(dirfrag_obj_name)
1421
1422 return key_list_str.strip().split("\n") if key_list_str else []
1423
1424 def get_meta_of_fs_file(self, dir_ino, obj_name, out):
1425 """
1426         Get metadata from the parent directory object to verify the correctness of the data format encoded by the cephfs-meta-injection tool.
1427         Warning: directory fragmentation (splitting) is not considered here.
1428 """
1429
1430 dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
1431 try:
1432 self.radosm(["getomapval", dirfrag_obj_name, obj_name+"_head", out])
1433 except CommandFailedError as e:
1434 log.error(e.__str__())
1435 raise ObjectNotFound(dir_ino)
1436
1437 def erase_metadata_objects(self, prefix):
1438 """
1439 For all objects in the metadata pool matching the prefix,
1440 erase them.
1441
1442 This O(N) with the number of objects in the pool, so only suitable
1443 for use on toy test filesystems.
1444 """
1445 all_objects = self.radosmo(["ls"], stdout=StringIO()).strip().split("\n")
1446 matching_objects = [o for o in all_objects if o.startswith(prefix)]
1447 for o in matching_objects:
1448 self.radosm(["rm", o])
1449
1450 def erase_mds_objects(self, rank):
1451 """
1452 Erase all the per-MDS objects for a particular rank. This includes
1453 inotable, sessiontable, journal
1454 """
1455
1456 def obj_prefix(multiplier):
1457 """
1458 MDS object naming conventions like rank 1's
1459 journal is at 201.***
1460 """
1461 return "%x." % (multiplier * 0x100 + rank)
1462
1463 # MDS_INO_LOG_OFFSET
1464 self.erase_metadata_objects(obj_prefix(2))
1465 # MDS_INO_LOG_BACKUP_OFFSET
1466 self.erase_metadata_objects(obj_prefix(3))
1467 # MDS_INO_LOG_POINTER_OFFSET
1468 self.erase_metadata_objects(obj_prefix(4))
1469 # MDSTables & SessionMap
1470 self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))
1471
1472 @property
1473 def _prefix(self):
1474 """
1475         Override this to set a different path prefix for the ceph binaries invoked by this class (e.g. ceph-dencoder, cephfs-journal-tool).
1476 """
1477 return ""
1478
1479 def _make_rank(self, rank):
1480 return "{}:{}".format(self.name, rank)
1481
1482 def _run_tool(self, tool, args, rank=None, quiet=False):
1483 # Tests frequently have [client] configuration that jacks up
1484 # the objecter log level (unlikely to be interesting here)
1485 # and does not set the mds log level (very interesting here)
1486 if quiet:
1487 base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
1488 else:
1489 base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']
1490
1491 if rank is not None:
1492 base_args.extend(["--rank", "%s" % str(rank)])
1493
1494 t1 = datetime.datetime.now()
1495 r = self.tool_remote.sh(script=base_args + args, stdout=StringIO()).strip()
1496 duration = datetime.datetime.now() - t1
1497 log.debug("Ran {0} in time {1}, result:\n{2}".format(
1498 base_args + args, duration, r
1499 ))
1500 return r
1501
1502 @property
1503 def tool_remote(self):
1504 """
1505 An arbitrary remote to use when invoking recovery tools. Use an MDS host because
1506 it'll definitely have keys with perms to access cephfs metadata pool. This is public
1507 so that tests can use this remote to go get locally written output files from the tools.
1508 """
1509 return self.mon_manager.controller
1510
1511 def journal_tool(self, args, rank, quiet=False):
1512 """
1513 Invoke cephfs-journal-tool with the passed arguments for a rank, and return its stdout
1514 """
1515 fs_rank = self._make_rank(rank)
1516 return self._run_tool("cephfs-journal-tool", args, fs_rank, quiet)
1517
1518 def meta_tool(self, args, rank, quiet=False):
1519 """
1520 Invoke cephfs-meta-injection with the passed arguments for a rank, and return its stdout
1521 """
1522 fs_rank = self._make_rank(rank)
1523 return self._run_tool("cephfs-meta-injection", args, fs_rank, quiet)
1524
1525 def table_tool(self, args, quiet=False):
1526 """
1527 Invoke cephfs-table-tool with the passed arguments, and return its stdout
1528 """
1529 return self._run_tool("cephfs-table-tool", args, None, quiet)
1530
1531 def data_scan(self, args, quiet=False, worker_count=1):
1532 """
1533 Invoke cephfs-data-scan with the passed arguments, and return its stdout
1534
1535 :param worker_count: if greater than 1, multiple workers will be run
1536 in parallel and the return value will be None
1537 """
1538
1539 workers = []
1540
1541 for n in range(0, worker_count):
1542 if worker_count > 1:
1543 # data-scan args first token is a command, followed by args to it.
1544 # insert worker arguments after the command.
1545 cmd = args[0]
1546 worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
1547 else:
1548 worker_args = args
1549
1550 workers.append(Greenlet.spawn(lambda wargs=worker_args:
1551 self._run_tool("cephfs-data-scan", wargs, None, quiet)))
1552
1553 for w in workers:
1554 w.get()
1555
1556 if worker_count == 1:
1557 return workers[0].value
1558 else:
1559 return None
1560
1561 def is_full(self):
1562 return self.is_pool_full(self.get_data_pool_name())
1563
1564 def authorize(self, client_id, caps=('/', 'rw')):
1565 """
1566 Run "ceph fs authorize" and run "ceph auth get" to get and returnt the
1567 keyring.
1568
1569 client_id: client id that will be authorized
1570 caps: tuple containing the path and permission (can be r or rw)
1571 respectively.
1572 """
1573 client_name = 'client.' + client_id
1574 return self.mon_manager.raw_cluster_cmd('fs', 'authorize', self.name,
1575 client_name, *caps)
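# Illustrative example: authorize("foo", ("/dir", "rw")) grants client.foo
# rw access restricted to /dir on this file system and returns the keyring text.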
1576
1577 def grow(self, new_max_mds, status=None):
1578 oldmax = self.get_var('max_mds', status=status)
1579 assert(new_max_mds > oldmax)
1580 self.set_max_mds(new_max_mds)
1581 return self.wait_for_daemons()
1582
1583 def shrink(self, new_max_mds, status=None):
1584 oldmax = self.get_var('max_mds', status=status)
1585 assert(new_max_mds < oldmax)
1586 self.set_max_mds(new_max_mds)
1587 return self.wait_for_daemons()
1588
1589 def run_scrub(self, cmd, rank=0):
1590 return self.rank_tell(["scrub"] + cmd, rank)
1591
1592 def get_scrub_status(self, rank=0):
1593 return self.run_scrub(["status"], rank)
1594
1595 def wait_until_scrub_complete(self, result=None, tag=None, rank=0, sleep=30,
1596 timeout=300, reverse=False):
1597         # time out after "timeout" seconds and assume the scrub is done
1598 if result is None:
1599 result = "no active scrubs running"
1600 with contextutil.safe_while(sleep=sleep, tries=timeout//sleep) as proceed:
1601 while proceed():
1602 out_json = self.rank_tell(["scrub", "status"], rank=rank)
1603 assert out_json is not None
1604 if not reverse:
1605 if result in out_json['status']:
1606 log.info("all active scrubs completed")
1607 return True
1608 else:
1609 if result not in out_json['status']:
1610 log.info("all active scrubs completed")
1611 return True
1612
1613 if tag is not None:
1614 status = out_json['scrubs'][tag]
1615 if status is not None:
1616 log.info(f"scrub status for tag:{tag} - {status}")
1617 else:
1618 log.info(f"scrub has completed for tag:{tag}")
1619 return True
1620
1621 # timed out waiting for scrub to complete
1622 return False