]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | |
7c673cae FG |
2 | import json |
3 | import logging | |
4 | from gevent import Greenlet | |
5 | import os | |
6 | import time | |
7 | import datetime | |
8 | import re | |
9 | import errno | |
181888fb | 10 | import random |
7c673cae | 11 | |
f67539c2 TL |
12 | from io import BytesIO, StringIO |
13 | from errno import EBUSY | |
e306af50 | 14 | |
7c673cae FG |
15 | from teuthology.exceptions import CommandFailedError |
16 | from teuthology import misc | |
17 | from teuthology.nuke import clear_firewall | |
18 | from teuthology.parallel import parallel | |
f67539c2 | 19 | from teuthology import contextutil |
7c673cae FG |
20 | from tasks.ceph_manager import write_conf |
21 | from tasks import ceph_manager | |
22 | ||
23 | ||
24 | log = logging.getLogger(__name__) | |
25 | ||
26 | ||
27 | DAEMON_WAIT_TIMEOUT = 120 | |
28 | ROOT_INO = 1 | |
29 | ||
class FileLayout(object):
    """In-memory representation of a CephFS file layout.

    Attributes left as None are "unset" and are omitted by items(), so a
    partially-specified layout carries only the fields the caller set.
    """
    def __init__(self, pool=None, pool_namespace=None, stripe_unit=None, stripe_count=None, object_size=None):
        self.pool = pool
        self.pool_namespace = pool_namespace
        self.stripe_unit = stripe_unit
        self.stripe_count = stripe_count
        self.object_size = object_size

    @classmethod
    def load_from_ceph(cls, layout_str):
        # BUG FIX: classmethods receive the class as their first argument;
        # without `cls` the class object was bound to `layout_str`.
        # TODO: parse `layout_str` into a FileLayout instance
        pass

    def items(self):
        """Yield (field_name, value) for every field that is set."""
        if self.pool is not None:
            yield ("pool", self.pool)
        if self.pool_namespace:
            yield ("pool_namespace", self.pool_namespace)
        if self.stripe_unit is not None:
            yield ("stripe_unit", self.stripe_unit)
        if self.stripe_count is not None:
            yield ("stripe_count", self.stripe_count)
        if self.object_size is not None:
            # BUG FIX: previously yielded self.stripe_size, which does not
            # exist (AttributeError whenever object_size was set).
            yield ("object_size", self.object_size)
7c673cae FG |
54 | |
class ObjectNotFound(Exception):
    """Raised when an expected RADOS object cannot be found."""

    def __init__(self, object_name):
        # Keep the name so __str__ can report which object was missing.
        self._object_name = object_name

    def __str__(self):
        template = "Object not found: '{0}'"
        return template.format(self._object_name)
61 | ||
class FSMissing(Exception):
    """Raised when a file system is absent from the FSMap."""

    def __init__(self, ident):
        # `ident` is whatever the lookup used: an FSCID or an fs name.
        self.ident = ident

    def __str__(self):
        return f"File system {self.ident} does not exist in the map"
