git.proxmox.com — ceph.git blame: ceph/qa/tasks/cephfs/filesystem.py (ceph 16.2.6)
7c673cae 1
7c673cae
FG
2import json
3import logging
4from gevent import Greenlet
5import os
6import time
7import datetime
8import re
9import errno
181888fb 10import random
11fdf7f2 11import traceback
7c673cae 12
f67539c2
TL
13from io import BytesIO, StringIO
14from errno import EBUSY
e306af50 15
7c673cae
FG
16from teuthology.exceptions import CommandFailedError
17from teuthology import misc
18from teuthology.nuke import clear_firewall
19from teuthology.parallel import parallel
f67539c2 20from teuthology import contextutil
7c673cae
FG
21from tasks.ceph_manager import write_conf
22from tasks import ceph_manager
23
24
25log = logging.getLogger(__name__)
26
27
28DAEMON_WAIT_TIMEOUT = 120
29ROOT_INO = 1
30
92f5a8d4
TL
31class FileLayout(object):
32 def __init__(self, pool=None, pool_namespace=None, stripe_unit=None, stripe_count=None, object_size=None):
33 self.pool = pool
34 self.pool_namespace = pool_namespace
35 self.stripe_unit = stripe_unit
36 self.stripe_count = stripe_count
37 self.object_size = object_size
38
39 @classmethod
 40 def load_from_ceph(cls, layout_str):
41 # TODO
42 pass
43
44 def items(self):
45 if self.pool is not None:
46 yield ("pool", self.pool)
47 if self.pool_namespace:
48 yield ("pool_namespace", self.pool_namespace)
49 if self.stripe_unit is not None:
50 yield ("stripe_unit", self.stripe_unit)
51 if self.stripe_count is not None:
52 yield ("stripe_count", self.stripe_count)
53 if self.object_size is not None:
54 yield ("object_size", self.stripe_size)
7c673cae
FG
55
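# --- Illustrative sketch, not part of the original file ----------------------
# FileLayout.items() only yields the fields that were explicitly set, which is
# handy for building "setfattr -n ceph.dir.layout.<field>" command lines (see
# Filesystem.set_dir_layout further down). The helper below is hypothetical and
# exists only to demonstrate that pattern.
def _example_layout_to_setfattr_args(path, layout):
    """Return one setfattr argument vector per field set on a FileLayout."""
    argvs = []
    for name, value in layout.items():
        argvs.append(["setfattr", "-n", "ceph.dir.layout." + name,
                      "-v", str(value), path])
    return argvs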
56class ObjectNotFound(Exception):
57 def __init__(self, object_name):
58 self._object_name = object_name
59
60 def __str__(self):
61 return "Object not found: '{0}'".format(self._object_name)
62
f67539c2
TL
63class FSMissing(Exception):
64 def __init__(self, ident):
65 self.ident = ident
66
67 def __str__(self):
68 return f"File system {self.ident} does not exist in the map"
69
7c673cae
FG
70class FSStatus(object):
71 """
72 Operations on a snapshot of the FSMap.
73 """
522d829b 74 def __init__(self, mon_manager, epoch=None):
7c673cae 75 self.mon = mon_manager
522d829b
TL
76 cmd = ["fs", "dump", "--format=json"]
77 if epoch is not None:
78 cmd.append(str(epoch))
79 self.map = json.loads(self.mon.raw_cluster_cmd(*cmd))
7c673cae
FG
80
81 def __str__(self):
82 return json.dumps(self.map, indent = 2, sort_keys = True)
83
84 # Expose the fsmap for manual inspection.
85 def __getitem__(self, key):
86 """
87 Get a field from the fsmap.
88 """
89 return self.map[key]
90
91 def get_filesystems(self):
92 """
93 Iterator for all filesystems.
94 """
95 for fs in self.map['filesystems']:
96 yield fs
97
98 def get_all(self):
99 """
100 Iterator for all the mds_info components in the FSMap.
101 """
9f95a23c 102 for info in self.map['standbys']:
7c673cae
FG
103 yield info
104 for fs in self.map['filesystems']:
105 for info in fs['mdsmap']['info'].values():
106 yield info
107
108 def get_standbys(self):
109 """
110 Iterator for all standbys.
111 """
112 for info in self.map['standbys']:
113 yield info
114
115 def get_fsmap(self, fscid):
116 """
117 Get the fsmap for the given FSCID.
118 """
119 for fs in self.map['filesystems']:
120 if fscid is None or fs['id'] == fscid:
121 return fs
f67539c2 122 raise FSMissing(fscid)
7c673cae
FG
123
124 def get_fsmap_byname(self, name):
125 """
126 Get the fsmap for the given file system name.
127 """
128 for fs in self.map['filesystems']:
129 if name is None or fs['mdsmap']['fs_name'] == name:
130 return fs
f67539c2 131 raise FSMissing(name)
7c673cae
FG
132
133 def get_replays(self, fscid):
134 """
135 Get the standby-replay MDS daemons for the given FSCID.
136 """
137 fs = self.get_fsmap(fscid)
138 for info in fs['mdsmap']['info'].values():
139 if info['state'] == 'up:standby-replay':
140 yield info
141
142 def get_ranks(self, fscid):
143 """
144 Get the ranks for the given FSCID.
145 """
146 fs = self.get_fsmap(fscid)
147 for info in fs['mdsmap']['info'].values():
11fdf7f2 148 if info['rank'] >= 0 and info['state'] != 'up:standby-replay':
7c673cae
FG
149 yield info
150
151 def get_rank(self, fscid, rank):
152 """
153 Get the rank for the given FSCID.
154 """
155 for info in self.get_ranks(fscid):
156 if info['rank'] == rank:
157 return info
158 raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))
159
160 def get_mds(self, name):
161 """
162 Get the info for the given MDS name.
163 """
164 for info in self.get_all():
165 if info['name'] == name:
166 return info
167 return None
168
169 def get_mds_addr(self, name):
170 """
171 Return the instance addr as a string, like "10.214.133.138:6807\/10825"
172 """
173 info = self.get_mds(name)
174 if info:
175 return info['addr']
176 else:
e306af50 177 log.warning(json.dumps(list(self.get_all()), indent=2)) # dump for debugging
7c673cae
FG
178 raise RuntimeError("MDS id '{0}' not found in map".format(name))
179
f67539c2
TL
180 def get_mds_addrs(self, name):
181 """
182 Return the instance addr as a string, like "[10.214.133.138:6807 10.214.133.138:6808]"
183 """
184 info = self.get_mds(name)
185 if info:
186 return [e['addr'] for e in info['addrs']['addrvec']]
187 else:
188 log.warning(json.dumps(list(self.get_all()), indent=2)) # dump for debugging
189 raise RuntimeError("MDS id '{0}' not found in map".format(name))
190
9f95a23c
TL
191 def get_mds_gid(self, gid):
192 """
193 Get the info for the given MDS gid.
194 """
195 for info in self.get_all():
196 if info['gid'] == gid:
197 return info
198 return None
199
200 def hadfailover(self, status):
201 """
202 Compares two statuses for mds failovers.
203 Returns True if there is a failover.
204 """
205 for fs in status.map['filesystems']:
206 for info in fs['mdsmap']['info'].values():
207 oldinfo = self.get_mds_gid(info['gid'])
208 if oldinfo is None or oldinfo['incarnation'] != info['incarnation']:
209 return True
210 # all matching; no failover detected
211 return False
212
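# --- Illustrative sketch, not part of the original file ----------------------
# FSStatus.hadfailover() compares two snapshots by MDS gid/incarnation. A
# typical (hypothetical) pattern is: snapshot, disrupt, snapshot again, compare.
# Here "mds_cluster" stands for any object exposing status(), such as the
# MDSCluster class defined below.
def _example_detect_failover(mds_cluster, disruptive_action):
    """Return True if disruptive_action() caused any MDS failover."""
    before = mds_cluster.status()   # FSStatus snapshot taken before the action
    disruptive_action()             # e.g. restart or fail an MDS
    after = mds_cluster.status()
    return before.hadfailover(after)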
7c673cae
FG
213class CephCluster(object):
214 @property
215 def admin_remote(self):
216 first_mon = misc.get_first_mon(self._ctx, None)
9f95a23c 217 (result,) = self._ctx.cluster.only(first_mon).remotes.keys()
7c673cae
FG
218 return result
219
f67539c2 220 def __init__(self, ctx) -> None:
7c673cae
FG
221 self._ctx = ctx
222 self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))
223
224 def get_config(self, key, service_type=None):
225 """
226 Get config from mon by default, or a specific service if caller asks for it
227 """
228 if service_type is None:
229 service_type = 'mon'
230
231 service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
232 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
233
234 def set_ceph_conf(self, subsys, key, value):
235 if subsys not in self._ctx.ceph['ceph'].conf:
236 self._ctx.ceph['ceph'].conf[subsys] = {}
237 self._ctx.ceph['ceph'].conf[subsys][key] = value
238 write_conf(self._ctx) # XXX because we don't have the ceph task's config object, if they
239 # used a different config path this won't work.
240
241 def clear_ceph_conf(self, subsys, key):
242 del self._ctx.ceph['ceph'].conf[subsys][key]
243 write_conf(self._ctx)
244
f64942e4
AA
245 def json_asok(self, command, service_type, service_id, timeout=None):
246 if timeout is None:
247 timeout = 15*60
f6b5b4d7 248 command.insert(0, '--format=json')
f64942e4 249 proc = self.mon_manager.admin_socket(service_type, service_id, command, timeout=timeout)
f6b5b4d7
TL
250 response_data = proc.stdout.getvalue().strip()
251 if len(response_data) > 0:
252 j = json.loads(response_data)
253 pretty = json.dumps(j, sort_keys=True, indent=2)
254 log.debug(f"_json_asok output\n{pretty}")
255 return j
7c673cae 256 else:
f6b5b4d7 257 log.debug("_json_asok output empty")
7c673cae
FG
258 return None
259
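    # Illustrative sketch, not part of the original file: json_asok() prepends
    # "--format=json" to an admin-socket command and returns the parsed JSON
    # (or None for empty output). A hypothetical convenience wrapper might look
    # like this (the counter path ("mds", "request") is only an example):
    def _example_mds_request_count(self, mds_id):
        """Read one perf counter from an MDS admin socket, or None if empty."""
        perf = self.json_asok(["perf", "dump"], "mds", mds_id)
        if perf is None:
            return None
        return perf["mds"]["request"]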
f67539c2
TL
260 def is_addr_blocklisted(self, addr=None):
261 if addr is None:
262 log.warn("Couldn't get the client address, so the blocklisted "
263 "status undetermined")
264 return False
265
266 blocklist = json.loads(self.mon_manager.run_cluster_cmd(
267 args=["osd", "blocklist", "ls", "--format=json"],
268 stdout=StringIO()).stdout.getvalue())
269 for b in blocklist:
270 if addr == b["addr"]:
271 return True
272 return False
273
7c673cae
FG
274
275class MDSCluster(CephCluster):
276 """
277 Collective operations on all the MDS daemons in the Ceph cluster. These
278 daemons may be in use by various Filesystems.
279
280 For the benefit of pre-multi-filesystem tests, this class is also
281 a parent of Filesystem. The correct way to use MDSCluster going forward is
282 as a separate instance outside of your (multiple) Filesystem instances.
283 """
f67539c2 284
7c673cae
FG
285 def __init__(self, ctx):
286 super(MDSCluster, self).__init__(ctx)
287
f67539c2
TL
288 @property
289 def mds_ids(self):
290 # do this dynamically because the list of ids may change periodically with cephadm
291 return list(misc.all_roles_of_type(self._ctx.cluster, 'mds'))
7c673cae 292
f67539c2
TL
293 @property
294 def mds_daemons(self):
295 return dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])
7c673cae
FG
296
297 def _one_or_all(self, mds_id, cb, in_parallel=True):
298 """
299 Call a callback for a single named MDS, or for all.
300
301 Note that the parallelism here isn't for performance; it's to avoid being overly kind
302 to the cluster by waiting a graceful ssh-latency of time between operations, and to
303 avoid being overly kind by always executing them in a particular order. However, some
304 actions don't cope with being run in parallel, so it's optional (`in_parallel`).
305
306 :param mds_id: MDS daemon name, or None
307 :param cb: Callback taking single argument of MDS daemon name
308 :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
309 """
f67539c2 310
7c673cae
FG
311 if mds_id is None:
312 if in_parallel:
313 with parallel() as p:
314 for mds_id in self.mds_ids:
315 p.spawn(cb, mds_id)
316 else:
317 for mds_id in self.mds_ids:
318 cb(mds_id)
319 else:
320 cb(mds_id)
321
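    # Illustrative sketch, not part of the original file: _one_or_all() is the
    # fan-out primitive behind mds_stop/mds_fail/mds_restart below. A
    # hypothetical action built the same way, here flushing the journal of one
    # named MDS or of all MDS daemons in parallel:
    def _example_flush_journals(self, mds_id=None):
        self._one_or_all(
            mds_id,
            lambda id_: self.json_asok(["flush", "journal"], "mds", id_))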
181888fb
FG
322 def get_config(self, key, service_type=None):
323 """
324 get_config specialization of service_type="mds"
325 """
326 if service_type != "mds":
327 return super(MDSCluster, self).get_config(key, service_type)
328
329 # Some tests stop MDS daemons, don't send commands to a dead one:
e306af50
TL
330 running_daemons = [i for i, mds in self.mds_daemons.items() if mds.running()]
331 service_id = random.sample(running_daemons, 1)[0]
181888fb
FG
332 return self.json_asok(['config', 'get', key], service_type, service_id)[key]
333
7c673cae
FG
334 def mds_stop(self, mds_id=None):
335 """
336 Stop the MDS daemon process(es). If it held a rank, that rank
337 will eventually go laggy.
338 """
339 self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())
340
341 def mds_fail(self, mds_id=None):
342 """
343 Inform MDSMonitor of the death of the daemon process(es). If it held
344 a rank, that rank will be relinquished.
345 """
346 self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))
347
348 def mds_restart(self, mds_id=None):
349 self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())
350
351 def mds_fail_restart(self, mds_id=None):
352 """
353 Variation on restart that includes marking MDSs as failed, so that doing this
354 operation followed by waiting for healthy daemon states guarantees that they
355 have gone down and come up, rather than potentially seeing the healthy states
356 that existed before the restart.
357 """
358 def _fail_restart(id_):
359 self.mds_daemons[id_].stop()
360 self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
361 self.mds_daemons[id_].restart()
362
363 self._one_or_all(mds_id, _fail_restart)
364
1adf2230
AA
365 def mds_signal(self, mds_id, sig, silent=False):
366 """
367 Signal an MDS daemon.
368 """
369 self.mds_daemons[mds_id].signal(sig, silent)
370
181888fb
FG
371 def newfs(self, name='cephfs', create=True):
372 return Filesystem(self._ctx, name=name, create=create)
7c673cae 373
522d829b
TL
374 def status(self, epoch=None):
375 return FSStatus(self.mon_manager, epoch)
7c673cae 376
7c673cae
FG
377 def get_standby_daemons(self):
378 return set([s['name'] for s in self.status().get_standbys()])
379
380 def get_mds_hostnames(self):
381 result = set()
382 for mds_id in self.mds_ids:
383 mds_remote = self.mon_manager.find_remote('mds', mds_id)
384 result.add(mds_remote.hostname)
385
386 return list(result)
387
388 def set_clients_block(self, blocked, mds_id=None):
389 """
390 Block (using iptables) client communications to this MDS. Be careful: if
391 other services are running on this MDS, or other MDSs try to talk to this
392 MDS, their communications may also be blocked as collateral damage.
393
394 :param mds_id: Optional ID of MDS to block, default to all
395 :return:
396 """
397 da_flag = "-A" if blocked else "-D"
398
399 def set_block(_mds_id):
400 remote = self.mon_manager.find_remote('mds', _mds_id)
401 status = self.status()
402
403 addr = status.get_mds_addr(_mds_id)
404 ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()
405
406 remote.run(
407 args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
408 "comment", "--comment", "teuthology"])
409 remote.run(
410 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
411 "comment", "--comment", "teuthology"])
412
413 self._one_or_all(mds_id, set_block, in_parallel=False)
414
f67539c2
TL
415 def set_inter_mds_block(self, blocked, mds_rank_1, mds_rank_2):
416 """
417 Block (using iptables) communications from a provided MDS to other MDSs.
418 Block all ports that an MDS uses for communication.
419
420 :param blocked: True to block the MDS, False otherwise
421 :param mds_rank_1: MDS rank
422 :param mds_rank_2: MDS rank
423 :return:
424 """
425 da_flag = "-A" if blocked else "-D"
426
427 def set_block(mds_ids):
428 status = self.status()
429
430 mds = mds_ids[0]
431 remote = self.mon_manager.find_remote('mds', mds)
432 addrs = status.get_mds_addrs(mds)
433 for addr in addrs:
434 ip_str, port_str = re.match("(.+):(.+)", addr).groups()
435 remote.run(
436 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
437 "comment", "--comment", "teuthology"])
438
439
440 mds = mds_ids[1]
441 remote = self.mon_manager.find_remote('mds', mds)
442 addrs = status.get_mds_addrs(mds)
443 for addr in addrs:
444 ip_str, port_str = re.match("(.+):(.+)", addr).groups()
445 remote.run(
446 args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
447 "comment", "--comment", "teuthology"])
448 remote.run(
449 args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
450 "comment", "--comment", "teuthology"])
451
452 self._one_or_all((mds_rank_1, mds_rank_2), set_block, in_parallel=False)
453
7c673cae
FG
454 def clear_firewall(self):
455 clear_firewall(self._ctx)
456
457 def get_mds_info(self, mds_id):
458 return FSStatus(self.mon_manager).get_mds(mds_id)
459
7c673cae
FG
460 def is_pool_full(self, pool_name):
461 pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
462 for pool in pools:
463 if pool['pool_name'] == pool_name:
464 return 'full' in pool['flags_names'].split(",")
465
466 raise RuntimeError("Pool not found '{0}'".format(pool_name))
467
f67539c2
TL
468 def delete_all_filesystems(self):
469 """
470 Remove all filesystems that exist, and any pools in use by them.
471 """
472 for fs in self.status().get_filesystems():
473 Filesystem(ctx=self._ctx, fscid=fs['id']).destroy()
474
475
7c673cae
FG
476class Filesystem(MDSCluster):
477 """
478 This object is for driving a CephFS filesystem. The MDS daemons driven by
479 MDSCluster may be shared with other Filesystems.
480 """
f67539c2 481 def __init__(self, ctx, fs_config={}, fscid=None, name=None, create=False):
7c673cae
FG
482 super(Filesystem, self).__init__(ctx)
483
181888fb 484 self.name = name
7c673cae 485 self.id = None
7c673cae 486 self.metadata_pool_name = None
181888fb
FG
487 self.metadata_overlay = False
488 self.data_pool_name = None
7c673cae 489 self.data_pools = None
f91f0fd5 490 self.fs_config = fs_config
f67539c2 491 self.ec_profile = fs_config.get('ec_profile')
7c673cae
FG
492
493 client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
494 self.client_id = client_list[0]
495 self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]
496
181888fb 497 if name is not None:
7c673cae
FG
498 if fscid is not None:
499 raise RuntimeError("cannot specify fscid when creating fs")
181888fb 500 if create and not self.legacy_configured():
7c673cae 501 self.create()
181888fb
FG
502 else:
503 if fscid is not None:
504 self.id = fscid
505 self.getinfo(refresh = True)
7c673cae
FG
506
507 # Stash a reference to the first created filesystem on ctx, so
508 # that if someone drops to the interactive shell they can easily
509 # poke our methods.
510 if not hasattr(self._ctx, "filesystem"):
511 self._ctx.filesystem = self
512
f67539c2
TL
513 def dead(self):
514 try:
515 return not bool(self.get_mds_map())
516 except FSMissing:
517 return True
518
9f95a23c
TL
519 def get_task_status(self, status_key):
520 return self.mon_manager.get_service_task_status("mds", status_key)
521
7c673cae
FG
522 def getinfo(self, refresh = False):
523 status = self.status()
524 if self.id is not None:
525 fsmap = status.get_fsmap(self.id)
526 elif self.name is not None:
527 fsmap = status.get_fsmap_byname(self.name)
528 else:
529 fss = [fs for fs in status.get_filesystems()]
530 if len(fss) == 1:
531 fsmap = fss[0]
532 elif len(fss) == 0:
533 raise RuntimeError("no file system available")
534 else:
535 raise RuntimeError("more than one file system available")
536 self.id = fsmap['id']
537 self.name = fsmap['mdsmap']['fs_name']
538 self.get_pool_names(status = status, refresh = refresh)
539 return status
540
181888fb
FG
541 def set_metadata_overlay(self, overlay):
542 if self.id is not None:
543 raise RuntimeError("cannot specify fscid when configuring overlay")
544 self.metadata_overlay = overlay
545
7c673cae
FG
546 def deactivate(self, rank):
547 if rank < 0:
548 raise RuntimeError("invalid rank")
549 elif rank == 0:
550 raise RuntimeError("cannot deactivate rank 0")
551 self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))
552
11fdf7f2
TL
553 def reach_max_mds(self):
554 # Try to reach rank count == max_mds, up or down (UPGRADE SENSITIVE!)
555 status = self.getinfo()
556 mds_map = self.get_mds_map(status=status)
557 max_mds = mds_map['max_mds']
558
559 count = len(list(self.get_ranks(status=status)))
560 if count > max_mds:
561 try:
562 # deactivate mds in descending order
563 status = self.wait_for_daemons(status=status, skip_max_mds_check=True)
564 while count > max_mds:
565 targets = sorted(self.get_ranks(status=status), key=lambda r: r['rank'], reverse=True)
566 target = targets[0]
cd265ab1 567 log.debug("deactivating rank %d" % target['rank'])
11fdf7f2
TL
568 self.deactivate(target['rank'])
569 status = self.wait_for_daemons(skip_max_mds_check=True)
570 count = len(list(self.get_ranks(status=status)))
571 except:
572 # In Mimic, deactivation is done automatically:
573 log.info("Error:\n{}".format(traceback.format_exc()))
574 status = self.wait_for_daemons()
575 else:
576 status = self.wait_for_daemons()
577
578 mds_map = self.get_mds_map(status=status)
579 assert(mds_map['max_mds'] == max_mds)
580 assert(mds_map['in'] == list(range(0, max_mds)))
581
f67539c2
TL
582 def reset(self):
583 self.mon_manager.raw_cluster_cmd("fs", "reset", str(self.name), '--yes-i-really-mean-it')
584
11fdf7f2
TL
585 def fail(self):
586 self.mon_manager.raw_cluster_cmd("fs", "fail", str(self.name))
587
9f95a23c
TL
588 def set_flag(self, var, *args):
589 a = map(lambda x: str(x).lower(), args)
590 self.mon_manager.raw_cluster_cmd("fs", "flag", "set", var, *a)
591
592 def set_allow_multifs(self, yes=True):
593 self.set_flag("enable_multiple", yes)
594
91327a77 595 def set_var(self, var, *args):
9f95a23c 596 a = map(lambda x: str(x).lower(), args)
91327a77
AA
597 self.mon_manager.raw_cluster_cmd("fs", "set", self.name, var, *a)
598
11fdf7f2
TL
599 def set_down(self, down=True):
600 self.set_var("down", str(down).lower())
601
602 def set_joinable(self, joinable=True):
9f95a23c 603 self.set_var("joinable", joinable)
11fdf7f2 604
7c673cae 605 def set_max_mds(self, max_mds):
f64942e4 606 self.set_var("max_mds", "%d" % max_mds)
7c673cae 607
f91f0fd5
TL
608 def set_session_timeout(self, timeout):
609 self.set_var("session_timeout", "%d" % timeout)
610
11fdf7f2 611 def set_allow_standby_replay(self, yes):
9f95a23c 612 self.set_var("allow_standby_replay", yes)
11fdf7f2
TL
613
614 def set_allow_new_snaps(self, yes):
9f95a23c 615 self.set_var("allow_new_snaps", yes, '--yes-i-really-mean-it')
7c673cae 616
522d829b
TL
617 def compat(self, *args):
618 a = map(lambda x: str(x).lower(), args)
619 self.mon_manager.raw_cluster_cmd("fs", "compat", self.name, *a)
620
621 def add_compat(self, *args):
622 self.compat("add_compat", *args)
623
624 def add_incompat(self, *args):
625 self.compat("add_incompat", *args)
626
627 def rm_compat(self, *args):
628 self.compat("rm_compat", *args)
629
630 def rm_incompat(self, *args):
631 self.compat("rm_incompat", *args)
632
f67539c2
TL
633 def required_client_features(self, *args, **kwargs):
634 c = ["fs", "required_client_features", self.name, *args]
635 return self.mon_manager.run_cluster_cmd(args=c, **kwargs)
636
522d829b
TL
637 # Since v15.1.0 the pg autoscale mode has been enabled by default,
638 # so let the pg autoscale mode calculate the pg_num as needed.
639 # We set pg_num_min to 64 to make sure that the pg autoscale mode
640 # won't set the pg_num too low (see Tracker#45434).
641 pg_num = 64
642 pg_num_min = 64
643 target_size_ratio = 0.9
644 target_size_ratio_ec = 0.9
7c673cae
FG
645
646 def create(self):
647 if self.name is None:
648 self.name = "cephfs"
649 if self.metadata_pool_name is None:
650 self.metadata_pool_name = "{0}_metadata".format(self.name)
181888fb
FG
651 if self.data_pool_name is None:
652 data_pool_name = "{0}_data".format(self.name)
653 else:
654 data_pool_name = self.data_pool_name
7c673cae 655
522d829b
TL
656 # Use the EC pool to store the data; a small amount of metadata
657 # still goes to the primary data pool for all files.
658 if not self.metadata_overlay and self.ec_profile and 'disabled' not in self.ec_profile:
659 self.target_size_ratio = 0.05
660
cd265ab1 661 log.debug("Creating filesystem '{0}'".format(self.name))
7c673cae 662
7c673cae 663 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
522d829b
TL
664 self.metadata_pool_name, str(self.pg_num),
665 '--pg_num_min', str(self.pg_num_min))
f67539c2
TL
666
667 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
522d829b
TL
668 data_pool_name, str(self.pg_num),
669 '--pg_num_min', str(self.pg_num_min),
670 '--target_size_ratio',
671 str(self.target_size_ratio))
f67539c2 672
181888fb
FG
673 if self.metadata_overlay:
674 self.mon_manager.raw_cluster_cmd('fs', 'new',
675 self.name, self.metadata_pool_name, data_pool_name,
676 '--allow-dangerous-metadata-overlay')
677 else:
f67539c2
TL
678 self.mon_manager.raw_cluster_cmd('fs', 'new',
679 self.name,
680 self.metadata_pool_name,
681 data_pool_name)
682
b32b8144 683 if self.ec_profile and 'disabled' not in self.ec_profile:
f67539c2 684 ec_data_pool_name = data_pool_name + "_ec"
cd265ab1 685 log.debug("EC profile is %s", self.ec_profile)
f67539c2 686 cmd = ['osd', 'erasure-code-profile', 'set', ec_data_pool_name]
3efd9988
FG
687 cmd.extend(self.ec_profile)
688 self.mon_manager.raw_cluster_cmd(*cmd)
689 self.mon_manager.raw_cluster_cmd(
522d829b
TL
690 'osd', 'pool', 'create', ec_data_pool_name,
691 'erasure', ec_data_pool_name,
692 '--pg_num_min', str(self.pg_num_min),
693 '--target_size_ratio', str(self.target_size_ratio_ec))
3efd9988
FG
694 self.mon_manager.raw_cluster_cmd(
695 'osd', 'pool', 'set',
f67539c2
TL
696 ec_data_pool_name, 'allow_ec_overwrites', 'true')
697 self.add_data_pool(ec_data_pool_name, create=False)
698 self.check_pool_application(ec_data_pool_name)
699
700 self.run_client_payload(f"setfattr -n ceph.dir.layout.pool -v {ec_data_pool_name} . && getfattr -n ceph.dir.layout .")
701
35e4c445
FG
702 self.check_pool_application(self.metadata_pool_name)
703 self.check_pool_application(data_pool_name)
f67539c2 704
7c673cae 705 # Turn off spurious standby count warnings from modifying max_mds in tests.
31f18b77
FG
706 try:
707 self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
708 except CommandFailedError as e:
709 if e.exitstatus == 22:
710 # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
711 pass
712 else:
713 raise
7c673cae 714
f91f0fd5
TL
715 if self.fs_config is not None:
716 max_mds = self.fs_config.get('max_mds', 1)
717 if max_mds > 1:
718 self.set_max_mds(max_mds)
719
f67539c2
TL
720 standby_replay = self.fs_config.get('standby_replay', False)
721 self.set_allow_standby_replay(standby_replay)
722
f91f0fd5
TL
723 # If absent, the default value (60 seconds) is used
724 session_timeout = self.fs_config.get('session_timeout', 60)
725 if session_timeout != 60:
726 self.set_session_timeout(session_timeout)
727
7c673cae
FG
728 self.getinfo(refresh = True)
729
522d829b
TL
730 # wait for PGs to become clean
731 self.mon_manager.wait_for_clean()
732
f67539c2
TL
733 def run_client_payload(self, cmd):
734 # avoid circular dep by importing here:
735 from tasks.cephfs.fuse_mount import FuseMount
736 d = misc.get_testdir(self._ctx)
737 m = FuseMount(self._ctx, {}, d, "admin", self.client_remote, cephfs_name=self.name)
522d829b 738 m.mount_wait()
f67539c2
TL
739 m.run_shell_payload(cmd)
740 m.umount_wait(require_clean=True)
741
742 def _remove_pool(self, name, **kwargs):
743 c = f'osd pool rm {name} {name} --yes-i-really-really-mean-it'
744 return self.mon_manager.ceph(c, **kwargs)
745
746 def rm(self, **kwargs):
747 c = f'fs rm {self.name} --yes-i-really-mean-it'
748 return self.mon_manager.ceph(c, **kwargs)
749
750 def remove_pools(self, data_pools):
751 self._remove_pool(self.get_metadata_pool_name())
752 for poolname in data_pools:
753 try:
754 self._remove_pool(poolname)
755 except CommandFailedError as e:
756 # EBUSY, this data pool is used by two metadata pools, let the
757 # 2nd pass delete it
758 if e.exitstatus == EBUSY:
759 pass
760 else:
761 raise
762
763 def destroy(self, reset_obj_attrs=True):
764 log.info(f'Destroying file system {self.name} and related pools')
765
766 if self.dead():
767 log.debug('already dead...')
768 return
769
770 data_pools = self.get_data_pool_names(refresh=True)
771
772 # make sure no MDSs are attached to given FS.
773 self.fail()
774 self.rm()
775
776 self.remove_pools(data_pools)
777
778 if reset_obj_attrs:
779 self.id = None
780 self.name = None
781 self.metadata_pool_name = None
782 self.data_pool_name = None
783 self.data_pools = None
784
785 def recreate(self):
786 self.destroy()
787
788 self.create()
789 self.getinfo(refresh=True)
790
35e4c445
FG
791 def check_pool_application(self, pool_name):
792 osd_map = self.mon_manager.get_osd_dump_json()
793 for pool in osd_map['pools']:
794 if pool['pool_name'] == pool_name:
795 if "application_metadata" in pool:
796 if not "cephfs" in pool['application_metadata']:
9f95a23c
TL
797 raise RuntimeError("Pool {pool_name} does not name cephfs as application!".\
798 format(pool_name=pool_name))
35e4c445 799
7c673cae
FG
800 def __del__(self):
801 if getattr(self._ctx, "filesystem", None) == self:
802 delattr(self._ctx, "filesystem")
803
804 def exists(self):
805 """
806 Whether a filesystem exists in the mon's filesystem list
807 """
808 fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
809 return self.name in [fs['name'] for fs in fs_list]
810
811 def legacy_configured(self):
812 """
813 Check if a legacy (i.e. pre "fs new") filesystem configuration is present. If this is
814 the case, the caller should avoid using Filesystem.create
815 """
816 try:
817 out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
818 pools = json.loads(out_text)
819 metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
820 if metadata_pool_exists:
821 self.metadata_pool_name = 'metadata'
822 except CommandFailedError as e:
823 # For use in upgrade tests, Ceph cuttlefish and earlier don't support
824 # structured output (--format) from the CLI.
825 if e.exitstatus == 22:
826 metadata_pool_exists = True
827 else:
828 raise
829
830 return metadata_pool_exists
831
832 def _df(self):
833 return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))
834
f67539c2 835 # may raise FSMissing
f64942e4
AA
836 def get_mds_map(self, status=None):
837 if status is None:
838 status = self.status()
839 return status.get_fsmap(self.id)['mdsmap']
7c673cae 840
11fdf7f2
TL
841 def get_var(self, var, status=None):
842 return self.get_mds_map(status=status)[var]
91327a77 843
92f5a8d4
TL
844 def set_dir_layout(self, mount, path, layout):
845 for name, value in layout.items():
846 mount.run_shell(args=["setfattr", "-n", "ceph.dir.layout."+name, "-v", str(value), path])
847
848 def add_data_pool(self, name, create=True):
849 if create:
522d829b
TL
850 self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name,
851 '--pg_num_min', str(self.pg_num_min))
7c673cae
FG
852 self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
853 self.get_pool_names(refresh = True)
854 for poolid, fs_name in self.data_pools.items():
855 if name == fs_name:
856 return poolid
857 raise RuntimeError("could not get just created pool '{0}'".format(name))
858
859 def get_pool_names(self, refresh = False, status = None):
860 if refresh or self.metadata_pool_name is None or self.data_pools is None:
861 if status is None:
862 status = self.status()
863 fsmap = status.get_fsmap(self.id)
864
865 osd_map = self.mon_manager.get_osd_dump_json()
866 id_to_name = {}
867 for p in osd_map['pools']:
868 id_to_name[p['pool']] = p['pool_name']
869
870 self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
871 self.data_pools = {}
872 for data_pool in fsmap['mdsmap']['data_pools']:
873 self.data_pools[data_pool] = id_to_name[data_pool]
874
875 def get_data_pool_name(self, refresh = False):
876 if refresh or self.data_pools is None:
877 self.get_pool_names(refresh = True)
878 assert(len(self.data_pools) == 1)
e306af50 879 return next(iter(self.data_pools.values()))
7c673cae
FG
880
881 def get_data_pool_id(self, refresh = False):
882 """
883 Don't call this if you have multiple data pools
884 :return: integer
885 """
886 if refresh or self.data_pools is None:
887 self.get_pool_names(refresh = True)
888 assert(len(self.data_pools) == 1)
e306af50 889 return next(iter(self.data_pools.keys()))
7c673cae
FG
890
891 def get_data_pool_names(self, refresh = False):
892 if refresh or self.data_pools is None:
893 self.get_pool_names(refresh = True)
e306af50 894 return list(self.data_pools.values())
7c673cae
FG
895
896 def get_metadata_pool_name(self):
897 return self.metadata_pool_name
898
181888fb
FG
899 def set_data_pool_name(self, name):
900 if self.id is not None:
901 raise RuntimeError("can't set filesystem name if its fscid is set")
902 self.data_pool_name = name
903
522d829b
TL
904 def get_pool_pg_num(self, pool_name):
905 pgs = json.loads(self.mon_manager.raw_cluster_cmd('osd', 'pool', 'get',
906 pool_name, 'pg_num',
907 '--format=json-pretty'))
908 return int(pgs['pg_num'])
909
7c673cae
FG
910 def get_namespace_id(self):
911 return self.id
912
913 def get_pool_df(self, pool_name):
914 """
915 Return a dict like:
916 {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
917 """
918 for pool_df in self._df()['pools']:
919 if pool_df['name'] == pool_name:
920 return pool_df['stats']
921
922 raise RuntimeError("Pool name '{0}' not found".format(pool_name))
923
924 def get_usage(self):
925 return self._df()['stats']['total_used_bytes']
926
11fdf7f2 927 def are_daemons_healthy(self, status=None, skip_max_mds_check=False):
7c673cae
FG
928 """
929 Return true if all daemons are in one of the states active, standby or
930 standby-replay, and at least max_mds daemons are in 'active'.
931
932 Unlike most of Filesystem, this function is tolerant of new-style `fs`
933 commands being missing, because we are part of the ceph installation
934 process during upgrade suites, so must fall back to old style commands
935 when we get an EINVAL on a new style command.
936
937 :return:
938 """
11fdf7f2
TL
939 # First, check to see that processes haven't exited with an error code
940 for mds in self._ctx.daemons.iter_daemons_of_role('mds'):
941 mds.check_status()
7c673cae
FG
942
943 active_count = 0
f67539c2 944 mds_map = self.get_mds_map(status=status)
7c673cae 945
cd265ab1 946 log.debug("are_daemons_healthy: mds map: {0}".format(mds_map))
7c673cae
FG
947
948 for mds_id, mds_status in mds_map['info'].items():
949 if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
950 log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
951 return False
952 elif mds_status['state'] == 'up:active':
953 active_count += 1
954
cd265ab1 955 log.debug("are_daemons_healthy: {0}/{1}".format(
7c673cae
FG
956 active_count, mds_map['max_mds']
957 ))
958
11fdf7f2
TL
959 if not skip_max_mds_check:
960 if active_count > mds_map['max_mds']:
cd265ab1 961 log.debug("are_daemons_healthy: number of actives is greater than max_mds: {0}".format(mds_map))
11fdf7f2
TL
962 return False
963 elif active_count == mds_map['max_mds']:
964 # The MDSMap says these guys are active, but let's check they really are
965 for mds_id, mds_status in mds_map['info'].items():
966 if mds_status['state'] == 'up:active':
967 try:
f67539c2 968 daemon_status = self.mds_tell(["status"], mds_id=mds_status['name'])
11fdf7f2
TL
969 except CommandFailedError as cfe:
970 if cfe.exitstatus == errno.EINVAL:
971 # Old version, can't do this check
972 continue
973 else:
974 # MDS not even running
975 return False
976
977 if daemon_status['state'] != 'up:active':
978 # MDS hasn't taken the latest map yet
7c673cae
FG
979 return False
980
11fdf7f2
TL
981 return True
982 else:
983 return False
7c673cae 984 else:
cd265ab1 985 log.debug("are_daemons_healthy: skipping max_mds check")
11fdf7f2 986 return True
7c673cae 987
11fdf7f2 988 def get_daemon_names(self, state=None, status=None):
7c673cae
FG
989 """
990 Return MDS daemon names of those daemons in the given state
991 :param state:
992 :return:
993 """
11fdf7f2 994 mdsmap = self.get_mds_map(status)
7c673cae 995 result = []
9f95a23c
TL
996 for mds_status in sorted(mdsmap['info'].values(),
997 key=lambda _: _['rank']):
7c673cae
FG
998 if mds_status['state'] == state or state is None:
999 result.append(mds_status['name'])
1000
1001 return result
1002
9f95a23c 1003 def get_active_names(self, status=None):
7c673cae
FG
1004 """
1005 Return MDS daemon names of those daemons holding ranks
1006 in state up:active
1007
1008 :return: list of strings like ['a', 'b'], sorted by rank
1009 """
9f95a23c 1010 return self.get_daemon_names("up:active", status=status)
7c673cae 1011
11fdf7f2
TL
1012 def get_all_mds_rank(self, status=None):
1013 mdsmap = self.get_mds_map(status)
7c673cae 1014 result = []
9f95a23c
TL
1015 for mds_status in sorted(mdsmap['info'].values(),
1016 key=lambda _: _['rank']):
7c673cae
FG
1017 if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
1018 result.append(mds_status['rank'])
1019
1020 return result
1021
f6b5b4d7 1022 def get_rank(self, rank=None, status=None):
28e407b8
AA
1023 if status is None:
1024 status = self.getinfo()
f6b5b4d7
TL
1025 if rank is None:
1026 rank = 0
28e407b8
AA
1027 return status.get_rank(self.id, rank)
1028
11fdf7f2
TL
1029 def rank_restart(self, rank=0, status=None):
1030 name = self.get_rank(rank=rank, status=status)['name']
1031 self.mds_restart(mds_id=name)
1032
1033 def rank_signal(self, signal, rank=0, status=None):
1034 name = self.get_rank(rank=rank, status=status)['name']
1035 self.mds_signal(name, signal)
1036
1037 def rank_freeze(self, yes, rank=0):
1038 self.mon_manager.raw_cluster_cmd("mds", "freeze", "{}:{}".format(self.id, rank), str(yes).lower())
1039
1040 def rank_fail(self, rank=0):
1041 self.mon_manager.raw_cluster_cmd("mds", "fail", "{}:{}".format(self.id, rank))
1042
28e407b8
AA
1043 def get_ranks(self, status=None):
1044 if status is None:
1045 status = self.getinfo()
1046 return status.get_ranks(self.id)
1047
11fdf7f2
TL
1048 def get_replays(self, status=None):
1049 if status is None:
1050 status = self.getinfo()
1051 return status.get_replays(self.id)
1052
1053 def get_replay(self, rank=0, status=None):
1054 for replay in self.get_replays(status=status):
1055 if replay['rank'] == rank:
1056 return replay
1057 return None
1058
28e407b8 1059 def get_rank_names(self, status=None):
7c673cae
FG
1060 """
1061 Return MDS daemon names of those daemons holding a rank,
1062 sorted by rank. This includes e.g. up:replay/reconnect
1063 as well as active, but does not include standby or
1064 standby-replay.
1065 """
11fdf7f2 1066 mdsmap = self.get_mds_map(status)
7c673cae 1067 result = []
9f95a23c
TL
1068 for mds_status in sorted(mdsmap['info'].values(),
1069 key=lambda _: _['rank']):
7c673cae
FG
1070 if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
1071 result.append(mds_status['name'])
1072
1073 return result
1074
11fdf7f2 1075 def wait_for_daemons(self, timeout=None, skip_max_mds_check=False, status=None):
7c673cae
FG
1076 """
1077 Wait until all daemons are healthy
1078 :return:
1079 """
1080
1081 if timeout is None:
1082 timeout = DAEMON_WAIT_TIMEOUT
1083
11fdf7f2
TL
1084 if status is None:
1085 status = self.status()
1086
7c673cae
FG
1087 elapsed = 0
1088 while True:
11fdf7f2
TL
1089 if self.are_daemons_healthy(status=status, skip_max_mds_check=skip_max_mds_check):
1090 return status
7c673cae
FG
1091 else:
1092 time.sleep(1)
1093 elapsed += 1
1094
1095 if elapsed > timeout:
cd265ab1 1096 log.debug("status = {0}".format(status))
7c673cae
FG
1097 raise RuntimeError("Timed out waiting for MDS daemons to become healthy")
1098
11fdf7f2
TL
1099 status = self.status()
1100
f67539c2
TL
1101 def dencoder(self, obj_type, obj_blob):
1102 args = [os.path.join(self._prefix, "ceph-dencoder"), 'type', obj_type, 'import', '-', 'decode', 'dump_json']
1103 p = self.mon_manager.controller.run(args=args, stdin=BytesIO(obj_blob), stdout=BytesIO())
1104 return p.stdout.getvalue()
1105
1106 def rados(self, *args, **kwargs):
7c673cae 1107 """
f67539c2 1108 Callout to rados CLI.
7c673cae 1109 """
7c673cae 1110
f67539c2 1111 return self.mon_manager.do_rados(*args, **kwargs)
7c673cae 1112
f67539c2 1113 def radosm(self, *args, **kwargs):
7c673cae 1114 """
f67539c2 1115 Interact with the metadata pool via rados CLI.
7c673cae 1116 """
7c673cae 1117
f67539c2
TL
1118 return self.rados(*args, **kwargs, pool=self.get_metadata_pool_name())
1119
1120 def radosmo(self, *args, stdout=BytesIO(), **kwargs):
7c673cae 1121 """
f67539c2 1122 Interact with the metadata pool via rados CLI. Get the stdout.
7c673cae 1123 """
7c673cae 1124
f67539c2 1125 return self.radosm(*args, **kwargs, stdout=stdout).stdout.getvalue()
7c673cae
FG
1126
1127 def get_metadata_object(self, object_type, object_id):
1128 """
1129 Retrieve an object from the metadata pool, pass it through
1130 ceph-dencoder to dump it to JSON, and return the decoded object.
1131 """
7c673cae 1132
f67539c2
TL
1133 o = self.radosmo(['get', object_id, '-'])
1134 j = self.dencoder(object_type, o)
7c673cae 1135 try:
f67539c2 1136 return json.loads(j)
7c673cae 1137 except (TypeError, ValueError):
f67539c2 1138 log.error("Failed to decode JSON: '{0}'".format(j))
7c673cae
FG
1139 raise
1140
7c673cae
FG
1141 def get_journal_version(self):
1142 """
1143 Read the JournalPointer and Journal::Header objects to learn the version of
1144 encoding in use.
1145 """
1146 journal_pointer_object = '400.00000000'
1147 journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
1148 journal_ino = journal_pointer_dump['journal_pointer']['front']
1149
1150 journal_header_object = "{0:x}.00000000".format(journal_ino)
1151 journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)
1152
1153 version = journal_header_dump['journal_header']['stream_format']
cd265ab1 1154 log.debug("Read journal version {0}".format(version))
7c673cae
FG
1155
1156 return version
1157
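    # For example: rank 0's JournalPointer lives at object 400.00000000 in the
    # metadata pool; if its "front" field is 0x200 (the usual rank-0 journal
    # inode), the Journaler::Header carrying stream_format is object
    # 200.00000000.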
f64942e4 1158 def mds_asok(self, command, mds_id=None, timeout=None):
7c673cae 1159 if mds_id is None:
f67539c2 1160 return self.rank_asok(command, timeout=timeout)
7c673cae 1161
f64942e4 1162 return self.json_asok(command, 'mds', mds_id, timeout=timeout)
7c673cae 1163
f67539c2
TL
1164 def mds_tell(self, command, mds_id=None):
1165 if mds_id is None:
1166 return self.rank_tell(command)
1167
1168 return json.loads(self.mon_manager.raw_cluster_cmd("tell", f"mds.{mds_id}", *command))
1169
f64942e4
AA
1170 def rank_asok(self, command, rank=0, status=None, timeout=None):
1171 info = self.get_rank(rank=rank, status=status)
1172 return self.json_asok(command, 'mds', info['name'], timeout=timeout)
28e407b8 1173
11fdf7f2 1174 def rank_tell(self, command, rank=0, status=None):
f67539c2 1175 return json.loads(self.mon_manager.raw_cluster_cmd("tell", f"mds.{self.id}:{rank}", *command))
11fdf7f2 1176
f6b5b4d7
TL
1177 def ranks_tell(self, command, status=None):
1178 if status is None:
1179 status = self.status()
1180 out = []
1181 for r in status.get_ranks(self.id):
1182 result = self.rank_tell(command, rank=r['rank'], status=status)
1183 out.append((r['rank'], result))
1184 return sorted(out)
1185
1186 def ranks_perf(self, f, status=None):
1187 perf = self.ranks_tell(["perf", "dump"], status=status)
1188 out = []
1189 for rank, perf in perf:
1190 out.append((rank, f(perf)))
1191 return out
1192
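    # For example, ranks_perf(lambda p: p["mds_cache"]["num_strays"]) returns a
    # list of (rank, value) pairs sorted by rank, one per active rank (the
    # counter path is an assumed example from the MDS perf schema).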
7c673cae
FG
1193 def read_cache(self, path, depth=None):
1194 cmd = ["dump", "tree", path]
1195 if depth is not None:
1196 cmd.append(depth.__str__())
1197 result = self.mds_asok(cmd)
1198 if len(result) == 0:
1199 raise RuntimeError("Path not found in cache: {0}".format(path))
1200
1201 return result
1202
1203 def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
1204 """
1205 Block until the MDS reaches a particular state, or a failure condition
1206 is met.
1207
1208 When there are multiple MDSs, succeed when exactly one MDS is in the
1209 goal state, or fail when any MDS is in the reject state.
1210
1211 :param goal_state: Return once the MDS is in this state
1212 :param reject: Fail if the MDS enters this state before the goal state
1213 :param timeout: Fail if this many seconds pass before reaching goal
1214 :return: number of seconds waited, rounded down to integer
1215 """
1216
1217 started_at = time.time()
1218 while True:
1219 status = self.status()
1220 if rank is not None:
f64942e4
AA
1221 try:
1222 mds_info = status.get_rank(self.id, rank)
1223 current_state = mds_info['state'] if mds_info else None
cd265ab1 1224 log.debug("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
f64942e4
AA
1225 except:
1226 mdsmap = self.get_mds_map(status=status)
1227 if rank in mdsmap['failed']:
cd265ab1 1228 log.debug("Waiting for rank {0} to come back.".format(rank))
f64942e4
AA
1229 current_state = None
1230 else:
1231 raise
7c673cae
FG
1232 elif mds_id is not None:
1233 # mds_info is None if no daemon with this ID exists in the map
1234 mds_info = status.get_mds(mds_id)
1235 current_state = mds_info['state'] if mds_info else None
cd265ab1 1236 log.debug("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
7c673cae
FG
1237 else:
1238 # In general, look for a single MDS
1239 states = [m['state'] for m in status.get_ranks(self.id)]
1240 if [s for s in states if s == goal_state] == [goal_state]:
1241 current_state = goal_state
1242 elif reject in states:
1243 current_state = reject
1244 else:
1245 current_state = None
cd265ab1 1246 log.debug("mapped states {0} to {1}".format(states, current_state))
7c673cae
FG
1247
1248 elapsed = time.time() - started_at
1249 if current_state == goal_state:
cd265ab1 1250 log.debug("reached state '{0}' in {1}s".format(current_state, elapsed))
7c673cae
FG
1251 return elapsed
1252 elif reject is not None and current_state == reject:
1253 raise RuntimeError("MDS in reject state {0}".format(current_state))
1254 elif timeout is not None and elapsed > timeout:
1255 log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
1256 raise RuntimeError(
1257 "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
1258 elapsed, goal_state, current_state
1259 ))
1260 else:
1261 time.sleep(1)
1262
f67539c2 1263 def _read_data_xattr(self, ino_no, xattr_name, obj_type, pool):
7c673cae
FG
1264 if pool is None:
1265 pool = self.get_data_pool_name()
1266
1267 obj_name = "{0:x}.00000000".format(ino_no)
1268
f67539c2 1269 args = ["getxattr", obj_name, xattr_name]
7c673cae 1270 try:
f67539c2 1271 proc = self.rados(args, pool=pool, stdout=BytesIO())
7c673cae
FG
1272 except CommandFailedError as e:
1273 log.error(e.__str__())
1274 raise ObjectNotFound(obj_name)
1275
f67539c2
TL
1276 obj_blob = proc.stdout.getvalue()
1277 return json.loads(self.dencoder(obj_type, obj_blob).strip())
7c673cae
FG
1278
1279 def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
1280 """
1281 Write to an xattr of the 0th data object of an inode. Will
1282 succeed whether the object and/or xattr already exist or not.
1283
1284 :param ino_no: integer inode number
1285 :param xattr_name: string name of the xattr
1286 :param data: byte array data to write to the xattr
1287 :param pool: name of data pool or None to use primary data pool
1288 :return: None
1289 """
7c673cae
FG
1290 if pool is None:
1291 pool = self.get_data_pool_name()
1292
1293 obj_name = "{0:x}.00000000".format(ino_no)
f67539c2
TL
1294 args = ["setxattr", obj_name, xattr_name, data]
1295 self.rados(args, pool=pool)
7c673cae
FG
1296
1297 def read_backtrace(self, ino_no, pool=None):
1298 """
1299 Read the backtrace from the data pool, return a dict in the format
1300 given by inode_backtrace_t::dump, which is something like:
1301
1302 ::
1303
1304 rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
1305 ceph-dencoder type inode_backtrace_t import out.bin decode dump_json
1306
1307 { "ino": 1099511627778,
1308 "ancestors": [
1309 { "dirino": 1,
1310 "dname": "blah",
1311 "version": 11}],
1312 "pool": 1,
1313 "old_pools": []}
1314
1315 :param pool: name of pool to read backtrace from. If omitted, FS must have only
1316 one data pool and that will be used.
1317 """
1318 return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)
1319
1320 def read_layout(self, ino_no, pool=None):
1321 """
1322 Read 'layout' xattr of an inode and parse the result, returning a dict like:
1323 ::
1324 {
1325 "stripe_unit": 4194304,
1326 "stripe_count": 1,
1327 "object_size": 4194304,
1328 "pool_id": 1,
1329 "pool_ns": "",
1330 }
1331
1332 :param pool: name of pool to read backtrace from. If omitted, FS must have only
1333 one data pool and that will be used.
1334 """
1335 return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)
1336
1337 def _enumerate_data_objects(self, ino, size):
1338 """
1339 Get the list of expected data objects for a range, and the list of objects
1340 that really exist.
1341
1342 :return a tuple of two lists of strings (expected, actual)
1343 """
1344 stripe_size = 1024 * 1024 * 4
1345
1346 size = max(stripe_size, size)
1347
1348 want_objects = [
1349 "{0:x}.{1:08x}".format(ino, n)
e306af50 1350 for n in range(0, ((size - 1) // stripe_size) + 1)
7c673cae
FG
1351 ]
1352
f67539c2 1353 exist_objects = self.rados(["ls"], pool=self.get_data_pool_name(), stdout=StringIO()).stdout.getvalue().split("\n")
7c673cae
FG
1354
1355 return want_objects, exist_objects
1356
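    # Worked example (not part of the original file): data object names follow
    # "<ino hex>.<index %08x>" and a file of `size` bytes spans
    # ((size - 1) // stripe_size) + 1 objects. With the default 4 MiB objects,
    # a 10 MiB file with inode 0x10000000000 maps to 10000000000.00000000,
    # 10000000000.00000001 and 10000000000.00000002.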
1357 def data_objects_present(self, ino, size):
1358 """
1359 Check that *all* the expected data objects for an inode are present in the data pool
1360 """
1361
1362 want_objects, exist_objects = self._enumerate_data_objects(ino, size)
1363 missing = set(want_objects) - set(exist_objects)
1364
1365 if missing:
cd265ab1 1366 log.debug("Objects missing (ino {0}, size {1}): {2}".format(
7c673cae
FG
1367 ino, size, missing
1368 ))
1369 return False
1370 else:
cd265ab1 1371 log.debug("All objects for ino {0} size {1} found".format(ino, size))
7c673cae
FG
1372 return True
1373
1374 def data_objects_absent(self, ino, size):
1375 want_objects, exist_objects = self._enumerate_data_objects(ino, size)
1376 present = set(want_objects) & set(exist_objects)
1377
1378 if present:
cd265ab1 1379 log.debug("Objects not absent (ino {0}, size {1}): {2}".format(
7c673cae
FG
1380 ino, size, present
1381 ))
1382 return False
1383 else:
cd265ab1 1384 log.debug("All objects for ino {0} size {1} are absent".format(ino, size))
7c673cae
FG
1385 return True
1386
1387 def dirfrag_exists(self, ino, frag):
1388 try:
f67539c2 1389 self.radosm(["stat", "{0:x}.{1:08x}".format(ino, frag)])
9f95a23c 1390 except CommandFailedError:
7c673cae
FG
1391 return False
1392 else:
1393 return True
1394
7c673cae
FG
1395 def list_dirfrag(self, dir_ino):
1396 """
1397 Read the named object and return the list of omap keys
1398
1399 :return a list of 0 or more strings
1400 """
1401
1402 dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
1403
1404 try:
f67539c2 1405 key_list_str = self.radosmo(["listomapkeys", dirfrag_obj_name], stdout=StringIO())
7c673cae
FG
1406 except CommandFailedError as e:
1407 log.error(e.__str__())
1408 raise ObjectNotFound(dirfrag_obj_name)
1409
f67539c2
TL
1410 return key_list_str.strip().split("\n") if key_list_str else []
1411
1412 def get_meta_of_fs_file(self, dir_ino, obj_name, out):
1413 """
1414 Get metadata from the parent (directory) object to verify the correctness of the data format encoded by the tool, cephfs-meta-injection.
1415 Warning: the splitting (fragmentation) of directories is not considered here.
1416 """
1417
1418 dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
1419 try:
1420 self.radosm(["getomapval", dirfrag_obj_name, obj_name+"_head", out])
1421 except CommandFailedError as e:
1422 log.error(e.__str__())
1423 raise ObjectNotFound(dir_ino)
7c673cae
FG
1424
1425 def erase_metadata_objects(self, prefix):
1426 """
1427 For all objects in the metadata pool matching the prefix,
1428 erase them.
1429
1430 This is O(N) with the number of objects in the pool, so it is only suitable
1431 for use on toy test filesystems.
1432 """
f67539c2 1433 all_objects = self.radosmo(["ls"], stdout=StringIO()).strip().split("\n")
7c673cae
FG
1434 matching_objects = [o for o in all_objects if o.startswith(prefix)]
1435 for o in matching_objects:
f67539c2 1436 self.radosm(["rm", o])
7c673cae
FG
1437
1438 def erase_mds_objects(self, rank):
1439 """
1440 Erase all the per-MDS objects for a particular rank. This includes
1441 inotable, sessiontable, journal
1442 """
1443
1444 def obj_prefix(multiplier):
1445 """
1446 MDS per-rank object naming convention, e.g. rank 1's
1447 journal is at 201.*
1448 """
1449 return "%x." % (multiplier * 0x100 + rank)
1450
1451 # MDS_INO_LOG_OFFSET
1452 self.erase_metadata_objects(obj_prefix(2))
1453 # MDS_INO_LOG_BACKUP_OFFSET
1454 self.erase_metadata_objects(obj_prefix(3))
1455 # MDS_INO_LOG_POINTER_OFFSET
1456 self.erase_metadata_objects(obj_prefix(4))
1457 # MDSTables & SessionMap
1458 self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))
1459
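    # For example, erase_mds_objects(1) uses obj_prefix(2) == "201.",
    # obj_prefix(3) == "301." and obj_prefix(4) == "401.", so it removes the
    # journal (201.*), its backup (301.*), the journal pointer (401.*) and the
    # tables/SessionMap objects (mds1_*).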
1460 @property
1461 def _prefix(self):
1462 """
1463 Override this to set a different
1464 """
1465 return ""
1466
f64942e4
AA
1467 def _make_rank(self, rank):
1468 return "{}:{}".format(self.name, rank)
1469
7c673cae
FG
1470 def _run_tool(self, tool, args, rank=None, quiet=False):
1471 # Tests frequently have [client] configuration that jacks up
1472 # the objecter log level (unlikely to be interesting here)
1473 # and does not set the mds log level (very interesting here)
1474 if quiet:
1475 base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
1476 else:
1477 base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']
1478
1479 if rank is not None:
f64942e4 1480 base_args.extend(["--rank", "%s" % str(rank)])
7c673cae
FG
1481
1482 t1 = datetime.datetime.now()
e306af50 1483 r = self.tool_remote.sh(script=base_args + args, stdout=StringIO()).strip()
7c673cae 1484 duration = datetime.datetime.now() - t1
cd265ab1 1485 log.debug("Ran {0} in time {1}, result:\n{2}".format(
7c673cae
FG
1486 base_args + args, duration, r
1487 ))
1488 return r
1489
1490 @property
1491 def tool_remote(self):
1492 """
1493 An arbitrary remote to use when invoking recovery tools. Use an MDS host because
1494 it'll definitely have keys with perms to access cephfs metadata pool. This is public
1495 so that tests can use this remote to go get locally written output files from the tools.
1496 """
f67539c2 1497 return self.mon_manager.controller
7c673cae 1498
f64942e4 1499 def journal_tool(self, args, rank, quiet=False):
7c673cae 1500 """
f64942e4 1501 Invoke cephfs-journal-tool with the passed arguments for a rank, and return its stdout
7c673cae 1502 """
f64942e4
AA
1503 fs_rank = self._make_rank(rank)
1504 return self._run_tool("cephfs-journal-tool", args, fs_rank, quiet)
7c673cae 1505
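    # Illustrative sketch, not part of the original file: journal_tool(
    # ["journal", "inspect"], 0) runs something like
    # "cephfs-journal-tool --debug-mds=4 --debug-objecter=1 --rank <fs_name>:0
    # journal inspect" on tool_remote and returns its stdout.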
f67539c2
TL
1506 def meta_tool(self, args, rank, quiet=False):
1507 """
1508 Invoke cephfs-meta-injection with the passed arguments for a rank, and return its stdout
1509 """
1510 fs_rank = self._make_rank(rank)
1511 return self._run_tool("cephfs-meta-injection", args, fs_rank, quiet)
1512
7c673cae
FG
1513 def table_tool(self, args, quiet=False):
1514 """
1515 Invoke cephfs-table-tool with the passed arguments, and return its stdout
1516 """
1517 return self._run_tool("cephfs-table-tool", args, None, quiet)
1518
1519 def data_scan(self, args, quiet=False, worker_count=1):
1520 """
1521 Invoke cephfs-data-scan with the passed arguments, and return its stdout
1522
1523 :param worker_count: if greater than 1, multiple workers will be run
1524 in parallel and the return value will be None
1525 """
1526
1527 workers = []
1528
1529 for n in range(0, worker_count):
1530 if worker_count > 1:
1531 # data-scan args first token is a command, followed by args to it.
1532 # insert worker arguments after the command.
1533 cmd = args[0]
1534 worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
1535 else:
1536 worker_args = args
1537
1538 workers.append(Greenlet.spawn(lambda wargs=worker_args:
1539 self._run_tool("cephfs-data-scan", wargs, None, quiet)))
1540
1541 for w in workers:
1542 w.get()
1543
1544 if worker_count == 1:
1545 return workers[0].value
1546 else:
1547 return None
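    # Illustrative sketch, not part of the original file: a hypothetical call
    # like data_scan(["scan_extents", data_pool], worker_count=4) spawns four
    # greenlets, each running the same subcommand with "--worker_n <i>
    # --worker_m 4" inserted after "scan_extents"; with worker_count > 1 the
    # per-worker outputs are not aggregated, so the return value is None.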
b32b8144
FG
1548
1549 def is_full(self):
1550 return self.is_pool_full(self.get_data_pool_name())
f67539c2
TL
1551
1552 def authorize(self, client_id, caps=('/', 'rw')):
1553 """
1554 Run "ceph fs authorize" and run "ceph auth get" to get and returnt the
1555 keyring.
1556
1557 client_id: client id that will be authorized
1558 caps: tuple containing the path and permission (can be r or rw)
1559 respectively.
1560 """
1561 client_name = 'client.' + client_id
1562 return self.mon_manager.raw_cluster_cmd('fs', 'authorize', self.name,
1563 client_name, *caps)
1564
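    # For example, authorize("foo", ("/dir1", "rw")) runs
    # "ceph fs authorize <fs name> client.foo /dir1 rw" and returns the newly
    # minted keyring text for client.foo.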
1565 def grow(self, new_max_mds, status=None):
1566 oldmax = self.get_var('max_mds', status=status)
1567 assert(new_max_mds > oldmax)
1568 self.set_max_mds(new_max_mds)
1569 return self.wait_for_daemons()
1570
1571 def shrink(self, new_max_mds, status=None):
1572 oldmax = self.get_var('max_mds', status=status)
1573 assert(new_max_mds < oldmax)
1574 self.set_max_mds(new_max_mds)
1575 return self.wait_for_daemons()
1576
1577 def run_scrub(self, cmd, rank=0):
1578 return self.rank_tell(["scrub"] + cmd, rank)
1579
1580 def get_scrub_status(self, rank=0):
1581 return self.run_scrub(["status"], rank)
1582
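    # Illustrative sketch, not part of the original file: a typical sequence is
    # out = fs.run_scrub(["start", "/", "recursive"]) followed by
    # fs.wait_until_scrub_complete(tag=out["scrub_tag"]), where "scrub_tag" is
    # assumed to be the tag field returned by "scrub start".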
1583 def wait_until_scrub_complete(self, result=None, tag=None, rank=0, sleep=30,
1584 timeout=300, reverse=False):
1585 # time out after "timeout" seconds and assume the scrub is done
1586 if result is None:
1587 result = "no active scrubs running"
1588 with contextutil.safe_while(sleep=sleep, tries=timeout//sleep) as proceed:
1589 while proceed():
1590 out_json = self.rank_tell(["scrub", "status"], rank=rank)
1591 assert out_json is not None
1592 if not reverse:
1593 if result in out_json['status']:
1594 log.info("all active scrubs completed")
1595 return True
1596 else:
1597 if result not in out_json['status']:
1598 log.info("all active scrubs completed")
1599 return True
1600
1601 if tag is not None:
1602 status = out_json['scrubs'][tag]
1603 if status is not None:
1604 log.info(f"scrub status for tag:{tag} - {status}")
1605 else:
1606 log.info(f"scrub has completed for tag:{tag}")
1607 return True
1608
1609 # timed out waiting for scrub to complete
1610 return False