import json
import logging
from gevent import Greenlet
import os
import time
import datetime
import re
import errno
import random

from io import BytesIO, StringIO
from errno import EBUSY

from teuthology.exceptions import CommandFailedError
from teuthology import misc
from teuthology.nuke import clear_firewall
from teuthology.parallel import parallel
from teuthology import contextutil
from tasks.ceph_manager import write_conf
from tasks import ceph_manager


log = logging.getLogger(__name__)


DAEMON_WAIT_TIMEOUT = 120
ROOT_INO = 1

class FileLayout(object):
    def __init__(self, pool=None, pool_namespace=None, stripe_unit=None, stripe_count=None, object_size=None):
        self.pool = pool
        self.pool_namespace = pool_namespace
        self.stripe_unit = stripe_unit
        self.stripe_count = stripe_count
        self.object_size = object_size

    @classmethod
    def load_from_ceph(cls, layout_str):
        # TODO
        pass

    def items(self):
        if self.pool is not None:
            yield ("pool", self.pool)
        if self.pool_namespace:
            yield ("pool_namespace", self.pool_namespace)
        if self.stripe_unit is not None:
            yield ("stripe_unit", self.stripe_unit)
        if self.stripe_count is not None:
            yield ("stripe_count", self.stripe_count)
        if self.object_size is not None:
            yield ("object_size", self.object_size)
7c673cae
FG
54
55class ObjectNotFound(Exception):
56 def __init__(self, object_name):
57 self._object_name = object_name
58
59 def __str__(self):
60 return "Object not found: '{0}'".format(self._object_name)
61
f67539c2
TL
62class FSMissing(Exception):
63 def __init__(self, ident):
64 self.ident = ident
65
66 def __str__(self):
67 return f"File system {self.ident} does not exist in the map"
68
7c673cae
FG
69class FSStatus(object):
70 """
71 Operations on a snapshot of the FSMap.
72 """
522d829b 73 def __init__(self, mon_manager, epoch=None):
7c673cae 74 self.mon = mon_manager
522d829b
TL
75 cmd = ["fs", "dump", "--format=json"]
76 if epoch is not None:
77 cmd.append(str(epoch))
78 self.map = json.loads(self.mon.raw_cluster_cmd(*cmd))
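        # self.map now holds the decoded FSMap: notably a 'filesystems' list
        # (each entry carrying its own 'mdsmap') and a 'standbys' list, which
        # the accessors below iterate over.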
7c673cae
FG
79
80 def __str__(self):
81 return json.dumps(self.map, indent = 2, sort_keys = True)
82
83 # Expose the fsmap for manual inspection.
84 def __getitem__(self, key):
85 """
86 Get a field from the fsmap.
87 """
88 return self.map[key]
89
90 def get_filesystems(self):
91 """
92 Iterator for all filesystems.
93 """
94 for fs in self.map['filesystems']:
95 yield fs
96
97 def get_all(self):
98 """
99 Iterator for all the mds_info components in the FSMap.
100 """
9f95a23c 101 for info in self.map['standbys']:
7c673cae
FG
102 yield info
103 for fs in self.map['filesystems']:
104 for info in fs['mdsmap']['info'].values():
105 yield info
106
107 def get_standbys(self):
108 """
109 Iterator for all standbys.
110 """
111 for info in self.map['standbys']:
112 yield info
113
114 def get_fsmap(self, fscid):
115 """
116 Get the fsmap for the given FSCID.
117 """
118 for fs in self.map['filesystems']:
119 if fscid is None or fs['id'] == fscid:
120 return fs
f67539c2 121 raise FSMissing(fscid)
7c673cae
FG
122
123 def get_fsmap_byname(self, name):
124 """
125 Get the fsmap for the given file system name.
126 """
127 for fs in self.map['filesystems']:
128 if name is None or fs['mdsmap']['fs_name'] == name:
129 return fs
f67539c2 130 raise FSMissing(name)
7c673cae
FG
131
132 def get_replays(self, fscid):
133 """
134 Get the standby:replay MDS for the given FSCID.
135 """
136 fs = self.get_fsmap(fscid)
137 for info in fs['mdsmap']['info'].values():
138 if info['state'] == 'up:standby-replay':
139 yield info
140
141 def get_ranks(self, fscid):
142 """
143 Get the ranks for the given FSCID.
144 """
145 fs = self.get_fsmap(fscid)
146 for info in fs['mdsmap']['info'].values():
11fdf7f2 147 if info['rank'] >= 0 and info['state'] != 'up:standby-replay':
7c673cae
FG
148 yield info
149
a4b75251
TL
150 def get_damaged(self, fscid):
151 """
152 Get the damaged ranks for the given FSCID.
153 """
154 fs = self.get_fsmap(fscid)
155 return fs['mdsmap']['damaged']
156
7c673cae
FG
157 def get_rank(self, fscid, rank):
158 """
159 Get the rank for the given FSCID.
160 """
161 for info in self.get_ranks(fscid):
162 if info['rank'] == rank:
163 return info
164 raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))
165
166 def get_mds(self, name):
167 """
168 Get the info for the given MDS name.
169 """
170 for info in self.get_all():
171 if info['name'] == name:
172 return info
173 return None
174
175 def get_mds_addr(self, name):
176 """
177 Return the instance addr as a string, like "10.214.133.138:6807\/10825"
178 """
179 info = self.get_mds(name)
180 if info:
181 return info['addr']
182 else:
e306af50 183 log.warning(json.dumps(list(self.get_all()), indent=2)) # dump for debugging
7c673cae
FG
184 raise RuntimeError("MDS id '{0}' not found in map".format(name))
185
    def get_mds_addrs(self, name):
        """
        Return the instance addrs as a list of strings, like ["10.214.133.138:6807", "10.214.133.138:6808"]
        """
        info = self.get_mds(name)
        if info:
            return [e['addr'] for e in info['addrs']['addrvec']]
        else:
            log.warning(json.dumps(list(self.get_all()), indent=2)) # dump for debugging
            raise RuntimeError("MDS id '{0}' not found in map".format(name))
196
9f95a23c
TL
197 def get_mds_gid(self, gid):
198 """
199 Get the info for the given MDS gid.
200 """
201 for info in self.get_all():
202 if info['gid'] == gid:
203 return info
204 return None
205
206 def hadfailover(self, status):
207 """
208 Compares two statuses for mds failovers.
209 Returns True if there is a failover.
210 """
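        # 'self' is the older snapshot and 'status' the newer one: any gid in
        # the new map that we don't know about, or whose incarnation changed,
        # means an MDS restarted or was replaced.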
211 for fs in status.map['filesystems']:
212 for info in fs['mdsmap']['info'].values():
213 oldinfo = self.get_mds_gid(info['gid'])
214 if oldinfo is None or oldinfo['incarnation'] != info['incarnation']:
215 return True
216 #all matching
217 return False
218
7c673cae
FG
219class CephCluster(object):
220 @property
221 def admin_remote(self):
222 first_mon = misc.get_first_mon(self._ctx, None)
9f95a23c 223 (result,) = self._ctx.cluster.only(first_mon).remotes.keys()
7c673cae
FG
224 return result
225
f67539c2 226 def __init__(self, ctx) -> None:
7c673cae
FG
227 self._ctx = ctx
228 self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
229
230 def get_config(self, key, service_type=None):
231 """
232 Get config from mon by default, or a specific service if caller asks for it
233 """
234 if service_type is None:
235 service_type = 'mon'
236
237 service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
238 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
239
240 def set_ceph_conf(self, subsys, key, value):
241 if subsys not in self._ctx.ceph['ceph'].conf:
242 self._ctx.ceph['ceph'].conf[subsys] = {}
243 self._ctx.ceph['ceph'].conf[subsys][key] = value
244 write_conf(self._ctx) # XXX because we don't have the ceph task's config object, if they
245 # used a different config path this won't work.
246
247 def clear_ceph_conf(self, subsys, key):
248 del self._ctx.ceph['ceph'].conf[subsys][key]
249 write_conf(self._ctx)
250
f64942e4
AA
251 def json_asok(self, command, service_type, service_id, timeout=None):
252 if timeout is None:
253 timeout = 15*60
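        # Example: json_asok(['status'], 'mds', 'a') queries the mds.a admin
        # socket with --format=json and returns the decoded dict, or None if
        # the command produced no output.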
f6b5b4d7 254 command.insert(0, '--format=json')
f64942e4 255 proc = self.mon_manager.admin_socket(service_type, service_id, command, timeout=timeout)
f6b5b4d7
TL
256 response_data = proc.stdout.getvalue().strip()
257 if len(response_data) > 0:
258 j = json.loads(response_data)
259 pretty = json.dumps(j, sort_keys=True, indent=2)
260 log.debug(f"_json_asok output\n{pretty}")
261 return j
7c673cae 262 else:
f6b5b4d7 263 log.debug("_json_asok output empty")
7c673cae
FG
264 return None
265
    def is_addr_blocklisted(self, addr=None):
        if addr is None:
            log.warning("Couldn't get the client address, so the blocklisted "
                        "status is undetermined")
            return False

        blocklist = json.loads(self.mon_manager.run_cluster_cmd(
            args=["osd", "blocklist", "ls", "--format=json"],
            stdout=StringIO()).stdout.getvalue())
        for b in blocklist:
            if addr == b["addr"]:
                return True
        return False
279
7c673cae
FG
280
281class MDSCluster(CephCluster):
282 """
283 Collective operations on all the MDS daemons in the Ceph cluster. These
284 daemons may be in use by various Filesystems.
285
286 For the benefit of pre-multi-filesystem tests, this class is also
287 a parent of Filesystem. The correct way to use MDSCluster going forward is
288 as a separate instance outside of your (multiple) Filesystem instances.
289 """
f67539c2 290
7c673cae
FG
291 def __init__(self, ctx):
292 super(MDSCluster, self).__init__(ctx)
293
f67539c2
TL
294 @property
295 def mds_ids(self):
296 # do this dynamically because the list of ids may change periodically with cephadm
297 return list(misc.all_roles_of_type(self._ctx.cluster, 'mds'))
7c673cae 298
f67539c2
TL
299 @property
300 def mds_daemons(self):
301 return dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
7c673cae
FG
302
303 def _one_or_all(self, mds_id, cb, in_parallel=True):
304 """
305 Call a callback for a single named MDS, or for all.
306
307 Note that the parallelism here isn't for performance, it's to avoid being overly kind
308 to the cluster by waiting a graceful ssh-latency of time between doing things, and to
309 avoid being overly kind by executing them in a particular order. However, some actions
310 don't cope with being done in parallel, so it's optional (`in_parallel`)
311
312 :param mds_id: MDS daemon name, or None
313 :param cb: Callback taking single argument of MDS daemon name
314 :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
315 """
f67539c2 316
7c673cae
FG
317 if mds_id is None:
318 if in_parallel:
319 with parallel() as p:
320 for mds_id in self.mds_ids:
321 p.spawn(cb, mds_id)
322 else:
323 for mds_id in self.mds_ids:
324 cb(mds_id)
325 else:
326 cb(mds_id)
327
181888fb
FG
328 def get_config(self, key, service_type=None):
329 """
330 get_config specialization of service_type="mds"
331 """
332 if service_type != "mds":
333 return super(MDSCluster, self).get_config(key, service_type)
334
335 # Some tests stop MDS daemons, don't send commands to a dead one:
e306af50
TL
336 running_daemons = [i for i, mds in self.mds_daemons.items() if mds.running()]
337 service_id = random.sample(running_daemons, 1)[0]
181888fb
FG
338 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
339
    def mds_stop(self, mds_id=None):
        """
        Stop the MDS daemon process(es). If it held a rank, that rank
        will eventually go laggy.
        """
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())
346
347 def mds_fail(self, mds_id=None):
348 """
349 Inform MDSMonitor of the death of the daemon process(es). If it held
350 a rank, that rank will be relinquished.
351 """
352 self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))
353
354 def mds_restart(self, mds_id=None):
355 self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())
356
357 def mds_fail_restart(self, mds_id=None):
358 """
359 Variation on restart that includes marking MDSs as failed, so that doing this
360 operation followed by waiting for healthy daemon states guarantees that they
361 have gone down and come up, rather than potentially seeing the healthy states
362 that existed before the restart.
363 """
364 def _fail_restart(id_):
365 self.mds_daemons[id_].stop()
366 self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
367 self.mds_daemons[id_].restart()
368
369 self._one_or_all(mds_id, _fail_restart)
370
    def mds_signal(self, mds_id, sig, silent=False):
        """
        Signal an MDS daemon.
        """
        self.mds_daemons[mds_id].signal(sig, silent)
376
181888fb
FG
377 def newfs(self, name='cephfs', create=True):
378 return Filesystem(self._ctx, name=name, create=create)
7c673cae 379
522d829b
TL
380 def status(self, epoch=None):
381 return FSStatus(self.mon_manager, epoch)
7c673cae 382
7c673cae
FG
383 def get_standby_daemons(self):
384 return set([s['name'] for s in self.status().get_standbys()])
385
386 def get_mds_hostnames(self):
387 result = set()
388 for mds_id in self.mds_ids:
389 mds_remote = self.mon_manager.find_remote('mds', mds_id)
390 result.add(mds_remote.hostname)
391
392 return list(result)
393
    def set_clients_block(self, blocked, mds_id=None):
        """
        Block (using iptables) client communications to this MDS. Be careful: if
        other services are running on this MDS, or other MDSs try to talk to this
        MDS, their communications may also be blocked as collateral damage.

        :param mds_id: Optional ID of MDS to block, defaults to all
        :return:
        """
403 da_flag = "-A" if blocked else "-D"
404
405 def set_block(_mds_id):
406 remote = self.mon_manager.find_remote('mds', _mds_id)
407 status = self.status()
408
409 addr = status.get_mds_addr(_mds_id)
410 ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()
411
412 remote.run(
413 args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
414 "comment", "--comment", "teuthology"])
415 remote.run(
416 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
417 "comment", "--comment", "teuthology"])
418
419 self._one_or_all(mds_id, set_block, in_parallel=False)
420
f67539c2
TL
421 def set_inter_mds_block(self, blocked, mds_rank_1, mds_rank_2):
422 """
423 Block (using iptables) communications from a provided MDS to other MDSs.
424 Block all ports that an MDS uses for communication.
425
426 :param blocked: True to block the MDS, False otherwise
427 :param mds_rank_1: MDS rank
428 :param mds_rank_2: MDS rank
429 :return:
430 """
431 da_flag = "-A" if blocked else "-D"
432
433 def set_block(mds_ids):
434 status = self.status()
435
436 mds = mds_ids[0]
437 remote = self.mon_manager.find_remote('mds', mds)
438 addrs = status.get_mds_addrs(mds)
439 for addr in addrs:
440 ip_str, port_str = re.match("(.+):(.+)", addr).groups()
441 remote.run(
442 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
443 "comment", "--comment", "teuthology"])
444
445
446 mds = mds_ids[1]
447 remote = self.mon_manager.find_remote('mds', mds)
448 addrs = status.get_mds_addrs(mds)
449 for addr in addrs:
450 ip_str, port_str = re.match("(.+):(.+)", addr).groups()
451 remote.run(
452 args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
453 "comment", "--comment", "teuthology"])
454 remote.run(
455 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
456 "comment", "--comment", "teuthology"])
457
458 self._one_or_all((mds_rank_1, mds_rank_2), set_block, in_parallel=False)
459
7c673cae
FG
460 def clear_firewall(self):
461 clear_firewall(self._ctx)
462
463 def get_mds_info(self, mds_id):
464 return FSStatus(self.mon_manager).get_mds(mds_id)
465
7c673cae
FG
466 def is_pool_full(self, pool_name):
467 pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
468 for pool in pools:
469 if pool['pool_name'] == pool_name:
470 return 'full' in pool['flags_names'].split(",")
471
472 raise RuntimeError("Pool not found '{0}'".format(pool_name))
473
f67539c2
TL
474 def delete_all_filesystems(self):
475 """
476 Remove all filesystems that exist, and any pools in use by them.
477 """
478 for fs in self.status().get_filesystems():
479 Filesystem(ctx=self._ctx, fscid=fs['id']).destroy()
480
20effc67
TL
481 @property
482 def beacon_timeout(self):
483 """
484 Generate an acceptable timeout for the mons to drive some MDSMap change
485 because of missed beacons from some MDS. This involves looking up the
486 grace period in use by the mons and adding an acceptable buffer.
487 """
488
489 grace = float(self.get_config("mds_beacon_grace", service_type="mon"))
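        # two full grace periods plus a 15 second cushion; with the default
        # mds_beacon_grace of 15s this works out to 45s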
490 return grace*2+15
491
f67539c2 492
7c673cae
FG
493class Filesystem(MDSCluster):
494 """
495 This object is for driving a CephFS filesystem. The MDS daemons driven by
496 MDSCluster may be shared with other Filesystems.
497 """
f67539c2 498 def __init__(self, ctx, fs_config={}, fscid=None, name=None, create=False):
7c673cae
FG
499 super(Filesystem, self).__init__(ctx)
500
181888fb 501 self.name = name
7c673cae 502 self.id = None
7c673cae 503 self.metadata_pool_name = None
181888fb
FG
504 self.metadata_overlay = False
505 self.data_pool_name = None
7c673cae 506 self.data_pools = None
f91f0fd5 507 self.fs_config = fs_config
f67539c2 508 self.ec_profile = fs_config.get('ec_profile')
7c673cae
FG
509
510 client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
511 self.client_id = client_list[0]
512 self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
513
181888fb 514 if name is not None:
7c673cae
FG
515 if fscid is not None:
516 raise RuntimeError("cannot specify fscid when creating fs")
181888fb 517 if create and not self.legacy_configured():
7c673cae 518 self.create()
181888fb
FG
519 else:
520 if fscid is not None:
521 self.id = fscid
522 self.getinfo(refresh = True)
7c673cae
FG
523
524 # Stash a reference to the first created filesystem on ctx, so
525 # that if someone drops to the interactive shell they can easily
526 # poke our methods.
527 if not hasattr(self._ctx, "filesystem"):
528 self._ctx.filesystem = self
529
f67539c2
TL
530 def dead(self):
531 try:
532 return not bool(self.get_mds_map())
533 except FSMissing:
534 return True
535
9f95a23c
TL
536 def get_task_status(self, status_key):
537 return self.mon_manager.get_service_task_status("mds", status_key)
538
7c673cae
FG
539 def getinfo(self, refresh = False):
540 status = self.status()
541 if self.id is not None:
542 fsmap = status.get_fsmap(self.id)
543 elif self.name is not None:
544 fsmap = status.get_fsmap_byname(self.name)
545 else:
546 fss = [fs for fs in status.get_filesystems()]
547 if len(fss) == 1:
548 fsmap = fss[0]
549 elif len(fss) == 0:
550 raise RuntimeError("no file system available")
551 else:
552 raise RuntimeError("more than one file system available")
553 self.id = fsmap['id']
554 self.name = fsmap['mdsmap']['fs_name']
555 self.get_pool_names(status = status, refresh = refresh)
556 return status
557
181888fb
FG
558 def set_metadata_overlay(self, overlay):
559 if self.id is not None:
560 raise RuntimeError("cannot specify fscid when configuring overlay")
561 self.metadata_overlay = overlay
562
11fdf7f2 563 def reach_max_mds(self):
20effc67 564 status = self.wait_for_daemons()
11fdf7f2 565 mds_map = self.get_mds_map(status=status)
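        # once the daemons have settled, every rank 0..max_mds-1 must be "in"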
20effc67 566 assert(mds_map['in'] == list(range(0, mds_map['max_mds'])))
11fdf7f2 567
f67539c2
TL
568 def reset(self):
569 self.mon_manager.raw_cluster_cmd("fs", "reset", str(self.name), '--yes-i-really-mean-it')
570
11fdf7f2
TL
571 def fail(self):
572 self.mon_manager.raw_cluster_cmd("fs", "fail", str(self.name))
573
9f95a23c
TL
574 def set_flag(self, var, *args):
575 a = map(lambda x: str(x).lower(), args)
576 self.mon_manager.raw_cluster_cmd("fs", "flag", "set", var, *a)
577
578 def set_allow_multifs(self, yes=True):
579 self.set_flag("enable_multiple", yes)
580
91327a77 581 def set_var(self, var, *args):
9f95a23c 582 a = map(lambda x: str(x).lower(), args)
91327a77
AA
583 self.mon_manager.raw_cluster_cmd("fs", "set", self.name, var, *a)
584
11fdf7f2
TL
585 def set_down(self, down=True):
586 self.set_var("down", str(down).lower())
587
588 def set_joinable(self, joinable=True):
9f95a23c 589 self.set_var("joinable", joinable)
11fdf7f2 590
7c673cae 591 def set_max_mds(self, max_mds):
f64942e4 592 self.set_var("max_mds", "%d" % max_mds)
7c673cae 593
f91f0fd5
TL
594 def set_session_timeout(self, timeout):
595 self.set_var("session_timeout", "%d" % timeout)
596
11fdf7f2 597 def set_allow_standby_replay(self, yes):
9f95a23c 598 self.set_var("allow_standby_replay", yes)
11fdf7f2
TL
599
600 def set_allow_new_snaps(self, yes):
9f95a23c 601 self.set_var("allow_new_snaps", yes, '--yes-i-really-mean-it')
7c673cae 602
522d829b
TL
603 def compat(self, *args):
604 a = map(lambda x: str(x).lower(), args)
605 self.mon_manager.raw_cluster_cmd("fs", "compat", self.name, *a)
606
607 def add_compat(self, *args):
608 self.compat("add_compat", *args)
609
610 def add_incompat(self, *args):
611 self.compat("add_incompat", *args)
612
613 def rm_compat(self, *args):
614 self.compat("rm_compat", *args)
615
616 def rm_incompat(self, *args):
617 self.compat("rm_incompat", *args)
618
f67539c2
TL
619 def required_client_features(self, *args, **kwargs):
620 c = ["fs", "required_client_features", self.name, *args]
621 return self.mon_manager.run_cluster_cmd(args=c, **kwargs)
622
    # Since v15.1.0 the pg autoscale mode is enabled by default, so we let it
    # calculate pg_num as needed. We set pg_num_min to 64 to make sure the
    # autoscaler won't set pg_num too low (see Tracker #45434).
    pg_num = 64
    pg_num_min = 64
    target_size_ratio = 0.9
    target_size_ratio_ec = 0.9
7c673cae
FG
631
632 def create(self):
633 if self.name is None:
634 self.name = "cephfs"
635 if self.metadata_pool_name is None:
636 self.metadata_pool_name = "{0}_metadata".format(self.name)
181888fb
FG
637 if self.data_pool_name is None:
638 data_pool_name = "{0}_data".format(self.name)
639 else:
640 data_pool_name = self.data_pool_name
7c673cae 641
        # The EC pool will store the file data, while a small amount of
        # metadata for every file still goes to the primary data pool.
        if not self.metadata_overlay and self.ec_profile and 'disabled' not in self.ec_profile:
            self.target_size_ratio = 0.05
646
cd265ab1 647 log.debug("Creating filesystem '{0}'".format(self.name))
7c673cae 648
7c673cae 649 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
20effc67 650 self.metadata_pool_name,
522d829b 651 '--pg_num_min', str(self.pg_num_min))
f67539c2
TL
652
653 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
522d829b
TL
654 data_pool_name, str(self.pg_num),
655 '--pg_num_min', str(self.pg_num_min),
656 '--target_size_ratio',
657 str(self.target_size_ratio))
f67539c2 658
181888fb
FG
659 if self.metadata_overlay:
660 self.mon_manager.raw_cluster_cmd('fs', 'new',
661 self.name, self.metadata_pool_name, data_pool_name,
662 '--allow-dangerous-metadata-overlay')
663 else:
f67539c2
TL
664 self.mon_manager.raw_cluster_cmd('fs', 'new',
665 self.name,
666 self.metadata_pool_name,
667 data_pool_name)
668
b32b8144 669 if self.ec_profile and 'disabled' not in self.ec_profile:
f67539c2 670 ec_data_pool_name = data_pool_name + "_ec"
cd265ab1 671 log.debug("EC profile is %s", self.ec_profile)
f67539c2 672 cmd = ['osd', 'erasure-code-profile', 'set', ec_data_pool_name]
3efd9988
FG
673 cmd.extend(self.ec_profile)
674 self.mon_manager.raw_cluster_cmd(*cmd)
675 self.mon_manager.raw_cluster_cmd(
522d829b
TL
676 'osd', 'pool', 'create', ec_data_pool_name,
677 'erasure', ec_data_pool_name,
678 '--pg_num_min', str(self.pg_num_min),
679 '--target_size_ratio', str(self.target_size_ratio_ec))
3efd9988
FG
680 self.mon_manager.raw_cluster_cmd(
681 'osd', 'pool', 'set',
f67539c2
TL
682 ec_data_pool_name, 'allow_ec_overwrites', 'true')
683 self.add_data_pool(ec_data_pool_name, create=False)
684 self.check_pool_application(ec_data_pool_name)
685
686 self.run_client_payload(f"setfattr -n ceph.dir.layout.pool -v {ec_data_pool_name} . && getfattr -n ceph.dir.layout .")
687
35e4c445
FG
688 self.check_pool_application(self.metadata_pool_name)
689 self.check_pool_application(data_pool_name)
f67539c2 690
7c673cae 691 # Turn off spurious standby count warnings from modifying max_mds in tests.
31f18b77
FG
692 try:
693 self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
694 except CommandFailedError as e:
695 if e.exitstatus == 22:
696 # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
697 pass
698 else:
699 raise
7c673cae 700
f91f0fd5
TL
701 if self.fs_config is not None:
702 max_mds = self.fs_config.get('max_mds', 1)
703 if max_mds > 1:
704 self.set_max_mds(max_mds)
705
f67539c2
TL
706 standby_replay = self.fs_config.get('standby_replay', False)
707 self.set_allow_standby_replay(standby_replay)
708
f91f0fd5
TL
709 # If absent will use the default value (60 seconds)
710 session_timeout = self.fs_config.get('session_timeout', 60)
711 if session_timeout != 60:
712 self.set_session_timeout(session_timeout)
713
7c673cae
FG
714 self.getinfo(refresh = True)
715
522d829b
TL
716 # wait pgs to be clean
717 self.mon_manager.wait_for_clean()
718
f67539c2
TL
719 def run_client_payload(self, cmd):
720 # avoid circular dep by importing here:
721 from tasks.cephfs.fuse_mount import FuseMount
722 d = misc.get_testdir(self._ctx)
20effc67 723 m = FuseMount(self._ctx, d, "admin", self.client_remote, cephfs_name=self.name)
522d829b 724 m.mount_wait()
f67539c2
TL
725 m.run_shell_payload(cmd)
726 m.umount_wait(require_clean=True)
727
728 def _remove_pool(self, name, **kwargs):
729 c = f'osd pool rm {name} {name} --yes-i-really-really-mean-it'
730 return self.mon_manager.ceph(c, **kwargs)
731
732 def rm(self, **kwargs):
733 c = f'fs rm {self.name} --yes-i-really-mean-it'
734 return self.mon_manager.ceph(c, **kwargs)
735
736 def remove_pools(self, data_pools):
737 self._remove_pool(self.get_metadata_pool_name())
738 for poolname in data_pools:
739 try:
740 self._remove_pool(poolname)
741 except CommandFailedError as e:
742 # EBUSY, this data pool is used by two metadata pools, let the
743 # 2nd pass delete it
744 if e.exitstatus == EBUSY:
745 pass
746 else:
747 raise
748
749 def destroy(self, reset_obj_attrs=True):
750 log.info(f'Destroying file system {self.name} and related pools')
751
752 if self.dead():
753 log.debug('already dead...')
754 return
755
756 data_pools = self.get_data_pool_names(refresh=True)
757
758 # make sure no MDSs are attached to given FS.
759 self.fail()
760 self.rm()
761
762 self.remove_pools(data_pools)
763
764 if reset_obj_attrs:
765 self.id = None
766 self.name = None
767 self.metadata_pool_name = None
768 self.data_pool_name = None
769 self.data_pools = None
770
771 def recreate(self):
772 self.destroy()
773
774 self.create()
775 self.getinfo(refresh=True)
776
    def check_pool_application(self, pool_name):
        osd_map = self.mon_manager.get_osd_dump_json()
        for pool in osd_map['pools']:
            if pool['pool_name'] == pool_name:
                if "application_metadata" in pool:
                    if "cephfs" not in pool['application_metadata']:
                        raise RuntimeError("Pool {pool_name} does not name cephfs as application!".
                                           format(pool_name=pool_name))
35e4c445 785
7c673cae
FG
786 def __del__(self):
787 if getattr(self._ctx, "filesystem", None) == self:
788 delattr(self._ctx, "filesystem")
789
790 def exists(self):
791 """
792 Whether a filesystem exists in the mon's filesystem list
793 """
794 fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
795 return self.name in [fs['name'] for fs in fs_list]
796
797 def legacy_configured(self):
798 """
799 Check if a legacy (i.e. pre "fs new") filesystem configuration is present. If this is
800 the case, the caller should avoid using Filesystem.create
801 """
802 try:
803 out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
804 pools = json.loads(out_text)
805 metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
806 if metadata_pool_exists:
807 self.metadata_pool_name = 'metadata'
808 except CommandFailedError as e:
809 # For use in upgrade tests, Ceph cuttlefish and earlier don't support
810 # structured output (--format) from the CLI.
811 if e.exitstatus == 22:
812 metadata_pool_exists = True
813 else:
814 raise
815
816 return metadata_pool_exists
817
818 def _df(self):
819 return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))
820
f67539c2 821 # may raise FSMissing
f64942e4
AA
822 def get_mds_map(self, status=None):
823 if status is None:
824 status = self.status()
825 return status.get_fsmap(self.id)['mdsmap']
7c673cae 826
11fdf7f2
TL
827 def get_var(self, var, status=None):
828 return self.get_mds_map(status=status)[var]
91327a77 829
92f5a8d4
TL
830 def set_dir_layout(self, mount, path, layout):
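        # 'layout' is a FileLayout (defined near the top of this file); e.g.
        #   filesystem.set_dir_layout(mount, "subdir", FileLayout(pool="cephfs_data", stripe_count=2))
        # sets ceph.dir.layout.pool and ceph.dir.layout.stripe_count on "subdir".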
831 for name, value in layout.items():
832 mount.run_shell(args=["setfattr", "-n", "ceph.dir.layout."+name, "-v", str(value), path])
833
834 def add_data_pool(self, name, create=True):
835 if create:
522d829b
TL
836 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name,
837 '--pg_num_min', str(self.pg_num_min))
7c673cae
FG
838 self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
839 self.get_pool_names(refresh = True)
840 for poolid, fs_name in self.data_pools.items():
841 if name == fs_name:
842 return poolid
843 raise RuntimeError("could not get just created pool '{0}'".format(name))
844
845 def get_pool_names(self, refresh = False, status = None):
846 if refresh or self.metadata_pool_name is None or self.data_pools is None:
847 if status is None:
848 status = self.status()
849 fsmap = status.get_fsmap(self.id)
850
851 osd_map = self.mon_manager.get_osd_dump_json()
852 id_to_name = {}
853 for p in osd_map['pools']:
854 id_to_name[p['pool']] = p['pool_name']
855
856 self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
857 self.data_pools = {}
858 for data_pool in fsmap['mdsmap']['data_pools']:
859 self.data_pools[data_pool] = id_to_name[data_pool]
860
861 def get_data_pool_name(self, refresh = False):
862 if refresh or self.data_pools is None:
863 self.get_pool_names(refresh = True)
864 assert(len(self.data_pools) == 1)
e306af50 865 return next(iter(self.data_pools.values()))
7c673cae
FG
866
867 def get_data_pool_id(self, refresh = False):
868 """
869 Don't call this if you have multiple data pools
870 :return: integer
871 """
872 if refresh or self.data_pools is None:
873 self.get_pool_names(refresh = True)
874 assert(len(self.data_pools) == 1)
e306af50 875 return next(iter(self.data_pools.keys()))
7c673cae
FG
876
877 def get_data_pool_names(self, refresh = False):
878 if refresh or self.data_pools is None:
879 self.get_pool_names(refresh = True)
e306af50 880 return list(self.data_pools.values())
7c673cae
FG
881
882 def get_metadata_pool_name(self):
883 return self.metadata_pool_name
884
181888fb
FG
885 def set_data_pool_name(self, name):
886 if self.id is not None:
887 raise RuntimeError("can't set filesystem name if its fscid is set")
888 self.data_pool_name = name
889
522d829b
TL
890 def get_pool_pg_num(self, pool_name):
891 pgs = json.loads(self.mon_manager.raw_cluster_cmd('osd', 'pool', 'get',
892 pool_name, 'pg_num',
893 '--format=json-pretty'))
894 return int(pgs['pg_num'])
895
7c673cae
FG
896 def get_namespace_id(self):
897 return self.id
898
899 def get_pool_df(self, pool_name):
900 """
901 Return a dict like:
902 {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
903 """
904 for pool_df in self._df()['pools']:
905 if pool_df['name'] == pool_name:
906 return pool_df['stats']
907
908 raise RuntimeError("Pool name '{0}' not found".format(pool_name))
909
910 def get_usage(self):
911 return self._df()['stats']['total_used_bytes']
912
11fdf7f2 913 def are_daemons_healthy(self, status=None, skip_max_mds_check=False):
7c673cae
FG
914 """
        Return true if all daemons are in one of the states active, standby or
        standby-replay, and at least max_mds daemons are in 'active'.
917
918 Unlike most of Filesystem, this function is tolerant of new-style `fs`
919 commands being missing, because we are part of the ceph installation
920 process during upgrade suites, so must fall back to old style commands
921 when we get an EINVAL on a new style command.
922
923 :return:
924 """
11fdf7f2
TL
925 # First, check to see that processes haven't exited with an error code
926 for mds in self._ctx.daemons.iter_daemons_of_role('mds'):
927 mds.check_status()
7c673cae
FG
928
929 active_count = 0
f67539c2 930 mds_map = self.get_mds_map(status=status)
7c673cae 931
cd265ab1 932 log.debug("are_daemons_healthy: mds map: {0}".format(mds_map))
7c673cae
FG
933
934 for mds_id, mds_status in mds_map['info'].items():
935 if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
936 log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
937 return False
938 elif mds_status['state'] == 'up:active':
939 active_count += 1
940
cd265ab1 941 log.debug("are_daemons_healthy: {0}/{1}".format(
7c673cae
FG
942 active_count, mds_map['max_mds']
943 ))
944
11fdf7f2
TL
945 if not skip_max_mds_check:
946 if active_count > mds_map['max_mds']:
cd265ab1 947 log.debug("are_daemons_healthy: number of actives is greater than max_mds: {0}".format(mds_map))
11fdf7f2
TL
948 return False
949 elif active_count == mds_map['max_mds']:
950 # The MDSMap says these guys are active, but let's check they really are
951 for mds_id, mds_status in mds_map['info'].items():
952 if mds_status['state'] == 'up:active':
953 try:
f67539c2 954 daemon_status = self.mds_tell(["status"], mds_id=mds_status['name'])
11fdf7f2
TL
955 except CommandFailedError as cfe:
956 if cfe.exitstatus == errno.EINVAL:
957 # Old version, can't do this check
958 continue
959 else:
960 # MDS not even running
961 return False
962
963 if daemon_status['state'] != 'up:active':
964 # MDS hasn't taken the latest map yet
7c673cae
FG
965 return False
966
11fdf7f2
TL
967 return True
968 else:
969 return False
7c673cae 970 else:
cd265ab1 971 log.debug("are_daemons_healthy: skipping max_mds check")
11fdf7f2 972 return True
7c673cae 973
11fdf7f2 974 def get_daemon_names(self, state=None, status=None):
7c673cae
FG
975 """
976 Return MDS daemon names of those daemons in the given state
977 :param state:
978 :return:
979 """
11fdf7f2 980 mdsmap = self.get_mds_map(status)
7c673cae 981 result = []
9f95a23c
TL
982 for mds_status in sorted(mdsmap['info'].values(),
983 key=lambda _: _['rank']):
7c673cae
FG
984 if mds_status['state'] == state or state is None:
985 result.append(mds_status['name'])
986
987 return result
988
9f95a23c 989 def get_active_names(self, status=None):
7c673cae
FG
990 """
991 Return MDS daemon names of those daemons holding ranks
992 in state up:active
993
994 :return: list of strings like ['a', 'b'], sorted by rank
995 """
9f95a23c 996 return self.get_daemon_names("up:active", status=status)
7c673cae 997
11fdf7f2
TL
998 def get_all_mds_rank(self, status=None):
999 mdsmap = self.get_mds_map(status)
7c673cae 1000 result = []
9f95a23c
TL
1001 for mds_status in sorted(mdsmap['info'].values(),
1002 key=lambda _: _['rank']):
7c673cae
FG
1003 if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
1004 result.append(mds_status['rank'])
1005
1006 return result
1007
f6b5b4d7 1008 def get_rank(self, rank=None, status=None):
28e407b8
AA
1009 if status is None:
1010 status = self.getinfo()
f6b5b4d7
TL
1011 if rank is None:
1012 rank = 0
28e407b8
AA
1013 return status.get_rank(self.id, rank)
1014
11fdf7f2
TL
1015 def rank_restart(self, rank=0, status=None):
1016 name = self.get_rank(rank=rank, status=status)['name']
1017 self.mds_restart(mds_id=name)
1018
1019 def rank_signal(self, signal, rank=0, status=None):
1020 name = self.get_rank(rank=rank, status=status)['name']
1021 self.mds_signal(name, signal)
1022
1023 def rank_freeze(self, yes, rank=0):
1024 self.mon_manager.raw_cluster_cmd("mds", "freeze", "{}:{}".format(self.id, rank), str(yes).lower())
1025
1026 def rank_fail(self, rank=0):
1027 self.mon_manager.raw_cluster_cmd("mds", "fail", "{}:{}".format(self.id, rank))
1028
28e407b8
AA
1029 def get_ranks(self, status=None):
1030 if status is None:
1031 status = self.getinfo()
1032 return status.get_ranks(self.id)
1033
a4b75251
TL
1034 def get_damaged(self, status=None):
1035 if status is None:
1036 status = self.getinfo()
1037 return status.get_damaged(self.id)
1038
11fdf7f2
TL
1039 def get_replays(self, status=None):
1040 if status is None:
1041 status = self.getinfo()
1042 return status.get_replays(self.id)
1043
1044 def get_replay(self, rank=0, status=None):
1045 for replay in self.get_replays(status=status):
1046 if replay['rank'] == rank:
1047 return replay
1048 return None
1049
28e407b8 1050 def get_rank_names(self, status=None):
7c673cae
FG
1051 """
1052 Return MDS daemon names of those daemons holding a rank,
1053 sorted by rank. This includes e.g. up:replay/reconnect
1054 as well as active, but does not include standby or
1055 standby-replay.
1056 """
11fdf7f2 1057 mdsmap = self.get_mds_map(status)
7c673cae 1058 result = []
9f95a23c
TL
1059 for mds_status in sorted(mdsmap['info'].values(),
1060 key=lambda _: _['rank']):
7c673cae
FG
1061 if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
1062 result.append(mds_status['name'])
1063
1064 return result
1065
11fdf7f2 1066 def wait_for_daemons(self, timeout=None, skip_max_mds_check=False, status=None):
7c673cae
FG
1067 """
1068 Wait until all daemons are healthy
1069 :return:
1070 """
1071
1072 if timeout is None:
1073 timeout = DAEMON_WAIT_TIMEOUT
1074
11fdf7f2
TL
1075 if status is None:
1076 status = self.status()
1077
7c673cae
FG
1078 elapsed = 0
1079 while True:
11fdf7f2
TL
1080 if self.are_daemons_healthy(status=status, skip_max_mds_check=skip_max_mds_check):
1081 return status
7c673cae
FG
1082 else:
1083 time.sleep(1)
1084 elapsed += 1
1085
1086 if elapsed > timeout:
cd265ab1 1087 log.debug("status = {0}".format(status))
7c673cae
FG
1088 raise RuntimeError("Timed out waiting for MDS daemons to become healthy")
1089
11fdf7f2
TL
1090 status = self.status()
1091
f67539c2
TL
1092 def dencoder(self, obj_type, obj_blob):
1093 args = [os.path.join(self._prefix, "ceph-dencoder"), 'type', obj_type, 'import', '-', 'decode', 'dump_json']
1094 p = self.mon_manager.controller.run(args=args, stdin=BytesIO(obj_blob), stdout=BytesIO())
1095 return p.stdout.getvalue()
1096
1097 def rados(self, *args, **kwargs):
7c673cae 1098 """
f67539c2 1099 Callout to rados CLI.
7c673cae 1100 """
7c673cae 1101
f67539c2 1102 return self.mon_manager.do_rados(*args, **kwargs)
7c673cae 1103
f67539c2 1104 def radosm(self, *args, **kwargs):
7c673cae 1105 """
f67539c2 1106 Interact with the metadata pool via rados CLI.
7c673cae 1107 """
7c673cae 1108
f67539c2
TL
1109 return self.rados(*args, **kwargs, pool=self.get_metadata_pool_name())
1110
1111 def radosmo(self, *args, stdout=BytesIO(), **kwargs):
7c673cae 1112 """
f67539c2 1113 Interact with the metadata pool via rados CLI. Get the stdout.
7c673cae 1114 """
7c673cae 1115
f67539c2 1116 return self.radosm(*args, **kwargs, stdout=stdout).stdout.getvalue()
7c673cae
FG
1117
1118 def get_metadata_object(self, object_type, object_id):
1119 """
1120 Retrieve an object from the metadata pool, pass it through
1121 ceph-dencoder to dump it to JSON, and return the decoded object.
1122 """
7c673cae 1123
f67539c2
TL
1124 o = self.radosmo(['get', object_id, '-'])
1125 j = self.dencoder(object_type, o)
7c673cae 1126 try:
f67539c2 1127 return json.loads(j)
7c673cae 1128 except (TypeError, ValueError):
f67539c2 1129 log.error("Failed to decode JSON: '{0}'".format(j))
7c673cae
FG
1130 raise
1131
7c673cae
FG
1132 def get_journal_version(self):
1133 """
1134 Read the JournalPointer and Journal::Header objects to learn the version of
1135 encoding in use.
1136 """
1137 journal_pointer_object = '400.00000000'
1138 journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
1139 journal_ino = journal_pointer_dump['journal_pointer']['front']
1140
1141 journal_header_object = "{0:x}.00000000".format(journal_ino)
1142 journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)
1143
1144 version = journal_header_dump['journal_header']['stream_format']
cd265ab1 1145 log.debug("Read journal version {0}".format(version))
7c673cae
FG
1146
1147 return version
1148
f64942e4 1149 def mds_asok(self, command, mds_id=None, timeout=None):
7c673cae 1150 if mds_id is None:
f67539c2 1151 return self.rank_asok(command, timeout=timeout)
7c673cae 1152
f64942e4 1153 return self.json_asok(command, 'mds', mds_id, timeout=timeout)
7c673cae 1154
f67539c2
TL
1155 def mds_tell(self, command, mds_id=None):
1156 if mds_id is None:
1157 return self.rank_tell(command)
1158
1159 return json.loads(self.mon_manager.raw_cluster_cmd("tell", f"mds.{mds_id}", *command))
1160
f64942e4
AA
1161 def rank_asok(self, command, rank=0, status=None, timeout=None):
1162 info = self.get_rank(rank=rank, status=status)
1163 return self.json_asok(command, 'mds', info['name'], timeout=timeout)
28e407b8 1164
11fdf7f2 1165 def rank_tell(self, command, rank=0, status=None):
20effc67
TL
1166 try:
1167 out = self.mon_manager.raw_cluster_cmd("tell", f"mds.{self.id}:{rank}", *command)
1168 return json.loads(out)
1169 except json.decoder.JSONDecodeError:
1170 log.error("could not decode: {}".format(out))
1171 raise
11fdf7f2 1172
f6b5b4d7
TL
1173 def ranks_tell(self, command, status=None):
1174 if status is None:
1175 status = self.status()
1176 out = []
1177 for r in status.get_ranks(self.id):
1178 result = self.rank_tell(command, rank=r['rank'], status=status)
1179 out.append((r['rank'], result))
1180 return sorted(out)
1181
1182 def ranks_perf(self, f, status=None):
1183 perf = self.ranks_tell(["perf", "dump"], status=status)
1184 out = []
1185 for rank, perf in perf:
1186 out.append((rank, f(perf)))
1187 return out
1188
7c673cae
FG
1189 def read_cache(self, path, depth=None):
1190 cmd = ["dump", "tree", path]
1191 if depth is not None:
1192 cmd.append(depth.__str__())
1193 result = self.mds_asok(cmd)
1194 if len(result) == 0:
1195 raise RuntimeError("Path not found in cache: {0}".format(path))
1196
1197 return result
1198
1199 def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
1200 """
1201 Block until the MDS reaches a particular state, or a failure condition
1202 is met.
1203
1204 When there are multiple MDSs, succeed when exaclty one MDS is in the
1205 goal state, or fail when any MDS is in the reject state.
1206
1207 :param goal_state: Return once the MDS is in this state
1208 :param reject: Fail if the MDS enters this state before the goal state
1209 :param timeout: Fail if this many seconds pass before reaching goal
1210 :return: number of seconds waited, rounded down to integer
1211 """
1212
1213 started_at = time.time()
1214 while True:
1215 status = self.status()
1216 if rank is not None:
f64942e4
AA
1217 try:
1218 mds_info = status.get_rank(self.id, rank)
1219 current_state = mds_info['state'] if mds_info else None
cd265ab1 1220 log.debug("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
f64942e4
AA
1221 except:
1222 mdsmap = self.get_mds_map(status=status)
1223 if rank in mdsmap['failed']:
cd265ab1 1224 log.debug("Waiting for rank {0} to come back.".format(rank))
f64942e4
AA
1225 current_state = None
1226 else:
1227 raise
7c673cae
FG
1228 elif mds_id is not None:
1229 # mds_info is None if no daemon with this ID exists in the map
1230 mds_info = status.get_mds(mds_id)
1231 current_state = mds_info['state'] if mds_info else None
cd265ab1 1232 log.debug("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
7c673cae
FG
1233 else:
1234 # In general, look for a single MDS
1235 states = [m['state'] for m in status.get_ranks(self.id)]
1236 if [s for s in states if s == goal_state] == [goal_state]:
1237 current_state = goal_state
1238 elif reject in states:
1239 current_state = reject
1240 else:
1241 current_state = None
cd265ab1 1242 log.debug("mapped states {0} to {1}".format(states, current_state))
7c673cae
FG
1243
1244 elapsed = time.time() - started_at
1245 if current_state == goal_state:
cd265ab1 1246 log.debug("reached state '{0}' in {1}s".format(current_state, elapsed))
7c673cae
FG
1247 return elapsed
1248 elif reject is not None and current_state == reject:
1249 raise RuntimeError("MDS in reject state {0}".format(current_state))
1250 elif timeout is not None and elapsed > timeout:
1251 log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
1252 raise RuntimeError(
1253 "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
1254 elapsed, goal_state, current_state
1255 ))
1256 else:
1257 time.sleep(1)
1258
f67539c2 1259 def _read_data_xattr(self, ino_no, xattr_name, obj_type, pool):
7c673cae
FG
1260 if pool is None:
1261 pool = self.get_data_pool_name()
1262
1263 obj_name = "{0:x}.00000000".format(ino_no)
1264
f67539c2 1265 args = ["getxattr", obj_name, xattr_name]
7c673cae 1266 try:
f67539c2 1267 proc = self.rados(args, pool=pool, stdout=BytesIO())
7c673cae
FG
1268 except CommandFailedError as e:
1269 log.error(e.__str__())
1270 raise ObjectNotFound(obj_name)
1271
f67539c2
TL
1272 obj_blob = proc.stdout.getvalue()
1273 return json.loads(self.dencoder(obj_type, obj_blob).strip())
7c673cae
FG
1274
1275 def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
1276 """
1277 Write to an xattr of the 0th data object of an inode. Will
1278 succeed whether the object and/or xattr already exist or not.
1279
1280 :param ino_no: integer inode number
1281 :param xattr_name: string name of the xattr
1282 :param data: byte array data to write to the xattr
1283 :param pool: name of data pool or None to use primary data pool
1284 :return: None
1285 """
7c673cae
FG
1286 if pool is None:
1287 pool = self.get_data_pool_name()
1288
1289 obj_name = "{0:x}.00000000".format(ino_no)
f67539c2
TL
1290 args = ["setxattr", obj_name, xattr_name, data]
1291 self.rados(args, pool=pool)
7c673cae 1292
20effc67
TL
1293 def read_symlink(self, ino_no, pool=None):
1294 return self._read_data_xattr(ino_no, "symlink", "string_wrapper", pool)
1295
7c673cae
FG
1296 def read_backtrace(self, ino_no, pool=None):
1297 """
1298 Read the backtrace from the data pool, return a dict in the format
1299 given by inode_backtrace_t::dump, which is something like:
1300
1301 ::
1302
1303 rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
1304 ceph-dencoder type inode_backtrace_t import out.bin decode dump_json
1305
1306 { "ino": 1099511627778,
1307 "ancestors": [
1308 { "dirino": 1,
1309 "dname": "blah",
1310 "version": 11}],
1311 "pool": 1,
1312 "old_pools": []}
1313
1314 :param pool: name of pool to read backtrace from. If omitted, FS must have only
1315 one data pool and that will be used.
1316 """
1317 return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)
1318
1319 def read_layout(self, ino_no, pool=None):
1320 """
1321 Read 'layout' xattr of an inode and parse the result, returning a dict like:
1322 ::
1323 {
1324 "stripe_unit": 4194304,
1325 "stripe_count": 1,
1326 "object_size": 4194304,
1327 "pool_id": 1,
1328 "pool_ns": "",
1329 }
1330
1331 :param pool: name of pool to read backtrace from. If omitted, FS must have only
1332 one data pool and that will be used.
1333 """
1334 return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)
1335
1336 def _enumerate_data_objects(self, ino, size):
1337 """
1338 Get the list of expected data objects for a range, and the list of objects
1339 that really exist.
1340
1341 :return a tuple of two lists of strings (expected, actual)
1342 """
1343 stripe_size = 1024 * 1024 * 4
1344
1345 size = max(stripe_size, size)
1346
1347 want_objects = [
1348 "{0:x}.{1:08x}".format(ino, n)
e306af50 1349 for n in range(0, ((size - 1) // stripe_size) + 1)
7c673cae
FG
1350 ]
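        # e.g. an inode 0x10000000001 of size 9MiB with 4MiB objects maps to
        # 10000000001.00000000, 10000000001.00000001 and 10000000001.00000002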
1351
f67539c2 1352 exist_objects = self.rados(["ls"], pool=self.get_data_pool_name(), stdout=StringIO()).stdout.getvalue().split("\n")
7c673cae
FG
1353
1354 return want_objects, exist_objects
1355
1356 def data_objects_present(self, ino, size):
1357 """
1358 Check that *all* the expected data objects for an inode are present in the data pool
1359 """
1360
1361 want_objects, exist_objects = self._enumerate_data_objects(ino, size)
1362 missing = set(want_objects) - set(exist_objects)
1363
1364 if missing:
cd265ab1 1365 log.debug("Objects missing (ino {0}, size {1}): {2}".format(
7c673cae
FG
1366 ino, size, missing
1367 ))
1368 return False
1369 else:
cd265ab1 1370 log.debug("All objects for ino {0} size {1} found".format(ino, size))
7c673cae
FG
1371 return True
1372
1373 def data_objects_absent(self, ino, size):
1374 want_objects, exist_objects = self._enumerate_data_objects(ino, size)
1375 present = set(want_objects) & set(exist_objects)
1376
1377 if present:
cd265ab1 1378 log.debug("Objects not absent (ino {0}, size {1}): {2}".format(
7c673cae
FG
1379 ino, size, present
1380 ))
1381 return False
1382 else:
cd265ab1 1383 log.debug("All objects for ino {0} size {1} are absent".format(ino, size))
7c673cae
FG
1384 return True
1385
1386 def dirfrag_exists(self, ino, frag):
1387 try:
f67539c2 1388 self.radosm(["stat", "{0:x}.{1:08x}".format(ino, frag)])
9f95a23c 1389 except CommandFailedError:
7c673cae
FG
1390 return False
1391 else:
1392 return True
1393
7c673cae
FG
1394 def list_dirfrag(self, dir_ino):
1395 """
1396 Read the named object and return the list of omap keys
1397
1398 :return a list of 0 or more strings
1399 """
1400
1401 dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
1402
1403 try:
f67539c2 1404 key_list_str = self.radosmo(["listomapkeys", dirfrag_obj_name], stdout=StringIO())
7c673cae
FG
1405 except CommandFailedError as e:
1406 log.error(e.__str__())
1407 raise ObjectNotFound(dirfrag_obj_name)
1408
f67539c2
TL
1409 return key_list_str.strip().split("\n") if key_list_str else []
1410
1411 def get_meta_of_fs_file(self, dir_ino, obj_name, out):
1412 """
        Get metadata from the parent object to verify the correctness of the data format encoded by the cephfs-meta-injection tool.
        Warning: directory fragmentation (splitting) is not considered here.
1415 """
1416
1417 dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
1418 try:
1419 self.radosm(["getomapval", dirfrag_obj_name, obj_name+"_head", out])
1420 except CommandFailedError as e:
1421 log.error(e.__str__())
1422 raise ObjectNotFound(dir_ino)
7c673cae
FG
1423
1424 def erase_metadata_objects(self, prefix):
1425 """
1426 For all objects in the metadata pool matching the prefix,
1427 erase them.
1428
1429 This O(N) with the number of objects in the pool, so only suitable
1430 for use on toy test filesystems.
1431 """
f67539c2 1432 all_objects = self.radosmo(["ls"], stdout=StringIO()).strip().split("\n")
7c673cae
FG
1433 matching_objects = [o for o in all_objects if o.startswith(prefix)]
1434 for o in matching_objects:
f67539c2 1435 self.radosm(["rm", o])
7c673cae
FG
1436
1437 def erase_mds_objects(self, rank):
1438 """
1439 Erase all the per-MDS objects for a particular rank. This includes
1440 inotable, sessiontable, journal
1441 """
1442
1443 def obj_prefix(multiplier):
1444 """
1445 MDS object naming conventions like rank 1's
1446 journal is at 201.***
1447 """
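            # e.g. rank 1 with multiplier 2 gives the prefix "201." (the journal)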
1448 return "%x." % (multiplier * 0x100 + rank)
1449
1450 # MDS_INO_LOG_OFFSET
1451 self.erase_metadata_objects(obj_prefix(2))
1452 # MDS_INO_LOG_BACKUP_OFFSET
1453 self.erase_metadata_objects(obj_prefix(3))
1454 # MDS_INO_LOG_POINTER_OFFSET
1455 self.erase_metadata_objects(obj_prefix(4))
1456 # MDSTables & SessionMap
1457 self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))
1458
1459 @property
1460 def _prefix(self):
1461 """
        Override this to set a different path prefix for locating the CephFS tools.
1463 """
1464 return ""
1465
f64942e4
AA
1466 def _make_rank(self, rank):
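        # e.g. rank 0 on a filesystem named "cephfs" becomes "cephfs:0", the
        # form the cephfs-* recovery tools accept for --rank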
1467 return "{}:{}".format(self.name, rank)
1468
7c673cae
FG
1469 def _run_tool(self, tool, args, rank=None, quiet=False):
1470 # Tests frequently have [client] configuration that jacks up
1471 # the objecter log level (unlikely to be interesting here)
1472 # and does not set the mds log level (very interesting here)
1473 if quiet:
1474 base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
1475 else:
1476 base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']
1477
1478 if rank is not None:
f64942e4 1479 base_args.extend(["--rank", "%s" % str(rank)])
7c673cae
FG
1480
1481 t1 = datetime.datetime.now()
e306af50 1482 r = self.tool_remote.sh(script=base_args + args, stdout=StringIO()).strip()
7c673cae 1483 duration = datetime.datetime.now() - t1
cd265ab1 1484 log.debug("Ran {0} in time {1}, result:\n{2}".format(
7c673cae
FG
1485 base_args + args, duration, r
1486 ))
1487 return r
1488
1489 @property
1490 def tool_remote(self):
1491 """
1492 An arbitrary remote to use when invoking recovery tools. Use an MDS host because
1493 it'll definitely have keys with perms to access cephfs metadata pool. This is public
1494 so that tests can use this remote to go get locally written output files from the tools.
1495 """
f67539c2 1496 return self.mon_manager.controller
7c673cae 1497
f64942e4 1498 def journal_tool(self, args, rank, quiet=False):
7c673cae 1499 """
f64942e4 1500 Invoke cephfs-journal-tool with the passed arguments for a rank, and return its stdout
7c673cae 1501 """
f64942e4
AA
1502 fs_rank = self._make_rank(rank)
1503 return self._run_tool("cephfs-journal-tool", args, fs_rank, quiet)
7c673cae 1504
f67539c2
TL
1505 def meta_tool(self, args, rank, quiet=False):
1506 """
1507 Invoke cephfs-meta-injection with the passed arguments for a rank, and return its stdout
1508 """
1509 fs_rank = self._make_rank(rank)
1510 return self._run_tool("cephfs-meta-injection", args, fs_rank, quiet)
1511
7c673cae
FG
1512 def table_tool(self, args, quiet=False):
1513 """
1514 Invoke cephfs-table-tool with the passed arguments, and return its stdout
1515 """
1516 return self._run_tool("cephfs-table-tool", args, None, quiet)
1517
1518 def data_scan(self, args, quiet=False, worker_count=1):
1519 """
1520 Invoke cephfs-data-scan with the passed arguments, and return its stdout
1521
1522 :param worker_count: if greater than 1, multiple workers will be run
1523 in parallel and the return value will be None
1524 """
1525
1526 workers = []
1527
1528 for n in range(0, worker_count):
1529 if worker_count > 1:
1530 # data-scan args first token is a command, followed by args to it.
1531 # insert worker arguments after the command.
1532 cmd = args[0]
1533 worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
1534 else:
1535 worker_args = args
1536
1537 workers.append(Greenlet.spawn(lambda wargs=worker_args:
1538 self._run_tool("cephfs-data-scan", wargs, None, quiet)))
1539
1540 for w in workers:
1541 w.get()
1542
1543 if worker_count == 1:
1544 return workers[0].value
1545 else:
1546 return None
b32b8144
FG
1547
1548 def is_full(self):
1549 return self.is_pool_full(self.get_data_pool_name())
f67539c2
TL
1550
1551 def authorize(self, client_id, caps=('/', 'rw')):
1552 """
        Run "ceph fs authorize" and run "ceph auth get" to get and return the
1554 keyring.
1555
1556 client_id: client id that will be authorized
1557 caps: tuple containing the path and permission (can be r or rw)
1558 respectively.
1559 """
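        # e.g. authorize('foo', ('/', 'rw')) runs
        # "ceph fs authorize <fs_name> client.foo / rw" and returns the
        # command output (the keyring for client.foo)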
1560 client_name = 'client.' + client_id
1561 return self.mon_manager.raw_cluster_cmd('fs', 'authorize', self.name,
1562 client_name, *caps)
1563
1564 def grow(self, new_max_mds, status=None):
1565 oldmax = self.get_var('max_mds', status=status)
1566 assert(new_max_mds > oldmax)
1567 self.set_max_mds(new_max_mds)
1568 return self.wait_for_daemons()
1569
1570 def shrink(self, new_max_mds, status=None):
1571 oldmax = self.get_var('max_mds', status=status)
1572 assert(new_max_mds < oldmax)
1573 self.set_max_mds(new_max_mds)
1574 return self.wait_for_daemons()
1575
1576 def run_scrub(self, cmd, rank=0):
1577 return self.rank_tell(["scrub"] + cmd, rank)
1578
1579 def get_scrub_status(self, rank=0):
1580 return self.run_scrub(["status"], rank)
1581
1582 def wait_until_scrub_complete(self, result=None, tag=None, rank=0, sleep=30,
1583 timeout=300, reverse=False):
        # time out after "timeout" seconds and assume the scrub is done
1585 if result is None:
1586 result = "no active scrubs running"
1587 with contextutil.safe_while(sleep=sleep, tries=timeout//sleep) as proceed:
1588 while proceed():
1589 out_json = self.rank_tell(["scrub", "status"], rank=rank)
1590 assert out_json is not None
1591 if not reverse:
1592 if result in out_json['status']:
1593 log.info("all active scrubs completed")
1594 return True
1595 else:
1596 if result not in out_json['status']:
1597 log.info("all active scrubs completed")
1598 return True
1599
1600 if tag is not None:
1601 status = out_json['scrubs'][tag]
1602 if status is not None:
1603 log.info(f"scrub status for tag:{tag} - {status}")
1604 else:
1605 log.info(f"scrub has completed for tag:{tag}")
1606 return True
1607
1608 # timed out waiting for scrub to complete
1609 return False