]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | |
2 | from StringIO import StringIO | |
3 | import json | |
4 | import logging | |
5 | from gevent import Greenlet | |
6 | import os | |
7 | import time | |
8 | import datetime | |
9 | import re | |
10 | import errno | |
181888fb | 11 | import random |
11fdf7f2 | 12 | import traceback |
7c673cae FG |
13 | |
14 | from teuthology.exceptions import CommandFailedError | |
15 | from teuthology import misc | |
16 | from teuthology.nuke import clear_firewall | |
17 | from teuthology.parallel import parallel | |
18 | from tasks.ceph_manager import write_conf | |
19 | from tasks import ceph_manager | |
20 | ||
21 | ||
22 | log = logging.getLogger(__name__) | |
23 | ||
24 | ||
25 | DAEMON_WAIT_TIMEOUT = 120 | |
26 | ROOT_INO = 1 | |
27 | ||
92f5a8d4 TL |
class FileLayout(object):
    """
    In-memory representation of a CephFS file layout (pool, namespace and
    striping parameters). Fields left as None are treated as "unset" and
    omitted by items().
    """
    def __init__(self, pool=None, pool_namespace=None, stripe_unit=None, stripe_count=None, object_size=None):
        self.pool = pool
        self.pool_namespace = pool_namespace
        self.stripe_unit = stripe_unit
        self.stripe_count = stripe_count
        self.object_size = object_size

    @classmethod
    def load_from_ceph(cls, layout_str):
        # Fixed: classmethod was declared without `cls`, so the class object
        # would have been bound to `layout_str`.
        # TODO: parse a layout vxattr string into a FileLayout instance
        pass

    def items(self):
        """Yield (field_name, value) pairs for every explicitly-set field."""
        if self.pool is not None:
            yield ("pool", self.pool)
        if self.pool_namespace:
            yield ("pool_namespace", self.pool_namespace)
        if self.stripe_unit is not None:
            yield ("stripe_unit", self.stripe_unit)
        if self.stripe_count is not None:
            yield ("stripe_count", self.stripe_count)
        if self.object_size is not None:
            # Fixed: previously yielded self.stripe_size, an attribute that
            # does not exist (AttributeError whenever object_size was set).
            yield ("object_size", self.object_size)
7c673cae FG |
52 | |
class ObjectNotFound(Exception):
    """Raised when a requested RADOS object cannot be found."""

    def __init__(self, object_name):
        # Remember the name for the error message only.
        self._object_name = object_name

    def __str__(self):
        return "Object not found: '{0}'".format(self._object_name)
