"""
ceph manager -- Thrasher and CephManager objects
"""
from functools import wraps
import contextlib
import errno
import random
import signal
import time
import gevent
import base64
import json
import logging
import threading
import traceback
import os
import re
import shlex

from io import BytesIO, StringIO
from subprocess import DEVNULL
from teuthology import misc as teuthology
from tasks.scrub import Scrubber
from tasks.util.rados import cmd_erasure_code_profile
from tasks.util import get_remote
from teuthology.contextutil import safe_while
from teuthology.orchestra.remote import Remote
from teuthology.orchestra import run
from teuthology.parallel import parallel
from teuthology.exceptions import CommandFailedError
from tasks.thrasher import Thrasher


DEFAULT_CONF_PATH = '/etc/ceph/ceph.conf'

log = logging.getLogger(__name__)


# this is for cephadm clusters
def shell(ctx, cluster_name, remote, args, name=None, **kwargs):
    extra_args = []
    if name:
        extra_args = ['-n', name]
    return remote.run(
        args=[
            'sudo',
            ctx.cephadm,
            '--image', ctx.ceph[cluster_name].image,
            'shell',
        ] + extra_args + [
            '--fsid', ctx.ceph[cluster_name].fsid,
            '--',
        ] + args,
        **kwargs
    )

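# For reference, shell() above reduces to an invocation of roughly this shape
# on the remote (image/fsid/name values are whatever the cephadm task recorded;
# the trailing command is only an example):
#
#   sudo <ctx.cephadm> --image <image> shell -n osd.3 --fsid <fsid> -- ceph osd dump
#
# i.e. the wrapped command executes inside a cephadm "shell" container.
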
# this is for rook clusters
def toolbox(ctx, cluster_name, args, **kwargs):
    return ctx.rook[cluster_name].remote.run(
        args=[
            'kubectl',
            '-n', 'rook-ceph',
            'exec',
            ctx.rook[cluster_name].toolbox,
            '--',
        ] + args,
        **kwargs
    )

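# toolbox() similarly boils down to something like
#   kubectl -n rook-ceph exec <toolbox-pod> -- ceph status
# run from the remote that hosts the rook cluster context (the trailing
# command is illustrative).
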
def write_conf(ctx, conf_path=DEFAULT_CONF_PATH, cluster='ceph'):
    conf_fp = BytesIO()
    ctx.ceph[cluster].conf.write(conf_fp)
    conf_fp.seek(0)
    lines = conf_fp.readlines()
    m = None
    for l in lines:
        m = re.search("rgw.crypt.sse.s3.backend *= *(.*)", l.decode())
        if m:
            break
    ctx.ceph[cluster].rgw_crypt_sse_s3_backend = m.expand("\\1") if m else None
    conf_fp.seek(0)
    writes = ctx.cluster.run(
        args=[
            'sudo', 'mkdir', '-p', '/etc/ceph', run.Raw('&&'),
            'sudo', 'chmod', '0755', '/etc/ceph', run.Raw('&&'),
            'sudo', 'tee', conf_path, run.Raw('&&'),
            'sudo', 'chmod', '0644', conf_path,
            run.Raw('>'), '/dev/null',

        ],
        stdin=run.PIPE,
        wait=False)
    teuthology.feed_many_stdins_and_close(conf_fp, writes)
    run.wait(writes)

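# write_conf() serializes the in-memory config once and then streams that same
# buffer over stdin to every remote in parallel; on each host the shell pipeline
# is effectively
#   sudo mkdir -p /etc/ceph && sudo chmod 0755 /etc/ceph && \
#       sudo tee <conf_path> && sudo chmod 0644 <conf_path> > /dev/null
# with the config text fed to tee via stdin (conf_path defaults to
# /etc/ceph/ceph.conf).
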
def get_valgrind_args(testdir, name, preamble, v, exit_on_first_error=True, cd=True):
    """
    Build a command line for running valgrind.

    testdir - test results directory
    name - name of daemon (for naming the log file)
    preamble - stuff we should run before valgrind
    v - valgrind arguments
    exit_on_first_error - if true, make valgrind exit on the first error
    cd - if true, cd into testdir before running the command
    """
    if v is None:
        return preamble
    if not isinstance(v, list):
        v = [v]

    # https://tracker.ceph.com/issues/44362
    preamble.extend([
        'env', 'OPENSSL_ia32cap=~0x1000000000000000',
    ])

    val_path = '/var/log/ceph/valgrind'
    if '--tool=memcheck' in v or '--tool=helgrind' in v:
        extra_args = [
            'valgrind',
            '--trace-children=no',
            '--child-silent-after-fork=yes',
            '--soname-synonyms=somalloc=*tcmalloc*',
            '--num-callers=50',
            '--suppressions={tdir}/valgrind.supp'.format(tdir=testdir),
            '--xml=yes',
            '--xml-file={vdir}/{n}.log'.format(vdir=val_path, n=name),
            '--time-stamp=yes',
            '--vgdb=yes',
        ]
    else:
        extra_args = [
            'valgrind',
            '--trace-children=no',
            '--child-silent-after-fork=yes',
            '--soname-synonyms=somalloc=*tcmalloc*',
            '--suppressions={tdir}/valgrind.supp'.format(tdir=testdir),
            '--log-file={vdir}/{n}.log'.format(vdir=val_path, n=name),
            '--time-stamp=yes',
            '--vgdb=yes',
        ]
    if exit_on_first_error:
        extra_args.extend([
            # at least Valgrind 3.14 is required
            '--exit-on-first-error=yes',
            '--error-exitcode=42',
        ])
    args = []
    if cd:
        args += ['cd', testdir, run.Raw('&&')]
    args += preamble + extra_args + v
    log.debug('running %s under valgrind with args %s', name, args)
    return args

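# For a memcheck run the returned argv looks roughly like (paths abbreviated,
# only illustrative):
#   cd <testdir> && <preamble...> env OPENSSL_ia32cap=~0x1000000000000000 \
#       valgrind --trace-children=no ... --xml-file=/var/log/ceph/valgrind/<name>.log \
#       --exit-on-first-error=yes --error-exitcode=42 --tool=memcheck <daemon args>
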

def mount_osd_data(ctx, remote, cluster, osd):
    """
    Mount a remote OSD

    :param ctx: Context
    :param remote: Remote site
    :param cluster: name of ceph cluster
    :param osd: Osd name
    """
    log.debug('Mounting data for osd.{o} on {r}'.format(o=osd, r=remote))
    role = "{0}.osd.{1}".format(cluster, osd)
    alt_role = role if cluster != 'ceph' else "osd.{0}".format(osd)
    if remote in ctx.disk_config.remote_to_roles_to_dev:
        if alt_role in ctx.disk_config.remote_to_roles_to_dev[remote]:
            role = alt_role
        if role not in ctx.disk_config.remote_to_roles_to_dev[remote]:
            return
        dev = ctx.disk_config.remote_to_roles_to_dev[remote][role]
        mount_options = ctx.disk_config.\
            remote_to_roles_to_dev_mount_options[remote][role]
        fstype = ctx.disk_config.remote_to_roles_to_dev_fstype[remote][role]
        mnt = os.path.join('/var/lib/ceph/osd', '{0}-{1}'.format(cluster, osd))

        log.info('Mounting osd.{o}: dev: {n}, cluster: {c}, '
                 'mountpoint: {p}, type: {t}, options: {v}'.format(
                     o=osd, n=remote.name, p=mnt, t=fstype, v=mount_options,
                     c=cluster))

        remote.run(
            args=[
                'sudo',
                'mount',
                '-t', fstype,
                '-o', ','.join(mount_options),
                dev,
                mnt,
            ]
        )


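# The resulting command is of the form
#   sudo mount -t xfs -o noatime /dev/vdb /var/lib/ceph/osd/ceph-0
# where the fstype, options and device all come from ctx.disk_config; the
# concrete values shown here are only illustrative.
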
def log_exc(func):
    @wraps(func)
    def wrapper(self):
        try:
            return func(self)
        except:
            self.log(traceback.format_exc())
            raise
    return wrapper


class PoolType:
    REPLICATED = 1
    ERASURE_CODED = 3


class OSDThrasher(Thrasher):
    """
    Object used to thrash Ceph
    """
    def __init__(self, manager, config, name, logger):
        super(OSDThrasher, self).__init__()

        self.ceph_manager = manager
        self.cluster = manager.cluster
        self.ceph_manager.wait_for_clean()
        osd_status = self.ceph_manager.get_osd_status()
        self.in_osds = osd_status['in']
        self.live_osds = osd_status['live']
        self.out_osds = osd_status['out']
        self.dead_osds = osd_status['dead']
        self.stopping = False
        self.logger = logger
        self.config = config
        self.name = name
        self.revive_timeout = self.config.get("revive_timeout", 360)
        self.pools_to_fix_pgp_num = set()
        if self.config.get('powercycle'):
            self.revive_timeout += 120
        self.clean_wait = self.config.get('clean_wait', 0)
        self.minin = self.config.get("min_in", 4)
        self.chance_move_pg = self.config.get('chance_move_pg', 1.0)
        self.sighup_delay = self.config.get('sighup_delay')
        self.optrack_toggle_delay = self.config.get('optrack_toggle_delay')
        self.dump_ops_enable = self.config.get('dump_ops_enable')
        self.noscrub_toggle_delay = self.config.get('noscrub_toggle_delay')
        self.chance_thrash_cluster_full = self.config.get('chance_thrash_cluster_full', .05)
        self.chance_thrash_pg_upmap = self.config.get('chance_thrash_pg_upmap', 1.0)
        self.chance_thrash_pg_upmap_items = self.config.get('chance_thrash_pg_upmap', 1.0)
        self.random_eio = self.config.get('random_eio')
        self.chance_force_recovery = self.config.get('chance_force_recovery', 0.3)

        num_osds = self.in_osds + self.out_osds
        self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * len(num_osds)
        self.min_pgs = self.config.get("min_pgs_per_pool_osd", 1) * len(num_osds)
        if self.config is None:
            self.config = dict()
        # prevent monitor from auto-marking things out while thrasher runs
        # try both old and new tell syntax, in case we are testing old code
        self.saved_options = []
        # assuming that the default settings do not vary from one daemon to
        # another
        first_mon = teuthology.get_first_mon(manager.ctx, self.config).split('.')
        opts = [('mon', 'mon_osd_down_out_interval', 0)]
        # why do we disable marking an OSD out automatically? :/
        for service, opt, new_value in opts:
            old_value = manager.get_config(first_mon[0],
                                           first_mon[1],
                                           opt)
            self.saved_options.append((service, opt, old_value))
            manager.inject_args(service, '*', opt, new_value)
        # initialize ceph_objectstore_tool property - must be done before
        # do_thrash is spawned - http://tracker.ceph.com/issues/18799
        if (self.config.get('powercycle') or
                not self.cmd_exists_on_osds("ceph-objectstore-tool") or
                self.config.get('disable_objectstore_tool_tests', False)):
            self.ceph_objectstore_tool = False
            if self.config.get('powercycle'):
                self.log("Unable to test ceph-objectstore-tool, "
                         "powercycle testing")
            else:
                self.log("Unable to test ceph-objectstore-tool, "
                         "not available on all OSD nodes")
        else:
            self.ceph_objectstore_tool = \
                self.config.get('ceph_objectstore_tool', True)
        # spawn do_thrash
        self.thread = gevent.spawn(self.do_thrash)
        if self.sighup_delay:
            self.sighup_thread = gevent.spawn(self.do_sighup)
        if self.optrack_toggle_delay:
            self.optrack_toggle_thread = gevent.spawn(self.do_optrack_toggle)
        if self.dump_ops_enable == "true":
            self.dump_ops_thread = gevent.spawn(self.do_dump_ops)
        if self.noscrub_toggle_delay:
            self.noscrub_toggle_thread = gevent.spawn(self.do_noscrub_toggle)

    def log(self, msg, *args, **kwargs):
        self.logger.info(msg, *args, **kwargs)

    def cmd_exists_on_osds(self, cmd):
        if self.ceph_manager.cephadm or self.ceph_manager.rook:
            return True
        allremotes = self.ceph_manager.ctx.cluster.only(
            teuthology.is_type('osd', self.cluster)).remotes.keys()
        allremotes = list(set(allremotes))
        for remote in allremotes:
            proc = remote.run(args=['type', cmd], wait=True,
                              check_status=False, stdout=BytesIO(),
                              stderr=BytesIO())
            if proc.exitstatus != 0:
                return False
        return True

    def run_ceph_objectstore_tool(self, remote, osd, cmd):
        if self.ceph_manager.cephadm:
            return shell(
                self.ceph_manager.ctx, self.ceph_manager.cluster, remote,
                args=['ceph-objectstore-tool', '--err-to-stderr'] + cmd,
                name=osd,
                wait=True, check_status=False,
                stdout=StringIO(),
                stderr=StringIO())
        elif self.ceph_manager.rook:
            assert False, 'not implemented'
        else:
            return remote.run(
                args=['sudo', 'adjust-ulimits', 'ceph-objectstore-tool', '--err-to-stderr'] + cmd,
                wait=True, check_status=False,
                stdout=StringIO(),
                stderr=StringIO())

    def run_ceph_bluestore_tool(self, remote, osd, cmd):
        if self.ceph_manager.cephadm:
            return shell(
                self.ceph_manager.ctx, self.ceph_manager.cluster, remote,
                args=['ceph-bluestore-tool', '--err-to-stderr'] + cmd,
                name=osd,
                wait=True, check_status=False,
                stdout=StringIO(),
                stderr=StringIO())
        elif self.ceph_manager.rook:
            assert False, 'not implemented'
        else:
            return remote.run(
                args=['sudo', 'ceph-bluestore-tool', '--err-to-stderr'] + cmd,
                wait=True, check_status=False,
                stdout=StringIO(),
                stderr=StringIO())

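    # Both helpers return the finished RemoteProcess with check_status=False,
    # so callers inspect the result themselves, e.g. (values illustrative):
    #   proc = self.run_ceph_objectstore_tool(remote, 'osd.3',
    #                                         ['--data-path', path, '--op', 'list-pgs'])
    #   if proc.exitstatus == 0:
    #       pgs = proc.stdout.getvalue().split('\n')[:-1]
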
    def kill_osd(self, osd=None, mark_down=False, mark_out=False):
        """
        :param osd: Osd to be killed.
        :mark_down: Mark down if true.
        :mark_out: Mark out if true.
        """
        if osd is None:
            osd = random.choice(self.live_osds)
        self.log("Killing osd %s, live_osds are %s" % (str(osd),
                                                       str(self.live_osds)))
        self.live_osds.remove(osd)
        self.dead_osds.append(osd)
        self.ceph_manager.kill_osd(osd)
        if mark_down:
            self.ceph_manager.mark_down_osd(osd)
        if mark_out and osd in self.in_osds:
            self.out_osd(osd)
        if self.ceph_objectstore_tool:
            self.log("Testing ceph-objectstore-tool on down osd.%s" % osd)
            remote = self.ceph_manager.find_remote('osd', osd)
            FSPATH = self.ceph_manager.get_filepath()
            JPATH = os.path.join(FSPATH, "journal")
            exp_osd = imp_osd = osd
            self.log('remote for osd %s is %s' % (osd, remote))
            exp_remote = imp_remote = remote
            # If an older osd is available we'll move a pg from there
            if (len(self.dead_osds) > 1 and
                    random.random() < self.chance_move_pg):
                exp_osd = random.choice(self.dead_osds[:-1])
                exp_remote = self.ceph_manager.find_remote('osd', exp_osd)
                self.log('remote for exp osd %s is %s' % (exp_osd, exp_remote))
            prefix = [
                '--no-mon-config',
                '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
            ]

            if self.ceph_manager.rook:
                assert False, 'not implemented'

            if not self.ceph_manager.cephadm:
                # ceph-objectstore-tool might be temporarily absent during an
                # upgrade - see http://tracker.ceph.com/issues/18014
                with safe_while(sleep=15, tries=40, action="type ceph-objectstore-tool") as proceed:
                    while proceed():
                        proc = exp_remote.run(args=['type', 'ceph-objectstore-tool'],
                                              wait=True, check_status=False, stdout=BytesIO(),
                                              stderr=BytesIO())
                        if proc.exitstatus == 0:
                            break
                        log.debug("ceph-objectstore-tool binary not present, trying again")

            # ceph-objectstore-tool might bogusly fail with "OSD has the store locked"
            # see http://tracker.ceph.com/issues/19556
            with safe_while(sleep=15, tries=40, action="ceph-objectstore-tool --op list-pgs") as proceed:
                while proceed():
                    proc = self.run_ceph_objectstore_tool(
                        exp_remote, 'osd.%s' % exp_osd,
                        prefix + [
                            '--data-path', FSPATH.format(id=exp_osd),
                            '--journal-path', JPATH.format(id=exp_osd),
                            '--op', 'list-pgs',
                        ])
                    if proc.exitstatus == 0:
                        break
                    elif (proc.exitstatus == 1 and
                          proc.stderr.getvalue() == "OSD has the store locked"):
                        continue
                    else:
                        raise Exception("ceph-objectstore-tool: "
                                        "exp list-pgs failure with status {ret}".
                                        format(ret=proc.exitstatus))

            pgs = proc.stdout.getvalue().split('\n')[:-1]
            if len(pgs) == 0:
                self.log("No PGs found for osd.{osd}".format(osd=exp_osd))
                return
            pg = random.choice(pgs)
            #exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
            #exp_path = os.path.join(exp_path, '{0}.data'.format(self.cluster))
            exp_path = os.path.join('/var/log/ceph',  # available inside 'shell' container
                                    "exp.{pg}.{id}".format(
                                        pg=pg,
                                        id=exp_osd))
            if self.ceph_manager.cephadm:
                exp_host_path = os.path.join(
                    '/var/log/ceph',
                    self.ceph_manager.ctx.ceph[self.ceph_manager.cluster].fsid,
                    "exp.{pg}.{id}".format(
                        pg=pg,
                        id=exp_osd))
            else:
                exp_host_path = exp_path

            # export
            # Can't use new export-remove op since this is part of upgrade testing
            proc = self.run_ceph_objectstore_tool(
                exp_remote, 'osd.%s' % exp_osd,
                prefix + [
                    '--data-path', FSPATH.format(id=exp_osd),
                    '--journal-path', JPATH.format(id=exp_osd),
                    '--op', 'export',
                    '--pgid', pg,
                    '--file', exp_path,
                ])
            if proc.exitstatus:
                raise Exception("ceph-objectstore-tool: "
                                "export failure with status {ret}".
                                format(ret=proc.exitstatus))
            # remove
            proc = self.run_ceph_objectstore_tool(
                exp_remote, 'osd.%s' % exp_osd,
                prefix + [
                    '--data-path', FSPATH.format(id=exp_osd),
                    '--journal-path', JPATH.format(id=exp_osd),
                    '--force',
                    '--op', 'remove',
                    '--pgid', pg,
                ])
            if proc.exitstatus:
                raise Exception("ceph-objectstore-tool: "
                                "remove failure with status {ret}".
                                format(ret=proc.exitstatus))
            # If there are at least 2 dead osds we might move the pg
            if exp_osd != imp_osd:
                # If pg isn't already on this osd, then we will move it there
                proc = self.run_ceph_objectstore_tool(
                    imp_remote,
                    'osd.%s' % imp_osd,
                    prefix + [
                        '--data-path', FSPATH.format(id=imp_osd),
                        '--journal-path', JPATH.format(id=imp_osd),
                        '--op', 'list-pgs',
                    ])
                if proc.exitstatus:
                    raise Exception("ceph-objectstore-tool: "
                                    "imp list-pgs failure with status {ret}".
                                    format(ret=proc.exitstatus))
                pgs = proc.stdout.getvalue().split('\n')[:-1]
                if pg not in pgs:
                    self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
                             format(pg=pg, fosd=exp_osd, tosd=imp_osd))
                    if imp_remote != exp_remote:
                        # Copy export file to the other machine
                        self.log("Transfer export file from {srem} to {trem}".
                                 format(srem=exp_remote, trem=imp_remote))
                        # just in case an upgrade makes /var/log/ceph unreadable by non-root
                        exp_remote.run(args=['sudo', 'chmod', '777',
                                             '/var/log/ceph'])
                        imp_remote.run(args=['sudo', 'chmod', '777',
                                             '/var/log/ceph'])
                        tmpexport = Remote.get_file(exp_remote, exp_host_path,
                                                    sudo=True)
                        if exp_host_path != exp_path:
                            # push to /var/log/ceph, then rename (we can't
                            # chmod 777 the /var/log/ceph/$fsid mountpoint)
                            Remote.put_file(imp_remote, tmpexport, exp_path)
                            imp_remote.run(args=[
                                'sudo', 'mv', exp_path, exp_host_path])
                        else:
                            Remote.put_file(imp_remote, tmpexport, exp_host_path)
                        os.remove(tmpexport)
                else:
                    # Can't move the pg after all
                    imp_osd = exp_osd
                    imp_remote = exp_remote
            # import
            proc = self.run_ceph_objectstore_tool(
                imp_remote, 'osd.%s' % imp_osd,
                [
                    '--data-path', FSPATH.format(id=imp_osd),
                    '--journal-path', JPATH.format(id=imp_osd),
                    '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
                    '--op', 'import',
                    '--file', exp_path,
                ])
            if proc.exitstatus == 1:
                bogosity = "The OSD you are using is older than the exported PG"
                if bogosity in proc.stderr.getvalue():
                    self.log("OSD older than exported PG"
                             "...ignored")
            elif proc.exitstatus == 10:
                self.log("Pool went away before processing an import"
                         "...ignored")
            elif proc.exitstatus == 11:
                self.log("Attempt to import an incompatible export"
                         "...ignored")
            elif proc.exitstatus == 12:
                # this should be safe to ignore because we only ever move 1
                # copy of the pg at a time, and merge is only initiated when
                # all replicas are peered and happy. /me crosses fingers
                self.log("PG merged on target"
                         "...ignored")
            elif proc.exitstatus:
                raise Exception("ceph-objectstore-tool: "
                                "import failure with status {ret}".
                                format(ret=proc.exitstatus))
            cmd = "sudo rm -f {file}".format(file=exp_host_path)
            exp_remote.run(args=cmd)
            if imp_remote != exp_remote:
                imp_remote.run(args=cmd)

            # apply low split settings to each pool
            if not self.ceph_manager.cephadm:
                for pool in self.ceph_manager.list_pools():
                    cmd = ("CEPH_ARGS='--filestore-merge-threshold 1 "
                           "--filestore-split-multiple 1' sudo -E "
                           + 'ceph-objectstore-tool '
                           + ' '.join(prefix + [
                               '--data-path', FSPATH.format(id=imp_osd),
                               '--journal-path', JPATH.format(id=imp_osd),
                           ])
                           + " --op apply-layout-settings --pool " + pool).format(id=osd)
                    proc = imp_remote.run(args=cmd,
                                          wait=True, check_status=False,
                                          stderr=StringIO())
                    if 'Couldn\'t find pool' in proc.stderr.getvalue():
                        continue
                    if proc.exitstatus:
                        raise Exception("ceph-objectstore-tool apply-layout-settings"
                                        " failed with {status}".format(status=proc.exitstatus))


    def blackhole_kill_osd(self, osd=None):
        """
        If all else fails, kill the osd.
        :param osd: Osd to be killed.
        """
        if osd is None:
            osd = random.choice(self.live_osds)
        self.log("Blackholing and then killing osd %s, live_osds are %s" %
                 (str(osd), str(self.live_osds)))
        self.live_osds.remove(osd)
        self.dead_osds.append(osd)
        self.ceph_manager.blackhole_kill_osd(osd)

    def revive_osd(self, osd=None, skip_admin_check=False):
        """
        Revive the osd.
        :param osd: Osd to be revived.
        """
        if osd is None:
            osd = random.choice(self.dead_osds)
        self.log("Reviving osd %s" % (str(osd),))
        self.ceph_manager.revive_osd(
            osd,
            self.revive_timeout,
            skip_admin_check=skip_admin_check)
        self.dead_osds.remove(osd)
        self.live_osds.append(osd)
        if self.random_eio > 0 and osd == self.rerrosd:
            self.ceph_manager.set_config(self.rerrosd,
                                         filestore_debug_random_read_err=self.random_eio)
            self.ceph_manager.set_config(self.rerrosd,
                                         bluestore_debug_random_read_err=self.random_eio)


    def out_osd(self, osd=None):
        """
        Mark the osd out
        :param osd: Osd to be marked.
        """
        if osd is None:
            osd = random.choice(self.in_osds)
        self.log("Removing osd %s, in_osds are: %s" %
                 (str(osd), str(self.in_osds)))
        self.ceph_manager.mark_out_osd(osd)
        self.in_osds.remove(osd)
        self.out_osds.append(osd)

    def in_osd(self, osd=None):
        """
        Mark the osd in
        :param osd: Osd to be marked.
        """
        if osd is None:
            osd = random.choice(self.out_osds)
        if osd in self.dead_osds:
            return self.revive_osd(osd)
        self.log("Adding osd %s" % (str(osd),))
        self.out_osds.remove(osd)
        self.in_osds.append(osd)
        self.ceph_manager.mark_in_osd(osd)
        self.log("Added osd %s" % (str(osd),))

    def reweight_osd_or_by_util(self, osd=None):
        """
        Reweight an osd that is in
        :param osd: Osd to be marked.
        """
        if osd is not None or random.choice([True, False]):
            if osd is None:
                osd = random.choice(self.in_osds)
            val = random.uniform(.1, 1.0)
            self.log("Reweighting osd %s to %s" % (str(osd), str(val)))
            self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
                                              str(osd), str(val))
        else:
            # do it several times, the option space is large
            for i in range(5):
                options = {
                    'max_change': random.choice(['0.05', '1.0', '3.0']),
                    'overage': random.choice(['110', '1000']),
                    'type': random.choice([
                        'reweight-by-utilization',
                        'test-reweight-by-utilization']),
                }
                self.log("Reweighting by: %s" % (str(options),))
                self.ceph_manager.raw_cluster_cmd(
                    'osd',
                    options['type'],
                    options['overage'],
                    options['max_change'])

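    # The two branches above correspond to CLI invocations such as
    #   ceph osd reweight 3 0.42
    #   ceph osd test-reweight-by-utilization 110 0.05
    # (the osd id and numeric values are just examples of what the random
    # choices can produce).
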
    def primary_affinity(self, osd=None):
        self.log("primary_affinity")
        if osd is None:
            osd = random.choice(self.in_osds)
        if random.random() >= .5:
            pa = random.random()
        elif random.random() >= .5:
            pa = 1
        else:
            pa = 0
        self.log('Setting osd %s primary_affinity to %f' % (str(osd), pa))
        self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
                                          str(osd), str(pa))

    def thrash_cluster_full(self):
        """
        Set and unset cluster full condition
        """
        self.log('Setting full ratio to .001')
        self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.001')
        time.sleep(1)
        self.log('Setting full ratio back to .95')
        self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.95')

    def thrash_pg_upmap(self):
        """
        Install or remove random pg_upmap entries in OSDMap
        """
        self.log("thrash_pg_upmap")
        from random import shuffle
        out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
        j = json.loads(out)
        self.log('j is %s' % j)
        try:
            if random.random() >= .3:
                pgs = self.ceph_manager.get_pg_stats()
                if not pgs:
                    self.log('No pgs; doing nothing')
                    return
                pg = random.choice(pgs)
                pgid = str(pg['pgid'])
                poolid = int(pgid.split('.')[0])
                sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
                if len(sizes) == 0:
                    self.log('No pools; doing nothing')
                    return
                n = sizes[0]
                osds = self.in_osds + self.out_osds
                shuffle(osds)
                osds = osds[0:n]
                self.log('Setting %s to %s' % (pgid, osds))
                cmd = ['osd', 'pg-upmap', pgid] + [str(x) for x in osds]
                self.log('cmd %s' % cmd)
                self.ceph_manager.raw_cluster_cmd(*cmd)
            else:
                m = j['pg_upmap']
                if len(m) > 0:
                    shuffle(m)
                    pg = m[0]['pgid']
                    self.log('Clearing pg_upmap on %s' % pg)
                    self.ceph_manager.raw_cluster_cmd(
                        'osd',
                        'rm-pg-upmap',
                        pg)
                else:
                    self.log('No pg_upmap entries; doing nothing')
        except CommandFailedError:
            self.log('Failed to rm-pg-upmap, ignoring')

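    # The install/remove branches above issue commands of the form
    #   ceph osd pg-upmap 2.7 1 4 0     # pin pg 2.7 to osds 1, 4 and 0
    #   ceph osd rm-pg-upmap 2.7
    # (pg id and osd ids are example values; the osd count equals the pool size).
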
    def thrash_pg_upmap_items(self):
        """
        Install or remove random pg_upmap_items entries in OSDMap
        """
        self.log("thrash_pg_upmap_items")
        from random import shuffle
        out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
        j = json.loads(out)
        self.log('j is %s' % j)
        try:
            if random.random() >= .3:
                pgs = self.ceph_manager.get_pg_stats()
                if not pgs:
                    self.log('No pgs; doing nothing')
                    return
                pg = random.choice(pgs)
                pgid = str(pg['pgid'])
                poolid = int(pgid.split('.')[0])
                sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
                if len(sizes) == 0:
                    self.log('No pools; doing nothing')
                    return
                n = sizes[0]
                osds = self.in_osds + self.out_osds
                shuffle(osds)
                osds = osds[0:n*2]
                self.log('Setting %s to %s' % (pgid, osds))
                cmd = ['osd', 'pg-upmap-items', pgid] + [str(x) for x in osds]
                self.log('cmd %s' % cmd)
                self.ceph_manager.raw_cluster_cmd(*cmd)
            else:
                m = j['pg_upmap_items']
                if len(m) > 0:
                    shuffle(m)
                    pg = m[0]['pgid']
                    self.log('Clearing pg_upmap_items on %s' % pg)
                    self.ceph_manager.raw_cluster_cmd(
                        'osd',
                        'rm-pg-upmap-items',
                        pg)
                else:
                    self.log('No pg_upmap_items entries; doing nothing')
        except CommandFailedError:
            self.log('Failed to rm-pg-upmap-items, ignoring')

    def force_recovery(self):
        """
        Force recovery on some of PGs
        """
        backfill = random.random() >= 0.5
        j = self.ceph_manager.get_pgids_to_force(backfill)
        if j:
            try:
                if backfill:
                    self.ceph_manager.raw_cluster_cmd('pg', 'force-backfill', *j)
                else:
                    self.ceph_manager.raw_cluster_cmd('pg', 'force-recovery', *j)
            except CommandFailedError:
                self.log('Failed to force backfill|recovery, ignoring')


    def cancel_force_recovery(self):
        """
        Cancel forced recovery on some of PGs
        """
        backfill = random.random() >= 0.5
        j = self.ceph_manager.get_pgids_to_cancel_force(backfill)
        if j:
            try:
                if backfill:
                    self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-backfill', *j)
                else:
                    self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-recovery', *j)
            except CommandFailedError:
                self.log('Failed to cancel force backfill|recovery, ignoring')

    def force_cancel_recovery(self):
        """
        Force or cancel forcing recovery
        """
        if random.random() >= 0.4:
            self.force_recovery()
        else:
            self.cancel_force_recovery()

    def all_up(self):
        """
        Make sure all osds are up and not out.
        """
        while len(self.dead_osds) > 0:
            self.log("reviving osd")
            self.revive_osd()
        while len(self.out_osds) > 0:
            self.log("inning osd")
            self.in_osd()

    def all_up_in(self):
        """
        Make sure all osds are up and fully in.
        """
        self.all_up()
        for osd in self.live_osds:
            self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
                                              str(osd), str(1))
            self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
                                              str(osd), str(1))

    def do_join(self):
        """
        Break out of this Ceph loop
        """
        self.stopping = True
        self.thread.get()
        if self.sighup_delay:
            self.log("joining the do_sighup greenlet")
            self.sighup_thread.get()
        if self.optrack_toggle_delay:
            self.log("joining the do_optrack_toggle greenlet")
            self.optrack_toggle_thread.join()
        if self.dump_ops_enable == "true":
            self.log("joining the do_dump_ops greenlet")
            self.dump_ops_thread.join()
        if self.noscrub_toggle_delay:
            self.log("joining the do_noscrub_toggle greenlet")
            self.noscrub_toggle_thread.join()

    def grow_pool(self):
        """
        Increase the size of the pool
        """
        pool = self.ceph_manager.get_pool()
        if pool is None:
            return
        self.log("Growing pool %s" % (pool,))
        if self.ceph_manager.expand_pool(pool,
                                         self.config.get('pool_grow_by', 10),
                                         self.max_pgs):
            self.pools_to_fix_pgp_num.add(pool)

    def shrink_pool(self):
        """
        Decrease the size of the pool
        """
        pool = self.ceph_manager.get_pool()
        if pool is None:
            return
        _ = self.ceph_manager.get_pool_pg_num(pool)
        self.log("Shrinking pool %s" % (pool,))
        if self.ceph_manager.contract_pool(
                pool,
                self.config.get('pool_shrink_by', 10),
                self.min_pgs):
            self.pools_to_fix_pgp_num.add(pool)

    def fix_pgp_num(self, pool=None):
        """
        Fix number of pgs in pool.
        """
        if pool is None:
            pool = self.ceph_manager.get_pool()
            if not pool:
                return
            force = False
        else:
            force = True
        self.log("fixing pg num pool %s" % (pool,))
        if self.ceph_manager.set_pool_pgpnum(pool, force):
            self.pools_to_fix_pgp_num.discard(pool)

    def test_pool_min_size(self):
        """
        Loop to selectively push PGs below their min_size and test that recovery
        still occurs.
        """
        self.log("test_pool_min_size")
        self.all_up()
        time.sleep(60)  # buffer time for recovery to start.
        self.ceph_manager.wait_for_recovery(
            timeout=self.config.get('timeout')
        )
        minout = int(self.config.get("min_out", 1))
        minlive = int(self.config.get("min_live", 2))
        mindead = int(self.config.get("min_dead", 1))
        self.log("doing min_size thrashing")
        self.ceph_manager.wait_for_clean(timeout=180)
        assert self.ceph_manager.is_clean(), \
            'not clean before minsize thrashing starts'
        while not self.stopping:
            # look up k and m from all the pools on each loop, in case it
            # changes as the cluster runs
            k = 0
            m = 99
            has_pools = False
            pools_json = self.ceph_manager.get_osd_dump_json()['pools']

            for pool_json in pools_json:
                pool = pool_json['pool_name']
                has_pools = True
                pool_type = pool_json['type']  # 1 for rep, 3 for ec
                min_size = pool_json['min_size']
                self.log("pool {pool} min_size is {min_size}".format(pool=pool, min_size=min_size))
                try:
                    ec_profile = self.ceph_manager.get_pool_property(pool, 'erasure_code_profile')
                    if pool_type != PoolType.ERASURE_CODED:
                        continue
                    ec_profile = pool_json['erasure_code_profile']
                    ec_profile_json = self.ceph_manager.raw_cluster_cmd(
                        'osd',
                        'erasure-code-profile',
                        'get',
                        ec_profile,
                        '--format=json')
                    ec_json = json.loads(ec_profile_json)
                    local_k = int(ec_json['k'])
                    local_m = int(ec_json['m'])
                    self.log("pool {pool} local_k={k} local_m={m}".format(pool=pool,
                                                                          k=local_k, m=local_m))
                    if local_k > k:
                        self.log("setting k={local_k} from previous {k}".format(local_k=local_k, k=k))
                        k = local_k
                    if local_m < m:
                        self.log("setting m={local_m} from previous {m}".format(local_m=local_m, m=m))
                        m = local_m
                except CommandFailedError:
                    self.log("failed to read erasure_code_profile. %s was likely removed", pool)
                    continue

            if has_pools:
                self.log("using k={k}, m={m}".format(k=k, m=m))
            else:
                self.log("No pools yet, waiting")
                time.sleep(5)
                continue

            if minout > len(self.out_osds):  # kill OSDs and mark out
                self.log("forced to out an osd")
                self.kill_osd(mark_out=True)
                continue
            elif mindead > len(self.dead_osds):  # kill OSDs but force timeout
                self.log("forced to kill an osd")
                self.kill_osd()
                continue
            else:  # make mostly-random choice to kill or revive OSDs
                minup = max(minlive, k)
                rand_val = random.uniform(0, 1)
                self.log("choosing based on number of live OSDs and rand val {rand}".
                         format(rand=rand_val))
                if len(self.live_osds) > minup+1 and rand_val < 0.5:
                    # chose to knock out as many OSDs as we can w/out downing PGs

                    most_killable = min(len(self.live_osds) - minup, m)
                    self.log("chose to kill {n} OSDs".format(n=most_killable))
                    for i in range(1, most_killable):
                        self.kill_osd(mark_out=True)
                    time.sleep(10)
                    # try a few times since there might be a concurrent pool
                    # creation or deletion
                    with safe_while(
                            sleep=25, tries=5,
                            action='check for active or peered') as proceed:
                        while proceed():
                            if self.ceph_manager.all_active_or_peered():
                                break
                            self.log('not all PGs are active or peered')
                else:  # chose to revive OSDs, bring up a random fraction of the dead ones
                    self.log("chose to revive osds")
                    for i in range(1, int(rand_val * len(self.dead_osds))):
                        self.revive_osd(i)

            # let PGs repair themselves or our next knockout might kill one
            self.ceph_manager.wait_for_clean(timeout=self.config.get('timeout'))

        # / while not self.stopping
        self.all_up_in()

        self.ceph_manager.wait_for_recovery(
            timeout=self.config.get('timeout')
        )

    def inject_pause(self, conf_key, duration, check_after, should_be_down):
        """
        Pause injection testing. Check for osd being down when finished.
        """
        the_one = random.choice(self.live_osds)
        self.log("inject_pause on osd.{osd}".format(osd=the_one))
        self.log(
            "Testing {key} pause injection for duration {duration}".format(
                key=conf_key,
                duration=duration
            ))
        self.log(
            "Checking after {after}, should_be_down={shouldbedown}".format(
                after=check_after,
                shouldbedown=should_be_down
            ))
        self.ceph_manager.set_config(the_one, **{conf_key: duration})
        if not should_be_down:
            return
        time.sleep(check_after)
        status = self.ceph_manager.get_osd_status()
        assert the_one in status['down']
        time.sleep(duration - check_after + 20)
        status = self.ceph_manager.get_osd_status()
        assert the_one not in status['down']

    def test_backfill_full(self):
        """
        Test backfills stopping when the replica fills up.

        First, use injectfull admin command to simulate a now full
        osd by setting it to 0 on all of the OSDs.

        Second, on a random subset, set
        osd_debug_skip_full_check_in_backfill_reservation to force
        the more complicated check in do_scan to be exercised.

        Then, verify that all backfillings stop.
        """
        self.log("injecting backfill full")
        for i in self.live_osds:
            self.ceph_manager.set_config(
                i,
                osd_debug_skip_full_check_in_backfill_reservation=
                random.choice(['false', 'true']))
            self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'backfillfull'],
                                               check_status=True, timeout=30, stdout=DEVNULL)
        for i in range(30):
            status = self.ceph_manager.compile_pg_status()
            if 'backfilling' not in status.keys():
                break
            self.log(
                "waiting for {still_going} backfillings".format(
                    still_going=status.get('backfilling')))
            time.sleep(1)
        assert('backfilling' not in self.ceph_manager.compile_pg_status().keys())
        for i in self.live_osds:
            self.ceph_manager.set_config(
                i,
                osd_debug_skip_full_check_in_backfill_reservation='false')
            self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'none'],
                                               check_status=True, timeout=30, stdout=DEVNULL)


    def generate_random_sharding(self):
        prefixes = [
            'm', 'O', 'P', 'L'
        ]
        new_sharding = ''
        for prefix in prefixes:
            choose = random.choice([False, True])
            if not choose:
                continue
            if new_sharding != '':
                new_sharding = new_sharding + ' '
            columns = random.randint(1, 5)
            do_hash = random.choice([False, True])
            if do_hash:
                low_hash = random.choice([0, 5, 8])
                do_high_hash = random.choice([False, True])
                if do_high_hash:
                    high_hash = random.choice([8, 16, 30]) + low_hash
                    new_sharding = new_sharding + prefix + '(' + str(columns) + ',' + str(low_hash) + '-' + str(high_hash) + ')'
                else:
                    new_sharding = new_sharding + prefix + '(' + str(columns) + ',' + str(low_hash) + '-)'
            else:
                if columns == 1:
                    new_sharding = new_sharding + prefix
                else:
                    new_sharding = new_sharding + prefix + '(' + str(columns) + ')'
        return new_sharding

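    # generate_random_sharding() produces sharding specs built from the rules
    # above: a space-separated list of the prefixes m/O/P/L, each optionally
    # followed by a column count and a hash range.  Example outputs (just two
    # of many possible random results):
    #   "m(3) O(2,0-) L"
    #   "m O(4,5-13) P(2)"
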
    def test_bluestore_reshard_action(self):
        """
        Test if resharding of bluestore works properly.
        If bluestore is not used, or bluestore is in version that
        does not support sharding, skip.
        """

        osd = random.choice(self.dead_osds)
        remote = self.ceph_manager.find_remote('osd', osd)
        FSPATH = self.ceph_manager.get_filepath()

        prefix = [
            '--no-mon-config',
            '--log-file=/var/log/ceph/bluestore_tool.$pid.log',
            '--log-level=10',
            '--path', FSPATH.format(id=osd)
        ]

        # sanity check if bluestore-tool accessible
        self.log('checking if target objectstore is bluestore on osd.%s' % osd)
        cmd = prefix + [
            'show-label'
        ]
        proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
        if proc.exitstatus != 0:
            raise Exception("ceph-bluestore-tool access failed.")

        # check if sharding is possible
        self.log('checking if target bluestore supports sharding on osd.%s' % osd)
        cmd = prefix + [
            'show-sharding'
        ]
        proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
        if proc.exitstatus != 0:
            self.log("Unable to test resharding, "
                     "ceph-bluestore-tool does not support it.")
            return

        # now go for reshard to something else
        self.log('applying new sharding to bluestore on osd.%s' % osd)
        new_sharding = self.config.get('bluestore_new_sharding', 'random')

        if new_sharding == 'random':
            self.log('generate random sharding')
            new_sharding = self.generate_random_sharding()

        self.log("applying new sharding: " + new_sharding)
        cmd = prefix + [
            '--sharding', new_sharding,
            'reshard'
        ]
        proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
        if proc.exitstatus != 0:
            raise Exception("ceph-bluestore-tool resharding failed.")

        # now do fsck to verify the new sharding
        self.log('running fsck to verify new sharding on osd.%s' % osd)
        cmd = prefix + [
            'fsck'
        ]
        proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
        if proc.exitstatus != 0:
            raise Exception("ceph-bluestore-tool fsck failed.")
        self.log('resharding successfully completed')

    def test_bluestore_reshard(self):
        """
        1) kills an osd
        2) reshards bluestore on killed osd
        3) revives the osd
        """
        self.log('test_bluestore_reshard started')
        self.kill_osd(mark_down=True, mark_out=True)
        self.test_bluestore_reshard_action()
        self.revive_osd()
        self.log('test_bluestore_reshard completed')


    def test_map_discontinuity(self):
        """
        1) Allows the osds to recover
        2) kills an osd
        3) allows the remaining osds to recover
        4) waits for some time
        5) revives the osd
        This sequence should cause the revived osd to have to handle
        a map gap since the mons would have trimmed
        """
        self.log("test_map_discontinuity")
        while len(self.in_osds) < (self.minin + 1):
            self.in_osd()
        self.log("Waiting for recovery")
        self.ceph_manager.wait_for_all_osds_up(
            timeout=self.config.get('timeout')
        )
        # now we wait 20s for the pg status to change, if it takes longer,
        # the test *should* fail!
        time.sleep(20)
        self.ceph_manager.wait_for_clean(
            timeout=self.config.get('timeout')
        )

        # now we wait 20s for the backfill replicas to hear about the clean
        time.sleep(20)
        self.log("Recovered, killing an osd")
        self.kill_osd(mark_down=True, mark_out=True)
        self.log("Waiting for clean again")
        self.ceph_manager.wait_for_clean(
            timeout=self.config.get('timeout')
        )
        self.log("Waiting for trim")
        time.sleep(int(self.config.get("map_discontinuity_sleep_time", 40)))
        self.revive_osd()

    def choose_action(self):
        """
        Random action selector.
        """
        chance_down = self.config.get('chance_down', 0.4)
        _ = self.config.get('chance_test_min_size', 0)
        chance_test_backfill_full = \
            self.config.get('chance_test_backfill_full', 0)
        if isinstance(chance_down, int):
            chance_down = float(chance_down) / 100
        minin = self.minin
        minout = int(self.config.get("min_out", 0))
        minlive = int(self.config.get("min_live", 2))
        mindead = int(self.config.get("min_dead", 0))

        self.log('choose_action: min_in %d min_out '
                 '%d min_live %d min_dead %d '
                 'chance_down %.2f' %
                 (minin, minout, minlive, mindead, chance_down))
        actions = []
        if len(self.in_osds) > minin:
            actions.append((self.out_osd, 1.0,))
        if len(self.live_osds) > minlive and chance_down > 0:
            actions.append((self.kill_osd, chance_down,))
        if len(self.out_osds) > minout:
            actions.append((self.in_osd, 1.7,))
        if len(self.dead_osds) > mindead:
            actions.append((self.revive_osd, 1.0,))
        if self.config.get('thrash_primary_affinity', True):
            actions.append((self.primary_affinity, 1.0,))
        actions.append((self.reweight_osd_or_by_util,
                        self.config.get('reweight_osd', .5),))
        actions.append((self.grow_pool,
                        self.config.get('chance_pgnum_grow', 0),))
        actions.append((self.shrink_pool,
                        self.config.get('chance_pgnum_shrink', 0),))
        actions.append((self.fix_pgp_num,
                        self.config.get('chance_pgpnum_fix', 0),))
        actions.append((self.test_pool_min_size,
                        self.config.get('chance_test_min_size', 0),))
        actions.append((self.test_backfill_full,
                        chance_test_backfill_full,))
        if self.chance_thrash_cluster_full > 0:
            actions.append((self.thrash_cluster_full, self.chance_thrash_cluster_full,))
        if self.chance_thrash_pg_upmap > 0:
            actions.append((self.thrash_pg_upmap, self.chance_thrash_pg_upmap,))
        if self.chance_thrash_pg_upmap_items > 0:
            actions.append((self.thrash_pg_upmap_items, self.chance_thrash_pg_upmap_items,))
        if self.chance_force_recovery > 0:
            actions.append((self.force_cancel_recovery, self.chance_force_recovery))

        for key in ['heartbeat_inject_failure', 'filestore_inject_stall']:
            for scenario in [
                (lambda:
                 self.inject_pause(key,
                                   self.config.get('pause_short', 3),
                                   0,
                                   False),
                 self.config.get('chance_inject_pause_short', 1),),
                (lambda:
                 self.inject_pause(key,
                                   self.config.get('pause_long', 80),
                                   self.config.get('pause_check_after', 70),
                                   True),
                 self.config.get('chance_inject_pause_long', 0),)]:
                actions.append(scenario)

        # only consider resharding if objectstore is bluestore
        cluster_name = self.ceph_manager.cluster
        cluster = self.ceph_manager.ctx.ceph[cluster_name]
        if cluster.conf.get('osd', {}).get('osd objectstore', 'bluestore') == 'bluestore':
            actions.append((self.test_bluestore_reshard,
                            self.config.get('chance_bluestore_reshard', 0),))

        total = sum([y for (x, y) in actions])
        val = random.uniform(0, total)
        for (action, prob) in actions:
            if val < prob:
                return action
            val -= prob
        return None

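    # choose_action() is a simple roulette-wheel selection: each candidate gets
    # a weight, a point is drawn uniformly in [0, sum(weights)), and the action
    # whose cumulative slice contains that point wins.  A minimal standalone
    # sketch of the same idea (names here are illustrative, not part of the class):
    #
    #   def weighted_pick(actions):
    #       total = sum(w for _, w in actions)
    #       val = random.uniform(0, total)
    #       for fn, w in actions:
    #           if val < w:
    #               return fn
    #           val -= w
    #       return None
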
    def do_thrash(self):
        """
        _do_thrash() wrapper.
        """
        try:
            self._do_thrash()
        except Exception as e:
            # See _run exception comment for MDSThrasher
            self.set_thrasher_exception(e)
            self.logger.exception("exception:")
            # Allow successful completion so gevent doesn't see an exception.
            # The DaemonWatchdog will observe the error and tear down the test.

    @log_exc
    def do_sighup(self):
        """
        Loops and sends signal.SIGHUP to a random live osd.

        Loop delay is controlled by the config value sighup_delay.
        """
        delay = float(self.sighup_delay)
        self.log("starting do_sighup with a delay of {0}".format(delay))
        while not self.stopping:
            osd = random.choice(self.live_osds)
            self.ceph_manager.signal_osd(osd, signal.SIGHUP, silent=True)
            time.sleep(delay)

    @log_exc
    def do_optrack_toggle(self):
        """
        Loops and toggle op tracking to all osds.

        Loop delay is controlled by the config value optrack_toggle_delay.
        """
        delay = float(self.optrack_toggle_delay)
        osd_state = "true"
        self.log("starting do_optrack_toggle with a delay of {0}".format(delay))
        while not self.stopping:
            if osd_state == "true":
                osd_state = "false"
            else:
                osd_state = "true"
            try:
                self.ceph_manager.inject_args('osd', '*',
                                              'osd_enable_op_tracker',
                                              osd_state)
            except CommandFailedError:
                self.log('Failed to tell all osds, ignoring')
            gevent.sleep(delay)

    @log_exc
    def do_dump_ops(self):
        """
        Loops and does op dumps on all osds
        """
        self.log("starting do_dump_ops")
        while not self.stopping:
            for osd in self.live_osds:
                # Ignore errors because live_osds is in flux
                self.ceph_manager.osd_admin_socket(osd, command=['dump_ops_in_flight'],
                                                   check_status=False, timeout=30, stdout=DEVNULL)
                self.ceph_manager.osd_admin_socket(osd, command=['dump_blocked_ops'],
                                                   check_status=False, timeout=30, stdout=DEVNULL)
                self.ceph_manager.osd_admin_socket(osd, command=['dump_historic_ops'],
                                                   check_status=False, timeout=30, stdout=DEVNULL)
            gevent.sleep(0)

    @log_exc
    def do_noscrub_toggle(self):
        """
        Loops and toggle noscrub flags

        Loop delay is controlled by the config value noscrub_toggle_delay.
        """
        delay = float(self.noscrub_toggle_delay)
        scrub_state = "none"
        self.log("starting do_noscrub_toggle with a delay of {0}".format(delay))
        while not self.stopping:
            if scrub_state == "none":
                self.ceph_manager.raw_cluster_cmd('osd', 'set', 'noscrub')
                scrub_state = "noscrub"
            elif scrub_state == "noscrub":
                self.ceph_manager.raw_cluster_cmd('osd', 'set', 'nodeep-scrub')
                scrub_state = "both"
            elif scrub_state == "both":
                self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
                scrub_state = "nodeep-scrub"
            else:
                self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
                scrub_state = "none"
            gevent.sleep(delay)
        self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
        self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')

    @log_exc
    def _do_thrash(self):
        """
        Loop to select random actions to thrash ceph manager with.
        """
        cleanint = self.config.get("clean_interval", 60)
        scrubint = self.config.get("scrub_interval", -1)
        maxdead = self.config.get("max_dead", 0)
        delay = self.config.get("op_delay", 5)
        self.rerrosd = self.live_osds[0]
        if self.random_eio > 0:
            self.ceph_manager.inject_args('osd', self.rerrosd,
                                          'filestore_debug_random_read_err',
                                          self.random_eio)
            self.ceph_manager.inject_args('osd', self.rerrosd,
                                          'bluestore_debug_random_read_err',
                                          self.random_eio)
        self.log("starting do_thrash")
        while not self.stopping:
            to_log = [str(x) for x in ["in_osds: ", self.in_osds,
                                       "out_osds: ", self.out_osds,
                                       "dead_osds: ", self.dead_osds,
                                       "live_osds: ", self.live_osds]]
            self.log(" ".join(to_log))
            if random.uniform(0, 1) < (float(delay) / cleanint):
                while len(self.dead_osds) > maxdead:
                    self.revive_osd()
                for osd in self.in_osds:
                    self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
                                                      str(osd), str(1))
                if random.uniform(0, 1) < float(
                        self.config.get('chance_test_map_discontinuity', 0)) \
                        and len(self.live_osds) > 5:  # avoid m=2,k=2 stall, w/ some buffer for crush being picky
                    self.test_map_discontinuity()
                else:
                    self.ceph_manager.wait_for_recovery(
                        timeout=self.config.get('timeout')
                    )
                time.sleep(self.clean_wait)
                if scrubint > 0:
                    if random.uniform(0, 1) < (float(delay) / scrubint):
                        self.log('Scrubbing while thrashing being performed')
                        Scrubber(self.ceph_manager, self.config)
            self.choose_action()()
            time.sleep(delay)
        self.all_up()
        if self.random_eio > 0:
            self.ceph_manager.inject_args('osd', self.rerrosd,
                                          'filestore_debug_random_read_err', '0.0')
            self.ceph_manager.inject_args('osd', self.rerrosd,
                                          'bluestore_debug_random_read_err', '0.0')
        for pool in list(self.pools_to_fix_pgp_num):
            if self.ceph_manager.get_pool_pg_num(pool) > 0:
                self.fix_pgp_num(pool)
        self.pools_to_fix_pgp_num.clear()
        for service, opt, saved_value in self.saved_options:
            self.ceph_manager.inject_args(service, '*', opt, saved_value)
        self.saved_options = []
        self.all_up_in()


class ObjectStoreTool:

    def __init__(self, manager, pool, **kwargs):
        self.manager = manager
        self.pool = pool
        self.osd = kwargs.get('osd', None)
        self.object_name = kwargs.get('object_name', None)
        self.do_revive = kwargs.get('do_revive', True)
        if self.osd and self.pool and self.object_name:
            if self.osd == "primary":
                self.osd = self.manager.get_object_primary(self.pool,
                                                           self.object_name)
        assert self.osd is not None
        if self.object_name:
            self.pgid = self.manager.get_object_pg_with_shard(self.pool,
                                                              self.object_name,
                                                              self.osd)
        self.remote = next(iter(self.manager.ctx.
                                cluster.only('osd.{o}'.format(o=self.osd)).remotes.keys()))
        path = self.manager.get_filepath().format(id=self.osd)
        self.paths = ("--data-path {path} --journal-path {path}/journal".
                      format(path=path))

    def build_cmd(self, options, args, stdin):
        lines = []
        if self.object_name:
            lines.append("object=$(sudo adjust-ulimits ceph-objectstore-tool "
                         "{paths} --pgid {pgid} --op list |"
                         "grep '\"oid\":\"{name}\"')".
                         format(paths=self.paths,
                                pgid=self.pgid,
                                name=self.object_name))
            args = '"$object" ' + args
            options += " --pgid {pgid}".format(pgid=self.pgid)
        cmd = ("sudo adjust-ulimits ceph-objectstore-tool {paths} {options} {args}".
               format(paths=self.paths,
                      args=args,
                      options=options))
        if stdin:
            cmd = ("echo {payload} | base64 --decode | {cmd}".
                   format(payload=base64.encode(stdin),
                          cmd=cmd))
        lines.append(cmd)
        return "\n".join(lines)

    def run(self, options, args):
        self.manager.kill_osd(self.osd)
        cmd = self.build_cmd(options, args, None)
        self.manager.log(cmd)
        try:
            proc = self.remote.run(args=['bash', '-e', '-x', '-c', cmd],
                                   check_status=False,
                                   stdout=BytesIO(),
                                   stderr=BytesIO())
            proc.wait()
            if proc.exitstatus != 0:
                self.manager.log("failed with " + str(proc.exitstatus))
                error = proc.stdout.getvalue().decode() + " " + \
                    proc.stderr.getvalue().decode()
                raise Exception(error)
        finally:
            if self.do_revive:
                self.manager.revive_osd(self.osd)
                self.manager.wait_till_osd_is_up(self.osd, 300)


# XXX: this class has nothing to do with the Ceph daemon (ceph-mgr) of
# the same name.
class CephManager:
    """
    Ceph manager object.
    Contains several local functions that form a bulk of this module.

    :param controller: the remote machine where the Ceph commands should be
                       executed
    :param ctx: the cluster context
    :param config: path to Ceph config file
    :param logger: for logging messages
    :param cluster: name of the Ceph cluster
    """

    def __init__(self, controller, ctx=None, config=None, logger=None,
                 cluster='ceph', cephadm=False, rook=False) -> None:
        self.lock = threading.RLock()
        self.ctx = ctx
        self.config = config
        self.controller = controller
        self.next_pool_id = 0
        self.cluster = cluster

        if logger:
            self.log = lambda x: logger.info(x)
        else:
            def tmp(x):
                """
                implement log behavior.
                """
                print(x)
            self.log = tmp

        if self.config is None:
            self.config = dict()

        # NOTE: These variables are meant to be overridden by vstart_runner.py.
        self.rook = rook
        self.cephadm = cephadm
        self.testdir = teuthology.get_testdir(self.ctx)
        self.run_cluster_cmd_prefix = [
            'sudo', 'adjust-ulimits', 'ceph-coverage',
            f'{self.testdir}/archive/coverage', 'timeout', '120', 'ceph',
            '--cluster', self.cluster]
        self.run_ceph_w_prefix = ['sudo', 'daemon-helper', 'kill', 'ceph',
                                  '--cluster', self.cluster]

        pools = self.list_pools()
        self.pools = {}
        for pool in pools:
            # we may race with a pool deletion; ignore failures here
            try:
                self.pools[pool] = self.get_pool_int_property(pool, 'pg_num')
            except CommandFailedError:
                self.log('Failed to get pg_num from pool %s, ignoring' % pool)

    def ceph(self, cmd, **kwargs):
        """
        Simple Ceph admin command wrapper around run_cluster_cmd.
        """

        kwargs.pop('args', None)
        args = shlex.split(cmd)
        stdout = kwargs.pop('stdout', StringIO())
        stderr = kwargs.pop('stderr', StringIO())
        return self.run_cluster_cmd(args=args, stdout=stdout, stderr=stderr, **kwargs)

    def run_cluster_cmd(self, **kwargs):
        """
        Run a Ceph command and return the object representing the process
        for the command.

        Accepts arguments same as that of teuthology.orchestra.run.run()
        """
        if isinstance(kwargs['args'], str):
            kwargs['args'] = shlex.split(kwargs['args'])
        elif isinstance(kwargs['args'], tuple):
            kwargs['args'] = list(kwargs['args'])

        if self.cephadm:
            return shell(self.ctx, self.cluster, self.controller,
                         args=['ceph'] + list(kwargs['args']),
                         stdout=StringIO(),
                         check_status=kwargs.get('check_status', True))
        if self.rook:
            return toolbox(self.ctx, self.cluster,
                           args=['ceph'] + list(kwargs['args']),
                           stdout=StringIO(),
                           check_status=kwargs.get('check_status', True))

        kwargs['args'] = self.run_cluster_cmd_prefix + kwargs['args']
        return self.controller.run(**kwargs)

    def raw_cluster_cmd(self, *args, **kwargs) -> str:
        """
        Run a ceph command on the cluster and return its stdout as a string.
        """
        if kwargs.get('args') is None and args:
            kwargs['args'] = args
        kwargs['stdout'] = kwargs.pop('stdout', StringIO())
        return self.run_cluster_cmd(**kwargs).stdout.getvalue()

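    # Typical call sites elsewhere in this module look like:
    #   out = self.raw_cluster_cmd('osd', 'dump', '--format=json')
    #   osdmap = json.loads(out)
    # raw_cluster_cmd_result() below is the variant to use when only the exit
    # status matters.
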
11fdf7f2 1617 def raw_cluster_cmd_result(self, *args, **kwargs):
7c673cae
FG
1618 """
 1619         Run a ceph command against the cluster and return its exit status.
1620 """
20effc67
TL
1621 if kwargs.get('args') is None and args:
1622 kwargs['args'] = args
1623 kwargs['check_status'] = False
f67539c2 1624 return self.run_cluster_cmd(**kwargs).exitstatus
7c673cae 1625
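    # Hedged example (assumes a CephManager instance named `manager`):
    # raw_cluster_cmd() returns the command's stdout as a string, while
    # raw_cluster_cmd_result() forces check_status=False and returns the exit
    # status, which suits commands that are allowed to fail.
    #
    #   health = manager.raw_cluster_cmd('health', '--format=json')
    #   rc = manager.raw_cluster_cmd_result('osd', 'pool', 'get', 'nonexistent', 'size')
    #   assert rc != 0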
11fdf7f2 1626 def run_ceph_w(self, watch_channel=None):
7c673cae 1627 """
9f95a23c  1628         Execute "ceph -w" in the background with stdout connected to a StringIO,
7c673cae 1629 and return the RemoteProcess.
11fdf7f2
TL
1630
1631 :param watch_channel: Specifies the channel to be watched. This can be
1632 'cluster', 'audit', ...
1633 :type watch_channel: str
1634 """
20effc67 1635 args = self.run_ceph_w_prefix + ['-w']
11fdf7f2
TL
1636 if watch_channel is not None:
1637 args.append("--watch-channel")
1638 args.append(watch_channel)
e306af50 1639 return self.controller.run(args=args, wait=False, stdout=StringIO(), stdin=run.PIPE)
9f95a23c
TL
1640
1641 def get_mon_socks(self):
1642 """
1643 Get monitor sockets.
1644
1645 :return socks: tuple of strings; strings are individual sockets.
1646 """
1647 from json import loads
1648
1649 output = loads(self.raw_cluster_cmd(['--format=json', 'mon', 'dump']))
1650 socks = []
1651 for mon in output['mons']:
1652 for addrvec_mem in mon['public_addrs']['addrvec']:
1653 socks.append(addrvec_mem['addr'])
1654 return tuple(socks)
1655
1656 def get_msgrv1_mon_socks(self):
1657 """
1658 Get monitor sockets that use msgrv1 to operate.
1659
1660 :return socks: tuple of strings; strings are individual sockets.
1661 """
1662 from json import loads
1663
1664 output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
1665 socks = []
1666 for mon in output['mons']:
1667 for addrvec_mem in mon['public_addrs']['addrvec']:
1668 if addrvec_mem['type'] == 'v1':
1669 socks.append(addrvec_mem['addr'])
1670 return tuple(socks)
1671
1672 def get_msgrv2_mon_socks(self):
1673 """
1674 Get monitor sockets that use msgrv2 to operate.
1675
1676 :return socks: tuple of strings; strings are individual sockets.
1677 """
1678 from json import loads
1679
1680 output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
1681 socks = []
1682 for mon in output['mons']:
1683 for addrvec_mem in mon['public_addrs']['addrvec']:
1684 if addrvec_mem['type'] == 'v2':
1685 socks.append(addrvec_mem['addr'])
1686 return tuple(socks)
7c673cae 1687
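    # Illustrative sketch: the three helpers above all parse the addrvec entries
    # of `ceph mon dump --format=json`, so the v1/v2 results are subsets of
    # get_mon_socks().  `manager` is a hypothetical CephManager instance.
    #
    #   all_socks = manager.get_mon_socks()
    #   v2_socks = manager.get_msgrv2_mon_socks()
    #   assert set(v2_socks).issubset(set(all_socks))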
224ce89b 1688 def flush_pg_stats(self, osds, no_wait=None, wait_for_mon=300):
31f18b77
FG
1689 """
1690 Flush pg stats from a list of OSD ids, ensuring they are reflected
1691 all the way to the monitor. Luminous and later only.
1692
1693 :param osds: list of OSDs to flush
1694 :param no_wait: list of OSDs not to wait for seq id. by default, we
1695 wait for all specified osds, but some of them could be
1696 moved out of osdmap, so we cannot get their updated
1697 stat seq from monitor anymore. in that case, you need
f67539c2 1698 to pass a blocklist.
31f18b77 1699 :param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
224ce89b 1700 it. (5 min by default)
31f18b77 1701 """
31f18b77
FG
1702 if no_wait is None:
1703 no_wait = []
20effc67
TL
1704
1705 def flush_one_osd(osd: int, wait_for_mon: int):
1706 need = int(self.raw_cluster_cmd('tell', 'osd.%d' % osd, 'flush_pg_stats'))
1707 if not wait_for_mon:
1708 return
31f18b77 1709 if osd in no_wait:
20effc67 1710 return
31f18b77
FG
1711 got = 0
1712 while wait_for_mon > 0:
11fdf7f2 1713 got = int(self.raw_cluster_cmd('osd', 'last-stat-seq', 'osd.%d' % osd))
31f18b77
FG
1714 self.log('need seq {need} got {got} for osd.{osd}'.format(
1715 need=need, got=got, osd=osd))
1716 if got >= need:
1717 break
1718 A_WHILE = 1
1719 time.sleep(A_WHILE)
1720 wait_for_mon -= A_WHILE
1721 else:
1722 raise Exception('timed out waiting for mon to be updated with '
1723 'osd.{osd}: {got} < {need}'.
1724 format(osd=osd, got=got, need=need))
1725
20effc67
TL
1726 with parallel() as p:
1727 for osd in osds:
1728 p.spawn(flush_one_osd, osd, wait_for_mon)
1729
31f18b77
FG
1730 def flush_all_pg_stats(self):
1731 self.flush_pg_stats(range(len(self.get_osd_dump())))
1732
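    # Usage sketch (assumptions: `manager` is a CephManager instance and osd.2
    # has been removed from the osdmap, so its stat seq can never be confirmed
    # by the mon):
    #
    #   manager.flush_pg_stats([0, 1, 2], no_wait=[2])
    #   # or flush every osd listed in the osd dump:
    #   manager.flush_all_pg_stats()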
f67539c2 1733 def do_rados(self, cmd, pool=None, namespace=None, remote=None, **kwargs):
7c673cae
FG
1734 """
1735 Execute a remote rados command.
1736 """
f67539c2
TL
1737 if remote is None:
1738 remote = self.controller
1739
7c673cae
FG
1740 pre = [
1741 'adjust-ulimits',
1742 'ceph-coverage',
20effc67 1743 f'{self.testdir}/archive/coverage',
7c673cae
FG
1744 'rados',
1745 '--cluster',
1746 self.cluster,
1747 ]
f67539c2
TL
1748 if pool is not None:
1749 pre += ['--pool', pool]
1750 if namespace is not None:
1751 pre += ['--namespace', namespace]
7c673cae
FG
1752 pre.extend(cmd)
1753 proc = remote.run(
1754 args=pre,
1755 wait=True,
f67539c2 1756 **kwargs
7c673cae
FG
1757 )
1758 return proc
1759
1760 def rados_write_objects(self, pool, num_objects, size,
1761 timelimit, threads, cleanup=False):
1762 """
1763 Write rados objects
1764 Threads not used yet.
1765 """
1766 args = [
7c673cae
FG
1767 '--num-objects', num_objects,
1768 '-b', size,
1769 'bench', timelimit,
1770 'write'
1771 ]
1772 if not cleanup:
1773 args.append('--no-cleanup')
f67539c2 1774 return self.do_rados(map(str, args), pool=pool)
7c673cae
FG
1775
1776 def do_put(self, pool, obj, fname, namespace=None):
1777 """
1778 Implement rados put operation
1779 """
f67539c2 1780 args = ['put', obj, fname]
7c673cae 1781 return self.do_rados(
7c673cae 1782 args,
f67539c2
TL
1783 check_status=False,
1784 pool=pool,
1785 namespace=namespace
7c673cae
FG
1786 ).exitstatus
1787
1788 def do_get(self, pool, obj, fname='/dev/null', namespace=None):
1789 """
1790 Implement rados get operation
1791 """
f67539c2 1792 args = ['get', obj, fname]
7c673cae 1793 return self.do_rados(
7c673cae 1794 args,
f67539c2
TL
1795 check_status=False,
1796 pool=pool,
1797 namespace=namespace,
7c673cae
FG
1798 ).exitstatus
1799
1800 def do_rm(self, pool, obj, namespace=None):
1801 """
1802 Implement rados rm operation
1803 """
f67539c2 1804 args = ['rm', obj]
7c673cae 1805 return self.do_rados(
7c673cae 1806 args,
f67539c2
TL
1807 check_status=False,
1808 pool=pool,
1809 namespace=namespace
7c673cae
FG
1810 ).exitstatus
1811
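    # Hedged round-trip example; the pool, object and file names are made up.
    # do_put/do_get/do_rm run rados with check_status=False and return its exit
    # status, so 0 indicates success.
    #
    #   assert manager.do_put('unique_pool_0', 'obj1', '/etc/hosts') == 0
    #   assert manager.do_get('unique_pool_0', 'obj1', '/tmp/obj1') == 0
    #   assert manager.do_rm('unique_pool_0', 'obj1') == 0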
1812 def osd_admin_socket(self, osd_id, command, check_status=True, timeout=0, stdout=None):
1813 if stdout is None:
e306af50 1814 stdout = StringIO()
7c673cae
FG
1815 return self.admin_socket('osd', osd_id, command, check_status, timeout, stdout)
1816
1817 def find_remote(self, service_type, service_id):
1818 """
1819 Get the Remote for the host where a particular service runs.
1820
1821 :param service_type: 'mds', 'osd', 'client'
1822 :param service_id: The second part of a role, e.g. '0' for
1823 the role 'client.0'
1824 :return: a Remote instance for the host where the
1825 requested role is placed
1826 """
1827 return get_remote(self.ctx, self.cluster,
1828 service_type, service_id)
1829
1830 def admin_socket(self, service_type, service_id,
1831 command, check_status=True, timeout=0, stdout=None):
1832 """
1833 Remotely start up ceph specifying the admin socket
1834 :param command: a list of words to use as the command
1835 to the admin socket
1836 """
1837 if stdout is None:
e306af50 1838 stdout = StringIO()
9f95a23c 1839
7c673cae 1840 remote = self.find_remote(service_type, service_id)
9f95a23c
TL
1841
1842 if self.cephadm:
1843 return shell(
1844 self.ctx, self.cluster, remote,
1845 args=[
1846 'ceph', 'daemon', '%s.%s' % (service_type, service_id),
1847 ] + command,
1848 stdout=stdout,
1849 wait=True,
1850 check_status=check_status,
1851 )
b3b6e05e
TL
1852 if self.rook:
1853 assert False, 'not implemented'
9f95a23c 1854
7c673cae
FG
1855 args = [
1856 'sudo',
1857 'adjust-ulimits',
1858 'ceph-coverage',
20effc67 1859 f'{self.testdir}/archive/coverage',
7c673cae
FG
1860 'timeout',
1861 str(timeout),
1862 'ceph',
1863 '--cluster',
1864 self.cluster,
1865 '--admin-daemon',
1866 '/var/run/ceph/{cluster}-{type}.{id}.asok'.format(
1867 cluster=self.cluster,
1868 type=service_type,
1869 id=service_id),
1870 ]
1871 args.extend(command)
1872 return remote.run(
1873 args=args,
1874 stdout=stdout,
1875 wait=True,
1876 check_status=check_status
1877 )
1878
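    # Sketch of admin socket usage (assumes `manager`): the command list is the
    # argv given to `ceph daemon <type>.<id>` (or to `--admin-daemon` on
    # non-cephadm clusters).
    #
    #   proc = manager.osd_admin_socket(0, ['dump_ops_in_flight'])
    #   in_flight = json.loads(proc.stdout.getvalue())
    #
    #   manager.admin_socket('mon', 'a', ['mon_status'], check_status=False)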
1879 def objectstore_tool(self, pool, options, args, **kwargs):
1880 return ObjectStoreTool(self, pool, **kwargs).run(options, args)
1881
1882 def get_pgid(self, pool, pgnum):
1883 """
1884 :param pool: pool name
1885 :param pgnum: pg number
1886 :returns: a string representing this pg.
1887 """
1888 poolnum = self.get_pool_num(pool)
1889 pg_str = "{poolnum}.{pgnum}".format(
1890 poolnum=poolnum,
1891 pgnum=pgnum)
1892 return pg_str
1893
1894 def get_pg_replica(self, pool, pgnum):
1895 """
 1896         get a replica osd for (pool, pgnum), e.g. (data, 0) -> 0
1897 """
1898 pg_str = self.get_pgid(pool, pgnum)
1899 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1900 j = json.loads('\n'.join(output.split('\n')[1:]))
1901 return int(j['acting'][-1])
1902 assert False
1903
1904 def wait_for_pg_stats(func):
11fdf7f2 1905 # both osd_mon_report_interval and mgr_stats_period are 5 seconds
7c673cae
FG
 1906         # by default; taking fault injection delays (in ms) into consideration,
 1907         # the backoff schedule below (over 30 seconds in total) is more than enough
28e407b8 1908 delays = [1, 1, 2, 3, 5, 8, 13, 0]
7c673cae
FG
1909 @wraps(func)
1910 def wrapper(self, *args, **kwargs):
1911 exc = None
1912 for delay in delays:
1913 try:
1914 return func(self, *args, **kwargs)
1915 except AssertionError as e:
1916 time.sleep(delay)
1917 exc = e
1918 raise exc
1919 return wrapper
1920
1921 def get_pg_primary(self, pool, pgnum):
1922 """
 1923         get the primary osd for (pool, pgnum), e.g. (data, 0) -> 0
1924 """
1925 pg_str = self.get_pgid(pool, pgnum)
1926 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1927 j = json.loads('\n'.join(output.split('\n')[1:]))
1928 return int(j['acting'][0])
1929 assert False
1930
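    # Illustrative mapping helpers; the pool name and pg number are hypothetical.
    # Both osd ids come from the pg's acting set as reported by `ceph pg map`.
    #
    #   pgid = manager.get_pgid('unique_pool_0', 0)           # e.g. '2.0'
    #   primary = manager.get_pg_primary('unique_pool_0', 0)  # acting[0]
    #   replica = manager.get_pg_replica('unique_pool_0', 0)  # acting[-1]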
1931 def get_pool_num(self, pool):
1932 """
1933 get number for pool (e.g., data -> 2)
1934 """
1935 return int(self.get_pool_dump(pool)['pool'])
1936
1937 def list_pools(self):
1938 """
1939 list all pool names
1940 """
1941 osd_dump = self.get_osd_dump_json()
1942 self.log(osd_dump['pools'])
1943 return [str(i['pool_name']) for i in osd_dump['pools']]
1944
1945 def clear_pools(self):
1946 """
1947 remove all pools
1948 """
1949 [self.remove_pool(i) for i in self.list_pools()]
1950
1951 def kick_recovery_wq(self, osdnum):
1952 """
1953 Run kick_recovery_wq on cluster.
1954 """
1955 return self.raw_cluster_cmd(
1956 'tell', "osd.%d" % (int(osdnum),),
1957 'debug',
1958 'kick_recovery_wq',
1959 '0')
1960
1961 def wait_run_admin_socket(self, service_type,
1962 service_id, args=['version'], timeout=75, stdout=None):
1963 """
11fdf7f2  1964         If the admin_socket call succeeds, return. Otherwise wait
7c673cae
FG
1965 five seconds and try again.
1966 """
1967 if stdout is None:
e306af50 1968 stdout = StringIO()
7c673cae
FG
1969 tries = 0
1970 while True:
1971 proc = self.admin_socket(service_type, service_id,
1972 args, check_status=False, stdout=stdout)
9f95a23c 1973 if proc.exitstatus == 0:
7c673cae
FG
1974 return proc
1975 else:
1976 tries += 1
1977 if (tries * 5) > timeout:
1978 raise Exception('timed out waiting for admin_socket '
1979 'to appear after {type}.{id} restart'.
1980 format(type=service_type,
1981 id=service_id))
1982 self.log("waiting on admin_socket for {type}-{id}, "
1983 "{command}".format(type=service_type,
1984 id=service_id,
1985 command=args))
1986 time.sleep(5)
1987
1988 def get_pool_dump(self, pool):
1989 """
1990 get the osd dump part of a pool
1991 """
1992 osd_dump = self.get_osd_dump_json()
1993 for i in osd_dump['pools']:
1994 if i['pool_name'] == pool:
1995 return i
1996 assert False
1997
1998 def get_config(self, service_type, service_id, name):
1999 """
 2000         :param service_type, service_id: which daemon, e.g. 'mon', 'a' (mon.a)
2001 :param name: the option name
2002 """
2003 proc = self.wait_run_admin_socket(service_type, service_id,
2004 ['config', 'show'])
2005 j = json.loads(proc.stdout.getvalue())
2006 return j[name]
2007
11fdf7f2
TL
2008 def inject_args(self, service_type, service_id, name, value):
2009 whom = '{0}.{1}'.format(service_type, service_id)
2010 if isinstance(value, bool):
2011 value = 'true' if value else 'false'
2012 opt_arg = '--{name}={value}'.format(name=name, value=value)
2013 self.raw_cluster_cmd('--', 'tell', whom, 'injectargs', opt_arg)
2014
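    # Hedged example: inject_args() builds '--<name>=<value>' and applies it via
    # 'ceph tell <type>.<id> injectargs'; booleans become 'true'/'false'.  The
    # option names below are only illustrative.
    #
    #   manager.inject_args('osd', 3, 'osd_max_backfills', 2)
    #   manager.inject_args('osd', 3, 'osd_enable_op_tracker', False)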
7c673cae
FG
2015 def set_config(self, osdnum, **argdict):
2016 """
2017 :param osdnum: osd number
2018 :param argdict: dictionary containing values to set.
2019 """
9f95a23c 2020 for k, v in argdict.items():
7c673cae
FG
2021 self.wait_run_admin_socket(
2022 'osd', osdnum,
2023 ['config', 'set', str(k), str(v)])
2024
2025 def raw_cluster_status(self):
2026 """
2027 Get status from cluster
2028 """
9f95a23c 2029 status = self.raw_cluster_cmd('status', '--format=json')
7c673cae
FG
2030 return json.loads(status)
2031
2032 def raw_osd_status(self):
2033 """
2034 Get osd status from cluster
2035 """
2036 return self.raw_cluster_cmd('osd', 'dump')
2037
2038 def get_osd_status(self):
2039 """
2040 Get osd statuses sorted by states that the osds are in.
2041 """
e306af50 2042 osd_lines = list(filter(
7c673cae 2043 lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
e306af50 2044 self.raw_osd_status().split('\n')))
7c673cae
FG
2045 self.log(osd_lines)
2046 in_osds = [int(i[4:].split()[0])
2047 for i in filter(lambda x: " in " in x, osd_lines)]
2048 out_osds = [int(i[4:].split()[0])
2049 for i in filter(lambda x: " out " in x, osd_lines)]
2050 up_osds = [int(i[4:].split()[0])
2051 for i in filter(lambda x: " up " in x, osd_lines)]
2052 down_osds = [int(i[4:].split()[0])
2053 for i in filter(lambda x: " down " in x, osd_lines)]
2054 dead_osds = [int(x.id_)
2055 for x in filter(lambda x:
2056 not x.running(),
2057 self.ctx.daemons.
2058 iter_daemons_of_role('osd', self.cluster))]
2059 live_osds = [int(x.id_) for x in
2060 filter(lambda x:
2061 x.running(),
2062 self.ctx.daemons.iter_daemons_of_role('osd',
2063 self.cluster))]
2064 return {'in': in_osds, 'out': out_osds, 'up': up_osds,
2065 'down': down_osds, 'dead': dead_osds, 'live': live_osds,
2066 'raw': osd_lines}
2067
2068 def get_num_pgs(self):
2069 """
2070 Check cluster status for the number of pgs
2071 """
2072 status = self.raw_cluster_status()
2073 self.log(status)
2074 return status['pgmap']['num_pgs']
2075
2076 def create_erasure_code_profile(self, profile_name, profile):
2077 """
2078 Create an erasure code profile name that can be used as a parameter
2079 when creating an erasure coded pool.
2080 """
2081 with self.lock:
2082 args = cmd_erasure_code_profile(profile_name, profile)
2083 self.raw_cluster_cmd(*args)
2084
2085 def create_pool_with_unique_name(self, pg_num=16,
2086 erasure_code_profile_name=None,
2087 min_size=None,
2088 erasure_code_use_overwrites=False):
2089 """
2090 Create a pool named unique_pool_X where X is unique.
2091 """
2092 name = ""
2093 with self.lock:
2094 name = "unique_pool_%s" % (str(self.next_pool_id),)
2095 self.next_pool_id += 1
2096 self.create_pool(
2097 name,
2098 pg_num,
2099 erasure_code_profile_name=erasure_code_profile_name,
2100 min_size=min_size,
2101 erasure_code_use_overwrites=erasure_code_use_overwrites)
2102 return name
2103
2104 @contextlib.contextmanager
2105 def pool(self, pool_name, pg_num=16, erasure_code_profile_name=None):
2106 self.create_pool(pool_name, pg_num, erasure_code_profile_name)
2107 yield
2108 self.remove_pool(pool_name)
2109
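    # Usage sketch for the pool() context manager (pool name and sizes are
    # illustrative): the pool is created on entry and removed again on exit.
    #
    #   with manager.pool('ctx_pool', pg_num=8):
    #       manager.rados_write_objects('ctx_pool', 16, 4096, 30, 1)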
2110 def create_pool(self, pool_name, pg_num=16,
2111 erasure_code_profile_name=None,
2112 min_size=None,
2113 erasure_code_use_overwrites=False):
2114 """
2115 Create a pool named from the pool_name parameter.
2116 :param pool_name: name of the pool being created.
2117 :param pg_num: initial number of pgs.
 2118         :param erasure_code_profile_name: if set (not None), create an
2119 erasure coded pool using the profile
2120 :param erasure_code_use_overwrites: if true, allow overwrites
2121 """
2122 with self.lock:
f91f0fd5 2123 assert isinstance(pool_name, str)
7c673cae
FG
2124 assert isinstance(pg_num, int)
2125 assert pool_name not in self.pools
2126 self.log("creating pool_name %s" % (pool_name,))
2127 if erasure_code_profile_name:
2128 self.raw_cluster_cmd('osd', 'pool', 'create',
2129 pool_name, str(pg_num), str(pg_num),
2130 'erasure', erasure_code_profile_name)
2131 else:
2132 self.raw_cluster_cmd('osd', 'pool', 'create',
2133 pool_name, str(pg_num))
2134 if min_size is not None:
2135 self.raw_cluster_cmd(
2136 'osd', 'pool', 'set', pool_name,
2137 'min_size',
2138 str(min_size))
2139 if erasure_code_use_overwrites:
2140 self.raw_cluster_cmd(
2141 'osd', 'pool', 'set', pool_name,
2142 'allow_ec_overwrites',
2143 'true')
c07f9fc5
FG
2144 self.raw_cluster_cmd(
2145 'osd', 'pool', 'application', 'enable',
2146 pool_name, 'rados', '--yes-i-really-mean-it',
2147 run.Raw('||'), 'true')
7c673cae
FG
2148 self.pools[pool_name] = pg_num
2149 time.sleep(1)
2150
2151 def add_pool_snap(self, pool_name, snap_name):
2152 """
2153 Add pool snapshot
2154 :param pool_name: name of pool to snapshot
2155 :param snap_name: name of snapshot to take
2156 """
2157 self.raw_cluster_cmd('osd', 'pool', 'mksnap',
2158 str(pool_name), str(snap_name))
2159
2160 def remove_pool_snap(self, pool_name, snap_name):
2161 """
2162 Remove pool snapshot
2163 :param pool_name: name of pool to snapshot
2164 :param snap_name: name of snapshot to remove
2165 """
2166 self.raw_cluster_cmd('osd', 'pool', 'rmsnap',
2167 str(pool_name), str(snap_name))
2168
2169 def remove_pool(self, pool_name):
2170 """
2171 Remove the indicated pool
2172 :param pool_name: Pool to be removed
2173 """
2174 with self.lock:
f91f0fd5 2175 assert isinstance(pool_name, str)
7c673cae
FG
2176 assert pool_name in self.pools
2177 self.log("removing pool_name %s" % (pool_name,))
2178 del self.pools[pool_name]
11fdf7f2
TL
2179 self.raw_cluster_cmd('osd', 'pool', 'rm', pool_name, pool_name,
2180 "--yes-i-really-really-mean-it")
7c673cae
FG
2181
2182 def get_pool(self):
2183 """
2184 Pick a random pool
2185 """
2186 with self.lock:
9f95a23c 2187 if self.pools:
e306af50  2188                 return random.sample(list(self.pools.keys()), 1)[0]
7c673cae
FG
2189
2190 def get_pool_pg_num(self, pool_name):
2191 """
2192 Return the number of pgs in the pool specified.
2193 """
2194 with self.lock:
f91f0fd5 2195 assert isinstance(pool_name, str)
7c673cae
FG
2196 if pool_name in self.pools:
2197 return self.pools[pool_name]
2198 return 0
2199
2200 def get_pool_property(self, pool_name, prop):
2201 """
2202 :param pool_name: pool
2203 :param prop: property to be checked.
9f95a23c 2204 :returns: property as string
7c673cae
FG
2205 """
2206 with self.lock:
f91f0fd5
TL
2207 assert isinstance(pool_name, str)
2208 assert isinstance(prop, str)
7c673cae
FG
2209 output = self.raw_cluster_cmd(
2210 'osd',
2211 'pool',
2212 'get',
2213 pool_name,
2214 prop)
9f95a23c
TL
2215 return output.split()[1]
2216
2217 def get_pool_int_property(self, pool_name, prop):
2218 return int(self.get_pool_property(pool_name, prop))
7c673cae
FG
2219
2220 def set_pool_property(self, pool_name, prop, val):
2221 """
2222 :param pool_name: pool
2223 :param prop: property to be set.
2224 :param val: value to set.
2225
2226 This routine retries if set operation fails.
2227 """
2228 with self.lock:
f91f0fd5
TL
2229 assert isinstance(pool_name, str)
2230 assert isinstance(prop, str)
7c673cae
FG
2231 assert isinstance(val, int)
2232 tries = 0
2233 while True:
2234 r = self.raw_cluster_cmd_result(
2235 'osd',
2236 'pool',
2237 'set',
2238 pool_name,
2239 prop,
2240 str(val))
2241 if r != 11: # EAGAIN
2242 break
2243 tries += 1
2244 if tries > 50:
2245 raise Exception('timed out getting EAGAIN '
2246 'when setting pool property %s %s = %s' %
2247 (pool_name, prop, val))
2248 self.log('got EAGAIN setting pool property, '
2249 'waiting a few seconds...')
2250 time.sleep(2)
2251
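    # Hedged example: set_pool_property() retries while the cluster returns
    # EAGAIN (exit status 11), e.g. when earlier pool changes are still being
    # applied, so callers can usually treat it as synchronous.
    #
    #   manager.set_pool_property('unique_pool_0', 'min_size', 1)
    #   manager.set_pool_property('unique_pool_0', 'pg_num', 32)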
2252 def expand_pool(self, pool_name, by, max_pgs):
2253 """
2254 Increase the number of pgs in a pool
2255 """
2256 with self.lock:
f91f0fd5 2257 assert isinstance(pool_name, str)
7c673cae
FG
2258 assert isinstance(by, int)
2259 assert pool_name in self.pools
2260 if self.get_num_creating() > 0:
2261 return False
2262 if (self.pools[pool_name] + by) > max_pgs:
2263 return False
2264 self.log("increase pool size by %d" % (by,))
2265 new_pg_num = self.pools[pool_name] + by
2266 self.set_pool_property(pool_name, "pg_num", new_pg_num)
2267 self.pools[pool_name] = new_pg_num
2268 return True
2269
11fdf7f2
TL
2270 def contract_pool(self, pool_name, by, min_pgs):
2271 """
2272 Decrease the number of pgs in a pool
2273 """
2274 with self.lock:
2275 self.log('contract_pool %s by %s min %s' % (
2276 pool_name, str(by), str(min_pgs)))
f91f0fd5 2277 assert isinstance(pool_name, str)
11fdf7f2
TL
2278 assert isinstance(by, int)
2279 assert pool_name in self.pools
2280 if self.get_num_creating() > 0:
2281 self.log('too many creating')
2282 return False
2283 proj = self.pools[pool_name] - by
2284 if proj < min_pgs:
2285 self.log('would drop below min_pgs, proj %d, currently %d' % (proj,self.pools[pool_name],))
2286 return False
2287 self.log("decrease pool size by %d" % (by,))
2288 new_pg_num = self.pools[pool_name] - by
2289 self.set_pool_property(pool_name, "pg_num", new_pg_num)
2290 self.pools[pool_name] = new_pg_num
2291 return True
2292
2293 def stop_pg_num_changes(self):
2294 """
2295 Reset all pg_num_targets back to pg_num, canceling splits and merges
2296 """
2297 self.log('Canceling any pending splits or merges...')
2298 osd_dump = self.get_osd_dump_json()
9f95a23c
TL
2299 try:
2300 for pool in osd_dump['pools']:
2301 if pool['pg_num'] != pool['pg_num_target']:
2302 self.log('Setting pool %s (%d) pg_num %d -> %d' %
2303 (pool['pool_name'], pool['pool'],
2304 pool['pg_num_target'],
2305 pool['pg_num']))
2306 self.raw_cluster_cmd('osd', 'pool', 'set', pool['pool_name'],
2307 'pg_num', str(pool['pg_num']))
2308 except KeyError:
2309 # we don't support pg_num_target before nautilus
2310 pass
11fdf7f2 2311
7c673cae
FG
2312 def set_pool_pgpnum(self, pool_name, force):
2313 """
2314 Set pgpnum property of pool_name pool.
2315 """
2316 with self.lock:
f91f0fd5 2317 assert isinstance(pool_name, str)
7c673cae
FG
2318 assert pool_name in self.pools
2319 if not force and self.get_num_creating() > 0:
2320 return False
2321 self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name])
2322 return True
2323
11fdf7f2 2324 def list_pg_unfound(self, pgid):
7c673cae 2325 """
11fdf7f2 2326 return list of unfound pgs with the id specified
7c673cae
FG
2327 """
2328 r = None
2329 offset = {}
2330 while True:
11fdf7f2 2331 out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_unfound',
7c673cae
FG
2332 json.dumps(offset))
2333 j = json.loads(out)
2334 if r is None:
2335 r = j
2336 else:
2337 r['objects'].extend(j['objects'])
 2338             if 'more' not in j:
2339 break
2340 if j['more'] == 0:
2341 break
2342 offset = j['objects'][-1]['oid']
2343 if 'more' in r:
2344 del r['more']
2345 return r
2346
2347 def get_pg_stats(self):
2348 """
2349 Dump the cluster and get pg stats
2350 """
2351 out = self.raw_cluster_cmd('pg', 'dump', '--format=json')
2352 j = json.loads('\n'.join(out.split('\n')[1:]))
11fdf7f2
TL
2353 try:
2354 return j['pg_map']['pg_stats']
2355 except KeyError:
2356 return j['pg_stats']
7c673cae 2357
a4b75251
TL
2358 def get_osd_df(self, osdid):
2359 """
2360 Get the osd df stats
2361 """
2362 out = self.raw_cluster_cmd('osd', 'df', 'name', 'osd.{}'.format(osdid),
2363 '--format=json')
2364 j = json.loads('\n'.join(out.split('\n')[1:]))
2365 return j['nodes'][0]
2366
2367 def get_pool_df(self, name):
2368 """
2369 Get the pool df stats
2370 """
2371 out = self.raw_cluster_cmd('df', 'detail', '--format=json')
2372 j = json.loads('\n'.join(out.split('\n')[1:]))
2373 return next((p['stats'] for p in j['pools'] if p['name'] == name),
2374 None)
2375
c07f9fc5
FG
2376 def get_pgids_to_force(self, backfill):
2377 """
2378 Return the randomized list of PGs that can have their recovery/backfill forced
2379 """
 2380         j = self.get_pg_stats()
2381 pgids = []
2382 if backfill:
2383 wanted = ['degraded', 'backfilling', 'backfill_wait']
2384 else:
2385 wanted = ['recovering', 'degraded', 'recovery_wait']
2386 for pg in j:
2387 status = pg['state'].split('+')
2388 for t in wanted:
2389 if random.random() > 0.5 and not ('forced_backfill' in status or 'forced_recovery' in status) and t in status:
2390 pgids.append(pg['pgid'])
2391 break
2392 return pgids
2393
2394 def get_pgids_to_cancel_force(self, backfill):
2395 """
2396 Return the randomized list of PGs whose recovery/backfill priority is forced
2397 """
 2398         j = self.get_pg_stats()
2399 pgids = []
2400 if backfill:
2401 wanted = 'forced_backfill'
2402 else:
2403 wanted = 'forced_recovery'
2404 for pg in j:
2405 status = pg['state'].split('+')
2406 if wanted in status and random.random() > 0.5:
2407 pgids.append(pg['pgid'])
2408 return pgids
2409
7c673cae
FG
2410 def compile_pg_status(self):
2411 """
2412 Return a histogram of pg state values
2413 """
2414 ret = {}
2415 j = self.get_pg_stats()
2416 for pg in j:
2417 for status in pg['state'].split('+'):
2418 if status not in ret:
2419 ret[status] = 0
2420 ret[status] += 1
2421 return ret
2422
9f95a23c 2423 @wait_for_pg_stats # type: ignore
7c673cae
FG
2424 def with_pg_state(self, pool, pgnum, check):
2425 pgstr = self.get_pgid(pool, pgnum)
2426 stats = self.get_single_pg_stats(pgstr)
2427 assert(check(stats['state']))
2428
9f95a23c 2429 @wait_for_pg_stats # type: ignore
7c673cae
FG
2430 def with_pg(self, pool, pgnum, check):
2431 pgstr = self.get_pgid(pool, pgnum)
2432 stats = self.get_single_pg_stats(pgstr)
2433 return check(stats)
2434
2435 def get_last_scrub_stamp(self, pool, pgnum):
2436 """
2437 Get the timestamp of the last scrub.
2438 """
2439 stats = self.get_single_pg_stats(self.get_pgid(pool, pgnum))
2440 return stats["last_scrub_stamp"]
2441
2442 def do_pg_scrub(self, pool, pgnum, stype):
2443 """
2444 Scrub pg and wait for scrubbing to finish
2445 """
2446 init = self.get_last_scrub_stamp(pool, pgnum)
2447 RESEND_TIMEOUT = 120 # Must be a multiple of SLEEP_TIME
2448 FATAL_TIMEOUT = RESEND_TIMEOUT * 3
2449 SLEEP_TIME = 10
2450 timer = 0
2451 while init == self.get_last_scrub_stamp(pool, pgnum):
2452 assert timer < FATAL_TIMEOUT, "fatal timeout trying to " + stype
2453 self.log("waiting for scrub type %s" % (stype,))
2454 if (timer % RESEND_TIMEOUT) == 0:
2455 self.raw_cluster_cmd('pg', stype, self.get_pgid(pool, pgnum))
2456 # The first time in this loop is the actual request
2457 if timer != 0 and stype == "repair":
2458 self.log("WARNING: Resubmitted a non-idempotent repair")
2459 time.sleep(SLEEP_TIME)
2460 timer += SLEEP_TIME
2461
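    # Illustrative scrub request; pool and pg number are hypothetical.  `stype`
    # is passed straight to `ceph pg <stype> <pgid>`, so 'scrub', 'deep-scrub'
    # and 'repair' are the expected values, and the method polls
    # last_scrub_stamp until it changes.
    #
    #   manager.do_pg_scrub('unique_pool_0', 0, 'deep-scrub')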
2462 def wait_snap_trimming_complete(self, pool):
2463 """
2464 Wait for snap trimming on pool to end
2465 """
2466 POLL_PERIOD = 10
2467 FATAL_TIMEOUT = 600
2468 start = time.time()
2469 poolnum = self.get_pool_num(pool)
2470 poolnumstr = "%s." % (poolnum,)
2471 while (True):
2472 now = time.time()
2473 if (now - start) > FATAL_TIMEOUT:
2474 assert (now - start) < FATAL_TIMEOUT, \
2475 'failed to complete snap trimming before timeout'
2476 all_stats = self.get_pg_stats()
2477 trimming = False
2478 for pg in all_stats:
2479 if (poolnumstr in pg['pgid']) and ('snaptrim' in pg['state']):
2480 self.log("pg {pg} in trimming, state: {state}".format(
2481 pg=pg['pgid'],
2482 state=pg['state']))
2483 trimming = True
2484 if not trimming:
2485 break
2486 self.log("{pool} still trimming, waiting".format(pool=pool))
2487 time.sleep(POLL_PERIOD)
2488
2489 def get_single_pg_stats(self, pgid):
2490 """
2491 Return pg for the pgid specified.
2492 """
2493 all_stats = self.get_pg_stats()
2494
2495 for pg in all_stats:
2496 if pg['pgid'] == pgid:
2497 return pg
2498
2499 return None
2500
2501 def get_object_pg_with_shard(self, pool, name, osdid):
2502 """
 2503         Return the pgid mapping the named object to the given osd, with a
 2503         shard suffix appended for erasure-coded pools.
 2503         """
2504 pool_dump = self.get_pool_dump(pool)
2505 object_map = self.get_object_map(pool, name)
9f95a23c 2506 if pool_dump["type"] == PoolType.ERASURE_CODED:
7c673cae
FG
2507 shard = object_map['acting'].index(osdid)
2508 return "{pgid}s{shard}".format(pgid=object_map['pgid'],
2509 shard=shard)
2510 else:
2511 return object_map['pgid']
2512
2513 def get_object_primary(self, pool, name):
2514 """
 2515         Return the acting primary osd id for the named object.
 2515         """
2516 object_map = self.get_object_map(pool, name)
2517 return object_map['acting_primary']
2518
2519 def get_object_map(self, pool, name):
2520 """
2521 osd map --format=json converted to a python object
2522 :returns: the python object
2523 """
2524 out = self.raw_cluster_cmd('--format=json', 'osd', 'map', pool, name)
2525 return json.loads('\n'.join(out.split('\n')[1:]))
2526
2527 def get_osd_dump_json(self):
2528 """
2529 osd dump --format=json converted to a python object
2530 :returns: the python object
2531 """
2532 out = self.raw_cluster_cmd('osd', 'dump', '--format=json')
2533 return json.loads('\n'.join(out.split('\n')[1:]))
2534
2535 def get_osd_dump(self):
2536 """
2537 Dump osds
2538 :returns: all osds
2539 """
2540 return self.get_osd_dump_json()['osds']
2541
11fdf7f2
TL
2542 def get_osd_metadata(self):
2543 """
2544 osd metadata --format=json converted to a python object
2545 :returns: the python object containing osd metadata information
2546 """
2547 out = self.raw_cluster_cmd('osd', 'metadata', '--format=json')
2548 return json.loads('\n'.join(out.split('\n')[1:]))
2549
c07f9fc5
FG
2550 def get_mgr_dump(self):
2551 out = self.raw_cluster_cmd('mgr', 'dump', '--format=json')
2552 return json.loads(out)
2553
7c673cae
FG
2554 def get_stuck_pgs(self, type_, threshold):
2555 """
2556 :returns: stuck pg information from the cluster
2557 """
2558 out = self.raw_cluster_cmd('pg', 'dump_stuck', type_, str(threshold),
2559 '--format=json')
11fdf7f2 2560 return json.loads(out).get('stuck_pg_stats',[])
7c673cae
FG
2561
2562 def get_num_unfound_objects(self):
2563 """
2564 Check cluster status to get the number of unfound objects
2565 """
2566 status = self.raw_cluster_status()
2567 self.log(status)
2568 return status['pgmap'].get('unfound_objects', 0)
2569
2570 def get_num_creating(self):
2571 """
2572 Find the number of pgs in creating mode.
2573 """
2574 pgs = self.get_pg_stats()
2575 num = 0
2576 for pg in pgs:
2577 if 'creating' in pg['state']:
2578 num += 1
2579 return num
2580
2581 def get_num_active_clean(self):
2582 """
2583 Find the number of active and clean pgs.
2584 """
2585 pgs = self.get_pg_stats()
9f95a23c
TL
2586 return self._get_num_active_clean(pgs)
2587
2588 def _get_num_active_clean(self, pgs):
7c673cae
FG
2589 num = 0
2590 for pg in pgs:
2591 if (pg['state'].count('active') and
2592 pg['state'].count('clean') and
2593 not pg['state'].count('stale')):
2594 num += 1
2595 return num
2596
2597 def get_num_active_recovered(self):
2598 """
2599 Find the number of active and recovered pgs.
2600 """
2601 pgs = self.get_pg_stats()
9f95a23c
TL
2602 return self._get_num_active_recovered(pgs)
2603
2604 def _get_num_active_recovered(self, pgs):
7c673cae
FG
2605 num = 0
2606 for pg in pgs:
2607 if (pg['state'].count('active') and
2608 not pg['state'].count('recover') and
3efd9988 2609 not pg['state'].count('backfilling') and
7c673cae
FG
2610 not pg['state'].count('stale')):
2611 num += 1
2612 return num
2613
2614 def get_is_making_recovery_progress(self):
2615 """
 2616         Return whether there is recovery progress discernible in the
2617 raw cluster status
2618 """
2619 status = self.raw_cluster_status()
2620 kps = status['pgmap'].get('recovering_keys_per_sec', 0)
2621 bps = status['pgmap'].get('recovering_bytes_per_sec', 0)
2622 ops = status['pgmap'].get('recovering_objects_per_sec', 0)
2623 return kps > 0 or bps > 0 or ops > 0
2624
2625 def get_num_active(self):
2626 """
2627 Find the number of active pgs.
2628 """
2629 pgs = self.get_pg_stats()
9f95a23c
TL
2630 return self._get_num_active(pgs)
2631
2632 def _get_num_active(self, pgs):
7c673cae
FG
2633 num = 0
2634 for pg in pgs:
2635 if pg['state'].count('active') and not pg['state'].count('stale'):
2636 num += 1
2637 return num
2638
2639 def get_num_down(self):
2640 """
2641 Find the number of pgs that are down.
2642 """
2643 pgs = self.get_pg_stats()
2644 num = 0
2645 for pg in pgs:
2646 if ((pg['state'].count('down') and not
2647 pg['state'].count('stale')) or
2648 (pg['state'].count('incomplete') and not
2649 pg['state'].count('stale'))):
2650 num += 1
2651 return num
2652
2653 def get_num_active_down(self):
2654 """
2655 Find the number of pgs that are either active or down.
2656 """
2657 pgs = self.get_pg_stats()
9f95a23c
TL
2658 return self._get_num_active_down(pgs)
2659
2660 def _get_num_active_down(self, pgs):
7c673cae
FG
2661 num = 0
2662 for pg in pgs:
2663 if ((pg['state'].count('active') and not
2664 pg['state'].count('stale')) or
2665 (pg['state'].count('down') and not
2666 pg['state'].count('stale')) or
2667 (pg['state'].count('incomplete') and not
2668 pg['state'].count('stale'))):
2669 num += 1
2670 return num
2671
9f95a23c
TL
2672 def get_num_peered(self):
2673 """
2674 Find the number of PGs that are peered
2675 """
2676 pgs = self.get_pg_stats()
2677 return self._get_num_peered(pgs)
2678
2679 def _get_num_peered(self, pgs):
2680 num = 0
2681 for pg in pgs:
2682 if pg['state'].count('peered') and not pg['state'].count('stale'):
2683 num += 1
2684 return num
2685
7c673cae
FG
2686 def is_clean(self):
2687 """
2688 True if all pgs are clean
2689 """
9f95a23c 2690 pgs = self.get_pg_stats()
2a845540
TL
2691 if self._get_num_active_clean(pgs) == len(pgs):
2692 return True
2693 else:
2694 self.dump_pgs_not_active_clean()
2695 return False
7c673cae
FG
2696
2697 def is_recovered(self):
2698 """
2699 True if all pgs have recovered
2700 """
9f95a23c
TL
2701 pgs = self.get_pg_stats()
2702 return self._get_num_active_recovered(pgs) == len(pgs)
7c673cae
FG
2703
2704 def is_active_or_down(self):
2705 """
2706 True if all pgs are active or down
2707 """
9f95a23c
TL
2708 pgs = self.get_pg_stats()
2709 return self._get_num_active_down(pgs) == len(pgs)
7c673cae 2710
f6b5b4d7
TL
2711 def dump_pgs_not_active_clean(self):
2712 """
2713 Dumps all pgs that are not active+clean
2714 """
2715 pgs = self.get_pg_stats()
2716 for pg in pgs:
2717 if pg['state'] != 'active+clean':
2718 self.log('PG %s is not active+clean' % pg['pgid'])
2719 self.log(pg)
2720
2721 def dump_pgs_not_active_down(self):
2722 """
2723 Dumps all pgs that are not active or down
2724 """
2725 pgs = self.get_pg_stats()
2726 for pg in pgs:
2727 if 'active' not in pg['state'] and 'down' not in pg['state']:
2728 self.log('PG %s is not active or down' % pg['pgid'])
2729 self.log(pg)
2730
2731 def dump_pgs_not_active(self):
2732 """
2733 Dumps all pgs that are not active
2734 """
2735 pgs = self.get_pg_stats()
2736 for pg in pgs:
2737 if 'active' not in pg['state']:
2738 self.log('PG %s is not active' % pg['pgid'])
2739 self.log(pg)
2740
2a845540
TL
2741 def dump_pgs_not_active_peered(self, pgs):
2742 for pg in pgs:
2743 if (not pg['state'].count('active')) and (not pg['state'].count('peered')):
2744 self.log('PG %s is not active or peered' % pg['pgid'])
2745 self.log(pg)
2746
11fdf7f2 2747 def wait_for_clean(self, timeout=1200):
7c673cae
FG
2748 """
2749 Returns true when all pgs are clean.
2750 """
2751 self.log("waiting for clean")
2752 start = time.time()
2753 num_active_clean = self.get_num_active_clean()
2754 while not self.is_clean():
2755 if timeout is not None:
2756 if self.get_is_making_recovery_progress():
2757 self.log("making progress, resetting timeout")
2758 start = time.time()
2759 else:
2760 self.log("no progress seen, keeping timeout for now")
2761 if time.time() - start >= timeout:
f6b5b4d7
TL
2762 self.log('dumping pgs not clean')
2763 self.dump_pgs_not_active_clean()
7c673cae 2764 assert time.time() - start < timeout, \
f6b5b4d7 2765 'wait_for_clean: failed before timeout expired'
7c673cae
FG
2766 cur_active_clean = self.get_num_active_clean()
2767 if cur_active_clean != num_active_clean:
2768 start = time.time()
2769 num_active_clean = cur_active_clean
2770 time.sleep(3)
2771 self.log("clean!")
2772
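    # Typical post-thrash wait, as a sketch: the deadline is reset whenever
    # recovery is making progress or the active+clean count changes, so the
    # timeout bounds the time allowed without progress rather than the total
    # wall-clock time.
    #
    #   manager.wait_for_clean(timeout=1200)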
2773 def are_all_osds_up(self):
2774 """
2775 Returns true if all osds are up.
2776 """
2777 x = self.get_osd_dump()
2778 return (len(x) == sum([(y['up'] > 0) for y in x]))
2779
c07f9fc5 2780 def wait_for_all_osds_up(self, timeout=None):
7c673cae
FG
2781 """
2782 When this exits, either the timeout has expired, or all
2783 osds are up.
2784 """
2785 self.log("waiting for all up")
2786 start = time.time()
2787 while not self.are_all_osds_up():
2788 if timeout is not None:
2789 assert time.time() - start < timeout, \
c07f9fc5 2790 'timeout expired in wait_for_all_osds_up'
7c673cae
FG
2791 time.sleep(3)
2792 self.log("all up!")
2793
c07f9fc5
FG
2794 def pool_exists(self, pool):
2795 if pool in self.list_pools():
2796 return True
2797 return False
2798
2799 def wait_for_pool(self, pool, timeout=300):
2800 """
2801 Wait for a pool to exist
2802 """
2803 self.log('waiting for pool %s to exist' % pool)
2804 start = time.time()
2805 while not self.pool_exists(pool):
2806 if timeout is not None:
2807 assert time.time() - start < timeout, \
2808 'timeout expired in wait_for_pool'
2809 time.sleep(3)
2810
2811 def wait_for_pools(self, pools):
2812 for pool in pools:
2813 self.wait_for_pool(pool)
2814
2815 def is_mgr_available(self):
2816 x = self.get_mgr_dump()
2817 return x.get('available', False)
2818
2819 def wait_for_mgr_available(self, timeout=None):
2820 self.log("waiting for mgr available")
2821 start = time.time()
2822 while not self.is_mgr_available():
2823 if timeout is not None:
2824 assert time.time() - start < timeout, \
2825 'timeout expired in wait_for_mgr_available'
2826 time.sleep(3)
2827 self.log("mgr available!")
2828
7c673cae
FG
2829 def wait_for_recovery(self, timeout=None):
2830 """
 2831         Check peering. When this exits, we have recovered.
2832 """
2833 self.log("waiting for recovery to complete")
2834 start = time.time()
2835 num_active_recovered = self.get_num_active_recovered()
2836 while not self.is_recovered():
2837 now = time.time()
2838 if timeout is not None:
2839 if self.get_is_making_recovery_progress():
2840 self.log("making progress, resetting timeout")
2841 start = time.time()
2842 else:
2843 self.log("no progress seen, keeping timeout for now")
2844 if now - start >= timeout:
9f95a23c
TL
2845 if self.is_recovered():
2846 break
f6b5b4d7
TL
2847 self.log('dumping pgs not recovered yet')
2848 self.dump_pgs_not_active_clean()
7c673cae 2849 assert now - start < timeout, \
f6b5b4d7 2850 'wait_for_recovery: failed before timeout expired'
7c673cae
FG
2851 cur_active_recovered = self.get_num_active_recovered()
2852 if cur_active_recovered != num_active_recovered:
2853 start = time.time()
2854 num_active_recovered = cur_active_recovered
2855 time.sleep(3)
2856 self.log("recovered!")
2857
2858 def wait_for_active(self, timeout=None):
2859 """
 2860         Check peering. When this exits, we are definitely active
2861 """
2862 self.log("waiting for peering to complete")
2863 start = time.time()
2864 num_active = self.get_num_active()
2865 while not self.is_active():
2866 if timeout is not None:
2867 if time.time() - start >= timeout:
f6b5b4d7
TL
2868 self.log('dumping pgs not active')
2869 self.dump_pgs_not_active()
7c673cae 2870 assert time.time() - start < timeout, \
f6b5b4d7 2871 'wait_for_active: failed before timeout expired'
7c673cae
FG
2872 cur_active = self.get_num_active()
2873 if cur_active != num_active:
2874 start = time.time()
2875 num_active = cur_active
2876 time.sleep(3)
2877 self.log("active!")
2878
2879 def wait_for_active_or_down(self, timeout=None):
2880 """
 2881         Check peering. When this exits, we are definitely either
2882 active or down
2883 """
2884 self.log("waiting for peering to complete or become blocked")
2885 start = time.time()
2886 num_active_down = self.get_num_active_down()
2887 while not self.is_active_or_down():
2888 if timeout is not None:
2889 if time.time() - start >= timeout:
f6b5b4d7
TL
2890 self.log('dumping pgs not active or down')
2891 self.dump_pgs_not_active_down()
7c673cae 2892 assert time.time() - start < timeout, \
f6b5b4d7 2893 'wait_for_active_or_down: failed before timeout expired'
7c673cae
FG
2894 cur_active_down = self.get_num_active_down()
2895 if cur_active_down != num_active_down:
2896 start = time.time()
2897 num_active_down = cur_active_down
2898 time.sleep(3)
2899 self.log("active or down!")
2900
2901 def osd_is_up(self, osd):
2902 """
2903 Wrapper for osd check
2904 """
2905 osds = self.get_osd_dump()
2906 return osds[osd]['up'] > 0
2907
2908 def wait_till_osd_is_up(self, osd, timeout=None):
2909 """
2910 Loop waiting for osd.
2911 """
2912 self.log('waiting for osd.%d to be up' % osd)
2913 start = time.time()
2914 while not self.osd_is_up(osd):
2915 if timeout is not None:
2916 assert time.time() - start < timeout, \
2917 'osd.%d failed to come up before timeout expired' % osd
2918 time.sleep(3)
2919 self.log('osd.%d is up' % osd)
2920
2921 def is_active(self):
2922 """
2923 Wrapper to check if all pgs are active
2924 """
2925 return self.get_num_active() == self.get_num_pgs()
2926
9f95a23c
TL
2927 def all_active_or_peered(self):
2928 """
2929 Wrapper to check if all PGs are active or peered
2930 """
2931 pgs = self.get_pg_stats()
2a845540
TL
2932 if self._get_num_active(pgs) + self._get_num_peered(pgs) == len(pgs):
2933 return True
2934 else:
2935 self.dump_pgs_not_active_peered(pgs)
2936 return False
9f95a23c 2937
7c673cae
FG
2938 def wait_till_active(self, timeout=None):
2939 """
2940 Wait until all pgs are active.
2941 """
2942 self.log("waiting till active")
2943 start = time.time()
2944 while not self.is_active():
2945 if timeout is not None:
2946 if time.time() - start >= timeout:
f6b5b4d7
TL
2947 self.log('dumping pgs not active')
2948 self.dump_pgs_not_active()
7c673cae 2949 assert time.time() - start < timeout, \
f6b5b4d7 2950 'wait_till_active: failed before timeout expired'
7c673cae
FG
2951 time.sleep(3)
2952 self.log("active!")
2953
3efd9988
FG
2954 def wait_till_pg_convergence(self, timeout=None):
2955 start = time.time()
2956 old_stats = None
2957 active_osds = [osd['osd'] for osd in self.get_osd_dump()
2958 if osd['in'] and osd['up']]
2959 while True:
2960 # strictly speaking, no need to wait for mon. but due to the
2961 # "ms inject socket failures" setting, the osdmap could be delayed,
2962 # so mgr is likely to ignore the pg-stat messages with pgs serving
2963 # newly created pools which is not yet known by mgr. so, to make sure
2964 # the mgr is updated with the latest pg-stats, waiting for mon/mgr is
2965 # necessary.
2966 self.flush_pg_stats(active_osds)
2967 new_stats = dict((stat['pgid'], stat['state'])
2968 for stat in self.get_pg_stats())
2969 if old_stats == new_stats:
2970 return old_stats
2971 if timeout is not None:
2972 assert time.time() - start < timeout, \
2973 'failed to reach convergence before %d secs' % timeout
2974 old_stats = new_stats
2975 # longer than mgr_stats_period
2976 time.sleep(5 + 1)
2977
7c673cae
FG
2978 def mark_out_osd(self, osd):
2979 """
2980 Wrapper to mark osd out.
2981 """
2982 self.raw_cluster_cmd('osd', 'out', str(osd))
2983
2984 def kill_osd(self, osd):
2985 """
2986 Kill osds by either power cycling (if indicated by the config)
2987 or by stopping.
2988 """
2989 if self.config.get('powercycle'):
2990 remote = self.find_remote('osd', osd)
2991 self.log('kill_osd on osd.{o} '
2992 'doing powercycle of {s}'.format(o=osd, s=remote.name))
2993 self._assert_ipmi(remote)
2994 remote.console.power_off()
2995 elif self.config.get('bdev_inject_crash') and self.config.get('bdev_inject_crash_probability'):
2996 if random.uniform(0, 1) < self.config.get('bdev_inject_crash_probability', .5):
11fdf7f2
TL
2997 self.inject_args(
2998 'osd', osd,
2999 'bdev-inject-crash', self.config.get('bdev_inject_crash'))
7c673cae
FG
3000 try:
3001 self.ctx.daemons.get_daemon('osd', osd, self.cluster).wait()
3002 except:
3003 pass
3004 else:
3005 raise RuntimeError('osd.%s did not fail' % osd)
3006 else:
3007 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
3008 else:
3009 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
3010
3011 @staticmethod
3012 def _assert_ipmi(remote):
3013 assert remote.console.has_ipmi_credentials, (
3014 "powercycling requested but RemoteConsole is not "
3015 "initialized. Check ipmi config.")
3016
3017 def blackhole_kill_osd(self, osd):
3018 """
3019 Stop osd if nothing else works.
3020 """
11fdf7f2
TL
3021 self.inject_args('osd', osd,
3022 'objectstore-blackhole', True)
7c673cae
FG
3023 time.sleep(2)
3024 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
3025
3efd9988 3026 def revive_osd(self, osd, timeout=360, skip_admin_check=False):
7c673cae
FG
3027 """
3028 Revive osds by either power cycling (if indicated by the config)
3029 or by restarting.
3030 """
3031 if self.config.get('powercycle'):
3032 remote = self.find_remote('osd', osd)
 3033             self.log('revive_osd on osd.{o} doing powercycle of {s}'.
3034 format(o=osd, s=remote.name))
3035 self._assert_ipmi(remote)
3036 remote.console.power_on()
3037 if not remote.console.check_status(300):
3038 raise Exception('Failed to revive osd.{o} via ipmi'.
3039 format(o=osd))
3040 teuthology.reconnect(self.ctx, 60, [remote])
3041 mount_osd_data(self.ctx, remote, self.cluster, str(osd))
3042 self.make_admin_daemon_dir(remote)
3043 self.ctx.daemons.get_daemon('osd', osd, self.cluster).reset()
3044 self.ctx.daemons.get_daemon('osd', osd, self.cluster).restart()
3045
3046 if not skip_admin_check:
3047 # wait for dump_ops_in_flight; this command doesn't appear
3048 # until after the signal handler is installed and it is safe
3049 # to stop the osd again without making valgrind leak checks
3050 # unhappy. see #5924.
3051 self.wait_run_admin_socket('osd', osd,
3052 args=['dump_ops_in_flight'],
3053 timeout=timeout, stdout=DEVNULL)
3054
3055 def mark_down_osd(self, osd):
3056 """
3057 Cluster command wrapper
3058 """
3059 self.raw_cluster_cmd('osd', 'down', str(osd))
3060
3061 def mark_in_osd(self, osd):
3062 """
3063 Cluster command wrapper
3064 """
3065 self.raw_cluster_cmd('osd', 'in', str(osd))
3066
3067 def signal_osd(self, osd, sig, silent=False):
3068 """
3069 Wrapper to local get_daemon call which sends the given
3070 signal to the given osd.
3071 """
3072 self.ctx.daemons.get_daemon('osd', osd,
3073 self.cluster).signal(sig, silent=silent)
3074
3075 ## monitors
3076 def signal_mon(self, mon, sig, silent=False):
3077 """
11fdf7f2 3078 Wrapper to local get_daemon call
7c673cae
FG
3079 """
3080 self.ctx.daemons.get_daemon('mon', mon,
3081 self.cluster).signal(sig, silent=silent)
3082
3083 def kill_mon(self, mon):
3084 """
3085 Kill the monitor by either power cycling (if the config says so),
3086 or by doing a stop.
3087 """
3088 if self.config.get('powercycle'):
3089 remote = self.find_remote('mon', mon)
3090 self.log('kill_mon on mon.{m} doing powercycle of {s}'.
3091 format(m=mon, s=remote.name))
3092 self._assert_ipmi(remote)
3093 remote.console.power_off()
3094 else:
3095 self.ctx.daemons.get_daemon('mon', mon, self.cluster).stop()
3096
3097 def revive_mon(self, mon):
3098 """
3099 Restart by either power cycling (if the config says so),
3100 or by doing a normal restart.
3101 """
3102 if self.config.get('powercycle'):
3103 remote = self.find_remote('mon', mon)
3104 self.log('revive_mon on mon.{m} doing powercycle of {s}'.
3105 format(m=mon, s=remote.name))
3106 self._assert_ipmi(remote)
3107 remote.console.power_on()
3108 self.make_admin_daemon_dir(remote)
3109 self.ctx.daemons.get_daemon('mon', mon, self.cluster).restart()
3110
31f18b77
FG
3111 def revive_mgr(self, mgr):
3112 """
3113 Restart by either power cycling (if the config says so),
3114 or by doing a normal restart.
3115 """
3116 if self.config.get('powercycle'):
3117 remote = self.find_remote('mgr', mgr)
3118 self.log('revive_mgr on mgr.{m} doing powercycle of {s}'.
3119 format(m=mgr, s=remote.name))
3120 self._assert_ipmi(remote)
3121 remote.console.power_on()
3122 self.make_admin_daemon_dir(remote)
3123 self.ctx.daemons.get_daemon('mgr', mgr, self.cluster).restart()
3124
7c673cae
FG
3125 def get_mon_status(self, mon):
3126 """
3127 Extract all the monitor status information from the cluster
3128 """
9f95a23c 3129 out = self.raw_cluster_cmd('tell', 'mon.%s' % mon, 'mon_status')
7c673cae
FG
3130 return json.loads(out)
3131
3132 def get_mon_quorum(self):
3133 """
3134 Extract monitor quorum information from the cluster
3135 """
3136 out = self.raw_cluster_cmd('quorum_status')
3137 j = json.loads(out)
7c673cae
FG
3138 return j['quorum']
3139
3140 def wait_for_mon_quorum_size(self, size, timeout=300):
3141 """
3142 Loop until quorum size is reached.
3143 """
3144 self.log('waiting for quorum size %d' % size)
522d829b
TL
3145 sleep = 3
3146 with safe_while(sleep=sleep,
3147 tries=timeout // sleep,
3148 action=f'wait for quorum size {size}') as proceed:
3149 while proceed():
3150 try:
3151 if len(self.get_mon_quorum()) == size:
3152 break
3153 except CommandFailedError as e:
 3154                     # could fail instead of blocking if the rotating key of the
3155 # connected monitor is not updated yet after they form the
3156 # quorum
3157 if e.exitstatus == errno.EACCES:
3158 pass
3159 else:
3160 raise
7c673cae
FG
3161 self.log("quorum is size %d" % size)
3162
3163 def get_mon_health(self, debug=False):
3164 """
3165 Extract all the monitor health information.
3166 """
3167 out = self.raw_cluster_cmd('health', '--format=json')
3168 if debug:
3169 self.log('health:\n{h}'.format(h=out))
3170 return json.loads(out)
3171
9f95a23c
TL
3172 def wait_until_healthy(self, timeout=None):
3173 self.log("wait_until_healthy")
3174 start = time.time()
3175 while self.get_mon_health()['status'] != 'HEALTH_OK':
3176 if timeout is not None:
3177 assert time.time() - start < timeout, \
3178 'timeout expired in wait_until_healthy'
3179 time.sleep(3)
3180 self.log("wait_until_healthy done")
3181
7c673cae
FG
3182 def get_filepath(self):
3183 """
3184 Return path to osd data with {id} needing to be replaced
3185 """
3186 return '/var/lib/ceph/osd/' + self.cluster + '-{id}'
3187
3188 def make_admin_daemon_dir(self, remote):
3189 """
3190 Create /var/run/ceph directory on remote site.
3191
3192 :param ctx: Context
3193 :param remote: Remote site
3194 """
3195 remote.run(args=['sudo',
3196 'install', '-d', '-m0777', '--', '/var/run/ceph', ], )
3197
9f95a23c
TL
3198 def get_service_task_status(self, service, status_key):
3199 """
3200 Return daemon task status for a given ceph service.
3201
3202 :param service: ceph service (mds, osd, etc...)
3203 :param status_key: matching task status key
3204 """
3205 task_status = {}
3206 status = self.raw_cluster_status()
3207 try:
3208 for k,v in status['servicemap']['services'][service]['daemons'].items():
3209 ts = dict(v).get('task_status', None)
3210 if ts:
3211 task_status[k] = ts[status_key]
3212 except KeyError: # catches missing service and status key
3213 return {}
3214 self.log(task_status)
3215 return task_status
7c673cae
FG
3216
3217def utility_task(name):
3218 """
3219 Generate ceph_manager subtask corresponding to ceph_manager
3220 method name
3221 """
3222 def task(ctx, config):
3223 if config is None:
3224 config = {}
3225 args = config.get('args', [])
3226 kwargs = config.get('kwargs', {})
3227 cluster = config.get('cluster', 'ceph')
3228 fn = getattr(ctx.managers[cluster], name)
3229 fn(*args, **kwargs)
3230 return task
3231
3232revive_osd = utility_task("revive_osd")
3233revive_mon = utility_task("revive_mon")
3234kill_osd = utility_task("kill_osd")
3235kill_mon = utility_task("kill_mon")
3236create_pool = utility_task("create_pool")
3237remove_pool = utility_task("remove_pool")
3238wait_for_clean = utility_task("wait_for_clean")
c07f9fc5 3239flush_all_pg_stats = utility_task("flush_all_pg_stats")
7c673cae
FG
3240set_pool_property = utility_task("set_pool_property")
3241do_pg_scrub = utility_task("do_pg_scrub")
c07f9fc5
FG
3242wait_for_pool = utility_task("wait_for_pool")
3243wait_for_pools = utility_task("wait_for_pools")