import json
import logging
from gevent import Greenlet
import os
import time
import datetime
import re
import errno
import random

from io import BytesIO, StringIO
from errno import EBUSY

from teuthology.exceptions import CommandFailedError
from teuthology import misc
from teuthology.nuke import clear_firewall
from teuthology.parallel import parallel
from teuthology import contextutil
from tasks.ceph_manager import write_conf
from tasks import ceph_manager


log = logging.getLogger(__name__)


DAEMON_WAIT_TIMEOUT = 120
ROOT_INO = 1

class FileLayout(object):
    def __init__(self, pool=None, pool_namespace=None, stripe_unit=None, stripe_count=None, object_size=None):
        self.pool = pool
        self.pool_namespace = pool_namespace
        self.stripe_unit = stripe_unit
        self.stripe_count = stripe_count
        self.object_size = object_size

    @classmethod
    def load_from_ceph(cls, layout_str):
        # TODO
        pass

    def items(self):
        if self.pool is not None:
            yield ("pool", self.pool)
        if self.pool_namespace:
            yield ("pool_namespace", self.pool_namespace)
        if self.stripe_unit is not None:
            yield ("stripe_unit", self.stripe_unit)
        if self.stripe_count is not None:
            yield ("stripe_count", self.stripe_count)
        if self.object_size is not None:
            yield ("object_size", self.object_size)
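
    # Usage sketch (illustrative, not exercised by the tests here): a FileLayout
    # is typically applied to a directory via Filesystem.set_dir_layout(), which
    # iterates items() and issues one setfattr per attribute. `fs` and `mount`
    # are assumed to be a Filesystem and a mounted client from these qa tasks.
    #
    #   layout = FileLayout(pool="cephfs_data", stripe_count=2)
    #   fs.set_dir_layout(mount, "subdir", layout)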
7c673cae
FG
54
55class ObjectNotFound(Exception):
56 def __init__(self, object_name):
57 self._object_name = object_name
58
59 def __str__(self):
60 return "Object not found: '{0}'".format(self._object_name)
61
f67539c2
TL
62class FSMissing(Exception):
63 def __init__(self, ident):
64 self.ident = ident
65
66 def __str__(self):
67 return f"File system {self.ident} does not exist in the map"
68
7c673cae
FG
69class FSStatus(object):
70 """
71 Operations on a snapshot of the FSMap.
72 """
522d829b 73 def __init__(self, mon_manager, epoch=None):
7c673cae 74 self.mon = mon_manager
522d829b
TL
75 cmd = ["fs", "dump", "--format=json"]
76 if epoch is not None:
77 cmd.append(str(epoch))
78 self.map = json.loads(self.mon.raw_cluster_cmd(*cmd))
7c673cae
FG
79
80 def __str__(self):
81 return json.dumps(self.map, indent = 2, sort_keys = True)
82
83 # Expose the fsmap for manual inspection.
84 def __getitem__(self, key):
85 """
86 Get a field from the fsmap.
87 """
88 return self.map[key]
89
90 def get_filesystems(self):
91 """
92 Iterator for all filesystems.
93 """
94 for fs in self.map['filesystems']:
95 yield fs
96
97 def get_all(self):
98 """
99 Iterator for all the mds_info components in the FSMap.
100 """
9f95a23c 101 for info in self.map['standbys']:
7c673cae
FG
102 yield info
103 for fs in self.map['filesystems']:
104 for info in fs['mdsmap']['info'].values():
105 yield info
106
107 def get_standbys(self):
108 """
109 Iterator for all standbys.
110 """
111 for info in self.map['standbys']:
112 yield info
113
114 def get_fsmap(self, fscid):
115 """
116 Get the fsmap for the given FSCID.
117 """
118 for fs in self.map['filesystems']:
119 if fscid is None or fs['id'] == fscid:
120 return fs
f67539c2 121 raise FSMissing(fscid)
7c673cae
FG
122
123 def get_fsmap_byname(self, name):
124 """
125 Get the fsmap for the given file system name.
126 """
127 for fs in self.map['filesystems']:
128 if name is None or fs['mdsmap']['fs_name'] == name:
129 return fs
f67539c2 130 raise FSMissing(name)
7c673cae
FG
131
132 def get_replays(self, fscid):
133 """
134 Get the standby:replay MDS for the given FSCID.
135 """
136 fs = self.get_fsmap(fscid)
137 for info in fs['mdsmap']['info'].values():
138 if info['state'] == 'up:standby-replay':
139 yield info
140
141 def get_ranks(self, fscid):
142 """
143 Get the ranks for the given FSCID.
144 """
145 fs = self.get_fsmap(fscid)
146 for info in fs['mdsmap']['info'].values():
11fdf7f2 147 if info['rank'] >= 0 and info['state'] != 'up:standby-replay':
7c673cae
FG
148 yield info
149
a4b75251
TL
150 def get_damaged(self, fscid):
151 """
152 Get the damaged ranks for the given FSCID.
153 """
154 fs = self.get_fsmap(fscid)
155 return fs['mdsmap']['damaged']
156
7c673cae
FG
157 def get_rank(self, fscid, rank):
158 """
159 Get the rank for the given FSCID.
160 """
161 for info in self.get_ranks(fscid):
162 if info['rank'] == rank:
163 return info
164 raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))
165
166 def get_mds(self, name):
167 """
168 Get the info for the given MDS name.
169 """
170 for info in self.get_all():
171 if info['name'] == name:
172 return info
173 return None
174
175 def get_mds_addr(self, name):
176 """
177 Return the instance addr as a string, like "10.214.133.138:6807\/10825"
178 """
179 info = self.get_mds(name)
180 if info:
181 return info['addr']
182 else:
e306af50 183 log.warning(json.dumps(list(self.get_all()), indent=2)) # dump for debugging
7c673cae
FG
184 raise RuntimeError("MDS id '{0}' not found in map".format(name))
185
f67539c2
TL
    def get_mds_addrs(self, name):
        """
        Return the instance addresses as a list of strings, like ["10.214.133.138:6807", "10.214.133.138:6808"]
        """
        info = self.get_mds(name)
        if info:
            return [e['addr'] for e in info['addrs']['addrvec']]
        else:
            log.warning(json.dumps(list(self.get_all()), indent=2)) # dump for debugging
            raise RuntimeError("MDS id '{0}' not found in map".format(name))
196
9f95a23c
TL
197 def get_mds_gid(self, gid):
198 """
199 Get the info for the given MDS gid.
200 """
201 for info in self.get_all():
202 if info['gid'] == gid:
203 return info
204 return None
205
206 def hadfailover(self, status):
207 """
208 Compares two statuses for mds failovers.
209 Returns True if there is a failover.
210 """
211 for fs in status.map['filesystems']:
212 for info in fs['mdsmap']['info'].values():
213 oldinfo = self.get_mds_gid(info['gid'])
214 if oldinfo is None or oldinfo['incarnation'] != info['incarnation']:
215 return True
216 #all matching
217 return False
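
    # Failover-check sketch (illustrative; `fs` is assumed to be a Filesystem
    # instance from this module): snapshot the FSMap, provoke a failover, then
    # compare against a fresh snapshot.
    #
    #   old = fs.status()
    #   fs.rank_fail(rank=0)
    #   fs.wait_for_daemons()
    #   assert old.hadfailover(fs.status())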
218
7c673cae
FG
219class CephCluster(object):
220 @property
221 def admin_remote(self):
222 first_mon = misc.get_first_mon(self._ctx, None)
9f95a23c 223 (result,) = self._ctx.cluster.only(first_mon).remotes.keys()
7c673cae
FG
224 return result
225
f67539c2 226 def __init__(self, ctx) -> None:
7c673cae
FG
227 self._ctx = ctx
228 self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
229
230 def get_config(self, key, service_type=None):
231 """
232 Get config from mon by default, or a specific service if caller asks for it
233 """
234 if service_type is None:
235 service_type = 'mon'
236
237 service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
238 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
239
240 def set_ceph_conf(self, subsys, key, value):
241 if subsys not in self._ctx.ceph['ceph'].conf:
242 self._ctx.ceph['ceph'].conf[subsys] = {}
243 self._ctx.ceph['ceph'].conf[subsys][key] = value
244 write_conf(self._ctx) # XXX because we don't have the ceph task's config object, if they
245 # used a different config path this won't work.
246
247 def clear_ceph_conf(self, subsys, key):
248 del self._ctx.ceph['ceph'].conf[subsys][key]
249 write_conf(self._ctx)
250
f64942e4
AA
251 def json_asok(self, command, service_type, service_id, timeout=None):
252 if timeout is None:
1e59de90 253 timeout = 300
f6b5b4d7 254 command.insert(0, '--format=json')
f64942e4 255 proc = self.mon_manager.admin_socket(service_type, service_id, command, timeout=timeout)
f6b5b4d7
TL
256 response_data = proc.stdout.getvalue().strip()
257 if len(response_data) > 0:
1e59de90
TL
258
259 def get_nonnumeric_values(value):
260 c = {"NaN": float("nan"), "Infinity": float("inf"),
261 "-Infinity": -float("inf")}
262 return c[value]
263
264 j = json.loads(response_data.replace('inf', 'Infinity'),
265 parse_constant=get_nonnumeric_values)
f6b5b4d7
TL
266 pretty = json.dumps(j, sort_keys=True, indent=2)
267 log.debug(f"_json_asok output\n{pretty}")
268 return j
7c673cae 269 else:
f6b5b4d7 270 log.debug("_json_asok output empty")
7c673cae
FG
271 return None
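
    # Admin-socket sketch (assumes an MDS daemon named "a" is running): the
    # command list is prefixed with --format=json above, so callers get plain
    # dicts back, or None when the daemon returned nothing. `cluster` is an
    # assumed CephCluster instance.
    #
    #   perf = cluster.json_asok(["perf", "dump"], "mds", "a")
    #   if perf:
    #       log.debug(sorted(perf.keys()))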
272
    def is_addr_blocklisted(self, addr):
        blocklist = json.loads(self.mon_manager.raw_cluster_cmd(
            "osd", "dump", "--format=json"))['blocklist']
        if addr in blocklist:
            return True
        log.warning(f'The address {addr} is not blocklisted')
        return False
280
7c673cae
FG
281
282class MDSCluster(CephCluster):
283 """
284 Collective operations on all the MDS daemons in the Ceph cluster. These
285 daemons may be in use by various Filesystems.
286
287 For the benefit of pre-multi-filesystem tests, this class is also
288 a parent of Filesystem. The correct way to use MDSCluster going forward is
289 as a separate instance outside of your (multiple) Filesystem instances.
290 """
f67539c2 291
7c673cae
FG
292 def __init__(self, ctx):
293 super(MDSCluster, self).__init__(ctx)
294
f67539c2
TL
295 @property
296 def mds_ids(self):
297 # do this dynamically because the list of ids may change periodically with cephadm
298 return list(misc.all_roles_of_type(self._ctx.cluster, 'mds'))
7c673cae 299
f67539c2
TL
300 @property
301 def mds_daemons(self):
302 return dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
7c673cae
FG
303
304 def _one_or_all(self, mds_id, cb, in_parallel=True):
305 """
306 Call a callback for a single named MDS, or for all.
307
308 Note that the parallelism here isn't for performance, it's to avoid being overly kind
309 to the cluster by waiting a graceful ssh-latency of time between doing things, and to
310 avoid being overly kind by executing them in a particular order. However, some actions
311 don't cope with being done in parallel, so it's optional (`in_parallel`)
312
313 :param mds_id: MDS daemon name, or None
314 :param cb: Callback taking single argument of MDS daemon name
315 :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
316 """
f67539c2 317
7c673cae
FG
318 if mds_id is None:
319 if in_parallel:
320 with parallel() as p:
321 for mds_id in self.mds_ids:
322 p.spawn(cb, mds_id)
323 else:
324 for mds_id in self.mds_ids:
325 cb(mds_id)
326 else:
327 cb(mds_id)
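
    # Fan-out sketch: the mds_* helpers below all route through _one_or_all(),
    # so passing no mds_id acts on every MDS (in parallel), while a concrete
    # name targets a single daemon. `mdsc` is an assumed MDSCluster instance.
    #
    #   mdsc.mds_fail()       # mark every MDS failed
    #   mdsc.mds_fail("a")    # only mds.a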
328
181888fb
FG
329 def get_config(self, key, service_type=None):
330 """
331 get_config specialization of service_type="mds"
332 """
333 if service_type != "mds":
334 return super(MDSCluster, self).get_config(key, service_type)
335
336 # Some tests stop MDS daemons, don't send commands to a dead one:
e306af50
TL
337 running_daemons = [i for i, mds in self.mds_daemons.items() if mds.running()]
338 service_id = random.sample(running_daemons, 1)[0]
181888fb
FG
339 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
340
7c673cae
FG
    def mds_stop(self, mds_id=None):
        """
        Stop the MDS daemon process(es). If it held a rank, that rank
        will eventually go laggy.
        """
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())
347
348 def mds_fail(self, mds_id=None):
349 """
350 Inform MDSMonitor of the death of the daemon process(es). If it held
351 a rank, that rank will be relinquished.
352 """
353 self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))
354
355 def mds_restart(self, mds_id=None):
356 self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())
357
358 def mds_fail_restart(self, mds_id=None):
359 """
360 Variation on restart that includes marking MDSs as failed, so that doing this
361 operation followed by waiting for healthy daemon states guarantees that they
362 have gone down and come up, rather than potentially seeing the healthy states
363 that existed before the restart.
364 """
365 def _fail_restart(id_):
366 self.mds_daemons[id_].stop()
367 self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
368 self.mds_daemons[id_].restart()
369
370 self._one_or_all(mds_id, _fail_restart)
371
1adf2230
AA
    def mds_signal(self, mds_id, sig, silent=False):
        """
        Signal an MDS daemon.
        """
        self.mds_daemons[mds_id].signal(sig, silent)
377
1e59de90
TL
378 def mds_is_running(self, mds_id):
379 return self.mds_daemons[mds_id].running()
380
181888fb
FG
381 def newfs(self, name='cephfs', create=True):
382 return Filesystem(self._ctx, name=name, create=create)
7c673cae 383
522d829b
TL
384 def status(self, epoch=None):
385 return FSStatus(self.mon_manager, epoch)
7c673cae 386
7c673cae
FG
387 def get_standby_daemons(self):
388 return set([s['name'] for s in self.status().get_standbys()])
389
390 def get_mds_hostnames(self):
391 result = set()
392 for mds_id in self.mds_ids:
393 mds_remote = self.mon_manager.find_remote('mds', mds_id)
394 result.add(mds_remote.hostname)
395
396 return list(result)
397
    def set_clients_block(self, blocked, mds_id=None):
        """
        Block (using iptables) client communications to this MDS. Be careful: if
        other services are running on this MDS, or other MDSs try to talk to this
        MDS, their communications may also be blocked as collateral damage.

        :param mds_id: Optional ID of MDS to block, default to all
        :return:
        """
407 da_flag = "-A" if blocked else "-D"
408
409 def set_block(_mds_id):
410 remote = self.mon_manager.find_remote('mds', _mds_id)
411 status = self.status()
412
413 addr = status.get_mds_addr(_mds_id)
414 ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()
415
416 remote.run(
417 args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
418 "comment", "--comment", "teuthology"])
419 remote.run(
420 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
421 "comment", "--comment", "teuthology"])
422
423 self._one_or_all(mds_id, set_block, in_parallel=False)
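
    # Blocking sketch: a test typically blocks client traffic to every MDS,
    # waits out the filesystem's session timeout, then removes the iptables
    # rules again. `mdsc` is an assumed MDSCluster instance and 60 stands in
    # for whatever session_timeout the filesystem uses.
    #
    #   mdsc.set_clients_block(True)
    #   time.sleep(60)
    #   mdsc.set_clients_block(False)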
424
f67539c2
TL
425 def set_inter_mds_block(self, blocked, mds_rank_1, mds_rank_2):
426 """
427 Block (using iptables) communications from a provided MDS to other MDSs.
428 Block all ports that an MDS uses for communication.
429
430 :param blocked: True to block the MDS, False otherwise
431 :param mds_rank_1: MDS rank
432 :param mds_rank_2: MDS rank
433 :return:
434 """
435 da_flag = "-A" if blocked else "-D"
436
437 def set_block(mds_ids):
438 status = self.status()
439
440 mds = mds_ids[0]
441 remote = self.mon_manager.find_remote('mds', mds)
442 addrs = status.get_mds_addrs(mds)
443 for addr in addrs:
444 ip_str, port_str = re.match("(.+):(.+)", addr).groups()
445 remote.run(
446 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
1e59de90 447 "comment", "--comment", "teuthology"], omit_sudo=False)
f67539c2
TL
448
449
450 mds = mds_ids[1]
451 remote = self.mon_manager.find_remote('mds', mds)
452 addrs = status.get_mds_addrs(mds)
453 for addr in addrs:
454 ip_str, port_str = re.match("(.+):(.+)", addr).groups()
455 remote.run(
456 args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
1e59de90 457 "comment", "--comment", "teuthology"], omit_sudo=False)
f67539c2
TL
458 remote.run(
459 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
1e59de90 460 "comment", "--comment", "teuthology"], omit_sudo=False)
f67539c2
TL
461
462 self._one_or_all((mds_rank_1, mds_rank_2), set_block, in_parallel=False)
463
7c673cae
FG
464 def clear_firewall(self):
465 clear_firewall(self._ctx)
466
467 def get_mds_info(self, mds_id):
468 return FSStatus(self.mon_manager).get_mds(mds_id)
469
7c673cae
FG
470 def is_pool_full(self, pool_name):
471 pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
472 for pool in pools:
473 if pool['pool_name'] == pool_name:
474 return 'full' in pool['flags_names'].split(",")
475
476 raise RuntimeError("Pool not found '{0}'".format(pool_name))
477
f67539c2
TL
478 def delete_all_filesystems(self):
479 """
480 Remove all filesystems that exist, and any pools in use by them.
481 """
482 for fs in self.status().get_filesystems():
483 Filesystem(ctx=self._ctx, fscid=fs['id']).destroy()
484
20effc67
TL
485 @property
486 def beacon_timeout(self):
487 """
488 Generate an acceptable timeout for the mons to drive some MDSMap change
489 because of missed beacons from some MDS. This involves looking up the
490 grace period in use by the mons and adding an acceptable buffer.
491 """
492
493 grace = float(self.get_config("mds_beacon_grace", service_type="mon"))
494 return grace*2+15
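
    # Timeout sketch: after stopping an active MDS process (without marking it
    # failed), a test can wait up to beacon_timeout for the mons to notice the
    # missed beacons and promote a standby. `fs` is an assumed Filesystem
    # instance (a subclass of this class).
    #
    #   fs.mds_stop(fs.get_active_names()[0])
    #   fs.wait_for_state('up:active', rank=0, timeout=fs.beacon_timeout)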
495
f67539c2 496
7c673cae 497class Filesystem(MDSCluster):
1e59de90
TL
498
499 """
500 Generator for all Filesystems in the cluster.
501 """
502 @classmethod
503 def get_all_fs(cls, ctx):
504 mdsc = MDSCluster(ctx)
505 status = mdsc.status()
506 for fs in status.get_filesystems():
507 yield cls(ctx, fscid=fs['id'])
508
7c673cae
FG
509 """
510 This object is for driving a CephFS filesystem. The MDS daemons driven by
511 MDSCluster may be shared with other Filesystems.
512 """
f67539c2 513 def __init__(self, ctx, fs_config={}, fscid=None, name=None, create=False):
7c673cae
FG
514 super(Filesystem, self).__init__(ctx)
515
181888fb 516 self.name = name
7c673cae 517 self.id = None
7c673cae 518 self.metadata_pool_name = None
181888fb 519 self.data_pool_name = None
7c673cae 520 self.data_pools = None
f91f0fd5 521 self.fs_config = fs_config
f67539c2 522 self.ec_profile = fs_config.get('ec_profile')
7c673cae
FG
523
524 client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
525 self.client_id = client_list[0]
526 self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
527
181888fb 528 if name is not None:
7c673cae
FG
529 if fscid is not None:
530 raise RuntimeError("cannot specify fscid when creating fs")
181888fb 531 if create and not self.legacy_configured():
7c673cae 532 self.create()
181888fb
FG
533 else:
534 if fscid is not None:
535 self.id = fscid
536 self.getinfo(refresh = True)
7c673cae
FG
537
538 # Stash a reference to the first created filesystem on ctx, so
539 # that if someone drops to the interactive shell they can easily
540 # poke our methods.
541 if not hasattr(self._ctx, "filesystem"):
542 self._ctx.filesystem = self
543
f67539c2
TL
544 def dead(self):
545 try:
546 return not bool(self.get_mds_map())
547 except FSMissing:
548 return True
549
9f95a23c
TL
550 def get_task_status(self, status_key):
551 return self.mon_manager.get_service_task_status("mds", status_key)
552
7c673cae
FG
553 def getinfo(self, refresh = False):
554 status = self.status()
555 if self.id is not None:
556 fsmap = status.get_fsmap(self.id)
557 elif self.name is not None:
558 fsmap = status.get_fsmap_byname(self.name)
559 else:
560 fss = [fs for fs in status.get_filesystems()]
561 if len(fss) == 1:
562 fsmap = fss[0]
563 elif len(fss) == 0:
564 raise RuntimeError("no file system available")
565 else:
566 raise RuntimeError("more than one file system available")
567 self.id = fsmap['id']
568 self.name = fsmap['mdsmap']['fs_name']
569 self.get_pool_names(status = status, refresh = refresh)
570 return status
571
11fdf7f2 572 def reach_max_mds(self):
20effc67 573 status = self.wait_for_daemons()
11fdf7f2 574 mds_map = self.get_mds_map(status=status)
20effc67 575 assert(mds_map['in'] == list(range(0, mds_map['max_mds'])))
11fdf7f2 576
f67539c2
TL
577 def reset(self):
578 self.mon_manager.raw_cluster_cmd("fs", "reset", str(self.name), '--yes-i-really-mean-it')
579
11fdf7f2
TL
580 def fail(self):
581 self.mon_manager.raw_cluster_cmd("fs", "fail", str(self.name))
582
9f95a23c
TL
583 def set_flag(self, var, *args):
584 a = map(lambda x: str(x).lower(), args)
585 self.mon_manager.raw_cluster_cmd("fs", "flag", "set", var, *a)
586
587 def set_allow_multifs(self, yes=True):
588 self.set_flag("enable_multiple", yes)
589
91327a77 590 def set_var(self, var, *args):
9f95a23c 591 a = map(lambda x: str(x).lower(), args)
91327a77
AA
592 self.mon_manager.raw_cluster_cmd("fs", "set", self.name, var, *a)
593
11fdf7f2
TL
594 def set_down(self, down=True):
595 self.set_var("down", str(down).lower())
596
597 def set_joinable(self, joinable=True):
9f95a23c 598 self.set_var("joinable", joinable)
11fdf7f2 599
7c673cae 600 def set_max_mds(self, max_mds):
f64942e4 601 self.set_var("max_mds", "%d" % max_mds)
7c673cae 602
f91f0fd5
TL
603 def set_session_timeout(self, timeout):
604 self.set_var("session_timeout", "%d" % timeout)
605
11fdf7f2 606 def set_allow_standby_replay(self, yes):
9f95a23c 607 self.set_var("allow_standby_replay", yes)
11fdf7f2
TL
608
609 def set_allow_new_snaps(self, yes):
9f95a23c 610 self.set_var("allow_new_snaps", yes, '--yes-i-really-mean-it')
7c673cae 611
1e59de90
TL
612 def set_bal_rank_mask(self, bal_rank_mask):
613 self.set_var("bal_rank_mask", bal_rank_mask)
614
615 def set_refuse_client_session(self, yes):
616 self.set_var("refuse_client_session", yes)
617
522d829b
TL
618 def compat(self, *args):
619 a = map(lambda x: str(x).lower(), args)
620 self.mon_manager.raw_cluster_cmd("fs", "compat", self.name, *a)
621
622 def add_compat(self, *args):
623 self.compat("add_compat", *args)
624
625 def add_incompat(self, *args):
626 self.compat("add_incompat", *args)
627
628 def rm_compat(self, *args):
629 self.compat("rm_compat", *args)
630
631 def rm_incompat(self, *args):
632 self.compat("rm_incompat", *args)
633
f67539c2
TL
634 def required_client_features(self, *args, **kwargs):
635 c = ["fs", "required_client_features", self.name, *args]
636 return self.mon_manager.run_cluster_cmd(args=c, **kwargs)
637
    # Since v15.1.0 the pg autoscale mode has been enabled by default, so we
    # let the pg autoscale mode calculate the pg_num as needed. We set
    # pg_num_min to 64 to make sure that the pg autoscale mode won't set the
    # pg_num too low (see Tracker #45434).
    pg_num = 64
    pg_num_min = 64
    target_size_ratio = 0.9
    target_size_ratio_ec = 0.9
7c673cae 646
1e59de90 647 def create(self, recover=False, metadata_overlay=False):
7c673cae
FG
648 if self.name is None:
649 self.name = "cephfs"
650 if self.metadata_pool_name is None:
651 self.metadata_pool_name = "{0}_metadata".format(self.name)
181888fb
FG
652 if self.data_pool_name is None:
653 data_pool_name = "{0}_data".format(self.name)
654 else:
655 data_pool_name = self.data_pool_name
7c673cae 656
522d829b
TL
        # We will use the EC pool to store the data; a small amount of
        # metadata still goes to the primary data pool for all files.
1e59de90 659 if not metadata_overlay and self.ec_profile and 'disabled' not in self.ec_profile:
522d829b
TL
660 self.target_size_ratio = 0.05
661
cd265ab1 662 log.debug("Creating filesystem '{0}'".format(self.name))
7c673cae 663
39ae355f
TL
664 try:
665 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
666 self.metadata_pool_name,
667 '--pg_num_min', str(self.pg_num_min))
f67539c2 668
39ae355f
TL
669 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
670 data_pool_name, str(self.pg_num),
671 '--pg_num_min', str(self.pg_num_min),
672 '--target_size_ratio',
673 str(self.target_size_ratio))
674 except CommandFailedError as e:
675 if e.exitstatus == 22: # nautilus couldn't specify --pg_num_min option
676 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
677 self.metadata_pool_name,
678 str(self.pg_num_min))
679
680 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
681 data_pool_name, str(self.pg_num),
682 str(self.pg_num_min))
683 else:
684 raise
f67539c2 685
1e59de90
TL
686 args = ["fs", "new", self.name, self.metadata_pool_name, data_pool_name]
687 if recover:
688 args.append('--recover')
689 if metadata_overlay:
690 args.append('--allow-dangerous-metadata-overlay')
691 self.mon_manager.raw_cluster_cmd(*args)
f67539c2 692
1e59de90 693 if not recover:
b32b8144 694 if self.ec_profile and 'disabled' not in self.ec_profile:
f67539c2 695 ec_data_pool_name = data_pool_name + "_ec"
cd265ab1 696 log.debug("EC profile is %s", self.ec_profile)
f67539c2 697 cmd = ['osd', 'erasure-code-profile', 'set', ec_data_pool_name]
3efd9988
FG
698 cmd.extend(self.ec_profile)
699 self.mon_manager.raw_cluster_cmd(*cmd)
39ae355f
TL
700 try:
701 self.mon_manager.raw_cluster_cmd(
702 'osd', 'pool', 'create', ec_data_pool_name,
703 'erasure', ec_data_pool_name,
704 '--pg_num_min', str(self.pg_num_min),
705 '--target_size_ratio', str(self.target_size_ratio_ec))
706 except CommandFailedError as e:
707 if e.exitstatus == 22: # nautilus couldn't specify --pg_num_min option
708 self.mon_manager.raw_cluster_cmd(
709 'osd', 'pool', 'create', ec_data_pool_name,
710 str(self.pg_num_min), 'erasure', ec_data_pool_name)
711 else:
712 raise
3efd9988
FG
713 self.mon_manager.raw_cluster_cmd(
714 'osd', 'pool', 'set',
f67539c2
TL
715 ec_data_pool_name, 'allow_ec_overwrites', 'true')
716 self.add_data_pool(ec_data_pool_name, create=False)
717 self.check_pool_application(ec_data_pool_name)
718
719 self.run_client_payload(f"setfattr -n ceph.dir.layout.pool -v {ec_data_pool_name} . && getfattr -n ceph.dir.layout .")
720
35e4c445
FG
721 self.check_pool_application(self.metadata_pool_name)
722 self.check_pool_application(data_pool_name)
f67539c2 723
7c673cae 724 # Turn off spurious standby count warnings from modifying max_mds in tests.
31f18b77
FG
725 try:
726 self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
727 except CommandFailedError as e:
728 if e.exitstatus == 22:
729 # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
730 pass
731 else:
732 raise
7c673cae 733
f91f0fd5 734 if self.fs_config is not None:
1e59de90 735 log.debug(f"fs_config: {self.fs_config}")
f91f0fd5
TL
736 max_mds = self.fs_config.get('max_mds', 1)
737 if max_mds > 1:
738 self.set_max_mds(max_mds)
739
f67539c2
TL
740 standby_replay = self.fs_config.get('standby_replay', False)
741 self.set_allow_standby_replay(standby_replay)
742
f91f0fd5
TL
743 # If absent will use the default value (60 seconds)
744 session_timeout = self.fs_config.get('session_timeout', 60)
745 if session_timeout != 60:
746 self.set_session_timeout(session_timeout)
747
1e59de90
TL
748 if self.fs_config.get('subvols', None) is not None:
749 log.debug(f"Creating {self.fs_config.get('subvols')} subvols "
750 f"for filesystem '{self.name}'")
751 if not hasattr(self._ctx, "created_subvols"):
752 self._ctx.created_subvols = dict()
753
754 subvols = self.fs_config.get('subvols')
755 assert(isinstance(subvols, dict))
756 assert(isinstance(subvols['create'], int))
757 assert(subvols['create'] > 0)
758
759 for sv in range(0, subvols['create']):
760 sv_name = f'sv_{sv}'
761 self.mon_manager.raw_cluster_cmd(
762 'fs', 'subvolume', 'create', self.name, sv_name,
763 self.fs_config.get('subvol_options', ''))
764
765 if self.name not in self._ctx.created_subvols:
766 self._ctx.created_subvols[self.name] = []
767
768 subvol_path = self.mon_manager.raw_cluster_cmd(
769 'fs', 'subvolume', 'getpath', self.name, sv_name)
770 subvol_path = subvol_path.strip()
771 self._ctx.created_subvols[self.name].append(subvol_path)
772 else:
773 log.debug(f"Not Creating any subvols for filesystem '{self.name}'")
774
775
7c673cae
FG
776 self.getinfo(refresh = True)
777
522d829b
TL
778 # wait pgs to be clean
779 self.mon_manager.wait_for_clean()
780
f67539c2
TL
    def run_client_payload(self, cmd):
        # avoid circular dep by importing here:
        from tasks.cephfs.fuse_mount import FuseMount

        # Wait for the MDS daemons to be ready before mounting the
        # ceph-fuse client in run_client_payload()
        self.wait_for_daemons()
788
f67539c2 789 d = misc.get_testdir(self._ctx)
20effc67 790 m = FuseMount(self._ctx, d, "admin", self.client_remote, cephfs_name=self.name)
522d829b 791 m.mount_wait()
f67539c2
TL
792 m.run_shell_payload(cmd)
793 m.umount_wait(require_clean=True)
794
795 def _remove_pool(self, name, **kwargs):
796 c = f'osd pool rm {name} {name} --yes-i-really-really-mean-it'
797 return self.mon_manager.ceph(c, **kwargs)
798
799 def rm(self, **kwargs):
800 c = f'fs rm {self.name} --yes-i-really-mean-it'
801 return self.mon_manager.ceph(c, **kwargs)
802
803 def remove_pools(self, data_pools):
804 self._remove_pool(self.get_metadata_pool_name())
805 for poolname in data_pools:
806 try:
807 self._remove_pool(poolname)
808 except CommandFailedError as e:
809 # EBUSY, this data pool is used by two metadata pools, let the
810 # 2nd pass delete it
811 if e.exitstatus == EBUSY:
812 pass
813 else:
814 raise
815
816 def destroy(self, reset_obj_attrs=True):
817 log.info(f'Destroying file system {self.name} and related pools')
818
819 if self.dead():
820 log.debug('already dead...')
821 return
822
823 data_pools = self.get_data_pool_names(refresh=True)
824
825 # make sure no MDSs are attached to given FS.
826 self.fail()
827 self.rm()
828
829 self.remove_pools(data_pools)
830
831 if reset_obj_attrs:
832 self.id = None
833 self.name = None
834 self.metadata_pool_name = None
835 self.data_pool_name = None
836 self.data_pools = None
837
838 def recreate(self):
839 self.destroy()
840
841 self.create()
842 self.getinfo(refresh=True)
843
35e4c445
FG
844 def check_pool_application(self, pool_name):
845 osd_map = self.mon_manager.get_osd_dump_json()
846 for pool in osd_map['pools']:
847 if pool['pool_name'] == pool_name:
848 if "application_metadata" in pool:
849 if not "cephfs" in pool['application_metadata']:
9f95a23c
TL
850 raise RuntimeError("Pool {pool_name} does not name cephfs as application!".\
851 format(pool_name=pool_name))
35e4c445 852
7c673cae
FG
853 def __del__(self):
854 if getattr(self._ctx, "filesystem", None) == self:
855 delattr(self._ctx, "filesystem")
856
857 def exists(self):
858 """
859 Whether a filesystem exists in the mon's filesystem list
860 """
861 fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
862 return self.name in [fs['name'] for fs in fs_list]
863
864 def legacy_configured(self):
865 """
866 Check if a legacy (i.e. pre "fs new") filesystem configuration is present. If this is
867 the case, the caller should avoid using Filesystem.create
868 """
869 try:
870 out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
871 pools = json.loads(out_text)
872 metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
873 if metadata_pool_exists:
874 self.metadata_pool_name = 'metadata'
875 except CommandFailedError as e:
876 # For use in upgrade tests, Ceph cuttlefish and earlier don't support
877 # structured output (--format) from the CLI.
878 if e.exitstatus == 22:
879 metadata_pool_exists = True
880 else:
881 raise
882
883 return metadata_pool_exists
884
885 def _df(self):
886 return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))
887
f67539c2 888 # may raise FSMissing
f64942e4
AA
889 def get_mds_map(self, status=None):
890 if status is None:
891 status = self.status()
892 return status.get_fsmap(self.id)['mdsmap']
7c673cae 893
11fdf7f2
TL
894 def get_var(self, var, status=None):
895 return self.get_mds_map(status=status)[var]
91327a77 896
92f5a8d4
TL
897 def set_dir_layout(self, mount, path, layout):
898 for name, value in layout.items():
899 mount.run_shell(args=["setfattr", "-n", "ceph.dir.layout."+name, "-v", str(value), path])
900
901 def add_data_pool(self, name, create=True):
902 if create:
39ae355f
TL
903 try:
904 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name,
905 '--pg_num_min', str(self.pg_num_min))
906 except CommandFailedError as e:
907 if e.exitstatus == 22: # nautilus couldn't specify --pg_num_min option
908 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name,
909 str(self.pg_num_min))
910 else:
911 raise
7c673cae
FG
912 self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
913 self.get_pool_names(refresh = True)
914 for poolid, fs_name in self.data_pools.items():
915 if name == fs_name:
916 return poolid
917 raise RuntimeError("could not get just created pool '{0}'".format(name))
918
919 def get_pool_names(self, refresh = False, status = None):
920 if refresh or self.metadata_pool_name is None or self.data_pools is None:
921 if status is None:
922 status = self.status()
923 fsmap = status.get_fsmap(self.id)
924
925 osd_map = self.mon_manager.get_osd_dump_json()
926 id_to_name = {}
927 for p in osd_map['pools']:
928 id_to_name[p['pool']] = p['pool_name']
929
930 self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
931 self.data_pools = {}
932 for data_pool in fsmap['mdsmap']['data_pools']:
933 self.data_pools[data_pool] = id_to_name[data_pool]
934
935 def get_data_pool_name(self, refresh = False):
936 if refresh or self.data_pools is None:
937 self.get_pool_names(refresh = True)
938 assert(len(self.data_pools) == 1)
e306af50 939 return next(iter(self.data_pools.values()))
7c673cae
FG
940
941 def get_data_pool_id(self, refresh = False):
942 """
943 Don't call this if you have multiple data pools
944 :return: integer
945 """
946 if refresh or self.data_pools is None:
947 self.get_pool_names(refresh = True)
948 assert(len(self.data_pools) == 1)
e306af50 949 return next(iter(self.data_pools.keys()))
7c673cae
FG
950
951 def get_data_pool_names(self, refresh = False):
952 if refresh or self.data_pools is None:
953 self.get_pool_names(refresh = True)
e306af50 954 return list(self.data_pools.values())
7c673cae
FG
955
956 def get_metadata_pool_name(self):
957 return self.metadata_pool_name
958
181888fb
FG
959 def set_data_pool_name(self, name):
960 if self.id is not None:
961 raise RuntimeError("can't set filesystem name if its fscid is set")
962 self.data_pool_name = name
963
522d829b
TL
964 def get_pool_pg_num(self, pool_name):
965 pgs = json.loads(self.mon_manager.raw_cluster_cmd('osd', 'pool', 'get',
966 pool_name, 'pg_num',
967 '--format=json-pretty'))
968 return int(pgs['pg_num'])
969
7c673cae
FG
970 def get_namespace_id(self):
971 return self.id
972
973 def get_pool_df(self, pool_name):
974 """
975 Return a dict like:
976 {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
977 """
978 for pool_df in self._df()['pools']:
979 if pool_df['name'] == pool_name:
980 return pool_df['stats']
981
982 raise RuntimeError("Pool name '{0}' not found".format(pool_name))
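
    # Stats sketch: the per-pool dict above backs simple assertions in tests,
    # e.g. that written data really landed in the (single) data pool. `fs` is
    # an assumed Filesystem instance.
    #
    #   df = fs.get_pool_df(fs.get_data_pool_name())
    #   assert df['objects'] > 0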
983
984 def get_usage(self):
985 return self._df()['stats']['total_used_bytes']
986
11fdf7f2 987 def are_daemons_healthy(self, status=None, skip_max_mds_check=False):
7c673cae
FG
988 """
989 Return true if all daemons are in one of active, standby, standby-replay, and
990 at least max_mds daemons are in 'active'.
991
992 Unlike most of Filesystem, this function is tolerant of new-style `fs`
993 commands being missing, because we are part of the ceph installation
994 process during upgrade suites, so must fall back to old style commands
995 when we get an EINVAL on a new style command.
996
997 :return:
998 """
11fdf7f2
TL
999 # First, check to see that processes haven't exited with an error code
1000 for mds in self._ctx.daemons.iter_daemons_of_role('mds'):
1001 mds.check_status()
7c673cae
FG
1002
1003 active_count = 0
f67539c2 1004 mds_map = self.get_mds_map(status=status)
7c673cae 1005
cd265ab1 1006 log.debug("are_daemons_healthy: mds map: {0}".format(mds_map))
7c673cae
FG
1007
1008 for mds_id, mds_status in mds_map['info'].items():
1009 if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
1010 log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
1011 return False
1012 elif mds_status['state'] == 'up:active':
1013 active_count += 1
1014
cd265ab1 1015 log.debug("are_daemons_healthy: {0}/{1}".format(
7c673cae
FG
1016 active_count, mds_map['max_mds']
1017 ))
1018
11fdf7f2
TL
1019 if not skip_max_mds_check:
1020 if active_count > mds_map['max_mds']:
cd265ab1 1021 log.debug("are_daemons_healthy: number of actives is greater than max_mds: {0}".format(mds_map))
11fdf7f2
TL
1022 return False
1023 elif active_count == mds_map['max_mds']:
1024 # The MDSMap says these guys are active, but let's check they really are
1025 for mds_id, mds_status in mds_map['info'].items():
1026 if mds_status['state'] == 'up:active':
1027 try:
f67539c2 1028 daemon_status = self.mds_tell(["status"], mds_id=mds_status['name'])
11fdf7f2
TL
1029 except CommandFailedError as cfe:
1030 if cfe.exitstatus == errno.EINVAL:
1031 # Old version, can't do this check
1032 continue
1033 else:
1034 # MDS not even running
1035 return False
1036
1037 if daemon_status['state'] != 'up:active':
1038 # MDS hasn't taken the latest map yet
7c673cae
FG
1039 return False
1040
11fdf7f2
TL
1041 return True
1042 else:
1043 return False
7c673cae 1044 else:
cd265ab1 1045 log.debug("are_daemons_healthy: skipping max_mds check")
11fdf7f2 1046 return True
7c673cae 1047
11fdf7f2 1048 def get_daemon_names(self, state=None, status=None):
7c673cae
FG
1049 """
1050 Return MDS daemon names of those daemons in the given state
1051 :param state:
1052 :return:
1053 """
11fdf7f2 1054 mdsmap = self.get_mds_map(status)
7c673cae 1055 result = []
9f95a23c
TL
1056 for mds_status in sorted(mdsmap['info'].values(),
1057 key=lambda _: _['rank']):
7c673cae
FG
1058 if mds_status['state'] == state or state is None:
1059 result.append(mds_status['name'])
1060
1061 return result
1062
9f95a23c 1063 def get_active_names(self, status=None):
7c673cae
FG
1064 """
1065 Return MDS daemon names of those daemons holding ranks
1066 in state up:active
1067
1068 :return: list of strings like ['a', 'b'], sorted by rank
1069 """
9f95a23c 1070 return self.get_daemon_names("up:active", status=status)
7c673cae 1071
11fdf7f2
TL
1072 def get_all_mds_rank(self, status=None):
1073 mdsmap = self.get_mds_map(status)
7c673cae 1074 result = []
9f95a23c
TL
1075 for mds_status in sorted(mdsmap['info'].values(),
1076 key=lambda _: _['rank']):
7c673cae
FG
1077 if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
1078 result.append(mds_status['rank'])
1079
1080 return result
1081
f6b5b4d7 1082 def get_rank(self, rank=None, status=None):
28e407b8
AA
1083 if status is None:
1084 status = self.getinfo()
f6b5b4d7
TL
1085 if rank is None:
1086 rank = 0
28e407b8
AA
1087 return status.get_rank(self.id, rank)
1088
11fdf7f2
TL
1089 def rank_restart(self, rank=0, status=None):
1090 name = self.get_rank(rank=rank, status=status)['name']
1091 self.mds_restart(mds_id=name)
1092
1093 def rank_signal(self, signal, rank=0, status=None):
1094 name = self.get_rank(rank=rank, status=status)['name']
1095 self.mds_signal(name, signal)
1096
1097 def rank_freeze(self, yes, rank=0):
1098 self.mon_manager.raw_cluster_cmd("mds", "freeze", "{}:{}".format(self.id, rank), str(yes).lower())
1099
1e59de90
TL
1100 def rank_repaired(self, rank):
1101 self.mon_manager.raw_cluster_cmd("mds", "repaired", "{}:{}".format(self.id, rank))
1102
11fdf7f2
TL
1103 def rank_fail(self, rank=0):
1104 self.mon_manager.raw_cluster_cmd("mds", "fail", "{}:{}".format(self.id, rank))
1105
1e59de90
TL
1106 def rank_is_running(self, rank=0, status=None):
1107 name = self.get_rank(rank=rank, status=status)['name']
1108 return self.mds_is_running(name)
1109
28e407b8
AA
1110 def get_ranks(self, status=None):
1111 if status is None:
1112 status = self.getinfo()
1113 return status.get_ranks(self.id)
1114
a4b75251
TL
1115 def get_damaged(self, status=None):
1116 if status is None:
1117 status = self.getinfo()
1118 return status.get_damaged(self.id)
1119
11fdf7f2
TL
1120 def get_replays(self, status=None):
1121 if status is None:
1122 status = self.getinfo()
1123 return status.get_replays(self.id)
1124
1125 def get_replay(self, rank=0, status=None):
1126 for replay in self.get_replays(status=status):
1127 if replay['rank'] == rank:
1128 return replay
1129 return None
1130
28e407b8 1131 def get_rank_names(self, status=None):
7c673cae
FG
1132 """
1133 Return MDS daemon names of those daemons holding a rank,
1134 sorted by rank. This includes e.g. up:replay/reconnect
1135 as well as active, but does not include standby or
1136 standby-replay.
1137 """
11fdf7f2 1138 mdsmap = self.get_mds_map(status)
7c673cae 1139 result = []
9f95a23c
TL
1140 for mds_status in sorted(mdsmap['info'].values(),
1141 key=lambda _: _['rank']):
7c673cae
FG
1142 if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
1143 result.append(mds_status['name'])
1144
1145 return result
1146
11fdf7f2 1147 def wait_for_daemons(self, timeout=None, skip_max_mds_check=False, status=None):
7c673cae
FG
1148 """
1149 Wait until all daemons are healthy
1150 :return:
1151 """
1152
1153 if timeout is None:
1154 timeout = DAEMON_WAIT_TIMEOUT
1155
05a536ef
TL
1156 if self.id is None:
1157 status = self.getinfo(refresh=True)
1158
11fdf7f2
TL
1159 if status is None:
1160 status = self.status()
1161
7c673cae
FG
1162 elapsed = 0
1163 while True:
11fdf7f2
TL
1164 if self.are_daemons_healthy(status=status, skip_max_mds_check=skip_max_mds_check):
1165 return status
7c673cae
FG
1166 else:
1167 time.sleep(1)
1168 elapsed += 1
1169
1170 if elapsed > timeout:
cd265ab1 1171 log.debug("status = {0}".format(status))
7c673cae
FG
1172 raise RuntimeError("Timed out waiting for MDS daemons to become healthy")
1173
11fdf7f2
TL
1174 status = self.status()
1175
f67539c2
TL
1176 def dencoder(self, obj_type, obj_blob):
1177 args = [os.path.join(self._prefix, "ceph-dencoder"), 'type', obj_type, 'import', '-', 'decode', 'dump_json']
1178 p = self.mon_manager.controller.run(args=args, stdin=BytesIO(obj_blob), stdout=BytesIO())
1179 return p.stdout.getvalue()
1180
1181 def rados(self, *args, **kwargs):
7c673cae 1182 """
f67539c2 1183 Callout to rados CLI.
7c673cae 1184 """
7c673cae 1185
f67539c2 1186 return self.mon_manager.do_rados(*args, **kwargs)
7c673cae 1187
f67539c2 1188 def radosm(self, *args, **kwargs):
7c673cae 1189 """
f67539c2 1190 Interact with the metadata pool via rados CLI.
7c673cae 1191 """
7c673cae 1192
f67539c2
TL
1193 return self.rados(*args, **kwargs, pool=self.get_metadata_pool_name())
1194
1195 def radosmo(self, *args, stdout=BytesIO(), **kwargs):
7c673cae 1196 """
f67539c2 1197 Interact with the metadata pool via rados CLI. Get the stdout.
7c673cae 1198 """
7c673cae 1199
f67539c2 1200 return self.radosm(*args, **kwargs, stdout=stdout).stdout.getvalue()
7c673cae
FG
1201
1202 def get_metadata_object(self, object_type, object_id):
1203 """
1204 Retrieve an object from the metadata pool, pass it through
1205 ceph-dencoder to dump it to JSON, and return the decoded object.
1206 """
7c673cae 1207
f67539c2
TL
1208 o = self.radosmo(['get', object_id, '-'])
1209 j = self.dencoder(object_type, o)
7c673cae 1210 try:
f67539c2 1211 return json.loads(j)
7c673cae 1212 except (TypeError, ValueError):
f67539c2 1213 log.error("Failed to decode JSON: '{0}'".format(j))
7c673cae
FG
1214 raise
1215
7c673cae
FG
1216 def get_journal_version(self):
1217 """
1218 Read the JournalPointer and Journal::Header objects to learn the version of
1219 encoding in use.
1220 """
1221 journal_pointer_object = '400.00000000'
1222 journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
1223 journal_ino = journal_pointer_dump['journal_pointer']['front']
1224
1225 journal_header_object = "{0:x}.00000000".format(journal_ino)
1226 journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)
1227
1228 version = journal_header_dump['journal_header']['stream_format']
cd265ab1 1229 log.debug("Read journal version {0}".format(version))
7c673cae
FG
1230
1231 return version
1232
f64942e4 1233 def mds_asok(self, command, mds_id=None, timeout=None):
7c673cae 1234 if mds_id is None:
f67539c2 1235 return self.rank_asok(command, timeout=timeout)
7c673cae 1236
f64942e4 1237 return self.json_asok(command, 'mds', mds_id, timeout=timeout)
7c673cae 1238
f67539c2
TL
1239 def mds_tell(self, command, mds_id=None):
1240 if mds_id is None:
1241 return self.rank_tell(command)
1242
1243 return json.loads(self.mon_manager.raw_cluster_cmd("tell", f"mds.{mds_id}", *command))
1244
f64942e4
AA
1245 def rank_asok(self, command, rank=0, status=None, timeout=None):
1246 info = self.get_rank(rank=rank, status=status)
1247 return self.json_asok(command, 'mds', info['name'], timeout=timeout)
28e407b8 1248
11fdf7f2 1249 def rank_tell(self, command, rank=0, status=None):
20effc67
TL
1250 try:
1251 out = self.mon_manager.raw_cluster_cmd("tell", f"mds.{self.id}:{rank}", *command)
1252 return json.loads(out)
1253 except json.decoder.JSONDecodeError:
1254 log.error("could not decode: {}".format(out))
1255 raise
11fdf7f2 1256
f6b5b4d7
TL
1257 def ranks_tell(self, command, status=None):
1258 if status is None:
1259 status = self.status()
1260 out = []
1261 for r in status.get_ranks(self.id):
1262 result = self.rank_tell(command, rank=r['rank'], status=status)
1263 out.append((r['rank'], result))
1264 return sorted(out)
1265
1266 def ranks_perf(self, f, status=None):
1267 perf = self.ranks_tell(["perf", "dump"], status=status)
1268 out = []
1269 for rank, perf in perf:
1270 out.append((rank, f(perf)))
1271 return out
1272
1e59de90 1273 def read_cache(self, path, depth=None, rank=None):
7c673cae
FG
1274 cmd = ["dump", "tree", path]
1275 if depth is not None:
1276 cmd.append(depth.__str__())
1e59de90
TL
1277 result = self.rank_asok(cmd, rank=rank)
1278 if result is None or len(result) == 0:
7c673cae
FG
1279 raise RuntimeError("Path not found in cache: {0}".format(path))
1280
1281 return result
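
    # Cache-dump sketch: ask rank 0 for a cached subtree; the path must have
    # been touched by a client first, or the asok command returns nothing and
    # this raises. `fs` is an assumed Filesystem instance.
    #
    #   subtree = fs.read_cache("/dir", depth=0)
    #   log.debug("%d cache entries under /dir", len(subtree))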
1282
    def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
        """
        Block until the MDS reaches a particular state, or a failure condition
        is met.

        When there are multiple MDSs, succeed when exactly one MDS is in the
        goal state, or fail when any MDS is in the reject state.

        :param goal_state: Return once the MDS is in this state
        :param reject: Fail if the MDS enters this state before the goal state
        :param timeout: Fail if this many seconds pass before reaching goal
        :return: number of seconds waited, rounded down to integer
        """
1296
1297 started_at = time.time()
1298 while True:
1299 status = self.status()
1300 if rank is not None:
f64942e4
AA
1301 try:
1302 mds_info = status.get_rank(self.id, rank)
1303 current_state = mds_info['state'] if mds_info else None
cd265ab1 1304 log.debug("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
f64942e4
AA
1305 except:
1306 mdsmap = self.get_mds_map(status=status)
1307 if rank in mdsmap['failed']:
cd265ab1 1308 log.debug("Waiting for rank {0} to come back.".format(rank))
f64942e4
AA
1309 current_state = None
1310 else:
1311 raise
7c673cae
FG
1312 elif mds_id is not None:
1313 # mds_info is None if no daemon with this ID exists in the map
1314 mds_info = status.get_mds(mds_id)
1315 current_state = mds_info['state'] if mds_info else None
cd265ab1 1316 log.debug("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
7c673cae
FG
1317 else:
1318 # In general, look for a single MDS
1319 states = [m['state'] for m in status.get_ranks(self.id)]
1320 if [s for s in states if s == goal_state] == [goal_state]:
1321 current_state = goal_state
1322 elif reject in states:
1323 current_state = reject
1324 else:
1325 current_state = None
cd265ab1 1326 log.debug("mapped states {0} to {1}".format(states, current_state))
7c673cae
FG
1327
1328 elapsed = time.time() - started_at
1329 if current_state == goal_state:
cd265ab1 1330 log.debug("reached state '{0}' in {1}s".format(current_state, elapsed))
7c673cae
FG
1331 return elapsed
1332 elif reject is not None and current_state == reject:
1333 raise RuntimeError("MDS in reject state {0}".format(current_state))
1334 elif timeout is not None and elapsed > timeout:
1335 log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
1336 raise RuntimeError(
1337 "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
1338 elapsed, goal_state, current_state
1339 ))
1340 else:
1341 time.sleep(1)
1342
f67539c2 1343 def _read_data_xattr(self, ino_no, xattr_name, obj_type, pool):
7c673cae
FG
1344 if pool is None:
1345 pool = self.get_data_pool_name()
1346
1347 obj_name = "{0:x}.00000000".format(ino_no)
1348
f67539c2 1349 args = ["getxattr", obj_name, xattr_name]
7c673cae 1350 try:
f67539c2 1351 proc = self.rados(args, pool=pool, stdout=BytesIO())
7c673cae
FG
1352 except CommandFailedError as e:
1353 log.error(e.__str__())
1354 raise ObjectNotFound(obj_name)
1355
f67539c2
TL
1356 obj_blob = proc.stdout.getvalue()
1357 return json.loads(self.dencoder(obj_type, obj_blob).strip())
7c673cae
FG
1358
1359 def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
1360 """
1361 Write to an xattr of the 0th data object of an inode. Will
1362 succeed whether the object and/or xattr already exist or not.
1363
1364 :param ino_no: integer inode number
1365 :param xattr_name: string name of the xattr
1366 :param data: byte array data to write to the xattr
1367 :param pool: name of data pool or None to use primary data pool
1368 :return: None
1369 """
7c673cae
FG
1370 if pool is None:
1371 pool = self.get_data_pool_name()
1372
1373 obj_name = "{0:x}.00000000".format(ino_no)
f67539c2
TL
1374 args = ["setxattr", obj_name, xattr_name, data]
1375 self.rados(args, pool=pool)
7c673cae 1376
20effc67
TL
1377 def read_symlink(self, ino_no, pool=None):
1378 return self._read_data_xattr(ino_no, "symlink", "string_wrapper", pool)
1379
7c673cae
FG
1380 def read_backtrace(self, ino_no, pool=None):
1381 """
1382 Read the backtrace from the data pool, return a dict in the format
1383 given by inode_backtrace_t::dump, which is something like:
1384
1385 ::
1386
1387 rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
1388 ceph-dencoder type inode_backtrace_t import out.bin decode dump_json
1389
1390 { "ino": 1099511627778,
1391 "ancestors": [
1392 { "dirino": 1,
1393 "dname": "blah",
1394 "version": 11}],
1395 "pool": 1,
1396 "old_pools": []}
1397
1398 :param pool: name of pool to read backtrace from. If omitted, FS must have only
1399 one data pool and that will be used.
1400 """
1401 return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)
1402
1403 def read_layout(self, ino_no, pool=None):
1404 """
1405 Read 'layout' xattr of an inode and parse the result, returning a dict like:
1406 ::
1407 {
1408 "stripe_unit": 4194304,
1409 "stripe_count": 1,
1410 "object_size": 4194304,
1411 "pool_id": 1,
1412 "pool_ns": "",
1413 }
1414
1415 :param pool: name of pool to read backtrace from. If omitted, FS must have only
1416 one data pool and that will be used.
1417 """
1418 return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)
1419
1420 def _enumerate_data_objects(self, ino, size):
1421 """
1422 Get the list of expected data objects for a range, and the list of objects
1423 that really exist.
1424
1425 :return a tuple of two lists of strings (expected, actual)
1426 """
1427 stripe_size = 1024 * 1024 * 4
1428
1429 size = max(stripe_size, size)
1430
1431 want_objects = [
1432 "{0:x}.{1:08x}".format(ino, n)
e306af50 1433 for n in range(0, ((size - 1) // stripe_size) + 1)
7c673cae
FG
1434 ]
1435
f67539c2 1436 exist_objects = self.rados(["ls"], pool=self.get_data_pool_name(), stdout=StringIO()).stdout.getvalue().split("\n")
7c673cae
FG
1437
1438 return want_objects, exist_objects
1439
1440 def data_objects_present(self, ino, size):
1441 """
1442 Check that *all* the expected data objects for an inode are present in the data pool
1443 """
1444
1445 want_objects, exist_objects = self._enumerate_data_objects(ino, size)
1446 missing = set(want_objects) - set(exist_objects)
1447
1448 if missing:
cd265ab1 1449 log.debug("Objects missing (ino {0}, size {1}): {2}".format(
7c673cae
FG
1450 ino, size, missing
1451 ))
1452 return False
1453 else:
cd265ab1 1454 log.debug("All objects for ino {0} size {1} found".format(ino, size))
7c673cae
FG
1455 return True
1456
1457 def data_objects_absent(self, ino, size):
1458 want_objects, exist_objects = self._enumerate_data_objects(ino, size)
1459 present = set(want_objects) & set(exist_objects)
1460
1461 if present:
cd265ab1 1462 log.debug("Objects not absent (ino {0}, size {1}): {2}".format(
7c673cae
FG
1463 ino, size, present
1464 ))
1465 return False
1466 else:
cd265ab1 1467 log.debug("All objects for ino {0} size {1} are absent".format(ino, size))
7c673cae
FG
1468 return True
1469
1470 def dirfrag_exists(self, ino, frag):
1471 try:
f67539c2 1472 self.radosm(["stat", "{0:x}.{1:08x}".format(ino, frag)])
9f95a23c 1473 except CommandFailedError:
7c673cae
FG
1474 return False
1475 else:
1476 return True
1477
7c673cae
FG
1478 def list_dirfrag(self, dir_ino):
1479 """
1480 Read the named object and return the list of omap keys
1481
1482 :return a list of 0 or more strings
1483 """
1484
1485 dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
1486
1487 try:
f67539c2 1488 key_list_str = self.radosmo(["listomapkeys", dirfrag_obj_name], stdout=StringIO())
7c673cae
FG
1489 except CommandFailedError as e:
1490 log.error(e.__str__())
1491 raise ObjectNotFound(dirfrag_obj_name)
1492
f67539c2
TL
1493 return key_list_str.strip().split("\n") if key_list_str else []
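
    # Omap sketch: dentries are stored as omap keys of the directory object,
    # suffixed with "_head", so a file created in the root shows up as e.g.
    # "myfile_head" (the file name here is assumed for illustration).
    #
    #   keys = fs.list_dirfrag(ROOT_INO)
    #   assert "myfile_head" in keys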
1494
    def get_meta_of_fs_file(self, dir_ino, obj_name, out):
        """
        Get metadata from the parent directory to verify the correctness of
        the data format encoded by the tool cephfs-meta-injection.
        Warning: the splitting of directories is not considered here.
        """

        dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
        try:
            self.radosm(["getomapval", dirfrag_obj_name, obj_name+"_head", out])
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(dir_ino)
7c673cae
FG
1507
    def erase_metadata_objects(self, prefix):
        """
        For all objects in the metadata pool matching the prefix,
        erase them.

        This is O(N) with the number of objects in the pool, so only suitable
        for use on toy test filesystems.
        """
f67539c2 1516 all_objects = self.radosmo(["ls"], stdout=StringIO()).strip().split("\n")
7c673cae
FG
1517 matching_objects = [o for o in all_objects if o.startswith(prefix)]
1518 for o in matching_objects:
f67539c2 1519 self.radosm(["rm", o])
7c673cae
FG
1520
1521 def erase_mds_objects(self, rank):
1522 """
1523 Erase all the per-MDS objects for a particular rank. This includes
1524 inotable, sessiontable, journal
1525 """
1526
1527 def obj_prefix(multiplier):
1528 """
1529 MDS object naming conventions like rank 1's
1530 journal is at 201.***
1531 """
1532 return "%x." % (multiplier * 0x100 + rank)
1533
1534 # MDS_INO_LOG_OFFSET
1535 self.erase_metadata_objects(obj_prefix(2))
1536 # MDS_INO_LOG_BACKUP_OFFSET
1537 self.erase_metadata_objects(obj_prefix(3))
1538 # MDS_INO_LOG_POINTER_OFFSET
1539 self.erase_metadata_objects(obj_prefix(4))
1540 # MDSTables & SessionMap
1541 self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))
1542
    @property
    def _prefix(self):
        """
        Override this to set a different path prefix for the CephFS tool binaries.
        """
        return ""
1549
f64942e4
AA
1550 def _make_rank(self, rank):
1551 return "{}:{}".format(self.name, rank)
1552
7c673cae
FG
1553 def _run_tool(self, tool, args, rank=None, quiet=False):
1554 # Tests frequently have [client] configuration that jacks up
1555 # the objecter log level (unlikely to be interesting here)
1556 # and does not set the mds log level (very interesting here)
1557 if quiet:
1558 base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
1559 else:
1e59de90 1560 base_args = [os.path.join(self._prefix, tool), '--debug-mds=20', '--debug-ms=1', '--debug-objecter=1']
7c673cae
FG
1561
1562 if rank is not None:
f64942e4 1563 base_args.extend(["--rank", "%s" % str(rank)])
7c673cae
FG
1564
1565 t1 = datetime.datetime.now()
e306af50 1566 r = self.tool_remote.sh(script=base_args + args, stdout=StringIO()).strip()
7c673cae 1567 duration = datetime.datetime.now() - t1
cd265ab1 1568 log.debug("Ran {0} in time {1}, result:\n{2}".format(
7c673cae
FG
1569 base_args + args, duration, r
1570 ))
1571 return r
1572
1573 @property
1574 def tool_remote(self):
1575 """
1576 An arbitrary remote to use when invoking recovery tools. Use an MDS host because
1577 it'll definitely have keys with perms to access cephfs metadata pool. This is public
1578 so that tests can use this remote to go get locally written output files from the tools.
1579 """
f67539c2 1580 return self.mon_manager.controller
7c673cae 1581
f64942e4 1582 def journal_tool(self, args, rank, quiet=False):
7c673cae 1583 """
f64942e4 1584 Invoke cephfs-journal-tool with the passed arguments for a rank, and return its stdout
7c673cae 1585 """
f64942e4
AA
1586 fs_rank = self._make_rank(rank)
1587 return self._run_tool("cephfs-journal-tool", args, fs_rank, quiet)
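
    # Tool sketch: rank arguments are rendered as "<fsname>:<rank>" via
    # _make_rank(), so inspecting rank 0's journal looks like this (the
    # subcommand is taken from the cephfs-journal-tool CLI):
    #
    #   out = fs.journal_tool(["journal", "inspect"], 0)
    #   log.debug(out)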
7c673cae 1588
f67539c2
TL
1589 def meta_tool(self, args, rank, quiet=False):
1590 """
1591 Invoke cephfs-meta-injection with the passed arguments for a rank, and return its stdout
1592 """
1593 fs_rank = self._make_rank(rank)
1594 return self._run_tool("cephfs-meta-injection", args, fs_rank, quiet)
1595
7c673cae
FG
1596 def table_tool(self, args, quiet=False):
1597 """
1598 Invoke cephfs-table-tool with the passed arguments, and return its stdout
1599 """
1600 return self._run_tool("cephfs-table-tool", args, None, quiet)
1601
1602 def data_scan(self, args, quiet=False, worker_count=1):
1603 """
1604 Invoke cephfs-data-scan with the passed arguments, and return its stdout
1605
1606 :param worker_count: if greater than 1, multiple workers will be run
1607 in parallel and the return value will be None
1608 """
1609
1610 workers = []
1611
1612 for n in range(0, worker_count):
1613 if worker_count > 1:
1614 # data-scan args first token is a command, followed by args to it.
1615 # insert worker arguments after the command.
1616 cmd = args[0]
1617 worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
1618 else:
1619 worker_args = args
1620
1621 workers.append(Greenlet.spawn(lambda wargs=worker_args:
1622 self._run_tool("cephfs-data-scan", wargs, None, quiet)))
1623
1624 for w in workers:
1625 w.get()
1626
1627 if worker_count == 1:
1628 return workers[0].value
1629 else:
1630 return None
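
    # Recovery sketch: disaster-recovery style tests usually run the scan
    # phases in sequence (subcommands assumed from the cephfs-data-scan CLI):
    #
    #   fs.data_scan(["init"])
    #   fs.data_scan(["scan_extents", fs.get_data_pool_name()])
    #   fs.data_scan(["scan_inodes", fs.get_data_pool_name()])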
b32b8144
FG
1631
1632 def is_full(self):
1633 return self.is_pool_full(self.get_data_pool_name())
f67539c2
TL
1634
    def authorize(self, client_id, caps=('/', 'rw')):
        """
        Run "ceph fs authorize" and run "ceph auth get" to get and return the
        keyring.

        client_id: client id that will be authorized
        caps: tuple containing the path and permission (can be r or rw)
              respectively.
        """
1e59de90
TL
1644 if isinstance(caps[0], (tuple, list)):
1645 x = []
1646 for c in caps:
1647 x.extend(c)
1648 caps = tuple(x)
1649
f67539c2
TL
1650 client_name = 'client.' + client_id
1651 return self.mon_manager.raw_cluster_cmd('fs', 'authorize', self.name,
1652 client_name, *caps)
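
    # Caps sketch: a flat tuple grants a single path, while nested tuples or
    # lists are flattened above so several paths can be granted in one call.
    #
    #   fs.authorize("foo")                                # rw on /
    #   fs.authorize("bar", (("/a", "r"), ("/b", "rw")))   # per-path caps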
1653
1654 def grow(self, new_max_mds, status=None):
1655 oldmax = self.get_var('max_mds', status=status)
1656 assert(new_max_mds > oldmax)
1657 self.set_max_mds(new_max_mds)
1658 return self.wait_for_daemons()
1659
1660 def shrink(self, new_max_mds, status=None):
1661 oldmax = self.get_var('max_mds', status=status)
1662 assert(new_max_mds < oldmax)
1663 self.set_max_mds(new_max_mds)
1664 return self.wait_for_daemons()
1665
1666 def run_scrub(self, cmd, rank=0):
1667 return self.rank_tell(["scrub"] + cmd, rank)
1668
1669 def get_scrub_status(self, rank=0):
1670 return self.run_scrub(["status"], rank)
1671
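
    # Scrub sketch: start a recursive scrub on rank 0, then poll until the
    # status output reports no active scrubs (see wait_until_scrub_complete
    # below). The scrub subcommands are assumed from the MDS "scrub" tell
    # interface.
    #
    #   fs.run_scrub(["start", "/", "recursive"])
    #   assert fs.wait_until_scrub_complete()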
1e59de90
TL
1672 def flush(self, rank=0):
1673 return self.rank_tell(["flush", "journal"], rank=rank)
1674
f67539c2
TL
1675 def wait_until_scrub_complete(self, result=None, tag=None, rank=0, sleep=30,
1676 timeout=300, reverse=False):
1677 # time out after "timeout" seconds and assume as done
1678 if result is None:
1679 result = "no active scrubs running"
1680 with contextutil.safe_while(sleep=sleep, tries=timeout//sleep) as proceed:
1681 while proceed():
1682 out_json = self.rank_tell(["scrub", "status"], rank=rank)
1683 assert out_json is not None
1684 if not reverse:
1685 if result in out_json['status']:
1686 log.info("all active scrubs completed")
1687 return True
1688 else:
1689 if result not in out_json['status']:
1690 log.info("all active scrubs completed")
1691 return True
1692
1693 if tag is not None:
1694 status = out_json['scrubs'][tag]
1695 if status is not None:
1696 log.info(f"scrub status for tag:{tag} - {status}")
1697 else:
1698 log.info(f"scrub has completed for tag:{tag}")
1699 return True
1700
1701 # timed out waiting for scrub to complete
1702 return False
1e59de90
TL
1703
1704 def get_damage(self, rank=None):
1705 if rank is None:
1706 result = {}
1707 for info in self.get_ranks():
1708 rank = info['rank']
1709 result[rank] = self.get_damage(rank=rank)
1710 return result
1711 else:
1712 return self.rank_tell(['damage', 'ls'], rank=rank)