68 | ||
class FSStatus(object):
    """
    Operations on a snapshot of the FSMap.

    The map is fetched once at construction time (optionally for a given
    epoch) and never refreshed; build a new FSStatus for fresh data.
    """
    def __init__(self, mon_manager, epoch=None):
        self.mon = mon_manager
        cmd = ["fs", "dump", "--format=json"]
        if epoch is not None:
            cmd.append(str(epoch))
        self.map = json.loads(self.mon.raw_cluster_cmd(*cmd))

    def __str__(self):
        return json.dumps(self.map, indent = 2, sort_keys = True)

    # Expose the fsmap for manual inspection.
    def __getitem__(self, key):
        """
        Get a field from the fsmap.
        """
        return self.map[key]

    def get_filesystems(self):
        """
        Iterator for all filesystems.
        """
        for fs in self.map['filesystems']:
            yield fs

    def get_all(self):
        """
        Iterator for all the mds_info components in the FSMap:
        standbys first, then every fs's ranks.
        """
        for info in self.map['standbys']:
            yield info
        for fs in self.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                yield info

    def get_standbys(self):
        """
        Iterator for all standbys.
        """
        for info in self.map['standbys']:
            yield info

    def get_fsmap(self, fscid):
        """
        Get the fsmap for the given FSCID.

        :raises FSMissing: if no file system has that id.
        """
        for fs in self.map['filesystems']:
            if fscid is None or fs['id'] == fscid:
                return fs
        raise FSMissing(fscid)

    def get_fsmap_byname(self, name):
        """
        Get the fsmap for the given file system name.

        :raises FSMissing: if no file system has that name.
        """
        for fs in self.map['filesystems']:
            if name is None or fs['mdsmap']['fs_name'] == name:
                return fs
        raise FSMissing(name)

    def get_replays(self, fscid):
        """
        Get the standby:replay MDS for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['state'] == 'up:standby-replay':
                yield info

    def get_ranks(self, fscid):
        """
        Get the ranks for the given FSCID (excludes standby-replay daemons).
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['rank'] >= 0 and info['state'] != 'up:standby-replay':
                yield info

    def get_damaged(self, fscid):
        """
        Get the damaged ranks for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        return fs['mdsmap']['damaged']

    def get_rank(self, fscid, rank):
        """
        Get the rank for the given FSCID.

        :raises RuntimeError: if the rank is not present.
        """
        for info in self.get_ranks(fscid):
            if info['rank'] == rank:
                return info
        raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))

    def get_mds(self, name):
        """
        Get the info for the given MDS name, or None if absent.
        """
        for info in self.get_all():
            if info['name'] == name:
                return info
        return None

    def get_mds_addr(self, name):
        """
        Return the instance addr as a string, like "10.214.133.138:6807\/10825"
        """
        info = self.get_mds(name)
        if info:
            return info['addr']
        else:
            log.warning(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
            raise RuntimeError("MDS id '{0}' not found in map".format(name))

    def get_mds_addrs(self, name):
        """
        Return the instance addr as a string, like "[10.214.133.138:6807 10.214.133.138:6808]"
        """
        info = self.get_mds(name)
        if info:
            return [e['addr'] for e in info['addrs']['addrvec']]
        else:
            # FIX: log.warn() is a deprecated alias of log.warning() (removed
            # in Python 3.13); use the canonical name as get_mds_addr does.
            log.warning(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
            raise RuntimeError("MDS id '{0}' not found in map".format(name))

    def get_mds_gid(self, gid):
        """
        Get the info for the given MDS gid, or None if absent.
        """
        for info in self.get_all():
            if info['gid'] == gid:
                return info
        return None

    def hadfailover(self, status):
        """
        Compares two statuses for mds failovers.
        Returns True if there is a failover.
        """
        for fs in status.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                oldinfo = self.get_mds_gid(info['gid'])
                if oldinfo is None or oldinfo['incarnation'] != info['incarnation']:
                    return True
        #all matching
        return False
218 | ||
class CephCluster(object):
    """Base wrapper for a Ceph cluster under teuthology control.

    Holds the run context and a CephManager bound to the first mon remote.
    """

    @property
    def admin_remote(self):
        """The remote of the first mon, used for running admin commands."""
        first_mon = misc.get_first_mon(self._ctx, None)
        (result,) = self._ctx.cluster.only(first_mon).remotes.keys()
        return result

    def __init__(self, ctx) -> None:
        self._ctx = ctx
        self.mon_manager = ceph_manager.CephManager(self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))

    def get_config(self, key, service_type=None):
        """
        Get config from mon by default, or a specific service if caller asks for it
        """
        if service_type is None:
            service_type = 'mon'

        # Query the lowest-id daemon of that type over its admin socket.
        service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def set_ceph_conf(self, subsys, key, value):
        """Set `key = value` in ceph.conf section `subsys` and rewrite the conf."""
        if subsys not in self._ctx.ceph['ceph'].conf:
            self._ctx.ceph['ceph'].conf[subsys] = {}
        self._ctx.ceph['ceph'].conf[subsys][key] = value
        write_conf(self._ctx)  # XXX because we don't have the ceph task's config object, if they
                               # used a different config path this won't work.

    def clear_ceph_conf(self, subsys, key):
        """Remove `key` from ceph.conf section `subsys` and rewrite the conf."""
        del self._ctx.ceph['ceph'].conf[subsys][key]
        write_conf(self._ctx)

    def json_asok(self, command, service_type, service_id, timeout=None):
        """Run an admin-socket command and return its parsed JSON output.

        Returns None when the daemon produced no output.  Non-standard JSON
        constants (NaN/Infinity) emitted by the daemon are mapped to floats.
        """
        if timeout is None:
            timeout = 300
        command.insert(0, '--format=json')
        proc = self.mon_manager.admin_socket(service_type, service_id, command, timeout=timeout)
        response_data = proc.stdout.getvalue().strip()
        if len(response_data) > 0:

            def get_nonnumeric_values(value):
                c = {"NaN": float("nan"), "Infinity": float("inf"),
                     "-Infinity": -float("inf")}
                return c[value]

            # 'inf' is not valid JSON; normalize to 'Infinity' so the
            # parse_constant hook above can handle it.
            j = json.loads(response_data.replace('inf', 'Infinity'),
                           parse_constant=get_nonnumeric_values)
            pretty = json.dumps(j, sort_keys=True, indent=2)
            log.debug(f"_json_asok output\n{pretty}")
            return j
        else:
            log.debug("_json_asok output empty")
            return None

    def is_addr_blocklisted(self, addr):
        """Return True if `addr` appears in the OSD map blocklist."""
        blocklist = json.loads(self.mon_manager.raw_cluster_cmd(
            "osd", "dump", "--format=json"))['blocklist']
        if addr in blocklist:
            return True
        # FIX: log.warn() is a deprecated alias of log.warning() (removed in
        # Python 3.13).
        log.warning(f'The address {addr} is not blocklisted')
        return False
280 | ||
7c673cae FG |
281 | |
class MDSCluster(CephCluster):
    """
    Collective operations on all the MDS daemons in the Ceph cluster. These
    daemons may be in use by various Filesystems.

    For the benefit of pre-multi-filesystem tests, this class is also
    a parent of Filesystem. The correct way to use MDSCluster going forward is
    as a separate instance outside of your (multiple) Filesystem instances.
    """

    def __init__(self, ctx):
        super(MDSCluster, self).__init__(ctx)

    @property
    def mds_ids(self):
        # do this dynamically because the list of ids may change periodically with cephadm
        return list(misc.all_roles_of_type(self._ctx.cluster, 'mds'))

    @property
    def mds_daemons(self):
        """Mapping of mds_id -> daemon handle, rebuilt on every access."""
        return dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])

    def _one_or_all(self, mds_id, cb, in_parallel=True):
        """
        Call a callback for a single named MDS, or for all.

        Note that the parallelism here isn't for performance, it's to avoid being overly kind
        to the cluster by waiting a graceful ssh-latency of time between doing things, and to
        avoid being overly kind by executing them in a particular order.  However, some actions
        don't cope with being done in parallel, so it's optional (`in_parallel`)

        :param mds_id: MDS daemon name, or None
        :param cb: Callback taking single argument of MDS daemon name
        :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
        """

        if mds_id is None:
            if in_parallel:
                with parallel() as p:
                    for mds_id in self.mds_ids:
                        p.spawn(cb, mds_id)
            else:
                for mds_id in self.mds_ids:
                    cb(mds_id)
        else:
            cb(mds_id)

    def get_config(self, key, service_type=None):
        """
        get_config specialization of service_type="mds"
        """
        if service_type != "mds":
            return super(MDSCluster, self).get_config(key, service_type)

        # Some tests stop MDS daemons, don't send commands to a dead one:
        running_daemons = [i for i, mds in self.mds_daemons.items() if mds.running()]
        service_id = random.sample(running_daemons, 1)[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def mds_stop(self, mds_id=None):
        """
        Stop the MDS daemon process(se). If it held a rank, that rank
        will eventually go laggy.
        """
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())

    def mds_fail(self, mds_id=None):
        """
        Inform MDSMonitor of the death of the daemon process(es).  If it held
        a rank, that rank will be relinquished.
        """
        self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))

    def mds_restart(self, mds_id=None):
        """Restart one MDS daemon, or all of them when mds_id is None."""
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())

    def mds_fail_restart(self, mds_id=None):
        """
        Variation on restart that includes marking MDSs as failed, so that doing this
        operation followed by waiting for healthy daemon states guarantees that they
        have gone down and come up, rather than potentially seeing the healthy states
        that existed before the restart.
        """
        def _fail_restart(id_):
            self.mds_daemons[id_].stop()
            self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
            self.mds_daemons[id_].restart()

        self._one_or_all(mds_id, _fail_restart)

    def mds_signal(self, mds_id, sig, silent=False):
        """
        signal a MDS daemon
        """
        # FIX: dropped a stray C-style trailing semicolon.
        self.mds_daemons[mds_id].signal(sig, silent)

    def mds_is_running(self, mds_id):
        """Return whether the given MDS daemon process is running."""
        return self.mds_daemons[mds_id].running()

    def newfs(self, name='cephfs', create=True):
        """Construct (and by default create) a Filesystem with the given name."""
        return Filesystem(self._ctx, name=name, create=create)

    def status(self, epoch=None):
        """Take a fresh FSStatus snapshot (optionally of a specific epoch)."""
        return FSStatus(self.mon_manager, epoch)

    def get_standby_daemons(self):
        """Set of names of all standby MDS daemons."""
        return set([s['name'] for s in self.status().get_standbys()])

    def get_mds_hostnames(self):
        """Deduplicated list of hostnames that run an MDS."""
        result = set()
        for mds_id in self.mds_ids:
            mds_remote = self.mon_manager.find_remote('mds', mds_id)
            result.add(mds_remote.hostname)

        return list(result)

    def set_clients_block(self, blocked, mds_id=None):
        """
        Block (using iptables) client communications to this MDS.  Be careful: if
        other services are running on this MDS, or other MDSs try to talk to this
        MDS, their communications may also be blocked as collatoral damage.

        :param mds_id: Optional ID of MDS to block, default to all
        :return:
        """
        da_flag = "-A" if blocked else "-D"

        def set_block(_mds_id):
            remote = self.mon_manager.find_remote('mds', _mds_id)
            status = self.status()

            addr = status.get_mds_addr(_mds_id)
            ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()

            remote.run(
                args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])
            remote.run(
                args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])

        self._one_or_all(mds_id, set_block, in_parallel=False)

    def set_inter_mds_block(self, blocked, mds_rank_1, mds_rank_2):
        """
        Block (using iptables) communications from a provided MDS to other MDSs.
        Block all ports that an MDS uses for communication.

        :param blocked: True to block the MDS, False otherwise
        :param mds_rank_1: MDS rank
        :param mds_rank_2: MDS rank
        :return:
        """
        da_flag = "-A" if blocked else "-D"

        def set_block(mds_ids):
            status = self.status()

            # Block incoming traffic to the first MDS...
            mds = mds_ids[0]
            remote = self.mon_manager.find_remote('mds', mds)
            addrs = status.get_mds_addrs(mds)
            for addr in addrs:
                ip_str, port_str = re.match("(.+):(.+)", addr).groups()
                remote.run(
                    args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                          "comment", "--comment", "teuthology"], omit_sudo=False)

            # ...and both directions for the second MDS.
            mds = mds_ids[1]
            remote = self.mon_manager.find_remote('mds', mds)
            addrs = status.get_mds_addrs(mds)
            for addr in addrs:
                ip_str, port_str = re.match("(.+):(.+)", addr).groups()
                remote.run(
                    args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
                          "comment", "--comment", "teuthology"], omit_sudo=False)
                remote.run(
                    args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                          "comment", "--comment", "teuthology"], omit_sudo=False)

        self._one_or_all((mds_rank_1, mds_rank_2), set_block, in_parallel=False)

    def clear_firewall(self):
        """Remove all teuthology iptables rules (module-level helper)."""
        clear_firewall(self._ctx)

    def get_mds_info(self, mds_id):
        """Info dict for the named MDS from a fresh FSStatus, or None."""
        return FSStatus(self.mon_manager).get_mds(mds_id)

    def is_pool_full(self, pool_name):
        """Return whether the named pool carries the 'full' flag.

        :raises RuntimeError: if the pool does not exist.
        """
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        for pool in pools:
            if pool['pool_name'] == pool_name:
                return 'full' in pool['flags_names'].split(",")

        raise RuntimeError("Pool not found '{0}'".format(pool_name))

    def delete_all_filesystems(self):
        """
        Remove all filesystems that exist, and any pools in use by them.
        """
        for fs in self.status().get_filesystems():
            Filesystem(ctx=self._ctx, fscid=fs['id']).destroy()

    @property
    def beacon_timeout(self):
        """
        Generate an acceptable timeout for the mons to drive some MDSMap change
        because of missed beacons from some MDS. This involves looking up the
        grace period in use by the mons and adding an acceptable buffer.
        """

        grace = float(self.get_config("mds_beacon_grace", service_type="mon"))
        return grace*2+15
495 | ||
f67539c2 | 496 | |
7c673cae | 497 | class Filesystem(MDSCluster): |
1e59de90 TL |
498 | |
499 | """ | |
500 | Generator for all Filesystems in the cluster. | |
501 | """ | |
502 | @classmethod | |
503 | def get_all_fs(cls, ctx): | |
504 | mdsc = MDSCluster(ctx) | |
505 | status = mdsc.status() | |
506 | for fs in status.get_filesystems(): | |
507 | yield cls(ctx, fscid=fs['id']) | |
508 | ||
7c673cae FG |
509 | """ |
510 | This object is for driving a CephFS filesystem. The MDS daemons driven by | |
511 | MDSCluster may be shared with other Filesystems. | |
512 | """ | |
    def __init__(self, ctx, fs_config={}, fscid=None, name=None, create=False):
        """
        :param ctx: teuthology run context
        :param fs_config: dict of fs settings read by create() (keys seen in
            this file: ec_profile, max_mds, standby_replay, session_timeout,
            subvols, subvol_options).
            NOTE(review): mutable default `{}` — harmless here because it is
            never mutated, but a `None` default with a guard would be safer.
        :param fscid: attach to an existing file system by id; mutually
            exclusive with `name`
        :param name: file system name; with create=True the fs is created
            unless already configured
        :param create: create the fs when `name` is given
        """
        super(Filesystem, self).__init__(ctx)

        self.name = name
        self.id = None
        self.metadata_pool_name = None
        self.data_pool_name = None
        self.data_pools = None
        self.fs_config = fs_config
        self.ec_profile = fs_config.get('ec_profile')

        # Use the first client role for driving client-side operations.
        client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
        self.client_id = client_list[0]
        self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]

        if name is not None:
            if fscid is not None:
                raise RuntimeError("cannot specify fscid when creating fs")
            if create and not self.legacy_configured():
                self.create()
        else:
            if fscid is not None:
                self.id = fscid
            self.getinfo(refresh = True)

        # Stash a reference to the first created filesystem on ctx, so
        # that if someone drops to the interactive shell they can easily
        # poke our methods.
        if not hasattr(self._ctx, "filesystem"):
            self._ctx.filesystem = self
543 | ||
f67539c2 TL |
544 | def dead(self): |
545 | try: | |
546 | return not bool(self.get_mds_map()) | |
547 | except FSMissing: | |
548 | return True | |
549 | ||
9f95a23c TL |
550 | def get_task_status(self, status_key): |
551 | return self.mon_manager.get_service_task_status("mds", status_key) | |
552 | ||
7c673cae FG |
553 | def getinfo(self, refresh = False): |
554 | status = self.status() | |
555 | if self.id is not None: | |
556 | fsmap = status.get_fsmap(self.id) | |
557 | elif self.name is not None: | |
558 | fsmap = status.get_fsmap_byname(self.name) | |
559 | else: | |
560 | fss = [fs for fs in status.get_filesystems()] | |
561 | if len(fss) == 1: | |
562 | fsmap = fss[0] | |
563 | elif len(fss) == 0: | |
564 | raise RuntimeError("no file system available") | |
565 | else: | |
566 | raise RuntimeError("more than one file system available") | |
567 | self.id = fsmap['id'] | |
568 | self.name = fsmap['mdsmap']['fs_name'] | |
569 | self.get_pool_names(status = status, refresh = refresh) | |
570 | return status | |
571 | ||
11fdf7f2 | 572 | def reach_max_mds(self): |
20effc67 | 573 | status = self.wait_for_daemons() |
11fdf7f2 | 574 | mds_map = self.get_mds_map(status=status) |
20effc67 | 575 | assert(mds_map['in'] == list(range(0, mds_map['max_mds']))) |
11fdf7f2 | 576 | |
f67539c2 TL |
577 | def reset(self): |
578 | self.mon_manager.raw_cluster_cmd("fs", "reset", str(self.name), '--yes-i-really-mean-it') | |
579 | ||
11fdf7f2 TL |
580 | def fail(self): |
581 | self.mon_manager.raw_cluster_cmd("fs", "fail", str(self.name)) | |
582 | ||
9f95a23c TL |
583 | def set_flag(self, var, *args): |
584 | a = map(lambda x: str(x).lower(), args) | |
585 | self.mon_manager.raw_cluster_cmd("fs", "flag", "set", var, *a) | |
586 | ||
    def set_allow_multifs(self, yes=True):
        """Toggle the global enable_multiple flag (multiple file systems)."""
        self.set_flag("enable_multiple", yes)
589 | ||
91327a77 | 590 | def set_var(self, var, *args): |
9f95a23c | 591 | a = map(lambda x: str(x).lower(), args) |
91327a77 AA |
592 | self.mon_manager.raw_cluster_cmd("fs", "set", self.name, var, *a) |
593 | ||
    def set_down(self, down=True):
        """Set the fs 'down' flag ("true"/"false")."""
        self.set_var("down", str(down).lower())
596 | ||
    def set_joinable(self, joinable=True):
        """Allow or forbid MDS daemons to join this file system."""
        self.set_var("joinable", joinable)
11fdf7f2 | 599 | |
    def set_max_mds(self, max_mds):
        """Set the maximum number of active MDS ranks."""
        self.set_var("max_mds", "%d" % max_mds)
7c673cae | 602 | |
    def set_session_timeout(self, timeout):
        """Set the client session timeout in seconds."""
        self.set_var("session_timeout", "%d" % timeout)
605 | ||
    def set_allow_standby_replay(self, yes):
        """Enable or disable standby-replay daemons for this file system."""
        self.set_var("allow_standby_replay", yes)
11fdf7f2 TL |
608 | |
    def set_allow_new_snaps(self, yes):
        """Enable or disable creation of new snapshots."""
        self.set_var("allow_new_snaps", yes, '--yes-i-really-mean-it')
7c673cae | 611 | |
    def set_bal_rank_mask(self, bal_rank_mask):
        """Set the balancer rank mask for this file system."""
        self.set_var("bal_rank_mask", bal_rank_mask)
614 | ||
    def set_refuse_client_session(self, yes):
        """Toggle refusal of new client sessions."""
        self.set_var("refuse_client_session", yes)
617 | ||
522d829b TL |
618 | def compat(self, *args): |
619 | a = map(lambda x: str(x).lower(), args) | |
620 | self.mon_manager.raw_cluster_cmd("fs", "compat", self.name, *a) | |
621 | ||
    def add_compat(self, *args):
        """Add a compat feature to this fs's mdsmap compat set."""
        self.compat("add_compat", *args)
