import json
import logging
from gevent import Greenlet
import os
import time
import datetime
import re
import errno
import random

from io import BytesIO, StringIO
from errno import EBUSY

from teuthology.exceptions import CommandFailedError
from teuthology import misc
from teuthology.nuke import clear_firewall
from teuthology.parallel import parallel
from teuthology import contextutil
from tasks.ceph_manager import write_conf
from tasks import ceph_manager


log = logging.getLogger(__name__)


DAEMON_WAIT_TIMEOUT = 120
ROOT_INO = 1

class FileLayout(object):
    def __init__(self, pool=None, pool_namespace=None, stripe_unit=None, stripe_count=None, object_size=None):
        self.pool = pool
        self.pool_namespace = pool_namespace
        self.stripe_unit = stripe_unit
        self.stripe_count = stripe_count
        self.object_size = object_size

    @classmethod
    def load_from_ceph(cls, layout_str):
        # TODO
        pass

    def items(self):
        if self.pool is not None:
            yield ("pool", self.pool)
        if self.pool_namespace:
            yield ("pool_namespace", self.pool_namespace)
        if self.stripe_unit is not None:
            yield ("stripe_unit", self.stripe_unit)
        if self.stripe_count is not None:
            yield ("stripe_count", self.stripe_count)
        if self.object_size is not None:
            yield ("object_size", self.object_size)
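# Illustrative sketch (hypothetical values): FileLayout.items() yields only
# the fields that were set, as ("name", value) pairs that set_dir_layout()
# below maps onto "ceph.dir.layout.<name>" xattrs via setfattr.
#
#   layout = FileLayout(pool="cephfs_data", stripe_count=2)
#   assert dict(layout.items()) == {"pool": "cephfs_data", "stripe_count": 2}
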
class ObjectNotFound(Exception):
    def __init__(self, object_name):
        self._object_name = object_name

    def __str__(self):
        return "Object not found: '{0}'".format(self._object_name)

class FSMissing(Exception):
    def __init__(self, ident):
        self.ident = ident

    def __str__(self):
        return f"File system {self.ident} does not exist in the map"

class FSStatus(object):
    """
    Operations on a snapshot of the FSMap.
    """
    def __init__(self, mon_manager, epoch=None):
        self.mon = mon_manager
        cmd = ["fs", "dump", "--format=json"]
        if epoch is not None:
            cmd.append(str(epoch))
        self.map = json.loads(self.mon.raw_cluster_cmd(*cmd))

    def __str__(self):
        return json.dumps(self.map, indent = 2, sort_keys = True)

    # Expose the fsmap for manual inspection.
    def __getitem__(self, key):
        """
        Get a field from the fsmap.
        """
        return self.map[key]

    def get_filesystems(self):
        """
        Iterator for all filesystems.
        """
        for fs in self.map['filesystems']:
            yield fs

    def get_all(self):
        """
        Iterator for all the mds_info components in the FSMap.
        """
        for info in self.map['standbys']:
            yield info
        for fs in self.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                yield info

    def get_standbys(self):
        """
        Iterator for all standbys.
        """
        for info in self.map['standbys']:
            yield info

    def get_fsmap(self, fscid):
        """
        Get the fsmap for the given FSCID.
        """
        for fs in self.map['filesystems']:
            if fscid is None or fs['id'] == fscid:
                return fs
        raise FSMissing(fscid)

    def get_fsmap_byname(self, name):
        """
        Get the fsmap for the given file system name.
        """
        for fs in self.map['filesystems']:
            if name is None or fs['mdsmap']['fs_name'] == name:
                return fs
        raise FSMissing(name)

    def get_replays(self, fscid):
        """
        Get the standby:replay MDS for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['state'] == 'up:standby-replay':
                yield info

    def get_ranks(self, fscid):
        """
        Get the ranks for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['rank'] >= 0 and info['state'] != 'up:standby-replay':
                yield info

    def get_damaged(self, fscid):
        """
        Get the damaged ranks for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        return fs['mdsmap']['damaged']

    def get_rank(self, fscid, rank):
        """
        Get the rank for the given FSCID.
        """
        for info in self.get_ranks(fscid):
            if info['rank'] == rank:
                return info
        raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))

    def get_mds(self, name):
        """
        Get the info for the given MDS name.
        """
        for info in self.get_all():
            if info['name'] == name:
                return info
        return None

    def get_mds_addr(self, name):
        """
        Return the instance addr as a string, like "10.214.133.138:6807\/10825"
        """
        info = self.get_mds(name)
        if info:
            return info['addr']
        else:
            log.warning(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
            raise RuntimeError("MDS id '{0}' not found in map".format(name))

    def get_mds_addrs(self, name):
        """
        Return the instance addrs as a list of strings, like ["10.214.133.138:6807", "10.214.133.138:6808"]
        """
        info = self.get_mds(name)
        if info:
            return [e['addr'] for e in info['addrs']['addrvec']]
        else:
            log.warning(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
            raise RuntimeError("MDS id '{0}' not found in map".format(name))

    def get_mds_gid(self, gid):
        """
        Get the info for the given MDS gid.
        """
        for info in self.get_all():
            if info['gid'] == gid:
                return info
        return None

    def hadfailover(self, status):
        """
        Compares two statuses for mds failovers.
        Returns True if there is a failover.
        """
        for fs in status.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                oldinfo = self.get_mds_gid(info['gid'])
                if oldinfo is None or oldinfo['incarnation'] != info['incarnation']:
                    return True
        # all matching
        return False

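    # Illustrative sketch (hypothetical test snippet): failover detection works
    # by comparing each daemon's gid/incarnation across two FSMap snapshots, so
    # the usual pattern is to capture a status before a disruptive operation:
    #
    #   before = fs.status()
    #   ...do something disruptive...
    #   assert before.hadfailover(fs.status())
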
class CephCluster(object):
    @property
    def admin_remote(self):
        first_mon = misc.get_first_mon(self._ctx, None)
        (result,) = self._ctx.cluster.only(first_mon).remotes.keys()
        return result

    def __init__(self, ctx) -> None:
        self._ctx = ctx
        self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))

    def get_config(self, key, service_type=None):
        """
        Get config from mon by default, or a specific service if caller asks for it
        """
        if service_type is None:
            service_type = 'mon'

        service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def set_ceph_conf(self, subsys, key, value):
        if subsys not in self._ctx.ceph['ceph'].conf:
            self._ctx.ceph['ceph'].conf[subsys] = {}
        self._ctx.ceph['ceph'].conf[subsys][key] = value
        # XXX because we don't have the ceph task's config object, if they
        # used a different config path this won't work.
        write_conf(self._ctx)

    def clear_ceph_conf(self, subsys, key):
        del self._ctx.ceph['ceph'].conf[subsys][key]
        write_conf(self._ctx)

    def json_asok(self, command, service_type, service_id, timeout=None):
        if timeout is None:
            timeout = 300
        command.insert(0, '--format=json')
        proc = self.mon_manager.admin_socket(service_type, service_id, command, timeout=timeout)
        response_data = proc.stdout.getvalue().strip()
        if len(response_data) > 0:

            def get_nonnumeric_values(value):
                c = {"NaN": float("nan"), "Infinity": float("inf"),
                     "-Infinity": -float("inf")}
                return c[value]

            j = json.loads(response_data.replace('inf', 'Infinity'),
                           parse_constant=get_nonnumeric_values)
            pretty = json.dumps(j, sort_keys=True, indent=2)
            log.debug(f"_json_asok output\n{pretty}")
            return j
        else:
            log.debug("_json_asok output empty")
            return None

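    # Illustrative sketch of the parse_constant handling above: admin sockets
    # can emit bare "inf"/"nan" tokens, which json.loads() rejects, so they are
    # rewritten to Infinity/-Infinity/NaN and mapped back to Python floats:
    #
    #   import json, math
    #   consts = {"NaN": float("nan"), "Infinity": float("inf"),
    #             "-Infinity": -float("inf")}
    #   j = json.loads('{"x": inf}'.replace('inf', 'Infinity'),
    #                  parse_constant=consts.__getitem__)
    #   assert math.isinf(j["x"])
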
    def is_addr_blocklisted(self, addr):
        blocklist = json.loads(self.mon_manager.raw_cluster_cmd(
            "osd", "dump", "--format=json"))['blocklist']
        if addr in blocklist:
            return True
        log.warning(f'The address {addr} is not blocklisted')
        return False


class MDSCluster(CephCluster):
    """
    Collective operations on all the MDS daemons in the Ceph cluster. These
    daemons may be in use by various Filesystems.

    For the benefit of pre-multi-filesystem tests, this class is also
    a parent of Filesystem. The correct way to use MDSCluster going forward is
    as a separate instance outside of your (multiple) Filesystem instances.
    """

    def __init__(self, ctx):
        super(MDSCluster, self).__init__(ctx)

    @property
    def mds_ids(self):
        # do this dynamically because the list of ids may change periodically with cephadm
        return list(misc.all_roles_of_type(self._ctx.cluster, 'mds'))

    @property
    def mds_daemons(self):
        return dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])

    def _one_or_all(self, mds_id, cb, in_parallel=True):
        """
        Call a callback for a single named MDS, or for all.

        Note that the parallelism here isn't for performance, it's to avoid being overly kind
        to the cluster by waiting a graceful ssh-latency of time between doing things, and to
        avoid being overly kind by executing them in a particular order. However, some actions
        don't cope with being done in parallel, so it's optional (`in_parallel`)

        :param mds_id: MDS daemon name, or None
        :param cb: Callback taking single argument of MDS daemon name
        :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
        """

        if mds_id is None:
            if in_parallel:
                with parallel() as p:
                    for mds_id in self.mds_ids:
                        p.spawn(cb, mds_id)
            else:
                for mds_id in self.mds_ids:
                    cb(mds_id)
        else:
            cb(mds_id)

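    # Illustrative sketch (hypothetical callbacks): the mds_* helpers below are
    # thin wrappers around _one_or_all, e.g.:
    #
    #   self._one_or_all(None, lambda id_: self.mds_daemons[id_].stop())
    #   self._one_or_all("a", some_fragile_action, in_parallel=False)
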
    def get_config(self, key, service_type=None):
        """
        get_config specialization of service_type="mds"
        """
        if service_type != "mds":
            return super(MDSCluster, self).get_config(key, service_type)

        # Some tests stop MDS daemons, don't send commands to a dead one:
        running_daemons = [i for i, mds in self.mds_daemons.items() if mds.running()]
        service_id = random.sample(running_daemons, 1)[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def mds_stop(self, mds_id=None):
        """
        Stop the MDS daemon process(es). If it held a rank, that rank
        will eventually go laggy.
        """
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())

    def mds_fail(self, mds_id=None):
        """
        Inform MDSMonitor of the death of the daemon process(es). If it held
        a rank, that rank will be relinquished.
        """
        self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))

    def mds_restart(self, mds_id=None):
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())

    def mds_fail_restart(self, mds_id=None):
        """
        Variation on restart that includes marking MDSs as failed, so that doing this
        operation followed by waiting for healthy daemon states guarantees that they
        have gone down and come up, rather than potentially seeing the healthy states
        that existed before the restart.
        """
        def _fail_restart(id_):
            self.mds_daemons[id_].stop()
            self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
            self.mds_daemons[id_].restart()

        self._one_or_all(mds_id, _fail_restart)

    def mds_signal(self, mds_id, sig, silent=False):
        """
        Signal an MDS daemon.
        """
        self.mds_daemons[mds_id].signal(sig, silent)

    def mds_is_running(self, mds_id):
        return self.mds_daemons[mds_id].running()

    def newfs(self, name='cephfs', create=True):
        return Filesystem(self._ctx, name=name, create=create)

    def status(self, epoch=None):
        return FSStatus(self.mon_manager, epoch)

    def get_standby_daemons(self):
        return set([s['name'] for s in self.status().get_standbys()])

    def get_mds_hostnames(self):
        result = set()
        for mds_id in self.mds_ids:
            mds_remote = self.mon_manager.find_remote('mds', mds_id)
            result.add(mds_remote.hostname)

        return list(result)

    def set_clients_block(self, blocked, mds_id=None):
        """
        Block (using iptables) client communications to this MDS. Be careful: if
        other services are running on this MDS, or other MDSs try to talk to this
        MDS, their communications may also be blocked as collateral damage.

        :param mds_id: Optional ID of MDS to block, default to all
        :return:
        """
        da_flag = "-A" if blocked else "-D"

        def set_block(_mds_id):
            remote = self.mon_manager.find_remote('mds', _mds_id)
            status = self.status()

            addr = status.get_mds_addr(_mds_id)
            ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()

            remote.run(
                args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])
            remote.run(
                args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])

        self._one_or_all(mds_id, set_block, in_parallel=False)

    def set_inter_mds_block(self, blocked, mds_rank_1, mds_rank_2):
        """
        Block (using iptables) communications from a provided MDS to other MDSs.
        Block all ports that an MDS uses for communication.

        :param blocked: True to block the MDS, False otherwise
        :param mds_rank_1: MDS rank
        :param mds_rank_2: MDS rank
        :return:
        """
        da_flag = "-A" if blocked else "-D"

        def set_block(mds_ids):
            status = self.status()

            mds = mds_ids[0]
            remote = self.mon_manager.find_remote('mds', mds)
            addrs = status.get_mds_addrs(mds)
            for addr in addrs:
                ip_str, port_str = re.match("(.+):(.+)", addr).groups()
                remote.run(
                    args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                          "comment", "--comment", "teuthology"], omit_sudo=False)

            mds = mds_ids[1]
            remote = self.mon_manager.find_remote('mds', mds)
            addrs = status.get_mds_addrs(mds)
            for addr in addrs:
                ip_str, port_str = re.match("(.+):(.+)", addr).groups()
                remote.run(
                    args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
                          "comment", "--comment", "teuthology"], omit_sudo=False)
                remote.run(
                    args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                          "comment", "--comment", "teuthology"], omit_sudo=False)

        self._one_or_all((mds_rank_1, mds_rank_2), set_block, in_parallel=False)

    def clear_firewall(self):
        clear_firewall(self._ctx)

    def get_mds_info(self, mds_id):
        return FSStatus(self.mon_manager).get_mds(mds_id)

    def is_pool_full(self, pool_name):
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        for pool in pools:
            if pool['pool_name'] == pool_name:
                return 'full' in pool['flags_names'].split(",")

        raise RuntimeError("Pool not found '{0}'".format(pool_name))

    def delete_all_filesystems(self):
        """
        Remove all filesystems that exist, and any pools in use by them.
        """
        for fs in self.status().get_filesystems():
            Filesystem(ctx=self._ctx, fscid=fs['id']).destroy()

    @property
    def beacon_timeout(self):
        """
        Generate an acceptable timeout for the mons to drive some MDSMap change
        because of missed beacons from some MDS. This involves looking up the
        grace period in use by the mons and adding an acceptable buffer.
        """

        grace = float(self.get_config("mds_beacon_grace", service_type="mon"))
        return grace*2+15

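    # Worked example (assuming the default mds_beacon_grace of 15 seconds):
    # beacon_timeout = 15 * 2 + 15 = 45, i.e. two full grace periods plus a
    # fixed buffer for mon processing and test-node latency.
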

class Filesystem(MDSCluster):
    """
    This object is for driving a CephFS filesystem. The MDS daemons driven by
    MDSCluster may be shared with other Filesystems.
    """

    @classmethod
    def get_all_fs(cls, ctx):
        """
        Generator for all Filesystems in the cluster.
        """
        mdsc = MDSCluster(ctx)
        status = mdsc.status()
        for fs in status.get_filesystems():
            yield cls(ctx, fscid=fs['id'])

    def __init__(self, ctx, fs_config={}, fscid=None, name=None, create=False):
        super(Filesystem, self).__init__(ctx)

        self.name = name
        self.id = None
        self.metadata_pool_name = None
        self.data_pool_name = None
        self.data_pools = None
        self.fs_config = fs_config
        self.ec_profile = fs_config.get('ec_profile')

        client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
        self.client_id = client_list[0]
        self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]

        if name is not None:
            if fscid is not None:
                raise RuntimeError("cannot specify fscid when creating fs")
            if create and not self.legacy_configured():
                self.create()
        else:
            if fscid is not None:
                self.id = fscid
            self.getinfo(refresh = True)

        # Stash a reference to the first created filesystem on ctx, so
        # that if someone drops to the interactive shell they can easily
        # poke our methods.
        if not hasattr(self._ctx, "filesystem"):
            self._ctx.filesystem = self

    def dead(self):
        try:
            return not bool(self.get_mds_map())
        except FSMissing:
            return True

    def get_task_status(self, status_key):
        return self.mon_manager.get_service_task_status("mds", status_key)

    def getinfo(self, refresh = False):
        status = self.status()
        if self.id is not None:
            fsmap = status.get_fsmap(self.id)
        elif self.name is not None:
            fsmap = status.get_fsmap_byname(self.name)
        else:
            fss = [fs for fs in status.get_filesystems()]
            if len(fss) == 1:
                fsmap = fss[0]
            elif len(fss) == 0:
                raise RuntimeError("no file system available")
            else:
                raise RuntimeError("more than one file system available")
        self.id = fsmap['id']
        self.name = fsmap['mdsmap']['fs_name']
        self.get_pool_names(status = status, refresh = refresh)
        return status

    def reach_max_mds(self):
        status = self.wait_for_daemons()
        mds_map = self.get_mds_map(status=status)
        assert(mds_map['in'] == list(range(0, mds_map['max_mds'])))

    def reset(self):
        self.mon_manager.raw_cluster_cmd("fs", "reset", str(self.name), '--yes-i-really-mean-it')

    def fail(self):
        self.mon_manager.raw_cluster_cmd("fs", "fail", str(self.name))

    def set_flag(self, var, *args):
        a = map(lambda x: str(x).lower(), args)
        self.mon_manager.raw_cluster_cmd("fs", "flag", "set", var, *a)

    def set_allow_multifs(self, yes=True):
        self.set_flag("enable_multiple", yes)

    def set_var(self, var, *args):
        a = map(lambda x: str(x).lower(), args)
        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, var, *a)

    def set_down(self, down=True):
        self.set_var("down", str(down).lower())

    def set_joinable(self, joinable=True):
        self.set_var("joinable", joinable)

    def set_max_mds(self, max_mds):
        self.set_var("max_mds", "%d" % max_mds)

    def set_session_timeout(self, timeout):
        self.set_var("session_timeout", "%d" % timeout)

    def set_allow_standby_replay(self, yes):
        self.set_var("allow_standby_replay", yes)

    def set_allow_new_snaps(self, yes):
        self.set_var("allow_new_snaps", yes, '--yes-i-really-mean-it')

    def set_bal_rank_mask(self, bal_rank_mask):
        self.set_var("bal_rank_mask", bal_rank_mask)

    def set_refuse_client_session(self, yes):
        self.set_var("refuse_client_session", yes)

    def compat(self, *args):
        a = map(lambda x: str(x).lower(), args)
        self.mon_manager.raw_cluster_cmd("fs", "compat", self.name, *a)

    def add_compat(self, *args):
        self.compat("add_compat", *args)

    def add_incompat(self, *args):
        self.compat("add_incompat", *args)

    def rm_compat(self, *args):
        self.compat("rm_compat", *args)

    def rm_incompat(self, *args):
        self.compat("rm_incompat", *args)

    def required_client_features(self, *args, **kwargs):
        c = ["fs", "required_client_features", self.name, *args]
        return self.mon_manager.run_cluster_cmd(args=c, **kwargs)

    # Since v15.1.0 the pg autoscale mode has been enabled by default, which
    # lets the pg autoscale mode calculate the pg_num as needed. We set
    # pg_num_min to 64 to make sure the pg autoscale mode won't set the
    # pg_num too low, to fix Tracker#45434.
    pg_num = 64
    pg_num_min = 64
    target_size_ratio = 0.9
    target_size_ratio_ec = 0.9

    def create(self, recover=False, metadata_overlay=False):
        if self.name is None:
            self.name = "cephfs"
        if self.metadata_pool_name is None:
            self.metadata_pool_name = "{0}_metadata".format(self.name)
        if self.data_pool_name is None:
            data_pool_name = "{0}_data".format(self.name)
        else:
            data_pool_name = self.data_pool_name

        # With an EC profile, the ec pool will store the data, while a small
        # amount of metadata still goes to the primary data pool for all files.
        if not metadata_overlay and self.ec_profile and 'disabled' not in self.ec_profile:
            self.target_size_ratio = 0.05

        log.debug("Creating filesystem '{0}'".format(self.name))

        try:
            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                             self.metadata_pool_name,
                                             '--pg_num_min', str(self.pg_num_min))

            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                             data_pool_name, str(self.pg_num),
                                             '--pg_num_min', str(self.pg_num_min),
                                             '--target_size_ratio',
                                             str(self.target_size_ratio))
        except CommandFailedError as e:
            if e.exitstatus == 22: # nautilus couldn't specify --pg_num_min option
                self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                                 self.metadata_pool_name,
                                                 str(self.pg_num_min))

                self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                                 data_pool_name, str(self.pg_num),
                                                 str(self.pg_num_min))
            else:
                raise

        args = ["fs", "new", self.name, self.metadata_pool_name, data_pool_name]
        if recover:
            args.append('--recover')
        if metadata_overlay:
            args.append('--allow-dangerous-metadata-overlay')
        self.mon_manager.raw_cluster_cmd(*args)

        if not recover:
            if self.ec_profile and 'disabled' not in self.ec_profile:
                ec_data_pool_name = data_pool_name + "_ec"
                log.debug("EC profile is %s", self.ec_profile)
                cmd = ['osd', 'erasure-code-profile', 'set', ec_data_pool_name]
                cmd.extend(self.ec_profile)
                self.mon_manager.raw_cluster_cmd(*cmd)
                try:
                    self.mon_manager.raw_cluster_cmd(
                        'osd', 'pool', 'create', ec_data_pool_name,
                        'erasure', ec_data_pool_name,
                        '--pg_num_min', str(self.pg_num_min),
                        '--target_size_ratio', str(self.target_size_ratio_ec))
                except CommandFailedError as e:
                    if e.exitstatus == 22: # nautilus couldn't specify --pg_num_min option
                        self.mon_manager.raw_cluster_cmd(
                            'osd', 'pool', 'create', ec_data_pool_name,
                            str(self.pg_num_min), 'erasure', ec_data_pool_name)
                    else:
                        raise
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'set',
                    ec_data_pool_name, 'allow_ec_overwrites', 'true')
                self.add_data_pool(ec_data_pool_name, create=False)
                self.check_pool_application(ec_data_pool_name)

                self.run_client_payload(f"setfattr -n ceph.dir.layout.pool -v {ec_data_pool_name} . && getfattr -n ceph.dir.layout .")

        self.check_pool_application(self.metadata_pool_name)
        self.check_pool_application(data_pool_name)

        # Turn off spurious standby count warnings from modifying max_mds in tests.
        try:
            self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
        except CommandFailedError as e:
            if e.exitstatus == 22:
                # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
                pass
            else:
                raise

        if self.fs_config is not None:
            log.debug(f"fs_config: {self.fs_config}")
            max_mds = self.fs_config.get('max_mds', 1)
            if max_mds > 1:
                self.set_max_mds(max_mds)

            standby_replay = self.fs_config.get('standby_replay', False)
            self.set_allow_standby_replay(standby_replay)

            # If absent will use the default value (60 seconds)
            session_timeout = self.fs_config.get('session_timeout', 60)
            if session_timeout != 60:
                self.set_session_timeout(session_timeout)

            if self.fs_config.get('subvols', None) is not None:
                log.debug(f"Creating {self.fs_config.get('subvols')} subvols "
                          f"for filesystem '{self.name}'")
                if not hasattr(self._ctx, "created_subvols"):
                    self._ctx.created_subvols = dict()

                subvols = self.fs_config.get('subvols')
                assert(isinstance(subvols, dict))
                assert(isinstance(subvols['create'], int))
                assert(subvols['create'] > 0)

                for sv in range(0, subvols['create']):
                    sv_name = f'sv_{sv}'
                    self.mon_manager.raw_cluster_cmd(
                        'fs', 'subvolume', 'create', self.name, sv_name,
                        self.fs_config.get('subvol_options', ''))

                    if self.name not in self._ctx.created_subvols:
                        self._ctx.created_subvols[self.name] = []

                    subvol_path = self.mon_manager.raw_cluster_cmd(
                        'fs', 'subvolume', 'getpath', self.name, sv_name)
                    subvol_path = subvol_path.strip()
                    self._ctx.created_subvols[self.name].append(subvol_path)
            else:
                log.debug(f"Not creating any subvols for filesystem '{self.name}'")

        self.getinfo(refresh = True)

        # wait for pgs to be clean
        self.mon_manager.wait_for_clean()

    def run_client_payload(self, cmd):
        # avoid circular dep by importing here:
        from tasks.cephfs.fuse_mount import FuseMount

        # Wait for the MDS daemons to be ready before mounting the
        # ceph-fuse client in run_client_payload()
        self.wait_for_daemons()

        d = misc.get_testdir(self._ctx)
        m = FuseMount(self._ctx, d, "admin", self.client_remote, cephfs_name=self.name)
        m.mount_wait()
        m.run_shell_payload(cmd)
        m.umount_wait(require_clean=True)

    def _remove_pool(self, name, **kwargs):
        c = f'osd pool rm {name} {name} --yes-i-really-really-mean-it'
        return self.mon_manager.ceph(c, **kwargs)

    def rm(self, **kwargs):
        c = f'fs rm {self.name} --yes-i-really-mean-it'
        return self.mon_manager.ceph(c, **kwargs)

    def remove_pools(self, data_pools):
        self._remove_pool(self.get_metadata_pool_name())
        for poolname in data_pools:
            try:
                self._remove_pool(poolname)
            except CommandFailedError as e:
                # EBUSY, this data pool is used by two metadata pools, let the
                # 2nd pass delete it
                if e.exitstatus == EBUSY:
                    pass
                else:
                    raise

    def destroy(self, reset_obj_attrs=True):
        log.info(f'Destroying file system {self.name} and related pools')

        if self.dead():
            log.debug('already dead...')
            return

        data_pools = self.get_data_pool_names(refresh=True)

        # make sure no MDSs are attached to given FS.
        self.fail()
        self.rm()

        self.remove_pools(data_pools)

        if reset_obj_attrs:
            self.id = None
            self.name = None
            self.metadata_pool_name = None
            self.data_pool_name = None
            self.data_pools = None

    def recreate(self):
        self.destroy()

        self.create()
        self.getinfo(refresh=True)

    def check_pool_application(self, pool_name):
        osd_map = self.mon_manager.get_osd_dump_json()
        for pool in osd_map['pools']:
            if pool['pool_name'] == pool_name:
                if "application_metadata" in pool:
                    if "cephfs" not in pool['application_metadata']:
                        raise RuntimeError("Pool {pool_name} does not name cephfs as application!".format(pool_name=pool_name))

    def __del__(self):
        if getattr(self._ctx, "filesystem", None) == self:
            delattr(self._ctx, "filesystem")

    def exists(self):
        """
        Whether a filesystem exists in the mon's filesystem list
        """
        fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
        return self.name in [fs['name'] for fs in fs_list]

    def legacy_configured(self):
        """
        Check if a legacy (i.e. pre "fs new") filesystem configuration is present. If this is
        the case, the caller should avoid using Filesystem.create
        """
        try:
            out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
            pools = json.loads(out_text)
            metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
            if metadata_pool_exists:
                self.metadata_pool_name = 'metadata'
        except CommandFailedError as e:
            # For use in upgrade tests, Ceph cuttlefish and earlier don't support
            # structured output (--format) from the CLI.
            if e.exitstatus == 22:
                metadata_pool_exists = True
            else:
                raise

        return metadata_pool_exists

    def _df(self):
        return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))

    # may raise FSMissing
    def get_mds_map(self, status=None):
        if status is None:
            status = self.status()
        return status.get_fsmap(self.id)['mdsmap']

    def get_var(self, var, status=None):
        return self.get_mds_map(status=status)[var]

    def set_dir_layout(self, mount, path, layout):
        for name, value in layout.items():
            mount.run_shell(args=["setfattr", "-n", "ceph.dir.layout."+name, "-v", str(value), path])

    def add_data_pool(self, name, create=True):
        if create:
            try:
                self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name,
                                                 '--pg_num_min', str(self.pg_num_min))
            except CommandFailedError as e:
                if e.exitstatus == 22: # nautilus couldn't specify --pg_num_min option
                    self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name,
                                                     str(self.pg_num_min))
                else:
                    raise
        self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
        self.get_pool_names(refresh = True)
        for poolid, fs_name in self.data_pools.items():
            if name == fs_name:
                return poolid
        raise RuntimeError("could not get just created pool '{0}'".format(name))

    def get_pool_names(self, refresh = False, status = None):
        if refresh or self.metadata_pool_name is None or self.data_pools is None:
            if status is None:
                status = self.status()
            fsmap = status.get_fsmap(self.id)

            osd_map = self.mon_manager.get_osd_dump_json()
            id_to_name = {}
            for p in osd_map['pools']:
                id_to_name[p['pool']] = p['pool_name']

            self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
            self.data_pools = {}
            for data_pool in fsmap['mdsmap']['data_pools']:
                self.data_pools[data_pool] = id_to_name[data_pool]

    def get_data_pool_name(self, refresh = False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return next(iter(self.data_pools.values()))

    def get_data_pool_id(self, refresh = False):
        """
        Don't call this if you have multiple data pools
        :return: integer
        """
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return next(iter(self.data_pools.keys()))

    def get_data_pool_names(self, refresh = False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        return list(self.data_pools.values())

    def get_metadata_pool_name(self):
        return self.metadata_pool_name

    def set_data_pool_name(self, name):
        if self.id is not None:
            raise RuntimeError("can't set data pool name if the filesystem's fscid is set")
        self.data_pool_name = name

    def get_pool_pg_num(self, pool_name):
        pgs = json.loads(self.mon_manager.raw_cluster_cmd('osd', 'pool', 'get',
                                                          pool_name, 'pg_num',
                                                          '--format=json-pretty'))
        return int(pgs['pg_num'])

    def get_namespace_id(self):
        return self.id

    def get_pool_df(self, pool_name):
        """
        Return a dict like:
        {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
        """
        for pool_df in self._df()['pools']:
            if pool_df['name'] == pool_name:
                return pool_df['stats']

        raise RuntimeError("Pool name '{0}' not found".format(pool_name))

    def get_usage(self):
        return self._df()['stats']['total_used_bytes']

    def are_daemons_healthy(self, status=None, skip_max_mds_check=False):
        """
        Return true if all daemons are in one of active, standby, standby-replay, and
        at least max_mds daemons are in 'active'.

        Unlike most of Filesystem, this function is tolerant of new-style `fs`
        commands being missing, because we are part of the ceph installation
        process during upgrade suites, so must fall back to old style commands
        when we get an EINVAL on a new style command.

        :return:
        """
        # First, check to see that processes haven't exited with an error code
        for mds in self._ctx.daemons.iter_daemons_of_role('mds'):
            mds.check_status()

        active_count = 0
        mds_map = self.get_mds_map(status=status)

        log.debug("are_daemons_healthy: mds map: {0}".format(mds_map))

        for mds_id, mds_status in mds_map['info'].items():
            if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
                log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
                return False
            elif mds_status['state'] == 'up:active':
                active_count += 1

        log.debug("are_daemons_healthy: {0}/{1}".format(
            active_count, mds_map['max_mds']
        ))

        if not skip_max_mds_check:
            if active_count > mds_map['max_mds']:
                log.debug("are_daemons_healthy: number of actives is greater than max_mds: {0}".format(mds_map))
                return False
            elif active_count == mds_map['max_mds']:
                # The MDSMap says these guys are active, but let's check they really are
                for mds_id, mds_status in mds_map['info'].items():
                    if mds_status['state'] == 'up:active':
                        try:
                            daemon_status = self.mds_tell(["status"], mds_id=mds_status['name'])
                        except CommandFailedError as cfe:
                            if cfe.exitstatus == errno.EINVAL:
                                # Old version, can't do this check
                                continue
                            else:
                                # MDS not even running
                                return False

                        if daemon_status['state'] != 'up:active':
                            # MDS hasn't taken the latest map yet
                            return False

                return True
            else:
                return False
        else:
            log.debug("are_daemons_healthy: skipping max_mds check")
            return True

    def get_daemon_names(self, state=None, status=None):
        """
        Return MDS daemon names of those daemons in the given state
        :param state:
        :return:
        """
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['state'] == state or state is None:
                result.append(mds_status['name'])

        return result

    def get_active_names(self, status=None):
        """
        Return MDS daemon names of those daemons holding ranks
        in state up:active

        :return: list of strings like ['a', 'b'], sorted by rank
        """
        return self.get_daemon_names("up:active", status=status)

    def get_all_mds_rank(self, status=None):
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['rank'])

        return result

    def get_rank(self, rank=None, status=None):
        if status is None:
            status = self.getinfo()
        if rank is None:
            rank = 0
        return status.get_rank(self.id, rank)

    def rank_restart(self, rank=0, status=None):
        name = self.get_rank(rank=rank, status=status)['name']
        self.mds_restart(mds_id=name)

    def rank_signal(self, signal, rank=0, status=None):
        name = self.get_rank(rank=rank, status=status)['name']
        self.mds_signal(name, signal)

    def rank_freeze(self, yes, rank=0):
        self.mon_manager.raw_cluster_cmd("mds", "freeze", "{}:{}".format(self.id, rank), str(yes).lower())

    def rank_repaired(self, rank):
        self.mon_manager.raw_cluster_cmd("mds", "repaired", "{}:{}".format(self.id, rank))

    def rank_fail(self, rank=0):
        self.mon_manager.raw_cluster_cmd("mds", "fail", "{}:{}".format(self.id, rank))

    def rank_is_running(self, rank=0, status=None):
        name = self.get_rank(rank=rank, status=status)['name']
        return self.mds_is_running(name)

    def get_ranks(self, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_ranks(self.id)

    def get_damaged(self, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_damaged(self.id)

    def get_replays(self, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_replays(self.id)

    def get_replay(self, rank=0, status=None):
        for replay in self.get_replays(status=status):
            if replay['rank'] == rank:
                return replay
        return None

    def get_rank_names(self, status=None):
        """
        Return MDS daemon names of those daemons holding a rank,
        sorted by rank. This includes e.g. up:replay/reconnect
        as well as active, but does not include standby or
        standby-replay.
        """
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['name'])

        return result

    def wait_for_daemons(self, timeout=None, skip_max_mds_check=False, status=None):
        """
        Wait until all daemons are healthy
        :return:
        """

        if timeout is None:
            timeout = DAEMON_WAIT_TIMEOUT

        if status is None:
            status = self.status()

        elapsed = 0
        while True:
            if self.are_daemons_healthy(status=status, skip_max_mds_check=skip_max_mds_check):
                return status
            else:
                time.sleep(1)
                elapsed += 1

            if elapsed > timeout:
                log.debug("status = {0}".format(status))
                raise RuntimeError("Timed out waiting for MDS daemons to become healthy")

            status = self.status()

    def dencoder(self, obj_type, obj_blob):
        args = [os.path.join(self._prefix, "ceph-dencoder"), 'type', obj_type, 'import', '-', 'decode', 'dump_json']
        p = self.mon_manager.controller.run(args=args, stdin=BytesIO(obj_blob), stdout=BytesIO())
        return p.stdout.getvalue()

    def rados(self, *args, **kwargs):
        """
        Callout to rados CLI.
        """
        return self.mon_manager.do_rados(*args, **kwargs)

    def radosm(self, *args, **kwargs):
        """
        Interact with the metadata pool via rados CLI.
        """
        return self.rados(*args, **kwargs, pool=self.get_metadata_pool_name())

    def radosmo(self, *args, stdout=BytesIO(), **kwargs):
        """
        Interact with the metadata pool via rados CLI. Get the stdout.
        """
        return self.radosm(*args, **kwargs, stdout=stdout).stdout.getvalue()

    def get_metadata_object(self, object_type, object_id):
        """
        Retrieve an object from the metadata pool, pass it through
        ceph-dencoder to dump it to JSON, and return the decoded object.
        """
        o = self.radosmo(['get', object_id, '-'])
        j = self.dencoder(object_type, o)
        try:
            return json.loads(j)
        except (TypeError, ValueError):
            log.error("Failed to decode JSON: '{0}'".format(j))
            raise

    def get_journal_version(self):
        """
        Read the JournalPointer and Journal::Header objects to learn the version of
        encoding in use.
        """
        journal_pointer_object = '400.00000000'
        journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
        journal_ino = journal_pointer_dump['journal_pointer']['front']

        journal_header_object = "{0:x}.00000000".format(journal_ino)
        journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)

        version = journal_header_dump['journal_header']['stream_format']
        log.debug("Read journal version {0}".format(version))

        return version

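    # Illustrative sketch: metadata objects are named "<ino-hex>.<seq-hex>", so
    # for rank 0 the journal pointer is object "400.00000000" and, with the
    # conventional rank 0 journal ino 0x200 (see erase_mds_objects below),
    # the header read above resolves to:
    #
    #   assert "{0:x}.00000000".format(0x200) == "200.00000000"
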
    def mds_asok(self, command, mds_id=None, timeout=None):
        if mds_id is None:
            return self.rank_asok(command, timeout=timeout)

        return self.json_asok(command, 'mds', mds_id, timeout=timeout)

    def mds_tell(self, command, mds_id=None):
        if mds_id is None:
            return self.rank_tell(command)

        return json.loads(self.mon_manager.raw_cluster_cmd("tell", f"mds.{mds_id}", *command))

    def rank_asok(self, command, rank=0, status=None, timeout=None):
        info = self.get_rank(rank=rank, status=status)
        return self.json_asok(command, 'mds', info['name'], timeout=timeout)

    def rank_tell(self, command, rank=0, status=None):
        try:
            out = self.mon_manager.raw_cluster_cmd("tell", f"mds.{self.id}:{rank}", *command)
            return json.loads(out)
        except json.decoder.JSONDecodeError:
            log.error("could not decode: {}".format(out))
            raise

    def ranks_tell(self, command, status=None):
        if status is None:
            status = self.status()
        out = []
        for r in status.get_ranks(self.id):
            result = self.rank_tell(command, rank=r['rank'], status=status)
            out.append((r['rank'], result))
        return sorted(out)

    def ranks_perf(self, f, status=None):
        perf = self.ranks_tell(["perf", "dump"], status=status)
        out = []
        for rank, perf in perf:
            out.append((rank, f(perf)))
        return out

    def read_cache(self, path, depth=None, rank=None):
        cmd = ["dump", "tree", path]
        if depth is not None:
            cmd.append(depth.__str__())
        result = self.rank_asok(cmd, rank=rank)
        if result is None or len(result) == 0:
            raise RuntimeError("Path not found in cache: {0}".format(path))

        return result

    def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
        """
        Block until the MDS reaches a particular state, or a failure condition
        is met.

        When there are multiple MDSs, succeed when exactly one MDS is in the
        goal state, or fail when any MDS is in the reject state.

        :param goal_state: Return once the MDS is in this state
        :param reject: Fail if the MDS enters this state before the goal state
        :param timeout: Fail if this many seconds pass before reaching goal
        :return: number of seconds waited, rounded down to integer
        """

        started_at = time.time()
        while True:
            status = self.status()
            if rank is not None:
                try:
                    mds_info = status.get_rank(self.id, rank)
                    current_state = mds_info['state'] if mds_info else None
                    log.debug("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
                except:
                    mdsmap = self.get_mds_map(status=status)
                    if rank in mdsmap['failed']:
                        log.debug("Waiting for rank {0} to come back.".format(rank))
                        current_state = None
                    else:
                        raise
            elif mds_id is not None:
                # mds_info is None if no daemon with this ID exists in the map
                mds_info = status.get_mds(mds_id)
                current_state = mds_info['state'] if mds_info else None
                log.debug("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
            else:
                # In general, look for a single MDS
                states = [m['state'] for m in status.get_ranks(self.id)]
                if [s for s in states if s == goal_state] == [goal_state]:
                    current_state = goal_state
                elif reject in states:
                    current_state = reject
                else:
                    current_state = None
                log.debug("mapped states {0} to {1}".format(states, current_state))

            elapsed = time.time() - started_at
            if current_state == goal_state:
                log.debug("reached state '{0}' in {1}s".format(current_state, elapsed))
                return elapsed
            elif reject is not None and current_state == reject:
                raise RuntimeError("MDS in reject state {0}".format(current_state))
            elif timeout is not None and elapsed > timeout:
                log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
                raise RuntimeError(
                    "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
                        elapsed, goal_state, current_state
                    ))
            else:
                time.sleep(1)

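    # Illustrative sketch (hypothetical test snippet): block until rank 0's
    # daemon reports up:active, failing if two minutes pass first:
    #
    #   fs.wait_for_state("up:active", timeout=120, rank=0)
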
    def _read_data_xattr(self, ino_no, xattr_name, obj_type, pool):
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)

        args = ["getxattr", obj_name, xattr_name]
        try:
            proc = self.rados(args, pool=pool, stdout=BytesIO())
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(obj_name)

        obj_blob = proc.stdout.getvalue()
        return json.loads(self.dencoder(obj_type, obj_blob).strip())

    def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
        """
        Write to an xattr of the 0th data object of an inode. Will
        succeed whether the object and/or xattr already exist or not.

        :param ino_no: integer inode number
        :param xattr_name: string name of the xattr
        :param data: byte array data to write to the xattr
        :param pool: name of data pool or None to use primary data pool
        :return: None
        """
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)
        args = ["setxattr", obj_name, xattr_name, data]
        self.rados(args, pool=pool)

    def read_symlink(self, ino_no, pool=None):
        return self._read_data_xattr(ino_no, "symlink", "string_wrapper", pool)

    def read_backtrace(self, ino_no, pool=None):
        """
        Read the backtrace from the data pool, return a dict in the format
        given by inode_backtrace_t::dump, which is something like:

        ::

            rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
            ceph-dencoder type inode_backtrace_t import out.bin decode dump_json

            { "ino": 1099511627778,
              "ancestors": [
                    { "dirino": 1,
                      "dname": "blah",
                      "version": 11}],
              "pool": 1,
              "old_pools": []}

        :param pool: name of pool to read backtrace from. If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)

    def read_layout(self, ino_no, pool=None):
        """
        Read 'layout' xattr of an inode and parse the result, returning a dict like:
        ::
            {
                "stripe_unit": 4194304,
                "stripe_count": 1,
                "object_size": 4194304,
                "pool_id": 1,
                "pool_ns": "",
            }

        :param pool: name of pool to read backtrace from. If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)

    def _enumerate_data_objects(self, ino, size):
        """
        Get the list of expected data objects for a range, and the list of objects
        that really exist.

        :return a tuple of two lists of strings (expected, actual)
        """
        stripe_size = 1024 * 1024 * 4

        size = max(stripe_size, size)

        want_objects = [
            "{0:x}.{1:08x}".format(ino, n)
            for n in range(0, ((size - 1) // stripe_size) + 1)
        ]

        exist_objects = self.rados(["ls"], pool=self.get_data_pool_name(), stdout=StringIO()).stdout.getvalue().split("\n")

        return want_objects, exist_objects

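    # Worked example (using the 4 MiB stripe_size above): a 10 MiB file with
    # inode 0x10000000002 spans ((10485760 - 1) // 4194304) + 1 == 3 objects,
    # named "10000000002.00000000" through "10000000002.00000002".
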
    def data_objects_present(self, ino, size):
        """
        Check that *all* the expected data objects for an inode are present in the data pool
        """
        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        missing = set(want_objects) - set(exist_objects)

        if missing:
            log.debug("Objects missing (ino {0}, size {1}): {2}".format(
                ino, size, missing
            ))
            return False
        else:
            log.debug("All objects for ino {0} size {1} found".format(ino, size))
            return True

    def data_objects_absent(self, ino, size):
        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        present = set(want_objects) & set(exist_objects)

        if present:
            log.debug("Objects not absent (ino {0}, size {1}): {2}".format(
                ino, size, present
            ))
            return False
        else:
            log.debug("All objects for ino {0} size {1} are absent".format(ino, size))
            return True

    def dirfrag_exists(self, ino, frag):
        try:
            self.radosm(["stat", "{0:x}.{1:08x}".format(ino, frag)])
        except CommandFailedError:
            return False
        else:
            return True

    def list_dirfrag(self, dir_ino):
        """
        Read the named object and return the list of omap keys

        :return a list of 0 or more strings
        """

        dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)

        try:
            key_list_str = self.radosmo(["listomapkeys", dirfrag_obj_name], stdout=StringIO())
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(dirfrag_obj_name)

        return key_list_str.strip().split("\n") if key_list_str else []

    def get_meta_of_fs_file(self, dir_ino, obj_name, out):
        """
        Get metadata from the parent directory object, to verify the correctness
        of the data format encoded by the tool cephfs-meta-injection.
        Warning: directory fragmentation (splitting) is not considered here.
        """

        dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
        try:
            self.radosm(["getomapval", dirfrag_obj_name, obj_name+"_head", out])
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(dir_ino)

    def erase_metadata_objects(self, prefix):
        """
        For all objects in the metadata pool matching the prefix,
        erase them.

        This is O(N) with the number of objects in the pool, so only suitable
        for use on toy test filesystems.
        """
        all_objects = self.radosmo(["ls"], stdout=StringIO()).strip().split("\n")
        matching_objects = [o for o in all_objects if o.startswith(prefix)]
        for o in matching_objects:
            self.radosm(["rm", o])

    def erase_mds_objects(self, rank):
        """
        Erase all the per-MDS objects for a particular rank. This includes
        inotable, sessiontable, journal
        """

        def obj_prefix(multiplier):
            """
            MDS object naming conventions like rank 1's
            journal is at 201.***
            """
            return "%x." % (multiplier * 0x100 + rank)

        # MDS_INO_LOG_OFFSET
        self.erase_metadata_objects(obj_prefix(2))
        # MDS_INO_LOG_BACKUP_OFFSET
        self.erase_metadata_objects(obj_prefix(3))
        # MDS_INO_LOG_POINTER_OFFSET
        self.erase_metadata_objects(obj_prefix(4))
        # MDSTables & SessionMap
        self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))

    @property
    def _prefix(self):
        """
        Override this to set a different path prefix for the tool binaries.
        """
        return ""

    def _make_rank(self, rank):
        return "{}:{}".format(self.name, rank)

    def _run_tool(self, tool, args, rank=None, quiet=False):
        # Tests frequently have [client] configuration that jacks up
        # the objecter log level (unlikely to be interesting here)
        # and does not set the mds log level (very interesting here)
        if quiet:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
        else:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=20', '--debug-ms=1', '--debug-objecter=1']

        if rank is not None:
            base_args.extend(["--rank", "%s" % str(rank)])

        t1 = datetime.datetime.now()
        r = self.tool_remote.sh(script=base_args + args, stdout=StringIO()).strip()
        duration = datetime.datetime.now() - t1
        log.debug("Ran {0} in time {1}, result:\n{2}".format(
            base_args + args, duration, r
        ))
        return r

    @property
    def tool_remote(self):
        """
        An arbitrary remote to use when invoking recovery tools. Use an MDS host because
        it'll definitely have keys with perms to access cephfs metadata pool. This is public
        so that tests can use this remote to go get locally written output files from the tools.
        """
        return self.mon_manager.controller

    def journal_tool(self, args, rank, quiet=False):
        """
        Invoke cephfs-journal-tool with the passed arguments for a rank, and return its stdout
        """
        fs_rank = self._make_rank(rank)
        return self._run_tool("cephfs-journal-tool", args, fs_rank, quiet)

    def meta_tool(self, args, rank, quiet=False):
        """
        Invoke cephfs-meta-injection with the passed arguments for a rank, and return its stdout
        """
        fs_rank = self._make_rank(rank)
        return self._run_tool("cephfs-meta-injection", args, fs_rank, quiet)

    def table_tool(self, args, quiet=False):
        """
        Invoke cephfs-table-tool with the passed arguments, and return its stdout
        """
        return self._run_tool("cephfs-table-tool", args, None, quiet)

    def data_scan(self, args, quiet=False, worker_count=1):
        """
        Invoke cephfs-data-scan with the passed arguments, and return its stdout

        :param worker_count: if greater than 1, multiple workers will be run
                             in parallel and the return value will be None
        """

        workers = []

        for n in range(0, worker_count):
            if worker_count > 1:
                # data-scan args first token is a command, followed by args to it.
                # insert worker arguments after the command.
                cmd = args[0]
                worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
            else:
                worker_args = args

            workers.append(Greenlet.spawn(lambda wargs=worker_args:
                                          self._run_tool("cephfs-data-scan", wargs, None, quiet)))

        for w in workers:
            w.get()

        if worker_count == 1:
            return workers[0].value
        else:
            return None

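    # Illustrative sketch (hypothetical invocation): with worker_count > 1 the
    # scan is sharded via --worker_n/--worker_m, so a four-way parallel
    # scan_extents pass over the data pool would look like:
    #
    #   fs.data_scan(["scan_extents", fs.get_data_pool_name()], worker_count=4)
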
    def is_full(self):
        return self.is_pool_full(self.get_data_pool_name())

    def authorize(self, client_id, caps=('/', 'rw')):
        """
        Run "ceph fs authorize" and run "ceph auth get" to get and return the
        keyring.

        client_id: client id that will be authorized
        caps: tuple containing the path and permission (can be r or rw)
              respectively.
        """
        if isinstance(caps[0], (tuple, list)):
            x = []
            for c in caps:
                x.extend(c)
            caps = tuple(x)

        client_name = 'client.' + client_id
        return self.mon_manager.raw_cluster_cmd('fs', 'authorize', self.name,
                                                client_name, *caps)

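    # Illustrative sketch (hypothetical caps): nested caps are flattened into a
    # single tuple before being passed through to the CLI, so
    #
    #   fs.authorize('foo', (('/', 'rw'), ('/dir1', 'r')))
    #
    # runs: ceph fs authorize <fs_name> client.foo / rw /dir1 r
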
    def grow(self, new_max_mds, status=None):
        oldmax = self.get_var('max_mds', status=status)
        assert(new_max_mds > oldmax)
        self.set_max_mds(new_max_mds)
        return self.wait_for_daemons()

    def shrink(self, new_max_mds, status=None):
        oldmax = self.get_var('max_mds', status=status)
        assert(new_max_mds < oldmax)
        self.set_max_mds(new_max_mds)
        return self.wait_for_daemons()

    def run_scrub(self, cmd, rank=0):
        return self.rank_tell(["scrub"] + cmd, rank)

    def get_scrub_status(self, rank=0):
        return self.run_scrub(["status"], rank)

    def flush(self, rank=0):
        return self.rank_tell(["flush", "journal"], rank=rank)

    def wait_until_scrub_complete(self, result=None, tag=None, rank=0, sleep=30,
                                  timeout=300, reverse=False):
        # time out after "timeout" seconds and assume the scrub is done
        if result is None:
            result = "no active scrubs running"
        with contextutil.safe_while(sleep=sleep, tries=timeout//sleep) as proceed:
            while proceed():
                out_json = self.rank_tell(["scrub", "status"], rank=rank)
                assert out_json is not None
                if not reverse:
                    if result in out_json['status']:
                        log.info("all active scrubs completed")
                        return True
                else:
                    if result not in out_json['status']:
                        log.info("all active scrubs completed")
                        return True

                if tag is not None:
                    status = out_json['scrubs'][tag]
                    if status is not None:
                        log.info(f"scrub status for tag:{tag} - {status}")
                    else:
                        log.info(f"scrub has completed for tag:{tag}")
                        return True

        # timed out waiting for scrub to complete
        return False

    def get_damage(self, rank=None):
        if rank is None:
            result = {}
            for info in self.get_ranks():
                rank = info['rank']
                result[rank] = self.get_damage(rank=rank)
            return result
        else:
            return self.rank_tell(['damage', 'ls'], rank=rank)