59 | ||
class FSStatus(object):
    """
    Operations on a snapshot of the FSMap.

    The map is fetched once at construction time via `fs dump`; all accessors
    then operate on that cached snapshot.
    """
    def __init__(self, mon_manager):
        self.mon = mon_manager
        self.map = json.loads(self.mon.raw_cluster_cmd("fs", "dump", "--format=json"))

    def __str__(self):
        return json.dumps(self.map, indent = 2, sort_keys = True)

    # Expose the fsmap for manual inspection.
    def __getitem__(self, key):
        """
        Get a field from the fsmap.
        """
        return self.map[key]

    def get_filesystems(self):
        """
        Iterator for all filesystems.
        """
        for fs in self.map['filesystems']:
            yield fs

    def get_all(self):
        """
        Iterator for all the mds_info components in the FSMap.
        """
        for info in self.get_standbys():
            yield info
        for fs in self.map['filesystems']:
            for info in fs['mdsmap']['info'].values():
                yield info

    def get_standbys(self):
        """
        Iterator for all standbys.
        """
        for info in self.map['standbys']:
            yield info

    def get_fsmap(self, fscid):
        """
        Get the fsmap for the given FSCID (None matches the first filesystem).
        """
        for fs in self.map['filesystems']:
            if fscid is None or fs['id'] == fscid:
                return fs
        raise RuntimeError("FSCID {0} not in map".format(fscid))

    def get_fsmap_byname(self, name):
        """
        Get the fsmap for the given file system name (None matches the first).
        """
        for fs in self.map['filesystems']:
            if name is None or fs['mdsmap']['fs_name'] == name:
                return fs
        raise RuntimeError("FS {0} not in map".format(name))

    def get_replays(self, fscid):
        """
        Get the standby:replay MDS for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['state'] == 'up:standby-replay':
                yield info

    def get_ranks(self, fscid):
        """
        Get the (non-standby-replay) ranks for the given FSCID.
        """
        fs = self.get_fsmap(fscid)
        for info in fs['mdsmap']['info'].values():
            if info['rank'] >= 0 and info['state'] != 'up:standby-replay':
                yield info

    def get_rank(self, fscid, rank):
        """
        Get the rank for the given FSCID.
        """
        for info in self.get_ranks(fscid):
            if info['rank'] == rank:
                return info
        raise RuntimeError("FSCID {0} has no rank {1}".format(fscid, rank))

    def get_mds(self, name):
        """
        Get the info for the given MDS name, or None if not in the map.
        """
        for info in self.get_all():
            if info['name'] == name:
                return info
        return None

    def get_mds_addr(self, name):
        """
        Return the instance addr as a string, like "10.214.133.138:6807\/10825"
        """
        info = self.get_mds(name)
        if info:
            return info['addr']
        else:
            # Fixed: log.warn is a deprecated alias of Logger.warning.
            log.warning(json.dumps(list(self.get_all()), indent=2))  # dump for debugging
            raise RuntimeError("MDS id '{0}' not found in map".format(name))
166 | ||
class CephCluster(object):
    """
    Thin wrapper around a running Ceph cluster in a teuthology context:
    exposes the admin remote, a CephManager, and config helpers.
    """

    @property
    def admin_remote(self):
        """The remote of the first monitor host, used for admin commands."""
        mon_role = misc.get_first_mon(self._ctx, None)
        (remote,) = self._ctx.cluster.only(mon_role).remotes.iterkeys()
        return remote

    def __init__(self, ctx):
        self._ctx = ctx
        self.mon_manager = ceph_manager.CephManager(
            self.admin_remote, ctx=ctx, logger=log.getChild('ceph_manager'))

    def get_config(self, key, service_type=None):
        """
        Get config from mon by default, or a specific service if caller asks for it
        """
        if service_type is None:
            service_type = 'mon'
        # Query the lowest-sorted role id of the requested service type.
        service_id = sorted(misc.all_roles_of_type(self._ctx.cluster, service_type))[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def set_ceph_conf(self, subsys, key, value):
        """Set a ceph.conf option and rewrite the conf file on the remotes."""
        conf = self._ctx.ceph['ceph'].conf
        if subsys not in conf:
            conf[subsys] = {}
        conf[subsys][key] = value
        # XXX because we don't have the ceph task's config object, if they
        # used a different config path this won't work.
        write_conf(self._ctx)

    def clear_ceph_conf(self, subsys, key):
        """Remove a ceph.conf option and rewrite the conf file."""
        del self._ctx.ceph['ceph'].conf[subsys][key]
        write_conf(self._ctx)

    def json_asok(self, command, service_type, service_id, timeout=None):
        """Run an admin-socket command; return parsed JSON, or None if empty."""
        if timeout is None:
            timeout = 15 * 60
        proc = self.mon_manager.admin_socket(service_type, service_id, command, timeout=timeout)
        response_data = proc.stdout.getvalue()
        log.info("_json_asok output: {0}".format(response_data))
        if not response_data.strip():
            return None
        return json.loads(response_data)
209 | ||
210 | ||
class MDSCluster(CephCluster):
    """
    Collective operations on all the MDS daemons in the Ceph cluster. These
    daemons may be in use by various Filesystems.

    For the benefit of pre-multi-filesystem tests, this class is also
    a parent of Filesystem. The correct way to use MDSCluster going forward is
    as a separate instance outside of your (multiple) Filesystem instances.
    """
    def __init__(self, ctx):
        super(MDSCluster, self).__init__(ctx)

        # Every 'mds' role declared in the teuthology cluster config.
        self.mds_ids = list(misc.all_roles_of_type(ctx.cluster, 'mds'))

        if len(self.mds_ids) == 0:
            raise RuntimeError("This task requires at least one MDS")

        if hasattr(self._ctx, "daemons"):
            # Presence of 'daemons' attribute implies ceph task rather than ceph_deploy task
            self.mds_daemons = dict([(mds_id, self._ctx.daemons.get_daemon('mds', mds_id)) for mds_id in self.mds_ids])

    def _one_or_all(self, mds_id, cb, in_parallel=True):
        """
        Call a callback for a single named MDS, or for all.

        Note that the parallelism here isn't for performance, it's to avoid being overly kind
        to the cluster by waiting a graceful ssh-latency of time between doing things, and to
        avoid being overly kind by executing them in a particular order. However, some actions
        don't cope with being done in parallel, so it's optional (`in_parallel`)

        :param mds_id: MDS daemon name, or None
        :param cb: Callback taking single argument of MDS daemon name
        :param in_parallel: whether to invoke callbacks concurrently (else one after the other)
        """
        if mds_id is None:
            if in_parallel:
                with parallel() as p:
                    for mds_id in self.mds_ids:
                        p.spawn(cb, mds_id)
            else:
                for mds_id in self.mds_ids:
                    cb(mds_id)
        else:
            cb(mds_id)

    def get_config(self, key, service_type=None):
        """
        get_config specialization of service_type="mds"
        """
        if service_type != "mds":
            return super(MDSCluster, self).get_config(key, service_type)

        # Some tests stop MDS daemons, don't send commands to a dead one:
        service_id = random.sample(filter(lambda i: self.mds_daemons[i].running(), self.mds_daemons), 1)[0]
        return self.json_asok(['config', 'get', key], service_type, service_id)[key]

    def mds_stop(self, mds_id=None):
        """
        Stop the MDS daemon process(se). If it held a rank, that rank
        will eventually go laggy.
        """
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].stop())

    def mds_fail(self, mds_id=None):
        """
        Inform MDSMonitor of the death of the daemon process(es). If it held
        a rank, that rank will be relinquished.
        """
        self._one_or_all(mds_id, lambda id_: self.mon_manager.raw_cluster_cmd("mds", "fail", id_))

    def mds_restart(self, mds_id=None):
        # Restart the daemon process(es) without telling the MDSMonitor first.
        self._one_or_all(mds_id, lambda id_: self.mds_daemons[id_].restart())

    def mds_fail_restart(self, mds_id=None):
        """
        Variation on restart that includes marking MDSs as failed, so that doing this
        operation followed by waiting for healthy daemon states guarantees that they
        have gone down and come up, rather than potentially seeing the healthy states
        that existed before the restart.
        """
        def _fail_restart(id_):
            # Order matters: stop first so the monitor's "fail" does not race
            # with a live daemon re-registering before the restart.
            self.mds_daemons[id_].stop()
            self.mon_manager.raw_cluster_cmd("mds", "fail", id_)
            self.mds_daemons[id_].restart()

        self._one_or_all(mds_id, _fail_restart)

    def mds_signal(self, mds_id, sig, silent=False):
        """
        signal a MDS daemon

        :param sig: signal number to deliver to the daemon process
        :param silent: suppress logging of the signal delivery
        """
        self.mds_daemons[mds_id].signal(sig, silent);

    def newfs(self, name='cephfs', create=True):
        # Construct a Filesystem handle (optionally creating it on the cluster).
        return Filesystem(self._ctx, name=name, create=create)

    def status(self):
        # Take a fresh FSMap snapshot.
        return FSStatus(self.mon_manager)

    def delete_all_filesystems(self):
        """
        Remove all filesystems that exist, and any pools in use by them.
        """
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        pool_id_name = {}
        for pool in pools:
            pool_id_name[pool['pool']] = pool['pool_name']

        # mark cluster down for each fs to prevent churn during deletion
        status = self.status()
        for fs in status.get_filesystems():
            self.mon_manager.raw_cluster_cmd("fs", "fail", str(fs['mdsmap']['fs_name']))

        # get a new copy as actives may have since changed
        status = self.status()
        for fs in status.get_filesystems():
            mdsmap = fs['mdsmap']
            metadata_pool = pool_id_name[mdsmap['metadata_pool']]

            self.mon_manager.raw_cluster_cmd('fs', 'rm', mdsmap['fs_name'], '--yes-i-really-mean-it')
            self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
                                             metadata_pool, metadata_pool,
                                             '--yes-i-really-really-mean-it')
            for data_pool in mdsmap['data_pools']:
                data_pool = pool_id_name[data_pool]
                try:
                    self.mon_manager.raw_cluster_cmd('osd', 'pool', 'delete',
                                                     data_pool, data_pool,
                                                     '--yes-i-really-really-mean-it')
                except CommandFailedError as e:
                    if e.exitstatus == 16: # EBUSY, this data pool is used
                        pass               # by two metadata pools, let the 2nd
                    else:                  # pass delete it
                        raise

    def get_standby_daemons(self):
        # Names of all daemons currently in standby.
        return set([s['name'] for s in self.status().get_standbys()])

    def get_mds_hostnames(self):
        # Deduplicated list of the hostnames that run at least one MDS.
        result = set()
        for mds_id in self.mds_ids:
            mds_remote = self.mon_manager.find_remote('mds', mds_id)
            result.add(mds_remote.hostname)

        return list(result)

    def set_clients_block(self, blocked, mds_id=None):
        """
        Block (using iptables) client communications to this MDS. Be careful: if
        other services are running on this MDS, or other MDSs try to talk to this
        MDS, their communications may also be blocked as collatoral damage.

        :param mds_id: Optional ID of MDS to block, default to all
        :return:
        """
        # -A appends (install the REJECT rule), -D deletes it again.
        da_flag = "-A" if blocked else "-D"

        def set_block(_mds_id):
            remote = self.mon_manager.find_remote('mds', _mds_id)
            status = self.status()

            # addr looks like "ip:port/nonce"; we only need the port.
            addr = status.get_mds_addr(_mds_id)
            ip_str, port_str, inst_str = re.match("(.+):(.+)/(.+)", addr).groups()

            remote.run(
                args=["sudo", "iptables", da_flag, "OUTPUT", "-p", "tcp", "--sport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])
            remote.run(
                args=["sudo", "iptables", da_flag, "INPUT", "-p", "tcp", "--dport", port_str, "-j", "REJECT", "-m",
                      "comment", "--comment", "teuthology"])

        self._one_or_all(mds_id, set_block, in_parallel=False)

    def clear_firewall(self):
        # Remove all teuthology-added iptables rules across the run.
        clear_firewall(self._ctx)

    def get_mds_info(self, mds_id):
        # mds_info entry for the named daemon, or None if absent from the map.
        return FSStatus(self.mon_manager).get_mds(mds_id)

    def is_pool_full(self, pool_name):
        # True when the named pool carries the 'full' flag in the OSD map.
        pools = json.loads(self.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['pools']
        for pool in pools:
            if pool['pool_name'] == pool_name:
                return 'full' in pool['flags_names'].split(",")

        raise RuntimeError("Pool not found '{0}'".format(pool_name))
397 | ||
398 | class Filesystem(MDSCluster): | |
399 | """ | |
400 | This object is for driving a CephFS filesystem. The MDS daemons driven by | |
401 | MDSCluster may be shared with other Filesystems. | |
402 | """ | |
3efd9988 FG |
    def __init__(self, ctx, fscid=None, name=None, create=False,
                 ec_profile=None):
        """
        :param ctx: teuthology run context
        :param fscid: id of an existing filesystem to attach to; mutually
            exclusive with `name`
        :param name: filesystem name (used for lookup, or for creation when
            `create` is True)
        :param create: create the filesystem unless a legacy one already exists
        :param ec_profile: optional erasure-code profile arguments for the
            data pool
        """
        super(Filesystem, self).__init__(ctx)

        self.name = name
        self.ec_profile = ec_profile
        self.id = None
        self.metadata_pool_name = None
        self.metadata_overlay = False
        self.data_pool_name = None
        self.data_pools = None

        # Use the first client role for running helper commands.
        client_list = list(misc.all_roles_of_type(self._ctx.cluster, 'client'))
        self.client_id = client_list[0]
        self.client_remote = list(misc.get_clients(ctx=ctx, roles=["client.{0}".format(self.client_id)]))[0][1]

        if name is not None:
            if fscid is not None:
                raise RuntimeError("cannot specify fscid when creating fs")
            if create and not self.legacy_configured():
                self.create()
        else:
            if fscid is not None:
                self.id = fscid
            # Resolve id/name/pools from the live FSMap.
            self.getinfo(refresh = True)

        # Stash a reference to the first created filesystem on ctx, so
        # that if someone drops to the interactive shell they can easily
        # poke our methods.
        if not hasattr(self._ctx, "filesystem"):
            self._ctx.filesystem = self
434 | ||
435 | def getinfo(self, refresh = False): | |
436 | status = self.status() | |
437 | if self.id is not None: | |
438 | fsmap = status.get_fsmap(self.id) | |
439 | elif self.name is not None: | |
440 | fsmap = status.get_fsmap_byname(self.name) | |
441 | else: | |
442 | fss = [fs for fs in status.get_filesystems()] | |
443 | if len(fss) == 1: | |
444 | fsmap = fss[0] | |
445 | elif len(fss) == 0: | |
446 | raise RuntimeError("no file system available") | |
447 | else: | |
448 | raise RuntimeError("more than one file system available") | |
449 | self.id = fsmap['id'] | |
450 | self.name = fsmap['mdsmap']['fs_name'] | |
451 | self.get_pool_names(status = status, refresh = refresh) | |
452 | return status | |
453 | ||
181888fb FG |
454 | def set_metadata_overlay(self, overlay): |
455 | if self.id is not None: | |
456 | raise RuntimeError("cannot specify fscid when configuring overlay") | |
457 | self.metadata_overlay = overlay | |
458 | ||
7c673cae FG |
459 | def deactivate(self, rank): |
460 | if rank < 0: | |
461 | raise RuntimeError("invalid rank") | |
462 | elif rank == 0: | |
463 | raise RuntimeError("cannot deactivate rank 0") | |
464 | self.mon_manager.raw_cluster_cmd("mds", "deactivate", "%d:%d" % (self.id, rank)) | |
465 | ||
11fdf7f2 TL |
466 | def reach_max_mds(self): |
467 | # Try to reach rank count == max_mds, up or down (UPGRADE SENSITIVE!) | |
468 | status = self.getinfo() | |
469 | mds_map = self.get_mds_map(status=status) | |
470 | max_mds = mds_map['max_mds'] | |
471 | ||
472 | count = len(list(self.get_ranks(status=status))) | |
473 | if count > max_mds: | |
474 | try: | |
475 | # deactivate mds in decending order | |
476 | status = self.wait_for_daemons(status=status, skip_max_mds_check=True) | |
477 | while count > max_mds: | |
478 | targets = sorted(self.get_ranks(status=status), key=lambda r: r['rank'], reverse=True) | |
479 | target = targets[0] | |
480 | log.info("deactivating rank %d" % target['rank']) | |
481 | self.deactivate(target['rank']) | |
482 | status = self.wait_for_daemons(skip_max_mds_check=True) | |
483 | count = len(list(self.get_ranks(status=status))) | |
484 | except: | |
485 | # In Mimic, deactivation is done automatically: | |
486 | log.info("Error:\n{}".format(traceback.format_exc())) | |
487 | status = self.wait_for_daemons() | |
488 | else: | |
489 | status = self.wait_for_daemons() | |
490 | ||
491 | mds_map = self.get_mds_map(status=status) | |
492 | assert(mds_map['max_mds'] == max_mds) | |
493 | assert(mds_map['in'] == list(range(0, max_mds))) | |
494 | ||
495 | def fail(self): | |
496 | self.mon_manager.raw_cluster_cmd("fs", "fail", str(self.name)) | |
497 | ||
91327a77 AA |
498 | def set_var(self, var, *args): |
499 | a = map(str, args) | |
500 | self.mon_manager.raw_cluster_cmd("fs", "set", self.name, var, *a) | |
501 | ||
11fdf7f2 TL |
502 | def set_down(self, down=True): |
503 | self.set_var("down", str(down).lower()) | |
504 | ||
505 | def set_joinable(self, joinable=True): | |
506 | self.set_var("joinable", str(joinable).lower()) | |
507 | ||
7c673cae | 508 | def set_max_mds(self, max_mds): |
f64942e4 | 509 | self.set_var("max_mds", "%d" % max_mds) |
7c673cae | 510 | |
11fdf7f2 TL |
511 | def set_allow_standby_replay(self, yes): |
512 | self.set_var("allow_standby_replay", str(yes).lower()) | |
513 | ||
514 | def set_allow_new_snaps(self, yes): | |
515 | self.set_var("allow_new_snaps", str(yes).lower(), '--yes-i-really-mean-it') | |
7c673cae | 516 | |
7c673cae FG |
517 | def get_pgs_per_fs_pool(self): |
518 | """ | |
519 | Calculate how many PGs to use when creating a pool, in order to avoid raising any | |
520 | health warnings about mon_pg_warn_min_per_osd | |
521 | ||
522 | :return: an integer number of PGs | |
523 | """ | |
524 | pg_warn_min_per_osd = int(self.get_config('mon_pg_warn_min_per_osd')) | |
525 | osd_count = len(list(misc.all_roles_of_type(self._ctx.cluster, 'osd'))) | |
526 | return pg_warn_min_per_osd * osd_count | |
527 | ||
    def create(self):
        """
        Create the metadata pool, the data pool (replicated or EC according to
        ec_profile) and the filesystem itself, honouring metadata_overlay.
        Refreshes cached id/name/pool info on success.
        """
        if self.name is None:
            self.name = "cephfs"
        if self.metadata_pool_name is None:
            self.metadata_pool_name = "{0}_metadata".format(self.name)
        if self.data_pool_name is None:
            data_pool_name = "{0}_data".format(self.name)
        else:
            data_pool_name = self.data_pool_name

        log.info("Creating filesystem '{0}'".format(self.name))

        pgs_per_fs_pool = self.get_pgs_per_fs_pool()

        self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create',
                                         self.metadata_pool_name, pgs_per_fs_pool.__str__())
        if self.metadata_overlay:
            # Layer a fresh metadata pool over an existing data pool.
            self.mon_manager.raw_cluster_cmd('fs', 'new',
                                             self.name, self.metadata_pool_name, data_pool_name,
                                             '--allow-dangerous-metadata-overlay')
        else:
            if self.ec_profile and 'disabled' not in self.ec_profile:
                # Erasure-coded data pool: create the profile first, then the
                # pool, then enable overwrites (required for CephFS on EC).
                log.info("EC profile is %s", self.ec_profile)
                cmd = ['osd', 'erasure-code-profile', 'set', data_pool_name]
                cmd.extend(self.ec_profile)
                self.mon_manager.raw_cluster_cmd(*cmd)
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'create',
                    data_pool_name, pgs_per_fs_pool.__str__(), 'erasure',
                    data_pool_name)
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'set',
                    data_pool_name, 'allow_ec_overwrites', 'true')
            else:
                self.mon_manager.raw_cluster_cmd(
                    'osd', 'pool', 'create',
                    data_pool_name, pgs_per_fs_pool.__str__())
            self.mon_manager.raw_cluster_cmd('fs', 'new',
                                             self.name,
                                             self.metadata_pool_name,
                                             data_pool_name,
                                             "--force")
        self.check_pool_application(self.metadata_pool_name)
        self.check_pool_application(data_pool_name)
        # Turn off spurious standby count warnings from modifying max_mds in tests.
        try:
            self.mon_manager.raw_cluster_cmd('fs', 'set', self.name, 'standby_count_wanted', '0')
        except CommandFailedError as e:
            if e.exitstatus == 22:
                # standby_count_wanted not available prior to luminous (upgrade tests would fail otherwise)
                pass
            else:
                raise

        self.getinfo(refresh = True)
583 | ||
35e4c445 FG |
584 | |
585 | def check_pool_application(self, pool_name): | |
586 | osd_map = self.mon_manager.get_osd_dump_json() | |
587 | for pool in osd_map['pools']: | |
588 | if pool['pool_name'] == pool_name: | |
589 | if "application_metadata" in pool: | |
590 | if not "cephfs" in pool['application_metadata']: | |
591 | raise RuntimeError("Pool %p does not name cephfs as application!".\ | |
592 | format(pool_name)) | |
593 | ||
594 | ||
7c673cae FG |
595 | def __del__(self): |
596 | if getattr(self._ctx, "filesystem", None) == self: | |
597 | delattr(self._ctx, "filesystem") | |
598 | ||
599 | def exists(self): | |
600 | """ | |
601 | Whether a filesystem exists in the mon's filesystem list | |
602 | """ | |
603 | fs_list = json.loads(self.mon_manager.raw_cluster_cmd('fs', 'ls', '--format=json-pretty')) | |
604 | return self.name in [fs['name'] for fs in fs_list] | |
605 | ||
606 | def legacy_configured(self): | |
607 | """ | |
608 | Check if a legacy (i.e. pre "fs new") filesystem configuration is present. If this is | |
609 | the case, the caller should avoid using Filesystem.create | |
610 | """ | |
611 | try: | |
612 | out_text = self.mon_manager.raw_cluster_cmd('--format=json-pretty', 'osd', 'lspools') | |
613 | pools = json.loads(out_text) | |
614 | metadata_pool_exists = 'metadata' in [p['poolname'] for p in pools] | |
615 | if metadata_pool_exists: | |
616 | self.metadata_pool_name = 'metadata' | |
617 | except CommandFailedError as e: | |
618 | # For use in upgrade tests, Ceph cuttlefish and earlier don't support | |
619 | # structured output (--format) from the CLI. | |
620 | if e.exitstatus == 22: | |
621 | metadata_pool_exists = True | |
622 | else: | |
623 | raise | |
624 | ||
625 | return metadata_pool_exists | |
626 | ||
627 | def _df(self): | |
628 | return json.loads(self.mon_manager.raw_cluster_cmd("df", "--format=json-pretty")) | |
629 | ||
f64942e4 AA |
630 | def get_mds_map(self, status=None): |
631 | if status is None: | |
632 | status = self.status() | |
633 | return status.get_fsmap(self.id)['mdsmap'] | |
7c673cae | 634 | |
11fdf7f2 TL |
635 | def get_var(self, var, status=None): |
636 | return self.get_mds_map(status=status)[var] | |
91327a77 | 637 | |
92f5a8d4 TL |
638 | def set_dir_layout(self, mount, path, layout): |
639 | for name, value in layout.items(): | |
640 | mount.run_shell(args=["setfattr", "-n", "ceph.dir.layout."+name, "-v", str(value), path]) | |
641 | ||
642 | def add_data_pool(self, name, create=True): | |
643 | if create: | |
644 | self.mon_manager.raw_cluster_cmd('osd', 'pool', 'create', name, self.get_pgs_per_fs_pool().__str__()) | |
7c673cae FG |
645 | self.mon_manager.raw_cluster_cmd('fs', 'add_data_pool', self.name, name) |
646 | self.get_pool_names(refresh = True) | |
647 | for poolid, fs_name in self.data_pools.items(): | |
648 | if name == fs_name: | |
649 | return poolid | |
650 | raise RuntimeError("could not get just created pool '{0}'".format(name)) | |
651 | ||
652 | def get_pool_names(self, refresh = False, status = None): | |
653 | if refresh or self.metadata_pool_name is None or self.data_pools is None: | |
654 | if status is None: | |
655 | status = self.status() | |
656 | fsmap = status.get_fsmap(self.id) | |
657 | ||
658 | osd_map = self.mon_manager.get_osd_dump_json() | |
659 | id_to_name = {} | |
660 | for p in osd_map['pools']: | |
661 | id_to_name[p['pool']] = p['pool_name'] | |
662 | ||
663 | self.metadata_pool_name = id_to_name[fsmap['mdsmap']['metadata_pool']] | |
664 | self.data_pools = {} | |
665 | for data_pool in fsmap['mdsmap']['data_pools']: | |
666 | self.data_pools[data_pool] = id_to_name[data_pool] | |
667 | ||
668 | def get_data_pool_name(self, refresh = False): | |
669 | if refresh or self.data_pools is None: | |
670 | self.get_pool_names(refresh = True) | |
671 | assert(len(self.data_pools) == 1) | |
672 | return self.data_pools.values()[0] | |
673 | ||
674 | def get_data_pool_id(self, refresh = False): | |
675 | """ | |
676 | Don't call this if you have multiple data pools | |
677 | :return: integer | |
678 | """ | |
679 | if refresh or self.data_pools is None: | |
680 | self.get_pool_names(refresh = True) | |
681 | assert(len(self.data_pools) == 1) | |
682 | return self.data_pools.keys()[0] | |
683 | ||
684 | def get_data_pool_names(self, refresh = False): | |
685 | if refresh or self.data_pools is None: | |
686 | self.get_pool_names(refresh = True) | |
687 | return self.data_pools.values() | |
688 | ||
689 | def get_metadata_pool_name(self): | |
690 | return self.metadata_pool_name | |
691 | ||
181888fb FG |
692 | def set_data_pool_name(self, name): |
693 | if self.id is not None: | |
694 | raise RuntimeError("can't set filesystem name if its fscid is set") | |
695 | self.data_pool_name = name | |
696 | ||
7c673cae FG |
697 | def get_namespace_id(self): |
698 | return self.id | |
699 | ||
700 | def get_pool_df(self, pool_name): | |
701 | """ | |
702 | Return a dict like: | |
703 | {u'bytes_used': 0, u'max_avail': 83848701, u'objects': 0, u'kb_used': 0} | |
704 | """ | |
705 | for pool_df in self._df()['pools']: | |
706 | if pool_df['name'] == pool_name: | |
707 | return pool_df['stats'] | |
708 | ||
709 | raise RuntimeError("Pool name '{0}' not found".format(pool_name)) | |
710 | ||
711 | def get_usage(self): | |
712 | return self._df()['stats']['total_used_bytes'] | |
713 | ||
    def are_daemons_healthy(self, status=None, skip_max_mds_check=False):
        """
        Return true if all daemons are in one of active, standby, standby-replay, and
        at least max_mds daemons are in 'active'.

        Unlike most of Filesystem, this function is tolerant of new-style `fs`
        commands being missing, because we are part of the ceph installation
        process during upgrade suites, so must fall back to old style commands
        when we get an EINVAL on a new style command.

        :param status: optional FSStatus snapshot to reuse instead of fetching
        :param skip_max_mds_check: only verify daemon states, not active count
        :return:
        """
        # First, check to see that processes haven't exited with an error code
        for mds in self._ctx.daemons.iter_daemons_of_role('mds'):
            mds.check_status()

        active_count = 0
        try:
            mds_map = self.get_mds_map(status=status)
        except CommandFailedError as cfe:
            # Old version, fall back to non-multi-fs commands
            if cfe.exitstatus == errno.EINVAL:
                mds_map = json.loads(
                        self.mon_manager.raw_cluster_cmd('mds', 'dump', '--format=json'))
            else:
                raise

        log.info("are_daemons_healthy: mds map: {0}".format(mds_map))

        # Any daemon outside these states (e.g. laggy, damaged) is unhealthy.
        for mds_id, mds_status in mds_map['info'].items():
            if mds_status['state'] not in ["up:active", "up:standby", "up:standby-replay"]:
                log.warning("Unhealthy mds state {0}:{1}".format(mds_id, mds_status['state']))
                return False
            elif mds_status['state'] == 'up:active':
                active_count += 1

        log.info("are_daemons_healthy: {0}/{1}".format(
            active_count, mds_map['max_mds']
        ))

        if not skip_max_mds_check:
            if active_count > mds_map['max_mds']:
                log.info("are_daemons_healthy: number of actives is greater than max_mds: {0}".format(mds_map))
                return False
            elif active_count == mds_map['max_mds']:
                # The MDSMap says these guys are active, but let's check they really are
                for mds_id, mds_status in mds_map['info'].items():
                    if mds_status['state'] == 'up:active':
                        try:
                            daemon_status = self.mds_asok(["status"], mds_id=mds_status['name'])
                        except CommandFailedError as cfe:
                            if cfe.exitstatus == errno.EINVAL:
                                # Old version, can't do this check
                                continue
                            else:
                                # MDS not even running
                                return False

                        if daemon_status['state'] != 'up:active':
                            # MDS hasn't taken the latest map yet
                            return False

                return True
            else:
                return False
        else:
            log.info("are_daemons_healthy: skipping max_mds check")
            return True
7c673cae | 782 | |
11fdf7f2 | 783 | def get_daemon_names(self, state=None, status=None): |
7c673cae FG |
784 | """ |
785 | Return MDS daemon names of those daemons in the given state | |
786 | :param state: | |
787 | :return: | |
788 | """ | |
11fdf7f2 | 789 | mdsmap = self.get_mds_map(status) |
7c673cae | 790 | result = [] |
11fdf7f2 | 791 | for mds_status in sorted(mdsmap['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])): |
7c673cae FG |
792 | if mds_status['state'] == state or state is None: |
793 | result.append(mds_status['name']) | |
794 | ||
795 | return result | |
796 | ||
797 | def get_active_names(self): | |
798 | """ | |
799 | Return MDS daemon names of those daemons holding ranks | |
800 | in state up:active | |
801 | ||
802 | :return: list of strings like ['a', 'b'], sorted by rank | |
803 | """ | |
804 | return self.get_daemon_names("up:active") | |
805 | ||
11fdf7f2 TL |
806 | def get_all_mds_rank(self, status=None): |
807 | mdsmap = self.get_mds_map(status) | |
7c673cae | 808 | result = [] |
11fdf7f2 | 809 | for mds_status in sorted(mdsmap['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])): |
7c673cae FG |
810 | if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay': |
811 | result.append(mds_status['rank']) | |
812 | ||
813 | return result | |
814 | ||
28e407b8 AA |
815 | def get_rank(self, rank=0, status=None): |
816 | if status is None: | |
817 | status = self.getinfo() | |
818 | return status.get_rank(self.id, rank) | |
819 | ||
11fdf7f2 TL |
820 | def rank_restart(self, rank=0, status=None): |
821 | name = self.get_rank(rank=rank, status=status)['name'] | |
822 | self.mds_restart(mds_id=name) | |
823 | ||
824 | def rank_signal(self, signal, rank=0, status=None): | |
825 | name = self.get_rank(rank=rank, status=status)['name'] | |
826 | self.mds_signal(name, signal) | |
827 | ||
828 | def rank_freeze(self, yes, rank=0): | |
829 | self.mon_manager.raw_cluster_cmd("mds", "freeze", "{}:{}".format(self.id, rank), str(yes).lower()) | |
830 | ||
831 | def rank_fail(self, rank=0): | |
832 | self.mon_manager.raw_cluster_cmd("mds", "fail", "{}:{}".format(self.id, rank)) | |
833 | ||
28e407b8 AA |
834 | def get_ranks(self, status=None): |
835 | if status is None: | |
836 | status = self.getinfo() | |
837 | return status.get_ranks(self.id) | |
838 | ||
11fdf7f2 TL |
839 | def get_replays(self, status=None): |
840 | if status is None: | |
841 | status = self.getinfo() | |
842 | return status.get_replays(self.id) | |
843 | ||
844 | def get_replay(self, rank=0, status=None): | |
845 | for replay in self.get_replays(status=status): | |
846 | if replay['rank'] == rank: | |
847 | return replay | |
848 | return None | |
849 | ||
28e407b8 | 850 | def get_rank_names(self, status=None): |
7c673cae FG |
851 | """ |
852 | Return MDS daemon names of those daemons holding a rank, | |
853 | sorted by rank. This includes e.g. up:replay/reconnect | |
854 | as well as active, but does not include standby or | |
855 | standby-replay. | |
856 | """ | |
11fdf7f2 | 857 | mdsmap = self.get_mds_map(status) |
7c673cae | 858 | result = [] |
11fdf7f2 | 859 | for mds_status in sorted(mdsmap['info'].values(), lambda a, b: cmp(a['rank'], b['rank'])): |
7c673cae FG |
860 | if mds_status['rank'] != -1 and mds_status['state'] != 'up:standby-replay': |
861 | result.append(mds_status['name']) | |
862 | ||
863 | return result | |
864 | ||
11fdf7f2 | 865 | def wait_for_daemons(self, timeout=None, skip_max_mds_check=False, status=None): |
7c673cae FG |
866 | """ |
867 | Wait until all daemons are healthy | |
868 | :return: | |
869 | """ | |
870 | ||
871 | if timeout is None: | |
872 | timeout = DAEMON_WAIT_TIMEOUT | |
873 | ||
11fdf7f2 TL |
874 | if status is None: |
875 | status = self.status() | |
876 | ||
7c673cae FG |
877 | elapsed = 0 |
878 | while True: | |
11fdf7f2 TL |
879 | if self.are_daemons_healthy(status=status, skip_max_mds_check=skip_max_mds_check): |
880 | return status | |
7c673cae FG |
881 | else: |
882 | time.sleep(1) | |
883 | elapsed += 1 | |
884 | ||
885 | if elapsed > timeout: | |
11fdf7f2 | 886 | log.info("status = {0}".format(status)) |
7c673cae FG |
887 | raise RuntimeError("Timed out waiting for MDS daemons to become healthy") |
888 | ||
11fdf7f2 TL |
889 | status = self.status() |
890 | ||
7c673cae FG |
891 | def get_lone_mds_id(self): |
892 | """ | |
893 | Get a single MDS ID: the only one if there is only one | |
894 | configured, else the only one currently holding a rank, | |
895 | else raise an error. | |
896 | """ | |
897 | if len(self.mds_ids) != 1: | |
898 | alive = self.get_rank_names() | |
899 | if len(alive) == 1: | |
900 | return alive[0] | |
901 | else: | |
902 | raise ValueError("Explicit MDS argument required when multiple MDSs in use") | |
903 | else: | |
904 | return self.mds_ids[0] | |
905 | ||
906 | def recreate(self): | |
907 | log.info("Creating new filesystem") | |
908 | self.delete_all_filesystems() | |
909 | self.id = None | |
910 | self.create() | |
911 | ||
912 | def put_metadata_object_raw(self, object_id, infile): | |
913 | """ | |
914 | Save an object to the metadata pool | |
915 | """ | |
916 | temp_bin_path = infile | |
917 | self.client_remote.run(args=[ | |
918 | 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'put', object_id, temp_bin_path | |
919 | ]) | |
920 | ||
921 | def get_metadata_object_raw(self, object_id): | |
922 | """ | |
923 | Retrieve an object from the metadata pool and store it in a file. | |
924 | """ | |
925 | temp_bin_path = '/tmp/' + object_id + '.bin' | |
926 | ||
927 | self.client_remote.run(args=[ | |
928 | 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path | |
929 | ]) | |
930 | ||
931 | return temp_bin_path | |
932 | ||
933 | def get_metadata_object(self, object_type, object_id): | |
934 | """ | |
935 | Retrieve an object from the metadata pool, pass it through | |
936 | ceph-dencoder to dump it to JSON, and return the decoded object. | |
937 | """ | |
938 | temp_bin_path = '/tmp/out.bin' | |
939 | ||
940 | self.client_remote.run(args=[ | |
941 | 'sudo', os.path.join(self._prefix, 'rados'), '-p', self.metadata_pool_name, 'get', object_id, temp_bin_path | |
942 | ]) | |
943 | ||
944 | stdout = StringIO() | |
945 | self.client_remote.run(args=[ | |
946 | 'sudo', os.path.join(self._prefix, 'ceph-dencoder'), 'type', object_type, 'import', temp_bin_path, 'decode', 'dump_json' | |
947 | ], stdout=stdout) | |
948 | dump_json = stdout.getvalue().strip() | |
949 | try: | |
950 | dump = json.loads(dump_json) | |
951 | except (TypeError, ValueError): | |
952 | log.error("Failed to decode JSON: '{0}'".format(dump_json)) | |
953 | raise | |
954 | ||
955 | return dump | |
956 | ||
957 | def get_journal_version(self): | |
958 | """ | |
959 | Read the JournalPointer and Journal::Header objects to learn the version of | |
960 | encoding in use. | |
961 | """ | |
962 | journal_pointer_object = '400.00000000' | |
963 | journal_pointer_dump = self.get_metadata_object("JournalPointer", journal_pointer_object) | |
964 | journal_ino = journal_pointer_dump['journal_pointer']['front'] | |
965 | ||
966 | journal_header_object = "{0:x}.00000000".format(journal_ino) | |
967 | journal_header_dump = self.get_metadata_object('Journaler::Header', journal_header_object) | |
968 | ||
969 | version = journal_header_dump['journal_header']['stream_format'] | |
970 | log.info("Read journal version {0}".format(version)) | |
971 | ||
972 | return version | |
973 | ||
f64942e4 | 974 | def mds_asok(self, command, mds_id=None, timeout=None): |
7c673cae FG |
975 | if mds_id is None: |
976 | mds_id = self.get_lone_mds_id() | |
977 | ||
f64942e4 | 978 | return self.json_asok(command, 'mds', mds_id, timeout=timeout) |
7c673cae | 979 | |
f64942e4 AA |
980 | def rank_asok(self, command, rank=0, status=None, timeout=None): |
981 | info = self.get_rank(rank=rank, status=status) | |
982 | return self.json_asok(command, 'mds', info['name'], timeout=timeout) | |
28e407b8 | 983 | |
11fdf7f2 TL |
984 | def rank_tell(self, command, rank=0, status=None): |
985 | info = self.get_rank(rank=rank, status=status) | |
986 | return json.loads(self.mon_manager.raw_cluster_cmd("tell", 'mds.{0}'.format(info['name']), *command)) | |
987 | ||
7c673cae FG |
988 | def read_cache(self, path, depth=None): |
989 | cmd = ["dump", "tree", path] | |
990 | if depth is not None: | |
991 | cmd.append(depth.__str__()) | |
992 | result = self.mds_asok(cmd) | |
993 | if len(result) == 0: | |
994 | raise RuntimeError("Path not found in cache: {0}".format(path)) | |
995 | ||
996 | return result | |
997 | ||
998 | def wait_for_state(self, goal_state, reject=None, timeout=None, mds_id=None, rank=None): | |
999 | """ | |
1000 | Block until the MDS reaches a particular state, or a failure condition | |
1001 | is met. | |
1002 | ||
1003 | When there are multiple MDSs, succeed when exaclty one MDS is in the | |
1004 | goal state, or fail when any MDS is in the reject state. | |
1005 | ||
1006 | :param goal_state: Return once the MDS is in this state | |
1007 | :param reject: Fail if the MDS enters this state before the goal state | |
1008 | :param timeout: Fail if this many seconds pass before reaching goal | |
1009 | :return: number of seconds waited, rounded down to integer | |
1010 | """ | |
1011 | ||
1012 | started_at = time.time() | |
1013 | while True: | |
1014 | status = self.status() | |
1015 | if rank is not None: | |
f64942e4 AA |
1016 | try: |
1017 | mds_info = status.get_rank(self.id, rank) | |
1018 | current_state = mds_info['state'] if mds_info else None | |
1019 | log.info("Looked up MDS state for mds.{0}: {1}".format(rank, current_state)) | |
1020 | except: | |
1021 | mdsmap = self.get_mds_map(status=status) | |
1022 | if rank in mdsmap['failed']: | |
1023 | log.info("Waiting for rank {0} to come back.".format(rank)) | |
1024 | current_state = None | |
1025 | else: | |
1026 | raise | |
7c673cae FG |
1027 | elif mds_id is not None: |
1028 | # mds_info is None if no daemon with this ID exists in the map | |
1029 | mds_info = status.get_mds(mds_id) | |
1030 | current_state = mds_info['state'] if mds_info else None | |
1031 | log.info("Looked up MDS state for {0}: {1}".format(mds_id, current_state)) | |
1032 | else: | |
1033 | # In general, look for a single MDS | |
1034 | states = [m['state'] for m in status.get_ranks(self.id)] | |
1035 | if [s for s in states if s == goal_state] == [goal_state]: | |
1036 | current_state = goal_state | |
1037 | elif reject in states: | |
1038 | current_state = reject | |
1039 | else: | |
1040 | current_state = None | |
1041 | log.info("mapped states {0} to {1}".format(states, current_state)) | |
1042 | ||
1043 | elapsed = time.time() - started_at | |
1044 | if current_state == goal_state: | |
1045 | log.info("reached state '{0}' in {1}s".format(current_state, elapsed)) | |
1046 | return elapsed | |
1047 | elif reject is not None and current_state == reject: | |
1048 | raise RuntimeError("MDS in reject state {0}".format(current_state)) | |
1049 | elif timeout is not None and elapsed > timeout: | |
1050 | log.error("MDS status at timeout: {0}".format(status.get_fsmap(self.id))) | |
1051 | raise RuntimeError( | |
1052 | "Reached timeout after {0} seconds waiting for state {1}, while in state {2}".format( | |
1053 | elapsed, goal_state, current_state | |
1054 | )) | |
1055 | else: | |
1056 | time.sleep(1) | |
1057 | ||
1058 | def _read_data_xattr(self, ino_no, xattr_name, type, pool): | |
1059 | mds_id = self.mds_ids[0] | |
1060 | remote = self.mds_daemons[mds_id].remote | |
1061 | if pool is None: | |
1062 | pool = self.get_data_pool_name() | |
1063 | ||
1064 | obj_name = "{0:x}.00000000".format(ino_no) | |
1065 | ||
1066 | args = [ | |
1067 | os.path.join(self._prefix, "rados"), "-p", pool, "getxattr", obj_name, xattr_name | |
1068 | ] | |
1069 | try: | |
1070 | proc = remote.run( | |
1071 | args=args, | |
1072 | stdout=StringIO()) | |
1073 | except CommandFailedError as e: | |
1074 | log.error(e.__str__()) | |
1075 | raise ObjectNotFound(obj_name) | |
1076 | ||
1077 | data = proc.stdout.getvalue() | |
1078 | ||
1079 | p = remote.run( | |
1080 | args=[os.path.join(self._prefix, "ceph-dencoder"), "type", type, "import", "-", "decode", "dump_json"], | |
1081 | stdout=StringIO(), | |
1082 | stdin=data | |
1083 | ) | |
1084 | ||
1085 | return json.loads(p.stdout.getvalue().strip()) | |
1086 | ||
1087 | def _write_data_xattr(self, ino_no, xattr_name, data, pool=None): | |
1088 | """ | |
1089 | Write to an xattr of the 0th data object of an inode. Will | |
1090 | succeed whether the object and/or xattr already exist or not. | |
1091 | ||
1092 | :param ino_no: integer inode number | |
1093 | :param xattr_name: string name of the xattr | |
1094 | :param data: byte array data to write to the xattr | |
1095 | :param pool: name of data pool or None to use primary data pool | |
1096 | :return: None | |
1097 | """ | |
1098 | remote = self.mds_daemons[self.mds_ids[0]].remote | |
1099 | if pool is None: | |
1100 | pool = self.get_data_pool_name() | |
1101 | ||
1102 | obj_name = "{0:x}.00000000".format(ino_no) | |
1103 | args = [ | |
1104 | os.path.join(self._prefix, "rados"), "-p", pool, "setxattr", | |
1105 | obj_name, xattr_name, data | |
1106 | ] | |
1107 | remote.run( | |
1108 | args=args, | |
1109 | stdout=StringIO()) | |
1110 | ||
1111 | def read_backtrace(self, ino_no, pool=None): | |
1112 | """ | |
1113 | Read the backtrace from the data pool, return a dict in the format | |
1114 | given by inode_backtrace_t::dump, which is something like: | |
1115 | ||
1116 | :: | |
1117 | ||
1118 | rados -p cephfs_data getxattr 10000000002.00000000 parent > out.bin | |
1119 | ceph-dencoder type inode_backtrace_t import out.bin decode dump_json | |
1120 | ||
1121 | { "ino": 1099511627778, | |
1122 | "ancestors": [ | |
1123 | { "dirino": 1, | |
1124 | "dname": "blah", | |
1125 | "version": 11}], | |
1126 | "pool": 1, | |
1127 | "old_pools": []} | |
1128 | ||
1129 | :param pool: name of pool to read backtrace from. If omitted, FS must have only | |
1130 | one data pool and that will be used. | |
1131 | """ | |
1132 | return self._read_data_xattr(ino_no, "parent", "inode_backtrace_t", pool) | |
1133 | ||
1134 | def read_layout(self, ino_no, pool=None): | |
1135 | """ | |
1136 | Read 'layout' xattr of an inode and parse the result, returning a dict like: | |
1137 | :: | |
1138 | { | |
1139 | "stripe_unit": 4194304, | |
1140 | "stripe_count": 1, | |
1141 | "object_size": 4194304, | |
1142 | "pool_id": 1, | |
1143 | "pool_ns": "", | |
1144 | } | |
1145 | ||
1146 | :param pool: name of pool to read backtrace from. If omitted, FS must have only | |
1147 | one data pool and that will be used. | |
1148 | """ | |
1149 | return self._read_data_xattr(ino_no, "layout", "file_layout_t", pool) | |
1150 | ||
1151 | def _enumerate_data_objects(self, ino, size): | |
1152 | """ | |
1153 | Get the list of expected data objects for a range, and the list of objects | |
1154 | that really exist. | |
1155 | ||
1156 | :return a tuple of two lists of strings (expected, actual) | |
1157 | """ | |
1158 | stripe_size = 1024 * 1024 * 4 | |
1159 | ||
1160 | size = max(stripe_size, size) | |
1161 | ||
1162 | want_objects = [ | |
1163 | "{0:x}.{1:08x}".format(ino, n) | |
1164 | for n in range(0, ((size - 1) / stripe_size) + 1) | |
1165 | ] | |
1166 | ||
1167 | exist_objects = self.rados(["ls"], pool=self.get_data_pool_name()).split("\n") | |
1168 | ||
1169 | return want_objects, exist_objects | |
1170 | ||
1171 | def data_objects_present(self, ino, size): | |
1172 | """ | |
1173 | Check that *all* the expected data objects for an inode are present in the data pool | |
1174 | """ | |
1175 | ||
1176 | want_objects, exist_objects = self._enumerate_data_objects(ino, size) | |
1177 | missing = set(want_objects) - set(exist_objects) | |
1178 | ||
1179 | if missing: | |
1180 | log.info("Objects missing (ino {0}, size {1}): {2}".format( | |
1181 | ino, size, missing | |
1182 | )) | |
1183 | return False | |
1184 | else: | |
1185 | log.info("All objects for ino {0} size {1} found".format(ino, size)) | |
1186 | return True | |
1187 | ||
1188 | def data_objects_absent(self, ino, size): | |
1189 | want_objects, exist_objects = self._enumerate_data_objects(ino, size) | |
1190 | present = set(want_objects) & set(exist_objects) | |
1191 | ||
1192 | if present: | |
1193 | log.info("Objects not absent (ino {0}, size {1}): {2}".format( | |
1194 | ino, size, present | |
1195 | )) | |
1196 | return False | |
1197 | else: | |
1198 | log.info("All objects for ino {0} size {1} are absent".format(ino, size)) | |
1199 | return True | |
1200 | ||
1201 | def dirfrag_exists(self, ino, frag): | |
1202 | try: | |
1203 | self.rados(["stat", "{0:x}.{1:08x}".format(ino, frag)]) | |
1204 | except CommandFailedError as e: | |
1205 | return False | |
1206 | else: | |
1207 | return True | |
1208 | ||
94b18763 FG |
1209 | def rados(self, args, pool=None, namespace=None, stdin_data=None, |
1210 | stdin_file=None): | |
7c673cae FG |
1211 | """ |
1212 | Call into the `rados` CLI from an MDS | |
1213 | """ | |
1214 | ||
1215 | if pool is None: | |
1216 | pool = self.get_metadata_pool_name() | |
1217 | ||
1218 | # Doesn't matter which MDS we use to run rados commands, they all | |
1219 | # have access to the pools | |
1220 | mds_id = self.mds_ids[0] | |
1221 | remote = self.mds_daemons[mds_id].remote | |
1222 | ||
1223 | # NB we could alternatively use librados pybindings for this, but it's a one-liner | |
1224 | # using the `rados` CLI | |
1225 | args = ([os.path.join(self._prefix, "rados"), "-p", pool] + | |
1226 | (["--namespace", namespace] if namespace else []) + | |
1227 | args) | |
94b18763 FG |
1228 | |
1229 | if stdin_file is not None: | |
1230 | args = ["bash", "-c", "cat " + stdin_file + " | " + " ".join(args)] | |
1231 | ||
7c673cae FG |
1232 | p = remote.run( |
1233 | args=args, | |
1234 | stdin=stdin_data, | |
1235 | stdout=StringIO()) | |
1236 | return p.stdout.getvalue().strip() | |
1237 | ||
1238 | def list_dirfrag(self, dir_ino): | |
1239 | """ | |
1240 | Read the named object and return the list of omap keys | |
1241 | ||
1242 | :return a list of 0 or more strings | |
1243 | """ | |
1244 | ||
1245 | dirfrag_obj_name = "{0:x}.00000000".format(dir_ino) | |
1246 | ||
1247 | try: | |
1248 | key_list_str = self.rados(["listomapkeys", dirfrag_obj_name]) | |
1249 | except CommandFailedError as e: | |
1250 | log.error(e.__str__()) | |
1251 | raise ObjectNotFound(dirfrag_obj_name) | |
1252 | ||
1253 | return key_list_str.split("\n") if key_list_str else [] | |
1254 | ||
1255 | def erase_metadata_objects(self, prefix): | |
1256 | """ | |
1257 | For all objects in the metadata pool matching the prefix, | |
1258 | erase them. | |
1259 | ||
1260 | This O(N) with the number of objects in the pool, so only suitable | |
1261 | for use on toy test filesystems. | |
1262 | """ | |
1263 | all_objects = self.rados(["ls"]).split("\n") | |
1264 | matching_objects = [o for o in all_objects if o.startswith(prefix)] | |
1265 | for o in matching_objects: | |
1266 | self.rados(["rm", o]) | |
1267 | ||
1268 | def erase_mds_objects(self, rank): | |
1269 | """ | |
1270 | Erase all the per-MDS objects for a particular rank. This includes | |
1271 | inotable, sessiontable, journal | |
1272 | """ | |
1273 | ||
1274 | def obj_prefix(multiplier): | |
1275 | """ | |
1276 | MDS object naming conventions like rank 1's | |
1277 | journal is at 201.*** | |
1278 | """ | |
1279 | return "%x." % (multiplier * 0x100 + rank) | |
1280 | ||
1281 | # MDS_INO_LOG_OFFSET | |
1282 | self.erase_metadata_objects(obj_prefix(2)) | |
1283 | # MDS_INO_LOG_BACKUP_OFFSET | |
1284 | self.erase_metadata_objects(obj_prefix(3)) | |
1285 | # MDS_INO_LOG_POINTER_OFFSET | |
1286 | self.erase_metadata_objects(obj_prefix(4)) | |
1287 | # MDSTables & SessionMap | |
1288 | self.erase_metadata_objects("mds{rank:d}_".format(rank=rank)) | |
1289 | ||
    @property
    def _prefix(self):
        """
        Path prefix prepended to tool binary names (rados, ceph-dencoder,
        cephfs-*-tool) before running them.  Defaults to "" so binaries are
        found via $PATH; override in a subclass to run from another location.
        """
        return ""
1296 | ||
f64942e4 AA |
1297 | def _make_rank(self, rank): |
1298 | return "{}:{}".format(self.name, rank) | |
1299 | ||
7c673cae FG |
1300 | def _run_tool(self, tool, args, rank=None, quiet=False): |
1301 | # Tests frequently have [client] configuration that jacks up | |
1302 | # the objecter log level (unlikely to be interesting here) | |
1303 | # and does not set the mds log level (very interesting here) | |
1304 | if quiet: | |
1305 | base_args = [os.path.join(self._prefix, tool), '--debug-mds=1', '--debug-objecter=1'] | |
1306 | else: | |
1307 | base_args = [os.path.join(self._prefix, tool), '--debug-mds=4', '--debug-objecter=1'] | |
1308 | ||
1309 | if rank is not None: | |
f64942e4 | 1310 | base_args.extend(["--rank", "%s" % str(rank)]) |
7c673cae FG |
1311 | |
1312 | t1 = datetime.datetime.now() | |
1313 | r = self.tool_remote.run( | |
1314 | args=base_args + args, | |
1315 | stdout=StringIO()).stdout.getvalue().strip() | |
1316 | duration = datetime.datetime.now() - t1 | |
1317 | log.info("Ran {0} in time {1}, result:\n{2}".format( | |
1318 | base_args + args, duration, r | |
1319 | )) | |
1320 | return r | |
1321 | ||
1322 | @property | |
1323 | def tool_remote(self): | |
1324 | """ | |
1325 | An arbitrary remote to use when invoking recovery tools. Use an MDS host because | |
1326 | it'll definitely have keys with perms to access cephfs metadata pool. This is public | |
1327 | so that tests can use this remote to go get locally written output files from the tools. | |
1328 | """ | |
1329 | mds_id = self.mds_ids[0] | |
1330 | return self.mds_daemons[mds_id].remote | |
1331 | ||
f64942e4 | 1332 | def journal_tool(self, args, rank, quiet=False): |
7c673cae | 1333 | """ |
f64942e4 | 1334 | Invoke cephfs-journal-tool with the passed arguments for a rank, and return its stdout |
7c673cae | 1335 | """ |
f64942e4 AA |
1336 | fs_rank = self._make_rank(rank) |
1337 | return self._run_tool("cephfs-journal-tool", args, fs_rank, quiet) | |
7c673cae FG |
1338 | |
1339 | def table_tool(self, args, quiet=False): | |
1340 | """ | |
1341 | Invoke cephfs-table-tool with the passed arguments, and return its stdout | |
1342 | """ | |
1343 | return self._run_tool("cephfs-table-tool", args, None, quiet) | |
1344 | ||
1345 | def data_scan(self, args, quiet=False, worker_count=1): | |
1346 | """ | |
1347 | Invoke cephfs-data-scan with the passed arguments, and return its stdout | |
1348 | ||
1349 | :param worker_count: if greater than 1, multiple workers will be run | |
1350 | in parallel and the return value will be None | |
1351 | """ | |
1352 | ||
1353 | workers = [] | |
1354 | ||
1355 | for n in range(0, worker_count): | |
1356 | if worker_count > 1: | |
1357 | # data-scan args first token is a command, followed by args to it. | |
1358 | # insert worker arguments after the command. | |
1359 | cmd = args[0] | |
1360 | worker_args = [cmd] + ["--worker_n", n.__str__(), "--worker_m", worker_count.__str__()] + args[1:] | |
1361 | else: | |
1362 | worker_args = args | |
1363 | ||
1364 | workers.append(Greenlet.spawn(lambda wargs=worker_args: | |
1365 | self._run_tool("cephfs-data-scan", wargs, None, quiet))) | |
1366 | ||
1367 | for w in workers: | |
1368 | w.get() | |
1369 | ||
1370 | if worker_count == 1: | |
1371 | return workers[0].value | |
1372 | else: | |
1373 | return None | |
b32b8144 FG |
1374 | |
1375 | def is_full(self): | |
1376 | return self.is_pool_full(self.get_data_pool_name()) |