624 | ||
    def add_incompat(self, *args):
        """Add an incompat feature to this fs's mdsmap compat set."""
        self.compat("add_incompat", *args)
627 | ||
    def rm_compat(self, *args):
        """Remove a compat feature from this fs's mdsmap compat set."""
        self.compat("rm_compat", *args)
630 | ||
    def rm_incompat(self, *args):
        """Remove an incompat feature from this fs's mdsmap compat set."""
        self.compat("rm_incompat", *args)
633 | ||
f67539c2 TL |
634 | def required_client_features(self, *args, **kwargs): |
635 | c = ["fs", "required_client_features", self.name, *args] | |
636 | return self.mon_manager.run_cluster_cmd(args=c, **kwargs) | |
637 | ||
    # Since v15.1.0 the pg autoscale mode has been enabled as default,
    # will let the pg autoscale mode to calculate the pg_num as needed.
    # We set the pg_num_min to 64 to make sure that pg autoscale mode
    # won't set the pg_num to low to fix Tracker#45434.
    pg_num = 64
    pg_num_min = 64
    # Pool sizing ratios used at pool creation; create() lowers
    # target_size_ratio to 0.05 when an EC pool will hold the bulk of data.
    target_size_ratio = 0.9
    target_size_ratio_ec = 0.9
