1 """
2 ceph manager -- Thrasher and CephManager objects
3 """
4 from functools import wraps
5 import contextlib
6 import errno
7 import random
8 import signal
9 import time
10 import gevent
11 import base64
12 import json
13 import logging
14 import threading
15 import traceback
16 import os
17 import shlex
18
19 from io import BytesIO, StringIO
20 from subprocess import DEVNULL
21 from teuthology import misc as teuthology
22 from tasks.scrub import Scrubber
23 from tasks.util.rados import cmd_erasure_code_profile
24 from tasks.util import get_remote
25 from teuthology.contextutil import safe_while
26 from teuthology.orchestra.remote import Remote
27 from teuthology.orchestra import run
28 from teuthology.parallel import parallel
29 from teuthology.exceptions import CommandFailedError
30 from tasks.thrasher import Thrasher
31
32
33 DEFAULT_CONF_PATH = '/etc/ceph/ceph.conf'
34
35 log = logging.getLogger(__name__)
36
37 # this is for cephadm clusters
38 def shell(ctx, cluster_name, remote, args, name=None, **kwargs):
39 extra_args = []
40 if name:
41 extra_args = ['-n', name]
42 return remote.run(
43 args=[
44 'sudo',
45 ctx.cephadm,
46 '--image', ctx.ceph[cluster_name].image,
47 'shell',
48 ] + extra_args + [
49 '--fsid', ctx.ceph[cluster_name].fsid,
50 '--',
51 ] + args,
52 **kwargs
53 )
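# Illustrative only (image and fsid values are placeholders): a call like
#   shell(ctx, 'ceph', remote, ['ceph', 'osd', 'dump'], name='osd.0')
# runs roughly:
#   sudo cephadm --image <image> shell -n osd.0 --fsid <fsid> -- ceph osd dump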
54
55 # this is for rook clusters
56 def toolbox(ctx, cluster_name, args, **kwargs):
57 return ctx.rook[cluster_name].remote.run(
58 args=[
59 'kubectl',
60 '-n', 'rook-ceph',
61 'exec',
62 ctx.rook[cluster_name].toolbox,
63 '--',
64 ] + args,
65 **kwargs
66 )
67
68
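# write_conf serializes the in-memory conf and streams it over stdin to
# 'sudo tee <conf_path>' on every remote in parallel, so all nodes end up
# with an identical copy of the cluster configuration file.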
69 def write_conf(ctx, conf_path=DEFAULT_CONF_PATH, cluster='ceph'):
70 conf_fp = BytesIO()
71 ctx.ceph[cluster].conf.write(conf_fp)
72 conf_fp.seek(0)
73 writes = ctx.cluster.run(
74 args=[
75 'sudo', 'mkdir', '-p', '/etc/ceph', run.Raw('&&'),
76 'sudo', 'chmod', '0755', '/etc/ceph', run.Raw('&&'),
77 'sudo', 'tee', conf_path, run.Raw('&&'),
78 'sudo', 'chmod', '0644', conf_path,
79 run.Raw('>'), '/dev/null',
80
81 ],
82 stdin=run.PIPE,
83 wait=False)
84 teuthology.feed_many_stdins_and_close(conf_fp, writes)
85 run.wait(writes)
86
87 def get_valgrind_args(testdir, name, preamble, v, exit_on_first_error=True, cd=True):
88 """
89 Build a command line for running valgrind.
90
91 testdir - test results directory
92 name - name of daemon (for naming the log file)
93 preamble - stuff we should run before valgrind
94 v - valgrind arguments
95 """
96 if v is None:
97 return preamble
98 if not isinstance(v, list):
99 v = [v]
100
101 # https://tracker.ceph.com/issues/44362
102 preamble.extend([
103 'env', 'OPENSSL_ia32cap=~0x1000000000000000',
104 ])
105
106 val_path = '/var/log/ceph/valgrind'
107 if '--tool=memcheck' in v or '--tool=helgrind' in v:
108 extra_args = [
109 'valgrind',
110 '--trace-children=no',
111 '--child-silent-after-fork=yes',
112 '--soname-synonyms=somalloc=*tcmalloc*',
113 '--num-callers=50',
114 '--suppressions={tdir}/valgrind.supp'.format(tdir=testdir),
115 '--xml=yes',
116 '--xml-file={vdir}/{n}.log'.format(vdir=val_path, n=name),
117 '--time-stamp=yes',
118 '--vgdb=yes',
119 ]
120 else:
121 extra_args = [
122 'valgrind',
123 '--trace-children=no',
124 '--child-silent-after-fork=yes',
125 '--soname-synonyms=somalloc=*tcmalloc*',
126 '--suppressions={tdir}/valgrind.supp'.format(tdir=testdir),
127 '--log-file={vdir}/{n}.log'.format(vdir=val_path, n=name),
128 '--time-stamp=yes',
129 '--vgdb=yes',
130 ]
131 if exit_on_first_error:
132 extra_args.extend([
133 # at least Valgrind 3.14 is required
134 '--exit-on-first-error=yes',
135 '--error-exitcode=42',
136 ])
137 args = []
138 if cd:
139 args += ['cd', testdir, run.Raw('&&')]
140 args += preamble + extra_args + v
141 log.debug('running %s under valgrind with args %s', name, args)
142 return args
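# Illustrative only (paths and daemon name are hypothetical): a call like
#   get_valgrind_args('/home/ubuntu/cephtest', 'osd.0',
#                     ['sudo', 'adjust-ulimits'], '--tool=memcheck')
# roughly yields
#   ['cd', '/home/ubuntu/cephtest', '&&', 'sudo', 'adjust-ulimits',
#    'env', 'OPENSSL_ia32cap=~0x1000000000000000', 'valgrind', ...,
#    '--xml-file=/var/log/ceph/valgrind/osd.0.log', ..., '--tool=memcheck']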
143
144
145 def mount_osd_data(ctx, remote, cluster, osd):
146 """
147 Mount a remote OSD
148
149 :param ctx: Context
150 :param remote: Remote site
151 :param cluster: name of ceph cluster
152 :param osd: Osd name
153 """
154 log.debug('Mounting data for osd.{o} on {r}'.format(o=osd, r=remote))
155 role = "{0}.osd.{1}".format(cluster, osd)
156 alt_role = role if cluster != 'ceph' else "osd.{0}".format(osd)
157 if remote in ctx.disk_config.remote_to_roles_to_dev:
158 if alt_role in ctx.disk_config.remote_to_roles_to_dev[remote]:
159 role = alt_role
160 if role not in ctx.disk_config.remote_to_roles_to_dev[remote]:
161 return
162 dev = ctx.disk_config.remote_to_roles_to_dev[remote][role]
163 mount_options = ctx.disk_config.\
164 remote_to_roles_to_dev_mount_options[remote][role]
165 fstype = ctx.disk_config.remote_to_roles_to_dev_fstype[remote][role]
166 mnt = os.path.join('/var/lib/ceph/osd', '{0}-{1}'.format(cluster, osd))
167
168 log.info('Mounting osd.{o}: dev: {d}, cluster: {c}, '
169 'mountpoint: {p}, type: {t}, options: {v}'.format(
170 o=osd, d=dev, p=mnt, t=fstype, v=mount_options,
171 c=cluster))
172
173 remote.run(
174 args=[
175 'sudo',
176 'mount',
177 '-t', fstype,
178 '-o', ','.join(mount_options),
179 dev,
180 mnt,
181 ]
182 )
183
184
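# Decorator for the thrasher greenlet entry points below: it logs the full
# traceback through self.log() before re-raising, so exceptions raised inside
# gevent greenlets still show up in the test log.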
185 def log_exc(func):
186 @wraps(func)
187 def wrapper(self):
188 try:
189 return func(self)
190 except:
191 self.log(traceback.format_exc())
192 raise
193 return wrapper
194
195
196 class PoolType:
197 REPLICATED = 1
198 ERASURE_CODED = 3
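# These values mirror the pool 'type' field reported by 'ceph osd dump'
# (1 = replicated, 3 = erasure coded); see the per-pool loop in
# test_pool_min_size below.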
199
200
201 class OSDThrasher(Thrasher):
202 """
203 Object used to thrash Ceph
204 """
205 def __init__(self, manager, config, name, logger):
206 super(OSDThrasher, self).__init__()
207
208 self.ceph_manager = manager
209 self.cluster = manager.cluster
210 self.ceph_manager.wait_for_clean()
211 osd_status = self.ceph_manager.get_osd_status()
212 self.in_osds = osd_status['in']
213 self.live_osds = osd_status['live']
214 self.out_osds = osd_status['out']
215 self.dead_osds = osd_status['dead']
216 self.stopping = False
217 self.logger = logger
218 self.config = config
219 self.name = name
220 self.revive_timeout = self.config.get("revive_timeout", 360)
221 self.pools_to_fix_pgp_num = set()
222 if self.config.get('powercycle'):
223 self.revive_timeout += 120
224 self.clean_wait = self.config.get('clean_wait', 0)
225 self.minin = self.config.get("min_in", 4)
226 self.chance_move_pg = self.config.get('chance_move_pg', 1.0)
227 self.sighup_delay = self.config.get('sighup_delay')
228 self.optrack_toggle_delay = self.config.get('optrack_toggle_delay')
229 self.dump_ops_enable = self.config.get('dump_ops_enable')
230 self.noscrub_toggle_delay = self.config.get('noscrub_toggle_delay')
231 self.chance_thrash_cluster_full = self.config.get('chance_thrash_cluster_full', .05)
232 self.chance_thrash_pg_upmap = self.config.get('chance_thrash_pg_upmap', 1.0)
233 self.chance_thrash_pg_upmap_items = self.config.get('chance_thrash_pg_upmap_items', 1.0)
234 self.random_eio = self.config.get('random_eio')
235 self.chance_force_recovery = self.config.get('chance_force_recovery', 0.3)
236
237 num_osds = self.in_osds + self.out_osds
238 self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * len(num_osds)
239 self.min_pgs = self.config.get("min_pgs_per_pool_osd", 1) * len(num_osds)
240 if self.config is None:
241 self.config = dict()
242 # prevent monitor from auto-marking things out while thrasher runs
243 # try both old and new tell syntax, in case we are testing old code
244 self.saved_options = []
245 # assuming that the default settings do not vary from one daemon to
246 # another
247 first_mon = teuthology.get_first_mon(manager.ctx, self.config).split('.')
248 opts = [('mon', 'mon_osd_down_out_interval', 0)]
249 # disable automatic out-marking so that only the thrasher controls OSD in/out transitions
250 for service, opt, new_value in opts:
251 old_value = manager.get_config(first_mon[0],
252 first_mon[1],
253 opt)
254 self.saved_options.append((service, opt, old_value))
255 manager.inject_args(service, '*', opt, new_value)
256 # initialize ceph_objectstore_tool property - must be done before
257 # do_thrash is spawned - http://tracker.ceph.com/issues/18799
258 if (self.config.get('powercycle') or
259 not self.cmd_exists_on_osds("ceph-objectstore-tool") or
260 self.config.get('disable_objectstore_tool_tests', False)):
261 self.ceph_objectstore_tool = False
262 if self.config.get('powercycle'):
263 self.log("Unable to test ceph-objectstore-tool, "
264 "powercycle testing")
265 else:
266 self.log("Unable to test ceph-objectstore-tool, "
267 "not available on all OSD nodes")
268 else:
269 self.ceph_objectstore_tool = \
270 self.config.get('ceph_objectstore_tool', True)
271 # spawn do_thrash
272 self.thread = gevent.spawn(self.do_thrash)
273 if self.sighup_delay:
274 self.sighup_thread = gevent.spawn(self.do_sighup)
275 if self.optrack_toggle_delay:
276 self.optrack_toggle_thread = gevent.spawn(self.do_optrack_toggle)
277 if self.dump_ops_enable == "true":
278 self.dump_ops_thread = gevent.spawn(self.do_dump_ops)
279 if self.noscrub_toggle_delay:
280 self.noscrub_toggle_thread = gevent.spawn(self.do_noscrub_toggle)
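# Besides do_thrash, up to four optional helper greenlets run in the
# background (SIGHUP injection, op-tracker toggling, op dumping and
# noscrub toggling); do_join() below stops and reaps all of them.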
281
282 def log(self, msg, *args, **kwargs):
283 self.logger.info(msg, *args, **kwargs)
284
285 def cmd_exists_on_osds(self, cmd):
286 if self.ceph_manager.cephadm or self.ceph_manager.rook:
287 return True
288 allremotes = self.ceph_manager.ctx.cluster.only(\
289 teuthology.is_type('osd', self.cluster)).remotes.keys()
290 allremotes = list(set(allremotes))
291 for remote in allremotes:
292 proc = remote.run(args=['type', cmd], wait=True,
293 check_status=False, stdout=BytesIO(),
294 stderr=BytesIO())
295 if proc.exitstatus != 0:
296 return False
297 return True
298
299 def run_ceph_objectstore_tool(self, remote, osd, cmd):
300 if self.ceph_manager.cephadm:
301 return shell(
302 self.ceph_manager.ctx, self.ceph_manager.cluster, remote,
303 args=['ceph-objectstore-tool', '--err-to-stderr'] + cmd,
304 name=osd,
305 wait=True, check_status=False,
306 stdout=StringIO(),
307 stderr=StringIO())
308 elif self.ceph_manager.rook:
309 assert False, 'not implemented'
310 else:
311 return remote.run(
312 args=['sudo', 'adjust-ulimits', 'ceph-objectstore-tool', '--err-to-stderr'] + cmd,
313 wait=True, check_status=False,
314 stdout=StringIO(),
315 stderr=StringIO())
316
317 def run_ceph_bluestore_tool(self, remote, osd, cmd):
318 if self.ceph_manager.cephadm:
319 return shell(
320 self.ceph_manager.ctx, self.ceph_manager.cluster, remote,
321 args=['ceph-bluestore-tool', '--err-to-stderr'] + cmd,
322 name=osd,
323 wait=True, check_status=False,
324 stdout=StringIO(),
325 stderr=StringIO())
326 elif self.ceph_manager.rook:
327 assert False, 'not implemented'
328 else:
329 return remote.run(
330 args=['sudo', 'ceph-bluestore-tool', '--err-to-stderr'] + cmd,
331 wait=True, check_status=False,
332 stdout=StringIO(),
333 stderr=StringIO())
334
335 def kill_osd(self, osd=None, mark_down=False, mark_out=False):
336 """
337 :param osd: Osd to be killed.
338 :param mark_down: Mark down if true.
339 :param mark_out: Mark out if true.
340 """
341 if osd is None:
342 osd = random.choice(self.live_osds)
343 self.log("Killing osd %s, live_osds are %s" % (str(osd),
344 str(self.live_osds)))
345 self.live_osds.remove(osd)
346 self.dead_osds.append(osd)
347 self.ceph_manager.kill_osd(osd)
348 if mark_down:
349 self.ceph_manager.mark_down_osd(osd)
350 if mark_out and osd in self.in_osds:
351 self.out_osd(osd)
352 if self.ceph_objectstore_tool:
353 self.log("Testing ceph-objectstore-tool on down osd.%s" % osd)
354 remote = self.ceph_manager.find_remote('osd', osd)
355 FSPATH = self.ceph_manager.get_filepath()
356 JPATH = os.path.join(FSPATH, "journal")
357 exp_osd = imp_osd = osd
358 self.log('remote for osd %s is %s' % (osd, remote))
359 exp_remote = imp_remote = remote
360 # If another, earlier-killed osd is available we'll export a pg from there
361 if (len(self.dead_osds) > 1 and
362 random.random() < self.chance_move_pg):
363 exp_osd = random.choice(self.dead_osds[:-1])
364 exp_remote = self.ceph_manager.find_remote('osd', exp_osd)
365 self.log('remote for exp osd %s is %s' % (exp_osd, exp_remote))
366 prefix = [
367 '--no-mon-config',
368 '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
369 ]
370
371 if self.ceph_manager.rook:
372 assert False, 'not implemented'
373
374 if not self.ceph_manager.cephadm:
375 # ceph-objectstore-tool might be temporarily absent during an
376 # upgrade - see http://tracker.ceph.com/issues/18014
377 with safe_while(sleep=15, tries=40, action="type ceph-objectstore-tool") as proceed:
378 while proceed():
379 proc = exp_remote.run(args=['type', 'ceph-objectstore-tool'],
380 wait=True, check_status=False, stdout=BytesIO(),
381 stderr=BytesIO())
382 if proc.exitstatus == 0:
383 break
384 log.debug("ceph-objectstore-tool binary not present, trying again")
385
386 # ceph-objectstore-tool might bogusly fail with "OSD has the store locked"
387 # see http://tracker.ceph.com/issues/19556
388 with safe_while(sleep=15, tries=40, action="ceph-objectstore-tool --op list-pgs") as proceed:
389 while proceed():
390 proc = self.run_ceph_objectstore_tool(
391 exp_remote, 'osd.%s' % exp_osd,
392 prefix + [
393 '--data-path', FSPATH.format(id=exp_osd),
394 '--journal-path', JPATH.format(id=exp_osd),
395 '--op', 'list-pgs',
396 ])
397 if proc.exitstatus == 0:
398 break
399 elif (proc.exitstatus == 1 and
400 proc.stderr.getvalue() == "OSD has the store locked"):
401 continue
402 else:
403 raise Exception("ceph-objectstore-tool: "
404 "exp list-pgs failure with status {ret}".
405 format(ret=proc.exitstatus))
406
407 pgs = proc.stdout.getvalue().split('\n')[:-1]
408 if len(pgs) == 0:
409 self.log("No PGs found for osd.{osd}".format(osd=exp_osd))
410 return
411 pg = random.choice(pgs)
412 #exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
413 #exp_path = os.path.join(exp_path, '{0}.data'.format(self.cluster))
414 exp_path = os.path.join('/var/log/ceph', # available inside 'shell' container
415 "exp.{pg}.{id}".format(
416 pg=pg,
417 id=exp_osd))
418 if self.ceph_manager.cephadm:
419 exp_host_path = os.path.join(
420 '/var/log/ceph',
421 self.ceph_manager.ctx.ceph[self.ceph_manager.cluster].fsid,
422 "exp.{pg}.{id}".format(
423 pg=pg,
424 id=exp_osd))
425 else:
426 exp_host_path = exp_path
427
428 # export
429 # Can't use new export-remove op since this is part of upgrade testing
430 proc = self.run_ceph_objectstore_tool(
431 exp_remote, 'osd.%s' % exp_osd,
432 prefix + [
433 '--data-path', FSPATH.format(id=exp_osd),
434 '--journal-path', JPATH.format(id=exp_osd),
435 '--op', 'export',
436 '--pgid', pg,
437 '--file', exp_path,
438 ])
439 if proc.exitstatus:
440 raise Exception("ceph-objectstore-tool: "
441 "export failure with status {ret}".
442 format(ret=proc.exitstatus))
443 # remove
444 proc = self.run_ceph_objectstore_tool(
445 exp_remote, 'osd.%s' % exp_osd,
446 prefix + [
447 '--data-path', FSPATH.format(id=exp_osd),
448 '--journal-path', JPATH.format(id=exp_osd),
449 '--force',
450 '--op', 'remove',
451 '--pgid', pg,
452 ])
453 if proc.exitstatus:
454 raise Exception("ceph-objectstore-tool: "
455 "remove failure with status {ret}".
456 format(ret=proc.exitstatus))
457 # If there are at least 2 dead osds we might move the pg
458 if exp_osd != imp_osd:
459 # If pg isn't already on this osd, then we will move it there
460 proc = self.run_ceph_objectstore_tool(
461 imp_remote,
462 'osd.%s' % imp_osd,
463 prefix + [
464 '--data-path', FSPATH.format(id=imp_osd),
465 '--journal-path', JPATH.format(id=imp_osd),
466 '--op', 'list-pgs',
467 ])
468 if proc.exitstatus:
469 raise Exception("ceph-objectstore-tool: "
470 "imp list-pgs failure with status {ret}".
471 format(ret=proc.exitstatus))
472 pgs = proc.stdout.getvalue().split('\n')[:-1]
473 if pg not in pgs:
474 self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
475 format(pg=pg, fosd=exp_osd, tosd=imp_osd))
476 if imp_remote != exp_remote:
477 # Copy export file to the other machine
478 self.log("Transfer export file from {srem} to {trem}".
479 format(srem=exp_remote, trem=imp_remote))
480 # just in case an upgrade makes /var/log/ceph unreadable by non-root
481 exp_remote.run(args=['sudo', 'chmod', '777',
482 '/var/log/ceph'])
483 imp_remote.run(args=['sudo', 'chmod', '777',
484 '/var/log/ceph'])
485 tmpexport = Remote.get_file(exp_remote, exp_host_path,
486 sudo=True)
487 if exp_host_path != exp_path:
488 # push to /var/log/ceph, then rename (we can't
489 # chmod 777 the /var/log/ceph/$fsid mountpoint)
490 Remote.put_file(imp_remote, tmpexport, exp_path)
491 imp_remote.run(args=[
492 'sudo', 'mv', exp_path, exp_host_path])
493 else:
494 Remote.put_file(imp_remote, tmpexport, exp_host_path)
495 os.remove(tmpexport)
496 else:
497 # Can't move the pg after all
498 imp_osd = exp_osd
499 imp_remote = exp_remote
500 # import
501 proc = self.run_ceph_objectstore_tool(
502 imp_remote, 'osd.%s' % imp_osd,
503 [
504 '--data-path', FSPATH.format(id=imp_osd),
505 '--journal-path', JPATH.format(id=imp_osd),
506 '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
507 '--op', 'import',
508 '--file', exp_path,
509 ])
510 if proc.exitstatus == 1:
511 bogosity = "The OSD you are using is older than the exported PG"
512 if bogosity in proc.stderr.getvalue():
513 self.log("OSD older than exported PG"
514 "...ignored")
515 elif proc.exitstatus == 10:
516 self.log("Pool went away before processing an import"
517 "...ignored")
518 elif proc.exitstatus == 11:
519 self.log("Attempt to import an incompatible export"
520 "...ignored")
521 elif proc.exitstatus == 12:
522 # this should be safe to ignore because we only ever move 1
523 # copy of the pg at a time, and merge is only initiated when
524 # all replicas are peered and happy. /me crosses fingers
525 self.log("PG merged on target"
526 "...ignored")
527 elif proc.exitstatus:
528 raise Exception("ceph-objectstore-tool: "
529 "import failure with status {ret}".
530 format(ret=proc.exitstatus))
531 cmd = "sudo rm -f {file}".format(file=exp_host_path)
532 exp_remote.run(args=cmd)
533 if imp_remote != exp_remote:
534 imp_remote.run(args=cmd)
535
536 # apply low split settings to each pool
537 if not self.ceph_manager.cephadm:
538 for pool in self.ceph_manager.list_pools():
539 cmd = ("CEPH_ARGS='--filestore-merge-threshold 1 "
540 "--filestore-split-multiple 1' sudo -E "
541 + 'ceph-objectstore-tool '
542 + ' '.join(prefix + [
543 '--data-path', FSPATH.format(id=imp_osd),
544 '--journal-path', JPATH.format(id=imp_osd),
545 ])
546 + " --op apply-layout-settings --pool " + pool).format(id=osd)
547 proc = imp_remote.run(args=cmd,
548 wait=True, check_status=False,
549 stderr=StringIO())
550 if 'Couldn\'t find pool' in proc.stderr.getvalue():
551 continue
552 if proc.exitstatus:
553 raise Exception("ceph-objectstore-tool apply-layout-settings"
554 " failed with {status}".format(status=proc.exitstatus))
555
556
557 def blackhole_kill_osd(self, osd=None):
558 """
559 If all else fails, kill the osd.
560 :param osd: Osd to be killed.
561 """
562 if osd is None:
563 osd = random.choice(self.live_osds)
564 self.log("Blackholing and then killing osd %s, live_osds are %s" %
565 (str(osd), str(self.live_osds)))
566 self.live_osds.remove(osd)
567 self.dead_osds.append(osd)
568 self.ceph_manager.blackhole_kill_osd(osd)
569
570 def revive_osd(self, osd=None, skip_admin_check=False):
571 """
572 Revive the osd.
573 :param osd: Osd to be revived.
574 """
575 if osd is None:
576 osd = random.choice(self.dead_osds)
577 self.log("Reviving osd %s" % (str(osd),))
578 self.ceph_manager.revive_osd(
579 osd,
580 self.revive_timeout,
581 skip_admin_check=skip_admin_check)
582 self.dead_osds.remove(osd)
583 self.live_osds.append(osd)
584 if self.random_eio > 0 and osd == self.rerrosd:
585 self.ceph_manager.set_config(self.rerrosd,
586 filestore_debug_random_read_err = self.random_eio)
587 self.ceph_manager.set_config(self.rerrosd,
588 bluestore_debug_random_read_err = self.random_eio)
589
590
591 def out_osd(self, osd=None):
592 """
593 Mark the osd out
594 :param osd: Osd to be marked.
595 """
596 if osd is None:
597 osd = random.choice(self.in_osds)
598 self.log("Removing osd %s, in_osds are: %s" %
599 (str(osd), str(self.in_osds)))
600 self.ceph_manager.mark_out_osd(osd)
601 self.in_osds.remove(osd)
602 self.out_osds.append(osd)
603
604 def in_osd(self, osd=None):
605 """
606 Mark the osd in
607 :param osd: Osd to be marked.
608 """
609 if osd is None:
610 osd = random.choice(self.out_osds)
611 if osd in self.dead_osds:
612 return self.revive_osd(osd)
613 self.log("Adding osd %s" % (str(osd),))
614 self.out_osds.remove(osd)
615 self.in_osds.append(osd)
616 self.ceph_manager.mark_in_osd(osd)
617 self.log("Added osd %s" % (str(osd),))
618
619 def reweight_osd_or_by_util(self, osd=None):
620 """
621 Reweight an in osd, or reweight OSDs by utilization.
622 :param osd: Osd to be reweighted.
623 """
624 if osd is not None or random.choice([True, False]):
625 if osd is None:
626 osd = random.choice(self.in_osds)
627 val = random.uniform(.1, 1.0)
628 self.log("Reweighting osd %s to %s" % (str(osd), str(val)))
629 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
630 str(osd), str(val))
631 else:
632 # do it several times, the option space is large
633 for i in range(5):
634 options = {
635 'max_change': random.choice(['0.05', '1.0', '3.0']),
636 'overage': random.choice(['110', '1000']),
637 'type': random.choice([
638 'reweight-by-utilization',
639 'test-reweight-by-utilization']),
640 }
641 self.log("Reweighting by: %s"%(str(options),))
642 self.ceph_manager.raw_cluster_cmd(
643 'osd',
644 options['type'],
645 options['overage'],
646 options['max_change'])
647
648 def primary_affinity(self, osd=None):
649 if osd is None:
650 osd = random.choice(self.in_osds)
651 if random.random() >= .5:
652 pa = random.random()
653 elif random.random() >= .5:
654 pa = 1
655 else:
656 pa = 0
657 self.log('Setting osd %s primary_affinity to %f' % (str(osd), pa))
658 self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
659 str(osd), str(pa))
660
661 def thrash_cluster_full(self):
662 """
663 Set and unset cluster full condition
664 """
665 self.log('Setting full ratio to .001')
666 self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.001')
667 time.sleep(1)
668 self.log('Setting full ratio back to .95')
669 self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.95')
670
671 def thrash_pg_upmap(self):
672 """
673 Install or remove random pg_upmap entries in OSDMap
674 """
675 from random import shuffle
676 out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
677 j = json.loads(out)
678 self.log('j is %s' % j)
679 try:
680 if random.random() >= .3:
681 pgs = self.ceph_manager.get_pg_stats()
682 if not pgs:
683 return
684 pg = random.choice(pgs)
685 pgid = str(pg['pgid'])
686 poolid = int(pgid.split('.')[0])
687 sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
688 if len(sizes) == 0:
689 return
690 n = sizes[0]
691 osds = self.in_osds + self.out_osds
692 shuffle(osds)
693 osds = osds[0:n]
694 self.log('Setting %s to %s' % (pgid, osds))
695 cmd = ['osd', 'pg-upmap', pgid] + [str(x) for x in osds]
696 self.log('cmd %s' % cmd)
697 self.ceph_manager.raw_cluster_cmd(*cmd)
698 else:
699 m = j['pg_upmap']
700 if len(m) > 0:
701 shuffle(m)
702 pg = m[0]['pgid']
703 self.log('Clearing pg_upmap on %s' % pg)
704 self.ceph_manager.raw_cluster_cmd(
705 'osd',
706 'rm-pg-upmap',
707 pg)
708 else:
709 self.log('No pg_upmap entries; doing nothing')
710 except CommandFailedError:
711 self.log('Failed to rm-pg-upmap, ignoring')
712
713 def thrash_pg_upmap_items(self):
714 """
715 Install or remove random pg_upmap_items entries in OSDMap
716 """
717 from random import shuffle
718 out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
719 j = json.loads(out)
720 self.log('j is %s' % j)
721 try:
722 if random.random() >= .3:
723 pgs = self.ceph_manager.get_pg_stats()
724 if not pgs:
725 return
726 pg = random.choice(pgs)
727 pgid = str(pg['pgid'])
728 poolid = int(pgid.split('.')[0])
729 sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
730 if len(sizes) == 0:
731 return
732 n = sizes[0]
733 osds = self.in_osds + self.out_osds
734 shuffle(osds)
735 osds = osds[0:n*2]
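# 'osd pg-upmap-items' consumes the trailing OSD ids as <from> <to> pairs,
# hence up to 2 * pool_size entries here.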
736 self.log('Setting %s to %s' % (pgid, osds))
737 cmd = ['osd', 'pg-upmap-items', pgid] + [str(x) for x in osds]
738 self.log('cmd %s' % cmd)
739 self.ceph_manager.raw_cluster_cmd(*cmd)
740 else:
741 m = j['pg_upmap_items']
742 if len(m) > 0:
743 shuffle(m)
744 pg = m[0]['pgid']
745 self.log('Clearing pg_upmap_items on %s' % pg)
746 self.ceph_manager.raw_cluster_cmd(
747 'osd',
748 'rm-pg-upmap-items',
749 pg)
750 else:
751 self.log('No pg_upmap_items entries; doing nothing')
752 except CommandFailedError:
753 self.log('Failed to rm-pg-upmap-items, ignoring')
754
755 def force_recovery(self):
756 """
757 Force recovery or backfill on some PGs
758 """
759 backfill = random.random() >= 0.5
760 j = self.ceph_manager.get_pgids_to_force(backfill)
761 if j:
762 try:
763 if backfill:
764 self.ceph_manager.raw_cluster_cmd('pg', 'force-backfill', *j)
765 else:
766 self.ceph_manager.raw_cluster_cmd('pg', 'force-recovery', *j)
767 except CommandFailedError:
768 self.log('Failed to force backfill|recovery, ignoring')
769
770
771 def cancel_force_recovery(self):
772 """
773 Cancel forced recovery or backfill on some PGs
774 """
775 backfill = random.random() >= 0.5
776 j = self.ceph_manager.get_pgids_to_cancel_force(backfill)
777 if j:
778 try:
779 if backfill:
780 self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-backfill', *j)
781 else:
782 self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-recovery', *j)
783 except CommandFailedError:
784 self.log('Failed to cancel force backfill|recovery, ignoring')
785
786 def force_cancel_recovery(self):
787 """
788 Force or cancel forcing recovery
789 """
790 if random.random() >= 0.4:
791 self.force_recovery()
792 else:
793 self.cancel_force_recovery()
794
795 def all_up(self):
796 """
797 Make sure all osds are up and not out.
798 """
799 while len(self.dead_osds) > 0:
800 self.log("reviving osd")
801 self.revive_osd()
802 while len(self.out_osds) > 0:
803 self.log("inning osd")
804 self.in_osd()
805
806 def all_up_in(self):
807 """
808 Make sure all osds are up and fully in.
809 """
810 self.all_up()
811 for osd in self.live_osds:
812 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
813 str(osd), str(1))
814 self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
815 str(osd), str(1))
816
817 def do_join(self):
818 """
819 Stop the thrasher and join all of its greenlets
820 """
821 self.stopping = True
822 self.thread.get()
823 if self.sighup_delay:
824 self.log("joining the do_sighup greenlet")
825 self.sighup_thread.get()
826 if self.optrack_toggle_delay:
827 self.log("joining the do_optrack_toggle greenlet")
828 self.optrack_toggle_thread.join()
829 if self.dump_ops_enable == "true":
830 self.log("joining the do_dump_ops greenlet")
831 self.dump_ops_thread.join()
832 if self.noscrub_toggle_delay:
833 self.log("joining the do_noscrub_toggle greenlet")
834 self.noscrub_toggle_thread.join()
835
836 def grow_pool(self):
837 """
838 Increase the pg_num of the pool
839 """
840 pool = self.ceph_manager.get_pool()
841 if pool is None:
842 return
843 self.log("Growing pool %s" % (pool,))
844 if self.ceph_manager.expand_pool(pool,
845 self.config.get('pool_grow_by', 10),
846 self.max_pgs):
847 self.pools_to_fix_pgp_num.add(pool)
848
849 def shrink_pool(self):
850 """
851 Decrease the pg_num of the pool
852 """
853 pool = self.ceph_manager.get_pool()
854 if pool is None:
855 return
856 _ = self.ceph_manager.get_pool_pg_num(pool)
857 self.log("Shrinking pool %s" % (pool,))
858 if self.ceph_manager.contract_pool(
859 pool,
860 self.config.get('pool_shrink_by', 10),
861 self.min_pgs):
862 self.pools_to_fix_pgp_num.add(pool)
863
864 def fix_pgp_num(self, pool=None):
865 """
866 Fix pgp_num of the pool to match its pg_num.
867 """
868 if pool is None:
869 pool = self.ceph_manager.get_pool()
870 if not pool:
871 return
872 force = False
873 else:
874 force = True
875 self.log("fixing pg num pool %s" % (pool,))
876 if self.ceph_manager.set_pool_pgpnum(pool, force):
877 self.pools_to_fix_pgp_num.discard(pool)
878
879 def test_pool_min_size(self):
880 """
881 Loop to selectively push PGs below their min_size and test that recovery
882 still occurs.
883 """
884 self.log("test_pool_min_size")
885 self.all_up()
886 self.ceph_manager.wait_for_recovery(
887 timeout=self.config.get('timeout')
888 )
889
890 minout = int(self.config.get("min_out", 1))
891 minlive = int(self.config.get("min_live", 2))
892 mindead = int(self.config.get("min_dead", 1))
893 self.log("doing min_size thrashing")
894 self.ceph_manager.wait_for_clean(timeout=60)
895 assert self.ceph_manager.is_clean(), \
896 'not clean before minsize thrashing starts'
897 while not self.stopping:
898 # look up k and m from all the pools on each loop, in case it
899 # changes as the cluster runs
900 k = 0
901 m = 99
902 has_pools = False
903 pools_json = self.ceph_manager.get_osd_dump_json()['pools']
904
905 for pool_json in pools_json:
906 pool = pool_json['pool_name']
907 has_pools = True
908 pool_type = pool_json['type'] # 1 for rep, 3 for ec
909 min_size = pool_json['min_size']
910 self.log("pool {pool} min_size is {min_size}".format(pool=pool,min_size=min_size))
911 try:
912 ec_profile = self.ceph_manager.get_pool_property(pool, 'erasure_code_profile')
913 if pool_type != PoolType.ERASURE_CODED:
914 continue
915 ec_profile = pool_json['erasure_code_profile']
916 ec_profile_json = self.ceph_manager.raw_cluster_cmd(
917 'osd',
918 'erasure-code-profile',
919 'get',
920 ec_profile,
921 '--format=json')
922 ec_json = json.loads(ec_profile_json)
923 local_k = int(ec_json['k'])
924 local_m = int(ec_json['m'])
925 self.log("pool {pool} local_k={k} local_m={m}".format(pool=pool,
926 k=local_k, m=local_m))
927 if local_k > k:
928 self.log("setting k={local_k} from previous {k}".format(local_k=local_k, k=k))
929 k = local_k
930 if local_m < m:
931 self.log("setting m={local_m} from previous {m}".format(local_m=local_m, m=m))
932 m = local_m
933 except CommandFailedError:
934 self.log("failed to read erasure_code_profile. %s was likely removed", pool)
935 continue
936
937 if has_pools:
938 self.log("using k={k}, m={m}".format(k=k,m=m))
939 else:
940 self.log("No pools yet, waiting")
941 time.sleep(5)
942 continue
943
944 if minout > len(self.out_osds): # kill OSDs and mark out
945 self.log("forced to out an osd")
946 self.kill_osd(mark_out=True)
947 continue
948 elif mindead > len(self.dead_osds): # kill OSDs but force timeout
949 self.log("forced to kill an osd")
950 self.kill_osd()
951 continue
952 else: # make mostly-random choice to kill or revive OSDs
953 minup = max(minlive, k)
954 rand_val = random.uniform(0, 1)
955 self.log("choosing based on number of live OSDs and rand val {rand}".\
956 format(rand=rand_val))
957 if len(self.live_osds) > minup+1 and rand_val < 0.5:
958 # chose to knock out as many OSDs as we can w/out downing PGs
959
960 most_killable = min(len(self.live_osds) - minup, m)
961 self.log("chose to kill {n} OSDs".format(n=most_killable))
962 for i in range(1, most_killable):
963 self.kill_osd(mark_out=True)
964 time.sleep(10)
965 # try a few times since there might be a concurrent pool
966 # creation or deletion
967 with safe_while(
968 sleep=5, tries=5,
969 action='check for active or peered') as proceed:
970 while proceed():
971 if self.ceph_manager.all_active_or_peered():
972 break
973 self.log('not all PGs are active or peered')
974 else: # chose to revive OSDs, bring up a random fraction of the dead ones
975 self.log("chose to revive osds")
976 for i in range(1, int(rand_val * len(self.dead_osds))):
977 self.revive_osd(i)
978
979 # let PGs repair themselves or our next knockout might kill one
980 self.ceph_manager.wait_for_clean(timeout=self.config.get('timeout'))
981
982 # / while not self.stopping
983 self.all_up_in()
984
985 self.ceph_manager.wait_for_recovery(
986 timeout=self.config.get('timeout')
987 )
988
989 def inject_pause(self, conf_key, duration, check_after, should_be_down):
990 """
991 Pause injection testing. Check for osd being down when finished.
992 """
993 the_one = random.choice(self.live_osds)
994 self.log("inject_pause on {osd}".format(osd=the_one))
995 self.log(
996 "Testing {key} pause injection for duration {duration}".format(
997 key=conf_key,
998 duration=duration
999 ))
1000 self.log(
1001 "Checking after {after}, should_be_down={shouldbedown}".format(
1002 after=check_after,
1003 shouldbedown=should_be_down
1004 ))
1005 self.ceph_manager.set_config(the_one, **{conf_key: duration})
1006 if not should_be_down:
1007 return
1008 time.sleep(check_after)
1009 status = self.ceph_manager.get_osd_status()
1010 assert the_one in status['down']
1011 time.sleep(duration - check_after + 20)
1012 status = self.ceph_manager.get_osd_status()
1013 assert the_one not in status['down']
1014
1015 def test_backfill_full(self):
1016 """
1017 Test backfills stopping when the replica fills up.
1018
1019 First, use the injectfull admin command to simulate a
1020 backfillfull condition on all of the OSDs.
1021
1022 Second, on a random subset, set
1023 osd_debug_skip_full_check_in_backfill_reservation to force
1024 the more complicated check in do_scan to be exercised.
1025
1026 Then, verify that all backfillings stop.
1027 """
1028 self.log("injecting backfill full")
1029 for i in self.live_osds:
1030 self.ceph_manager.set_config(
1031 i,
1032 osd_debug_skip_full_check_in_backfill_reservation=
1033 random.choice(['false', 'true']))
1034 self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'backfillfull'],
1035 check_status=True, timeout=30, stdout=DEVNULL)
1036 for i in range(30):
1037 status = self.ceph_manager.compile_pg_status()
1038 if 'backfilling' not in status.keys():
1039 break
1040 self.log(
1041 "waiting for {still_going} backfillings".format(
1042 still_going=status.get('backfilling')))
1043 time.sleep(1)
1044 assert 'backfilling' not in self.ceph_manager.compile_pg_status().keys()
1045 for i in self.live_osds:
1046 self.ceph_manager.set_config(
1047 i,
1048 osd_debug_skip_full_check_in_backfill_reservation='false')
1049 self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'none'],
1050 check_status=True, timeout=30, stdout=DEVNULL)
1051
1052
1053 def generate_random_sharding(self):
1054 prefixes = [
1055 'm','O','P','L'
1056 ]
1057 new_sharding = ''
1058 for prefix in prefixes:
1059 choose = random.choice([False, True])
1060 if not choose:
1061 continue
1062 if new_sharding != '':
1063 new_sharding = new_sharding + ' '
1064 columns = random.randint(1, 5)
1065 do_hash = random.choice([False, True])
1066 if do_hash:
1067 low_hash = random.choice([0, 5, 8])
1068 do_high_hash = random.choice([False, True])
1069 if do_high_hash:
1070 high_hash = random.choice([8, 16, 30]) + low_hash
1071 new_sharding = new_sharding + prefix + '(' + str(columns) + ',' + str(low_hash) + '-' + str(high_hash) + ')'
1072 else:
1073 new_sharding = new_sharding + prefix + '(' + str(columns) + ',' + str(low_hash) + '-)'
1074 else:
1075 if columns == 1:
1076 new_sharding = new_sharding + prefix
1077 else:
1078 new_sharding = new_sharding + prefix + '(' + str(columns) + ')'
1079 return new_sharding
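# Illustrative outputs: 'O(3,0-8) P L(2)' or 'm(2,5-) O'; each token is a
# RocksDB key prefix with an optional shard (column) count and hash range,
# in the format ceph-bluestore-tool expects for '--sharding'.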
1080
1081 def test_bluestore_reshard_action(self):
1082 """
1083 Test if resharding of bluestore works properly.
1084 If bluestore is not used, or bluestore is in version that
1085 does not support sharding, skip.
1086 """
1087
1088 osd = random.choice(self.dead_osds)
1089 remote = self.ceph_manager.find_remote('osd', osd)
1090 FSPATH = self.ceph_manager.get_filepath()
1091
1092 prefix = [
1093 '--no-mon-config',
1094 '--log-file=/var/log/ceph/bluestore_tool.$pid.log',
1095 '--log-level=10',
1096 '--path', FSPATH.format(id=osd)
1097 ]
1098
1099 # sanity-check that ceph-bluestore-tool can access the target objectstore
1100 self.log('checking if target objectstore is bluestore on osd.%s' % osd)
1101 cmd = prefix + [
1102 'show-label'
1103 ]
1104 proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
1105 if proc.exitstatus != 0:
1106 raise Exception("ceph-bluestore-tool access failed.")
1107
1108 # check if sharding is possible
1109 self.log('checking if target bluestore supports sharding on osd.%s' % osd)
1110 cmd = prefix + [
1111 'show-sharding'
1112 ]
1113 proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
1114 if proc.exitstatus != 0:
1115 self.log("Unable to test resharding, "
1116 "ceph-bluestore-tool does not support it.")
1117 return
1118
1119 # now go for reshard to something else
1120 self.log('applying new sharding to bluestore on osd.%s' % osd)
1121 new_sharding = self.config.get('bluestore_new_sharding','random')
1122
1123 if new_sharding == 'random':
1124 self.log('generate random sharding')
1125 new_sharding = self.generate_random_sharding()
1126
1127 self.log("applying new sharding: " + new_sharding)
1128 cmd = prefix + [
1129 '--sharding', new_sharding,
1130 'reshard'
1131 ]
1132 proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
1133 if proc.exitstatus != 0:
1134 raise Exception("ceph-bluestore-tool resharding failed.")
1135
1136 # now run fsck to verify the new sharding
1137 self.log('running fsck to verify new sharding on osd.%s' % osd)
1138 cmd = prefix + [
1139 'fsck'
1140 ]
1141 proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
1142 if proc.exitstatus != 0:
1143 raise Exception("ceph-bluestore-tool fsck failed.")
1144 self.log('resharding successfully completed')
1145
1146 def test_bluestore_reshard(self):
1147 """
1148 1) kills an osd
1149 2) reshards bluestore on killed osd
1150 3) revives the osd
1151 """
1152 self.log('test_bluestore_reshard started')
1153 self.kill_osd(mark_down=True, mark_out=True)
1154 self.test_bluestore_reshard_action()
1155 self.revive_osd()
1156 self.log('test_bluestore_reshard completed')
1157
1158
1159 def test_map_discontinuity(self):
1160 """
1161 1) Allows the osds to recover
1162 2) kills an osd
1163 3) allows the remaining osds to recover
1164 4) waits for some time
1165 5) revives the osd
1166 This sequence should cause the revived osd to have to handle
1167 a map gap since the mons would have trimmed
1168 """
1169 while len(self.in_osds) < (self.minin + 1):
1170 self.in_osd()
1171 self.log("Waiting for recovery")
1172 self.ceph_manager.wait_for_all_osds_up(
1173 timeout=self.config.get('timeout')
1174 )
1175 # now we wait 20s for the pg status to change, if it takes longer,
1176 # the test *should* fail!
1177 time.sleep(20)
1178 self.ceph_manager.wait_for_clean(
1179 timeout=self.config.get('timeout')
1180 )
1181
1182 # now we wait 20s for the backfill replicas to hear about the clean
1183 time.sleep(20)
1184 self.log("Recovered, killing an osd")
1185 self.kill_osd(mark_down=True, mark_out=True)
1186 self.log("Waiting for clean again")
1187 self.ceph_manager.wait_for_clean(
1188 timeout=self.config.get('timeout')
1189 )
1190 self.log("Waiting for trim")
1191 time.sleep(int(self.config.get("map_discontinuity_sleep_time", 40)))
1192 self.revive_osd()
1193
1194 def choose_action(self):
1195 """
1196 Random action selector.
1197 """
1198 chance_down = self.config.get('chance_down', 0.4)
1199 _ = self.config.get('chance_test_min_size', 0)
1200 chance_test_backfill_full = \
1201 self.config.get('chance_test_backfill_full', 0)
1202 if isinstance(chance_down, int):
1203 chance_down = float(chance_down) / 100
1204 minin = self.minin
1205 minout = int(self.config.get("min_out", 0))
1206 minlive = int(self.config.get("min_live", 2))
1207 mindead = int(self.config.get("min_dead", 0))
1208
1209 self.log('choose_action: min_in %d min_out '
1210 '%d min_live %d min_dead %d' %
1211 (minin, minout, minlive, mindead))
1212 actions = []
1213 if len(self.in_osds) > minin:
1214 actions.append((self.out_osd, 1.0,))
1215 if len(self.live_osds) > minlive and chance_down > 0:
1216 actions.append((self.kill_osd, chance_down,))
1217 if len(self.out_osds) > minout:
1218 actions.append((self.in_osd, 1.7,))
1219 if len(self.dead_osds) > mindead:
1220 actions.append((self.revive_osd, 1.0,))
1221 if self.config.get('thrash_primary_affinity', True):
1222 actions.append((self.primary_affinity, 1.0,))
1223 actions.append((self.reweight_osd_or_by_util,
1224 self.config.get('reweight_osd', .5),))
1225 actions.append((self.grow_pool,
1226 self.config.get('chance_pgnum_grow', 0),))
1227 actions.append((self.shrink_pool,
1228 self.config.get('chance_pgnum_shrink', 0),))
1229 actions.append((self.fix_pgp_num,
1230 self.config.get('chance_pgpnum_fix', 0),))
1231 actions.append((self.test_pool_min_size,
1232 self.config.get('chance_test_min_size', 0),))
1233 actions.append((self.test_backfill_full,
1234 chance_test_backfill_full,))
1235 if self.chance_thrash_cluster_full > 0:
1236 actions.append((self.thrash_cluster_full, self.chance_thrash_cluster_full,))
1237 if self.chance_thrash_pg_upmap > 0:
1238 actions.append((self.thrash_pg_upmap, self.chance_thrash_pg_upmap,))
1239 if self.chance_thrash_pg_upmap_items > 0:
1240 actions.append((self.thrash_pg_upmap_items, self.chance_thrash_pg_upmap_items,))
1241 if self.chance_force_recovery > 0:
1242 actions.append((self.force_cancel_recovery, self.chance_force_recovery))
1243
1244 for key in ['heartbeat_inject_failure', 'filestore_inject_stall']:
1245 for scenario in [
1246 (lambda:
1247 self.inject_pause(key,
1248 self.config.get('pause_short', 3),
1249 0,
1250 False),
1251 self.config.get('chance_inject_pause_short', 1),),
1252 (lambda:
1253 self.inject_pause(key,
1254 self.config.get('pause_long', 80),
1255 self.config.get('pause_check_after', 70),
1256 True),
1257 self.config.get('chance_inject_pause_long', 0),)]:
1258 actions.append(scenario)
1259
1260 # only consider resharding if objectstore is bluestore
1261 cluster_name = self.ceph_manager.cluster
1262 cluster = self.ceph_manager.ctx.ceph[cluster_name]
1263 if cluster.conf.get('osd', {}).get('osd objectstore', 'bluestore') == 'bluestore':
1264 actions.append((self.test_bluestore_reshard,
1265 self.config.get('chance_bluestore_reshard', 0),))
1266
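# The second element of each tuple is a relative weight, not a normalized
# probability: draw a point in [0, total) and walk the list until the
# cumulative weight passes it.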
1267 total = sum([y for (x, y) in actions])
1268 val = random.uniform(0, total)
1269 for (action, prob) in actions:
1270 if val < prob:
1271 return action
1272 val -= prob
1273 return None
1274
1275 def do_thrash(self):
1276 """
1277 _do_thrash() wrapper.
1278 """
1279 try:
1280 self._do_thrash()
1281 except Exception as e:
1282 # See _run exception comment for MDSThrasher
1283 self.set_thrasher_exception(e)
1284 self.logger.exception("exception:")
1285 # Allow successful completion so gevent doesn't see an exception.
1286 # The DaemonWatchdog will observe the error and tear down the test.
1287
1288 @log_exc
1289 def do_sighup(self):
1290 """
1291 Loops and sends signal.SIGHUP to a random live osd.
1292
1293 Loop delay is controlled by the config value sighup_delay.
1294 """
1295 delay = float(self.sighup_delay)
1296 self.log("starting do_sighup with a delay of {0}".format(delay))
1297 while not self.stopping:
1298 osd = random.choice(self.live_osds)
1299 self.ceph_manager.signal_osd(osd, signal.SIGHUP, silent=True)
1300 time.sleep(delay)
1301
1302 @log_exc
1303 def do_optrack_toggle(self):
1304 """
1305 Loops and toggle op tracking to all osds.
1306
1307 Loop delay is controlled by the config value optrack_toggle_delay.
1308 """
1309 delay = float(self.optrack_toggle_delay)
1310 osd_state = "true"
1311 self.log("starting do_optrack_toggle with a delay of {0}".format(delay))
1312 while not self.stopping:
1313 if osd_state == "true":
1314 osd_state = "false"
1315 else:
1316 osd_state = "true"
1317 try:
1318 self.ceph_manager.inject_args('osd', '*',
1319 'osd_enable_op_tracker',
1320 osd_state)
1321 except CommandFailedError:
1322 self.log('Failed to tell all osds, ignoring')
1323 gevent.sleep(delay)
1324
1325 @log_exc
1326 def do_dump_ops(self):
1327 """
1328 Loops and does op dumps on all osds
1329 """
1330 self.log("starting do_dump_ops")
1331 while not self.stopping:
1332 for osd in self.live_osds:
1333 # Ignore errors because live_osds is in flux
1334 self.ceph_manager.osd_admin_socket(osd, command=['dump_ops_in_flight'],
1335 check_status=False, timeout=30, stdout=DEVNULL)
1336 self.ceph_manager.osd_admin_socket(osd, command=['dump_blocked_ops'],
1337 check_status=False, timeout=30, stdout=DEVNULL)
1338 self.ceph_manager.osd_admin_socket(osd, command=['dump_historic_ops'],
1339 check_status=False, timeout=30, stdout=DEVNULL)
1340 gevent.sleep(0)
1341
1342 @log_exc
1343 def do_noscrub_toggle(self):
1344 """
1345 Loops and toggle noscrub flags
1346
1347 Loop delay is controlled by the config value noscrub_toggle_delay.
1348 """
1349 delay = float(self.noscrub_toggle_delay)
1350 scrub_state = "none"
1351 self.log("starting do_noscrub_toggle with a delay of {0}".format(delay))
1352 while not self.stopping:
1353 if scrub_state == "none":
1354 self.ceph_manager.raw_cluster_cmd('osd', 'set', 'noscrub')
1355 scrub_state = "noscrub"
1356 elif scrub_state == "noscrub":
1357 self.ceph_manager.raw_cluster_cmd('osd', 'set', 'nodeep-scrub')
1358 scrub_state = "both"
1359 elif scrub_state == "both":
1360 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
1361 scrub_state = "nodeep-scrub"
1362 else:
1363 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
1364 scrub_state = "none"
1365 gevent.sleep(delay)
1366 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
1367 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
1368
1369 @log_exc
1370 def _do_thrash(self):
1371 """
1372 Loop to select random actions to thrash ceph manager with.
1373 """
1374 cleanint = self.config.get("clean_interval", 60)
1375 scrubint = self.config.get("scrub_interval", -1)
1376 maxdead = self.config.get("max_dead", 0)
1377 delay = self.config.get("op_delay", 5)
1378 self.rerrosd = self.live_osds[0]
1379 if self.random_eio > 0:
1380 self.ceph_manager.inject_args('osd', self.rerrosd,
1381 'filestore_debug_random_read_err',
1382 self.random_eio)
1383 self.ceph_manager.inject_args('osd', self.rerrosd,
1384 'bluestore_debug_random_read_err',
1385 self.random_eio)
1386 self.log("starting do_thrash")
1387 while not self.stopping:
1388 to_log = [str(x) for x in ["in_osds: ", self.in_osds,
1389 "out_osds: ", self.out_osds,
1390 "dead_osds: ", self.dead_osds,
1391 "live_osds: ", self.live_osds]]
1392 self.log(" ".join(to_log))
1393 if random.uniform(0, 1) < (float(delay) / cleanint):
1394 while len(self.dead_osds) > maxdead:
1395 self.revive_osd()
1396 for osd in self.in_osds:
1397 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
1398 str(osd), str(1))
1399 if random.uniform(0, 1) < float(
1400 self.config.get('chance_test_map_discontinuity', 0)) \
1401 and len(self.live_osds) > 5: # avoid m=2,k=2 stall, w/ some buffer for crush being picky
1402 self.test_map_discontinuity()
1403 else:
1404 self.ceph_manager.wait_for_recovery(
1405 timeout=self.config.get('timeout')
1406 )
1407 time.sleep(self.clean_wait)
1408 if scrubint > 0:
1409 if random.uniform(0, 1) < (float(delay) / scrubint):
1410 self.log('Scrubbing while thrashing being performed')
1411 Scrubber(self.ceph_manager, self.config)
1412 self.choose_action()()
1413 time.sleep(delay)
1414 self.all_up()
1415 if self.random_eio > 0:
1416 self.ceph_manager.inject_args('osd', self.rerrosd,
1417 'filestore_debug_random_read_err', '0.0')
1418 self.ceph_manager.inject_args('osd', self.rerrosd,
1419 'bluestore_debug_random_read_err', '0.0')
1420 for pool in list(self.pools_to_fix_pgp_num):
1421 if self.ceph_manager.get_pool_pg_num(pool) > 0:
1422 self.fix_pgp_num(pool)
1423 self.pools_to_fix_pgp_num.clear()
1424 for service, opt, saved_value in self.saved_options:
1425 self.ceph_manager.inject_args(service, '*', opt, saved_value)
1426 self.saved_options = []
1427 self.all_up_in()
1428
1429
1430 class ObjectStoreTool:
1431
1432 def __init__(self, manager, pool, **kwargs):
1433 self.manager = manager
1434 self.pool = pool
1435 self.osd = kwargs.get('osd', None)
1436 self.object_name = kwargs.get('object_name', None)
1437 self.do_revive = kwargs.get('do_revive', True)
1438 if self.osd and self.pool and self.object_name:
1439 if self.osd == "primary":
1440 self.osd = self.manager.get_object_primary(self.pool,
1441 self.object_name)
1442 assert self.osd is not None
1443 if self.object_name:
1444 self.pgid = self.manager.get_object_pg_with_shard(self.pool,
1445 self.object_name,
1446 self.osd)
1447 self.remote = next(iter(self.manager.ctx.\
1448 cluster.only('osd.{o}'.format(o=self.osd)).remotes.keys()))
1449 path = self.manager.get_filepath().format(id=self.osd)
1450 self.paths = ("--data-path {path} --journal-path {path}/journal".
1451 format(path=path))
1452
1453 def build_cmd(self, options, args, stdin):
1454 lines = []
1455 if self.object_name:
1456 lines.append("object=$(sudo adjust-ulimits ceph-objectstore-tool "
1457 "{paths} --pgid {pgid} --op list |"
1458 "grep '\"oid\":\"{name}\"')".
1459 format(paths=self.paths,
1460 pgid=self.pgid,
1461 name=self.object_name))
1462 args = '"$object" ' + args
1463 options += " --pgid {pgid}".format(pgid=self.pgid)
1464 cmd = ("sudo adjust-ulimits ceph-objectstore-tool {paths} {options} {args}".
1465 format(paths=self.paths,
1466 args=args,
1467 options=options))
1468 if stdin:
1469 cmd = ("echo {payload} | base64 --decode | {cmd}".
1470 format(payload=base64.b64encode(stdin.encode()).decode(),
1471 cmd=cmd))
1472 lines.append(cmd)
1473 return "\n".join(lines)
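# Roughly (paths and names are placeholders), for object_name='foo' this
# returns a two-line shell snippet like:
#   object=$(sudo adjust-ulimits ceph-objectstore-tool --data-path <path> \
#            --journal-path <path>/journal --pgid 1.0 --op list | grep '"oid":"foo"')
#   sudo adjust-ulimits ceph-objectstore-tool --data-path <path> \
#            --journal-path <path>/journal <options> --pgid 1.0 "$object" <args>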
1474
1475 def run(self, options, args):
1476 self.manager.kill_osd(self.osd)
1477 cmd = self.build_cmd(options, args, None)
1478 self.manager.log(cmd)
1479 try:
1480 proc = self.remote.run(args=['bash', '-e', '-x', '-c', cmd],
1481 check_status=False,
1482 stdout=BytesIO(),
1483 stderr=BytesIO())
1484 proc.wait()
1485 if proc.exitstatus != 0:
1486 self.manager.log("failed with " + str(proc.exitstatus))
1487 error = proc.stdout.getvalue().decode() + " " + \
1488 proc.stderr.getvalue().decode()
1489 raise Exception(error)
1490 finally:
1491 if self.do_revive:
1492 self.manager.revive_osd(self.osd)
1493 self.manager.wait_till_osd_is_up(self.osd, 300)
1494
1495
1496 # XXX: this class has nothing to do with the Ceph daemon (ceph-mgr) of
1497 # the same name.
1498 class CephManager:
1499 """
1500 Ceph manager object.
1501 Contains several local functions that form a bulk of this module.
1502
1503 :param controller: the remote machine where the Ceph commands should be
1504 executed
1505 :param ctx: the cluster context
1506 :param config: path to Ceph config file
1507 :param logger: for logging messages
1508 :param cluster: name of the Ceph cluster
1509 """
1510
1511 def __init__(self, controller, ctx=None, config=None, logger=None,
1512 cluster='ceph', cephadm=False, rook=False) -> None:
1513 self.lock = threading.RLock()
1514 self.ctx = ctx
1515 self.config = config
1516 self.controller = controller
1517 self.next_pool_id = 0
1518 self.cluster = cluster
1519
1520 if (logger):
1521 self.log = lambda x: logger.info(x)
1522 else:
1523 def tmp(x):
1524 """
1525 implement log behavior.
1526 """
1527 print(x)
1528 self.log = tmp
1529
1530 if self.config is None:
1531 self.config = dict()
1532
1533 # NOTE: These variables are meant to be overridden by vstart_runner.py.
1534 self.rook = rook
1535 self.cephadm = cephadm
1536 self.testdir = teuthology.get_testdir(self.ctx)
1537 self.run_cluster_cmd_prefix = [
1538 'sudo', 'adjust-ulimits', 'ceph-coverage',
1539 f'{self.testdir}/archive/coverage', 'timeout', '120', 'ceph',
1540 '--cluster', self.cluster]
1541 self.run_ceph_w_prefix = ['sudo', 'daemon-helper', 'kill', 'ceph',
1542 '--cluster', self.cluster]
1543
1544 pools = self.list_pools()
1545 self.pools = {}
1546 for pool in pools:
1547 # we may race with a pool deletion; ignore failures here
1548 try:
1549 self.pools[pool] = self.get_pool_int_property(pool, 'pg_num')
1550 except CommandFailedError:
1551 self.log('Failed to get pg_num from pool %s, ignoring' % pool)
1552
1553 def ceph(self, cmd, **kwargs):
1554 """
1555 Simple Ceph admin command wrapper around run_cluster_cmd.
1556 """
1557
1558 kwargs.pop('args', None)
1559 args = shlex.split(cmd)
1560 stdout = kwargs.pop('stdout', StringIO())
1561 stderr = kwargs.pop('stderr', StringIO())
1562 return self.run_cluster_cmd(args=args, stdout=stdout, stderr=stderr, **kwargs)
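# For example, self.ceph('osd set noout') is roughly equivalent to
# self.run_cluster_cmd(args=['osd', 'set', 'noout']) with StringIO capture.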
1563
1564 def run_cluster_cmd(self, **kwargs):
1565 """
1566 Run a Ceph command and return the object representing the process
1567 for the command.
1568
1569 Accepts arguments same as that of teuthology.orchestra.run.run()
1570 """
1571 if isinstance(kwargs['args'], str):
1572 kwargs['args'] = shlex.split(kwargs['args'])
1573 elif isinstance(kwargs['args'], tuple):
1574 kwargs['args'] = list(kwargs['args'])
1575
1576 if self.cephadm:
1577 return shell(self.ctx, self.cluster, self.controller,
1578 args=['ceph'] + list(kwargs['args']),
1579 stdout=StringIO(),
1580 check_status=kwargs.get('check_status', True))
1581 if self.rook:
1582 return toolbox(self.ctx, self.cluster,
1583 args=['ceph'] + list(kwargs['args']),
1584 stdout=StringIO(),
1585 check_status=kwargs.get('check_status', True))
1586
1587 kwargs['args'] = self.run_cluster_cmd_prefix + kwargs['args']
1588 return self.controller.run(**kwargs)
1589
1590 def raw_cluster_cmd(self, *args, **kwargs) -> str:
1591 """
1592 Run a ceph command against the cluster and return its stdout as a string.
1593 """
1594 if kwargs.get('args') is None and args:
1595 kwargs['args'] = args
1596 kwargs['stdout'] = kwargs.pop('stdout', StringIO())
1597 return self.run_cluster_cmd(**kwargs).stdout.getvalue()
1598
1599 def raw_cluster_cmd_result(self, *args, **kwargs):
1600 """
1601 Run a ceph command against the cluster and return its exit status.
1602 """
1603 if kwargs.get('args') is None and args:
1604 kwargs['args'] = args
1605 kwargs['check_status'] = False
1606 return self.run_cluster_cmd(**kwargs).exitstatus
1607
1608 def run_ceph_w(self, watch_channel=None):
1609 """
1610 Execute "ceph -w" in the background with stdout connected to a BytesIO,
1611 and return the RemoteProcess.
1612
1613 :param watch_channel: Specifies the channel to be watched. This can be
1614 'cluster', 'audit', ...
1615 :type watch_channel: str
1616 """
1617 args = self.run_ceph_w_prefix + ['-w']
1618 if watch_channel is not None:
1619 args.append("--watch-channel")
1620 args.append(watch_channel)
1621 return self.controller.run(args=args, wait=False, stdout=StringIO(), stdin=run.PIPE)
1622
1623 def get_mon_socks(self):
1624 """
1625 Get monitor sockets.
1626
1627 :return socks: tuple of strings; strings are individual sockets.
1628 """
1629 from json import loads
1630
1631 output = loads(self.raw_cluster_cmd(['--format=json', 'mon', 'dump']))
1632 socks = []
1633 for mon in output['mons']:
1634 for addrvec_mem in mon['public_addrs']['addrvec']:
1635 socks.append(addrvec_mem['addr'])
1636 return tuple(socks)
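# The addresses come straight from the mon map addrvec entries, e.g.
# ('172.21.15.132:3300', '172.21.15.132:6789') for a single v2+v1 monitor
# (illustrative values).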
1637
1638 def get_msgrv1_mon_socks(self):
1639 """
1640 Get monitor sockets that use msgrv1 to operate.
1641
1642 :return socks: tuple of strings; strings are individual sockets.
1643 """
1644 from json import loads
1645
1646 output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
1647 socks = []
1648 for mon in output['mons']:
1649 for addrvec_mem in mon['public_addrs']['addrvec']:
1650 if addrvec_mem['type'] == 'v1':
1651 socks.append(addrvec_mem['addr'])
1652 return tuple(socks)
1653
1654 def get_msgrv2_mon_socks(self):
1655 """
1656 Get monitor sockets that use msgrv2 to operate.
1657
1658 :return socks: tuple of strings; strings are individual sockets.
1659 """
1660 from json import loads
1661
1662 output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
1663 socks = []
1664 for mon in output['mons']:
1665 for addrvec_mem in mon['public_addrs']['addrvec']:
1666 if addrvec_mem['type'] == 'v2':
1667 socks.append(addrvec_mem['addr'])
1668 return tuple(socks)
1669
1670 def flush_pg_stats(self, osds, no_wait=None, wait_for_mon=300):
1671 """
1672 Flush pg stats from a list of OSD ids, ensuring they are reflected
1673 all the way to the monitor. Luminous and later only.
1674
1675 :param osds: list of OSDs to flush
1676 :param no_wait: list of OSDs not to wait for seq id. by default, we
1677 wait for all specified osds, but some of them could be
1678 moved out of osdmap, so we cannot get their updated
1679 stat seq from monitor anymore. in that case, you need
1680 to pass a blocklist.
1681 :param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
1682 it. (5 min by default)
1683 """
1684 if no_wait is None:
1685 no_wait = []
1686
1687 def flush_one_osd(osd: int, wait_for_mon: int):
1688 need = int(self.raw_cluster_cmd('tell', 'osd.%d' % osd, 'flush_pg_stats'))
1689 if not wait_for_mon:
1690 return
1691 if osd in no_wait:
1692 return
1693 got = 0
1694 while wait_for_mon > 0:
1695 got = int(self.raw_cluster_cmd('osd', 'last-stat-seq', 'osd.%d' % osd))
1696 self.log('need seq {need} got {got} for osd.{osd}'.format(
1697 need=need, got=got, osd=osd))
1698 if got >= need:
1699 break
1700 A_WHILE = 1
1701 time.sleep(A_WHILE)
1702 wait_for_mon -= A_WHILE
1703 else:
1704 raise Exception('timed out waiting for mon to be updated with '
1705 'osd.{osd}: {got} < {need}'.
1706 format(osd=osd, got=got, need=need))
1707
1708 with parallel() as p:
1709 for osd in osds:
1710 p.spawn(flush_one_osd, osd, wait_for_mon)
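# A minimal usage sketch, assuming a hypothetical `manager`:
#
#     # flush stats for osd.0-2, but do not wait for osd.2 (e.g. it was
#     # removed from the osdmap), and give the mon at most 60 seconds
#     manager.flush_pg_stats([0, 1, 2], no_wait=[2], wait_for_mon=60)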
1711
1712 def flush_all_pg_stats(self):
1713 self.flush_pg_stats(range(len(self.get_osd_dump())))
1714
1715 def do_rados(self, cmd, pool=None, namespace=None, remote=None, **kwargs):
1716 """
1717 Execute a remote rados command.
1718 """
1719 if remote is None:
1720 remote = self.controller
1721
1722 pre = [
1723 'adjust-ulimits',
1724 'ceph-coverage',
1725 f'{self.testdir}/archive/coverage',
1726 'rados',
1727 '--cluster',
1728 self.cluster,
1729 ]
1730 if pool is not None:
1731 pre += ['--pool', pool]
1732 if namespace is not None:
1733 pre += ['--namespace', namespace]
1734 pre.extend(cmd)
1735 proc = remote.run(
1736 args=pre,
1737 wait=True,
1738 **kwargs
1739 )
1740 return proc
1741
1742 def rados_write_objects(self, pool, num_objects, size,
1743 timelimit, threads, cleanup=False):
1744 """
1745 Write rados objects using `rados bench`.
1746 The threads parameter is not used yet.
1747 """
1748 args = [
1749 '--num-objects', num_objects,
1750 '-b', size,
1751 'bench', timelimit,
1752 'write'
1753 ]
1754 if not cleanup:
1755 args.append('--no-cleanup')
1756 return self.do_rados(map(str, args), pool=pool)
1757
1758 def do_put(self, pool, obj, fname, namespace=None):
1759 """
1760 Implement rados put operation
1761 """
1762 args = ['put', obj, fname]
1763 return self.do_rados(
1764 args,
1765 check_status=False,
1766 pool=pool,
1767 namespace=namespace
1768 ).exitstatus
1769
1770 def do_get(self, pool, obj, fname='/dev/null', namespace=None):
1771 """
1772 Implement rados get operation
1773 """
1774 args = ['get', obj, fname]
1775 return self.do_rados(
1776 args,
1777 check_status=False,
1778 pool=pool,
1779 namespace=namespace,
1780 ).exitstatus
1781
1782 def do_rm(self, pool, obj, namespace=None):
1783 """
1784 Implement rados rm operation
1785 """
1786 args = ['rm', obj]
1787 return self.do_rados(
1788 args,
1789 check_status=False,
1790 pool=pool,
1791 namespace=namespace
1792 ).exitstatus
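# A minimal put/get/rm round trip with the helpers above; `manager`,
# the pool and the object/file names are illustrative assumptions:
#
#     assert manager.do_put('rbd', 'obj1', '/etc/hostname') == 0
#     assert manager.do_get('rbd', 'obj1', fname='/tmp/obj1.out') == 0
#     assert manager.do_rm('rbd', 'obj1') == 0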
1793
1794 def osd_admin_socket(self, osd_id, command, check_status=True, timeout=0, stdout=None):
1795 if stdout is None:
1796 stdout = StringIO()
1797 return self.admin_socket('osd', osd_id, command, check_status, timeout, stdout)
1798
1799 def find_remote(self, service_type, service_id):
1800 """
1801 Get the Remote for the host where a particular service runs.
1802
1803 :param service_type: 'mds', 'osd', 'client'
1804 :param service_id: The second part of a role, e.g. '0' for
1805 the role 'client.0'
1806 :return: a Remote instance for the host where the
1807 requested role is placed
1808 """
1809 return get_remote(self.ctx, self.cluster,
1810 service_type, service_id)
1811
1812 def admin_socket(self, service_type, service_id,
1813 command, check_status=True, timeout=0, stdout=None):
1814 """
1815 Remotely start up ceph specifying the admin socket
1816 :param command: a list of words to use as the command
1817 to the admin socket
1818 """
1819 if stdout is None:
1820 stdout = StringIO()
1821
1822 remote = self.find_remote(service_type, service_id)
1823
1824 if self.cephadm:
1825 return shell(
1826 self.ctx, self.cluster, remote,
1827 args=[
1828 'ceph', 'daemon', '%s.%s' % (service_type, service_id),
1829 ] + command,
1830 stdout=stdout,
1831 wait=True,
1832 check_status=check_status,
1833 )
1834 if self.rook:
1835 assert False, 'not implemented'
1836
1837 args = [
1838 'sudo',
1839 'adjust-ulimits',
1840 'ceph-coverage',
1841 f'{self.testdir}/archive/coverage',
1842 'timeout',
1843 str(timeout),
1844 'ceph',
1845 '--cluster',
1846 self.cluster,
1847 '--admin-daemon',
1848 '/var/run/ceph/{cluster}-{type}.{id}.asok'.format(
1849 cluster=self.cluster,
1850 type=service_type,
1851 id=service_id),
1852 ]
1853 args.extend(command)
1854 return remote.run(
1855 args=args,
1856 stdout=stdout,
1857 wait=True,
1858 check_status=check_status
1859 )
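# A minimal usage sketch, assuming a hypothetical `manager`; admin_socket()
# returns the finished RemoteProcess, so JSON output can be read back from
# the StringIO it was given:
#
#     proc = manager.admin_socket('osd', 0, ['dump_ops_in_flight'])
#     ops = json.loads(proc.stdout.getvalue())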
1860
1861 def objectstore_tool(self, pool, options, args, **kwargs):
1862 return ObjectStoreTool(self, pool, **kwargs).run(options, args)
1863
1864 def get_pgid(self, pool, pgnum):
1865 """
1866 :param pool: pool name
1867 :param pgnum: pg number
1868 :returns: a string representing this pg.
1869 """
1870 poolnum = self.get_pool_num(pool)
1871 pg_str = "{poolnum}.{pgnum}".format(
1872 poolnum=poolnum,
1873 pgnum=pgnum)
1874 return pg_str
1875
1876 def get_pg_replica(self, pool, pgnum):
1877 """
1878 get a replica osd for (pool, pgnum), e.g. (data, 0) -> 0
1879 """
1880 pg_str = self.get_pgid(pool, pgnum)
1881 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1882 j = json.loads('\n'.join(output.split('\n')[1:]))
1883 return int(j['acting'][-1])
1885
1886 def wait_for_pg_stats(func):
1887 # both osd_mon_report_interval and mgr_stats_period are 5 seconds
1888 # by default; taking fault injection into account, retrying with
1889 # these backoff delays is more than enough
1890 delays = [1, 1, 2, 3, 5, 8, 13, 0]
1891 @wraps(func)
1892 def wrapper(self, *args, **kwargs):
1893 exc = None
1894 for delay in delays:
1895 try:
1896 return func(self, *args, **kwargs)
1897 except AssertionError as e:
1898 time.sleep(delay)
1899 exc = e
1900 raise exc
1901 return wrapper
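# Any method that asserts on freshly flushed pg stats can opt into the
# retry/backoff above by decorating it, as with_pg_state() and with_pg()
# do below; a hypothetical extra check might look like:
#
#     @wait_for_pg_stats  # type: ignore
#     def assert_pg_clean(self, pool, pgnum):
#         stats = self.get_single_pg_stats(self.get_pgid(pool, pgnum))
#         assert 'clean' in stats['state']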
1902
1903 def get_pg_primary(self, pool, pgnum):
1904 """
1905 get the primary osd for (pool, pgnum), e.g. (data, 0) -> 0
1906 """
1907 pg_str = self.get_pgid(pool, pgnum)
1908 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1909 j = json.loads('\n'.join(output.split('\n')[1:]))
1910 return int(j['acting'][0])
1912
1913 def get_pool_num(self, pool):
1914 """
1915 get number for pool (e.g., data -> 2)
1916 """
1917 return int(self.get_pool_dump(pool)['pool'])
1918
1919 def list_pools(self):
1920 """
1921 list all pool names
1922 """
1923 osd_dump = self.get_osd_dump_json()
1924 self.log(osd_dump['pools'])
1925 return [str(i['pool_name']) for i in osd_dump['pools']]
1926
1927 def clear_pools(self):
1928 """
1929 remove all pools
1930 """
1931 [self.remove_pool(i) for i in self.list_pools()]
1932
1933 def kick_recovery_wq(self, osdnum):
1934 """
1935 Run kick_recovery_wq on cluster.
1936 """
1937 return self.raw_cluster_cmd(
1938 'tell', "osd.%d" % (int(osdnum),),
1939 'debug',
1940 'kick_recovery_wq',
1941 '0')
1942
1943 def wait_run_admin_socket(self, service_type,
1944 service_id, args=['version'], timeout=75, stdout=None):
1945 """
1946 If the admin_socket call succeeds, return the result. Otherwise
1947 wait five seconds and try again, until the timeout expires.
1948 """
1949 if stdout is None:
1950 stdout = StringIO()
1951 tries = 0
1952 while True:
1953 proc = self.admin_socket(service_type, service_id,
1954 args, check_status=False, stdout=stdout)
1955 if proc.exitstatus == 0:
1956 return proc
1957 else:
1958 tries += 1
1959 if (tries * 5) > timeout:
1960 raise Exception('timed out waiting for admin_socket '
1961 'to appear after {type}.{id} restart'.
1962 format(type=service_type,
1963 id=service_id))
1964 self.log("waiting on admin_socket for {type}-{id}, "
1965 "{command}".format(type=service_type,
1966 id=service_id,
1967 command=args))
1968 time.sleep(5)
1969
1970 def get_pool_dump(self, pool):
1971 """
1972 get the osd dump part of a pool
1973 """
1974 osd_dump = self.get_osd_dump_json()
1975 for i in osd_dump['pools']:
1976 if i['pool_name'] == pool:
1977 return i
1978 assert False
1979
1980 def get_config(self, service_type, service_id, name):
1981 """
1982 :param service_type, service_id: daemon to query, e.g. 'mon', 'a'
1983 :param name: the option name
1984 """
1985 proc = self.wait_run_admin_socket(service_type, service_id,
1986 ['config', 'show'])
1987 j = json.loads(proc.stdout.getvalue())
1988 return j[name]
1989
1990 def inject_args(self, service_type, service_id, name, value):
1991 whom = '{0}.{1}'.format(service_type, service_id)
1992 if isinstance(value, bool):
1993 value = 'true' if value else 'false'
1994 opt_arg = '--{name}={value}'.format(name=name, value=value)
1995 self.raw_cluster_cmd('--', 'tell', whom, 'injectargs', opt_arg)
1996
1997 def set_config(self, osdnum, **argdict):
1998 """
1999 :param osdnum: osd number
2000 :param argdict: dictionary containing values to set.
2001 """
2002 for k, v in argdict.items():
2003 self.wait_run_admin_socket(
2004 'osd', osdnum,
2005 ['config', 'set', str(k), str(v)])
2006
2007 def raw_cluster_status(self):
2008 """
2009 Get status from cluster
2010 """
2011 status = self.raw_cluster_cmd('status', '--format=json')
2012 return json.loads(status)
2013
2014 def raw_osd_status(self):
2015 """
2016 Get osd status from cluster
2017 """
2018 return self.raw_cluster_cmd('osd', 'dump')
2019
2020 def get_osd_status(self):
2021 """
2022 Get osd statuses sorted by states that the osds are in.
2023 """
2024 osd_lines = list(filter(
2025 lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
2026 self.raw_osd_status().split('\n')))
2027 self.log(osd_lines)
2028 in_osds = [int(i[4:].split()[0])
2029 for i in filter(lambda x: " in " in x, osd_lines)]
2030 out_osds = [int(i[4:].split()[0])
2031 for i in filter(lambda x: " out " in x, osd_lines)]
2032 up_osds = [int(i[4:].split()[0])
2033 for i in filter(lambda x: " up " in x, osd_lines)]
2034 down_osds = [int(i[4:].split()[0])
2035 for i in filter(lambda x: " down " in x, osd_lines)]
2036 dead_osds = [int(x.id_)
2037 for x in filter(lambda x:
2038 not x.running(),
2039 self.ctx.daemons.
2040 iter_daemons_of_role('osd', self.cluster))]
2041 live_osds = [int(x.id_) for x in
2042 filter(lambda x:
2043 x.running(),
2044 self.ctx.daemons.iter_daemons_of_role('osd',
2045 self.cluster))]
2046 return {'in': in_osds, 'out': out_osds, 'up': up_osds,
2047 'down': down_osds, 'dead': dead_osds, 'live': live_osds,
2048 'raw': osd_lines}
2049
2050 def get_num_pgs(self):
2051 """
2052 Check cluster status for the number of pgs
2053 """
2054 status = self.raw_cluster_status()
2055 self.log(status)
2056 return status['pgmap']['num_pgs']
2057
2058 def create_erasure_code_profile(self, profile_name, profile):
2059 """
2060 Create an erasure code profile name that can be used as a parameter
2061 when creating an erasure coded pool.
2062 """
2063 with self.lock:
2064 args = cmd_erasure_code_profile(profile_name, profile)
2065 self.raw_cluster_cmd(*args)
2066
2067 def create_pool_with_unique_name(self, pg_num=16,
2068 erasure_code_profile_name=None,
2069 min_size=None,
2070 erasure_code_use_overwrites=False):
2071 """
2072 Create a pool named unique_pool_X where X is unique.
2073 """
2074 name = ""
2075 with self.lock:
2076 name = "unique_pool_%s" % (str(self.next_pool_id),)
2077 self.next_pool_id += 1
2078 self.create_pool(
2079 name,
2080 pg_num,
2081 erasure_code_profile_name=erasure_code_profile_name,
2082 min_size=min_size,
2083 erasure_code_use_overwrites=erasure_code_use_overwrites)
2084 return name
2085
2086 @contextlib.contextmanager
2087 def pool(self, pool_name, pg_num=16, erasure_code_profile_name=None):
2088 self.create_pool(pool_name, pg_num, erasure_code_profile_name)
2089 yield
2090 self.remove_pool(pool_name)
2091
2092 def create_pool(self, pool_name, pg_num=16,
2093 erasure_code_profile_name=None,
2094 min_size=None,
2095 erasure_code_use_overwrites=False):
2096 """
2097 Create a pool named from the pool_name parameter.
2098 :param pool_name: name of the pool being created.
2099 :param pg_num: initial number of pgs.
2100 :param erasure_code_profile_name: if set (not None), create an
2101 erasure coded pool using the profile
2102 :param erasure_code_use_overwrites: if true, allow overwrites
2103 """
2104 with self.lock:
2105 assert isinstance(pool_name, str)
2106 assert isinstance(pg_num, int)
2107 assert pool_name not in self.pools
2108 self.log("creating pool_name %s" % (pool_name,))
2109 if erasure_code_profile_name:
2110 self.raw_cluster_cmd('osd', 'pool', 'create',
2111 pool_name, str(pg_num), str(pg_num),
2112 'erasure', erasure_code_profile_name)
2113 else:
2114 self.raw_cluster_cmd('osd', 'pool', 'create',
2115 pool_name, str(pg_num))
2116 if min_size is not None:
2117 self.raw_cluster_cmd(
2118 'osd', 'pool', 'set', pool_name,
2119 'min_size',
2120 str(min_size))
2121 if erasure_code_use_overwrites:
2122 self.raw_cluster_cmd(
2123 'osd', 'pool', 'set', pool_name,
2124 'allow_ec_overwrites',
2125 'true')
2126 self.raw_cluster_cmd(
2127 'osd', 'pool', 'application', 'enable',
2128 pool_name, 'rados', '--yes-i-really-mean-it',
2129 run.Raw('||'), 'true')
2130 self.pools[pool_name] = pg_num
2131 time.sleep(1)
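# A minimal sketch of creating an erasure coded pool with overwrites
# enabled; the profile name, pool name and profile keys below are
# illustrative assumptions, and `manager` is hypothetical:
#
#     manager.create_erasure_code_profile('ec21', {'k': 2, 'm': 1})
#     manager.create_pool('ecpool', pg_num=16,
#                         erasure_code_profile_name='ec21',
#                         erasure_code_use_overwrites=True)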
2132
2133 def add_pool_snap(self, pool_name, snap_name):
2134 """
2135 Add pool snapshot
2136 :param pool_name: name of pool to snapshot
2137 :param snap_name: name of snapshot to take
2138 """
2139 self.raw_cluster_cmd('osd', 'pool', 'mksnap',
2140 str(pool_name), str(snap_name))
2141
2142 def remove_pool_snap(self, pool_name, snap_name):
2143 """
2144 Remove pool snapshot
2145 :param pool_name: name of pool to snapshot
2146 :param snap_name: name of snapshot to remove
2147 """
2148 self.raw_cluster_cmd('osd', 'pool', 'rmsnap',
2149 str(pool_name), str(snap_name))
2150
2151 def remove_pool(self, pool_name):
2152 """
2153 Remove the indicated pool
2154 :param pool_name: Pool to be removed
2155 """
2156 with self.lock:
2157 assert isinstance(pool_name, str)
2158 assert pool_name in self.pools
2159 self.log("removing pool_name %s" % (pool_name,))
2160 del self.pools[pool_name]
2161 self.raw_cluster_cmd('osd', 'pool', 'rm', pool_name, pool_name,
2162 "--yes-i-really-really-mean-it")
2163
2164 def get_pool(self):
2165 """
2166 Pick a random pool
2167 """
2168 with self.lock:
2169 if self.pools:
2170 return random.choice(list(self.pools.keys()))
2171
2172 def get_pool_pg_num(self, pool_name):
2173 """
2174 Return the number of pgs in the pool specified.
2175 """
2176 with self.lock:
2177 assert isinstance(pool_name, str)
2178 if pool_name in self.pools:
2179 return self.pools[pool_name]
2180 return 0
2181
2182 def get_pool_property(self, pool_name, prop):
2183 """
2184 :param pool_name: pool
2185 :param prop: property to be checked.
2186 :returns: property as string
2187 """
2188 with self.lock:
2189 assert isinstance(pool_name, str)
2190 assert isinstance(prop, str)
2191 output = self.raw_cluster_cmd(
2192 'osd',
2193 'pool',
2194 'get',
2195 pool_name,
2196 prop)
2197 return output.split()[1]
2198
2199 def get_pool_int_property(self, pool_name, prop):
2200 return int(self.get_pool_property(pool_name, prop))
2201
2202 def set_pool_property(self, pool_name, prop, val):
2203 """
2204 :param pool_name: pool
2205 :param prop: property to be set.
2206 :param val: value to set.
2207
2208 This routine retries if set operation fails.
2209 """
2210 with self.lock:
2211 assert isinstance(pool_name, str)
2212 assert isinstance(prop, str)
2213 assert isinstance(val, int)
2214 tries = 0
2215 while True:
2216 r = self.raw_cluster_cmd_result(
2217 'osd',
2218 'pool',
2219 'set',
2220 pool_name,
2221 prop,
2222 str(val))
2223 if r != errno.EAGAIN:
2224 break
2225 tries += 1
2226 if tries > 50:
2227 raise Exception('still getting EAGAIN after 50 tries '
2228 'while setting pool property %s %s = %s' %
2229 (pool_name, prop, val))
2230 self.log('got EAGAIN setting pool property, '
2231 'waiting a few seconds...')
2232 time.sleep(2)
2233
2234 def expand_pool(self, pool_name, by, max_pgs):
2235 """
2236 Increase the number of pgs in a pool
2237 """
2238 with self.lock:
2239 assert isinstance(pool_name, str)
2240 assert isinstance(by, int)
2241 assert pool_name in self.pools
2242 if self.get_num_creating() > 0:
2243 return False
2244 if (self.pools[pool_name] + by) > max_pgs:
2245 return False
2246 self.log("increase pool size by %d" % (by,))
2247 new_pg_num = self.pools[pool_name] + by
2248 self.set_pool_property(pool_name, "pg_num", new_pg_num)
2249 self.pools[pool_name] = new_pg_num
2250 return True
2251
2252 def contract_pool(self, pool_name, by, min_pgs):
2253 """
2254 Decrease the number of pgs in a pool
2255 """
2256 with self.lock:
2257 self.log('contract_pool %s by %s min %s' % (
2258 pool_name, str(by), str(min_pgs)))
2259 assert isinstance(pool_name, str)
2260 assert isinstance(by, int)
2261 assert pool_name in self.pools
2262 if self.get_num_creating() > 0:
2263 self.log('too many creating')
2264 return False
2265 proj = self.pools[pool_name] - by
2266 if proj < min_pgs:
2267 self.log('would drop below min_pgs, proj %d, currently %d' % (proj, self.pools[pool_name]))
2268 return False
2269 self.log("decrease pool size by %d" % (by,))
2270 new_pg_num = self.pools[pool_name] - by
2271 self.set_pool_property(pool_name, "pg_num", new_pg_num)
2272 self.pools[pool_name] = new_pg_num
2273 return True
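# expand_pool()/contract_pool() return False and do nothing while pgs are
# still being created or when the requested change would cross the given
# bounds, so callers can simply retry later. A hypothetical sketch
# (pool name and bounds are assumptions):
#
#     if manager.expand_pool('unique_pool_0', by=8, max_pgs=256):
#         # follow the pg_num bump with a matching pgp_num bump
#         manager.set_pool_pgpnum('unique_pool_0', force=False)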
2274
2275 def stop_pg_num_changes(self):
2276 """
2277 Reset all pg_num_targets back to pg_num, canceling splits and merges
2278 """
2279 self.log('Canceling any pending splits or merges...')
2280 osd_dump = self.get_osd_dump_json()
2281 try:
2282 for pool in osd_dump['pools']:
2283 if pool['pg_num'] != pool['pg_num_target']:
2284 self.log('Setting pool %s (%d) pg_num %d -> %d' %
2285 (pool['pool_name'], pool['pool'],
2286 pool['pg_num_target'],
2287 pool['pg_num']))
2288 self.raw_cluster_cmd('osd', 'pool', 'set', pool['pool_name'],
2289 'pg_num', str(pool['pg_num']))
2290 except KeyError:
2291 # we don't support pg_num_target before nautilus
2292 pass
2293
2294 def set_pool_pgpnum(self, pool_name, force):
2295 """
2296 Set pgpnum property of pool_name pool.
2297 """
2298 with self.lock:
2299 assert isinstance(pool_name, str)
2300 assert pool_name in self.pools
2301 if not force and self.get_num_creating() > 0:
2302 return False
2303 self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name])
2304 return True
2305
2306 def list_pg_unfound(self, pgid):
2307 """
2308 return list of unfound pgs with the id specified
2309 """
2310 r = None
2311 offset = {}
2312 while True:
2313 out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_unfound',
2314 json.dumps(offset))
2315 j = json.loads(out)
2316 if r is None:
2317 r = j
2318 else:
2319 r['objects'].extend(j['objects'])
2320 if 'more' not in j:
2321 break
2322 if j['more'] == 0:
2323 break
2324 offset = j['objects'][-1]['oid']
2325 if 'more' in r:
2326 del r['more']
2327 return r
2328
2329 def get_pg_stats(self):
2330 """
2331 Dump the cluster and get pg stats
2332 """
2333 out = self.raw_cluster_cmd('pg', 'dump', '--format=json')
2334 j = json.loads('\n'.join(out.split('\n')[1:]))
2335 try:
2336 return j['pg_map']['pg_stats']
2337 except KeyError:
2338 return j['pg_stats']
2339
2340 def get_osd_df(self, osdid):
2341 """
2342 Get the osd df stats
2343 """
2344 out = self.raw_cluster_cmd('osd', 'df', 'name', 'osd.{}'.format(osdid),
2345 '--format=json')
2346 j = json.loads('\n'.join(out.split('\n')[1:]))
2347 return j['nodes'][0]
2348
2349 def get_pool_df(self, name):
2350 """
2351 Get the pool df stats
2352 """
2353 out = self.raw_cluster_cmd('df', 'detail', '--format=json')
2354 j = json.loads('\n'.join(out.split('\n')[1:]))
2355 return next((p['stats'] for p in j['pools'] if p['name'] == name),
2356 None)
2357
2358 def get_pgids_to_force(self, backfill):
2359 """
2360 Return the randomized list of PGs that can have their recovery/backfill forced
2361 """
2362 j = self.get_pg_stats()
2363 pgids = []
2364 if backfill:
2365 wanted = ['degraded', 'backfilling', 'backfill_wait']
2366 else:
2367 wanted = ['recovering', 'degraded', 'recovery_wait']
2368 for pg in j:
2369 status = pg['state'].split('+')
2370 for t in wanted:
2371 if random.random() > 0.5 and not ('forced_backfill' in status or 'forced_recovery' in status) and t in status:
2372 pgids.append(pg['pgid'])
2373 break
2374 return pgids
2375
2376 def get_pgids_to_cancel_force(self, backfill):
2377 """
2378 Return the randomized list of PGs whose recovery/backfill priority is forced
2379 """
2380 j = self.get_pg_stats()
2381 pgids = []
2382 if backfill:
2383 wanted = 'forced_backfill'
2384 else:
2385 wanted = 'forced_recovery'
2386 for pg in j:
2387 status = pg['state'].split('+')
2388 if wanted in status and random.random() > 0.5:
2389 pgids.append(pg['pgid'])
2390 return pgids
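# A sketch of pairing the two helpers above with the corresponding
# cluster commands; `manager` is hypothetical:
#
#     for pgid in manager.get_pgids_to_force(backfill=True):
#         manager.raw_cluster_cmd('pg', 'force-backfill', pgid)
#     for pgid in manager.get_pgids_to_cancel_force(backfill=True):
#         manager.raw_cluster_cmd('pg', 'cancel-force-backfill', pgid)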
2391
2392 def compile_pg_status(self):
2393 """
2394 Return a histogram of pg state values
2395 """
2396 ret = {}
2397 j = self.get_pg_stats()
2398 for pg in j:
2399 for status in pg['state'].split('+'):
2400 if status not in ret:
2401 ret[status] = 0
2402 ret[status] += 1
2403 return ret
2404
2405 @wait_for_pg_stats # type: ignore
2406 def with_pg_state(self, pool, pgnum, check):
2407 pgstr = self.get_pgid(pool, pgnum)
2408 stats = self.get_single_pg_stats(pgstr)
2409 assert(check(stats['state']))
2410
2411 @wait_for_pg_stats # type: ignore
2412 def with_pg(self, pool, pgnum, check):
2413 pgstr = self.get_pgid(pool, pgnum)
2414 stats = self.get_single_pg_stats(pgstr)
2415 return check(stats)
2416
2417 def get_last_scrub_stamp(self, pool, pgnum):
2418 """
2419 Get the timestamp of the last scrub.
2420 """
2421 stats = self.get_single_pg_stats(self.get_pgid(pool, pgnum))
2422 return stats["last_scrub_stamp"]
2423
2424 def do_pg_scrub(self, pool, pgnum, stype):
2425 """
2426 Scrub pg and wait for scrubbing to finish
2427 """
2428 init = self.get_last_scrub_stamp(pool, pgnum)
2429 RESEND_TIMEOUT = 120 # Must be a multiple of SLEEP_TIME
2430 FATAL_TIMEOUT = RESEND_TIMEOUT * 3
2431 SLEEP_TIME = 10
2432 timer = 0
2433 while init == self.get_last_scrub_stamp(pool, pgnum):
2434 assert timer < FATAL_TIMEOUT, "fatal timeout trying to " + stype
2435 self.log("waiting for scrub type %s" % (stype,))
2436 if (timer % RESEND_TIMEOUT) == 0:
2437 self.raw_cluster_cmd('pg', stype, self.get_pgid(pool, pgnum))
2438 # The first time in this loop is the actual request
2439 if timer != 0 and stype == "repair":
2440 self.log("WARNING: Resubmitted a non-idempotent repair")
2441 time.sleep(SLEEP_TIME)
2442 timer += SLEEP_TIME
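# do_pg_scrub() blocks until the pg's last_scrub_stamp changes, so a deep
# scrub of pg 0 in a hypothetical pool reduces to:
#
#     before = manager.get_last_scrub_stamp('rbd', 0)
#     manager.do_pg_scrub('rbd', 0, 'deep-scrub')
#     assert manager.get_last_scrub_stamp('rbd', 0) != before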
2443
2444 def wait_snap_trimming_complete(self, pool):
2445 """
2446 Wait for snap trimming on pool to end
2447 """
2448 POLL_PERIOD = 10
2449 FATAL_TIMEOUT = 600
2450 start = time.time()
2451 poolnum = self.get_pool_num(pool)
2452 poolnumstr = "%s." % (poolnum,)
2453 while True:
2454 now = time.time()
2455 assert (now - start) <= FATAL_TIMEOUT, \
2456 'failed to complete snap trimming before timeout'
2458 all_stats = self.get_pg_stats()
2459 trimming = False
2460 for pg in all_stats:
2461 if (poolnumstr in pg['pgid']) and ('snaptrim' in pg['state']):
2462 self.log("pg {pg} in trimming, state: {state}".format(
2463 pg=pg['pgid'],
2464 state=pg['state']))
2465 trimming = True
2466 if not trimming:
2467 break
2468 self.log("{pool} still trimming, waiting".format(pool=pool))
2469 time.sleep(POLL_PERIOD)
2470
2471 def get_single_pg_stats(self, pgid):
2472 """
2473 Return pg for the pgid specified.
2474 """
2475 all_stats = self.get_pg_stats()
2476
2477 for pg in all_stats:
2478 if pg['pgid'] == pgid:
2479 return pg
2480
2481 return None
2482
2483 def get_object_pg_with_shard(self, pool, name, osdid):
2484 """Return the pg (with shard suffix for EC pools) holding the object on osdid."""
2486 pool_dump = self.get_pool_dump(pool)
2487 object_map = self.get_object_map(pool, name)
2488 if pool_dump["type"] == PoolType.ERASURE_CODED:
2489 shard = object_map['acting'].index(osdid)
2490 return "{pgid}s{shard}".format(pgid=object_map['pgid'],
2491 shard=shard)
2492 else:
2493 return object_map['pgid']
2494
2495 def get_object_primary(self, pool, name):
2496 """Return the acting primary osd id for the named object."""
2498 object_map = self.get_object_map(pool, name)
2499 return object_map['acting_primary']
2500
2501 def get_object_map(self, pool, name):
2502 """
2503 osd map --format=json converted to a python object
2504 :returns: the python object
2505 """
2506 out = self.raw_cluster_cmd('--format=json', 'osd', 'map', pool, name)
2507 return json.loads('\n'.join(out.split('\n')[1:]))
2508
2509 def get_osd_dump_json(self):
2510 """
2511 osd dump --format=json converted to a python object
2512 :returns: the python object
2513 """
2514 out = self.raw_cluster_cmd('osd', 'dump', '--format=json')
2515 return json.loads('\n'.join(out.split('\n')[1:]))
2516
2517 def get_osd_dump(self):
2518 """
2519 Dump osds
2520 :returns: all osds
2521 """
2522 return self.get_osd_dump_json()['osds']
2523
2524 def get_osd_metadata(self):
2525 """
2526 osd metadata --format=json converted to a python object
2527 :returns: the python object containing osd metadata information
2528 """
2529 out = self.raw_cluster_cmd('osd', 'metadata', '--format=json')
2530 return json.loads('\n'.join(out.split('\n')[1:]))
2531
2532 def get_mgr_dump(self):
2533 out = self.raw_cluster_cmd('mgr', 'dump', '--format=json')
2534 return json.loads(out)
2535
2536 def get_stuck_pgs(self, type_, threshold):
2537 """
2538 :returns: stuck pg information from the cluster
2539 """
2540 out = self.raw_cluster_cmd('pg', 'dump_stuck', type_, str(threshold),
2541 '--format=json')
2542 return json.loads(out).get('stuck_pg_stats', [])
2543
2544 def get_num_unfound_objects(self):
2545 """
2546 Check cluster status to get the number of unfound objects
2547 """
2548 status = self.raw_cluster_status()
2549 self.log(status)
2550 return status['pgmap'].get('unfound_objects', 0)
2551
2552 def get_num_creating(self):
2553 """
2554 Find the number of pgs in creating mode.
2555 """
2556 pgs = self.get_pg_stats()
2557 num = 0
2558 for pg in pgs:
2559 if 'creating' in pg['state']:
2560 num += 1
2561 return num
2562
2563 def get_num_active_clean(self):
2564 """
2565 Find the number of active and clean pgs.
2566 """
2567 pgs = self.get_pg_stats()
2568 return self._get_num_active_clean(pgs)
2569
2570 def _get_num_active_clean(self, pgs):
2571 num = 0
2572 for pg in pgs:
2573 if (pg['state'].count('active') and
2574 pg['state'].count('clean') and
2575 not pg['state'].count('stale')):
2576 num += 1
2577 return num
2578
2579 def get_num_active_recovered(self):
2580 """
2581 Find the number of active and recovered pgs.
2582 """
2583 pgs = self.get_pg_stats()
2584 return self._get_num_active_recovered(pgs)
2585
2586 def _get_num_active_recovered(self, pgs):
2587 num = 0
2588 for pg in pgs:
2589 if (pg['state'].count('active') and
2590 not pg['state'].count('recover') and
2591 not pg['state'].count('backfilling') and
2592 not pg['state'].count('stale')):
2593 num += 1
2594 return num
2595
2596 def get_is_making_recovery_progress(self):
2597 """
2598 Return whether there is recovery progress discernible in the
2599 raw cluster status
2600 """
2601 status = self.raw_cluster_status()
2602 kps = status['pgmap'].get('recovering_keys_per_sec', 0)
2603 bps = status['pgmap'].get('recovering_bytes_per_sec', 0)
2604 ops = status['pgmap'].get('recovering_objects_per_sec', 0)
2605 return kps > 0 or bps > 0 or ops > 0
2606
2607 def get_num_active(self):
2608 """
2609 Find the number of active pgs.
2610 """
2611 pgs = self.get_pg_stats()
2612 return self._get_num_active(pgs)
2613
2614 def _get_num_active(self, pgs):
2615 num = 0
2616 for pg in pgs:
2617 if pg['state'].count('active') and not pg['state'].count('stale'):
2618 num += 1
2619 return num
2620
2621 def get_num_down(self):
2622 """
2623 Find the number of pgs that are down.
2624 """
2625 pgs = self.get_pg_stats()
2626 num = 0
2627 for pg in pgs:
2628 if ((pg['state'].count('down') and not
2629 pg['state'].count('stale')) or
2630 (pg['state'].count('incomplete') and not
2631 pg['state'].count('stale'))):
2632 num += 1
2633 return num
2634
2635 def get_num_active_down(self):
2636 """
2637 Find the number of pgs that are either active or down.
2638 """
2639 pgs = self.get_pg_stats()
2640 return self._get_num_active_down(pgs)
2641
2642 def _get_num_active_down(self, pgs):
2643 num = 0
2644 for pg in pgs:
2645 if ((pg['state'].count('active') and not
2646 pg['state'].count('stale')) or
2647 (pg['state'].count('down') and not
2648 pg['state'].count('stale')) or
2649 (pg['state'].count('incomplete') and not
2650 pg['state'].count('stale'))):
2651 num += 1
2652 return num
2653
2654 def get_num_peered(self):
2655 """
2656 Find the number of PGs that are peered
2657 """
2658 pgs = self.get_pg_stats()
2659 return self._get_num_peered(pgs)
2660
2661 def _get_num_peered(self, pgs):
2662 num = 0
2663 for pg in pgs:
2664 if pg['state'].count('peered') and not pg['state'].count('stale'):
2665 num += 1
2666 return num
2667
2668 def is_clean(self):
2669 """
2670 True if all pgs are clean
2671 """
2672 pgs = self.get_pg_stats()
2673 return self._get_num_active_clean(pgs) == len(pgs)
2674
2675 def is_recovered(self):
2676 """
2677 True if all pgs have recovered
2678 """
2679 pgs = self.get_pg_stats()
2680 return self._get_num_active_recovered(pgs) == len(pgs)
2681
2682 def is_active_or_down(self):
2683 """
2684 True if all pgs are active or down
2685 """
2686 pgs = self.get_pg_stats()
2687 return self._get_num_active_down(pgs) == len(pgs)
2688
2689 def dump_pgs_not_active_clean(self):
2690 """
2691 Dumps all pgs that are not active+clean
2692 """
2693 pgs = self.get_pg_stats()
2694 for pg in pgs:
2695 if pg['state'] != 'active+clean':
2696 self.log('PG %s is not active+clean' % pg['pgid'])
2697 self.log(pg)
2698
2699 def dump_pgs_not_active_down(self):
2700 """
2701 Dumps all pgs that are not active or down
2702 """
2703 pgs = self.get_pg_stats()
2704 for pg in pgs:
2705 if 'active' not in pg['state'] and 'down' not in pg['state']:
2706 self.log('PG %s is not active or down' % pg['pgid'])
2707 self.log(pg)
2708
2709 def dump_pgs_not_active(self):
2710 """
2711 Dumps all pgs that are not active
2712 """
2713 pgs = self.get_pg_stats()
2714 for pg in pgs:
2715 if 'active' not in pg['state']:
2716 self.log('PG %s is not active' % pg['pgid'])
2717 self.log(pg)
2718
2719 def wait_for_clean(self, timeout=1200):
2720 """
2721 Returns true when all pgs are clean.
2722 """
2723 self.log("waiting for clean")
2724 start = time.time()
2725 num_active_clean = self.get_num_active_clean()
2726 while not self.is_clean():
2727 if timeout is not None:
2728 if self.get_is_making_recovery_progress():
2729 self.log("making progress, resetting timeout")
2730 start = time.time()
2731 else:
2732 self.log("no progress seen, keeping timeout for now")
2733 if time.time() - start >= timeout:
2734 self.log('dumping pgs not clean')
2735 self.dump_pgs_not_active_clean()
2736 assert time.time() - start < timeout, \
2737 'wait_for_clean: failed before timeout expired'
2738 cur_active_clean = self.get_num_active_clean()
2739 if cur_active_clean != num_active_clean:
2740 start = time.time()
2741 num_active_clean = cur_active_clean
2742 time.sleep(3)
2743 self.log("clean!")
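# A sketch of the usual "disturb the cluster, then settle" pattern built
# on the helpers in this section; `manager` and osd.0 are hypothetical:
#
#     manager.mark_out_osd(0)
#     manager.wait_for_clean(timeout=1200)   # data fully re-replicated
#     manager.mark_in_osd(0)
#     manager.wait_for_clean(timeout=1200)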
2744
2745 def are_all_osds_up(self):
2746 """
2747 Returns true if all osds are up.
2748 """
2749 x = self.get_osd_dump()
2750 return all(y['up'] > 0 for y in x)
2751
2752 def wait_for_all_osds_up(self, timeout=None):
2753 """
2754 When this exits, either the timeout has expired, or all
2755 osds are up.
2756 """
2757 self.log("waiting for all up")
2758 start = time.time()
2759 while not self.are_all_osds_up():
2760 if timeout is not None:
2761 assert time.time() - start < timeout, \
2762 'timeout expired in wait_for_all_osds_up'
2763 time.sleep(3)
2764 self.log("all up!")
2765
2766 def pool_exists(self, pool):
2767 return pool in self.list_pools()
2770
2771 def wait_for_pool(self, pool, timeout=300):
2772 """
2773 Wait for a pool to exist
2774 """
2775 self.log('waiting for pool %s to exist' % pool)
2776 start = time.time()
2777 while not self.pool_exists(pool):
2778 if timeout is not None:
2779 assert time.time() - start < timeout, \
2780 'timeout expired in wait_for_pool'
2781 time.sleep(3)
2782
2783 def wait_for_pools(self, pools):
2784 for pool in pools:
2785 self.wait_for_pool(pool)
2786
2787 def is_mgr_available(self):
2788 x = self.get_mgr_dump()
2789 return x.get('available', False)
2790
2791 def wait_for_mgr_available(self, timeout=None):
2792 self.log("waiting for mgr available")
2793 start = time.time()
2794 while not self.is_mgr_available():
2795 if timeout is not None:
2796 assert time.time() - start < timeout, \
2797 'timeout expired in wait_for_mgr_available'
2798 time.sleep(3)
2799 self.log("mgr available!")
2800
2801 def wait_for_recovery(self, timeout=None):
2802 """
2803 Check peering. When this exits, we have recovered.
2804 """
2805 self.log("waiting for recovery to complete")
2806 start = time.time()
2807 num_active_recovered = self.get_num_active_recovered()
2808 while not self.is_recovered():
2809 now = time.time()
2810 if timeout is not None:
2811 if self.get_is_making_recovery_progress():
2812 self.log("making progress, resetting timeout")
2813 start = time.time()
2814 else:
2815 self.log("no progress seen, keeping timeout for now")
2816 if now - start >= timeout:
2817 if self.is_recovered():
2818 break
2819 self.log('dumping pgs not recovered yet')
2820 self.dump_pgs_not_active_clean()
2821 assert now - start < timeout, \
2822 'wait_for_recovery: failed before timeout expired'
2823 cur_active_recovered = self.get_num_active_recovered()
2824 if cur_active_recovered != num_active_recovered:
2825 start = time.time()
2826 num_active_recovered = cur_active_recovered
2827 time.sleep(3)
2828 self.log("recovered!")
2829
2830 def wait_for_active(self, timeout=None):
2831 """
2832 Check peering. When this exits, we are definitely active
2833 """
2834 self.log("waiting for peering to complete")
2835 start = time.time()
2836 num_active = self.get_num_active()
2837 while not self.is_active():
2838 if timeout is not None:
2839 if time.time() - start >= timeout:
2840 self.log('dumping pgs not active')
2841 self.dump_pgs_not_active()
2842 assert time.time() - start < timeout, \
2843 'wait_for_active: failed before timeout expired'
2844 cur_active = self.get_num_active()
2845 if cur_active != num_active:
2846 start = time.time()
2847 num_active = cur_active
2848 time.sleep(3)
2849 self.log("active!")
2850
2851 def wait_for_active_or_down(self, timeout=None):
2852 """
2853 Check peering. When this exits, we are definitely either
2854 active or down
2855 """
2856 self.log("waiting for peering to complete or become blocked")
2857 start = time.time()
2858 num_active_down = self.get_num_active_down()
2859 while not self.is_active_or_down():
2860 if timeout is not None:
2861 if time.time() - start >= timeout:
2862 self.log('dumping pgs not active or down')
2863 self.dump_pgs_not_active_down()
2864 assert time.time() - start < timeout, \
2865 'wait_for_active_or_down: failed before timeout expired'
2866 cur_active_down = self.get_num_active_down()
2867 if cur_active_down != num_active_down:
2868 start = time.time()
2869 num_active_down = cur_active_down
2870 time.sleep(3)
2871 self.log("active or down!")
2872
2873 def osd_is_up(self, osd):
2874 """
2875 Wrapper for osd check
2876 """
2877 osds = self.get_osd_dump()
2878 return osds[osd]['up'] > 0
2879
2880 def wait_till_osd_is_up(self, osd, timeout=None):
2881 """
2882 Loop waiting for osd.
2883 """
2884 self.log('waiting for osd.%d to be up' % osd)
2885 start = time.time()
2886 while not self.osd_is_up(osd):
2887 if timeout is not None:
2888 assert time.time() - start < timeout, \
2889 'osd.%d failed to come up before timeout expired' % osd
2890 time.sleep(3)
2891 self.log('osd.%d is up' % osd)
2892
2893 def is_active(self):
2894 """
2895 Wrapper to check if all pgs are active
2896 """
2897 return self.get_num_active() == self.get_num_pgs()
2898
2899 def all_active_or_peered(self):
2900 """
2901 Wrapper to check if all PGs are active or peered
2902 """
2903 pgs = self.get_pg_stats()
2904 return self._get_num_active(pgs) + self._get_num_peered(pgs) == len(pgs)
2905
2906 def wait_till_active(self, timeout=None):
2907 """
2908 Wait until all pgs are active.
2909 """
2910 self.log("waiting till active")
2911 start = time.time()
2912 while not self.is_active():
2913 if timeout is not None:
2914 if time.time() - start >= timeout:
2915 self.log('dumping pgs not active')
2916 self.dump_pgs_not_active()
2917 assert time.time() - start < timeout, \
2918 'wait_till_active: failed before timeout expired'
2919 time.sleep(3)
2920 self.log("active!")
2921
2922 def wait_till_pg_convergence(self, timeout=None):
2923 start = time.time()
2924 old_stats = None
2925 active_osds = [osd['osd'] for osd in self.get_osd_dump()
2926 if osd['in'] and osd['up']]
2927 while True:
2928 # strictly speaking, no need to wait for mon. but due to the
2929 # "ms inject socket failures" setting, the osdmap could be delayed,
2930 # so mgr is likely to ignore the pg-stat messages with pgs serving
2931 # newly created pools that are not yet known to the mgr. so, to make sure
2932 # the mgr is updated with the latest pg-stats, waiting for mon/mgr is
2933 # necessary.
2934 self.flush_pg_stats(active_osds)
2935 new_stats = dict((stat['pgid'], stat['state'])
2936 for stat in self.get_pg_stats())
2937 if old_stats == new_stats:
2938 return old_stats
2939 if timeout is not None:
2940 assert time.time() - start < timeout, \
2941 'failed to reach convergence before %d secs' % timeout
2942 old_stats = new_stats
2943 # longer than mgr_stats_period
2944 time.sleep(5 + 1)
2945
2946 def mark_out_osd(self, osd):
2947 """
2948 Wrapper to mark osd out.
2949 """
2950 self.raw_cluster_cmd('osd', 'out', str(osd))
2951
2952 def kill_osd(self, osd):
2953 """
2954 Kill osds by either power cycling (if indicated by the config)
2955 or by stopping.
2956 """
2957 if self.config.get('powercycle'):
2958 remote = self.find_remote('osd', osd)
2959 self.log('kill_osd on osd.{o} '
2960 'doing powercycle of {s}'.format(o=osd, s=remote.name))
2961 self._assert_ipmi(remote)
2962 remote.console.power_off()
2963 elif self.config.get('bdev_inject_crash') and self.config.get('bdev_inject_crash_probability'):
2964 if random.uniform(0, 1) < self.config.get('bdev_inject_crash_probability', .5):
2965 self.inject_args(
2966 'osd', osd,
2967 'bdev-inject-crash', self.config.get('bdev_inject_crash'))
2968 try:
2969 self.ctx.daemons.get_daemon('osd', osd, self.cluster).wait()
2970 except Exception:
2971 pass
2972 else:
2973 raise RuntimeError('osd.%s did not fail' % osd)
2974 else:
2975 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2976 else:
2977 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2978
2979 @staticmethod
2980 def _assert_ipmi(remote):
2981 assert remote.console.has_ipmi_credentials, (
2982 "powercycling requested but RemoteConsole is not "
2983 "initialized. Check ipmi config.")
2984
2985 def blackhole_kill_osd(self, osd):
2986 """
2987 Stop osd if nothing else works.
2988 """
2989 self.inject_args('osd', osd,
2990 'objectstore-blackhole', True)
2991 time.sleep(2)
2992 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2993
2994 def revive_osd(self, osd, timeout=360, skip_admin_check=False):
2995 """
2996 Revive osds by either power cycling (if indicated by the config)
2997 or by restarting.
2998 """
2999 if self.config.get('powercycle'):
3000 remote = self.find_remote('osd', osd)
3001 self.log('revive_osd on osd.{o} doing powercycle of {s}'.
3002 format(o=osd, s=remote.name))
3003 self._assert_ipmi(remote)
3004 remote.console.power_on()
3005 if not remote.console.check_status(300):
3006 raise Exception('Failed to revive osd.{o} via ipmi'.
3007 format(o=osd))
3008 teuthology.reconnect(self.ctx, 60, [remote])
3009 mount_osd_data(self.ctx, remote, self.cluster, str(osd))
3010 self.make_admin_daemon_dir(remote)
3011 self.ctx.daemons.get_daemon('osd', osd, self.cluster).reset()
3012 self.ctx.daemons.get_daemon('osd', osd, self.cluster).restart()
3013
3014 if not skip_admin_check:
3015 # wait for dump_ops_in_flight; this command doesn't appear
3016 # until after the signal handler is installed and it is safe
3017 # to stop the osd again without making valgrind leak checks
3018 # unhappy. see #5924.
3019 self.wait_run_admin_socket('osd', osd,
3020 args=['dump_ops_in_flight'],
3021 timeout=timeout, stdout=DEVNULL)
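# A sketch of a kill/revive cycle; `manager` and the osd id are
# hypothetical. revive_osd() already waits for the admin socket, so the
# caller only needs to wait for the osd to be marked up again:
#
#     manager.kill_osd(3)
#     manager.mark_down_osd(3)
#     manager.revive_osd(3, timeout=360)
#     manager.wait_till_osd_is_up(3, timeout=300)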
3022
3023 def mark_down_osd(self, osd):
3024 """
3025 Cluster command wrapper
3026 """
3027 self.raw_cluster_cmd('osd', 'down', str(osd))
3028
3029 def mark_in_osd(self, osd):
3030 """
3031 Cluster command wrapper
3032 """
3033 self.raw_cluster_cmd('osd', 'in', str(osd))
3034
3035 def signal_osd(self, osd, sig, silent=False):
3036 """
3037 Wrapper to local get_daemon call which sends the given
3038 signal to the given osd.
3039 """
3040 self.ctx.daemons.get_daemon('osd', osd,
3041 self.cluster).signal(sig, silent=silent)
3042
3043 ## monitors
3044 def signal_mon(self, mon, sig, silent=False):
3045 """
3046 Wrapper to local get_daemon call
3047 """
3048 self.ctx.daemons.get_daemon('mon', mon,
3049 self.cluster).signal(sig, silent=silent)
3050
3051 def kill_mon(self, mon):
3052 """
3053 Kill the monitor by either power cycling (if the config says so),
3054 or by doing a stop.
3055 """
3056 if self.config.get('powercycle'):
3057 remote = self.find_remote('mon', mon)
3058 self.log('kill_mon on mon.{m} doing powercycle of {s}'.
3059 format(m=mon, s=remote.name))
3060 self._assert_ipmi(remote)
3061 remote.console.power_off()
3062 else:
3063 self.ctx.daemons.get_daemon('mon', mon, self.cluster).stop()
3064
3065 def revive_mon(self, mon):
3066 """
3067 Restart by either power cycling (if the config says so),
3068 or by doing a normal restart.
3069 """
3070 if self.config.get('powercycle'):
3071 remote = self.find_remote('mon', mon)
3072 self.log('revive_mon on mon.{m} doing powercycle of {s}'.
3073 format(m=mon, s=remote.name))
3074 self._assert_ipmi(remote)
3075 remote.console.power_on()
3076 self.make_admin_daemon_dir(remote)
3077 self.ctx.daemons.get_daemon('mon', mon, self.cluster).restart()
3078
3079 def revive_mgr(self, mgr):
3080 """
3081 Restart by either power cycling (if the config says so),
3082 or by doing a normal restart.
3083 """
3084 if self.config.get('powercycle'):
3085 remote = self.find_remote('mgr', mgr)
3086 self.log('revive_mgr on mgr.{m} doing powercycle of {s}'.
3087 format(m=mgr, s=remote.name))
3088 self._assert_ipmi(remote)
3089 remote.console.power_on()
3090 self.make_admin_daemon_dir(remote)
3091 self.ctx.daemons.get_daemon('mgr', mgr, self.cluster).restart()
3092
3093 def get_mon_status(self, mon):
3094 """
3095 Extract all the monitor status information from the cluster
3096 """
3097 out = self.raw_cluster_cmd('tell', 'mon.%s' % mon, 'mon_status')
3098 return json.loads(out)
3099
3100 def get_mon_quorum(self):
3101 """
3102 Extract monitor quorum information from the cluster
3103 """
3104 out = self.raw_cluster_cmd('quorum_status')
3105 j = json.loads(out)
3106 return j['quorum']
3107
3108 def wait_for_mon_quorum_size(self, size, timeout=300):
3109 """
3110 Loop until quorum size is reached.
3111 """
3112 self.log('waiting for quorum size %d' % size)
3113 sleep = 3
3114 with safe_while(sleep=sleep,
3115 tries=timeout // sleep,
3116 action=f'wait for quorum size {size}') as proceed:
3117 while proceed():
3118 try:
3119 if len(self.get_mon_quorum()) == size:
3120 break
3121 except CommandFailedError as e:
3122 # could fail instead of blocking if the rotating key of the
3123 # connected monitor is not updated yet after they form the
3124 # quorum
3125 if e.exitstatus == errno.EACCES:
3126 pass
3127 else:
3128 raise
3129 self.log("quorum is size %d" % size)
3130
3131 def get_mon_health(self, debug=False):
3132 """
3133 Extract all the monitor health information.
3134 """
3135 out = self.raw_cluster_cmd('health', '--format=json')
3136 if debug:
3137 self.log('health:\n{h}'.format(h=out))
3138 return json.loads(out)
3139
3140 def wait_until_healthy(self, timeout=None):
3141 self.log("wait_until_healthy")
3142 start = time.time()
3143 while self.get_mon_health()['status'] != 'HEALTH_OK':
3144 if timeout is not None:
3145 assert time.time() - start < timeout, \
3146 'timeout expired in wait_until_healthy'
3147 time.sleep(3)
3148 self.log("wait_until_healthy done")
3149
3150 def get_filepath(self):
3151 """
3152 Return path to osd data with {id} needing to be replaced
3153 """
3154 return '/var/lib/ceph/osd/' + self.cluster + '-{id}'
3155
3156 def make_admin_daemon_dir(self, remote):
3157 """
3158 Create /var/run/ceph directory on remote site.
3159
3160 :param ctx: Context
3161 :param remote: Remote site
3162 """
3163 remote.run(args=['sudo',
3164 'install', '-d', '-m0777', '--', '/var/run/ceph', ], )
3165
3166 def get_service_task_status(self, service, status_key):
3167 """
3168 Return daemon task status for a given ceph service.
3169
3170 :param service: ceph service (mds, osd, etc...)
3171 :param status_key: matching task status key
3172 """
3173 task_status = {}
3174 status = self.raw_cluster_status()
3175 try:
3176 for k,v in status['servicemap']['services'][service]['daemons'].items():
3177 ts = dict(v).get('task_status', None)
3178 if ts:
3179 task_status[k] = ts[status_key]
3180 except KeyError: # catches missing service and status key
3181 return {}
3182 self.log(task_status)
3183 return task_status
3184
3185 def utility_task(name):
3186 """
3187 Generate ceph_manager subtask corresponding to ceph_manager
3188 method name
3189 """
3190 def task(ctx, config):
3191 if config is None:
3192 config = {}
3193 args = config.get('args', [])
3194 kwargs = config.get('kwargs', {})
3195 cluster = config.get('cluster', 'ceph')
3196 fn = getattr(ctx.managers[cluster], name)
3197 fn(*args, **kwargs)
3198 return task
3199
3200 revive_osd = utility_task("revive_osd")
3201 revive_mon = utility_task("revive_mon")
3202 kill_osd = utility_task("kill_osd")
3203 kill_mon = utility_task("kill_mon")
3204 create_pool = utility_task("create_pool")
3205 remove_pool = utility_task("remove_pool")
3206 wait_for_clean = utility_task("wait_for_clean")
3207 flush_all_pg_stats = utility_task("flush_all_pg_stats")
3208 set_pool_property = utility_task("set_pool_property")
3209 do_pg_scrub = utility_task("do_pg_scrub")
3210 wait_for_pool = utility_task("wait_for_pool")
3211 wait_for_pools = utility_task("wait_for_pools")
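# Each name above is a teuthology task that forwards 'args'/'kwargs' from
# its config to the CephManager method of the same name; a hypothetical
# direct invocation (illustrative config values) would look like:
#
#     create_pool(ctx, {'cluster': 'ceph',
#                       'args': ['unique_pool_0'],
#                       'kwargs': {'pg_num': 32}})
#     wait_for_clean(ctx, {})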