1"""
2ceph manager -- Thrasher and CephManager objects
3"""
7c673cae
FG
4from functools import wraps
5import contextlib
522d829b 6import errno
7c673cae
FG
7import random
8import signal
9import time
10import gevent
11import base64
12import json
13import logging
14import threading
15import traceback
16import os
f67539c2 17import shlex
9f95a23c 18
f91f0fd5 19from io import BytesIO, StringIO
f67539c2 20from subprocess import DEVNULL
7c673cae
FG
21from teuthology import misc as teuthology
22from tasks.scrub import Scrubber
9f95a23c
TL
23from tasks.util.rados import cmd_erasure_code_profile
24from tasks.util import get_remote
1e59de90 25
7c673cae
FG
26from teuthology.contextutil import safe_while
27from teuthology.orchestra.remote import Remote
28from teuthology.orchestra import run
20effc67 29from teuthology.parallel import parallel
7c673cae 30from teuthology.exceptions import CommandFailedError
9f95a23c 31from tasks.thrasher import Thrasher
7c673cae 32
7c673cae
FG
33
34DEFAULT_CONF_PATH = '/etc/ceph/ceph.conf'
35
36log = logging.getLogger(__name__)
37
9f95a23c
TL
# this is for cephadm clusters
def shell(ctx, cluster_name, remote, args, name=None, **kwargs):
    extra_args = []
    if name:
        extra_args = ['-n', name]
    return remote.run(
        args=[
            'sudo',
            ctx.cephadm,
            '--image', ctx.ceph[cluster_name].image,
            'shell',
        ] + extra_args + [
            '--fsid', ctx.ceph[cluster_name].fsid,
            '--',
        ] + args,
        **kwargs
    )

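# Illustrative use of shell() (a sketch, not part of the original module):
# for a cephadm-managed cluster context `ctx`, a command can be run inside the
# cephadm shell on a given remote roughly like this (the remote and the args
# below are arbitrary examples):
#
#   proc = shell(ctx, 'ceph', remote,
#                args=['ceph', 'osd', 'tree'],
#                stdout=StringIO(), check_status=False)
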
# this is for rook clusters
def toolbox(ctx, cluster_name, args, **kwargs):
    return ctx.rook[cluster_name].remote.run(
        args=[
            'kubectl',
            '-n', 'rook-ceph',
            'exec',
            ctx.rook[cluster_name].toolbox,
            '--',
        ] + args,
        **kwargs
    )


def write_conf(ctx, conf_path=DEFAULT_CONF_PATH, cluster='ceph'):
    conf_fp = BytesIO()
    ctx.ceph[cluster].conf.write(conf_fp)
    conf_fp.seek(0)
    writes = ctx.cluster.run(
        args=[
            'sudo', 'mkdir', '-p', '/etc/ceph', run.Raw('&&'),
            'sudo', 'chmod', '0755', '/etc/ceph', run.Raw('&&'),
            'sudo', 'tee', conf_path, run.Raw('&&'),
            'sudo', 'chmod', '0644', conf_path,
            run.Raw('>'), '/dev/null',
        ],
        stdin=run.PIPE,
        wait=False)
    teuthology.feed_many_stdins_and_close(conf_fp, writes)
    run.wait(writes)

def get_valgrind_args(testdir, name, preamble, v, exit_on_first_error=True, cd=True):
    """
    Build a command line for running valgrind.

    testdir - test results directory
    name - name of daemon (for naming the log file)
    preamble - stuff we should run before valgrind
    v - valgrind arguments
    """
    if v is None:
        return preamble
    if not isinstance(v, list):
        v = [v]

    # https://tracker.ceph.com/issues/44362
    preamble.extend([
        'env', 'OPENSSL_ia32cap=~0x1000000000000000',
    ])

    val_path = '/var/log/ceph/valgrind'
    if '--tool=memcheck' in v or '--tool=helgrind' in v:
        extra_args = [
            'valgrind',
            '--trace-children=no',
            '--child-silent-after-fork=yes',
            '--soname-synonyms=somalloc=*tcmalloc*',
            '--num-callers=50',
            '--suppressions={tdir}/valgrind.supp'.format(tdir=testdir),
            '--xml=yes',
            '--xml-file={vdir}/{n}.log'.format(vdir=val_path, n=name),
            '--time-stamp=yes',
            '--vgdb=yes',
        ]
    else:
        extra_args = [
            'valgrind',
            '--trace-children=no',
            '--child-silent-after-fork=yes',
            '--soname-synonyms=somalloc=*tcmalloc*',
            '--suppressions={tdir}/valgrind.supp'.format(tdir=testdir),
            '--log-file={vdir}/{n}.log'.format(vdir=val_path, n=name),
            '--time-stamp=yes',
            '--vgdb=yes',
        ]
    if exit_on_first_error:
        extra_args.extend([
            # at least Valgrind 3.14 is required
            '--exit-on-first-error=yes',
            '--error-exitcode=42',
        ])
    args = []
    if cd:
        args += ['cd', testdir, run.Raw('&&')]
    args += preamble + extra_args + v
    log.debug('running %s under valgrind with args %s', name, args)
    return args

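# Illustrative call of get_valgrind_args() (a sketch, not from the original
# source; the testdir, daemon name, and preamble below are arbitrary examples):
#
#   cmd = get_valgrind_args('/home/ubuntu/cephtest', 'osd.0',
#                           ['ceph-osd', '-i', '0'], '--tool=memcheck')
#
# The returned list wraps the preamble in valgrind, with XML output written
# under /var/log/ceph/valgrind.
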
def mount_osd_data(ctx, remote, cluster, osd):
    """
    Mount a remote OSD

    :param ctx: Context
    :param remote: Remote site
    :param cluster: name of ceph cluster
    :param osd: Osd name
    """
    log.debug('Mounting data for osd.{o} on {r}'.format(o=osd, r=remote))
    role = "{0}.osd.{1}".format(cluster, osd)
    alt_role = role if cluster != 'ceph' else "osd.{0}".format(osd)
    if remote in ctx.disk_config.remote_to_roles_to_dev:
        if alt_role in ctx.disk_config.remote_to_roles_to_dev[remote]:
            role = alt_role
        if role not in ctx.disk_config.remote_to_roles_to_dev[remote]:
            return
        dev = ctx.disk_config.remote_to_roles_to_dev[remote][role]
        mount_options = ctx.disk_config.\
            remote_to_roles_to_dev_mount_options[remote][role]
        fstype = ctx.disk_config.remote_to_roles_to_dev_fstype[remote][role]
        mnt = os.path.join('/var/lib/ceph/osd', '{0}-{1}'.format(cluster, osd))

        log.info('Mounting osd.{o}: dev: {n}, cluster: {c}'
                 'mountpoint: {p}, type: {t}, options: {v}'.format(
                     o=osd, n=remote.name, p=mnt, t=fstype, v=mount_options,
                     c=cluster))

        remote.run(
            args=[
                'sudo',
                'mount',
                '-t', fstype,
                '-o', ','.join(mount_options),
                dev,
                mnt,
            ]
            )

def log_exc(func):
    @wraps(func)
    def wrapper(self):
        try:
            return func(self)
        except:
            self.log(traceback.format_exc())
            raise
    return wrapper


class PoolType:
    REPLICATED = 1
    ERASURE_CODED = 3


class OSDThrasher(Thrasher):
    """
    Object used to thrash Ceph
    """
    def __init__(self, manager, config, name, logger):
        super(OSDThrasher, self).__init__()

        self.ceph_manager = manager
        self.cluster = manager.cluster
        self.ceph_manager.wait_for_clean()
        osd_status = self.ceph_manager.get_osd_status()
        self.in_osds = osd_status['in']
        self.live_osds = osd_status['live']
        self.out_osds = osd_status['out']
        self.dead_osds = osd_status['dead']
        self.stopping = False
        self.logger = logger
        self.config = config
        self.name = name
        self.revive_timeout = self.config.get("revive_timeout", 360)
        self.pools_to_fix_pgp_num = set()
        if self.config.get('powercycle'):
            self.revive_timeout += 120
        self.clean_wait = self.config.get('clean_wait', 0)
        self.minin = self.config.get("min_in", 4)
        self.chance_move_pg = self.config.get('chance_move_pg', 1.0)
        self.sighup_delay = self.config.get('sighup_delay')
        self.optrack_toggle_delay = self.config.get('optrack_toggle_delay')
        self.dump_ops_enable = self.config.get('dump_ops_enable')
        self.noscrub_toggle_delay = self.config.get('noscrub_toggle_delay')
        self.chance_thrash_cluster_full = self.config.get('chance_thrash_cluster_full', .05)
        self.chance_thrash_pg_upmap = self.config.get('chance_thrash_pg_upmap', 1.0)
        self.chance_thrash_pg_upmap_items = self.config.get('chance_thrash_pg_upmap', 1.0)
        self.random_eio = self.config.get('random_eio')
        self.chance_force_recovery = self.config.get('chance_force_recovery', 0.3)

        num_osds = self.in_osds + self.out_osds
        self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * len(num_osds)
        self.min_pgs = self.config.get("min_pgs_per_pool_osd", 1) * len(num_osds)
        if self.config is None:
            self.config = dict()
        # prevent monitor from auto-marking things out while thrasher runs
        # try both old and new tell syntax, in case we are testing old code
        self.saved_options = []
        # assuming that the default settings do not vary from one daemon to
        # another
        first_mon = teuthology.get_first_mon(manager.ctx, self.config).split('.')
        opts = [('mon', 'mon_osd_down_out_interval', 0)]
        #why do we disable marking an OSD out automatically? :/
        for service, opt, new_value in opts:
            old_value = manager.get_config(first_mon[0],
                                           first_mon[1],
                                           opt)
            self.saved_options.append((service, opt, old_value))
            manager.inject_args(service, '*', opt, new_value)
        # initialize ceph_objectstore_tool property - must be done before
        # do_thrash is spawned - http://tracker.ceph.com/issues/18799
        if (self.config.get('powercycle') or
            not self.cmd_exists_on_osds("ceph-objectstore-tool") or
            self.config.get('disable_objectstore_tool_tests', False)):
            self.ceph_objectstore_tool = False
            if self.config.get('powercycle'):
                self.log("Unable to test ceph-objectstore-tool, "
                         "powercycle testing")
            else:
                self.log("Unable to test ceph-objectstore-tool, "
                         "not available on all OSD nodes")
        else:
            self.ceph_objectstore_tool = \
                self.config.get('ceph_objectstore_tool', True)
        # spawn do_thrash
        self.thread = gevent.spawn(self.do_thrash)
        if self.sighup_delay:
            self.sighup_thread = gevent.spawn(self.do_sighup)
        if self.optrack_toggle_delay:
            self.optrack_toggle_thread = gevent.spawn(self.do_optrack_toggle)
        if self.dump_ops_enable == "true":
            self.dump_ops_thread = gevent.spawn(self.do_dump_ops)
        if self.noscrub_toggle_delay:
            self.noscrub_toggle_thread = gevent.spawn(self.do_noscrub_toggle)

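    # Illustrative thrasher config (a sketch, not from the original source):
    # the keys mirror the self.config.get() lookups in __init__ and
    # choose_action(); the values are arbitrary examples.
    #
    #   config = {
    #       'min_in': 4,
    #       'chance_down': 0.5,
    #       'chance_pgnum_grow': 1,
    #       'sighup_delay': 0.1,
    #       'noscrub_toggle_delay': 2.0,
    #       'chance_thrash_pg_upmap': 1.0,
    #   }
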
    def log(self, msg, *args, **kwargs):
        self.logger.info(msg, *args, **kwargs)

    def cmd_exists_on_osds(self, cmd):
        if self.ceph_manager.cephadm or self.ceph_manager.rook:
            return True
        allremotes = self.ceph_manager.ctx.cluster.only(
            teuthology.is_type('osd', self.cluster)).remotes.keys()
        allremotes = list(set(allremotes))
        for remote in allremotes:
            proc = remote.run(args=['type', cmd], wait=True,
                              check_status=False, stdout=BytesIO(),
                              stderr=BytesIO())
            if proc.exitstatus != 0:
                return False
        return True

    def run_ceph_objectstore_tool(self, remote, osd, cmd):
        if self.ceph_manager.cephadm:
            return shell(
                self.ceph_manager.ctx, self.ceph_manager.cluster, remote,
                args=['ceph-objectstore-tool', '--err-to-stderr'] + cmd,
                name=osd,
                wait=True, check_status=False,
                stdout=StringIO(),
                stderr=StringIO())
        elif self.ceph_manager.rook:
            assert False, 'not implemented'
        else:
            return remote.run(
                args=['sudo', 'adjust-ulimits', 'ceph-objectstore-tool', '--err-to-stderr'] + cmd,
                wait=True, check_status=False,
                stdout=StringIO(),
                stderr=StringIO())

    def run_ceph_bluestore_tool(self, remote, osd, cmd):
        if self.ceph_manager.cephadm:
            return shell(
                self.ceph_manager.ctx, self.ceph_manager.cluster, remote,
                args=['ceph-bluestore-tool', '--err-to-stderr'] + cmd,
                name=osd,
                wait=True, check_status=False,
                stdout=StringIO(),
                stderr=StringIO())
        elif self.ceph_manager.rook:
            assert False, 'not implemented'
        else:
            return remote.run(
                args=['sudo', 'ceph-bluestore-tool', '--err-to-stderr'] + cmd,
                wait=True, check_status=False,
                stdout=StringIO(),
                stderr=StringIO())

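    # Illustrative call (a sketch, not from the original source): listing the
    # PGs on a down OSD, mirroring how kill_osd() drives the helper above; the
    # remote, OSD id, and data path are arbitrary examples.
    #
    #   proc = self.run_ceph_objectstore_tool(
    #       remote, 'osd.0',
    #       ['--data-path', '/var/lib/ceph/osd/ceph-0', '--op', 'list-pgs'])
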
    def kill_osd(self, osd=None, mark_down=False, mark_out=False):
        """
        :param osd: Osd to be killed.
        :mark_down: Mark down if true.
        :mark_out: Mark out if true.
        """
        if osd is None:
            osd = random.choice(self.live_osds)
        self.log("Killing osd %s, live_osds are %s" % (str(osd),
                                                       str(self.live_osds)))
        self.live_osds.remove(osd)
        self.dead_osds.append(osd)
        self.ceph_manager.kill_osd(osd)
        if mark_down:
            self.ceph_manager.mark_down_osd(osd)
        if mark_out and osd in self.in_osds:
            self.out_osd(osd)
        if self.ceph_objectstore_tool:
            self.log("Testing ceph-objectstore-tool on down osd.%s" % osd)
            remote = self.ceph_manager.find_remote('osd', osd)
            FSPATH = self.ceph_manager.get_filepath()
            JPATH = os.path.join(FSPATH, "journal")
            exp_osd = imp_osd = osd
            self.log('remote for osd %s is %s' % (osd, remote))
            exp_remote = imp_remote = remote
            # If an older osd is available we'll move a pg from there
            if (len(self.dead_osds) > 1 and
                    random.random() < self.chance_move_pg):
                exp_osd = random.choice(self.dead_osds[:-1])
                exp_remote = self.ceph_manager.find_remote('osd', exp_osd)
            self.log('remote for exp osd %s is %s' % (exp_osd, exp_remote))
            prefix = [
                '--no-mon-config',
                '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
            ]

            if self.ceph_manager.rook:
                assert False, 'not implemented'

            if not self.ceph_manager.cephadm:
                # ceph-objectstore-tool might be temporarily absent during an
                # upgrade - see http://tracker.ceph.com/issues/18014
                with safe_while(sleep=15, tries=40, action="type ceph-objectstore-tool") as proceed:
                    while proceed():
                        proc = exp_remote.run(args=['type', 'ceph-objectstore-tool'],
                                              wait=True, check_status=False, stdout=BytesIO(),
                                              stderr=BytesIO())
                        if proc.exitstatus == 0:
                            break
                        log.debug("ceph-objectstore-tool binary not present, trying again")

            # ceph-objectstore-tool might bogusly fail with "OSD has the store locked"
            # see http://tracker.ceph.com/issues/19556
            with safe_while(sleep=15, tries=40, action="ceph-objectstore-tool --op list-pgs") as proceed:
                while proceed():
                    proc = self.run_ceph_objectstore_tool(
                        exp_remote, 'osd.%s' % exp_osd,
                        prefix + [
                            '--data-path', FSPATH.format(id=exp_osd),
                            '--journal-path', JPATH.format(id=exp_osd),
                            '--op', 'list-pgs',
                        ])
                    if proc.exitstatus == 0:
                        break
                    elif (proc.exitstatus == 1 and
                          proc.stderr.getvalue() == "OSD has the store locked"):
                        continue
                    else:
                        raise Exception("ceph-objectstore-tool: "
                                        "exp list-pgs failure with status {ret}".
                                        format(ret=proc.exitstatus))

            pgs = proc.stdout.getvalue().split('\n')[:-1]
            if len(pgs) == 0:
                self.log("No PGs found for osd.{osd}".format(osd=exp_osd))
                return
            pg = random.choice(pgs)
            #exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
            #exp_path = os.path.join(exp_path, '{0}.data'.format(self.cluster))
            exp_path = os.path.join('/var/log/ceph', # available inside 'shell' container
                                    "exp.{pg}.{id}".format(
                                        pg=pg,
                                        id=exp_osd))
            if self.ceph_manager.cephadm:
                exp_host_path = os.path.join(
                    '/var/log/ceph',
                    self.ceph_manager.ctx.ceph[self.ceph_manager.cluster].fsid,
                    "exp.{pg}.{id}".format(
                        pg=pg,
                        id=exp_osd))
            else:
                exp_host_path = exp_path

            # export
            # Can't use new export-remove op since this is part of upgrade testing
            proc = self.run_ceph_objectstore_tool(
                exp_remote, 'osd.%s' % exp_osd,
                prefix + [
                    '--data-path', FSPATH.format(id=exp_osd),
                    '--journal-path', JPATH.format(id=exp_osd),
                    '--op', 'export',
                    '--pgid', pg,
                    '--file', exp_path,
                ])
            if proc.exitstatus:
                raise Exception("ceph-objectstore-tool: "
                                "export failure with status {ret}".
                                format(ret=proc.exitstatus))
            # remove
            proc = self.run_ceph_objectstore_tool(
                exp_remote, 'osd.%s' % exp_osd,
                prefix + [
                    '--data-path', FSPATH.format(id=exp_osd),
                    '--journal-path', JPATH.format(id=exp_osd),
                    '--force',
                    '--op', 'remove',
                    '--pgid', pg,
                ])
            if proc.exitstatus:
                raise Exception("ceph-objectstore-tool: "
                                "remove failure with status {ret}".
                                format(ret=proc.exitstatus))
            # If there are at least 2 dead osds we might move the pg
            if exp_osd != imp_osd:
                # If pg isn't already on this osd, then we will move it there
                proc = self.run_ceph_objectstore_tool(
                    imp_remote,
                    'osd.%s' % imp_osd,
                    prefix + [
                        '--data-path', FSPATH.format(id=imp_osd),
                        '--journal-path', JPATH.format(id=imp_osd),
                        '--op', 'list-pgs',
                    ])
                if proc.exitstatus:
                    raise Exception("ceph-objectstore-tool: "
                                    "imp list-pgs failure with status {ret}".
                                    format(ret=proc.exitstatus))
                pgs = proc.stdout.getvalue().split('\n')[:-1]
                if pg not in pgs:
                    self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
                             format(pg=pg, fosd=exp_osd, tosd=imp_osd))
                    if imp_remote != exp_remote:
                        # Copy export file to the other machine
                        self.log("Transfer export file from {srem} to {trem}".
                                 format(srem=exp_remote, trem=imp_remote))
                        # just in case an upgrade makes /var/log/ceph unreadable by non-root
                        exp_remote.run(args=['sudo', 'chmod', '777',
                                             '/var/log/ceph'])
                        imp_remote.run(args=['sudo', 'chmod', '777',
                                             '/var/log/ceph'])
                        tmpexport = Remote.get_file(exp_remote, exp_host_path,
                                                    sudo=True)
                        if exp_host_path != exp_path:
                            # push to /var/log/ceph, then rename (we can't
                            # chmod 777 the /var/log/ceph/$fsid mountpoint)
                            Remote.put_file(imp_remote, tmpexport, exp_path)
                            imp_remote.run(args=[
                                'sudo', 'mv', exp_path, exp_host_path])
                        else:
                            Remote.put_file(imp_remote, tmpexport, exp_host_path)
                        os.remove(tmpexport)
                else:
                    # Can't move the pg after all
                    imp_osd = exp_osd
                    imp_remote = exp_remote
            # import
            proc = self.run_ceph_objectstore_tool(
                imp_remote, 'osd.%s' % imp_osd,
                [
                    '--data-path', FSPATH.format(id=imp_osd),
                    '--journal-path', JPATH.format(id=imp_osd),
                    '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
                    '--op', 'import',
                    '--file', exp_path,
                ])
            if proc.exitstatus == 1:
                bogosity = "The OSD you are using is older than the exported PG"
                if bogosity in proc.stderr.getvalue():
                    self.log("OSD older than exported PG"
                             "...ignored")
            elif proc.exitstatus == 10:
                self.log("Pool went away before processing an import"
                         "...ignored")
            elif proc.exitstatus == 11:
                self.log("Attempt to import an incompatible export"
                         "...ignored")
            elif proc.exitstatus == 12:
                # this should be safe to ignore because we only ever move 1
                # copy of the pg at a time, and merge is only initiated when
                # all replicas are peered and happy. /me crosses fingers
                self.log("PG merged on target"
                         "...ignored")
            elif proc.exitstatus:
                raise Exception("ceph-objectstore-tool: "
                                "import failure with status {ret}".
                                format(ret=proc.exitstatus))
            cmd = "sudo rm -f {file}".format(file=exp_host_path)
            exp_remote.run(args=cmd)
            if imp_remote != exp_remote:
                imp_remote.run(args=cmd)

    def blackhole_kill_osd(self, osd=None):
        """
        If all else fails, kill the osd.
        :param osd: Osd to be killed.
        """
        if osd is None:
            osd = random.choice(self.live_osds)
        self.log("Blackholing and then killing osd %s, live_osds are %s" %
                 (str(osd), str(self.live_osds)))
        self.live_osds.remove(osd)
        self.dead_osds.append(osd)
        self.ceph_manager.blackhole_kill_osd(osd)

    def revive_osd(self, osd=None, skip_admin_check=False):
        """
        Revive the osd.
        :param osd: Osd to be revived.
        """
        if osd is None:
            osd = random.choice(self.dead_osds)
        self.log("Reviving osd %s" % (str(osd),))
        self.ceph_manager.revive_osd(
            osd,
            self.revive_timeout,
            skip_admin_check=skip_admin_check)
        self.dead_osds.remove(osd)
        self.live_osds.append(osd)
        if self.random_eio > 0 and osd == self.rerrosd:
            self.ceph_manager.set_config(self.rerrosd,
                                         filestore_debug_random_read_err=self.random_eio)
            self.ceph_manager.set_config(self.rerrosd,
                                         bluestore_debug_random_read_err=self.random_eio)

    def out_osd(self, osd=None):
        """
        Mark the osd out
        :param osd: Osd to be marked.
        """
        if osd is None:
            osd = random.choice(self.in_osds)
        self.log("Removing osd %s, in_osds are: %s" %
                 (str(osd), str(self.in_osds)))
        self.ceph_manager.mark_out_osd(osd)
        self.in_osds.remove(osd)
        self.out_osds.append(osd)

    def in_osd(self, osd=None):
        """
        Mark the osd in
        :param osd: Osd to be marked.
        """
        if osd is None:
            osd = random.choice(self.out_osds)
        if osd in self.dead_osds:
            return self.revive_osd(osd)
        self.log("Adding osd %s" % (str(osd),))
        self.out_osds.remove(osd)
        self.in_osds.append(osd)
        self.ceph_manager.mark_in_osd(osd)
        self.log("Added osd %s" % (str(osd),))

    def reweight_osd_or_by_util(self, osd=None):
        """
        Reweight an osd that is in
        :param osd: Osd to be marked.
        """
        if osd is not None or random.choice([True, False]):
            if osd is None:
                osd = random.choice(self.in_osds)
            val = random.uniform(.1, 1.0)
            self.log("Reweighting osd %s to %s" % (str(osd), str(val)))
            self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
                                              str(osd), str(val))
        else:
            # do it several times, the option space is large
            for i in range(5):
                options = {
                    'max_change': random.choice(['0.05', '1.0', '3.0']),
                    'overage': random.choice(['110', '1000']),
                    'type': random.choice([
                        'reweight-by-utilization',
                        'test-reweight-by-utilization']),
                }
                self.log("Reweighting by: %s" % (str(options),))
                self.ceph_manager.raw_cluster_cmd(
                    'osd',
                    options['type'],
                    options['overage'],
                    options['max_change'])

    def primary_affinity(self, osd=None):
        self.log("primary_affinity")
        if osd is None:
            osd = random.choice(self.in_osds)
        if random.random() >= .5:
            pa = random.random()
        elif random.random() >= .5:
            pa = 1
        else:
            pa = 0
        self.log('Setting osd %s primary_affinity to %f' % (str(osd), pa))
        self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
                                          str(osd), str(pa))

    def thrash_cluster_full(self):
        """
        Set and unset cluster full condition
        """
        self.log('Setting full ratio to .001')
        self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.001')
        time.sleep(1)
        self.log('Setting full ratio back to .95')
        self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.95')

    def thrash_pg_upmap(self):
        """
        Install or remove random pg_upmap entries in OSDMap
        """
        self.log("thrash_pg_upmap")
        from random import shuffle
        out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
        j = json.loads(out)
        self.log('j is %s' % j)
        try:
            if random.random() >= .3:
                pgs = self.ceph_manager.get_pg_stats()
                if not pgs:
                    self.log('No pgs; doing nothing')
                    return
                pg = random.choice(pgs)
                pgid = str(pg['pgid'])
                poolid = int(pgid.split('.')[0])
                sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
                if len(sizes) == 0:
                    self.log('No pools; doing nothing')
                    return
                n = sizes[0]
                osds = self.in_osds + self.out_osds
                shuffle(osds)
                osds = osds[0:n]
                self.log('Setting %s to %s' % (pgid, osds))
                cmd = ['osd', 'pg-upmap', pgid] + [str(x) for x in osds]
                self.log('cmd %s' % cmd)
                self.ceph_manager.raw_cluster_cmd(*cmd)
            else:
                m = j['pg_upmap']
                if len(m) > 0:
                    shuffle(m)
                    pg = m[0]['pgid']
                    self.log('Clearing pg_upmap on %s' % pg)
                    self.ceph_manager.raw_cluster_cmd(
                        'osd',
                        'rm-pg-upmap',
                        pg)
                else:
                    self.log('No pg_upmap entries; doing nothing')
        except CommandFailedError:
            self.log('Failed to rm-pg-upmap, ignoring')

    def thrash_pg_upmap_items(self):
        """
        Install or remove random pg_upmap_items entries in OSDMap
        """
        self.log("thrash_pg_upmap_items")
        from random import shuffle
        out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
        j = json.loads(out)
        self.log('j is %s' % j)
        try:
            if random.random() >= .3:
                pgs = self.ceph_manager.get_pg_stats()
                if not pgs:
                    self.log('No pgs; doing nothing')
                    return
                pg = random.choice(pgs)
                pgid = str(pg['pgid'])
                poolid = int(pgid.split('.')[0])
                sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
                if len(sizes) == 0:
                    self.log('No pools; doing nothing')
                    return
                n = sizes[0]
                osds = self.in_osds + self.out_osds
                shuffle(osds)
                osds = osds[0:n*2]
                self.log('Setting %s to %s' % (pgid, osds))
                cmd = ['osd', 'pg-upmap-items', pgid] + [str(x) for x in osds]
                self.log('cmd %s' % cmd)
                self.ceph_manager.raw_cluster_cmd(*cmd)
            else:
                m = j['pg_upmap_items']
                if len(m) > 0:
                    shuffle(m)
                    pg = m[0]['pgid']
                    self.log('Clearing pg_upmap on %s' % pg)
                    self.ceph_manager.raw_cluster_cmd(
                        'osd',
                        'rm-pg-upmap-items',
                        pg)
                else:
                    self.log('No pg_upmap entries; doing nothing')
        except CommandFailedError:
            self.log('Failed to rm-pg-upmap-items, ignoring')

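    # Illustrative command issued by thrash_pg_upmap_items() (a sketch; the
    # pgid and OSD ids are arbitrary examples): for a size-2 pool it amounts
    # to something like
    #
    #   ceph osd pg-upmap-items 2.3 0 4 1 5
    #
    # i.e. the pgid followed by (from-osd, to-osd) pairs.
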
    def force_recovery(self):
        """
        Force recovery on some PGs
        """
        backfill = random.random() >= 0.5
        j = self.ceph_manager.get_pgids_to_force(backfill)
        if j:
            try:
                if backfill:
                    self.ceph_manager.raw_cluster_cmd('pg', 'force-backfill', *j)
                else:
                    self.ceph_manager.raw_cluster_cmd('pg', 'force-recovery', *j)
            except CommandFailedError:
                self.log('Failed to force backfill|recovery, ignoring')

    def cancel_force_recovery(self):
        """
        Cancel forced recovery on some PGs
        """
        backfill = random.random() >= 0.5
        j = self.ceph_manager.get_pgids_to_cancel_force(backfill)
        if j:
            try:
                if backfill:
                    self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-backfill', *j)
                else:
                    self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-recovery', *j)
            except CommandFailedError:
                self.log('Failed to cancel force backfill|recovery, ignoring')

    def force_cancel_recovery(self):
        """
        Force or cancel forcing recovery
        """
        if random.random() >= 0.4:
            self.force_recovery()
        else:
            self.cancel_force_recovery()

    def all_up(self):
        """
        Make sure all osds are up and not out.
        """
        while len(self.dead_osds) > 0:
            self.log("reviving osd")
            self.revive_osd()
        while len(self.out_osds) > 0:
            self.log("inning osd")
            self.in_osd()

    def all_up_in(self):
        """
        Make sure all osds are up and fully in.
        """
        self.all_up()
        for osd in self.live_osds:
            self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
                                              str(osd), str(1))
            self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
                                              str(osd), str(1))

    def do_join(self):
        """
        Break out of this Ceph loop
        """
        self.stopping = True
        self.thread.get()
        if self.sighup_delay:
            self.log("joining the do_sighup greenlet")
            self.sighup_thread.get()
        if self.optrack_toggle_delay:
            self.log("joining the do_optrack_toggle greenlet")
            self.optrack_toggle_thread.join()
        if self.dump_ops_enable == "true":
            self.log("joining the do_dump_ops greenlet")
            self.dump_ops_thread.join()
        if self.noscrub_toggle_delay:
            self.log("joining the do_noscrub_toggle greenlet")
            self.noscrub_toggle_thread.join()

    def grow_pool(self):
        """
        Increase the size of the pool
        """
        pool = self.ceph_manager.get_pool()
        if pool is None:
            return
        self.log("Growing pool %s" % (pool,))
        if self.ceph_manager.expand_pool(pool,
                                         self.config.get('pool_grow_by', 10),
                                         self.max_pgs):
            self.pools_to_fix_pgp_num.add(pool)

    def shrink_pool(self):
        """
        Decrease the size of the pool
        """
        pool = self.ceph_manager.get_pool()
        if pool is None:
            return
        _ = self.ceph_manager.get_pool_pg_num(pool)
        self.log("Shrinking pool %s" % (pool,))
        if self.ceph_manager.contract_pool(
                pool,
                self.config.get('pool_shrink_by', 10),
                self.min_pgs):
            self.pools_to_fix_pgp_num.add(pool)

    def fix_pgp_num(self, pool=None):
        """
        Fix number of pgs in pool.
        """
        if pool is None:
            pool = self.ceph_manager.get_pool()
            if not pool:
                return
            force = False
        else:
            force = True
        self.log("fixing pg num pool %s" % (pool,))
        if self.ceph_manager.set_pool_pgpnum(pool, force):
            self.pools_to_fix_pgp_num.discard(pool)

    def test_pool_min_size(self):
        """
        Loop to selectively push PGs below their min_size and test that recovery
        still occurs.
        """
        self.log("test_pool_min_size")
        self.all_up()
        time.sleep(60)  # buffer time for recovery to start.
        self.ceph_manager.wait_for_recovery(
            timeout=self.config.get('timeout')
            )
        minout = int(self.config.get("min_out", 1))
        minlive = int(self.config.get("min_live", 2))
        mindead = int(self.config.get("min_dead", 1))
        self.log("doing min_size thrashing")
        self.ceph_manager.wait_for_clean(timeout=180)
        assert self.ceph_manager.is_clean(), \
            'not clean before minsize thrashing starts'
        while not self.stopping:
            # look up k and m from all the pools on each loop, in case it
            # changes as the cluster runs
            k = 0
            m = 99
            has_pools = False
            pools_json = self.ceph_manager.get_osd_dump_json()['pools']

            for pool_json in pools_json:
                pool = pool_json['pool_name']
                has_pools = True
                pool_type = pool_json['type']  # 1 for rep, 3 for ec
                min_size = pool_json['min_size']
                self.log("pool {pool} min_size is {min_size}".format(pool=pool, min_size=min_size))
                try:
                    ec_profile = self.ceph_manager.get_pool_property(pool, 'erasure_code_profile')
                    if pool_type != PoolType.ERASURE_CODED:
                        continue
                    ec_profile = pool_json['erasure_code_profile']
                    ec_profile_json = self.ceph_manager.raw_cluster_cmd(
                        'osd',
                        'erasure-code-profile',
                        'get',
                        ec_profile,
                        '--format=json')
                    ec_json = json.loads(ec_profile_json)
                    local_k = int(ec_json['k'])
                    local_m = int(ec_json['m'])
                    self.log("pool {pool} local_k={k} local_m={m}".format(pool=pool,
                                                                          k=local_k, m=local_m))
                    if local_k > k:
                        self.log("setting k={local_k} from previous {k}".format(local_k=local_k, k=k))
                        k = local_k
                    if local_m < m:
                        self.log("setting m={local_m} from previous {m}".format(local_m=local_m, m=m))
                        m = local_m
                except CommandFailedError:
                    self.log("failed to read erasure_code_profile. %s was likely removed", pool)
                    continue

            if has_pools:
                self.log("using k={k}, m={m}".format(k=k, m=m))
            else:
                self.log("No pools yet, waiting")
                time.sleep(5)
                continue

            if minout > len(self.out_osds):  # kill OSDs and mark out
                self.log("forced to out an osd")
                self.kill_osd(mark_out=True)
                continue
            elif mindead > len(self.dead_osds):  # kill OSDs but force timeout
                self.log("forced to kill an osd")
                self.kill_osd()
                continue
            else:  # make mostly-random choice to kill or revive OSDs
                minup = max(minlive, k)
                rand_val = random.uniform(0, 1)
                self.log("choosing based on number of live OSDs and rand val {rand}".
                         format(rand=rand_val))
                if len(self.live_osds) > minup+1 and rand_val < 0.5:
                    # chose to knock out as many OSDs as we can w/out downing PGs
                    most_killable = min(len(self.live_osds) - minup, m)
                    self.log("chose to kill {n} OSDs".format(n=most_killable))
                    for i in range(1, most_killable):
                        self.kill_osd(mark_out=True)
                    time.sleep(10)
                    # try a few times since there might be a concurrent pool
                    # creation or deletion
                    with safe_while(
                            sleep=25, tries=5,
                            action='check for active or peered') as proceed:
                        while proceed():
                            if self.ceph_manager.all_active_or_peered():
                                break
                            self.log('not all PGs are active or peered')
                else:  # chose to revive OSDs, bring up a random fraction of the dead ones
                    self.log("chose to revive osds")
                    for i in range(1, int(rand_val * len(self.dead_osds))):
                        self.revive_osd(i)

            # let PGs repair themselves or our next knockout might kill one
            self.ceph_manager.wait_for_clean(timeout=self.config.get('timeout'))

        # / while not self.stopping
        self.all_up_in()

        self.ceph_manager.wait_for_recovery(
            timeout=self.config.get('timeout')
            )

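    # Worked example of the sizing math above (illustrative numbers): with an
    # EC pool of k=2, m=2, min_live=2 and 6 live OSDs, minup = max(2, 2) = 2
    # and most_killable = min(6 - 2, 2) = 2, so at most two OSDs are knocked
    # out before waiting for the PGs to go active or peered.
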
    def inject_pause(self, conf_key, duration, check_after, should_be_down):
        """
        Pause injection testing. Check for osd being down when finished.
        """
        the_one = random.choice(self.live_osds)
        self.log("inject_pause on osd.{osd}".format(osd=the_one))
        self.log(
            "Testing {key} pause injection for duration {duration}".format(
                key=conf_key,
                duration=duration
                ))
        self.log(
            "Checking after {after}, should_be_down={shouldbedown}".format(
                after=check_after,
                shouldbedown=should_be_down
                ))
        self.ceph_manager.set_config(the_one, **{conf_key: duration})
        if not should_be_down:
            return
        time.sleep(check_after)
        status = self.ceph_manager.get_osd_status()
        assert the_one in status['down']
        time.sleep(duration - check_after + 20)
        status = self.ceph_manager.get_osd_status()
        assert the_one not in status['down']

    def test_backfill_full(self):
        """
        Test backfills stopping when the replica fills up.

        First, use injectfull admin command to simulate a now full
        osd by setting it to 0 on all of the OSDs.

        Second, on a random subset, set
        osd_debug_skip_full_check_in_backfill_reservation to force
        the more complicated check in do_scan to be exercised.

        Then, verify that all backfillings stop.
        """
        self.log("injecting backfill full")
        for i in self.live_osds:
            self.ceph_manager.set_config(
                i,
                osd_debug_skip_full_check_in_backfill_reservation=
                random.choice(['false', 'true']))
            self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'backfillfull'],
                                               check_status=True, timeout=30, stdout=DEVNULL)
        for i in range(30):
            status = self.ceph_manager.compile_pg_status()
            if 'backfilling' not in status.keys():
                break
            self.log(
                "waiting for {still_going} backfillings".format(
                    still_going=status.get('backfilling')))
            time.sleep(1)
        assert('backfilling' not in self.ceph_manager.compile_pg_status().keys())
        for i in self.live_osds:
            self.ceph_manager.set_config(
                i,
                osd_debug_skip_full_check_in_backfill_reservation='false')
            self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'none'],
                                               check_status=True, timeout=30, stdout=DEVNULL)

    def generate_random_sharding(self):
        prefixes = [
            'm', 'O', 'P', 'L'
        ]
        new_sharding = ''
        for prefix in prefixes:
            choose = random.choice([False, True])
            if not choose:
                continue
            if new_sharding != '':
                new_sharding = new_sharding + ' '
            columns = random.randint(1, 5)
            do_hash = random.choice([False, True])
            if do_hash:
                low_hash = random.choice([0, 5, 8])
                do_high_hash = random.choice([False, True])
                if do_high_hash:
                    high_hash = random.choice([8, 16, 30]) + low_hash
                    new_sharding = new_sharding + prefix + '(' + str(columns) + ',' + str(low_hash) + '-' + str(high_hash) + ')'
                else:
                    new_sharding = new_sharding + prefix + '(' + str(columns) + ',' + str(low_hash) + '-)'
            else:
                if columns == 1:
                    new_sharding = new_sharding + prefix
                else:
                    new_sharding = new_sharding + prefix + '(' + str(columns) + ')'
        return new_sharding

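    # Illustrative outputs of generate_random_sharding() (a sketch; actual
    # results depend on the random choices above), e.g. strings such as
    #
    #   'm(3) P'          # two prefixes, no hashing
    #   'O(2,0-16) L(4)'  # one hashed range, one plain column count
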
    def test_bluestore_reshard_action(self):
        """
        Test if resharding of bluestore works properly.
        If bluestore is not used, or bluestore is in version that
        does not support sharding, skip.
        """

        osd = random.choice(self.dead_osds)
        remote = self.ceph_manager.find_remote('osd', osd)
        FSPATH = self.ceph_manager.get_filepath()

        prefix = [
            '--no-mon-config',
            '--log-file=/var/log/ceph/bluestore_tool.$pid.log',
            '--log-level=10',
            '--path', FSPATH.format(id=osd)
        ]

        # sanity check if bluestore-tool accessible
        self.log('checking if target objectstore is bluestore on osd.%s' % osd)
        cmd = prefix + [
            'show-label'
        ]
        proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
        if proc.exitstatus != 0:
            raise Exception("ceph-bluestore-tool access failed.")

        # check if sharding is possible
        self.log('checking if target bluestore supports sharding on osd.%s' % osd)
        cmd = prefix + [
            'show-sharding'
        ]
        proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
        if proc.exitstatus != 0:
            self.log("Unable to test resharding, "
                     "ceph-bluestore-tool does not support it.")
            return

        # now go for reshard to something else
        self.log('applying new sharding to bluestore on osd.%s' % osd)
        new_sharding = self.config.get('bluestore_new_sharding', 'random')

        if new_sharding == 'random':
            self.log('generate random sharding')
            new_sharding = self.generate_random_sharding()

        self.log("applying new sharding: " + new_sharding)
        cmd = prefix + [
            '--sharding', new_sharding,
            'reshard'
        ]
        proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
        if proc.exitstatus != 0:
            raise Exception("ceph-bluestore-tool resharding failed.")

        # now do fsck to verify the new sharding
        self.log('running fsck to verify new sharding on osd.%s' % osd)
        cmd = prefix + [
            'fsck'
        ]
        proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
        if proc.exitstatus != 0:
            raise Exception("ceph-bluestore-tool fsck failed.")
        self.log('resharding successfully completed')

    def test_bluestore_reshard(self):
        """
        1) kills an osd
        2) reshards bluestore on killed osd
        3) revives the osd
        """
        self.log('test_bluestore_reshard started')
        self.kill_osd(mark_down=True, mark_out=True)
        self.test_bluestore_reshard_action()
        self.revive_osd()
        self.log('test_bluestore_reshard completed')

    def test_map_discontinuity(self):
        """
        1) Allows the osds to recover
        2) kills an osd
        3) allows the remaining osds to recover
        4) waits for some time
        5) revives the osd
        This sequence should cause the revived osd to have to handle
        a map gap since the mons would have trimmed
        """
        self.log("test_map_discontinuity")
        while len(self.in_osds) < (self.minin + 1):
            self.in_osd()
        self.log("Waiting for recovery")
        self.ceph_manager.wait_for_all_osds_up(
            timeout=self.config.get('timeout')
            )
        # now we wait 20s for the pg status to change, if it takes longer,
        # the test *should* fail!
        time.sleep(20)
        self.ceph_manager.wait_for_clean(
            timeout=self.config.get('timeout')
            )

        # now we wait 20s for the backfill replicas to hear about the clean
        time.sleep(20)
        self.log("Recovered, killing an osd")
        self.kill_osd(mark_down=True, mark_out=True)
        self.log("Waiting for clean again")
        self.ceph_manager.wait_for_clean(
            timeout=self.config.get('timeout')
            )
        self.log("Waiting for trim")
        time.sleep(int(self.config.get("map_discontinuity_sleep_time", 40)))
        self.revive_osd()

    def choose_action(self):
        """
        Random action selector.
        """
        chance_down = self.config.get('chance_down', 0.4)
        _ = self.config.get('chance_test_min_size', 0)
        chance_test_backfill_full = \
            self.config.get('chance_test_backfill_full', 0)
        if isinstance(chance_down, int):
            chance_down = float(chance_down) / 100
        minin = self.minin
        minout = int(self.config.get("min_out", 0))
        minlive = int(self.config.get("min_live", 2))
        mindead = int(self.config.get("min_dead", 0))

        self.log('choose_action: min_in %d min_out '
                 '%d min_live %d min_dead %d '
                 'chance_down %.2f' %
                 (minin, minout, minlive, mindead, chance_down))
        actions = []
        if len(self.in_osds) > minin:
            actions.append((self.out_osd, 1.0,))
        if len(self.live_osds) > minlive and chance_down > 0:
            actions.append((self.kill_osd, chance_down,))
        if len(self.out_osds) > minout:
            actions.append((self.in_osd, 1.7,))
        if len(self.dead_osds) > mindead:
            actions.append((self.revive_osd, 1.0,))
        if self.config.get('thrash_primary_affinity', True):
            actions.append((self.primary_affinity, 1.0,))
        actions.append((self.reweight_osd_or_by_util,
                        self.config.get('reweight_osd', .5),))
        actions.append((self.grow_pool,
                        self.config.get('chance_pgnum_grow', 0),))
        actions.append((self.shrink_pool,
                        self.config.get('chance_pgnum_shrink', 0),))
        actions.append((self.fix_pgp_num,
                        self.config.get('chance_pgpnum_fix', 0),))
        actions.append((self.test_pool_min_size,
                        self.config.get('chance_test_min_size', 0),))
        actions.append((self.test_backfill_full,
                        chance_test_backfill_full,))
        if self.chance_thrash_cluster_full > 0:
            actions.append((self.thrash_cluster_full, self.chance_thrash_cluster_full,))
        if self.chance_thrash_pg_upmap > 0:
            actions.append((self.thrash_pg_upmap, self.chance_thrash_pg_upmap,))
        if self.chance_thrash_pg_upmap_items > 0:
            actions.append((self.thrash_pg_upmap_items, self.chance_thrash_pg_upmap_items,))
        if self.chance_force_recovery > 0:
            actions.append((self.force_cancel_recovery, self.chance_force_recovery))

        for key in ['heartbeat_inject_failure', 'filestore_inject_stall']:
            for scenario in [
                (lambda:
                 self.inject_pause(key,
                                   self.config.get('pause_short', 3),
                                   0,
                                   False),
                 self.config.get('chance_inject_pause_short', 1),),
                (lambda:
                 self.inject_pause(key,
                                   self.config.get('pause_long', 80),
                                   self.config.get('pause_check_after', 70),
                                   True),
                 self.config.get('chance_inject_pause_long', 0),)]:
                actions.append(scenario)

        # only consider resharding if objectstore is bluestore
        cluster_name = self.ceph_manager.cluster
        cluster = self.ceph_manager.ctx.ceph[cluster_name]
        if cluster.conf.get('osd', {}).get('osd objectstore', 'bluestore') == 'bluestore':
            actions.append((self.test_bluestore_reshard,
                            self.config.get('chance_bluestore_reshard', 0),))

        total = sum([y for (x, y) in actions])
        val = random.uniform(0, total)
        for (action, prob) in actions:
            if val < prob:
                return action
            val -= prob
        return None

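    # Worked example of the weighted draw above (illustrative numbers): with
    # actions [(a, 1.0), (b, 0.5)] the total is 1.5; a uniform draw of 1.2 is
    # not < 1.0, so it is reduced to 0.2 and selects b because 0.2 < 0.5.
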
    def do_thrash(self):
        """
        _do_thrash() wrapper.
        """
        try:
            self._do_thrash()
        except Exception as e:
            # See _run exception comment for MDSThrasher
            self.set_thrasher_exception(e)
            self.logger.exception("exception:")
            # Allow successful completion so gevent doesn't see an exception.
            # The DaemonWatchdog will observe the error and tear down the test.

    @log_exc
    def do_sighup(self):
        """
        Loops and sends signal.SIGHUP to a random live osd.

        Loop delay is controlled by the config value sighup_delay.
        """
        delay = float(self.sighup_delay)
        self.log("starting do_sighup with a delay of {0}".format(delay))
        while not self.stopping:
            osd = random.choice(self.live_osds)
            self.ceph_manager.signal_osd(osd, signal.SIGHUP, silent=True)
            time.sleep(delay)

    @log_exc
    def do_optrack_toggle(self):
        """
        Loops and toggles op tracking on all osds.

        Loop delay is controlled by the config value optrack_toggle_delay.
        """
        delay = float(self.optrack_toggle_delay)
        osd_state = "true"
        self.log("starting do_optrack_toggle with a delay of {0}".format(delay))
        while not self.stopping:
            if osd_state == "true":
                osd_state = "false"
            else:
                osd_state = "true"
            try:
                self.ceph_manager.inject_args('osd', '*',
                                              'osd_enable_op_tracker',
                                              osd_state)
            except CommandFailedError:
                self.log('Failed to tell all osds, ignoring')
            gevent.sleep(delay)

    @log_exc
    def do_dump_ops(self):
        """
        Loops and does op dumps on all osds
        """
        self.log("starting do_dump_ops")
        while not self.stopping:
            for osd in self.live_osds:
                # Ignore errors because live_osds is in flux
                self.ceph_manager.osd_admin_socket(osd, command=['dump_ops_in_flight'],
                                                   check_status=False, timeout=30, stdout=DEVNULL)
                self.ceph_manager.osd_admin_socket(osd, command=['dump_blocked_ops'],
                                                   check_status=False, timeout=30, stdout=DEVNULL)
                self.ceph_manager.osd_admin_socket(osd, command=['dump_historic_ops'],
                                                   check_status=False, timeout=30, stdout=DEVNULL)
            gevent.sleep(0)

    @log_exc
    def do_noscrub_toggle(self):
        """
        Loops and toggles the noscrub flags.

        Loop delay is controlled by the config value noscrub_toggle_delay.
        """
        delay = float(self.noscrub_toggle_delay)
        scrub_state = "none"
        self.log("starting do_noscrub_toggle with a delay of {0}".format(delay))
        while not self.stopping:
            if scrub_state == "none":
                self.ceph_manager.raw_cluster_cmd('osd', 'set', 'noscrub')
                scrub_state = "noscrub"
            elif scrub_state == "noscrub":
                self.ceph_manager.raw_cluster_cmd('osd', 'set', 'nodeep-scrub')
                scrub_state = "both"
            elif scrub_state == "both":
                self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
                scrub_state = "nodeep-scrub"
            else:
                self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
                scrub_state = "none"
            gevent.sleep(delay)
        self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
        self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')

    @log_exc
    def _do_thrash(self):
        """
        Loop to select random actions to thrash ceph manager with.
        """
        cleanint = self.config.get("clean_interval", 60)
        scrubint = self.config.get("scrub_interval", -1)
        maxdead = self.config.get("max_dead", 0)
        delay = self.config.get("op_delay", 5)
        self.rerrosd = self.live_osds[0]
        if self.random_eio > 0:
            self.ceph_manager.inject_args('osd', self.rerrosd,
                                          'filestore_debug_random_read_err',
                                          self.random_eio)
            self.ceph_manager.inject_args('osd', self.rerrosd,
                                          'bluestore_debug_random_read_err',
                                          self.random_eio)
        self.log("starting do_thrash")
        while not self.stopping:
            to_log = [str(x) for x in ["in_osds: ", self.in_osds,
                                       "out_osds: ", self.out_osds,
                                       "dead_osds: ", self.dead_osds,
                                       "live_osds: ", self.live_osds]]
            self.log(" ".join(to_log))
            if random.uniform(0, 1) < (float(delay) / cleanint):
                while len(self.dead_osds) > maxdead:
                    self.revive_osd()
                for osd in self.in_osds:
                    self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
                                                      str(osd), str(1))
                if random.uniform(0, 1) < float(
                        self.config.get('chance_test_map_discontinuity', 0)) \
                        and len(self.live_osds) > 5: # avoid m=2,k=2 stall, w/ some buffer for crush being picky
                    self.test_map_discontinuity()
                else:
                    self.ceph_manager.wait_for_recovery(
                        timeout=self.config.get('timeout')
                        )
                time.sleep(self.clean_wait)
                if scrubint > 0:
                    if random.uniform(0, 1) < (float(delay) / scrubint):
                        self.log('Scrubbing while thrashing being performed')
                        Scrubber(self.ceph_manager, self.config)
            self.choose_action()()
            time.sleep(delay)
        self.all_up()
        if self.random_eio > 0:
            self.ceph_manager.inject_args('osd', self.rerrosd,
                                          'filestore_debug_random_read_err', '0.0')
            self.ceph_manager.inject_args('osd', self.rerrosd,
                                          'bluestore_debug_random_read_err', '0.0')
        for pool in list(self.pools_to_fix_pgp_num):
            if self.ceph_manager.get_pool_pg_num(pool) > 0:
                self.fix_pgp_num(pool)
        self.pools_to_fix_pgp_num.clear()
        for service, opt, saved_value in self.saved_options:
            self.ceph_manager.inject_args(service, '*', opt, saved_value)
        self.saved_options = []
        self.all_up_in()


class ObjectStoreTool:

    def __init__(self, manager, pool, **kwargs):
        self.manager = manager
        self.pool = pool
        self.osd = kwargs.get('osd', None)
        self.object_name = kwargs.get('object_name', None)
        self.do_revive = kwargs.get('do_revive', True)
        if self.osd and self.pool and self.object_name:
            if self.osd == "primary":
                self.osd = self.manager.get_object_primary(self.pool,
                                                           self.object_name)
        assert self.osd is not None
        if self.object_name:
            self.pgid = self.manager.get_object_pg_with_shard(self.pool,
                                                              self.object_name,
                                                              self.osd)
        self.remote = next(iter(self.manager.ctx.
            cluster.only('osd.{o}'.format(o=self.osd)).remotes.keys()))
        path = self.manager.get_filepath().format(id=self.osd)
        self.paths = ("--data-path {path} --journal-path {path}/journal".
                      format(path=path))

    def build_cmd(self, options, args, stdin):
        lines = []
        if self.object_name:
            lines.append("object=$(sudo adjust-ulimits ceph-objectstore-tool "
                         "{paths} --pgid {pgid} --op list |"
                         "grep '\"oid\":\"{name}\"')".
                         format(paths=self.paths,
                                pgid=self.pgid,
                                name=self.object_name))
            args = '"$object" ' + args
            options += " --pgid {pgid}".format(pgid=self.pgid)
        cmd = ("sudo adjust-ulimits ceph-objectstore-tool {paths} {options} {args}".
               format(paths=self.paths,
                      args=args,
                      options=options))
        if stdin:
            cmd = ("echo {payload} | base64 --decode | {cmd}".
                   format(payload=base64.encode(stdin),
                          cmd=cmd))
        lines.append(cmd)
        return "\n".join(lines)

    def run(self, options, args):
        self.manager.kill_osd(self.osd)
        cmd = self.build_cmd(options, args, None)
        self.manager.log(cmd)
        try:
            proc = self.remote.run(args=['bash', '-e', '-x', '-c', cmd],
                                   check_status=False,
                                   stdout=BytesIO(),
                                   stderr=BytesIO())
            proc.wait()
            if proc.exitstatus != 0:
                self.manager.log("failed with " + str(proc.exitstatus))
                error = proc.stdout.getvalue().decode() + " " + \
                        proc.stderr.getvalue().decode()
                raise Exception(error)
        finally:
            if self.do_revive:
                self.manager.revive_osd(self.osd)
                self.manager.wait_till_osd_is_up(self.osd, 300)

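# Illustrative use of ObjectStoreTool (a sketch, not from the original source;
# the pool, object name, and tool args are arbitrary examples): run() stops
# the chosen OSD, runs the per-object ceph-objectstore-tool command, and then
# revives the OSD.
#
#   ObjectStoreTool(manager, pool='rbd', osd='primary',
#                   object_name='myobj').run(options='', args='dump')
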

# XXX: this class has nothing to do with the Ceph daemon (ceph-mgr) of
# the same name.
class CephManager:
    """
    Ceph manager object.
    Contains several local functions that form the bulk of this module.

    :param controller: the remote machine where the Ceph commands should be
                       executed
    :param ctx: the cluster context
    :param config: path to Ceph config file
    :param logger: for logging messages
    :param cluster: name of the Ceph cluster
    """

    def __init__(self, controller, ctx=None, config=None, logger=None,
                 cluster='ceph', cephadm=False, rook=False) -> None:
        self.lock = threading.RLock()
        self.ctx = ctx
        self.config = config
        self.controller = controller
        self.next_pool_id = 0
        self.cluster = cluster

        if (logger):
            self.log = lambda x: logger.info(x)
        else:
            def tmp(x):
                """
                implement log behavior.
                """
                print(x)
            self.log = tmp

        if self.config is None:
            self.config = dict()

        # NOTE: These variables are meant to be overridden by vstart_runner.py.
        self.rook = rook
        self.cephadm = cephadm
        self.testdir = teuthology.get_testdir(self.ctx)
        # prefix args for ceph cmds to be executed
        pre = ['adjust-ulimits', 'ceph-coverage',
               f'{self.testdir}/archive/coverage']
        self.CEPH_CMD = ['sudo'] + pre + ['timeout', '120', 'ceph',
                                          '--cluster', self.cluster]
        self.RADOS_CMD = pre + ['rados', '--cluster', self.cluster]
        self.run_ceph_w_prefix = ['sudo', 'daemon-helper', 'kill', 'ceph',
                                  '--cluster', self.cluster]

        pools = self.list_pools()
        self.pools = {}
        for pool in pools:
            # we may race with a pool deletion; ignore failures here
            try:
                self.pools[pool] = self.get_pool_int_property(pool, 'pg_num')
            except CommandFailedError:
                self.log('Failed to get pg_num from pool %s, ignoring' % pool)

    def ceph(self, cmd, **kwargs):
        """
        Simple Ceph admin command wrapper around run_cluster_cmd.
        """

        kwargs.pop('args', None)
        args = shlex.split(cmd)
        stdout = kwargs.pop('stdout', StringIO())
        stderr = kwargs.pop('stderr', StringIO())
        return self.run_cluster_cmd(args=args, stdout=stdout, stderr=stderr, **kwargs)

    def run_cluster_cmd(self, **kwargs):
        """
        Run a Ceph command and return the object representing the process
        for the command.

        Accepts arguments same as that of teuthology.orchestra.run.run()
        """
        if isinstance(kwargs['args'], str):
            kwargs['args'] = shlex.split(kwargs['args'])
        elif isinstance(kwargs['args'], tuple):
            kwargs['args'] = list(kwargs['args'])

        prefixcmd = []
        timeoutcmd = kwargs.pop('timeoutcmd', None)
        if timeoutcmd is not None:
            prefixcmd += ['timeout', str(timeoutcmd)]

        if self.cephadm:
            prefixcmd += ['ceph']
            cmd = prefixcmd + list(kwargs['args'])
            return shell(self.ctx, self.cluster, self.controller,
                         args=cmd,
                         stdout=StringIO(),
                         check_status=kwargs.get('check_status', True))
        elif self.rook:
            prefixcmd += ['ceph']
            cmd = prefixcmd + list(kwargs['args'])
            return toolbox(self.ctx, self.cluster,
                           args=cmd,
                           stdout=StringIO(),
                           check_status=kwargs.get('check_status', True))
        else:
            kwargs['args'] = prefixcmd + self.CEPH_CMD + kwargs['args']
            return self.controller.run(**kwargs)

    def raw_cluster_cmd(self, *args, **kwargs) -> str:
        """
        Run a Ceph cluster command and return its stdout as a string.
        """
        if kwargs.get('args') is None and args:
            kwargs['args'] = args
        kwargs['stdout'] = kwargs.pop('stdout', StringIO())
        return self.run_cluster_cmd(**kwargs).stdout.getvalue()

    def raw_cluster_cmd_result(self, *args, **kwargs):
        """
        Run a Ceph cluster command and return its exit status.
        """
        if kwargs.get('args') is None and args:
            kwargs['args'] = args
        kwargs['check_status'] = False
        return self.run_cluster_cmd(**kwargs).exitstatus

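    # Illustrative calls (a sketch, not from the original source; the command
    # strings are arbitrary examples):
    #
    #   manager.ceph('osd set noout')
    #   health = manager.raw_cluster_cmd('health', '--format=json')
    #   rc = manager.raw_cluster_cmd_result('osd', 'ok-to-stop', 'osd.0')
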
    def get_keyring(self, client_id):
        """
        Return keyring for the given client.

        :param client_id: str
        :return keyring: str
        """
        if client_id.find('client.') != -1:
            client_id = client_id.replace('client.', '')

        keyring = self.run_cluster_cmd(args=f'auth get client.{client_id}',
                                       stdout=StringIO()).\
            stdout.getvalue().strip()

        assert isinstance(keyring, str) and keyring != ''
        return keyring

11fdf7f2 1625 def run_ceph_w(self, watch_channel=None):
7c673cae 1626 """
9f95a23c 1627 Execute "ceph -w" in the background with stdout connected to a BytesIO,
7c673cae 1628 and return the RemoteProcess.
1629
1630 :param watch_channel: Specifies the channel to be watched. This can be
1631 'cluster', 'audit', ...
1632 :type watch_channel: str
1633 """
20effc67 1634 args = self.run_ceph_w_prefix + ['-w']
1635 if watch_channel is not None:
1636 args.append("--watch-channel")
1637 args.append(watch_channel)
e306af50 1638 return self.controller.run(args=args, wait=False, stdout=StringIO(), stdin=run.PIPE)
1639
1640 def get_mon_socks(self):
1641 """
1642 Get monitor sockets.
1643
1644 :return socks: tuple of strings; strings are individual sockets.
1645 """
1646 from json import loads
1647
1648 output = loads(self.raw_cluster_cmd(['--format=json', 'mon', 'dump']))
1649 socks = []
1650 for mon in output['mons']:
1651 for addrvec_mem in mon['public_addrs']['addrvec']:
1652 socks.append(addrvec_mem['addr'])
1653 return tuple(socks)
1654
1655 def get_msgrv1_mon_socks(self):
1656 """
1657 Get monitor sockets that use msgrv1 to operate.
1658
1659 :return socks: tuple of strings; strings are individual sockets.
1660 """
1661 from json import loads
1662
1663 output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
1664 socks = []
1665 for mon in output['mons']:
1666 for addrvec_mem in mon['public_addrs']['addrvec']:
1667 if addrvec_mem['type'] == 'v1':
1668 socks.append(addrvec_mem['addr'])
1669 return tuple(socks)
1670
1671 def get_msgrv2_mon_socks(self):
1672 """
1673 Get monitor sockets that use msgrv2 to operate.
1674
1675 :return socks: tuple of strings; strings are individual sockets.
1676 """
1677 from json import loads
1678
1679 output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
1680 socks = []
1681 for mon in output['mons']:
1682 for addrvec_mem in mon['public_addrs']['addrvec']:
1683 if addrvec_mem['type'] == 'v2':
1684 socks.append(addrvec_mem['addr'])
1685 return tuple(socks)
7c673cae 1686
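# A hedged sketch of the 'mon dump --format=json' fragment the three helpers
# above walk; the addresses below are invented for illustration.
#
#   "mons": [
#     {"name": "a",
#      "public_addrs": {"addrvec": [
#        {"type": "v2", "addr": "172.21.15.1:3300", "nonce": 0},
#        {"type": "v1", "addr": "172.21.15.1:6789", "nonce": 0}]}}]
#
# get_mon_socks() returns every addr; the msgrv1/msgrv2 variants filter on the
# addrvec entry's "type" field.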
224ce89b 1687 def flush_pg_stats(self, osds, no_wait=None, wait_for_mon=300):
1688 """
1689 Flush pg stats from a list of OSD ids, ensuring they are reflected
1690 all the way to the monitor. Luminous and later only.
1691
1692 :param osds: list of OSDs to flush
1693 :param no_wait: list of OSDs not to wait for seq id. by default, we
1694 wait for all specified osds, but some of them could be
1695 moved out of osdmap, so we cannot get their updated
1696 stat seq from monitor anymore. in that case, you need
f67539c2 1697 to pass a blocklist.
31f18b77 1698 :param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
224ce89b 1699 it. (5 min by default)
31f18b77 1700 """
1701 if no_wait is None:
1702 no_wait = []
1703
1704 def flush_one_osd(osd: int, wait_for_mon: int):
1705 need = int(self.raw_cluster_cmd('tell', 'osd.%d' % osd, 'flush_pg_stats'))
1706 if not wait_for_mon:
1707 return
31f18b77 1708 if osd in no_wait:
20effc67 1709 return
1710 got = 0
1711 while wait_for_mon > 0:
11fdf7f2 1712 got = int(self.raw_cluster_cmd('osd', 'last-stat-seq', 'osd.%d' % osd))
1713 self.log('need seq {need} got {got} for osd.{osd}'.format(
1714 need=need, got=got, osd=osd))
1715 if got >= need:
1716 break
1717 A_WHILE = 1
1718 time.sleep(A_WHILE)
1719 wait_for_mon -= A_WHILE
1720 else:
1721 raise Exception('timed out waiting for mon to be updated with '
1722 'osd.{osd}: {got} < {need}'.
1723 format(osd=osd, got=got, need=need))
1724
1725 with parallel() as p:
1726 for osd in osds:
1727 p.spawn(flush_one_osd, osd, wait_for_mon)
1728
1729 def flush_all_pg_stats(self):
1730 self.flush_pg_stats(range(len(self.get_osd_dump())))
1731
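# Illustrative only: callers usually flush a specific set of OSDs after
# thrashing (ids here are hypothetical), or simply flush everything.
#
#   manager.flush_pg_stats([0, 1, 2], wait_for_mon=300)
#   manager.flush_all_pg_stats()
#
# An OSD already removed from the osdmap belongs in no_wait so the helper does
# not block on a stat seq the monitor will never report for it.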
f67539c2 1732 def do_rados(self, cmd, pool=None, namespace=None, remote=None, **kwargs):
1733 """
1734 Execute a remote rados command.
1735 """
1736 if remote is None:
1737 remote = self.controller
1738
1e59de90 1739 pre = self.RADOS_CMD + [] # deep-copying!
1740 if pool is not None:
1741 pre += ['--pool', pool]
1742 if namespace is not None:
1743 pre += ['--namespace', namespace]
1744 pre.extend(cmd)
1745 proc = remote.run(
1746 args=pre,
1747 wait=True,
f67539c2 1748 **kwargs
1749 )
1750 return proc
1751
1752 def rados_write_objects(self, pool, num_objects, size,
1753 timelimit, threads, cleanup=False):
1754 """
1755 Write rados objects
1756 Threads not used yet.
1757 """
1758 args = [
1759 '--num-objects', num_objects,
1760 '-b', size,
1761 'bench', timelimit,
1762 'write'
1763 ]
1764 if not cleanup:
1765 args.append('--no-cleanup')
f67539c2 1766 return self.do_rados(map(str, args), pool=pool)
1767
1768 def do_put(self, pool, obj, fname, namespace=None):
1769 """
1770 Implement rados put operation
1771 """
f67539c2 1772 args = ['put', obj, fname]
7c673cae 1773 return self.do_rados(
7c673cae 1774 args,
1775 check_status=False,
1776 pool=pool,
1777 namespace=namespace
1778 ).exitstatus
1779
1780 def do_get(self, pool, obj, fname='/dev/null', namespace=None):
1781 """
1782 Implement rados get operation
1783 """
f67539c2 1784 args = ['get', obj, fname]
7c673cae 1785 return self.do_rados(
7c673cae 1786 args,
1787 check_status=False,
1788 pool=pool,
1789 namespace=namespace,
1790 ).exitstatus
1791
1792 def do_rm(self, pool, obj, namespace=None):
1793 """
1794 Implement rados rm operation
1795 """
f67539c2 1796 args = ['rm', obj]
7c673cae 1797 return self.do_rados(
7c673cae 1798 args,
1799 check_status=False,
1800 pool=pool,
1801 namespace=namespace
1802 ).exitstatus
1803
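# Illustrative only: a round trip through the rados helpers above, assuming a
# pool named 'unique_pool_0' and a scratch file on the controller node.
#
#   assert manager.do_put('unique_pool_0', 'obj1', '/etc/hostname') == 0
#   assert manager.do_get('unique_pool_0', 'obj1', '/tmp/obj1.out') == 0
#   assert manager.do_rm('unique_pool_0', 'obj1') == 0
#
# Each helper runs with check_status=False and returns the rados CLI exit
# status, so a non-zero value signals failure instead of raising.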
1804 def osd_admin_socket(self, osd_id, command, check_status=True, timeout=0, stdout=None):
1805 if stdout is None:
e306af50 1806 stdout = StringIO()
1807 return self.admin_socket('osd', osd_id, command, check_status, timeout, stdout)
1808
1809 def find_remote(self, service_type, service_id):
1810 """
1811 Get the Remote for the host where a particular service runs.
1812
1813 :param service_type: 'mds', 'osd', 'client'
1814 :param service_id: The second part of a role, e.g. '0' for
1815 the role 'client.0'
1816 :return: a Remote instance for the host where the
1817 requested role is placed
1818 """
1819 return get_remote(self.ctx, self.cluster,
1820 service_type, service_id)
1821
1822 def admin_socket(self, service_type, service_id,
1823 command, check_status=True, timeout=0, stdout=None):
1824 """
1825 Run a command against the admin socket of the given daemon on its remote host.
1826 :param command: a list of words to use as the command
1827 to the admin socket
1828 """
1829 if stdout is None:
e306af50 1830 stdout = StringIO()
9f95a23c 1831
7c673cae 1832 remote = self.find_remote(service_type, service_id)
1833
1834 if self.cephadm:
1835 return shell(
1836 self.ctx, self.cluster, remote,
1837 args=[
1838 'ceph', 'daemon', '%s.%s' % (service_type, service_id),
1839 ] + command,
1840 stdout=stdout,
1841 wait=True,
1842 check_status=check_status,
1843 )
1844 if self.rook:
1845 assert False, 'not implemented'
9f95a23c 1846
1847 args = [
1848 'sudo',
1849 'adjust-ulimits',
1850 'ceph-coverage',
20effc67 1851 f'{self.testdir}/archive/coverage',
1852 'timeout',
1853 str(timeout),
1854 'ceph',
1855 '--cluster',
1856 self.cluster,
1857 '--admin-daemon',
1858 '/var/run/ceph/{cluster}-{type}.{id}.asok'.format(
1859 cluster=self.cluster,
1860 type=service_type,
1861 id=service_id),
1862 ]
1863 args.extend(command)
1864 return remote.run(
1865 args=args,
1866 stdout=stdout,
1867 wait=True,
1868 check_status=check_status
1869 )
1870
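# Illustrative only: querying a daemon over its admin socket, e.g. dumping the
# in-flight ops of osd.0 (the socket path is derived from cluster/type/id
# above).
#
#   proc = manager.admin_socket('osd', 0, ['dump_ops_in_flight'])
#   ops = json.loads(proc.stdout.getvalue())
#
# osd_admin_socket() defined earlier is the thin OSD-specific wrapper.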
1871 def objectstore_tool(self, pool, options, args, **kwargs):
1872 return ObjectStoreTool(self, pool, **kwargs).run(options, args)
1873
1874 def get_pgid(self, pool, pgnum):
1875 """
1876 :param pool: pool name
1877 :param pgnum: pg number
1878 :returns: a string representing this pg.
1879 """
1880 poolnum = self.get_pool_num(pool)
1881 pg_str = "{poolnum}.{pgnum}".format(
1882 poolnum=poolnum,
1883 pgnum=pgnum)
1884 return pg_str
1885
1886 def get_pg_replica(self, pool, pgnum):
1887 """
1888 get the replica osd for (pool, pgnum), e.g. ('data', 0) -> 0
1889 """
1890 pg_str = self.get_pgid(pool, pgnum)
1891 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1892 j = json.loads('\n'.join(output.split('\n')[1:]))
1893 return int(j['acting'][-1])
1894 assert False
1895
1896 def wait_for_pg_stats(func):
11fdf7f2 1897 # both osd_mon_report_interval and mgr_stats_period are 5 seconds
1898 # by default; taking the injected delays (in ms) into consideration,
1899 # 12 seconds is more than enough
28e407b8 1900 delays = [1, 1, 2, 3, 5, 8, 13, 0]
1901 @wraps(func)
1902 def wrapper(self, *args, **kwargs):
1903 exc = None
1904 for delay in delays:
1905 try:
1906 return func(self, *args, **kwargs)
1907 except AssertionError as e:
1908 time.sleep(delay)
1909 exc = e
1910 raise exc
1911 return wrapper
1912
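# Illustrative only: wait_for_pg_stats() is meant to decorate assertions that
# read pg stats, retrying with the backoff above until the mgr has caught up.
# A hypothetical user (the real ones are with_pg_state()/with_pg() below):
#
#   @wait_for_pg_stats  # type: ignore
#   def assert_pg_active(self, pool, pgnum):
#       stats = self.get_single_pg_stats(self.get_pgid(pool, pgnum))
#       assert 'active' in stats['state']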
1913 def get_pg_primary(self, pool, pgnum):
1914 """
1915 get the primary osd for (pool, pgnum), e.g. ('data', 0) -> 0
1916 """
1917 pg_str = self.get_pgid(pool, pgnum)
1918 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1919 j = json.loads('\n'.join(output.split('\n')[1:]))
1920 return int(j['acting'][0])
1921 assert False
1922
1923 def get_pool_num(self, pool):
1924 """
1925 get number for pool (e.g., data -> 2)
1926 """
1927 return int(self.get_pool_dump(pool)['pool'])
1928
1929 def list_pools(self):
1930 """
1931 list all pool names
1932 """
1933 osd_dump = self.get_osd_dump_json()
1934 self.log(osd_dump['pools'])
1935 return [str(i['pool_name']) for i in osd_dump['pools']]
1936
1937 def clear_pools(self):
1938 """
1939 remove all pools
1940 """
1941 [self.remove_pool(i) for i in self.list_pools()]
1942
1943 def kick_recovery_wq(self, osdnum):
1944 """
1945 Run kick_recovery_wq on cluster.
1946 """
1947 return self.raw_cluster_cmd(
1948 'tell', "osd.%d" % (int(osdnum),),
1949 'debug',
1950 'kick_recovery_wq',
1951 '0')
1952
1953 def wait_run_admin_socket(self, service_type,
1954 service_id, args=['version'], timeout=75, stdout=None):
1955 """
11fdf7f2 1956 If osd_admin_socket call succeeds, return. Otherwise wait
1957 five seconds and try again.
1958 """
1959 if stdout is None:
e306af50 1960 stdout = StringIO()
1961 tries = 0
1962 while True:
1963 proc = self.admin_socket(service_type, service_id,
1964 args, check_status=False, stdout=stdout)
9f95a23c 1965 if proc.exitstatus == 0:
1966 return proc
1967 else:
1968 tries += 1
1969 if (tries * 5) > timeout:
1970 raise Exception('timed out waiting for admin_socket '
1971 'to appear after {type}.{id} restart'.
1972 format(type=service_type,
1973 id=service_id))
1974 self.log("waiting on admin_socket for {type}-{id}, "
1975 "{command}".format(type=service_type,
1976 id=service_id,
1977 command=args))
1978 time.sleep(5)
1979
1980 def get_pool_dump(self, pool):
1981 """
1982 get the osd dump part of a pool
1983 """
1984 osd_dump = self.get_osd_dump_json()
1985 for i in osd_dump['pools']:
1986 if i['pool_name'] == pool:
1987 return i
1988 assert False
1989
1990 def get_config(self, service_type, service_id, name):
1991 """
1992 :param node: like 'mon.a'
1993 :param name: the option name
1994 """
1995 proc = self.wait_run_admin_socket(service_type, service_id,
1996 ['config', 'show'])
1997 j = json.loads(proc.stdout.getvalue())
1998 return j[name]
1999
2000 def inject_args(self, service_type, service_id, name, value):
2001 whom = '{0}.{1}'.format(service_type, service_id)
2002 if isinstance(value, bool):
2003 value = 'true' if value else 'false'
2004 opt_arg = '--{name}={value}'.format(name=name, value=value)
2005 self.raw_cluster_cmd('--', 'tell', whom, 'injectargs', opt_arg)
2006
2007 def set_config(self, osdnum, **argdict):
2008 """
2009 :param osdnum: osd number
2010 :param argdict: dictionary containing values to set.
2011 """
9f95a23c 2012 for k, v in argdict.items():
2013 self.wait_run_admin_socket(
2014 'osd', osdnum,
2015 ['config', 'set', str(k), str(v)])
2016
2017 def raw_cluster_status(self):
2018 """
2019 Get status from cluster
2020 """
9f95a23c 2021 status = self.raw_cluster_cmd('status', '--format=json')
2022 return json.loads(status)
2023
2024 def raw_osd_status(self):
2025 """
2026 Get osd status from cluster
2027 """
2028 return self.raw_cluster_cmd('osd', 'dump')
2029
2030 def get_osd_status(self):
2031 """
2032 Get osd statuses sorted by states that the osds are in.
2033 """
e306af50 2034 osd_lines = list(filter(
7c673cae 2035 lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
e306af50 2036 self.raw_osd_status().split('\n')))
2037 self.log(osd_lines)
2038 in_osds = [int(i[4:].split()[0])
2039 for i in filter(lambda x: " in " in x, osd_lines)]
2040 out_osds = [int(i[4:].split()[0])
2041 for i in filter(lambda x: " out " in x, osd_lines)]
2042 up_osds = [int(i[4:].split()[0])
2043 for i in filter(lambda x: " up " in x, osd_lines)]
2044 down_osds = [int(i[4:].split()[0])
2045 for i in filter(lambda x: " down " in x, osd_lines)]
2046 dead_osds = [int(x.id_)
2047 for x in filter(lambda x:
2048 not x.running(),
2049 self.ctx.daemons.
2050 iter_daemons_of_role('osd', self.cluster))]
2051 live_osds = [int(x.id_) for x in
2052 filter(lambda x:
2053 x.running(),
2054 self.ctx.daemons.iter_daemons_of_role('osd',
2055 self.cluster))]
2056 return {'in': in_osds, 'out': out_osds, 'up': up_osds,
2057 'down': down_osds, 'dead': dead_osds, 'live': live_osds,
2058 'raw': osd_lines}
2059
2060 def get_num_pgs(self):
2061 """
2062 Check cluster status for the number of pgs
2063 """
2064 status = self.raw_cluster_status()
2065 self.log(status)
2066 return status['pgmap']['num_pgs']
2067
2068 def create_erasure_code_profile(self, profile_name, profile):
2069 """
2070 Create an erasure code profile name that can be used as a parameter
2071 when creating an erasure coded pool.
2072 """
2073 with self.lock:
2074 args = cmd_erasure_code_profile(profile_name, profile)
2075 self.raw_cluster_cmd(*args)
2076
2077 def create_pool_with_unique_name(self, pg_num=16,
2078 erasure_code_profile_name=None,
2079 min_size=None,
2080 erasure_code_use_overwrites=False):
2081 """
2082 Create a pool named unique_pool_X where X is unique.
2083 """
2084 name = ""
2085 with self.lock:
2086 name = "unique_pool_%s" % (str(self.next_pool_id),)
2087 self.next_pool_id += 1
2088 self.create_pool(
2089 name,
2090 pg_num,
2091 erasure_code_profile_name=erasure_code_profile_name,
2092 min_size=min_size,
2093 erasure_code_use_overwrites=erasure_code_use_overwrites)
2094 return name
2095
2096 @contextlib.contextmanager
2097 def pool(self, pool_name, pg_num=16, erasure_code_profile_name=None):
2098 self.create_pool(pool_name, pg_num, erasure_code_profile_name)
2099 yield
2100 self.remove_pool(pool_name)
2101
2102 def create_pool(self, pool_name, pg_num=16,
2103 erasure_code_profile_name=None,
2104 min_size=None,
2105 erasure_code_use_overwrites=False):
2106 """
2107 Create a pool named from the pool_name parameter.
2108 :param pool_name: name of the pool being created.
2109 :param pg_num: initial number of pgs.
2110 :param erasure_code_profile_name: if set and !None create an
2111 erasure coded pool using the profile
2112 :param erasure_code_use_overwrites: if true, allow overwrites
2113 """
2114 with self.lock:
f91f0fd5 2115 assert isinstance(pool_name, str)
2116 assert isinstance(pg_num, int)
2117 assert pool_name not in self.pools
2118 self.log("creating pool_name %s" % (pool_name,))
2119 if erasure_code_profile_name:
2120 self.raw_cluster_cmd('osd', 'pool', 'create',
2121 pool_name, str(pg_num), str(pg_num),
2122 'erasure', erasure_code_profile_name)
2123 else:
2124 self.raw_cluster_cmd('osd', 'pool', 'create',
2125 pool_name, str(pg_num))
2126 if min_size is not None:
2127 self.raw_cluster_cmd(
2128 'osd', 'pool', 'set', pool_name,
2129 'min_size',
2130 str(min_size))
2131 if erasure_code_use_overwrites:
2132 self.raw_cluster_cmd(
2133 'osd', 'pool', 'set', pool_name,
2134 'allow_ec_overwrites',
2135 'true')
2136 self.raw_cluster_cmd(
2137 'osd', 'pool', 'application', 'enable',
2138 pool_name, 'rados', '--yes-i-really-mean-it',
2139 run.Raw('||'), 'true')
2140 self.pools[pool_name] = pg_num
2141 time.sleep(1)
2142
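# Illustrative only: creating a replicated pool and an EC pool with overwrites
# enabled; the pool and profile names are invented for this sketch, and it
# assumes cmd_erasure_code_profile() accepts a dict of profile options.
#
#   manager.create_pool('rep_pool', pg_num=32, min_size=2)
#   manager.create_erasure_code_profile('ec21', {'k': 2, 'm': 1})
#   manager.create_pool('ec_pool', pg_num=32,
#                       erasure_code_profile_name='ec21',
#                       erasure_code_use_overwrites=True)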
2143 def add_pool_snap(self, pool_name, snap_name):
2144 """
2145 Add pool snapshot
2146 :param pool_name: name of pool to snapshot
2147 :param snap_name: name of snapshot to take
2148 """
2149 self.raw_cluster_cmd('osd', 'pool', 'mksnap',
2150 str(pool_name), str(snap_name))
2151
2152 def remove_pool_snap(self, pool_name, snap_name):
2153 """
2154 Remove pool snapshot
2155 :param pool_name: name of pool to snapshot
2156 :param snap_name: name of snapshot to remove
2157 """
2158 self.raw_cluster_cmd('osd', 'pool', 'rmsnap',
2159 str(pool_name), str(snap_name))
2160
2161 def remove_pool(self, pool_name):
2162 """
2163 Remove the indicated pool
2164 :param pool_name: Pool to be removed
2165 """
2166 with self.lock:
f91f0fd5 2167 assert isinstance(pool_name, str)
2168 assert pool_name in self.pools
2169 self.log("removing pool_name %s" % (pool_name,))
2170 del self.pools[pool_name]
2171 self.raw_cluster_cmd('osd', 'pool', 'rm', pool_name, pool_name,
2172 "--yes-i-really-really-mean-it")
2173
2174 def get_pool(self):
2175 """
2176 Pick a random pool
2177 """
2178 with self.lock:
9f95a23c 2179 if self.pools:
e306af50 2180 return random.choice(list(self.pools))
2181
2182 def get_pool_pg_num(self, pool_name):
2183 """
2184 Return the number of pgs in the pool specified.
2185 """
2186 with self.lock:
f91f0fd5 2187 assert isinstance(pool_name, str)
2188 if pool_name in self.pools:
2189 return self.pools[pool_name]
2190 return 0
2191
2192 def get_pool_property(self, pool_name, prop):
2193 """
2194 :param pool_name: pool
2195 :param prop: property to be checked.
9f95a23c 2196 :returns: property as string
2197 """
2198 with self.lock:
2199 assert isinstance(pool_name, str)
2200 assert isinstance(prop, str)
2201 output = self.raw_cluster_cmd(
2202 'osd',
2203 'pool',
2204 'get',
2205 pool_name,
2206 prop)
2207 return output.split()[1]
2208
2209 def get_pool_int_property(self, pool_name, prop):
2210 return int(self.get_pool_property(pool_name, prop))
2211
2212 def set_pool_property(self, pool_name, prop, val):
2213 """
2214 :param pool_name: pool
2215 :param prop: property to be set.
2216 :param val: value to set.
2217
2218 This routine retries if set operation fails.
2219 """
2220 with self.lock:
2221 assert isinstance(pool_name, str)
2222 assert isinstance(prop, str)
2223 assert isinstance(val, int)
2224 tries = 0
2225 while True:
2226 r = self.raw_cluster_cmd_result(
2227 'osd',
2228 'pool',
2229 'set',
2230 pool_name,
2231 prop,
2232 str(val))
2233 if r != 11: # EAGAIN
2234 break
2235 tries += 1
2236 if tries > 50:
2237 raise Exception('timed out getting EAGAIN '
2238 'when setting pool property %s %s = %s' %
2239 (pool_name, prop, val))
2240 self.log('got EAGAIN setting pool property, '
2241 'waiting a few seconds...')
2242 time.sleep(2)
2243
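# Illustrative only: set_pool_property() retries on EAGAIN (11) because the
# mons can briefly refuse pg_num changes while a previous change is still
# propagating. Typical use (pool name hypothetical):
#
#   manager.set_pool_property('rep_pool', 'pg_num', 64)
#   manager.set_pool_property('rep_pool', 'pgp_num', 64)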
2244 def expand_pool(self, pool_name, by, max_pgs):
2245 """
2246 Increase the number of pgs in a pool
2247 """
2248 with self.lock:
f91f0fd5 2249 assert isinstance(pool_name, str)
2250 assert isinstance(by, int)
2251 assert pool_name in self.pools
2252 if self.get_num_creating() > 0:
2253 return False
2254 if (self.pools[pool_name] + by) > max_pgs:
2255 return False
2256 self.log("increase pool size by %d" % (by,))
2257 new_pg_num = self.pools[pool_name] + by
2258 self.set_pool_property(pool_name, "pg_num", new_pg_num)
2259 self.pools[pool_name] = new_pg_num
2260 return True
2261
2262 def contract_pool(self, pool_name, by, min_pgs):
2263 """
2264 Decrease the number of pgs in a pool
2265 """
2266 with self.lock:
2267 self.log('contract_pool %s by %s min %s' % (
2268 pool_name, str(by), str(min_pgs)))
f91f0fd5 2269 assert isinstance(pool_name, str)
2270 assert isinstance(by, int)
2271 assert pool_name in self.pools
2272 if self.get_num_creating() > 0:
2273 self.log('too many creating')
2274 return False
2275 proj = self.pools[pool_name] - by
2276 if proj < min_pgs:
2277 self.log('would drop below min_pgs, proj %d, currently %d' % (proj,self.pools[pool_name],))
2278 return False
2279 self.log("decrease pool size by %d" % (by,))
2280 new_pg_num = self.pools[pool_name] - by
2281 self.set_pool_property(pool_name, "pg_num", new_pg_num)
2282 self.pools[pool_name] = new_pg_num
2283 return True
2284
2285 def stop_pg_num_changes(self):
2286 """
2287 Reset all pg_num_targets back to pg_num, canceling splits and merges
2288 """
2289 self.log('Canceling any pending splits or merges...')
2290 osd_dump = self.get_osd_dump_json()
2291 try:
2292 for pool in osd_dump['pools']:
2293 if pool['pg_num'] != pool['pg_num_target']:
2294 self.log('Setting pool %s (%d) pg_num %d -> %d' %
2295 (pool['pool_name'], pool['pool'],
2296 pool['pg_num_target'],
2297 pool['pg_num']))
2298 self.raw_cluster_cmd('osd', 'pool', 'set', pool['pool_name'],
2299 'pg_num', str(pool['pg_num']))
2300 except KeyError:
2301 # we don't support pg_num_target before nautilus
2302 pass
11fdf7f2 2303
2304 def set_pool_pgpnum(self, pool_name, force):
2305 """
2306 Set pgpnum property of pool_name pool.
2307 """
2308 with self.lock:
f91f0fd5 2309 assert isinstance(pool_name, str)
2310 assert pool_name in self.pools
2311 if not force and self.get_num_creating() > 0:
2312 return False
2313 self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name])
2314 return True
2315
11fdf7f2 2316 def list_pg_unfound(self, pgid):
7c673cae 2317 """
11fdf7f2 2318 return list of unfound pgs with the id specified
2319 """
2320 r = None
2321 offset = {}
2322 while True:
11fdf7f2 2323 out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_unfound',
2324 json.dumps(offset))
2325 j = json.loads(out)
2326 if r is None:
2327 r = j
2328 else:
2329 r['objects'].extend(j['objects'])
2330 if 'more' not in j:
2331 break
2332 if j['more'] == 0:
2333 break
2334 offset = j['objects'][-1]['oid']
2335 if 'more' in r:
2336 del r['more']
2337 return r
2338
2339 def get_pg_stats(self):
2340 """
2341 Dump the cluster and get pg stats
2342 """
2343 out = self.raw_cluster_cmd('pg', 'dump', '--format=json')
2344 j = json.loads('\n'.join(out.split('\n')[1:]))
2345 try:
2346 return j['pg_map']['pg_stats']
2347 except KeyError:
2348 return j['pg_stats']
7c673cae 2349
2350 def get_osd_df(self, osdid):
2351 """
2352 Get the osd df stats
2353 """
2354 out = self.raw_cluster_cmd('osd', 'df', 'name', 'osd.{}'.format(osdid),
2355 '--format=json')
2356 j = json.loads('\n'.join(out.split('\n')[1:]))
2357 return j['nodes'][0]
2358
2359 def get_pool_df(self, name):
2360 """
2361 Get the pool df stats
2362 """
2363 out = self.raw_cluster_cmd('df', 'detail', '--format=json')
2364 j = json.loads('\n'.join(out.split('\n')[1:]))
2365 return next((p['stats'] for p in j['pools'] if p['name'] == name),
2366 None)
2367
2368 def get_pgids_to_force(self, backfill):
2369 """
2370 Return the randomized list of PGs that can have their recovery/backfill forced
2371 """
2372 j = self.get_pg_stats()
2373 pgids = []
2374 if backfill:
2375 wanted = ['degraded', 'backfilling', 'backfill_wait']
2376 else:
2377 wanted = ['recovering', 'degraded', 'recovery_wait']
2378 for pg in j:
2379 status = pg['state'].split('+')
2380 for t in wanted:
2381 if random.random() > 0.5 and not ('forced_backfill' in status or 'forced_recovery' in status) and t in status:
2382 pgids.append(pg['pgid'])
2383 break
2384 return pgids
2385
2386 def get_pgids_to_cancel_force(self, backfill):
2387 """
2388 Return the randomized list of PGs whose recovery/backfill priority is forced
2389 """
2390 j = self.get_pg_stats()
2391 pgids = []
2392 if backfill:
2393 wanted = 'forced_backfill'
2394 else:
2395 wanted = 'forced_recovery'
2396 for pg in j:
2397 status = pg['state'].split('+')
2398 if wanted in status and random.random() > 0.5:
2399 pgids.append(pg['pgid'])
2400 return pgids
2401
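# Illustrative only: the two helpers above are used to feed 'ceph pg force-*'
# commands in the thrashers, roughly like
#
#   for pgid in manager.get_pgids_to_force(backfill=True):
#       manager.raw_cluster_cmd('pg', 'force-backfill', pgid)
#   for pgid in manager.get_pgids_to_cancel_force(backfill=True):
#       manager.raw_cluster_cmd('pg', 'cancel-force-backfill', pgid)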
2402 def compile_pg_status(self):
2403 """
2404 Return a histogram of pg state values
2405 """
2406 ret = {}
2407 j = self.get_pg_stats()
2408 for pg in j:
2409 for status in pg['state'].split('+'):
2410 if status not in ret:
2411 ret[status] = 0
2412 ret[status] += 1
2413 return ret
2414
9f95a23c 2415 @wait_for_pg_stats # type: ignore
2416 def with_pg_state(self, pool, pgnum, check):
2417 pgstr = self.get_pgid(pool, pgnum)
2418 stats = self.get_single_pg_stats(pgstr)
2419 assert(check(stats['state']))
2420
9f95a23c 2421 @wait_for_pg_stats # type: ignore
2422 def with_pg(self, pool, pgnum, check):
2423 pgstr = self.get_pgid(pool, pgnum)
2424 stats = self.get_single_pg_stats(pgstr)
2425 return check(stats)
2426
2427 def get_last_scrub_stamp(self, pool, pgnum):
2428 """
2429 Get the timestamp of the last scrub.
2430 """
2431 stats = self.get_single_pg_stats(self.get_pgid(pool, pgnum))
2432 return stats["last_scrub_stamp"]
2433
2434 def do_pg_scrub(self, pool, pgnum, stype):
2435 """
2436 Scrub pg and wait for scrubbing to finish
2437 """
2438 init = self.get_last_scrub_stamp(pool, pgnum)
2439 RESEND_TIMEOUT = 120 # Must be a multiple of SLEEP_TIME
2440 FATAL_TIMEOUT = RESEND_TIMEOUT * 3
2441 SLEEP_TIME = 10
2442 timer = 0
2443 while init == self.get_last_scrub_stamp(pool, pgnum):
2444 assert timer < FATAL_TIMEOUT, "fatal timeout trying to " + stype
2445 self.log("waiting for scrub type %s" % (stype,))
2446 if (timer % RESEND_TIMEOUT) == 0:
2447 self.raw_cluster_cmd('pg', stype, self.get_pgid(pool, pgnum))
2448 # The first time in this loop is the actual request
2449 if timer != 0 and stype == "repair":
2450 self.log("WARNING: Resubmitted a non-idempotent repair")
2451 time.sleep(SLEEP_TIME)
2452 timer += SLEEP_TIME
2453
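# Illustrative only: scrub or repair a single pg and block until its scrub
# stamp moves (pool name hypothetical):
#
#   manager.do_pg_scrub('rep_pool', 0, 'deep-scrub')   # or 'scrub' / 'repair'
#
# The stype string is passed straight to 'ceph pg <stype> <pgid>'; resending a
# 'repair' is logged as a warning above because repair is not idempotent.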
2454 def wait_snap_trimming_complete(self, pool):
2455 """
2456 Wait for snap trimming on pool to end
2457 """
2458 POLL_PERIOD = 10
2459 FATAL_TIMEOUT = 600
2460 start = time.time()
2461 poolnum = self.get_pool_num(pool)
2462 poolnumstr = "%s." % (poolnum,)
2463 while (True):
2464 now = time.time()
2465 if (now - start) > FATAL_TIMEOUT:
2466 assert (now - start) < FATAL_TIMEOUT, \
2467 'failed to complete snap trimming before timeout'
2468 all_stats = self.get_pg_stats()
2469 trimming = False
2470 for pg in all_stats:
2471 if (poolnumstr in pg['pgid']) and ('snaptrim' in pg['state']):
2472 self.log("pg {pg} in trimming, state: {state}".format(
2473 pg=pg['pgid'],
2474 state=pg['state']))
2475 trimming = True
2476 if not trimming:
2477 break
2478 self.log("{pool} still trimming, waiting".format(pool=pool))
2479 time.sleep(POLL_PERIOD)
2480
2481 def get_single_pg_stats(self, pgid):
2482 """
2483 Return pg for the pgid specified.
2484 """
2485 all_stats = self.get_pg_stats()
2486
2487 for pg in all_stats:
2488 if pg['pgid'] == pgid:
2489 return pg
2490
2491 return None
2492
2493 def get_object_pg_with_shard(self, pool, name, osdid):
2494 """
2495 Return the pgid (with a shard suffix for EC pools) that stores the object on the given osd.
"""
2496 pool_dump = self.get_pool_dump(pool)
2497 object_map = self.get_object_map(pool, name)
9f95a23c 2498 if pool_dump["type"] == PoolType.ERASURE_CODED:
2499 shard = object_map['acting'].index(osdid)
2500 return "{pgid}s{shard}".format(pgid=object_map['pgid'],
2501 shard=shard)
2502 else:
2503 return object_map['pgid']
2504
2505 def get_object_primary(self, pool, name):
2506 """
2507 Return the acting primary osd id for the named object.
"""
2508 object_map = self.get_object_map(pool, name)
2509 return object_map['acting_primary']
2510
2511 def get_object_map(self, pool, name):
2512 """
2513 osd map --format=json converted to a python object
2514 :returns: the python object
2515 """
2516 out = self.raw_cluster_cmd('--format=json', 'osd', 'map', pool, name)
2517 return json.loads('\n'.join(out.split('\n')[1:]))
2518
2519 def get_osd_dump_json(self):
2520 """
2521 osd dump --format=json converted to a python object
2522 :returns: the python object
2523 """
2524 out = self.raw_cluster_cmd('osd', 'dump', '--format=json')
2525 return json.loads('\n'.join(out.split('\n')[1:]))
2526
2527 def get_osd_dump(self):
2528 """
2529 Dump osds
2530 :returns: all osds
2531 """
2532 return self.get_osd_dump_json()['osds']
2533
2534 def get_osd_metadata(self):
2535 """
2536 osd metadata --format=json converted to a python object
2537 :returns: the python object containing osd metadata information
2538 """
2539 out = self.raw_cluster_cmd('osd', 'metadata', '--format=json')
2540 return json.loads('\n'.join(out.split('\n')[1:]))
2541
2542 def get_mgr_dump(self):
2543 out = self.raw_cluster_cmd('mgr', 'dump', '--format=json')
2544 return json.loads(out)
2545
2546 def get_stuck_pgs(self, type_, threshold):
2547 """
2548 :returns: stuck pg information from the cluster
2549 """
2550 out = self.raw_cluster_cmd('pg', 'dump_stuck', type_, str(threshold),
2551 '--format=json')
11fdf7f2 2552 return json.loads(out).get('stuck_pg_stats',[])
2553
2554 def get_num_unfound_objects(self):
2555 """
2556 Check cluster status to get the number of unfound objects
2557 """
2558 status = self.raw_cluster_status()
2559 self.log(status)
2560 return status['pgmap'].get('unfound_objects', 0)
2561
2562 def get_num_creating(self):
2563 """
2564 Find the number of pgs in creating mode.
2565 """
2566 pgs = self.get_pg_stats()
2567 num = 0
2568 for pg in pgs:
2569 if 'creating' in pg['state']:
2570 num += 1
2571 return num
2572
2573 def get_num_active_clean(self):
2574 """
2575 Find the number of active and clean pgs.
2576 """
2577 pgs = self.get_pg_stats()
2578 return self._get_num_active_clean(pgs)
2579
2580 def _get_num_active_clean(self, pgs):
2581 num = 0
2582 for pg in pgs:
2583 if (pg['state'].count('active') and
2584 pg['state'].count('clean') and
2585 not pg['state'].count('stale')):
2586 num += 1
2587 return num
2588
2589 def get_num_active_recovered(self):
2590 """
2591 Find the number of active and recovered pgs.
2592 """
2593 pgs = self.get_pg_stats()
2594 return self._get_num_active_recovered(pgs)
2595
2596 def _get_num_active_recovered(self, pgs):
2597 num = 0
2598 for pg in pgs:
2599 if (pg['state'].count('active') and
2600 not pg['state'].count('recover') and
3efd9988 2601 not pg['state'].count('backfilling') and
2602 not pg['state'].count('stale')):
2603 num += 1
2604 return num
2605
2606 def get_is_making_recovery_progress(self):
2607 """
2608 Return whether there is recovery progress discernable in the
2609 raw cluster status
2610 """
2611 status = self.raw_cluster_status()
2612 kps = status['pgmap'].get('recovering_keys_per_sec', 0)
2613 bps = status['pgmap'].get('recovering_bytes_per_sec', 0)
2614 ops = status['pgmap'].get('recovering_objects_per_sec', 0)
2615 return kps > 0 or bps > 0 or ops > 0
2616
2617 def get_num_active(self):
2618 """
2619 Find the number of active pgs.
2620 """
2621 pgs = self.get_pg_stats()
2622 return self._get_num_active(pgs)
2623
2624 def _get_num_active(self, pgs):
2625 num = 0
2626 for pg in pgs:
2627 if pg['state'].count('active') and not pg['state'].count('stale'):
2628 num += 1
2629 return num
2630
2631 def get_num_down(self):
2632 """
2633 Find the number of pgs that are down.
2634 """
2635 pgs = self.get_pg_stats()
2636 num = 0
2637 for pg in pgs:
2638 if ((pg['state'].count('down') and not
2639 pg['state'].count('stale')) or
2640 (pg['state'].count('incomplete') and not
2641 pg['state'].count('stale'))):
2642 num += 1
2643 return num
2644
2645 def get_num_active_down(self):
2646 """
2647 Find the number of pgs that are either active or down.
2648 """
2649 pgs = self.get_pg_stats()
2650 return self._get_num_active_down(pgs)
2651
2652 def _get_num_active_down(self, pgs):
2653 num = 0
2654 for pg in pgs:
2655 if ((pg['state'].count('active') and not
2656 pg['state'].count('stale')) or
2657 (pg['state'].count('down') and not
2658 pg['state'].count('stale')) or
2659 (pg['state'].count('incomplete') and not
2660 pg['state'].count('stale'))):
2661 num += 1
2662 return num
2663
2664 def get_num_peered(self):
2665 """
2666 Find the number of PGs that are peered
2667 """
2668 pgs = self.get_pg_stats()
2669 return self._get_num_peered(pgs)
2670
2671 def _get_num_peered(self, pgs):
2672 num = 0
2673 for pg in pgs:
2674 if pg['state'].count('peered') and not pg['state'].count('stale'):
2675 num += 1
2676 return num
2677
2678 def is_clean(self):
2679 """
2680 True if all pgs are clean
2681 """
9f95a23c 2682 pgs = self.get_pg_stats()
2683 if self._get_num_active_clean(pgs) == len(pgs):
2684 return True
2685 else:
2686 self.dump_pgs_not_active_clean()
2687 return False
2688
2689 def is_recovered(self):
2690 """
2691 True if all pgs have recovered
2692 """
2693 pgs = self.get_pg_stats()
2694 return self._get_num_active_recovered(pgs) == len(pgs)
2695
2696 def is_active_or_down(self):
2697 """
2698 True if all pgs are active or down
2699 """
2700 pgs = self.get_pg_stats()
2701 return self._get_num_active_down(pgs) == len(pgs)
7c673cae 2702
2703 def dump_pgs_not_active_clean(self):
2704 """
2705 Dumps all pgs that are not active+clean
2706 """
2707 pgs = self.get_pg_stats()
2708 for pg in pgs:
2709 if pg['state'] != 'active+clean':
2710 self.log('PG %s is not active+clean' % pg['pgid'])
2711 self.log(pg)
2712
2713 def dump_pgs_not_active_down(self):
2714 """
2715 Dumps all pgs that are not active or down
2716 """
2717 pgs = self.get_pg_stats()
2718 for pg in pgs:
2719 if 'active' not in pg['state'] and 'down' not in pg['state']:
2720 self.log('PG %s is not active or down' % pg['pgid'])
2721 self.log(pg)
2722
2723 def dump_pgs_not_active(self):
2724 """
2725 Dumps all pgs that are not active
2726 """
2727 pgs = self.get_pg_stats()
2728 for pg in pgs:
2729 if 'active' not in pg['state']:
2730 self.log('PG %s is not active' % pg['pgid'])
2731 self.log(pg)
2732
2733 def dump_pgs_not_active_peered(self, pgs):
2734 for pg in pgs:
2735 if (not pg['state'].count('active')) and (not pg['state'].count('peered')):
2736 self.log('PG %s is not active or peered' % pg['pgid'])
2737 self.log(pg)
2738
11fdf7f2 2739 def wait_for_clean(self, timeout=1200):
2740 """
2741 Returns true when all pgs are clean.
2742 """
2743 self.log("waiting for clean")
2744 start = time.time()
2745 num_active_clean = self.get_num_active_clean()
2746 while not self.is_clean():
2747 if timeout is not None:
2748 if self.get_is_making_recovery_progress():
2749 self.log("making progress, resetting timeout")
2750 start = time.time()
2751 else:
2752 self.log("no progress seen, keeping timeout for now")
2753 if time.time() - start >= timeout:
2754 self.log('dumping pgs not clean')
2755 self.dump_pgs_not_active_clean()
7c673cae 2756 assert time.time() - start < timeout, \
f6b5b4d7 2757 'wait_for_clean: failed before timeout expired'
2758 cur_active_clean = self.get_num_active_clean()
2759 if cur_active_clean != num_active_clean:
2760 start = time.time()
2761 num_active_clean = cur_active_clean
2762 time.sleep(3)
2763 self.log("clean!")
2764
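# Illustrative only: the wait_for_* helpers are typically chained after a
# thrashing run, e.g.
#
#   manager.wait_for_all_osds_up(timeout=300)
#   manager.wait_for_recovery(timeout=600)
#   manager.wait_for_clean(timeout=1200)
#
# wait_for_clean() resets its timer whenever recovery throughput is non-zero,
# so the timeout bounds stalls rather than total runtime.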
2765 def are_all_osds_up(self):
2766 """
2767 Returns true if all osds are up.
2768 """
2769 x = self.get_osd_dump()
2770 return (len(x) == sum([(y['up'] > 0) for y in x]))
2771
c07f9fc5 2772 def wait_for_all_osds_up(self, timeout=None):
2773 """
2774 When this exits, either the timeout has expired, or all
2775 osds are up.
2776 """
2777 self.log("waiting for all up")
2778 start = time.time()
2779 while not self.are_all_osds_up():
2780 if timeout is not None:
2781 assert time.time() - start < timeout, \
c07f9fc5 2782 'timeout expired in wait_for_all_osds_up'
2783 time.sleep(3)
2784 self.log("all up!")
2785
2786 def pool_exists(self, pool):
2787 if pool in self.list_pools():
2788 return True
2789 return False
2790
2791 def wait_for_pool(self, pool, timeout=300):
2792 """
2793 Wait for a pool to exist
2794 """
2795 self.log('waiting for pool %s to exist' % pool)
2796 start = time.time()
2797 while not self.pool_exists(pool):
2798 if timeout is not None:
2799 assert time.time() - start < timeout, \
2800 'timeout expired in wait_for_pool'
2801 time.sleep(3)
2802
2803 def wait_for_pools(self, pools):
2804 for pool in pools:
2805 self.wait_for_pool(pool)
2806
2807 def is_mgr_available(self):
2808 x = self.get_mgr_dump()
2809 return x.get('available', False)
2810
2811 def wait_for_mgr_available(self, timeout=None):
2812 self.log("waiting for mgr available")
2813 start = time.time()
2814 while not self.is_mgr_available():
2815 if timeout is not None:
2816 assert time.time() - start < timeout, \
2817 'timeout expired in wait_for_mgr_available'
2818 time.sleep(3)
2819 self.log("mgr available!")
2820
2821 def wait_for_recovery(self, timeout=None):
2822 """
2823 Check peering. When this exits, we have recovered.
2824 """
2825 self.log("waiting for recovery to complete")
2826 start = time.time()
2827 num_active_recovered = self.get_num_active_recovered()
2828 while not self.is_recovered():
2829 now = time.time()
2830 if timeout is not None:
2831 if self.get_is_making_recovery_progress():
2832 self.log("making progress, resetting timeout")
2833 start = time.time()
2834 else:
2835 self.log("no progress seen, keeping timeout for now")
2836 if now - start >= timeout:
2837 if self.is_recovered():
2838 break
2839 self.log('dumping pgs not recovered yet')
2840 self.dump_pgs_not_active_clean()
7c673cae 2841 assert now - start < timeout, \
f6b5b4d7 2842 'wait_for_recovery: failed before timeout expired'
2843 cur_active_recovered = self.get_num_active_recovered()
2844 if cur_active_recovered != num_active_recovered:
2845 start = time.time()
2846 num_active_recovered = cur_active_recovered
2847 time.sleep(3)
2848 self.log("recovered!")
2849
2850 def wait_for_active(self, timeout=None):
2851 """
2852 Check peering. When this exits, we are definitely active
2853 """
2854 self.log("waiting for peering to complete")
2855 start = time.time()
2856 num_active = self.get_num_active()
2857 while not self.is_active():
2858 if timeout is not None:
2859 if time.time() - start >= timeout:
2860 self.log('dumping pgs not active')
2861 self.dump_pgs_not_active()
7c673cae 2862 assert time.time() - start < timeout, \
f6b5b4d7 2863 'wait_for_active: failed before timeout expired'
2864 cur_active = self.get_num_active()
2865 if cur_active != num_active:
2866 start = time.time()
2867 num_active = cur_active
2868 time.sleep(3)
2869 self.log("active!")
2870
2871 def wait_for_active_or_down(self, timeout=None):
2872 """
2873 Check peering. When this exits, we are definitely either
2874 active or down
2875 """
2876 self.log("waiting for peering to complete or become blocked")
2877 start = time.time()
2878 num_active_down = self.get_num_active_down()
2879 while not self.is_active_or_down():
2880 if timeout is not None:
2881 if time.time() - start >= timeout:
2882 self.log('dumping pgs not active or down')
2883 self.dump_pgs_not_active_down()
7c673cae 2884 assert time.time() - start < timeout, \
f6b5b4d7 2885 'wait_for_active_or_down: failed before timeout expired'
2886 cur_active_down = self.get_num_active_down()
2887 if cur_active_down != num_active_down:
2888 start = time.time()
2889 num_active_down = cur_active_down
2890 time.sleep(3)
2891 self.log("active or down!")
2892
2893 def osd_is_up(self, osd):
2894 """
2895 Wrapper for osd check
2896 """
2897 osds = self.get_osd_dump()
2898 return osds[osd]['up'] > 0
2899
2900 def wait_till_osd_is_up(self, osd, timeout=None):
2901 """
2902 Loop waiting for osd.
2903 """
2904 self.log('waiting for osd.%d to be up' % osd)
2905 start = time.time()
2906 while not self.osd_is_up(osd):
2907 if timeout is not None:
2908 assert time.time() - start < timeout, \
2909 'osd.%d failed to come up before timeout expired' % osd
2910 time.sleep(3)
2911 self.log('osd.%d is up' % osd)
2912
2913 def is_active(self):
2914 """
2915 Wrapper to check if all pgs are active
2916 """
2917 return self.get_num_active() == self.get_num_pgs()
2918
2919 def all_active_or_peered(self):
2920 """
2921 Wrapper to check if all PGs are active or peered
2922 """
2923 pgs = self.get_pg_stats()
2924 if self._get_num_active(pgs) + self._get_num_peered(pgs) == len(pgs):
2925 return True
2926 else:
2927 self.dump_pgs_not_active_peered(pgs)
2928 return False
9f95a23c 2929
2930 def wait_till_active(self, timeout=None):
2931 """
2932 Wait until all pgs are active.
2933 """
2934 self.log("waiting till active")
2935 start = time.time()
2936 while not self.is_active():
2937 if timeout is not None:
2938 if time.time() - start >= timeout:
2939 self.log('dumping pgs not active')
2940 self.dump_pgs_not_active()
7c673cae 2941 assert time.time() - start < timeout, \
f6b5b4d7 2942 'wait_till_active: failed before timeout expired'
2943 time.sleep(3)
2944 self.log("active!")
2945
2946 def wait_till_pg_convergence(self, timeout=None):
2947 start = time.time()
2948 old_stats = None
2949 active_osds = [osd['osd'] for osd in self.get_osd_dump()
2950 if osd['in'] and osd['up']]
2951 while True:
2952 # strictly speaking, no need to wait for mon. but due to the
2953 # "ms inject socket failures" setting, the osdmap could be delayed,
2954 # so mgr is likely to ignore the pg-stat messages with pgs serving
2955 # newly created pools which is not yet known by mgr. so, to make sure
2956 # the mgr is updated with the latest pg-stats, waiting for mon/mgr is
2957 # necessary.
2958 self.flush_pg_stats(active_osds)
2959 new_stats = dict((stat['pgid'], stat['state'])
2960 for stat in self.get_pg_stats())
2961 if old_stats == new_stats:
2962 return old_stats
2963 if timeout is not None:
2964 assert time.time() - start < timeout, \
2965 'failed to reach convergence before %d secs' % timeout
2966 old_stats = new_stats
2967 # longer than mgr_stats_period
2968 time.sleep(5 + 1)
2969
2970 def mark_out_osd(self, osd):
2971 """
2972 Wrapper to mark osd out.
2973 """
2974 self.raw_cluster_cmd('osd', 'out', str(osd))
2975
2976 def kill_osd(self, osd):
2977 """
2978 Kill osds by either power cycling (if indicated by the config)
2979 or by stopping.
2980 """
2981 if self.config.get('powercycle'):
2982 remote = self.find_remote('osd', osd)
2983 self.log('kill_osd on osd.{o} '
2984 'doing powercycle of {s}'.format(o=osd, s=remote.name))
2985 self._assert_ipmi(remote)
2986 remote.console.power_off()
2987 elif self.config.get('bdev_inject_crash') and self.config.get('bdev_inject_crash_probability'):
2988 if random.uniform(0, 1) < self.config.get('bdev_inject_crash_probability', .5):
2989 self.inject_args(
2990 'osd', osd,
2991 'bdev-inject-crash', self.config.get('bdev_inject_crash'))
2992 try:
2993 self.ctx.daemons.get_daemon('osd', osd, self.cluster).wait()
2994 except:
2995 pass
2996 else:
2997 raise RuntimeError('osd.%s did not fail' % osd)
2998 else:
2999 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
3000 else:
3001 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
3002
3003 @staticmethod
3004 def _assert_ipmi(remote):
3005 assert remote.console.has_ipmi_credentials, (
3006 "powercycling requested but RemoteConsole is not "
3007 "initialized. Check ipmi config.")
3008
3009 def blackhole_kill_osd(self, osd):
3010 """
3011 Stop osd if nothing else works.
3012 """
3013 self.inject_args('osd', osd,
3014 'objectstore-blackhole', True)
3015 time.sleep(2)
3016 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
3017
3efd9988 3018 def revive_osd(self, osd, timeout=360, skip_admin_check=False):
3019 """
3020 Revive osds by either power cycling (if indicated by the config)
3021 or by restarting.
3022 """
3023 if self.config.get('powercycle'):
3024 remote = self.find_remote('osd', osd)
3025 self.log('revive_osd on osd.{o} doing powercycle of {s}'.
3026 format(o=osd, s=remote.name))
3027 self._assert_ipmi(remote)
3028 remote.console.power_on()
3029 if not remote.console.check_status(300):
3030 raise Exception('Failed to revive osd.{o} via ipmi'.
3031 format(o=osd))
3032 teuthology.reconnect(self.ctx, 60, [remote])
3033 mount_osd_data(self.ctx, remote, self.cluster, str(osd))
3034 self.make_admin_daemon_dir(remote)
3035 self.ctx.daemons.get_daemon('osd', osd, self.cluster).reset()
3036 self.ctx.daemons.get_daemon('osd', osd, self.cluster).restart()
3037
3038 if not skip_admin_check:
3039 # wait for dump_ops_in_flight; this command doesn't appear
3040 # until after the signal handler is installed and it is safe
3041 # to stop the osd again without making valgrind leak checks
3042 # unhappy. see #5924.
3043 self.wait_run_admin_socket('osd', osd,
3044 args=['dump_ops_in_flight'],
3045 timeout=timeout, stdout=DEVNULL)
3046
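# Illustrative only: a typical thrash cycle pairs kill_osd()/revive_osd() with
# the mark_* wrappers below (osd id hypothetical):
#
#   manager.kill_osd(3)
#   manager.mark_down_osd(3)
#   # ... exercise the cluster while osd.3 is down ...
#   manager.revive_osd(3)
#   manager.wait_till_osd_is_up(3, timeout=300)
#
# With 'powercycle' in the config the same calls go through IPMI instead of
# stopping and restarting the daemon.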
3047 def mark_down_osd(self, osd):
3048 """
3049 Cluster command wrapper
3050 """
3051 self.raw_cluster_cmd('osd', 'down', str(osd))
3052
3053 def mark_in_osd(self, osd):
3054 """
3055 Cluster command wrapper
3056 """
3057 self.raw_cluster_cmd('osd', 'in', str(osd))
3058
3059 def signal_osd(self, osd, sig, silent=False):
3060 """
3061 Wrapper to local get_daemon call which sends the given
3062 signal to the given osd.
3063 """
3064 self.ctx.daemons.get_daemon('osd', osd,
3065 self.cluster).signal(sig, silent=silent)
3066
3067 ## monitors
3068 def signal_mon(self, mon, sig, silent=False):
3069 """
11fdf7f2 3070 Wrapper to local get_daemon call
3071 """
3072 self.ctx.daemons.get_daemon('mon', mon,
3073 self.cluster).signal(sig, silent=silent)
3074
3075 def kill_mon(self, mon):
3076 """
3077 Kill the monitor by either power cycling (if the config says so),
3078 or by doing a stop.
3079 """
3080 if self.config.get('powercycle'):
3081 remote = self.find_remote('mon', mon)
3082 self.log('kill_mon on mon.{m} doing powercycle of {s}'.
3083 format(m=mon, s=remote.name))
3084 self._assert_ipmi(remote)
3085 remote.console.power_off()
3086 else:
3087 self.ctx.daemons.get_daemon('mon', mon, self.cluster).stop()
3088
3089 def revive_mon(self, mon):
3090 """
3091 Restart by either power cycling (if the config says so),
3092 or by doing a normal restart.
3093 """
3094 if self.config.get('powercycle'):
3095 remote = self.find_remote('mon', mon)
3096 self.log('revive_mon on mon.{m} doing powercycle of {s}'.
3097 format(m=mon, s=remote.name))
3098 self._assert_ipmi(remote)
3099 remote.console.power_on()
3100 self.make_admin_daemon_dir(remote)
3101 self.ctx.daemons.get_daemon('mon', mon, self.cluster).restart()
3102
3103 def revive_mgr(self, mgr):
3104 """
3105 Restart by either power cycling (if the config says so),
3106 or by doing a normal restart.
3107 """
3108 if self.config.get('powercycle'):
3109 remote = self.find_remote('mgr', mgr)
3110 self.log('revive_mgr on mgr.{m} doing powercycle of {s}'.
3111 format(m=mgr, s=remote.name))
3112 self._assert_ipmi(remote)
3113 remote.console.power_on()
3114 self.make_admin_daemon_dir(remote)
3115 self.ctx.daemons.get_daemon('mgr', mgr, self.cluster).restart()
3116
3117 def get_mon_status(self, mon):
3118 """
3119 Extract all the monitor status information from the cluster
3120 """
9f95a23c 3121 out = self.raw_cluster_cmd('tell', 'mon.%s' % mon, 'mon_status')
3122 return json.loads(out)
3123
3124 def get_mon_quorum(self):
3125 """
3126 Extract monitor quorum information from the cluster
3127 """
3128 out = self.raw_cluster_cmd('quorum_status')
3129 j = json.loads(out)
3130 return j['quorum']
3131
3132 def wait_for_mon_quorum_size(self, size, timeout=300):
3133 """
3134 Loop until quorum size is reached.
3135 """
3136 self.log('waiting for quorum size %d' % size)
3137 sleep = 3
3138 with safe_while(sleep=sleep,
3139 tries=timeout // sleep,
3140 action=f'wait for quorum size {size}') as proceed:
3141 while proceed():
3142 try:
3143 if len(self.get_mon_quorum()) == size:
3144 break
3145 except CommandFailedError as e:
3146 # could fail instead of block if the rotating key of the
3147 # connected monitor is not updated yet after they form the
3148 # quorum
3149 if e.exitstatus == errno.EACCES:
3150 pass
3151 else:
3152 raise
3153 self.log("quorum is size %d" % size)
3154
3155 def get_mon_health(self, debug=False):
3156 """
3157 Extract all the monitor health information.
3158 """
3159 out = self.raw_cluster_cmd('health', '--format=json')
3160 if debug:
3161 self.log('health:\n{h}'.format(h=out))
3162 return json.loads(out)
3163
3164 def wait_until_healthy(self, timeout=None):
3165 self.log("wait_until_healthy")
3166 start = time.time()
3167 while self.get_mon_health()['status'] != 'HEALTH_OK':
3168 if timeout is not None:
3169 assert time.time() - start < timeout, \
3170 'timeout expired in wait_until_healthy'
3171 time.sleep(3)
3172 self.log("wait_until_healthy done")
3173
3174 def get_filepath(self):
3175 """
3176 Return path to osd data with {id} needing to be replaced
3177 """
3178 return '/var/lib/ceph/osd/' + self.cluster + '-{id}'
3179
3180 def make_admin_daemon_dir(self, remote):
3181 """
3182 Create /var/run/ceph directory on remote site.
3183
3184 :param ctx: Context
3185 :param remote: Remote site
3186 """
3187 remote.run(args=['sudo',
3188 'install', '-d', '-m0777', '--', '/var/run/ceph', ], )
3189
3190 def get_service_task_status(self, service, status_key):
3191 """
3192 Return daemon task status for a given ceph service.
3193
3194 :param service: ceph service (mds, osd, etc...)
3195 :param status_key: matching task status key
3196 """
3197 task_status = {}
3198 status = self.raw_cluster_status()
3199 try:
3200 for k,v in status['servicemap']['services'][service]['daemons'].items():
3201 ts = dict(v).get('task_status', None)
3202 if ts:
3203 task_status[k] = ts[status_key]
3204 except KeyError: # catches missing service and status key
3205 return {}
3206 self.log(task_status)
3207 return task_status
3208
3209 def utility_task(name):
3210 """
3211 Generate ceph_manager subtask corresponding to ceph_manager
3212 method name
3213 """
3214 def task(ctx, config):
3215 if config is None:
3216 config = {}
3217 args = config.get('args', [])
3218 kwargs = config.get('kwargs', {})
3219 cluster = config.get('cluster', 'ceph')
3220 fn = getattr(ctx.managers[cluster], name)
3221 fn(*args, **kwargs)
3222 return task
3223
3224 revive_osd = utility_task("revive_osd")
3225 revive_mon = utility_task("revive_mon")
3226 kill_osd = utility_task("kill_osd")
3227 kill_mon = utility_task("kill_mon")
3228 create_pool = utility_task("create_pool")
3229 remove_pool = utility_task("remove_pool")
3230 wait_for_clean = utility_task("wait_for_clean")
c07f9fc5 3231 flush_all_pg_stats = utility_task("flush_all_pg_stats")
3232 set_pool_property = utility_task("set_pool_property")
3233 do_pg_scrub = utility_task("do_pg_scrub")
3234 wait_for_pool = utility_task("wait_for_pool")
3235 wait_for_pools = utility_task("wait_for_pools")
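# Illustrative only: utility_task() exposes the CephManager methods above as
# standalone teuthology tasks, so a job yaml can invoke them directly. A
# hedged sketch of such a fragment (values invented):
#
#   tasks:
#   - ceph:
#   - kill_osd:
#       kwargs:
#         osd: 0
#   - wait_for_clean: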