7c673cae | 646 | |
    def create(self, recover=False, metadata_overlay=False):
        """Create the pools and the file system, then apply fs_config.

        :param recover: pass --recover to `fs new` and skip EC setup and
            pool-application checks
        :param metadata_overlay: pass --allow-dangerous-metadata-overlay to
            `fs new`; also leaves target_size_ratio untouched
        """
        # Fill in default names derived from the fs name where unset.
        if self.name is None:
            self.name = "cephfs"
        if self.metadata_pool_name is None:
            self.metadata_pool_name = "{0}_metadata".format(self.name)
        if self.data_pool_name is None:
            data_pool_name = "{0}_data".format(self.name)
        else:
            data_pool_name = self.data_pool_name

        # will use the ec pool to store the data and a small amount of
        # metadata still goes to the primary data pool for all files.
        if not metadata_overlay and self.ec_profile and 'disabled' not in self.ec_profile:
            self.target_size_ratio = 0.05

        log.debug("Creating filesystem '{0}'".format(self.name))

        # Create metadata and primary data pools; fall back to the
        # pre-pg_num_min command form on EINVAL (older mons).
        try:
            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                             self.metadata_pool_name,
                                             '--pg_num_min', str(self.pg_num_min))

            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                             data_pool_name, str(self.pg_num),
                                             '--pg_num_min', str(self.pg_num_min),
                                             '--target_size_ratio',
                                             str(self.target_size_ratio))
        except CommandFailedError as e:
            if e.exitstatus == 22: # nautilus couldn't specify --pg_num_min option
                self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                                 self.metadata_pool_name,
                                                 str(self.pg_num_min))

                self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                                 data_pool_name, str(self.pg_num),
                                                 str(self.pg_num_min))
            else:
                raise

        args = ["fs", "new", self.name, self.metadata_pool_name, data_pool_name]
        if recover:
            args.append('--recover')
        if metadata_overlay:
            args.append('--allow-dangerous-metadata-overlay')
        self.mon_manager.raw_cluster_cmd(*args)

        if not recover:
            # Optionally add an erasure-coded data pool and point the root
            # directory's layout at it.
            if self.ec_profile and 'disabled' not in self.ec_profile:
                ec_data_pool_name = data_pool_name + "_ec"
                log.debug("EC profile is %s", self.ec_profile)
                cmd = ['osd', 'erasure-code-profile', 'set', ec_data_pool_name]
                cmd.extend(self.ec_profile)
                self.mon_manager.raw_cluster_cmd(*cmd)
                try:
                    self.mon_manager.raw_cluster_cmd(
                        'osd', 'pool', 'create', ec_data_pool_name,
                        'erasure', ec_data_pool_name,
                        '--pg_num_min', str(self.pg_num_min),
                        '--target_size_ratio', str(self.target_size_ratio_ec))
                except CommandFailedError as e:
                    if e.exitstatus == 22: # nautilus couldn't specify --pg_num_min option
                        self.mon_manager.raw_cluster_cmd(
                            'osd', 'pool', 'create', ec_data_pool_name,
                            str(self.pg_num_min), 'erasure', ec_data_pool_name)
                    else:
                        raise
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'set',
                    ec_data_pool_name, 'allow_ec_overwrites', 'true')
                self.add_data_pool(ec_data_pool_name, create=False)
                self.check_pool_application(ec_data_pool_name)

                self.run_client_payload(f"setfattr -n ceph.dir.layout.pool -v {ec_data_pool_name} . && getfattr -n ceph.dir.layout .")

            self.check_pool_application(self.metadata_pool_name)
            self.check_pool_application(data_pool_name)

        # Turn off spurious standby count warnings from modifying max_mds in tests.
        try:
            self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
        except CommandFailedError as e:
            if e.exitstatus == 22:
                # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
                pass
            else:
                raise

        # Apply per-test fs settings supplied via fs_config.
        if self.fs_config is not None:
            log.debug(f"fs_config: {self.fs_config}")
            max_mds = self.fs_config.get('max_mds', 1)
            if max_mds > 1:
                self.set_max_mds(max_mds)

            standby_replay = self.fs_config.get('standby_replay', False)
            self.set_allow_standby_replay(standby_replay)

            # If absent will use the default value (60 seconds)
            session_timeout = self.fs_config.get('session_timeout', 60)
            if session_timeout != 60:
                self.set_session_timeout(session_timeout)

            if self.fs_config.get('subvols', None) is not None:
                log.debug(f"Creating {self.fs_config.get('subvols')} subvols "
                          f"for filesystem '{self.name}'")
                if not hasattr(self._ctx, "created_subvols"):
                    self._ctx.created_subvols = dict()

                subvols = self.fs_config.get('subvols')
                assert(isinstance(subvols, dict))
                assert(isinstance(subvols['create'], int))
                assert(subvols['create'] > 0)

                # Create each subvolume and record its path on the ctx so
                # tests can find them later.
                for sv in range(0, subvols['create']):
                    sv_name = f'sv_{sv}'
                    self.mon_manager.raw_cluster_cmd(
                        'fs', 'subvolume', 'create', self.name, sv_name,
                        self.fs_config.get('subvol_options', ''))

                    if self.name not in self._ctx.created_subvols:
                        self._ctx.created_subvols[self.name] = []

                    subvol_path = self.mon_manager.raw_cluster_cmd(
                        'fs', 'subvolume', 'getpath', self.name, sv_name)
                    subvol_path = subvol_path.strip()
                    self._ctx.created_subvols[self.name].append(subvol_path)
            else:
                log.debug(f"Not Creating any subvols for filesystem '{self.name}'")


        self.getinfo(refresh = True)

        # wait pgs to be clean
        self.mon_manager.wait_for_clean()
780 | ||
f67539c2 TL |
781 | def run_client_payload(self, cmd): |
782 | # avoid circular dep by importing here: | |
783 | from tasks.cephfs.fuse_mount import FuseMount | |
2a845540 TL |
784 | |
785 | # Wait for at MDS daemons to be ready before mounting the | |
786 | # ceph-fuse client in run_client_payload() | |
787 | self.wait_for_daemons() | |
788 | ||
f67539c2 | 789 | d = misc.get_testdir(self._ctx) |
20effc67 | 790 | m = FuseMount(self._ctx, d, "admin", self.client_remote, cephfs_name=self.name) |
522d829b | 791 | m.mount_wait() |
f67539c2 TL |
792 | m.run_shell_payload(cmd) |
793 | m.umount_wait(require_clean=True) | |
794 | ||
795 | def _remove_pool(self, name, **kwargs): | |
796 | c = f'osd pool rm {name} {name} --yes-i-really-really-mean-it' | |
797 | return self.mon_manager.ceph(c, **kwargs) | |
798 | ||
799 | def rm(self, **kwargs): | |
800 | c = f'fs rm {self.name} --yes-i-really-mean-it' | |
801 | return self.mon_manager.ceph(c, **kwargs) | |
802 | ||
803 | def remove_pools(self, data_pools): | |
804 | self._remove_pool(self.get_metadata_pool_name()) | |
805 | for poolname in data_pools: | |
806 | try: | |
807 | self._remove_pool(poolname) | |
808 | except CommandFailedError as e: | |
809 | # EBUSY, this data pool is used by two metadata pools, let the | |
810 | # 2nd pass delete it | |
811 | if e.exitstatus == EBUSY: | |
812 | pass | |
813 | else: | |
814 | raise | |
815 | ||
    def destroy(self, reset_obj_attrs=True):
        """
        Tear down this filesystem: fail its MDS daemons, remove the fs map
        entry, and delete its metadata and data pools.

        :param reset_obj_attrs: when True, also clear this object's cached
            identity (id/name/pool names) so the instance can be reused.
        """
        log.info(f'Destroying file system {self.name} and related pools')

        if self.dead():
            log.debug('already dead...')
            return

        # Snapshot the data pool names before the fs map entry is removed.
        data_pools = self.get_data_pool_names(refresh=True)

        # make sure no MDSs are attached to given FS.
        self.fail()
        self.rm()

        self.remove_pools(data_pools)

        if reset_obj_attrs:
            self.id = None
            self.name = None
            self.metadata_pool_name = None
            self.data_pool_name = None
            self.data_pools = None
837 | ||
838 | def recreate(self): | |
839 | self.destroy() | |
840 | ||
841 | self.create() | |
842 | self.getinfo(refresh=True) | |
843 | ||
35e4c445 FG |
844 | def check_pool_application(self, pool_name): |
845 | osd_map = self.mon_manager.get_osd_dump_json() | |
846 | for pool in osd_map['pools']: | |
847 | if pool['pool_name'] == pool_name: | |
848 | if "application_metadata" in pool: | |
849 | if not "cephfs" in pool['application_metadata']: | |
9f95a23c TL |
850 | raise RuntimeError("Pool {pool_name} does not name cephfs as application!".\ |
851 | format(pool_name=pool_name)) | |
35e4c445 | 852 | |
7c673cae FG |
853 | def __del__(self): |
854 | if getattr(self._ctx, "filesystem", None) == self: | |
855 | delattr(self._ctx, "filesystem") | |
856 | ||
857 | def exists(self): | |
858 | """ | |
859 | Whether a filesystem exists in the mon's filesystem list | |
860 | """ | |
861 | fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty')) | |
862 | return self.name in [fs['name'] for fs in fs_list] | |
863 | ||
    def legacy_configured(self):
        """
        Check if a legacy (i.e. pre "fs new") filesystem configuration is present. If this is
        the case, the caller should avoid using Filesystem.create
        """
        try:
            out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools')
            pools = json.loads(out_text)
            metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools]
            if metadata_pool_exists:
                # Legacy clusters used a pool literally named 'metadata'.
                self.metadata_pool_name = 'metadata'
        except CommandFailedError as e:
            # For use in upgrade tests, Ceph cuttlefish and earlier don't support
            # structured output (--format) from the CLI.
            # EINVAL (22) from the CLI => cluster too old to understand
            # --format; such clusters are by definition legacy-configured.
            if e.exitstatus == 22:
                metadata_pool_exists = True
            else:
                raise

        return metadata_pool_exists
884 | ||
885 | def _df(self): | |
886 | return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty")) | |
887 | ||
f67539c2 | 888 | # may raise FSMissing |
f64942e4 AA |
889 | def get_mds_map(self, status=None): |
890 | if status is None: | |
891 | status = self.status() | |
892 | return status.get_fsmap(self.id)['mdsmap'] | |
7c673cae | 893 | |
11fdf7f2 TL |
894 | def get_var(self, var, status=None): |
895 | return self.get_mds_map(status=status)[var] | |
91327a77 | 896 | |
92f5a8d4 TL |
897 | def set_dir_layout(self, mount, path, layout): |
898 | for name, value in layout.items(): | |
899 | mount.run_shell(args=["setfattr", "-n", "ceph.dir.layout."+name, "-v", str(value), path]) | |
900 | ||
    def add_data_pool(self, name, create=True):
        """
        Add *name* as an additional data pool for this filesystem.

        :param create: when True, create the OSD pool first.
        :return: the new pool's id
        :raises RuntimeError: if the pool cannot be found after creation.
        """
        if create:
            try:
                self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name,
                                                 '--pg_num_min', str(self.pg_num_min))
            except CommandFailedError as e:
                if e.exitstatus == 22: # nautilus couldn't specify --pg_num_min option
                    self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name,
                                                     str(self.pg_num_min))
                else:
                    raise
        self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name)
        # Refresh the cached pool map, then translate name -> pool id.
        self.get_pool_names(refresh = True)
        for poolid, fs_name in self.data_pools.items():
            if name == fs_name:
                return poolid
        raise RuntimeError("could not get just created pool '{0}'".format(name))
