# ceph/qa/tasks/cephfs/filesystem.py
import json
import logging
from gevent import Greenlet
import os
import time
import datetime
import re
import errno
import random
import traceback

from io import BytesIO, StringIO
from errno import EBUSY

from teuthology.exceptions import CommandFailedError
from teuthology import misc
from teuthology.nuke import clear_firewall
from teuthology.parallel import parallel
from teuthology import contextutil
from tasks.ceph_manager import write_conf
from tasks import ceph_manager


log = logging.getLogger(__name__)


DAEMON_WAIT_TIMEOUT = 120
ROOT_INO = 1

class FileLayout(object):
    def __init__(self, pool=None, pool_namespace=None, stripe_unit=None, stripe_count=None, object_size=None):
        self.pool = pool
        self.pool_namespace = pool_namespace
        self.stripe_unit = stripe_unit
        self.stripe_count = stripe_count
        self.object_size = object_size

    @classmethod
    def load_from_ceph(cls, layout_str):
        # TODO
        pass

    def items(self):
        if self.pool is not None:
            yield ("pool", self.pool)
        if self.pool_namespace:
            yield ("pool_namespace", self.pool_namespace)
        if self.stripe_unit is not None:
            yield ("stripe_unit", self.stripe_unit)
        if self.stripe_count is not None:
            yield ("stripe_count", self.stripe_count)
        if self.object_size is not None:
            yield ("object_size", self.object_size)
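    # Usage sketch (hypothetical values): FileLayout(pool="cephfs_data",
    # stripe_count=2).items() yields ("pool", "cephfs_data") and
    # ("stripe_count", 2); fields left as None are skipped, so the result can
    # be fed to "ceph.dir.layout.<name>" setfattr-style writes such as
    # Filesystem.set_dir_layout() below.
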

class ObjectNotFound(Exception):
    def __init__(self, object_name):
        self._object_name = object_name

    def __str__(self):
        return "Object not found: '{0}'".format(self._object_name)

class FSMissing(Exception):
    def __init__(self, ident):
        self.ident = ident

    def __str__(self):
        return f"File system {self.ident} does not exist in the map"

class FSStatus(object):
    """
    Operations on a snapshot of the FSMap.
    """
    def __init__(self, mon_manager):
        self.mon = mon_manager
        self.map = json.loads(self.mon.raw_cluster_cmd("fs", "dump", "--format=json"))

    def __str__(self):
        return json.dumps(self.map, indent = 2, sort_keys = True)

    # Expose the fsmap for manual inspection.
    def __getitem__(self, key):
        """
        Get a field from the fsmap.
        """
        return self.map[key]

    def get_filesystems(self):
        """
        Iterator for all filesystems.
        """
        for fs in self.map['filesystems']:
            yield fs

    def get_all(self):
        """
        Iterator for all the mds_info components in the FSMap.
        """
        for info in self.map['standbys']:
            yield info
        for fs in self.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                yield info

    def get_standbys(self):
        """
        Iterator for all standbys.
        """
        for info in self.map['standbys']:
            yield info

    def get_fsmap(self, fscid):
        """
        Get the fsmap for the given FSCID.
        """
        for fs in self.map['filesystems']:
            if fscid is None or fs['id'] == fscid:
                return fs
        raise FSMissing(fscid)

    def get_fsmap_byname(self, name):
        """
        Get the fsmap for the given file system name.
        """
        for fs in self.map['filesystems']:
            if name is None or fs['mdsmap']['fs_name'] == name:
                return fs
        raise FSMissing(name)

    def get_replays(self, fscid):
        """
        Get the standby:replay MDS for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['state'] == 'up:standby-replay':
                yield info

    def get_ranks(self, fscid):
        """
        Get the ranks for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['rank'] >= 0 and info['state'] != 'up:standby-replay':
                yield info

    def get_rank(self, fscid, rank):
        """
        Get the rank for the given FSCID.
        """
        for info in self.get_ranks(fscid):
            if info['rank'] == rank:
                return info
        raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))

    def get_mds(self, name):
        """
        Get the info for the given MDS name.
        """
        for info in self.get_all():
            if info['name'] == name:
                return info
        return None

    def get_mds_addr(self, name):
        """
        Return the instance addr as a string, like "10.214.133.138:6807/10825"
        """
        info = self.get_mds(name)
        if info:
            return info['addr']
        else:
            log.warning(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
            raise RuntimeError("MDS id '{0}' not found in map".format(name))

    def get_mds_addrs(self, name):
        """
        Return the instance addresses as a list of strings, like
        ["10.214.133.138:6807", "10.214.133.138:6808"]
        """
        info = self.get_mds(name)
        if info:
            return [e['addr'] for e in info['addrs']['addrvec']]
        else:
            log.warning(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
            raise RuntimeError("MDS id '{0}' not found in map".format(name))

    def get_mds_gid(self, gid):
        """
        Get the info for the given MDS gid.
        """
        for info in self.get_all():
            if info['gid'] == gid:
                return info
        return None

    def hadfailover(self, status):
        """
        Compares two statuses for mds failovers.
        Returns True if there is a failover.
        """
        for fs in status.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                oldinfo = self.get_mds_gid(info['gid'])
                if oldinfo is None or oldinfo['incarnation'] != info['incarnation']:
                    return True
        # all matching
        return False

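    # Usage sketch (assuming `fs` is a Filesystem instance defined below):
    #   old_status = fs.status()
    #   ... do something disruptive to the MDS daemons ...
    #   if old_status.hadfailover(fs.status()):
    #       log.info("at least one MDS was replaced or restarted")
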
class CephCluster(object):
    @property
    def admin_remote(self):
        first_mon = misc.get_first_mon(self._ctx, None)
        (result,) = self._ctx.cluster.only(first_mon).remotes.keys()
        return result

    def __init__(self, ctx) -> None:
        self._ctx = ctx
        self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))

    def get_config(self, key, service_type=None):
        """
        Get config from mon by default, or a specific service if caller asks for it
        """
        if service_type is None:
            service_type = 'mon'

        service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def set_ceph_conf(self, subsys, key, value):
        if subsys not in self._ctx.ceph['ceph'].conf:
            self._ctx.ceph['ceph'].conf[subsys] = {}
        self._ctx.ceph['ceph'].conf[subsys][key] = value
        write_conf(self._ctx)  # XXX because we don't have the ceph task's config object, if they
                               # used a different config path this won't work.

    def clear_ceph_conf(self, subsys, key):
        del self._ctx.ceph['ceph'].conf[subsys][key]
        write_conf(self._ctx)

    def json_asok(self, command, service_type, service_id, timeout=None):
        if timeout is None:
            timeout = 15*60
        command.insert(0, '--format=json')
        proc = self.mon_manager.admin_socket(service_type, service_id, command, timeout=timeout)
        response_data = proc.stdout.getvalue().strip()
        if len(response_data) > 0:
            j = json.loads(response_data)
            pretty = json.dumps(j, sort_keys=True, indent=2)
            log.debug(f"_json_asok output\n{pretty}")
            return j
        else:
            log.debug("_json_asok output empty")
            return None

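    # Usage sketch: query a live setting over the admin socket of mds.a,
    # assuming that daemon is up and reachable from the test node:
    #   cluster = CephCluster(ctx)
    #   cluster.json_asok(['config', 'get', 'mds_cache_memory_limit'], 'mds', 'a')
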
    def is_addr_blocklisted(self, addr=None):
        if addr is None:
            log.warning("Couldn't get the client address, so the blocklisted "
                        "status is undetermined")
            return False

        blocklist = json.loads(self.mon_manager.run_cluster_cmd(
            args=["osd", "blocklist", "ls", "--format=json"],
            stdout=StringIO()).stdout.getvalue())
        for b in blocklist:
            if addr == b["addr"]:
                return True
        return False


class MDSCluster(CephCluster):
    """
    Collective operations on all the MDS daemons in the Ceph cluster. These
    daemons may be in use by various Filesystems.

    For the benefit of pre-multi-filesystem tests, this class is also
    a parent of Filesystem. The correct way to use MDSCluster going forward is
    as a separate instance outside of your (multiple) Filesystem instances.
    """

    def __init__(self, ctx):
        super(MDSCluster, self).__init__(ctx)

    @property
    def mds_ids(self):
        # do this dynamically because the list of ids may change periodically with cephadm
        return list(misc.all_roles_of_type(self._ctx.cluster, 'mds'))

    @property
    def mds_daemons(self):
        return dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])

    def _one_or_all(self, mds_id, cb, in_parallel=True):
        """
        Call a callback for a single named MDS, or for all.

        Note that the parallelism here isn't for performance; it's to avoid being overly kind
        to the cluster by waiting a graceful ssh-latency of time between doing things, and to
        avoid being overly kind by executing them in a particular order. However, some actions
        don't cope with being done in parallel, so it's optional (`in_parallel`).

        :param mds_id: MDS daemon name, or None for all
        :param cb: Callback taking a single argument of the MDS daemon name
        :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
        """

        if mds_id is None:
            if in_parallel:
                with parallel() as p:
                    for mds_id in self.mds_ids:
                        p.spawn(cb, mds_id)
            else:
                for mds_id in self.mds_ids:
                    cb(mds_id)
        else:
            cb(mds_id)

    def get_config(self, key, service_type=None):
        """
        get_config specialization of service_type="mds"
        """
        if service_type != "mds":
            return super(MDSCluster, self).get_config(key, service_type)

        # Some tests stop MDS daemons, don't send commands to a dead one:
        running_daemons = [i for i, mds in self.mds_daemons.items() if mds.running()]
        service_id = random.sample(running_daemons, 1)[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def mds_stop(self, mds_id=None):
        """
        Stop the MDS daemon process(es). If it held a rank, that rank
        will eventually go laggy.
        """
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())

    def mds_fail(self, mds_id=None):
        """
        Inform MDSMonitor of the death of the daemon process(es). If it held
        a rank, that rank will be relinquished.
        """
        self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))

    def mds_restart(self, mds_id=None):
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())

    def mds_fail_restart(self, mds_id=None):
        """
        Variation on restart that includes marking MDSs as failed, so that doing this
        operation followed by waiting for healthy daemon states guarantees that they
        have gone down and come up, rather than potentially seeing the healthy states
        that existed before the restart.
        """
        def _fail_restart(id_):
            self.mds_daemons[id_].stop()
            self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
            self.mds_daemons[id_].restart()

        self._one_or_all(mds_id, _fail_restart)

    def mds_signal(self, mds_id, sig, silent=False):
        """
        Signal an MDS daemon.
        """
        self.mds_daemons[mds_id].signal(sig, silent)

    def newfs(self, name='cephfs', create=True):
        return Filesystem(self._ctx, name=name, create=create)

    def status(self):
        return FSStatus(self.mon_manager)

    def get_standby_daemons(self):
        return set([s['name'] for s in self.status().get_standbys()])

    def get_mds_hostnames(self):
        result = set()
        for mds_id in self.mds_ids:
            mds_remote = self.mon_manager.find_remote('mds', mds_id)
            result.add(mds_remote.hostname)

        return list(result)

    def set_clients_block(self, blocked, mds_id=None):
        """
        Block (using iptables) client communications to this MDS. Be careful: if
        other services are running on this MDS, or other MDSs try to talk to this
        MDS, their communications may also be blocked as collateral damage.

        :param mds_id: Optional ID of MDS to block, defaults to all
        :return:
        """
        da_flag = "-A" if blocked else "-D"

        def set_block(_mds_id):
            remote = self.mon_manager.find_remote('mds', _mds_id)
            status = self.status()

            addr = status.get_mds_addr(_mds_id)
            ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()

            remote.run(
                args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])
            remote.run(
                args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])

        self._one_or_all(mds_id, set_block, in_parallel=False)

    def set_inter_mds_block(self, blocked, mds_rank_1, mds_rank_2):
        """
        Block (using iptables) communications from a provided MDS to other MDSs.
        Block all ports that an MDS uses for communication.

        :param blocked: True to block the MDS, False otherwise
        :param mds_rank_1: MDS rank
        :param mds_rank_2: MDS rank
        :return:
        """
        da_flag = "-A" if blocked else "-D"

        def set_block(mds_ids):
            status = self.status()

            mds = mds_ids[0]
            remote = self.mon_manager.find_remote('mds', mds)
            addrs = status.get_mds_addrs(mds)
            for addr in addrs:
                ip_str, port_str = re.match("(.+):(.+)", addr).groups()
                remote.run(
                    args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                          "comment", "--comment", "teuthology"])

            mds = mds_ids[1]
            remote = self.mon_manager.find_remote('mds', mds)
            addrs = status.get_mds_addrs(mds)
            for addr in addrs:
                ip_str, port_str = re.match("(.+):(.+)", addr).groups()
                remote.run(
                    args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
                          "comment", "--comment", "teuthology"])
                remote.run(
                    args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                          "comment", "--comment", "teuthology"])

        self._one_or_all((mds_rank_1, mds_rank_2), set_block, in_parallel=False)

    def clear_firewall(self):
        clear_firewall(self._ctx)

    def get_mds_info(self, mds_id):
        return FSStatus(self.mon_manager).get_mds(mds_id)

    def is_pool_full(self, pool_name):
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        for pool in pools:
            if pool['pool_name'] == pool_name:
                return 'full' in pool['flags_names'].split(",")

        raise RuntimeError("Pool not found '{0}'".format(pool_name))

    def delete_all_filesystems(self):
        """
        Remove all filesystems that exist, and any pools in use by them.
        """
        for fs in self.status().get_filesystems():
            Filesystem(ctx=self._ctx, fscid=fs['id']).destroy()


class Filesystem(MDSCluster):
    """
    This object is for driving a CephFS filesystem. The MDS daemons driven by
    MDSCluster may be shared with other Filesystems.
    """
    def __init__(self, ctx, fs_config={}, fscid=None, name=None, create=False):
        super(Filesystem, self).__init__(ctx)

        self.name = name
        self.id = None
        self.metadata_pool_name = None
        self.metadata_overlay = False
        self.data_pool_name = None
        self.data_pools = None
        self.fs_config = fs_config
        self.ec_profile = fs_config.get('ec_profile')

        client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
        self.client_id = client_list[0]
        self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]

        if name is not None:
            if fscid is not None:
                raise RuntimeError("cannot specify fscid when creating fs")
            if create and not self.legacy_configured():
                self.create()
        else:
            if fscid is not None:
                self.id = fscid
            self.getinfo(refresh = True)

        # Stash a reference to the first created filesystem on ctx, so
        # that if someone drops to the interactive shell they can easily
        # poke our methods.
        if not hasattr(self._ctx, "filesystem"):
            self._ctx.filesystem = self

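    # Usage sketch: attach to an existing file system by name without creating
    # anything, then block until its daemons look healthy:
    #   fs = Filesystem(ctx, name="cephfs")
    #   fs.wait_for_daemons()
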
    def dead(self):
        try:
            return not bool(self.get_mds_map())
        except FSMissing:
            return True

    def get_task_status(self, status_key):
        return self.mon_manager.get_service_task_status("mds", status_key)

    def getinfo(self, refresh = False):
        status = self.status()
        if self.id is not None:
            fsmap = status.get_fsmap(self.id)
        elif self.name is not None:
            fsmap = status.get_fsmap_byname(self.name)
        else:
            fss = [fs for fs in status.get_filesystems()]
            if len(fss) == 1:
                fsmap = fss[0]
            elif len(fss) == 0:
                raise RuntimeError("no file system available")
            else:
                raise RuntimeError("more than one file system available")
        self.id = fsmap['id']
        self.name = fsmap['mdsmap']['fs_name']
        self.get_pool_names(status = status, refresh = refresh)
        return status

    def set_metadata_overlay(self, overlay):
        if self.id is not None:
            raise RuntimeError("cannot specify fscid when configuring overlay")
        self.metadata_overlay = overlay

    def deactivate(self, rank):
        if rank < 0:
            raise RuntimeError("invalid rank")
        elif rank == 0:
            raise RuntimeError("cannot deactivate rank 0")
        self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank))

    def reach_max_mds(self):
        # Try to reach rank count == max_mds, up or down (UPGRADE SENSITIVE!)
        status = self.getinfo()
        mds_map = self.get_mds_map(status=status)
        max_mds = mds_map['max_mds']

        count = len(list(self.get_ranks(status=status)))
        if count > max_mds:
            try:
                # deactivate mds in descending order
                status = self.wait_for_daemons(status=status, skip_max_mds_check=True)
                while count > max_mds:
                    targets = sorted(self.get_ranks(status=status), key=lambda r: r['rank'], reverse=True)
                    target = targets[0]
                    log.debug("deactivating rank %d" % target['rank'])
                    self.deactivate(target['rank'])
                    status = self.wait_for_daemons(skip_max_mds_check=True)
                    count = len(list(self.get_ranks(status=status)))
            except:
                # In Mimic, deactivation is done automatically:
                log.info("Error:\n{}".format(traceback.format_exc()))
                status = self.wait_for_daemons()
        else:
            status = self.wait_for_daemons()

        mds_map = self.get_mds_map(status=status)
        assert(mds_map['max_mds'] == max_mds)
        assert(mds_map['in'] == list(range(0, max_mds)))

    def reset(self):
        self.mon_manager.raw_cluster_cmd("fs", "reset", str(self.name), '--yes-i-really-mean-it')

    def fail(self):
        self.mon_manager.raw_cluster_cmd("fs", "fail", str(self.name))

    def set_flag(self, var, *args):
        a = map(lambda x: str(x).lower(), args)
        self.mon_manager.raw_cluster_cmd("fs", "flag", "set", var, *a)

    def set_allow_multifs(self, yes=True):
        self.set_flag("enable_multiple", yes)

    def set_var(self, var, *args):
        a = map(lambda x: str(x).lower(), args)
        self.mon_manager.raw_cluster_cmd("fs", "set", self.name, var, *a)

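    # The setters below are thin wrappers around "ceph fs set <name> <var> <value>",
    # e.g. (sketch, assuming the default file system name "cephfs"):
    #   fs.set_max_mds(2)        # -> ceph fs set cephfs max_mds 2
    #   fs.set_joinable(False)   # -> ceph fs set cephfs joinable false
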
    def set_down(self, down=True):
        self.set_var("down", str(down).lower())

    def set_joinable(self, joinable=True):
        self.set_var("joinable", joinable)

    def set_max_mds(self, max_mds):
        self.set_var("max_mds", "%d" % max_mds)

    def set_session_timeout(self, timeout):
        self.set_var("session_timeout", "%d" % timeout)

    def set_allow_standby_replay(self, yes):
        self.set_var("allow_standby_replay", yes)

    def set_allow_new_snaps(self, yes):
        self.set_var("allow_new_snaps", yes, '--yes-i-really-mean-it')

    def required_client_features(self, *args, **kwargs):
        c = ["fs", "required_client_features", self.name, *args]
        return self.mon_manager.run_cluster_cmd(args=c, **kwargs)

    # In Octopus+, the PG count can be omitted to use the default. We keep the
    # hard-coded value for deployments of Mimic/Nautilus.
    pgs_per_fs_pool = 8

    def create(self):
        if self.name is None:
            self.name = "cephfs"
        if self.metadata_pool_name is None:
            self.metadata_pool_name = "{0}_metadata".format(self.name)
        if self.data_pool_name is None:
            data_pool_name = "{0}_data".format(self.name)
        else:
            data_pool_name = self.data_pool_name

        log.debug("Creating filesystem '{0}'".format(self.name))

        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                         self.metadata_pool_name, self.pgs_per_fs_pool.__str__())

        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                         data_pool_name, self.pgs_per_fs_pool.__str__())

        if self.metadata_overlay:
            self.mon_manager.raw_cluster_cmd('fs', 'new',
                                             self.name, self.metadata_pool_name, data_pool_name,
                                             '--allow-dangerous-metadata-overlay')
        else:
            self.mon_manager.raw_cluster_cmd('fs', 'new',
                                             self.name,
                                             self.metadata_pool_name,
                                             data_pool_name)

        if self.ec_profile and 'disabled' not in self.ec_profile:
            ec_data_pool_name = data_pool_name + "_ec"
            log.debug("EC profile is %s", self.ec_profile)
            cmd = ['osd', 'erasure-code-profile', 'set', ec_data_pool_name]
            cmd.extend(self.ec_profile)
            self.mon_manager.raw_cluster_cmd(*cmd)
            self.mon_manager.raw_cluster_cmd(
                'osd', 'pool', 'create',
                ec_data_pool_name, self.pgs_per_fs_pool.__str__(), 'erasure',
                ec_data_pool_name)
            self.mon_manager.raw_cluster_cmd(
                'osd', 'pool', 'set',
                ec_data_pool_name, 'allow_ec_overwrites', 'true')
            self.add_data_pool(ec_data_pool_name, create=False)
            self.check_pool_application(ec_data_pool_name)

            self.run_client_payload(f"setfattr -n ceph.dir.layout.pool -v {ec_data_pool_name} . && getfattr -n ceph.dir.layout .")

        self.check_pool_application(self.metadata_pool_name)
        self.check_pool_application(data_pool_name)

        # Turn off spurious standby count warnings from modifying max_mds in tests.
        try:
            self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
        except CommandFailedError as e:
            if e.exitstatus == 22:
                # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
                pass
            else:
                raise

        if self.fs_config is not None:
            max_mds = self.fs_config.get('max_mds', 1)
            if max_mds > 1:
                self.set_max_mds(max_mds)

            standby_replay = self.fs_config.get('standby_replay', False)
            self.set_allow_standby_replay(standby_replay)

            # If absent, the default value (60 seconds) is used.
            session_timeout = self.fs_config.get('session_timeout', 60)
            if session_timeout != 60:
                self.set_session_timeout(session_timeout)

        self.getinfo(refresh = True)

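    # With the defaults above (name "cephfs", pgs_per_fs_pool = 8, no EC
    # profile), create() is roughly equivalent to this CLI sequence (sketch):
    #   ceph osd pool create cephfs_metadata 8
    #   ceph osd pool create cephfs_data 8
    #   ceph fs new cephfs cephfs_metadata cephfs_data
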
    def run_client_payload(self, cmd):
        # avoid circular dep by importing here:
        from tasks.cephfs.fuse_mount import FuseMount
        d = misc.get_testdir(self._ctx)
        m = FuseMount(self._ctx, {}, d, "admin", self.client_remote, cephfs_name=self.name)
        m.mount()
        m.run_shell_payload(cmd)
        m.umount_wait(require_clean=True)

    def _remove_pool(self, name, **kwargs):
        c = f'osd pool rm {name} {name} --yes-i-really-really-mean-it'
        return self.mon_manager.ceph(c, **kwargs)

    def rm(self, **kwargs):
        c = f'fs rm {self.name} --yes-i-really-mean-it'
        return self.mon_manager.ceph(c, **kwargs)

    def remove_pools(self, data_pools):
        self._remove_pool(self.get_metadata_pool_name())
        for poolname in data_pools:
            try:
                self._remove_pool(poolname)
            except CommandFailedError as e:
                # EBUSY, this data pool is used by two metadata pools, let the
                # 2nd pass delete it
                if e.exitstatus == EBUSY:
                    pass
                else:
                    raise

    def destroy(self, reset_obj_attrs=True):
        log.info(f'Destroying file system {self.name} and related pools')

        if self.dead():
            log.debug('already dead...')
            return

        data_pools = self.get_data_pool_names(refresh=True)

        # make sure no MDSs are attached to given FS.
        self.fail()
        self.rm()

        self.remove_pools(data_pools)

        if reset_obj_attrs:
            self.id = None
            self.name = None
            self.metadata_pool_name = None
            self.data_pool_name = None
            self.data_pools = None

    def recreate(self):
        self.destroy()

        self.create()
        self.getinfo(refresh=True)

    def check_pool_application(self, pool_name):
        osd_map = self.mon_manager.get_osd_dump_json()
        for pool in osd_map['pools']:
            if pool['pool_name'] == pool_name:
                if "application_metadata" in pool:
                    if "cephfs" not in pool['application_metadata']:
                        raise RuntimeError("Pool {pool_name} does not name cephfs as application!".format(
                            pool_name=pool_name))

    def __del__(self):
        if getattr(self._ctx, "filesystem", None) == self:
            delattr(self._ctx, "filesystem")

    def exists(self):
        """
        Whether a filesystem exists in the mon's filesystem list
        """
        fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty'))
        return self.name in [fs['name'] for fs in fs_list]

    def legacy_configured(self):
        """
        Check if a legacy (i.e. pre "fs new") filesystem configuration is present. If this is
        the case, the caller should avoid using Filesystem.create
        """
        try:
            out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
            pools = json.loads(out_text)
            metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
            if metadata_pool_exists:
                self.metadata_pool_name = 'metadata'
        except CommandFailedError as e:
            # For use in upgrade tests, Ceph cuttlefish and earlier don't support
            # structured output (--format) from the CLI.
            if e.exitstatus == 22:
                metadata_pool_exists = True
            else:
                raise

        return metadata_pool_exists

    def _df(self):
        return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty"))

    # may raise FSMissing
    def get_mds_map(self, status=None):
        if status is None:
            status = self.status()
        return status.get_fsmap(self.id)['mdsmap']

    def get_var(self, var, status=None):
        return self.get_mds_map(status=status)[var]

    def set_dir_layout(self, mount, path, layout):
        for name, value in layout.items():
            mount.run_shell(args=["setfattr", "-n", "ceph.dir.layout."+name, "-v", str(value), path])

    def add_data_pool(self, name, create=True):
        if create:
            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name, self.pgs_per_fs_pool.__str__())
        self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
        self.get_pool_names(refresh = True)
        for poolid, fs_name in self.data_pools.items():
            if name == fs_name:
                return poolid
        raise RuntimeError("could not get just created pool '{0}'".format(name))

    def get_pool_names(self, refresh = False, status = None):
        if refresh or self.metadata_pool_name is None or self.data_pools is None:
            if status is None:
                status = self.status()
            fsmap = status.get_fsmap(self.id)

            osd_map = self.mon_manager.get_osd_dump_json()
            id_to_name = {}
            for p in osd_map['pools']:
                id_to_name[p['pool']] = p['pool_name']

            self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
            self.data_pools = {}
            for data_pool in fsmap['mdsmap']['data_pools']:
                self.data_pools[data_pool] = id_to_name[data_pool]

    def get_data_pool_name(self, refresh = False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return next(iter(self.data_pools.values()))

    def get_data_pool_id(self, refresh = False):
        """
        Don't call this if you have multiple data pools
        :return: integer
        """
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        assert(len(self.data_pools) == 1)
        return next(iter(self.data_pools.keys()))

    def get_data_pool_names(self, refresh = False):
        if refresh or self.data_pools is None:
            self.get_pool_names(refresh = True)
        return list(self.data_pools.values())

    def get_metadata_pool_name(self):
        return self.metadata_pool_name

    def set_data_pool_name(self, name):
        if self.id is not None:
            raise RuntimeError("can't set filesystem name if its fscid is set")
        self.data_pool_name = name

    def get_namespace_id(self):
        return self.id

    def get_pool_df(self, pool_name):
        """
        Return a dict like:
        {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0}
        """
        for pool_df in self._df()['pools']:
            if pool_df['name'] == pool_name:
                return pool_df['stats']

        raise RuntimeError("Pool name '{0}' not found".format(pool_name))

    def get_usage(self):
        return self._df()['stats']['total_used_bytes']

    def are_daemons_healthy(self, status=None, skip_max_mds_check=False):
        """
        Return true if all daemons are in one of active, standby, standby-replay, and
        exactly max_mds daemons are in 'active'.

        Unlike most of Filesystem, this function is tolerant of new-style `fs`
        commands being missing, because we are part of the ceph installation
        process during upgrade suites, so must fall back to old style commands
        when we get an EINVAL on a new style command.

        :return:
        """
        # First, check to see that processes haven't exited with an error code
        for mds in self._ctx.daemons.iter_daemons_of_role('mds'):
            mds.check_status()

        active_count = 0
        mds_map = self.get_mds_map(status=status)

        log.debug("are_daemons_healthy: mds map: {0}".format(mds_map))

        for mds_id, mds_status in mds_map['info'].items():
            if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
                log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
                return False
            elif mds_status['state'] == 'up:active':
                active_count += 1

        log.debug("are_daemons_healthy: {0}/{1}".format(
            active_count, mds_map['max_mds']
        ))

        if not skip_max_mds_check:
            if active_count > mds_map['max_mds']:
                log.debug("are_daemons_healthy: number of actives is greater than max_mds: {0}".format(mds_map))
                return False
            elif active_count == mds_map['max_mds']:
                # The MDSMap says these guys are active, but let's check they really are
                for mds_id, mds_status in mds_map['info'].items():
                    if mds_status['state'] == 'up:active':
                        try:
                            daemon_status = self.mds_tell(["status"], mds_id=mds_status['name'])
                        except CommandFailedError as cfe:
                            if cfe.exitstatus == errno.EINVAL:
                                # Old version, can't do this check
                                continue
                            else:
                                # MDS not even running
                                return False

                        if daemon_status['state'] != 'up:active':
                            # MDS hasn't taken the latest map yet
                            return False

                return True
            else:
                return False
        else:
            log.debug("are_daemons_healthy: skipping max_mds check")
            return True

    def get_daemon_names(self, state=None, status=None):
        """
        Return MDS daemon names of those daemons in the given state
        :param state: MDS state to filter on (e.g. "up:active"), or None for all
        :return: list of daemon names
        """
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['state'] == state or state is None:
                result.append(mds_status['name'])

        return result

    def get_active_names(self, status=None):
        """
        Return MDS daemon names of those daemons holding ranks
        in state up:active

        :return: list of strings like ['a', 'b'], sorted by rank
        """
        return self.get_daemon_names("up:active", status=status)

    def get_all_mds_rank(self, status=None):
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['rank'])

        return result

    def get_rank(self, rank=None, status=None):
        if status is None:
            status = self.getinfo()
        if rank is None:
            rank = 0
        return status.get_rank(self.id, rank)

    def rank_restart(self, rank=0, status=None):
        name = self.get_rank(rank=rank, status=status)['name']
        self.mds_restart(mds_id=name)

    def rank_signal(self, signal, rank=0, status=None):
        name = self.get_rank(rank=rank, status=status)['name']
        self.mds_signal(name, signal)

    def rank_freeze(self, yes, rank=0):
        self.mon_manager.raw_cluster_cmd("mds", "freeze", "{}:{}".format(self.id, rank), str(yes).lower())

    def rank_fail(self, rank=0):
        self.mon_manager.raw_cluster_cmd("mds", "fail", "{}:{}".format(self.id, rank))

    def get_ranks(self, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_ranks(self.id)

    def get_replays(self, status=None):
        if status is None:
            status = self.getinfo()
        return status.get_replays(self.id)

    def get_replay(self, rank=0, status=None):
        for replay in self.get_replays(status=status):
            if replay['rank'] == rank:
                return replay
        return None

    def get_rank_names(self, status=None):
        """
        Return MDS daemon names of those daemons holding a rank,
        sorted by rank. This includes e.g. up:replay/reconnect
        as well as active, but does not include standby or
        standby-replay.
        """
        mdsmap = self.get_mds_map(status)
        result = []
        for mds_status in sorted(mdsmap['info'].values(),
                                 key=lambda _: _['rank']):
            if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay':
                result.append(mds_status['name'])

        return result

    def wait_for_daemons(self, timeout=None, skip_max_mds_check=False, status=None):
        """
        Wait until all daemons are healthy
        :return:
        """

        if timeout is None:
            timeout = DAEMON_WAIT_TIMEOUT

        if status is None:
            status = self.status()

        elapsed = 0
        while True:
            if self.are_daemons_healthy(status=status, skip_max_mds_check=skip_max_mds_check):
                return status
            else:
                time.sleep(1)
                elapsed += 1

            if elapsed > timeout:
                log.debug("status = {0}".format(status))
                raise RuntimeError("Timed out waiting for MDS daemons to become healthy")

            status = self.status()

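    # Typical pattern after disrupting the MDS daemons (sketch):
    #   fs.mds_fail_restart()
    #   fs.wait_for_daemons()
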
    def dencoder(self, obj_type, obj_blob):
        args = [os.path.join(self._prefix, "ceph-dencoder"), 'type', obj_type, 'import', '-', 'decode', 'dump_json']
        p = self.mon_manager.controller.run(args=args, stdin=BytesIO(obj_blob), stdout=BytesIO())
        return p.stdout.getvalue()

    def rados(self, *args, **kwargs):
        """
        Callout to rados CLI.
        """

        return self.mon_manager.do_rados(*args, **kwargs)

    def radosm(self, *args, **kwargs):
        """
        Interact with the metadata pool via rados CLI.
        """

        return self.rados(*args, **kwargs, pool=self.get_metadata_pool_name())

    def radosmo(self, *args, stdout=None, **kwargs):
        """
        Interact with the metadata pool via rados CLI. Get the stdout.
        """

        # use a fresh buffer per call rather than a shared mutable default
        if stdout is None:
            stdout = BytesIO()
        return self.radosm(*args, **kwargs, stdout=stdout).stdout.getvalue()

    def get_metadata_object(self, object_type, object_id):
        """
        Retrieve an object from the metadata pool, pass it through
        ceph-dencoder to dump it to JSON, and return the decoded object.
        """

        o = self.radosmo(['get', object_id, '-'])
        j = self.dencoder(object_type, o)
        try:
            return json.loads(j)
        except (TypeError, ValueError):
            log.error("Failed to decode JSON: '{0}'".format(j))
            raise

    def get_journal_version(self):
        """
        Read the JournalPointer and Journal::Header objects to learn the version of
        encoding in use.
        """
        journal_pointer_object = '400.00000000'
        journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
        journal_ino = journal_pointer_dump['journal_pointer']['front']

        journal_header_object = "{0:x}.00000000".format(journal_ino)
        journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)

        version = journal_header_dump['journal_header']['stream_format']
        log.debug("Read journal version {0}".format(version))

        return version

    def mds_asok(self, command, mds_id=None, timeout=None):
        if mds_id is None:
            return self.rank_asok(command, timeout=timeout)

        return self.json_asok(command, 'mds', mds_id, timeout=timeout)

    def mds_tell(self, command, mds_id=None):
        if mds_id is None:
            return self.rank_tell(command)

        return json.loads(self.mon_manager.raw_cluster_cmd("tell", f"mds.{mds_id}", *command))

    def rank_asok(self, command, rank=0, status=None, timeout=None):
        info = self.get_rank(rank=rank, status=status)
        return self.json_asok(command, 'mds', info['name'], timeout=timeout)

    def rank_tell(self, command, rank=0, status=None):
        return json.loads(self.mon_manager.raw_cluster_cmd("tell", f"mds.{self.id}:{rank}", *command))

    def ranks_tell(self, command, status=None):
        if status is None:
            status = self.status()
        out = []
        for r in status.get_ranks(self.id):
            result = self.rank_tell(command, rank=r['rank'], status=status)
            out.append((r['rank'], result))
        return sorted(out)

    def ranks_perf(self, f, status=None):
        perf = self.ranks_tell(["perf", "dump"], status=status)
        out = []
        for rank, rank_perf in perf:
            out.append((rank, f(rank_perf)))
        return out

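    # Usage sketch: ask rank 0 for its client sessions, or map a function over
    # every rank's perf dump (the section name is illustrative only):
    #   fs.rank_tell(["session", "ls"])
    #   fs.ranks_perf(lambda perf: perf.get('mds_log'))
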
    def read_cache(self, path, depth=None):
        cmd = ["dump", "tree", path]
        if depth is not None:
            cmd.append(depth.__str__())
        result = self.mds_asok(cmd)
        if len(result) == 0:
            raise RuntimeError("Path not found in cache: {0}".format(path))

        return result

    def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
        """
        Block until the MDS reaches a particular state, or a failure condition
        is met.

        When there are multiple MDSs, succeed when exactly one MDS is in the
        goal state, or fail when any MDS is in the reject state.

        :param goal_state: Return once the MDS is in this state
        :param reject: Fail if the MDS enters this state before the goal state
        :param timeout: Fail if this many seconds pass before reaching goal
        :return: number of seconds waited
        """

        started_at = time.time()
        while True:
            status = self.status()
            if rank is not None:
                try:
                    mds_info = status.get_rank(self.id, rank)
                    current_state = mds_info['state'] if mds_info else None
                    log.debug("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
                except:
                    mdsmap = self.get_mds_map(status=status)
                    if rank in mdsmap['failed']:
                        log.debug("Waiting for rank {0} to come back.".format(rank))
                        current_state = None
                    else:
                        raise
            elif mds_id is not None:
                # mds_info is None if no daemon with this ID exists in the map
                mds_info = status.get_mds(mds_id)
                current_state = mds_info['state'] if mds_info else None
                log.debug("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
            else:
                # In general, look for a single MDS
                states = [m['state'] for m in status.get_ranks(self.id)]
                if [s for s in states if s == goal_state] == [goal_state]:
                    current_state = goal_state
                elif reject in states:
                    current_state = reject
                else:
                    current_state = None
                log.debug("mapped states {0} to {1}".format(states, current_state))

            elapsed = time.time() - started_at
            if current_state == goal_state:
                log.debug("reached state '{0}' in {1}s".format(current_state, elapsed))
                return elapsed
            elif reject is not None and current_state == reject:
                raise RuntimeError("MDS in reject state {0}".format(current_state))
            elif timeout is not None and elapsed > timeout:
                log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
                raise RuntimeError(
                    "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
                        elapsed, goal_state, current_state
                    ))
            else:
                time.sleep(1)

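    # Usage sketch: after failing rank 0, wait for a replacement daemon to take
    # the rank and reach up:active:
    #   fs.rank_fail(rank=0)
    #   fs.wait_for_state('up:active', rank=0, timeout=120)
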
    def _read_data_xattr(self, ino_no, xattr_name, obj_type, pool):
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)

        args = ["getxattr", obj_name, xattr_name]
        try:
            proc = self.rados(args, pool=pool, stdout=BytesIO())
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(obj_name)

        obj_blob = proc.stdout.getvalue()
        return json.loads(self.dencoder(obj_type, obj_blob).strip())

    def _write_data_xattr(self, ino_no, xattr_name, data, pool=None):
        """
        Write to an xattr of the 0th data object of an inode. Will
        succeed whether the object and/or xattr already exist or not.

        :param ino_no: integer inode number
        :param xattr_name: string name of the xattr
        :param data: byte array data to write to the xattr
        :param pool: name of data pool or None to use primary data pool
        :return: None
        """
        if pool is None:
            pool = self.get_data_pool_name()

        obj_name = "{0:x}.00000000".format(ino_no)
        args = ["setxattr", obj_name, xattr_name, data]
        self.rados(args, pool=pool)

    def read_backtrace(self, ino_no, pool=None):
        """
        Read the backtrace from the data pool, return a dict in the format
        given by inode_backtrace_t::dump, which is something like:

        ::

            rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin
            ceph-dencoder type inode_backtrace_t import out.bin decode dump_json

            { "ino": 1099511627778,
              "ancestors": [
                    { "dirino": 1,
                      "dname": "blah",
                      "version": 11}],
              "pool": 1,
              "old_pools": []}

        :param pool: name of pool to read backtrace from.  If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool)

    def read_layout(self, ino_no, pool=None):
        """
        Read 'layout' xattr of an inode and parse the result, returning a dict like:
        ::
            {
                "stripe_unit": 4194304,
                "stripe_count": 1,
                "object_size": 4194304,
                "pool_id": 1,
                "pool_ns": "",
            }

        :param pool: name of pool to read layout from.  If omitted, FS must have only
                     one data pool and that will be used.
        """
        return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool)

    def _enumerate_data_objects(self, ino, size):
        """
        Get the list of expected data objects for a range, and the list of objects
        that really exist.

        :return: a tuple of two lists of strings (expected, actual)
        """
        stripe_size = 1024 * 1024 * 4

        size = max(stripe_size, size)

        want_objects = [
            "{0:x}.{1:08x}".format(ino, n)
            for n in range(0, ((size - 1) // stripe_size) + 1)
        ]

        exist_objects = self.rados(["ls"], pool=self.get_data_pool_name(), stdout=StringIO()).stdout.getvalue().split("\n")

        return want_objects, exist_objects

    def data_objects_present(self, ino, size):
        """
        Check that *all* the expected data objects for an inode are present in the data pool
        """

        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        missing = set(want_objects) - set(exist_objects)

        if missing:
            log.debug("Objects missing (ino {0}, size {1}): {2}".format(
                ino, size, missing
            ))
            return False
        else:
            log.debug("All objects for ino {0} size {1} found".format(ino, size))
            return True

    def data_objects_absent(self, ino, size):
        want_objects, exist_objects = self._enumerate_data_objects(ino, size)
        present = set(want_objects) & set(exist_objects)

        if present:
            log.debug("Objects not absent (ino {0}, size {1}): {2}".format(
                ino, size, present
            ))
            return False
        else:
            log.debug("All objects for ino {0} size {1} are absent".format(ino, size))
            return True

    def dirfrag_exists(self, ino, frag):
        try:
            self.radosm(["stat", "{0:x}.{1:08x}".format(ino, frag)])
        except CommandFailedError:
            return False
        else:
            return True

    def list_dirfrag(self, dir_ino):
        """
        Read the named object and return the list of omap keys

        :return: a list of 0 or more strings
        """

        dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)

        try:
            key_list_str = self.radosmo(["listomapkeys", dirfrag_obj_name], stdout=StringIO())
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(dirfrag_obj_name)

        return key_list_str.strip().split("\n") if key_list_str else []

    def get_meta_of_fs_file(self, dir_ino, obj_name, out):
        """
        Get a dentry's metadata from its parent directory object, to verify the
        correctness of the data format encoded by the cephfs-meta-injection tool.
        Warning: directory fragmentation (splitting) is not considered here.
        """

        dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
        try:
            self.radosm(["getomapval", dirfrag_obj_name, obj_name+"_head", out])
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(dir_ino)

    def erase_metadata_objects(self, prefix):
        """
        For all objects in the metadata pool matching the prefix,
        erase them.

        This is O(N) with the number of objects in the pool, so only suitable
        for use on toy test filesystems.
        """
        all_objects = self.radosmo(["ls"], stdout=StringIO()).strip().split("\n")
        matching_objects = [o for o in all_objects if o.startswith(prefix)]
        for o in matching_objects:
            self.radosm(["rm", o])

    def erase_mds_objects(self, rank):
        """
        Erase all the per-MDS objects for a particular rank. This includes
        inotable, sessiontable, journal
        """

        def obj_prefix(multiplier):
            """
            MDS object naming conventions like rank 1's
            journal is at 201.***
            """
            return "%x." % (multiplier * 0x100 + rank)

        # MDS_INO_LOG_OFFSET
        self.erase_metadata_objects(obj_prefix(2))
        # MDS_INO_LOG_BACKUP_OFFSET
        self.erase_metadata_objects(obj_prefix(3))
        # MDS_INO_LOG_POINTER_OFFSET
        self.erase_metadata_objects(obj_prefix(4))
        # MDSTables & SessionMap
        self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))

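    # For example, for rank 0 the calls above remove 200.* (journal),
    # 300.* (journal backup), 400.* (journal pointer) and mds0_* objects.
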
    @property
    def _prefix(self):
        """
        Override this to set a different path prefix for the CephFS tool binaries.
        """
        return ""

    def _make_rank(self, rank):
        return "{}:{}".format(self.name, rank)

    def _run_tool(self, tool, args, rank=None, quiet=False):
        # Tests frequently have [client] configuration that jacks up
        # the objecter log level (unlikely to be interesting here)
        # and does not set the mds log level (very interesting here)
        if quiet:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1']
        else:
            base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1']

        if rank is not None:
            base_args.extend(["--rank", "%s" % str(rank)])

        t1 = datetime.datetime.now()
        r = self.tool_remote.sh(script=base_args + args, stdout=StringIO()).strip()
        duration = datetime.datetime.now() - t1
        log.debug("Ran {0} in time {1}, result:\n{2}".format(
            base_args + args, duration, r
        ))
        return r

    @property
    def tool_remote(self):
        """
        An arbitrary remote to use when invoking recovery tools. Use a node that
        definitely has keys with perms to access the cephfs metadata pool. This is
        public so that tests can use this remote to go get locally written output
        files from the tools.
        """
        return self.mon_manager.controller

    def journal_tool(self, args, rank, quiet=False):
        """
        Invoke cephfs-journal-tool with the passed arguments for a rank, and return its stdout
        """
        fs_rank = self._make_rank(rank)
        return self._run_tool("cephfs-journal-tool", args, fs_rank, quiet)

    def meta_tool(self, args, rank, quiet=False):
        """
        Invoke cephfs-meta-injection with the passed arguments for a rank, and return its stdout
        """
        fs_rank = self._make_rank(rank)
        return self._run_tool("cephfs-meta-injection", args, fs_rank, quiet)

    def table_tool(self, args, quiet=False):
        """
        Invoke cephfs-table-tool with the passed arguments, and return its stdout
        """
        return self._run_tool("cephfs-table-tool", args, None, quiet)

    def data_scan(self, args, quiet=False, worker_count=1):
        """
        Invoke cephfs-data-scan with the passed arguments, and return its stdout

        :param worker_count: if greater than 1, multiple workers will be run
                             in parallel and the return value will be None
        """

        workers = []

        for n in range(0, worker_count):
            if worker_count > 1:
                # data-scan args first token is a command, followed by args to it.
                # insert worker arguments after the command.
                cmd = args[0]
                worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:]
            else:
                worker_args = args

            workers.append(Greenlet.spawn(lambda wargs=worker_args:
                           self._run_tool("cephfs-data-scan", wargs, None, quiet)))

        for w in workers:
            w.get()

        if worker_count == 1:
            return workers[0].value
        else:
            return None
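
    # Usage sketch: the usual forward-recovery invocations, optionally
    # parallelised (assumes the default single data pool):
    #   fs.data_scan(["scan_extents", fs.get_data_pool_name()], worker_count=4)
    #   fs.data_scan(["scan_inodes", fs.get_data_pool_name()], worker_count=4)
    #   fs.data_scan(["scan_links"])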

    def is_full(self):
        return self.is_pool_full(self.get_data_pool_name())

    def authorize(self, client_id, caps=('/', 'rw')):
        """
        Run "ceph fs authorize" and return the resulting keyring.

        client_id: client id that will be authorized
        caps: tuple containing the path and permission (can be r or rw)
              respectively.
        """
        client_name = 'client.' + client_id
        return self.mon_manager.raw_cluster_cmd('fs', 'authorize', self.name,
                                                client_name, *caps)

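    # Usage sketch (hypothetical client id and path): create a client
    # restricted to /foo and capture the keyring text it returns:
    #   keyring = fs.authorize("foo", ("/foo", "rw"))
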
    def grow(self, new_max_mds, status=None):
        oldmax = self.get_var('max_mds', status=status)
        assert(new_max_mds > oldmax)
        self.set_max_mds(new_max_mds)
        return self.wait_for_daemons()

    def shrink(self, new_max_mds, status=None):
        oldmax = self.get_var('max_mds', status=status)
        assert(new_max_mds < oldmax)
        self.set_max_mds(new_max_mds)
        return self.wait_for_daemons()

    def run_scrub(self, cmd, rank=0):
        return self.rank_tell(["scrub"] + cmd, rank)

    def get_scrub_status(self, rank=0):
        return self.run_scrub(["status"], rank)

    def wait_until_scrub_complete(self, result=None, tag=None, rank=0, sleep=30,
                                  timeout=300, reverse=False):
        # time out after "timeout" seconds and assume the scrub is done
        if result is None:
            result = "no active scrubs running"
        with contextutil.safe_while(sleep=sleep, tries=timeout//sleep) as proceed:
            while proceed():
                out_json = self.rank_tell(["scrub", "status"], rank=rank)
                assert out_json is not None
                if not reverse:
                    if result in out_json['status']:
                        log.info("all active scrubs completed")
                        return True
                else:
                    if result not in out_json['status']:
                        log.info("all active scrubs completed")
                        return True

                if tag is not None:
                    status = out_json['scrubs'][tag]
                    if status is not None:
                        log.info(f"scrub status for tag:{tag} - {status}")
                    else:
                        log.info(f"scrub has completed for tag:{tag}")
                        return True

        # timed out waiting for scrub to complete
        return False
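
    # Usage sketch: start a recursive scrub on rank 0 and wait for it to drain:
    #   fs.run_scrub(["start", "/", "recursive"])
    #   assert fs.wait_until_scrub_complete()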