ceph.git: ceph/qa/tasks/cephfs/filesystem.py (ceph 16.2.6)
1
2 import json
3 import logging
4 from gevent import Greenlet
5 import os
6 import time
7 import datetime
8 import re
9 import errno
10 import random
11 import traceback
12
13 from io import BytesIO, StringIO
14 from errno import EBUSY
15
16 from teuthology.exceptions import CommandFailedError
17 from teuthology import misc
18 from teuthology.nuke import clear_firewall
19 from teuthology.parallel import parallel
20 from teuthology import contextutil
21 from tasks.ceph_manager import write_conf
22 from tasks import ceph_manager
23
24
25 log = logging.getLogger(__name__)
26
27
28 DAEMON_WAIT_TIMEOUT = 120
29 ROOT_INO = 1
30
31 class FileLayout(object):
32 def __init__(self, pool=None, pool_namespace=None, stripe_unit=None, stripe_count=None, object_size=None):
33 self.pool = pool
34 self.pool_namespace = pool_namespace
35 self.stripe_unit = stripe_unit
36 self.stripe_count = stripe_count
37 self.object_size = object_size
38
39 @classmethod
40 def load_from_ceph(cls, layout_str):
41 # TODO
42 pass
43
44 def items(self):
45 if self.pool is not None:
46 yield ("pool", self.pool)
47 if self.pool_namespace:
48 yield ("pool_namespace", self.pool_namespace)
49 if self.stripe_unit is not None:
50 yield ("stripe_unit", self.stripe_unit)
51 if self.stripe_count is not None:
52 yield ("stripe_count", self.stripe_count)
53 if self.object_size is not None:
54 yield ("object_size", self.stripe_size)
55
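# Illustrative example (hypothetical layout values): FileLayout.items() yields
# only the fields that were explicitly set, so unset defaults are never emitted,
# e.g.
#     layout = FileLayout(pool="cephfs_data", stripe_count=2)
#     dict(layout.items())   # -> {"pool": "cephfs_data", "stripe_count": 2}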
56 class ObjectNotFound(Exception):
57 def __init__(self, object_name):
58 self._object_name = object_name
59
60 def __str__(self):
61 return "Object not found: '{0}'".format(self._object_name)
62
63 class FSMissing(Exception):
64 def __init__(self, ident):
65 self.ident = ident
66
67 def __str__(self):
68 return f"File system {self.ident} does not exist in the map"
69
70 class FSStatus(object):
71 """
72 Operations on a snapshot of the FSMap.
73 """
74 def __init__(self, mon_manager, epoch=None):
75 self.mon = mon_manager
76 cmd = ["fs", "dump", "--format=json"]
77 if epoch is not None:
78 cmd.append(str(epoch))
79 self.map = json.loads(self.mon.raw_cluster_cmd(*cmd))
80
81 def __str__(self):
82 return json.dumps(self.map, indent = 2, sort_keys = True)
83
84 # Expose the fsmap for manual inspection.
85 def __getitem__(self, key):
86 """
87 Get a field from the fsmap.
88 """
89 return self.map[key]
90
91 def get_filesystems(self):
92 """
93 Iterator for all filesystems.
94 """
95 for fs in self.map['filesystems']:
96 yield fs
97
98 def get_all(self):
99 """
100 Iterator for all the mds_info components in the FSMap.
101 """
102 for info in self.map['standbys']:
103 yield info
104 for fs in self.map['filesystems']:
105 for info in fs['mdsmap']['info'].values():
106 yield info
107
108 def get_standbys(self):
109 """
110 Iterator for all standbys.
111 """
112 for info in self.map['standbys']:
113 yield info
114
115 def get_fsmap(self, fscid):
116 """
117 Get the fsmap for the given FSCID.
118 """
119 for fs in self.map['filesystems']:
120 if fscid is None or fs['id'] == fscid:
121 return fs
122 raise FSMissing(fscid)
123
124 def get_fsmap_byname(self, name):
125 """
126 Get the fsmap for the given file system name.
127 """
128 for fs in self.map['filesystems']:
129 if name is None or fs['mdsmap']['fs_name'] == name:
130 return fs
131 raise FSMissing(name)
132
133 def get_replays(self, fscid):
134 """
135 Get the standby:replay MDS for the given FSCID.
136 """
137 fs = self.get_fsmap(fscid)
138 for info in fs['mdsmap']['info'].values():
139 if info['state'] == 'up:standby-replay':
140 yield info
141
142 def get_ranks(self, fscid):
143 """
144 Get the ranks for the given FSCID.
145 """
146 fs = self.get_fsmap(fscid)
147 for info in fs['mdsmap']['info'].values():
148 if info['rank'] >= 0 and info['state'] != 'up:standby-replay':
149 yield info
150
151 def get_rank(self, fscid, rank):
152 """
153 Get the rank for the given FSCID.
154 """
155 for info in self.get_ranks(fscid):
156 if info['rank'] == rank:
157 return info
158 raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))
159
160 def get_mds(self, name):
161 """
162 Get the info for the given MDS name.
163 """
164 for info in self.get_all():
165 if info['name'] == name:
166 return info
167 return None
168
169 def get_mds_addr(self, name):
170 """
171 Return the instance addr as a string, like "10.214.133.138:6807\/10825"
172 """
173 info = self.get_mds(name)
174 if info:
175 return info['addr']
176 else:
177 log.warning(json.dumps(list(self.get_all()), indent=2)) # dump for debugging
178 raise RuntimeError("MDS id '{0}' not found in map".format(name))
179
180 def get_mds_addrs(self, name):
181 """
182 Return the instance addrs as a list of strings, like ["10.214.133.138:6807", "10.214.133.138:6808"]
183 """
184 info = self.get_mds(name)
185 if info:
186 return [e['addr'] for e in info['addrs']['addrvec']]
187 else:
188 log.warning(json.dumps(list(self.get_all()), indent=2)) # dump for debugging
189 raise RuntimeError("MDS id '{0}' not found in map".format(name))
190
191 def get_mds_gid(self, gid):
192 """
193 Get the info for the given MDS gid.
194 """
195 for info in self.get_all():
196 if info['gid'] == gid:
197 return info
198 return None
199
200 def hadfailover(self, status):
201 """
202 Compares two statuses for mds failovers.
203 Returns True if there is a failover.
204 """
205 for fs in status.map['filesystems']:
206 for info in fs['mdsmap']['info'].values():
207 oldinfo = self.get_mds_gid(info['gid'])
208 if oldinfo is None or oldinfo['incarnation'] != info['incarnation']:
209 return True
210 #all matching
211 return False
212
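# Illustrative usage (hypothetical variable names): hadfailover() compares the
# receiver (an older FSStatus snapshot) against a newer one, e.g.
#     old_status = FSStatus(mon_manager)
#     ...                                  # restart or fail an MDS here
#     new_status = FSStatus(mon_manager)
#     failed_over = old_status.hadfailover(new_status)
# A gid present in the newer map but absent from the older snapshot, or whose
# 'incarnation' changed, is reported as a failover.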
213 class CephCluster(object):
214 @property
215 def admin_remote(self):
216 first_mon = misc.get_first_mon(self._ctx, None)
217 (result,) = self._ctx.cluster.only(first_mon).remotes.keys()
218 return result
219
220 def __init__(self, ctx) -> None:
221 self._ctx = ctx
222 self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
223
224 def get_config(self, key, service_type=None):
225 """
226 Get config from mon by default, or a specific service if caller asks for it
227 """
228 if service_type is None:
229 service_type = 'mon'
230
231 service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
232 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
233
234 def set_ceph_conf(self, subsys, key, value):
235 if subsys not in self._ctx.ceph['ceph'].conf:
236 self._ctx.ceph['ceph'].conf[subsys] = {}
237 self._ctx.ceph['ceph'].conf[subsys][key] = value
238 write_conf(self._ctx) # XXX because we don't have the ceph task's config object, if they
239 # used a different config path this won't work.
240
241 def clear_ceph_conf(self, subsys, key):
242 del self._ctx.ceph['ceph'].conf[subsys][key]
243 write_conf(self._ctx)
244
245 def json_asok(self, command, service_type, service_id, timeout=None):
246 if timeout is None:
247 timeout = 15*60
248 command.insert(0, '--format=json')
249 proc = self.mon_manager.admin_socket(service_type, service_id, command, timeout=timeout)
250 response_data = proc.stdout.getvalue().strip()
251 if len(response_data) > 0:
252 j = json.loads(response_data)
253 pretty = json.dumps(j, sort_keys=True, indent=2)
254 log.debug(f"_json_asok output\n{pretty}")
255 return j
256 else:
257 log.debug("_json_asok output empty")
258 return None
259
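# Illustrative usage (hypothetical instance, config key and service id):
# json_asok() prepends '--format=json' and goes through the daemon's admin
# socket, e.g.
#     cluster.json_asok(['config', 'get', 'mds_cache_memory_limit'], 'mds', 'a')
# returns the parsed JSON as a dict, or None if the daemon produced no output.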
260 def is_addr_blocklisted(self, addr=None):
261 if addr is None:
262 log.warn("Couldn't get the client address, so the blocklisted "
263 "status undetermined")
264 return False
265
266 blocklist = json.loads(self.mon_manager.run_cluster_cmd(
267 args=["osd", "blocklist", "ls", "--format=json"],
268 stdout=StringIO()).stdout.getvalue())
269 for b in blocklist:
270 if addr == b["addr"]:
271 return True
272 return False
273
274
275 class MDSCluster(CephCluster):
276 """
277 Collective operations on all the MDS daemons in the Ceph cluster. These
278 daemons may be in use by various Filesystems.
279
280 For the benefit of pre-multi-filesystem tests, this class is also
281 a parent of Filesystem. The correct way to use MDSCluster going forward is
282 as a separate instance outside of your (multiple) Filesystem instances.
283 """
284
285 def __init__(self, ctx):
286 super(MDSCluster, self).__init__(ctx)
287
288 @property
289 def mds_ids(self):
290 # do this dynamically because the list of ids may change periodically with cephadm
291 return list(misc.all_roles_of_type(self._ctx.cluster, 'mds'))
292
293 @property
294 def mds_daemons(self):
295 return dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
296
297 def _one_or_all(self, mds_id, cb, in_parallel=True):
298 """
299 Call a callback for a single named MDS, or for all.
300
301 Note that the parallelism here isn't for performance, it's to avoid being overly kind
302 to the cluster by waiting a graceful ssh-latency of time between doing things, and to
303 avoid being overly kind by executing them in a particular order. However, some actions
304 don't cope with being done in parallel, so it's optional (`in_parallel`)
305
306 :param mds_id: MDS daemon name, or None
307 :param cb: Callback taking single argument of MDS daemon name
308 :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
309 """
310
311 if mds_id is None:
312 if in_parallel:
313 with parallel() as p:
314 for mds_id in self.mds_ids:
315 p.spawn(cb, mds_id)
316 else:
317 for mds_id in self.mds_ids:
318 cb(mds_id)
319 else:
320 cb(mds_id)
321
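# Illustrative usage (hypothetical callback): passing mds_id=None fans the
# callback out to every MDS, optionally in parallel, e.g.
#     self._one_or_all(None, lambda id_: self.mds_daemons[id_].restart())
# is what mds_restart() effectively does for all daemons, while
#     self._one_or_all('a', cb)
# simply calls cb('a') for the single named daemon.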
322 def get_config(self, key, service_type=None):
323 """
324 get_config specialization of service_type="mds"
325 """
326 if service_type != "mds":
327 return super(MDSCluster, self).get_config(key, service_type)
328
329 # Some tests stop MDS daemons, don't send commands to a dead one:
330 running_daemons = [i for i, mds in self.mds_daemons.items() if mds.running()]
331 service_id = random.sample(running_daemons, 1)[0]
332 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
333
334 def mds_stop(self, mds_id=None):
335 """
336 Stop the MDS daemon process(se). If it held a rank, that rank
337 will eventually go laggy.
338 """
339 self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())
340
341 def mds_fail(self, mds_id=None):
342 """
343 Inform MDSMonitor of the death of the daemon process(es). If it held
344 a rank, that rank will be relinquished.
345 """
346 self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))
347
348 def mds_restart(self, mds_id=None):
349 self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())
350
351 def mds_fail_restart(self, mds_id=None):
352 """
353 Variation on restart that includes marking MDSs as failed, so that doing this
354 operation followed by waiting for healthy daemon states guarantees that they
355 have gone down and come up, rather than potentially seeing the healthy states
356 that existed before the restart.
357 """
358 def _fail_restart(id_):
359 self.mds_daemons[id_].stop()
360 self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
361 self.mds_daemons[id_].restart()
362
363 self._one_or_all(mds_id, _fail_restart)
364
365 def mds_signal(self, mds_id, sig, silent=False):
366 """
367 Signal an MDS daemon.
368 """
369 self.mds_daemons[mds_id].signal(sig, silent)
370
371 def newfs(self, name='cephfs', create=True):
372 return Filesystem(self._ctx, name=name, create=create)
373
374 def status(self, epoch=None):
375 return FSStatus(self.mon_manager, epoch)
376
377 def get_standby_daemons(self):
378 return set([s['name'] for s in self.status().get_standbys()])
379
380 def get_mds_hostnames(self):
381 result = set()
382 for mds_id in self.mds_ids:
383 mds_remote = self.mon_manager.find_remote('mds', mds_id)
384 result.add(mds_remote.hostname)
385
386 return list(result)
387
388 def set_clients_block(self, blocked, mds_id=None):
389 """
390 Block (using iptables) client communications to this MDS. Be careful: if
391 other services are running on this MDS, or other MDSs try to talk to this
392 MDS, their communications may also be blocked as collateral damage.
393
394 :param mds_id: Optional ID of MDS to block, default to all
395 :return:
396 """
397 da_flag = "-A" if blocked else "-D"
398
399 def set_block(_mds_id):
400 remote = self.mon_manager.find_remote('mds', _mds_id)
401 status = self.status()
402
403 addr = status.get_mds_addr(_mds_id)
404 ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()
405
406 remote.run(
407 args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
408 "comment", "--comment", "teuthology"])
409 remote.run(
410 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
411 "comment", "--comment", "teuthology"])
412
413 self._one_or_all(mds_id, set_block, in_parallel=False)
414
415 def set_inter_mds_block(self, blocked, mds_rank_1, mds_rank_2):
416 """
417 Block (using iptables) communications from a provided MDS to other MDSs.
418 Block all ports that an MDS uses for communication.
419
420 :param blocked: True to block the MDS, False otherwise
421 :param mds_rank_1: MDS rank
422 :param mds_rank_2: MDS rank
423 :return:
424 """
425 da_flag = "-A" if blocked else "-D"
426
427 def set_block(mds_ids):
428 status = self.status()
429
430 mds = mds_ids[0]
431 remote = self.mon_manager.find_remote('mds', mds)
432 addrs = status.get_mds_addrs(mds)
433 for addr in addrs:
434 ip_str, port_str = re.match("(.+):(.+)", addr).groups()
435 remote.run(
436 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
437 "comment", "--comment", "teuthology"])
438
439
440 mds = mds_ids[1]
441 remote = self.mon_manager.find_remote('mds', mds)
442 addrs = status.get_mds_addrs(mds)
443 for addr in addrs:
444 ip_str, port_str = re.match("(.+):(.+)", addr).groups()
445 remote.run(
446 args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
447 "comment", "--comment", "teuthology"])
448 remote.run(
449 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
450 "comment", "--comment", "teuthology"])
451
452 self._one_or_all((mds_rank_1, mds_rank_2), set_block, in_parallel=False)
453
454 def clear_firewall(self):
455 clear_firewall(self._ctx)
456
457 def get_mds_info(self, mds_id):
458 return FSStatus(self.mon_manager).get_mds(mds_id)
459
460 def is_pool_full(self, pool_name):
461 pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
462 for pool in pools:
463 if pool['pool_name'] == pool_name:
464 return 'full' in pool['flags_names'].split(",")
465
466 raise RuntimeError("Pool not found '{0}'".format(pool_name))
467
468 def delete_all_filesystems(self):
469 """
470 Remove all filesystems that exist, and any pools in use by them.
471 """
472 for fs in self.status().get_filesystems():
473 Filesystem(ctx=self._ctx, fscid=fs['id']).destroy()
474
475
476 class Filesystem(MDSCluster):
477 """
478 This object is for driving a CephFS filesystem. The MDS daemons driven by
479 MDSCluster may be shared with other Filesystems.
480 """
481 def __init__(self, ctx, fs_config={}, fscid=None, name=None, create=False):
482 super(Filesystem, self).__init__(ctx)
483
484 self.name = name
485 self.id = None
486 self.metadata_pool_name = None
487 self.metadata_overlay = False
488 self.data_pool_name = None
489 self.data_pools = None
490 self.fs_config = fs_config
491 self.ec_profile = fs_config.get('ec_profile')
492
493 client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
494 self.client_id = client_list[0]
495 self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
496
497 if name is not None:
498 if fscid is not None:
499 raise RuntimeError("cannot specify fscid when creating fs")
500 if create and not self.legacy_configured():
501 self.create()
502 else:
503 if fscid is not None:
504 self.id = fscid
505 self.getinfo(refresh = True)
506
507 # Stash a reference to the first created filesystem on ctx, so
508 # that if someone drops to the interactive shell they can easily
509 # poke our methods.
510 if not hasattr(self._ctx, "filesystem"):
511 self._ctx.filesystem = self
512
513 def dead(self):
514 try:
515 return not bool(self.get_mds_map())
516 except FSMissing:
517 return True
518
519 def get_task_status(self, status_key):
520 return self.mon_manager.get_service_task_status("mds", status_key)
521
522 def getinfo(self, refresh = False):
523 status = self.status()
524 if self.id is not None:
525 fsmap = status.get_fsmap(self.id)
526 elif self.name is not None:
527 fsmap = status.get_fsmap_byname(self.name)
528 else:
529 fss = [fs for fs in status.get_filesystems()]
530 if len(fss) == 1:
531 fsmap = fss[0]
532 elif len(fss) == 0:
533 raise RuntimeError("no file system available")
534 else:
535 raise RuntimeError("more than one file system available")
536 self.id = fsmap['id']
537 self.name = fsmap['mdsmap']['fs_name']
538 self.get_pool_names(status = status, refresh = refresh)
539 return status
540
541 def set_metadata_overlay(self, overlay):
542 if self.id is not None:
543 raise RuntimeError("cannot specify fscid when configuring overlay")
544 self.metadata_overlay = overlay
545
546 def deactivate(self, rank):
547 if rank < 0:
548 raise RuntimeError("invalid rank")
549 elif rank == 0:
550 raise RuntimeError("cannot deactivate rank 0")
551 self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))
552
553 def reach_max_mds(self):
554 # Try to reach rank count == max_mds, up or down (UPGRADE SENSITIVE!)
555 status = self.getinfo()
556 mds_map = self.get_mds_map(status=status)
557 max_mds = mds_map['max_mds']
558
559 count = len(list(self.get_ranks(status=status)))
560 if count > max_mds:
561 try:
562 # deactivate mds in descending order
563 status = self.wait_for_daemons(status=status, skip_max_mds_check=True)
564 while count > max_mds:
565 targets = sorted(self.get_ranks(status=status), key=lambda r: r['rank'], reverse=True)
566 target = targets[0]
567 log.debug("deactivating rank %d" % target['rank'])
568 self.deactivate(target['rank'])
569 status = self.wait_for_daemons(skip_max_mds_check=True)
570 count = len(list(self.get_ranks(status=status)))
571 except:
572 # In Mimic, deactivation is done automatically:
573 log.info("Error:\n{}".format(traceback.format_exc()))
574 status = self.wait_for_daemons()
575 else:
576 status = self.wait_for_daemons()
577
578 mds_map = self.get_mds_map(status=status)
579 assert(mds_map['max_mds'] == max_mds)
580 assert(mds_map['in'] == list(range(0, max_mds)))
581
582 def reset(self):
583 self.mon_manager.raw_cluster_cmd("fs", "reset", str(self.name), '--yes-i-really-mean-it')
584
585 def fail(self):
586 self.mon_manager.raw_cluster_cmd("fs", "fail", str(self.name))
587
588 def set_flag(self, var, *args):
589 a = map(lambda x: str(x).lower(), args)
590 self.mon_manager.raw_cluster_cmd("fs", "flag", "set", var, *a)
591
592 def set_allow_multifs(self, yes=True):
593 self.set_flag("enable_multiple", yes)
594
595 def set_var(self, var, *args):
596 a = map(lambda x: str(x).lower(), args)
597 self.mon_manager.raw_cluster_cmd("fs", "set", self.name, var, *a)
598
599 def set_down(self, down=True):
600 self.set_var("down", str(down).lower())
601
602 def set_joinable(self, joinable=True):
603 self.set_var("joinable", joinable)
604
605 def set_max_mds(self, max_mds):
606 self.set_var("max_mds", "%d" % max_mds)
607
608 def set_session_timeout(self, timeout):
609 self.set_var("session_timeout", "%d" % timeout)
610
611 def set_allow_standby_replay(self, yes):
612 self.set_var("allow_standby_replay", yes)
613
614 def set_allow_new_snaps(self, yes):
615 self.set_var("allow_new_snaps", yes, '--yes-i-really-mean-it')
616
617 def compat(self, *args):
618 a = map(lambda x: str(x).lower(), args)
619 self.mon_manager.raw_cluster_cmd("fs", "compat", self.name, *a)
620
621 def add_compat(self, *args):
622 self.compat("add_compat", *args)
623
624 def add_incompat(self, *args):
625 self.compat("add_incompat", *args)
626
627 def rm_compat(self, *args):
628 self.compat("rm_compat", *args)
629
630 def rm_incompat(self, *args):
631 self.compat("rm_incompat", *args)
632
633 def required_client_features(self, *args, **kwargs):
634 c = ["fs", "required_client_features", self.name, *args]
635 return self.mon_manager.run_cluster_cmd(args=c, **kwargs)
636
637 # Since v15.1.0 the pg autoscale mode has been enabled by default, so we
638 # let the pg autoscale mode calculate the pg_num as needed.
639 # We set pg_num_min to 64 to make sure that the pg autoscale mode
640 # won't set the pg_num too low (see Tracker #45434).
641 pg_num = 64
642 pg_num_min = 64
643 target_size_ratio = 0.9
644 target_size_ratio_ec = 0.9
645
646 def create(self):
647 if self.name is None:
648 self.name = "cephfs"
649 if self.metadata_pool_name is None:
650 self.metadata_pool_name = "{0}_metadata".format(self.name)
651 if self.data_pool_name is None:
652 data_pool_name = "{0}_data".format(self.name)
653 else:
654 data_pool_name = self.data_pool_name
655
656 # If an EC profile is configured, the EC pool will store the file data, while
657 # a small amount of metadata still goes to the primary data pool for all files.
658 if not self.metadata_overlay and self.ec_profile and 'disabled' not in self.ec_profile:
659 self.target_size_ratio = 0.05
660
661 log.debug("Creating filesystem '{0}'".format(self.name))
662
663 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
664 self.metadata_pool_name, str(self.pg_num),
665 '--pg_num_min', str(self.pg_num_min))
666
667 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
668 data_pool_name, str(self.pg_num),
669 '--pg_num_min', str(self.pg_num_min),
670 '--target_size_ratio',
671 str(self.target_size_ratio))
672
673 if self.metadata_overlay:
674 self.mon_manager.raw_cluster_cmd('fs', 'new',
675 self.name, self.metadata_pool_name, data_pool_name,
676 '--allow-dangerous-metadata-overlay')
677 else:
678 self.mon_manager.raw_cluster_cmd('fs', 'new',
679 self.name,
680 self.metadata_pool_name,
681 data_pool_name)
682
683 if self.ec_profile and 'disabled' not in self.ec_profile:
684 ec_data_pool_name = data_pool_name + "_ec"
685 log.debug("EC profile is %s", self.ec_profile)
686 cmd = ['osd', 'erasure-code-profile', 'set', ec_data_pool_name]
687 cmd.extend(self.ec_profile)
688 self.mon_manager.raw_cluster_cmd(*cmd)
689 self.mon_manager.raw_cluster_cmd(
690 'osd', 'pool', 'create', ec_data_pool_name,
691 'erasure', ec_data_pool_name,
692 '--pg_num_min', str(self.pg_num_min),
693 '--target_size_ratio', str(self.target_size_ratio_ec))
694 self.mon_manager.raw_cluster_cmd(
695 'osd', 'pool', 'set',
696 ec_data_pool_name, 'allow_ec_overwrites', 'true')
697 self.add_data_pool(ec_data_pool_name, create=False)
698 self.check_pool_application(ec_data_pool_name)
699
700 self.run_client_payload(f"setfattr -n ceph.dir.layout.pool -v {ec_data_pool_name} . && getfattr -n ceph.dir.layout .")
701
702 self.check_pool_application(self.metadata_pool_name)
703 self.check_pool_application(data_pool_name)
704
705 # Turn off spurious standby count warnings from modifying max_mds in tests.
706 try:
707 self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
708 except CommandFailedError as e:
709 if e.exitstatus == 22:
710 # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
711 pass
712 else:
713 raise
714
715 if self.fs_config is not None:
716 max_mds = self.fs_config.get('max_mds', 1)
717 if max_mds > 1:
718 self.set_max_mds(max_mds)
719
720 standby_replay = self.fs_config.get('standby_replay', False)
721 self.set_allow_standby_replay(standby_replay)
722
723 # If absent will use the default value (60 seconds)
724 session_timeout = self.fs_config.get('session_timeout', 60)
725 if session_timeout != 60:
726 self.set_session_timeout(session_timeout)
727
728 self.getinfo(refresh = True)
729
730 # wait pgs to be clean
731 self.mon_manager.wait_for_clean()
732
733 def run_client_payload(self, cmd):
734 # avoid circular dep by importing here:
735 from tasks.cephfs.fuse_mount import FuseMount
736 d = misc.get_testdir(self._ctx)
737 m = FuseMount(self._ctx, {}, d, "admin", self.client_remote, cephfs_name=self.name)
738 m.mount_wait()
739 m.run_shell_payload(cmd)
740 m.umount_wait(require_clean=True)
741
742 def _remove_pool(self, name, **kwargs):
743 c = f'osd pool rm {name} {name} --yes-i-really-really-mean-it'
744 return self.mon_manager.ceph(c, **kwargs)
745
746 def rm(self, **kwargs):
747 c = f'fs rm {self.name} --yes-i-really-mean-it'
748 return self.mon_manager.ceph(c, **kwargs)
749
750 def remove_pools(self, data_pools):
751 self._remove_pool(self.get_metadata_pool_name())
752 for poolname in data_pools:
753 try:
754 self._remove_pool(poolname)
755 except CommandFailedError as e:
756 # EBUSY, this data pool is used by two metadata pools, let the
757 # 2nd pass delete it
758 if e.exitstatus == EBUSY:
759 pass
760 else:
761 raise
762
763 def destroy(self, reset_obj_attrs=True):
764 log.info(f'Destroying file system {self.name} and related pools')
765
766 if self.dead():
767 log.debug('already dead...')
768 return
769
770 data_pools = self.get_data_pool_names(refresh=True)
771
772 # make sure no MDSs are attached to given FS.
773 self.fail()
774 self.rm()
775
776 self.remove_pools(data_pools)
777
778 if reset_obj_attrs:
779 self.id = None
780 self.name = None
781 self.metadata_pool_name = None
782 self.data_pool_name = None
783 self.data_pools = None
784
785 def recreate(self):
786 self.destroy()
787
788 self.create()
789 self.getinfo(refresh=True)
790
791 def check_pool_application(self, pool_name):
792 osd_map = self.mon_manager.get_osd_dump_json()
793 for pool in osd_map['pools']:
794 if pool['pool_name'] == pool_name:
795 if "application_metadata" in pool:
796 if not "cephfs" in pool['application_metadata']:
797 raise RuntimeError("Pool {pool_name} does not name cephfs as application!".\
798 format(pool_name=pool_name))
799
800 def __del__(self):
801 if getattr(self._ctx, "filesystem", None) == self:
802 delattr(self._ctx, "filesystem")
803
804 def exists(self):
805 """
806 Whether a filesystem exists in the mon's filesystem list
807 """
808 fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
809 return self.name in [fs['name'] for fs in fs_list]
810
811 def legacy_configured(self):
812 """
813 Check if a legacy (i.e. pre "fs new") filesystem configuration is present. If this is
814 the case, the caller should avoid using Filesystem.create
815 """
816 try:
817 out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
818 pools = json.loads(out_text)
819 metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
820 if metadata_pool_exists:
821 self.metadata_pool_name = 'metadata'
822 except CommandFailedError as e:
823 # For use in upgrade tests, Ceph cuttlefish and earlier don't support
824 # structured output (--format) from the CLI.
825 if e.exitstatus == 22:
826 metadata_pool_exists = True
827 else:
828 raise
829
830 return metadata_pool_exists
831
832 def _df(self):
833 return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))
834
835 # may raise FSMissing
836 def get_mds_map(self, status=None):
837 if status is None:
838 status = self.status()
839 return status.get_fsmap(self.id)['mdsmap']
840
841 def get_var(self, var, status=None):
842 return self.get_mds_map(status=status)[var]
843
844 def set_dir_layout(self, mount, path, layout):
845 for name, value in layout.items():
846 mount.run_shell(args=["setfattr", "-n", "ceph.dir.layout."+name, "-v", str(value), path])
847
848 def add_data_pool(self, name, create=True):
849 if create:
850 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name,
851 '--pg_num_min', str(self.pg_num_min))
852 self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
853 self.get_pool_names(refresh = True)
854 for poolid, fs_name in self.data_pools.items():
855 if name == fs_name:
856 return poolid
857 raise RuntimeError("could not get just created pool '{0}'".format(name))
858
859 def get_pool_names(self, refresh = False, status = None):
860 if refresh or self.metadata_pool_name is None or self.data_pools is None:
861 if status is None:
862 status = self.status()
863 fsmap = status.get_fsmap(self.id)
864
865 osd_map = self.mon_manager.get_osd_dump_json()
866 id_to_name = {}
867 for p in osd_map['pools']:
868 id_to_name[p['pool']] = p['pool_name']
869
870 self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
871 self.data_pools = {}
872 for data_pool in fsmap['mdsmap']['data_pools']:
873 self.data_pools[data_pool] = id_to_name[data_pool]
874
875 def get_data_pool_name(self, refresh = False):
876 if refresh or self.data_pools is None:
877 self.get_pool_names(refresh = True)
878 assert(len(self.data_pools) == 1)
879 return next(iter(self.data_pools.values()))
880
881 def get_data_pool_id(self, refresh = False):
882 """
883 Don't call this if you have multiple data pools
884 :return: integer
885 """
886 if refresh or self.data_pools is None:
887 self.get_pool_names(refresh = True)
888 assert(len(self.data_pools) == 1)
889 return next(iter(self.data_pools.keys()))
890
891 def get_data_pool_names(self, refresh = False):
892 if refresh or self.data_pools is None:
893 self.get_pool_names(refresh = True)
894 return list(self.data_pools.values())
895
896 def get_metadata_pool_name(self):
897 return self.metadata_pool_name
898
899 def set_data_pool_name(self, name):
900 if self.id is not None:
901 raise RuntimeError("can't set filesystem name if its fscid is set")
902 self.data_pool_name = name
903
904 def get_pool_pg_num(self, pool_name):
905 pgs = json.loads(self.mon_manager.raw_cluster_cmd('osd', 'pool', 'get',
906 pool_name, 'pg_num',
907 '--format=json-pretty'))
908 return int(pgs['pg_num'])
909
910 def get_namespace_id(self):
911 return self.id
912
913 def get_pool_df(self, pool_name):
914 """
915 Return a dict like:
916 {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
917 """
918 for pool_df in self._df()['pools']:
919 if pool_df['name'] == pool_name:
920 return pool_df['stats']
921
922 raise RuntimeError("Pool name '{0}' not found".format(pool_name))
923
924 def get_usage(self):
925 return self._df()['stats']['total_used_bytes']
926
927 def are_daemons_healthy(self, status=None, skip_max_mds_check=False):
928 """
929 Return true if all daemons are in one of the active, standby, or standby-replay states,
930 and at least max_mds daemons are in 'active'.
931
932 Unlike most of Filesystem, this function is tolerant of new-style `fs`
933 commands being missing, because we are part of the ceph installation
934 process during upgrade suites, so must fall back to old style commands
935 when we get an EINVAL on a new style command.
936
937 :return:
938 """
939 # First, check to see that processes haven't exited with an error code
940 for mds in self._ctx.daemons.iter_daemons_of_role('mds'):
941 mds.check_status()
942
943 active_count = 0
944 mds_map = self.get_mds_map(status=status)
945
946 log.debug("are_daemons_healthy: mds map: {0}".format(mds_map))
947
948 for mds_id, mds_status in mds_map['info'].items():
949 if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
950 log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
951 return False
952 elif mds_status['state'] == 'up:active':
953 active_count += 1
954
955 log.debug("are_daemons_healthy: {0}/{1}".format(
956 active_count, mds_map['max_mds']
957 ))
958
959 if not skip_max_mds_check:
960 if active_count > mds_map['max_mds']:
961 log.debug("are_daemons_healthy: number of actives is greater than max_mds: {0}".format(mds_map))
962 return False
963 elif active_count == mds_map['max_mds']:
964 # The MDSMap says these guys are active, but let's check they really are
965 for mds_id, mds_status in mds_map['info'].items():
966 if mds_status['state'] == 'up:active':
967 try:
968 daemon_status = self.mds_tell(["status"], mds_id=mds_status['name'])
969 except CommandFailedError as cfe:
970 if cfe.exitstatus == errno.EINVAL:
971 # Old version, can't do this check
972 continue
973 else:
974 # MDS not even running
975 return False
976
977 if daemon_status['state'] != 'up:active':
978 # MDS hasn't taken the latest map yet
979 return False
980
981 return True
982 else:
983 return False
984 else:
985 log.debug("are_daemons_healthy: skipping max_mds check")
986 return True
987
988 def get_daemon_names(self, state=None, status=None):
989 """
990 Return MDS daemon names of those daemons in the given state
991 :param state:
992 :return:
993 """
994 mdsmap = self.get_mds_map(status)
995 result = []
996 for mds_status in sorted(mdsmap['info'].values(),
997 key=lambda _: _['rank']):
998 if mds_status['state'] == state or state is None:
999 result.append(mds_status['name'])
1000
1001 return result
1002
1003 def get_active_names(self, status=None):
1004 """
1005 Return MDS daemon names of those daemons holding ranks
1006 in state up:active
1007
1008 :return: list of strings like ['a', 'b'], sorted by rank
1009 """
1010 return self.get_daemon_names("up:active", status=status)
1011
1012 def get_all_mds_rank(self, status=None):
1013 mdsmap = self.get_mds_map(status)
1014 result = []
1015 for mds_status in sorted(mdsmap['info'].values(),
1016 key=lambda _: _['rank']):
1017 if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
1018 result.append(mds_status['rank'])
1019
1020 return result
1021
1022 def get_rank(self, rank=None, status=None):
1023 if status is None:
1024 status = self.getinfo()
1025 if rank is None:
1026 rank = 0
1027 return status.get_rank(self.id, rank)
1028
1029 def rank_restart(self, rank=0, status=None):
1030 name = self.get_rank(rank=rank, status=status)['name']
1031 self.mds_restart(mds_id=name)
1032
1033 def rank_signal(self, signal, rank=0, status=None):
1034 name = self.get_rank(rank=rank, status=status)['name']
1035 self.mds_signal(name, signal)
1036
1037 def rank_freeze(self, yes, rank=0):
1038 self.mon_manager.raw_cluster_cmd("mds", "freeze", "{}:{}".format(self.id, rank), str(yes).lower())
1039
1040 def rank_fail(self, rank=0):
1041 self.mon_manager.raw_cluster_cmd("mds", "fail", "{}:{}".format(self.id, rank))
1042
1043 def get_ranks(self, status=None):
1044 if status is None:
1045 status = self.getinfo()
1046 return status.get_ranks(self.id)
1047
1048 def get_replays(self, status=None):
1049 if status is None:
1050 status = self.getinfo()
1051 return status.get_replays(self.id)
1052
1053 def get_replay(self, rank=0, status=None):
1054 for replay in self.get_replays(status=status):
1055 if replay['rank'] == rank:
1056 return replay
1057 return None
1058
1059 def get_rank_names(self, status=None):
1060 """
1061 Return MDS daemon names of those daemons holding a rank,
1062 sorted by rank. This includes e.g. up:replay/reconnect
1063 as well as active, but does not include standby or
1064 standby-replay.
1065 """
1066 mdsmap = self.get_mds_map(status)
1067 result = []
1068 for mds_status in sorted(mdsmap['info'].values(),
1069 key=lambda _: _['rank']):
1070 if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
1071 result.append(mds_status['name'])
1072
1073 return result
1074
1075 def wait_for_daemons(self, timeout=None, skip_max_mds_check=False, status=None):
1076 """
1077 Wait until all daemons are healthy
1078 :return:
1079 """
1080
1081 if timeout is None:
1082 timeout = DAEMON_WAIT_TIMEOUT
1083
1084 if status is None:
1085 status = self.status()
1086
1087 elapsed = 0
1088 while True:
1089 if self.are_daemons_healthy(status=status, skip_max_mds_check=skip_max_mds_check):
1090 return status
1091 else:
1092 time.sleep(1)
1093 elapsed += 1
1094
1095 if elapsed > timeout:
1096 log.debug("status = {0}".format(status))
1097 raise RuntimeError("Timed out waiting for MDS daemons to become healthy")
1098
1099 status = self.status()
1100
1101 def dencoder(self, obj_type, obj_blob):
1102 args = [os.path.join(self._prefix, "ceph-dencoder"), 'type', obj_type, 'import', '-', 'decode', 'dump_json']
1103 p = self.mon_manager.controller.run(args=args, stdin=BytesIO(obj_blob), stdout=BytesIO())
1104 return p.stdout.getvalue()
1105
1106 def rados(self, *args, **kwargs):
1107 """
1108 Callout to rados CLI.
1109 """
1110
1111 return self.mon_manager.do_rados(*args, **kwargs)
1112
1113 def radosm(self, *args, **kwargs):
1114 """
1115 Interact with the metadata pool via rados CLI.
1116 """
1117
1118 return self.rados(*args, **kwargs, pool=self.get_metadata_pool_name())
1119
1120 def radosmo(self, *args, stdout=BytesIO(), **kwargs):
1121 """
1122 Interact with the metadata pool via rados CLI. Get the stdout.
1123 """
1124
1125 return self.radosm(*args, **kwargs, stdout=stdout).stdout.getvalue()
1126
1127 def get_metadata_object(self, object_type, object_id):
1128 """
1129 Retrieve an object from the metadata pool, pass it through
1130 ceph-dencoder to dump it to JSON, and return the decoded object.
1131 """
1132
1133 o = self.radosmo(['get', object_id, '-'])
1134 j = self.dencoder(object_type, o)
1135 try:
1136 return json.loads(j)
1137 except (TypeError, ValueError):
1138 log.error("Failed to decode JSON: '{0}'".format(j))
1139 raise
1140
1141 def get_journal_version(self):
1142 """
1143 Read the JournalPointer and Journal::Header objects to learn the version of
1144 encoding in use.
1145 """
1146 journal_pointer_object = '400.00000000'
1147 journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
1148 journal_ino = journal_pointer_dump['journal_pointer']['front']
1149
1150 journal_header_object = "{0:x}.00000000".format(journal_ino)
1151 journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)
1152
1153 version = journal_header_dump['journal_header']['stream_format']
1154 log.debug("Read journal version {0}".format(version))
1155
1156 return version
1157
1158 def mds_asok(self, command, mds_id=None, timeout=None):
1159 if mds_id is None:
1160 return self.rank_asok(command, timeout=timeout)
1161
1162 return self.json_asok(command, 'mds', mds_id, timeout=timeout)
1163
1164 def mds_tell(self, command, mds_id=None):
1165 if mds_id is None:
1166 return self.rank_tell(command)
1167
1168 return json.loads(self.mon_manager.raw_cluster_cmd("tell", f"mds.{mds_id}", *command))
1169
1170 def rank_asok(self, command, rank=0, status=None, timeout=None):
1171 info = self.get_rank(rank=rank, status=status)
1172 return self.json_asok(command, 'mds', info['name'], timeout=timeout)
1173
1174 def rank_tell(self, command, rank=0, status=None):
1175 return json.loads(self.mon_manager.raw_cluster_cmd("tell", f"mds.{self.id}:{rank}", *command))
1176
1177 def ranks_tell(self, command, status=None):
1178 if status is None:
1179 status = self.status()
1180 out = []
1181 for r in status.get_ranks(self.id):
1182 result = self.rank_tell(command, rank=r['rank'], status=status)
1183 out.append((r['rank'], result))
1184 return sorted(out)
1185
1186 def ranks_perf(self, f, status=None):
1187 perf = self.ranks_tell(["perf", "dump"], status=status)
1188 out = []
1189 for rank, perf in perf:
1190 out.append((rank, f(perf)))
1191 return out
1192
1193 def read_cache(self, path, depth=None):
1194 cmd = ["dump", "tree", path]
1195 if depth is not None:
1196 cmd.append(depth.__str__())
1197 result = self.mds_asok(cmd)
1198 if len(result) == 0:
1199 raise RuntimeError("Path not found in cache: {0}".format(path))
1200
1201 return result
1202
1203 def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
1204 """
1205 Block until the MDS reaches a particular state, or a failure condition
1206 is met.
1207
1208 When there are multiple MDSs, succeed when exactly one MDS is in the
1209 goal state, or fail when any MDS is in the reject state.
1210
1211 :param goal_state: Return once the MDS is in this state
1212 :param reject: Fail if the MDS enters this state before the goal state
1213 :param timeout: Fail if this many seconds pass before reaching goal
1214 :return: number of seconds waited, rounded down to integer
1215 """
1216
1217 started_at = time.time()
1218 while True:
1219 status = self.status()
1220 if rank is not None:
1221 try:
1222 mds_info = status.get_rank(self.id, rank)
1223 current_state = mds_info['state'] if mds_info else None
1224 log.debug("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
1225 except:
1226 mdsmap = self.get_mds_map(status=status)
1227 if rank in mdsmap['failed']:
1228 log.debug("Waiting for rank {0} to come back.".format(rank))
1229 current_state = None
1230 else:
1231 raise
1232 elif mds_id is not None:
1233 # mds_info is None if no daemon with this ID exists in the map
1234 mds_info = status.get_mds(mds_id)
1235 current_state = mds_info['state'] if mds_info else None
1236 log.debug("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
1237 else:
1238 # In general, look for a single MDS
1239 states = [m['state'] for m in status.get_ranks(self.id)]
1240 if [s for s in states if s == goal_state] == [goal_state]:
1241 current_state = goal_state
1242 elif reject in states:
1243 current_state = reject
1244 else:
1245 current_state = None
1246 log.debug("mapped states {0} to {1}".format(states, current_state))
1247
1248 elapsed = time.time() - started_at
1249 if current_state == goal_state:
1250 log.debug("reached state '{0}' in {1}s".format(current_state, elapsed))
1251 return elapsed
1252 elif reject is not None and current_state == reject:
1253 raise RuntimeError("MDS in reject state {0}".format(current_state))
1254 elif timeout is not None and elapsed > timeout:
1255 log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
1256 raise RuntimeError(
1257 "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
1258 elapsed, goal_state, current_state
1259 ))
1260 else:
1261 time.sleep(1)
1262
1263 def _read_data_xattr(self, ino_no, xattr_name, obj_type, pool):
1264 if pool is None:
1265 pool = self.get_data_pool_name()
1266
1267 obj_name = "{0:x}.00000000".format(ino_no)
1268
1269 args = ["getxattr", obj_name, xattr_name]
1270 try:
1271 proc = self.rados(args, pool=pool, stdout=BytesIO())
1272 except CommandFailedError as e:
1273 log.error(e.__str__())
1274 raise ObjectNotFound(obj_name)
1275
1276 obj_blob = proc.stdout.getvalue()
1277 return json.loads(self.dencoder(obj_type, obj_blob).strip())
1278
1279 def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
1280 """
1281 Write to an xattr of the 0th data object of an inode. Will
1282 succeed whether the object and/or xattr already exist or not.
1283
1284 :param ino_no: integer inode number
1285 :param xattr_name: string name of the xattr
1286 :param data: byte array data to write to the xattr
1287 :param pool: name of data pool or None to use primary data pool
1288 :return: None
1289 """
1290 if pool is None:
1291 pool = self.get_data_pool_name()
1292
1293 obj_name = "{0:x}.00000000".format(ino_no)
1294 args = ["setxattr", obj_name, xattr_name, data]
1295 self.rados(args, pool=pool)
1296
1297 def read_backtrace(self, ino_no, pool=None):
1298 """
1299 Read the backtrace from the data pool, return a dict in the format
1300 given by inode_backtrace_t::dump, which is something like:
1301
1302 ::
1303
1304 rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
1305 ceph-dencoder type inode_backtrace_t import out.bin decode dump_json
1306
1307 { "ino": 1099511627778,
1308 "ancestors": [
1309 { "dirino": 1,
1310 "dname": "blah",
1311 "version": 11}],
1312 "pool": 1,
1313 "old_pools": []}
1314
1315 :param pool: name of pool to read backtrace from. If omitted, FS must have only
1316 one data pool and that will be used.
1317 """
1318 return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)
1319
1320 def read_layout(self, ino_no, pool=None):
1321 """
1322 Read 'layout' xattr of an inode and parse the result, returning a dict like:
1323 ::
1324 {
1325 "stripe_unit": 4194304,
1326 "stripe_count": 1,
1327 "object_size": 4194304,
1328 "pool_id": 1,
1329 "pool_ns": "",
1330 }
1331
1332 :param pool: name of pool to read backtrace from. If omitted, FS must have only
1333 one data pool and that will be used.
1334 """
1335 return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)
1336
1337 def _enumerate_data_objects(self, ino, size):
1338 """
1339 Get the list of expected data objects for a range, and the list of objects
1340 that really exist.
1341
1342 :return a tuple of two lists of strings (expected, actual)
1343 """
1344 stripe_size = 1024 * 1024 * 4
1345
1346 size = max(stripe_size, size)
1347
1348 want_objects = [
1349 "{0:x}.{1:08x}".format(ino, n)
1350 for n in range(0, ((size - 1) // stripe_size) + 1)
1351 ]
1352
1353 exist_objects = self.rados(["ls"], pool=self.get_data_pool_name(), stdout=StringIO()).stdout.getvalue().split("\n")
1354
1355 return want_objects, exist_objects
1356
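# Worked example (hypothetical inode number and size): with the 4 MiB
# stripe_size hard-coded above, a 9 MiB file with ino 0x10000000000 is expected
# to map to three objects named
#     10000000000.00000000, 10000000000.00000001, 10000000000.00000002
# which is what data_objects_present()/data_objects_absent() compare against
# the actual 'rados ls' listing of the data pool.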
1357 def data_objects_present(self, ino, size):
1358 """
1359 Check that *all* the expected data objects for an inode are present in the data pool
1360 """
1361
1362 want_objects, exist_objects = self._enumerate_data_objects(ino, size)
1363 missing = set(want_objects) - set(exist_objects)
1364
1365 if missing:
1366 log.debug("Objects missing (ino {0}, size {1}): {2}".format(
1367 ino, size, missing
1368 ))
1369 return False
1370 else:
1371 log.debug("All objects for ino {0} size {1} found".format(ino, size))
1372 return True
1373
1374 def data_objects_absent(self, ino, size):
1375 want_objects, exist_objects = self._enumerate_data_objects(ino, size)
1376 present = set(want_objects) & set(exist_objects)
1377
1378 if present:
1379 log.debug("Objects not absent (ino {0}, size {1}): {2}".format(
1380 ino, size, present
1381 ))
1382 return False
1383 else:
1384 log.debug("All objects for ino {0} size {1} are absent".format(ino, size))
1385 return True
1386
1387 def dirfrag_exists(self, ino, frag):
1388 try:
1389 self.radosm(["stat", "{0:x}.{1:08x}".format(ino, frag)])
1390 except CommandFailedError:
1391 return False
1392 else:
1393 return True
1394
1395 def list_dirfrag(self, dir_ino):
1396 """
1397 Read the named object and return the list of omap keys
1398
1399 :return a list of 0 or more strings
1400 """
1401
1402 dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
1403
1404 try:
1405 key_list_str = self.radosmo(["listomapkeys", dirfrag_obj_name], stdout=StringIO())
1406 except CommandFailedError as e:
1407 log.error(e.__str__())
1408 raise ObjectNotFound(dirfrag_obj_name)
1409
1410 return key_list_str.strip().split("\n") if key_list_str else []
1411
1412 def get_meta_of_fs_file(self, dir_ino, obj_name, out):
1413 """
1414 Get an entry's metadata from its parent dirfrag to verify the correctness of the data format encoded by the cephfs-meta-injection tool.
1415 Warning: directory splitting (fragmentation) is not considered here.
1416 """
1417
1418 dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
1419 try:
1420 self.radosm(["getomapval", dirfrag_obj_name, obj_name+"_head", out])
1421 except CommandFailedError as e:
1422 log.error(e.__str__())
1423 raise ObjectNotFound(dir_ino)
1424
1425 def erase_metadata_objects(self, prefix):
1426 """
1427 For all objects in the metadata pool matching the prefix,
1428 erase them.
1429
1430 This O(N) with the number of objects in the pool, so only suitable
1431 for use on toy test filesystems.
1432 """
1433 all_objects = self.radosmo(["ls"], stdout=StringIO()).strip().split("\n")
1434 matching_objects = [o for o in all_objects if o.startswith(prefix)]
1435 for o in matching_objects:
1436 self.radosm(["rm", o])
1437
1438 def erase_mds_objects(self, rank):
1439 """
1440 Erase all the per-MDS objects for a particular rank. This includes
1441 inotable, sessiontable, journal
1442 """
1443
1444 def obj_prefix(multiplier):
1445 """
1446 MDS object naming conventions like rank 1's
1447 journal is at 201.***
1448 """
1449 return "%x." % (multiplier * 0x100 + rank)
1450
1451 # MDS_INO_LOG_OFFSET
1452 self.erase_metadata_objects(obj_prefix(2))
1453 # MDS_INO_LOG_BACKUP_OFFSET
1454 self.erase_metadata_objects(obj_prefix(3))
1455 # MDS_INO_LOG_POINTER_OFFSET
1456 self.erase_metadata_objects(obj_prefix(4))
1457 # MDSTables & SessionMap
1458 self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))
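# Illustrative note (values derived from obj_prefix above): for rank 0 this
# removes objects whose names start with 200., 300. or 400., plus the mds0_
# table/session objects; for rank 1 the prefixes are 201., 301., 401. and mds1_.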
1459
1460 @property
1461 def _prefix(self):
1462 """
1463 Override this to set a different
1464 """
1465 return ""
1466
1467 def _make_rank(self, rank):
1468 return "{}:{}".format(self.name, rank)
1469
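# Illustrative note (hypothetical fs name): _make_rank(0) on a filesystem named
# "cephfs" yields "cephfs:0", which is the --rank argument format passed on to
# journal_tool() and meta_tool() below.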
1470 def _run_tool(self, tool, args, rank=None, quiet=False):
1471 # Tests frequently have [client] configuration that jacks up
1472 # the objecter log level (unlikely to be interesting here)
1473 # and does not set the mds log level (very interesting here)
1474 if quiet:
1475 base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
1476 else:
1477 base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']
1478
1479 if rank is not None:
1480 base_args.extend(["--rank", "%s" % str(rank)])
1481
1482 t1 = datetime.datetime.now()
1483 r = self.tool_remote.sh(script=base_args + args, stdout=StringIO()).strip()
1484 duration = datetime.datetime.now() - t1
1485 log.debug("Ran {0} in time {1}, result:\n{2}".format(
1486 base_args + args, duration, r
1487 ))
1488 return r
1489
1490 @property
1491 def tool_remote(self):
1492 """
1493 An arbitrary remote to use when invoking recovery tools. Use an MDS host because
1494 it'll definitely have keys with perms to access cephfs metadata pool. This is public
1495 so that tests can use this remote to go get locally written output files from the tools.
1496 """
1497 return self.mon_manager.controller
1498
1499 def journal_tool(self, args, rank, quiet=False):
1500 """
1501 Invoke cephfs-journal-tool with the passed arguments for a rank, and return its stdout
1502 """
1503 fs_rank = self._make_rank(rank)
1504 return self._run_tool("cephfs-journal-tool", args, fs_rank, quiet)
1505
1506 def meta_tool(self, args, rank, quiet=False):
1507 """
1508 Invoke cephfs-meta-injection with the passed arguments for a rank, and return its stdout
1509 """
1510 fs_rank = self._make_rank(rank)
1511 return self._run_tool("cephfs-meta-injection", args, fs_rank, quiet)
1512
1513 def table_tool(self, args, quiet=False):
1514 """
1515 Invoke cephfs-table-tool with the passed arguments, and return its stdout
1516 """
1517 return self._run_tool("cephfs-table-tool", args, None, quiet)
1518
1519 def data_scan(self, args, quiet=False, worker_count=1):
1520 """
1521 Invoke cephfs-data-scan with the passed arguments, and return its stdout
1522
1523 :param worker_count: if greater than 1, multiple workers will be run
1524 in parallel and the return value will be None
1525 """
1526
1527 workers = []
1528
1529 for n in range(0, worker_count):
1530 if worker_count > 1:
1531 # data-scan args first token is a command, followed by args to it.
1532 # insert worker arguments after the command.
1533 cmd = args[0]
1534 worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
1535 else:
1536 worker_args = args
1537
1538 workers.append(Greenlet.spawn(lambda wargs=worker_args:
1539 self._run_tool("cephfs-data-scan", wargs, None, quiet)))
1540
1541 for w in workers:
1542 w.get()
1543
1544 if worker_count == 1:
1545 return workers[0].value
1546 else:
1547 return None
1548
1549 def is_full(self):
1550 return self.is_pool_full(self.get_data_pool_name())
1551
1552 def authorize(self, client_id, caps=('/', 'rw')):
1553 """
1554 Run "ceph fs authorize" and run "ceph auth get" to get and returnt the
1555 keyring.
1556
1557 client_id: client id that will be authorized
1558 caps: tuple containing the path and permission (can be r or rw)
1559 respectively.
1560 """
1561 client_name = 'client.' + client_id
1562 return self.mon_manager.raw_cluster_cmd('fs', 'authorize', self.name,
1563 client_name, *caps)
1564
1565 def grow(self, new_max_mds, status=None):
1566 oldmax = self.get_var('max_mds', status=status)
1567 assert(new_max_mds > oldmax)
1568 self.set_max_mds(new_max_mds)
1569 return self.wait_for_daemons()
1570
1571 def shrink(self, new_max_mds, status=None):
1572 oldmax = self.get_var('max_mds', status=status)
1573 assert(new_max_mds < oldmax)
1574 self.set_max_mds(new_max_mds)
1575 return self.wait_for_daemons()
1576
1577 def run_scrub(self, cmd, rank=0):
1578 return self.rank_tell(["scrub"] + cmd, rank)
1579
1580 def get_scrub_status(self, rank=0):
1581 return self.run_scrub(["status"], rank)
1582
1583 def wait_until_scrub_complete(self, result=None, tag=None, rank=0, sleep=30,
1584 timeout=300, reverse=False):
1585 # time out after "timeout" seconds and assume the scrub is done
1586 if result is None:
1587 result = "no active scrubs running"
1588 with contextutil.safe_while(sleep=sleep, tries=timeout//sleep) as proceed:
1589 while proceed():
1590 out_json = self.rank_tell(["scrub", "status"], rank=rank)
1591 assert out_json is not None
1592 if not reverse:
1593 if result in out_json['status']:
1594 log.info("all active scrubs completed")
1595 return True
1596 else:
1597 if result not in out_json['status']:
1598 log.info("all active scrubs completed")
1599 return True
1600
1601 if tag is not None:
1602 status = out_json['scrubs'][tag]
1603 if status is not None:
1604 log.info(f"scrub status for tag:{tag} - {status}")
1605 else:
1606 log.info(f"scrub has completed for tag:{tag}")
1607 return True
1608
1609 # timed out waiting for scrub to complete
1610 return False