918 | ||
    def get_pool_names(self, refresh = False, status = None):
        """
        Populate the cached metadata_pool_name and data_pools (id -> name)
        attributes from the fs map and OSD map. No-op if already cached
        and *refresh* is False.
        """
        if refresh or self.metadata_pool_name is None or self.data_pools is None:
            if status is None:
                status = self.status()
            fsmap = status.get_fsmap(self.id)

            # Build a pool-id -> pool-name translation table from the OSD map.
            osd_map = self.mon_manager.get_osd_dump_json()
            id_to_name = {}
            for p in osd_map['pools']:
                id_to_name[p['pool']] = p['pool_name']

            self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']]
            self.data_pools = {}
            for data_pool in fsmap['mdsmap']['data_pools']:
                self.data_pools[data_pool] = id_to_name[data_pool]
934 | ||
935 | def get_data_pool_name(self, refresh = False): | |
936 | if refresh or self.data_pools is None: | |
937 | self.get_pool_names(refresh = True) | |
938 | assert(len(self.data_pools) == 1) | |
e306af50 | 939 | return next(iter(self.data_pools.values())) |
7c673cae FG |
940 | |
941 | def get_data_pool_id(self, refresh = False): | |
942 | """ | |
943 | Don't call this if you have multiple data pools | |
944 | :return: integer | |
945 | """ | |
946 | if refresh or self.data_pools is None: | |
947 | self.get_pool_names(refresh = True) | |
948 | assert(len(self.data_pools) == 1) | |
e306af50 | 949 | return next(iter(self.data_pools.keys())) |
7c673cae FG |
950 | |
951 | def get_data_pool_names(self, refresh = False): | |
952 | if refresh or self.data_pools is None: | |
953 | self.get_pool_names(refresh = True) | |
e306af50 | 954 | return list(self.data_pools.values()) |
7c673cae FG |
955 | |
    def get_metadata_pool_name(self):
        # Return the cached metadata pool name (populated by get_pool_names()).
        return self.metadata_pool_name
958 | ||
181888fb FG |
959 | def set_data_pool_name(self, name): |
960 | if self.id is not None: | |
961 | raise RuntimeError("can't set filesystem name if its fscid is set") | |
962 | self.data_pool_name = name | |
963 | ||
522d829b TL |
964 | def get_pool_pg_num(self, pool_name): |
965 | pgs = json.loads(self.mon_manager.raw_cluster_cmd('osd', 'pool', 'get', | |
966 | pool_name, 'pg_num', | |
967 | '--format=json-pretty')) | |
968 | return int(pgs['pg_num']) | |
969 | ||
7c673cae FG |
    def get_namespace_id(self):
        # The filesystem's fscid doubles as its namespace id.
        return self.id
972 | ||
973 | def get_pool_df(self, pool_name): | |
974 | """ | |
975 | Return a dict like: | |
976 | {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0} | |
977 | """ | |
978 | for pool_df in self._df()['pools']: | |
979 | if pool_df['name'] == pool_name: | |
980 | return pool_df['stats'] | |
981 | ||
982 | raise RuntimeError("Pool name '{0}' not found".format(pool_name)) | |
983 | ||
    def get_usage(self):
        # Cluster-wide used bytes, straight from `ceph df`.
        return self._df()['stats']['total_used_bytes']
986 | ||
    def are_daemons_healthy(self, status=None, skip_max_mds_check=False):
        """
        Return true if all daemons are in one of active, standby, standby-replay, and
        at least max_mds daemons are in 'active'.

        Unlike most of Filesystem, this function is tolerant of new-style `fs`
        commands being missing, because we are part of the ceph installation
        process during upgrade suites, so must fall back to old style commands
        when we get an EINVAL on a new style command.

        :param status: optional, reuse a previously fetched FS status
        :param skip_max_mds_check: if True, only the per-daemon state check is done
        :return: bool
        """
        # First, check to see that processes haven't exited with an error code
        for mds in self._ctx.daemons.iter_daemons_of_role('mds'):
            mds.check_status()

        active_count = 0
        mds_map = self.get_mds_map(status=status)

        log.debug("are_daemons_healthy: mds map: {0}".format(mds_map))

        # Any daemon outside {active, standby, standby-replay} means unhealthy.
        for mds_id, mds_status in mds_map['info'].items():
            if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
                log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
                return False
            elif mds_status['state'] == 'up:active':
                active_count += 1

        log.debug("are_daemons_healthy: {0}/{1}".format(
            active_count, mds_map['max_mds']
        ))

        if not skip_max_mds_check:
            if active_count > mds_map['max_mds']:
                log.debug("are_daemons_healthy: number of actives is greater than max_mds: {0}".format(mds_map))
                return False
            elif active_count == mds_map['max_mds']:
                # The MDSMap says these guys are active, but let's check they really are
                for mds_id, mds_status in mds_map['info'].items():
                    if mds_status['state'] == 'up:active':
                        try:
                            daemon_status = self.mds_tell(["status"], mds_id=mds_status['name'])
                        except CommandFailedError as cfe:
                            if cfe.exitstatus == errno.EINVAL:
                                # Old version, can't do this check
                                continue
                            else:
                                # MDS not even running
                                return False

                        if daemon_status['state'] != 'up:active':
                            # MDS hasn't taken the latest map yet
                            return False

                return True
            else:
                # Fewer actives than max_mds: not yet healthy.
                return False
        else:
            log.debug("are_daemons_healthy: skipping max_mds check")
            return True
7c673cae | 1047 | |
11fdf7f2 | 1048 | def get_daemon_names(self, state=None, status=None): |
7c673cae FG |
1049 | """ |
1050 | Return MDS daemon names of those daemons in the given state | |
1051 | :param state: | |
1052 | :return: | |
1053 | """ | |
11fdf7f2 | 1054 | mdsmap = self.get_mds_map(status) |
7c673cae | 1055 | result = [] |
9f95a23c TL |
1056 | for mds_status in sorted(mdsmap['info'].values(), |
1057 | key=lambda _: _['rank']): | |
7c673cae FG |
1058 | if mds_status['state'] == state or state is None: |
1059 | result.append(mds_status['name']) | |
1060 | ||
1061 | return result | |
1062 | ||
9f95a23c | 1063 | def get_active_names(self, status=None): |
7c673cae FG |
1064 | """ |
1065 | Return MDS daemon names of those daemons holding ranks | |
1066 | in state up:active | |
1067 | ||
1068 | :return: list of strings like ['a', 'b'], sorted by rank | |
1069 | """ | |
9f95a23c | 1070 | return self.get_daemon_names("up:active", status=status) |
7c673cae | 1071 | |
11fdf7f2 TL |
1072 | def get_all_mds_rank(self, status=None): |
1073 | mdsmap = self.get_mds_map(status) | |
7c673cae | 1074 | result = [] |
9f95a23c TL |
1075 | for mds_status in sorted(mdsmap['info'].values(), |
1076 | key=lambda _: _['rank']): | |
7c673cae FG |
1077 | if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay': |
1078 | result.append(mds_status['rank']) | |
1079 | ||
1080 | return result | |
1081 | ||
f6b5b4d7 | 1082 | def get_rank(self, rank=None, status=None): |
28e407b8 AA |
1083 | if status is None: |
1084 | status = self.getinfo() | |
f6b5b4d7 TL |
1085 | if rank is None: |
1086 | rank = 0 | |
28e407b8 AA |
1087 | return status.get_rank(self.id, rank) |
1088 | ||
11fdf7f2 TL |
1089 | def rank_restart(self, rank=0, status=None): |
1090 | name = self.get_rank(rank=rank, status=status)['name'] | |
1091 | self.mds_restart(mds_id=name) | |
1092 | ||
1093 | def rank_signal(self, signal, rank=0, status=None): | |
1094 | name = self.get_rank(rank=rank, status=status)['name'] | |
1095 | self.mds_signal(name, signal) | |
1096 | ||
    def rank_freeze(self, yes, rank=0):
        # Freeze (yes=True) or unfreeze (yes=False) the daemon at fscid:rank.
        self.mon_manager.raw_cluster_cmd("mds", "freeze", "{}:{}".format(self.id, rank), str(yes).lower())
1099 | ||
1e59de90 TL |
    def rank_repaired(self, rank):
        # Mark a damaged rank as repaired so it can be taken up again.
        self.mon_manager.raw_cluster_cmd("mds", "repaired", "{}:{}".format(self.id, rank))
1102 | ||
11fdf7f2 TL |
    def rank_fail(self, rank=0):
        # Fail the daemon holding fscid:rank, forcing a standby to take over.
        self.mon_manager.raw_cluster_cmd("mds", "fail", "{}:{}".format(self.id, rank))
1105 | ||
1e59de90 TL |
1106 | def rank_is_running(self, rank=0, status=None): |
1107 | name = self.get_rank(rank=rank, status=status)['name'] | |
1108 | return self.mds_is_running(name) | |
1109 | ||
28e407b8 AA |
1110 | def get_ranks(self, status=None): |
1111 | if status is None: | |
1112 | status = self.getinfo() | |
1113 | return status.get_ranks(self.id) | |
1114 | ||
a4b75251 TL |
1115 | def get_damaged(self, status=None): |
1116 | if status is None: | |
1117 | status = self.getinfo() | |
1118 | return status.get_damaged(self.id) | |
1119 | ||
11fdf7f2 TL |
1120 | def get_replays(self, status=None): |
1121 | if status is None: | |
1122 | status = self.getinfo() | |
1123 | return status.get_replays(self.id) | |
1124 | ||
1125 | def get_replay(self, rank=0, status=None): | |
1126 | for replay in self.get_replays(status=status): | |
1127 | if replay['rank'] == rank: | |
1128 | return replay | |
1129 | return None | |
1130 | ||
28e407b8 | 1131 | def get_rank_names(self, status=None): |
7c673cae FG |
1132 | """ |
1133 | Return MDS daemon names of those daemons holding a rank, | |
1134 | sorted by rank. This includes e.g. up:replay/reconnect | |
1135 | as well as active, but does not include standby or | |
1136 | standby-replay. | |
1137 | """ | |
11fdf7f2 | 1138 | mdsmap = self.get_mds_map(status) |
7c673cae | 1139 | result = [] |
9f95a23c TL |
1140 | for mds_status in sorted(mdsmap['info'].values(), |
1141 | key=lambda _: _['rank']): | |
7c673cae FG |
1142 | if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay': |
1143 | result.append(mds_status['name']) | |
1144 | ||
1145 | return result | |
1146 | ||
    def wait_for_daemons(self, timeout=None, skip_max_mds_check=False, status=None):
        """
        Wait until all daemons are healthy, polling once per second.

        :param timeout: seconds before giving up (default DAEMON_WAIT_TIMEOUT)
        :param skip_max_mds_check: forwarded to are_daemons_healthy()
        :param status: optional initial status to check before polling
        :return: the FS status observed when the daemons became healthy
        :raises RuntimeError: on timeout
        """

        if timeout is None:
            timeout = DAEMON_WAIT_TIMEOUT

        # Without an fscid we can't look ourselves up in any status object;
        # refresh our identity first.
        if self.id is None:
            status = self.getinfo(refresh=True)

        if status is None:
            status = self.status()

        elapsed = 0
        while True:
            if self.are_daemons_healthy(status=status, skip_max_mds_check=skip_max_mds_check):
                return status
            else:
                time.sleep(1)
                elapsed += 1

            if elapsed > timeout:
                log.debug("status = {0}".format(status))
                raise RuntimeError("Timed out waiting for MDS daemons to become healthy")

            # Re-fetch a fresh status for the next health check.
            status = self.status()
1175 | ||
f67539c2 TL |
1176 | def dencoder(self, obj_type, obj_blob): |
1177 | args = [os.path.join(self._prefix, "ceph-dencoder"), 'type', obj_type, 'import', '-', 'decode', 'dump_json'] | |
1178 | p = self.mon_manager.controller.run(args=args, stdin=BytesIO(obj_blob), stdout=BytesIO()) | |
1179 | return p.stdout.getvalue() | |
1180 | ||
    def rados(self, *args, **kwargs):
        """
        Callout to rados CLI.

        All arguments are forwarded to mon_manager.do_rados verbatim.
        """

        return self.mon_manager.do_rados(*args, **kwargs)
7c673cae | 1187 | |
    def radosm(self, *args, **kwargs):
        """
        Interact with the metadata pool via rados CLI.

        Same as rados(), with the pool fixed to this fs's metadata pool.
        """

        return self.rados(*args, **kwargs, pool=self.get_metadata_pool_name())
1194 | ||
1195 | def radosmo(self, *args, stdout=BytesIO(), **kwargs): | |
7c673cae | 1196 | """ |
f67539c2 | 1197 | Interact with the metadata pool via rados CLI. Get the stdout. |
7c673cae | 1198 | """ |
7c673cae | 1199 | |
f67539c2 | 1200 | return self.radosm(*args, **kwargs, stdout=stdout).stdout.getvalue() |
7c673cae FG |
1201 | |
1202 | def get_metadata_object(self, object_type, object_id): | |
1203 | """ | |
1204 | Retrieve an object from the metadata pool, pass it through | |
1205 | ceph-dencoder to dump it to JSON, and return the decoded object. | |
1206 | """ | |
7c673cae | 1207 | |
f67539c2 TL |
1208 | o = self.radosmo(['get', object_id, '-']) |
1209 | j = self.dencoder(object_type, o) | |
7c673cae | 1210 | try: |
f67539c2 | 1211 | return json.loads(j) |
7c673cae | 1212 | except (TypeError, ValueError): |
f67539c2 | 1213 | log.error("Failed to decode JSON: '{0}'".format(j)) |
7c673cae FG |
1214 | raise |
1215 | ||
7c673cae FG |
    def get_journal_version(self):
        """
        Read the JournalPointer and Journal::Header objects to learn the version of
        encoding in use.

        :return: the journal's stream_format value
        """
        # Rank 0's journal pointer lives at a fixed object name.
        journal_pointer_object = '400.00000000'
        journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object)
        journal_ino = journal_pointer_dump['journal_pointer']['front']

        # The journal header is the 0th object of the journal inode.
        journal_header_object = "{0:x}.00000000".format(journal_ino)
        journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object)

        version = journal_header_dump['journal_header']['stream_format']
        log.debug("Read journal version {0}".format(version))

        return version
1232 | ||
f64942e4 | 1233 | def mds_asok(self, command, mds_id=None, timeout=None): |
7c673cae | 1234 | if mds_id is None: |
f67539c2 | 1235 | return self.rank_asok(command, timeout=timeout) |
7c673cae | 1236 | |
f64942e4 | 1237 | return self.json_asok(command, 'mds', mds_id, timeout=timeout) |
7c673cae | 1238 | |
f67539c2 TL |
1239 | def mds_tell(self, command, mds_id=None): |
1240 | if mds_id is None: | |
1241 | return self.rank_tell(command) | |
1242 | ||
1243 | return json.loads(self.mon_manager.raw_cluster_cmd("tell", f"mds.{mds_id}", *command)) | |
1244 | ||
f64942e4 AA |
1245 | def rank_asok(self, command, rank=0, status=None, timeout=None): |
1246 | info = self.get_rank(rank=rank, status=status) | |
1247 | return self.json_asok(command, 'mds', info['name'], timeout=timeout) | |
28e407b8 | 1248 | |
11fdf7f2 | 1249 | def rank_tell(self, command, rank=0, status=None): |
20effc67 TL |
1250 | try: |
1251 | out = self.mon_manager.raw_cluster_cmd("tell", f"mds.{self.id}:{rank}", *command) | |
1252 | return json.loads(out) | |
1253 | except json.decoder.JSONDecodeError: | |
1254 | log.error("could not decode: {}".format(out)) | |
1255 | raise | |
11fdf7f2 | 1256 | |
f6b5b4d7 TL |
1257 | def ranks_tell(self, command, status=None): |
1258 | if status is None: | |
1259 | status = self.status() | |
1260 | out = [] | |
1261 | for r in status.get_ranks(self.id): | |
1262 | result = self.rank_tell(command, rank=r['rank'], status=status) | |
1263 | out.append((r['rank'], result)) | |
1264 | return sorted(out) | |
1265 | ||
1266 | def ranks_perf(self, f, status=None): | |
1267 | perf = self.ranks_tell(["perf", "dump"], status=status) | |
1268 | out = [] | |
1269 | for rank, perf in perf: | |
1270 | out.append((rank, f(perf))) | |
1271 | return out | |
1272 | ||
1e59de90 | 1273 | def read_cache(self, path, depth=None, rank=None): |
7c673cae FG |
1274 | cmd = ["dump", "tree", path] |
1275 | if depth is not None: | |
1276 | cmd.append(depth.__str__()) | |
1e59de90 TL |
1277 | result = self.rank_asok(cmd, rank=rank) |
1278 | if result is None or len(result) == 0: | |
7c673cae FG |
1279 | raise RuntimeError("Path not found in cache: {0}".format(path)) |
1280 | ||
1281 | return result | |
1282 | ||
    def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None):
        """
        Block until the MDS reaches a particular state, or a failure condition
        is met.

        When there are multiple MDSs, succeed when exaclty one MDS is in the
        goal state, or fail when any MDS is in the reject state.

        :param goal_state: Return once the MDS is in this state
        :param reject: Fail if the MDS enters this state before the goal state
        :param timeout: Fail if this many seconds pass before reaching goal
        :return: number of seconds waited, rounded down to integer
        """

        started_at = time.time()
        while True:
            status = self.status()
            if rank is not None:
                try:
                    mds_info = status.get_rank(self.id, rank)
                    current_state = mds_info['state'] if mds_info else None
                    log.debug("Looked up MDS state for mds.{0}: {1}".format(rank, current_state))
                # NOTE(review): bare except also swallows KeyboardInterrupt /
                # SystemExit; presumably it's meant to catch a lookup failure
                # when the rank is absent — consider narrowing.
                except:
                    mdsmap = self.get_mds_map(status=status)
                    if rank in mdsmap['failed']:
                        log.debug("Waiting for rank {0} to come back.".format(rank))
                        current_state = None
                    else:
                        raise
            elif mds_id is not None:
                # mds_info is None if no daemon with this ID exists in the map
                mds_info = status.get_mds(mds_id)
                current_state = mds_info['state'] if mds_info else None
                log.debug("Looked up MDS state for {0}: {1}".format(mds_id, current_state))
            else:
                # In general, look for a single MDS
                states = [m['state'] for m in status.get_ranks(self.id)]
                if [s for s in states if s == goal_state] == [goal_state]:
                    current_state = goal_state
                elif reject in states:
                    current_state = reject
                else:
                    current_state = None
                log.debug("mapped states {0} to {1}".format(states, current_state))

            elapsed = time.time() - started_at
            if current_state == goal_state:
                log.debug("reached state '{0}' in {1}s".format(current_state, elapsed))
                return elapsed
            elif reject is not None and current_state == reject:
                raise RuntimeError("MDS in reject state {0}".format(current_state))
            elif timeout is not None and elapsed > timeout:
                log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id)))
                raise RuntimeError(
                    "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format(
                        elapsed, goal_state, current_state
                    ))
            else:
                # Not there yet; poll again in a second.
                time.sleep(1)
1342 | ||
f67539c2 | 1343 | def _read_data_xattr(self, ino_no, xattr_name, obj_type, pool): |
7c673cae FG |
1344 | if pool is None: |
1345 | pool = self.get_data_pool_name() | |
1346 | ||
1347 | obj_name = "{0:x}.00000000".format(ino_no) | |
1348 | ||
f67539c2 | 1349 | args = ["getxattr", obj_name, xattr_name] |
7c673cae | 1350 | try: |
f67539c2 | 1351 | proc = self.rados(args, pool=pool, stdout=BytesIO()) |
7c673cae FG |
1352 | except CommandFailedError as e: |
1353 | log.error(e.__str__()) | |
1354 | raise ObjectNotFound(obj_name) | |
1355 | ||
f67539c2 TL |
1356 | obj_blob = proc.stdout.getvalue() |
1357 | return json.loads(self.dencoder(obj_type, obj_blob).strip()) | |
7c673cae FG |
1358 | |
1359 | def _write_data_xattr(self, ino_no, xattr_name, data, pool=None): | |
1360 | """ | |
1361 | Write to an xattr of the 0th data object of an inode. Will | |
1362 | succeed whether the object and/or xattr already exist or not. | |
1363 | ||
1364 | :param ino_no: integer inode number | |
1365 | :param xattr_name: string name of the xattr | |
1366 | :param data: byte array data to write to the xattr | |
1367 | :param pool: name of data pool or None to use primary data pool | |
1368 | :return: None | |
1369 | """ | |
7c673cae FG |
1370 | if pool is None: |
1371 | pool = self.get_data_pool_name() | |
1372 | ||
1373 | obj_name = "{0:x}.00000000".format(ino_no) | |
f67539c2 TL |
1374 | args = ["setxattr", obj_name, xattr_name, data] |
1375 | self.rados(args, pool=pool) | |
7c673cae | 1376 | |
20effc67 TL |
1377 | def read_symlink(self, ino_no, pool=None): |
1378 | return self._read_data_xattr(ino_no, "symlink", "string_wrapper", pool) | |
1379 | ||
7c673cae FG |
1380 | def read_backtrace(self, ino_no, pool=None): |
1381 | """ | |
1382 | Read the backtrace from the data pool, return a dict in the format | |
1383 | given by inode_backtrace_t::dump, which is something like: | |
1384 | ||
1385 | :: | |
1386 | ||
1387 | rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin | |
1388 | ceph-dencoder type inode_backtrace_t import out.bin decode dump_json | |
1389 | ||
1390 | { "ino": 1099511627778, | |
1391 | "ancestors": [ | |
1392 | { "dirino": 1, | |
1393 | "dname": "blah", | |
1394 | "version": 11}], | |
1395 | "pool": 1, | |
1396 | "old_pools": []} | |
1397 | ||
1398 | :param pool: name of pool to read backtrace from. If omitted, FS must have only | |
1399 | one data pool and that will be used. | |
1400 | """ | |
1401 | return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool) | |
1402 | ||
1403 | def read_layout(self, ino_no, pool=None): | |
1404 | """ | |
1405 | Read 'layout' xattr of an inode and parse the result, returning a dict like: | |
1406 | :: | |
1407 | { | |
1408 | "stripe_unit": 4194304, | |
1409 | "stripe_count": 1, | |
1410 | "object_size": 4194304, | |
1411 | "pool_id": 1, | |
1412 | "pool_ns": "", | |
1413 | } | |
1414 | ||
1415 | :param pool: name of pool to read backtrace from. If omitted, FS must have only | |
1416 | one data pool and that will be used. | |
1417 | """ | |
1418 | return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool) | |
1419 | ||
1420 | def _enumerate_data_objects(self, ino, size): | |
1421 | """ | |
1422 | Get the list of expected data objects for a range, and the list of objects | |
1423 | that really exist. | |
1424 | ||
1425 | :return a tuple of two lists of strings (expected, actual) | |
1426 | """ | |
1427 | stripe_size = 1024 * 1024 * 4 | |
1428 | ||
1429 | size = max(stripe_size, size) | |
1430 | ||
1431 | want_objects = [ | |
1432 | "{0:x}.{1:08x}".format(ino, n) | |
e306af50 | 1433 | for n in range(0, ((size - 1) // stripe_size) + 1) |
7c673cae FG |
1434 | ] |
1435 | ||
f67539c2 | 1436 | exist_objects = self.rados(["ls"], pool=self.get_data_pool_name(), stdout=StringIO()).stdout.getvalue().split("\n") |
7c673cae FG |
1437 | |
1438 | return want_objects, exist_objects | |
1439 | ||
1440 | def data_objects_present(self, ino, size): | |
1441 | """ | |
1442 | Check that *all* the expected data objects for an inode are present in the data pool | |
1443 | """ | |
1444 | ||
1445 | want_objects, exist_objects = self._enumerate_data_objects(ino, size) | |
1446 | missing = set(want_objects) - set(exist_objects) | |
1447 | ||
1448 | if missing: | |
cd265ab1 | 1449 | log.debug("Objects missing (ino {0}, size {1}): {2}".format( |
7c673cae FG |
1450 | ino, size, missing |
1451 | )) | |
1452 | return False | |
1453 | else: | |
cd265ab1 | 1454 | log.debug("All objects for ino {0} size {1} found".format(ino, size)) |
7c673cae FG |
1455 | return True |
1456 | ||
1457 | def data_objects_absent(self, ino, size): | |
1458 | want_objects, exist_objects = self._enumerate_data_objects(ino, size) | |
1459 | present = set(want_objects) & set(exist_objects) | |
1460 | ||
1461 | if present: | |
cd265ab1 | 1462 | log.debug("Objects not absent (ino {0}, size {1}): {2}".format( |
7c673cae FG |
1463 | ino, size, present |
1464 | )) | |
1465 | return False | |
1466 | else: | |
cd265ab1 | 1467 | log.debug("All objects for ino {0} size {1} are absent".format(ino, size)) |
7c673cae FG |
1468 | return True |
1469 | ||
1470 | def dirfrag_exists(self, ino, frag): | |
1471 | try: | |
f67539c2 | 1472 | self.radosm(["stat", "{0:x}.{1:08x}".format(ino, frag)]) |
9f95a23c | 1473 | except CommandFailedError: |
7c673cae FG |
1474 | return False |
1475 | else: | |
1476 | return True | |
1477 | ||
7c673cae FG |
1478 | def list_dirfrag(self, dir_ino): |
1479 | """ | |
1480 | Read the named object and return the list of omap keys | |
1481 | ||
1482 | :return a list of 0 or more strings | |
1483 | """ | |
1484 | ||
1485 | dirfrag_obj_name = "{0:x}.00000000".format(dir_ino) | |
1486 | ||
1487 | try: | |
f67539c2 | 1488 | key_list_str = self.radosmo(["listomapkeys", dirfrag_obj_name], stdout=StringIO()) |
7c673cae FG |
1489 | except CommandFailedError as e: |
1490 | log.error(e.__str__()) | |
1491 | raise ObjectNotFound(dirfrag_obj_name) | |
1492 | ||
f67539c2 TL |
1493 | return key_list_str.strip().split("\n") if key_list_str else [] |
1494 | ||
    def get_meta_of_fs_file(self, dir_ino, obj_name, out):
        """
        Get metadata from the parent dirfrag to verify the correctness of the
        data format encoded by the tool cephfs-meta-injection.

        Warning: directory fragmentation (splitting) is not considered here;
        only the 0th fragment is read.
        """

        dirfrag_obj_name = "{0:x}.00000000".format(dir_ino)
        try:
            # Dentry omap keys carry a "_head" suffix for the head snapshot.
            self.radosm(["getomapval", dirfrag_obj_name, obj_name+"_head", out])
        except CommandFailedError as e:
            log.error(e.__str__())
            raise ObjectNotFound(dir_ino)
7c673cae FG |
1507 | |
1508 | def erase_metadata_objects(self, prefix): | |
1509 | """ | |
1510 | For all objects in the metadata pool matching the prefix, | |
1511 | erase them. | |
1512 | ||
1513 | This O(N) with the number of objects in the pool, so only suitable | |
1514 | for use on toy test filesystems. | |
1515 | """ | |
f67539c2 | 1516 | all_objects = self.radosmo(["ls"], stdout=StringIO()).strip().split("\n") |
7c673cae FG |
1517 | matching_objects = [o for o in all_objects if o.startswith(prefix)] |
1518 | for o in matching_objects: | |
f67539c2 | 1519 | self.radosm(["rm", o]) |
7c673cae FG |
1520 | |
    def erase_mds_objects(self, rank):
        """
        Erase all the per-MDS objects for a particular rank. This includes
        inotable, sessiontable, journal

        :param rank: integer MDS rank whose objects should be removed
        """

        def obj_prefix(multiplier):
            """
            MDS object naming conventions like rank 1's
            journal is at 201.***
            """
            # e.g. multiplier=2, rank=1 -> 0x201 -> prefix "201."
            return "%x." % (multiplier * 0x100 + rank)

        # MDS_INO_LOG_OFFSET
        self.erase_metadata_objects(obj_prefix(2))
        # MDS_INO_LOG_BACKUP_OFFSET
        self.erase_metadata_objects(obj_prefix(3))
        # MDS_INO_LOG_POINTER_OFFSET
        self.erase_metadata_objects(obj_prefix(4))
        # MDSTables & SessionMap
        self.erase_metadata_objects("mds{rank:d}_".format(rank=rank))
1542 | ||
1543 | @property | |
1544 | def _prefix(self): | |
1545 | """ | |
1546 | Override this to set a different | |
1547 | """ | |
1548 | return "" | |
1549 | ||
f64942e4 AA |
1550 | def _make_rank(self, rank): |
1551 | return "{}:{}".format(self.name, rank) | |
1552 | ||
7c673cae FG |
1553 | def _run_tool(self, tool, args, rank=None, quiet=False): |
1554 | # Tests frequently have [client] configuration that jacks up | |
1555 | # the objecter log level (unlikely to be interesting here) | |
1556 | # and does not set the mds log level (very interesting here) | |
1557 | if quiet: | |
1558 | base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1'] | |
1559 | else: | |
1e59de90 | 1560 | base_args = [os.path.join(self._prefix, tool), '--debug-mds=20', '--debug-ms=1', '--debug-objecter=1'] |
7c673cae FG |
1561 | |
1562 | if rank is not None: | |
f64942e4 | 1563 | base_args.extend(["--rank", "%s" % str(rank)]) |
7c673cae FG |
1564 | |
1565 | t1 = datetime.datetime.now() | |
e306af50 | 1566 | r = self.tool_remote.sh(script=base_args + args, stdout=StringIO()).strip() |
7c673cae | 1567 | duration = datetime.datetime.now() - t1 |
cd265ab1 | 1568 | log.debug("Ran {0} in time {1}, result:\n{2}".format( |
7c673cae FG |
1569 | base_args + args, duration, r |
1570 | )) | |
1571 | return r | |
1572 | ||
1573 | @property | |
1574 | def tool_remote(self): | |
1575 | """ | |
1576 | An arbitrary remote to use when invoking recovery tools. Use an MDS host because | |
1577 | it'll definitely have keys with perms to access cephfs metadata pool. This is public | |
1578 | so that tests can use this remote to go get locally written output files from the tools. | |
1579 | """ | |
f67539c2 | 1580 | return self.mon_manager.controller |
7c673cae | 1581 | |
f64942e4 | 1582 | def journal_tool(self, args, rank, quiet=False): |
7c673cae | 1583 | """ |
f64942e4 | 1584 | Invoke cephfs-journal-tool with the passed arguments for a rank, and return its stdout |
7c673cae | 1585 | """ |
f64942e4 AA |
1586 | fs_rank = self._make_rank(rank) |
1587 | return self._run_tool("cephfs-journal-tool", args, fs_rank, quiet) | |
7c673cae | 1588 | |
f67539c2 TL |
1589 | def meta_tool(self, args, rank, quiet=False): |
1590 | """ | |
1591 | Invoke cephfs-meta-injection with the passed arguments for a rank, and return its stdout | |
1592 | """ | |
1593 | fs_rank = self._make_rank(rank) | |
1594 | return self._run_tool("cephfs-meta-injection", args, fs_rank, quiet) | |
1595 | ||
7c673cae FG |
1596 | def table_tool(self, args, quiet=False): |
1597 | """ | |
1598 | Invoke cephfs-table-tool with the passed arguments, and return its stdout | |
1599 | """ | |
1600 | return self._run_tool("cephfs-table-tool", args, None, quiet) | |
1601 | ||
1602 | def data_scan(self, args, quiet=False, worker_count=1): | |
1603 | """ | |
1604 | Invoke cephfs-data-scan with the passed arguments, and return its stdout | |
1605 | ||
1606 | :param worker_count: if greater than 1, multiple workers will be run | |
1607 | in parallel and the return value will be None | |
1608 | """ | |
1609 | ||
1610 | workers = [] | |
1611 | ||
1612 | for n in range(0, worker_count): | |
1613 | if worker_count > 1: | |
1614 | # data-scan args first token is a command, followed by args to it. | |
1615 | # insert worker arguments after the command. | |
1616 | cmd = args[0] | |
1617 | worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:] | |
1618 | else: | |
1619 | worker_args = args | |
1620 | ||
1621 | workers.append(Greenlet.spawn(lambda wargs=worker_args: | |
1622 | self._run_tool("cephfs-data-scan", wargs, None, quiet))) | |
1623 | ||
1624 | for w in workers: | |
1625 | w.get() | |
1626 | ||
1627 | if worker_count == 1: | |
1628 | return workers[0].value | |
1629 | else: | |
1630 | return None | |
b32b8144 FG |
1631 | |
1632 | def is_full(self): | |
1633 | return self.is_pool_full(self.get_data_pool_name()) | |
f67539c2 TL |
1634 | |
1635 | def authorize(self, client_id, caps=('/', 'rw')): | |
1636 | """ | |
1637 | Run "ceph fs authorize" and run "ceph auth get" to get and returnt the | |
1638 | keyring. | |
1639 | ||
1640 | client_id: client id that will be authorized | |
1641 | caps: tuple containing the path and permission (can be r or rw) | |
1642 | respectively. | |
1643 | """ | |
1e59de90 TL |
1644 | if isinstance(caps[0], (tuple, list)): |
1645 | x = [] | |
1646 | for c in caps: | |
1647 | x.extend(c) | |
1648 | caps = tuple(x) | |
1649 | ||
f67539c2 TL |
1650 | client_name = 'client.' + client_id |
1651 | return self.mon_manager.raw_cluster_cmd('fs', 'authorize', self.name, | |
1652 | client_name, *caps) | |
1653 | ||
1654 | def grow(self, new_max_mds, status=None): | |
1655 | oldmax = self.get_var('max_mds', status=status) | |
1656 | assert(new_max_mds > oldmax) | |
1657 | self.set_max_mds(new_max_mds) | |
1658 | return self.wait_for_daemons() | |
1659 | ||
1660 | def shrink(self, new_max_mds, status=None): | |
1661 | oldmax = self.get_var('max_mds', status=status) | |
1662 | assert(new_max_mds < oldmax) | |
1663 | self.set_max_mds(new_max_mds) | |
1664 | return self.wait_for_daemons() | |
1665 | ||
1666 | def run_scrub(self, cmd, rank=0): | |
1667 | return self.rank_tell(["scrub"] + cmd, rank) | |
1668 | ||
1669 | def get_scrub_status(self, rank=0): | |
1670 | return self.run_scrub(["status"], rank) | |
1671 | ||
1e59de90 TL |
1672 | def flush(self, rank=0): |
1673 | return self.rank_tell(["flush", "journal"], rank=rank) | |
1674 | ||
f67539c2 TL |
1675 | def wait_until_scrub_complete(self, result=None, tag=None, rank=0, sleep=30, |
1676 | timeout=300, reverse=False): | |
1677 | # time out after "timeout" seconds and assume as done | |
1678 | if result is None: | |
1679 | result = "no active scrubs running" | |
1680 | with contextutil.safe_while(sleep=sleep, tries=timeout//sleep) as proceed: | |
1681 | while proceed(): | |
1682 | out_json = self.rank_tell(["scrub", "status"], rank=rank) | |
1683 | assert out_json is not None | |
1684 | if not reverse: | |
1685 | if result in out_json['status']: | |
1686 | log.info("all active scrubs completed") | |
1687 | return True | |
1688 | else: | |
1689 | if result not in out_json['status']: | |
1690 | log.info("all active scrubs completed") | |
1691 | return True | |
1692 | ||
1693 | if tag is not None: | |
1694 | status = out_json['scrubs'][tag] | |
1695 | if status is not None: | |
1696 | log.info(f"scrub status for tag:{tag} - {status}") | |
1697 | else: | |
1698 | log.info(f"scrub has completed for tag:{tag}") | |
1699 | return True | |
1700 | ||
1701 | # timed out waiting for scrub to complete | |
1702 | return False | |
1e59de90 TL |
1703 | |
1704 | def get_damage(self, rank=None): | |
1705 | if rank is None: | |
1706 | result = {} | |
1707 | for info in self.get_ranks(): | |
1708 | rank = info['rank'] | |
1709 | result[rank] = self.get_damage(rank=rank) | |
1710 | return result | |
1711 | else: | |
1712 | return self.rank_tell(['damage', 'ls'], rank=rank) |