ceph/qa/tasks/ceph_manager.py (ceph.git, pacific 16.2.5)
1 """
2 ceph manager -- Thrasher and CephManager objects
3 """
4 from functools import wraps
5 import contextlib
6 import random
7 import signal
8 import time
9 import gevent
10 import base64
11 import json
12 import logging
13 import threading
14 import traceback
15 import os
16 import shlex
17
18 from io import BytesIO, StringIO
19 from subprocess import DEVNULL
20 from teuthology import misc as teuthology
21 from tasks.scrub import Scrubber
22 from tasks.util.rados import cmd_erasure_code_profile
23 from tasks.util import get_remote
24 from teuthology.contextutil import safe_while
25 from teuthology.orchestra.remote import Remote
26 from teuthology.orchestra import run
27 from teuthology.exceptions import CommandFailedError
28 from tasks.thrasher import Thrasher
29
30
31 DEFAULT_CONF_PATH = '/etc/ceph/ceph.conf'
32
33 log = logging.getLogger(__name__)
34
35 # this is for cephadm clusters
36 def shell(ctx, cluster_name, remote, args, name=None, **kwargs):
37 extra_args = []
38 if name:
39 extra_args = ['-n', name]
40 return remote.run(
41 args=[
42 'sudo',
43 ctx.cephadm,
44 '--image', ctx.ceph[cluster_name].image,
45 'shell',
46 ] + extra_args + [
47 '--fsid', ctx.ceph[cluster_name].fsid,
48 '--',
49 ] + args,
50 **kwargs
51 )
52
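# A minimal usage sketch (illustrative only), assuming a cephadm-provisioned
# ctx and a teuthology Remote named `remote` from the surrounding task:
#
#   shell(ctx, 'ceph', remote, args=['ceph', 'orch', 'ps'],
#         stdout=StringIO(), check_status=False)
#
# which runs `sudo cephadm --image <image> shell --fsid <fsid> -- ceph orch ps`
# on that host.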
53 # this is for rook clusters
54 def toolbox(ctx, cluster_name, args, **kwargs):
55 return ctx.rook[cluster_name].remote.run(
56 args=[
57 'kubectl',
58 '-n', 'rook-ceph',
59 'exec',
60 ctx.rook[cluster_name].toolbox,
61 '--',
62 ] + args,
63 **kwargs
64 )
65
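# A minimal usage sketch (illustrative only), assuming a rook-provisioned ctx:
#
#   toolbox(ctx, 'ceph', args=['ceph', 'status'], stdout=StringIO())
#
# which execs `ceph status` inside the rook-ceph toolbox pod via kubectl.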
66
67 def write_conf(ctx, conf_path=DEFAULT_CONF_PATH, cluster='ceph'):
68 conf_fp = BytesIO()
69 ctx.ceph[cluster].conf.write(conf_fp)
70 conf_fp.seek(0)
71 writes = ctx.cluster.run(
72 args=[
73 'sudo', 'mkdir', '-p', '/etc/ceph', run.Raw('&&'),
74 'sudo', 'chmod', '0755', '/etc/ceph', run.Raw('&&'),
75 'sudo', 'tee', conf_path, run.Raw('&&'),
76 'sudo', 'chmod', '0644', conf_path,
77 run.Raw('>'), '/dev/null',
78
79 ],
80 stdin=run.PIPE,
81 wait=False)
82 teuthology.feed_many_stdins_and_close(conf_fp, writes)
83 run.wait(writes)
84
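# A minimal usage sketch (illustrative only): push the in-memory configuration
# held in ctx.ceph['ceph'].conf to every remote in the cluster:
#
#   write_conf(ctx)                                   # /etc/ceph/ceph.conf
#   write_conf(ctx, conf_path='/etc/ceph/ceph2.conf', cluster='ceph2')
#
# The file is streamed over stdin to `sudo tee` on all remotes in parallel.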
85 def get_valgrind_args(testdir, name, preamble, v, exit_on_first_error=True, cd=True):
86 """
87 Build a command line for running valgrind.
88
89 testdir - test results directory
90 name - name of daemon (for naming the log file)
91 preamble - stuff we should run before valgrind
92 v - valgrind arguments
93 """
94 if v is None:
95 return preamble
96 if not isinstance(v, list):
97 v = [v]
98
99 # https://tracker.ceph.com/issues/44362
100 preamble.extend([
101 'env', 'OPENSSL_ia32cap=~0x1000000000000000',
102 ])
103
104 val_path = '/var/log/ceph/valgrind'
105 if '--tool=memcheck' in v or '--tool=helgrind' in v:
106 extra_args = [
107 'valgrind',
108 '--trace-children=no',
109 '--child-silent-after-fork=yes',
110 '--soname-synonyms=somalloc=*tcmalloc*',
111 '--num-callers=50',
112 '--suppressions={tdir}/valgrind.supp'.format(tdir=testdir),
113 '--xml=yes',
114 '--xml-file={vdir}/{n}.log'.format(vdir=val_path, n=name),
115 '--time-stamp=yes',
116 '--vgdb=yes',
117 ]
118 else:
119 extra_args = [
120 'valgrind',
121 '--trace-children=no',
122 '--child-silent-after-fork=yes',
123 '--soname-synonyms=somalloc=*tcmalloc*',
124 '--suppressions={tdir}/valgrind.supp'.format(tdir=testdir),
125 '--log-file={vdir}/{n}.log'.format(vdir=val_path, n=name),
126 '--time-stamp=yes',
127 '--vgdb=yes',
128 ]
129 if exit_on_first_error:
130 extra_args.extend([
131 # at least Valgrind 3.14 is required
132 '--exit-on-first-error=yes',
133 '--error-exitcode=42',
134 ])
135 args = []
136 if cd:
137 args += ['cd', testdir, run.Raw('&&')]
138 args += preamble + extra_args + v
139 log.debug('running %s under valgrind with args %s', name, args)
140 return args
141
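# A minimal usage sketch (illustrative only; `testdir` would come from
# teuthology.get_testdir(ctx)):
#
#   args = get_valgrind_args(testdir, 'osd.0',
#                            preamble=['sudo', 'adjust-ulimits'],
#                            v=['--tool=memcheck'])
#
# For memcheck/helgrind the result writes XML output to
# /var/log/ceph/valgrind/osd.0.log; other tools get a plain --log-file.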
142
143 def mount_osd_data(ctx, remote, cluster, osd):
144 """
145 Mount a remote OSD
146
147 :param ctx: Context
148 :param remote: Remote site
149 :param cluster: name of ceph cluster
150 :param osd: Osd name
151 """
152 log.debug('Mounting data for osd.{o} on {r}'.format(o=osd, r=remote))
153 role = "{0}.osd.{1}".format(cluster, osd)
154 alt_role = role if cluster != 'ceph' else "osd.{0}".format(osd)
155 if remote in ctx.disk_config.remote_to_roles_to_dev:
156 if alt_role in ctx.disk_config.remote_to_roles_to_dev[remote]:
157 role = alt_role
158 if role not in ctx.disk_config.remote_to_roles_to_dev[remote]:
159 return
160 dev = ctx.disk_config.remote_to_roles_to_dev[remote][role]
161 mount_options = ctx.disk_config.\
162 remote_to_roles_to_dev_mount_options[remote][role]
163 fstype = ctx.disk_config.remote_to_roles_to_dev_fstype[remote][role]
164 mnt = os.path.join('/var/lib/ceph/osd', '{0}-{1}'.format(cluster, osd))
165
166 log.info('Mounting osd.{o}: dev: {n}, cluster: {c}, '
167 'mountpoint: {p}, type: {t}, options: {v}'.format(
168 o=osd, n=dev, p=mnt, t=fstype, v=mount_options,
169 c=cluster))
170
171 remote.run(
172 args=[
173 'sudo',
174 'mount',
175 '-t', fstype,
176 '-o', ','.join(mount_options),
177 dev,
178 mnt,
179 ]
180 )
181
182
183 def log_exc(func):
184 @wraps(func)
185 def wrapper(self):
186 try:
187 return func(self)
188 except:
189 self.log(traceback.format_exc())
190 raise
191 return wrapper
192
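# A minimal usage sketch (illustrative only): log_exc is intended for greenlet
# entry points on objects that expose a log() method, so the traceback is
# recorded before the exception propagates:
#
#   class Worker:
#       def log(self, msg):
#           print(msg)
#       @log_exc
#       def do_work(self):
#           raise RuntimeError('boom')   # traceback logged, then re-raised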
193
194 class PoolType:
195 REPLICATED = 1
196 ERASURE_CODED = 3
197
198
199 class OSDThrasher(Thrasher):
200 """
201 Object used to thrash Ceph
202 """
203 def __init__(self, manager, config, name, logger):
204 super(OSDThrasher, self).__init__()
205
206 self.ceph_manager = manager
207 self.cluster = manager.cluster
208 self.ceph_manager.wait_for_clean()
209 osd_status = self.ceph_manager.get_osd_status()
210 self.in_osds = osd_status['in']
211 self.live_osds = osd_status['live']
212 self.out_osds = osd_status['out']
213 self.dead_osds = osd_status['dead']
214 self.stopping = False
215 self.logger = logger
216 self.config = config
217 self.name = name
218 self.revive_timeout = self.config.get("revive_timeout", 360)
219 self.pools_to_fix_pgp_num = set()
220 if self.config.get('powercycle'):
221 self.revive_timeout += 120
222 self.clean_wait = self.config.get('clean_wait', 0)
223 self.minin = self.config.get("min_in", 4)
224 self.chance_move_pg = self.config.get('chance_move_pg', 1.0)
225 self.sighup_delay = self.config.get('sighup_delay')
226 self.optrack_toggle_delay = self.config.get('optrack_toggle_delay')
227 self.dump_ops_enable = self.config.get('dump_ops_enable')
228 self.noscrub_toggle_delay = self.config.get('noscrub_toggle_delay')
229 self.chance_thrash_cluster_full = self.config.get('chance_thrash_cluster_full', .05)
230 self.chance_thrash_pg_upmap = self.config.get('chance_thrash_pg_upmap', 1.0)
231 self.chance_thrash_pg_upmap_items = self.config.get('chance_thrash_pg_upmap_items', 1.0)
232 self.random_eio = self.config.get('random_eio')
233 self.chance_force_recovery = self.config.get('chance_force_recovery', 0.3)
234
235 num_osds = self.in_osds + self.out_osds
236 self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * len(num_osds)
237 self.min_pgs = self.config.get("min_pgs_per_pool_osd", 1) * len(num_osds)
238 if self.config is None:
239 self.config = dict()
240 # prevent monitor from auto-marking things out while thrasher runs
241 # try both old and new tell syntax, in case we are testing old code
242 self.saved_options = []
243 # assuming that the default settings do not vary from one daemon to
244 # another
245 first_mon = teuthology.get_first_mon(manager.ctx, self.config).split('.')
246 opts = [('mon', 'mon_osd_down_out_interval', 0)]
247 #why do we disable marking an OSD out automatically? :/
248 for service, opt, new_value in opts:
249 old_value = manager.get_config(first_mon[0],
250 first_mon[1],
251 opt)
252 self.saved_options.append((service, opt, old_value))
253 manager.inject_args(service, '*', opt, new_value)
254 # initialize ceph_objectstore_tool property - must be done before
255 # do_thrash is spawned - http://tracker.ceph.com/issues/18799
256 if (self.config.get('powercycle') or
257 not self.cmd_exists_on_osds("ceph-objectstore-tool") or
258 self.config.get('disable_objectstore_tool_tests', False)):
259 self.ceph_objectstore_tool = False
260 if self.config.get('powercycle'):
261 self.log("Unable to test ceph-objectstore-tool, "
262 "powercycle testing")
263 else:
264 self.log("Unable to test ceph-objectstore-tool, "
265 "not available on all OSD nodes")
266 else:
267 self.ceph_objectstore_tool = \
268 self.config.get('ceph_objectstore_tool', True)
269 # spawn do_thrash
270 self.thread = gevent.spawn(self.do_thrash)
271 if self.sighup_delay:
272 self.sighup_thread = gevent.spawn(self.do_sighup)
273 if self.optrack_toggle_delay:
274 self.optrack_toggle_thread = gevent.spawn(self.do_optrack_toggle)
275 if self.dump_ops_enable == "true":
276 self.dump_ops_thread = gevent.spawn(self.do_dump_ops)
277 if self.noscrub_toggle_delay:
278 self.noscrub_toggle_thread = gevent.spawn(self.do_noscrub_toggle)
279
280 def log(self, msg, *args, **kwargs):
281 self.logger.info(msg, *args, **kwargs)
282
283 def cmd_exists_on_osds(self, cmd):
284 if self.ceph_manager.cephadm or self.ceph_manager.rook:
285 return True
286 allremotes = self.ceph_manager.ctx.cluster.only(\
287 teuthology.is_type('osd', self.cluster)).remotes.keys()
288 allremotes = list(set(allremotes))
289 for remote in allremotes:
290 proc = remote.run(args=['type', cmd], wait=True,
291 check_status=False, stdout=BytesIO(),
292 stderr=BytesIO())
293 if proc.exitstatus != 0:
294 return False
295 return True
296
297 def run_ceph_objectstore_tool(self, remote, osd, cmd):
298 if self.ceph_manager.cephadm:
299 return shell(
300 self.ceph_manager.ctx, self.ceph_manager.cluster, remote,
301 args=['ceph-objectstore-tool'] + cmd,
302 name=osd,
303 wait=True, check_status=False,
304 stdout=StringIO(),
305 stderr=StringIO())
306 elif self.ceph_manager.rook:
307 assert False, 'not implemented'
308 else:
309 return remote.run(
310 args=['sudo', 'adjust-ulimits', 'ceph-objectstore-tool'] + cmd,
311 wait=True, check_status=False,
312 stdout=StringIO(),
313 stderr=StringIO())
314
315 def run_ceph_bluestore_tool(self, remote, osd, cmd):
316 if self.ceph_manager.cephadm:
317 return shell(
318 self.ceph_manager.ctx, self.ceph_manager.cluster, remote,
319 args=['ceph-bluestore-tool', '--err-to-stderr'] + cmd,
320 name=osd,
321 wait=True, check_status=False,
322 stdout=StringIO(),
323 stderr=StringIO())
324 elif self.ceph_manager.rook:
325 assert False, 'not implemented'
326 else:
327 return remote.run(
328 args=['sudo', 'ceph-bluestore-tool', '--err-to-stderr'] + cmd,
329 wait=True, check_status=False,
330 stdout=StringIO(),
331 stderr=StringIO())
332
333 def kill_osd(self, osd=None, mark_down=False, mark_out=False):
334 """
335 :param osd: Osd to be killed.
336 :mark_down: Mark down if true.
337 :mark_out: Mark out if true.
338 """
339 if osd is None:
340 osd = random.choice(self.live_osds)
341 self.log("Killing osd %s, live_osds are %s" % (str(osd),
342 str(self.live_osds)))
343 self.live_osds.remove(osd)
344 self.dead_osds.append(osd)
345 self.ceph_manager.kill_osd(osd)
346 if mark_down:
347 self.ceph_manager.mark_down_osd(osd)
348 if mark_out and osd in self.in_osds:
349 self.out_osd(osd)
350 if self.ceph_objectstore_tool:
351 self.log("Testing ceph-objectstore-tool on down osd.%s" % osd)
352 remote = self.ceph_manager.find_remote('osd', osd)
353 FSPATH = self.ceph_manager.get_filepath()
354 JPATH = os.path.join(FSPATH, "journal")
355 exp_osd = imp_osd = osd
356 self.log('remote for osd %s is %s' % (osd, remote))
357 exp_remote = imp_remote = remote
358 # If an older osd is available we'll move a pg from there
359 if (len(self.dead_osds) > 1 and
360 random.random() < self.chance_move_pg):
361 exp_osd = random.choice(self.dead_osds[:-1])
362 exp_remote = self.ceph_manager.find_remote('osd', exp_osd)
363 self.log('remote for exp osd %s is %s' % (exp_osd, exp_remote))
364 prefix = [
365 '--no-mon-config',
366 '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
367 ]
368
369 if self.ceph_manager.rook:
370 assert False, 'not implemented'
371
372 if not self.ceph_manager.cephadm:
373 # ceph-objectstore-tool might be temporarily absent during an
374 # upgrade - see http://tracker.ceph.com/issues/18014
375 with safe_while(sleep=15, tries=40, action="type ceph-objectstore-tool") as proceed:
376 while proceed():
377 proc = exp_remote.run(args=['type', 'ceph-objectstore-tool'],
378 wait=True, check_status=False, stdout=BytesIO(),
379 stderr=BytesIO())
380 if proc.exitstatus == 0:
381 break
382 log.debug("ceph-objectstore-tool binary not present, trying again")
383
384 # ceph-objectstore-tool might bogusly fail with "OSD has the store locked"
385 # see http://tracker.ceph.com/issues/19556
386 with safe_while(sleep=15, tries=40, action="ceph-objectstore-tool --op list-pgs") as proceed:
387 while proceed():
388 proc = self.run_ceph_objectstore_tool(
389 exp_remote, 'osd.%s' % exp_osd,
390 prefix + [
391 '--data-path', FSPATH.format(id=exp_osd),
392 '--journal-path', JPATH.format(id=exp_osd),
393 '--op', 'list-pgs',
394 ])
395 if proc.exitstatus == 0:
396 break
397 elif (proc.exitstatus == 1 and
398 proc.stderr.getvalue() == "OSD has the store locked"):
399 continue
400 else:
401 raise Exception("ceph-objectstore-tool: "
402 "exp list-pgs failure with status {ret}".
403 format(ret=proc.exitstatus))
404
405 pgs = proc.stdout.getvalue().split('\n')[:-1]
406 if len(pgs) == 0:
407 self.log("No PGs found for osd.{osd}".format(osd=exp_osd))
408 return
409 pg = random.choice(pgs)
410 #exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
411 #exp_path = os.path.join(exp_path, '{0}.data'.format(self.cluster))
412 exp_path = os.path.join('/var/log/ceph', # available inside 'shell' container
413 "exp.{pg}.{id}".format(
414 pg=pg,
415 id=exp_osd))
416 if self.ceph_manager.cephadm:
417 exp_host_path = os.path.join(
418 '/var/log/ceph',
419 self.ceph_manager.ctx.ceph[self.ceph_manager.cluster].fsid,
420 "exp.{pg}.{id}".format(
421 pg=pg,
422 id=exp_osd))
423 else:
424 exp_host_path = exp_path
425
426 # export
427 # Can't use new export-remove op since this is part of upgrade testing
428 proc = self.run_ceph_objectstore_tool(
429 exp_remote, 'osd.%s' % exp_osd,
430 prefix + [
431 '--data-path', FSPATH.format(id=exp_osd),
432 '--journal-path', JPATH.format(id=exp_osd),
433 '--op', 'export',
434 '--pgid', pg,
435 '--file', exp_path,
436 ])
437 if proc.exitstatus:
438 raise Exception("ceph-objectstore-tool: "
439 "export failure with status {ret}".
440 format(ret=proc.exitstatus))
441 # remove
442 proc = self.run_ceph_objectstore_tool(
443 exp_remote, 'osd.%s' % exp_osd,
444 prefix + [
445 '--data-path', FSPATH.format(id=exp_osd),
446 '--journal-path', JPATH.format(id=exp_osd),
447 '--force',
448 '--op', 'remove',
449 '--pgid', pg,
450 ])
451 if proc.exitstatus:
452 raise Exception("ceph-objectstore-tool: "
453 "remove failure with status {ret}".
454 format(ret=proc.exitstatus))
455 # If there are at least 2 dead osds we might move the pg
456 if exp_osd != imp_osd:
457 # If pg isn't already on this osd, then we will move it there
458 proc = self.run_ceph_objectstore_tool(
459 imp_remote,
460 'osd.%s' % imp_osd,
461 prefix + [
462 '--data-path', FSPATH.format(id=imp_osd),
463 '--journal-path', JPATH.format(id=imp_osd),
464 '--op', 'list-pgs',
465 ])
466 if proc.exitstatus:
467 raise Exception("ceph-objectstore-tool: "
468 "imp list-pgs failure with status {ret}".
469 format(ret=proc.exitstatus))
470 pgs = proc.stdout.getvalue().split('\n')[:-1]
471 if pg not in pgs:
472 self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
473 format(pg=pg, fosd=exp_osd, tosd=imp_osd))
474 if imp_remote != exp_remote:
475 # Copy export file to the other machine
476 self.log("Transfer export file from {srem} to {trem}".
477 format(srem=exp_remote, trem=imp_remote))
478 # just in case an upgrade makes /var/log/ceph unreadable by non-root users
479 exp_remote.run(args=['sudo', 'chmod', '777',
480 '/var/log/ceph'])
481 imp_remote.run(args=['sudo', 'chmod', '777',
482 '/var/log/ceph'])
483 tmpexport = Remote.get_file(exp_remote, exp_host_path,
484 sudo=True)
485 if exp_host_path != exp_path:
486 # push to /var/log/ceph, then rename (we can't
487 # chmod 777 the /var/log/ceph/$fsid mountpoint)
488 Remote.put_file(imp_remote, tmpexport, exp_path)
489 imp_remote.run(args=[
490 'sudo', 'mv', exp_path, exp_host_path])
491 else:
492 Remote.put_file(imp_remote, tmpexport, exp_host_path)
493 os.remove(tmpexport)
494 else:
495 # Can't move the pg after all
496 imp_osd = exp_osd
497 imp_remote = exp_remote
498 # import
499 proc = self.run_ceph_objectstore_tool(
500 imp_remote, 'osd.%s' % imp_osd,
501 [
502 '--data-path', FSPATH.format(id=imp_osd),
503 '--journal-path', JPATH.format(id=imp_osd),
504 '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
505 '--op', 'import',
506 '--file', exp_path,
507 ])
508 if proc.exitstatus == 1:
509 bogosity = "The OSD you are using is older than the exported PG"
510 if bogosity in proc.stderr.getvalue():
511 self.log("OSD older than exported PG"
512 "...ignored")
513 elif proc.exitstatus == 10:
514 self.log("Pool went away before processing an import"
515 "...ignored")
516 elif proc.exitstatus == 11:
517 self.log("Attempt to import an incompatible export"
518 "...ignored")
519 elif proc.exitstatus == 12:
520 # this should be safe to ignore because we only ever move 1
521 # copy of the pg at a time, and merge is only initiated when
522 # all replicas are peered and happy. /me crosses fingers
523 self.log("PG merged on target"
524 "...ignored")
525 elif proc.exitstatus:
526 raise Exception("ceph-objectstore-tool: "
527 "import failure with status {ret}".
528 format(ret=proc.exitstatus))
529 cmd = "sudo rm -f {file}".format(file=exp_host_path)
530 exp_remote.run(args=cmd)
531 if imp_remote != exp_remote:
532 imp_remote.run(args=cmd)
533
534 # apply low split settings to each pool
535 if not self.ceph_manager.cephadm:
536 for pool in self.ceph_manager.list_pools():
537 cmd = ("CEPH_ARGS='--filestore-merge-threshold 1 "
538 "--filestore-split-multiple 1' sudo -E "
539 + 'ceph-objectstore-tool '
540 + ' '.join(prefix + [
541 '--data-path', FSPATH.format(id=imp_osd),
542 '--journal-path', JPATH.format(id=imp_osd),
543 ])
544 + " --op apply-layout-settings --pool " + pool).format(id=osd)
545 proc = imp_remote.run(args=cmd,
546 wait=True, check_status=False,
547 stderr=StringIO())
548 if 'Couldn\'t find pool' in proc.stderr.getvalue():
549 continue
550 if proc.exitstatus:
551 raise Exception("ceph-objectstore-tool apply-layout-settings"
552 " failed with {status}".format(status=proc.exitstatus))
553
554
555 def blackhole_kill_osd(self, osd=None):
556 """
557 If all else fails, kill the osd.
558 :param osd: Osd to be killed.
559 """
560 if osd is None:
561 osd = random.choice(self.live_osds)
562 self.log("Blackholing and then killing osd %s, live_osds are %s" %
563 (str(osd), str(self.live_osds)))
564 self.live_osds.remove(osd)
565 self.dead_osds.append(osd)
566 self.ceph_manager.blackhole_kill_osd(osd)
567
568 def revive_osd(self, osd=None, skip_admin_check=False):
569 """
570 Revive the osd.
571 :param osd: Osd to be revived.
572 """
573 if osd is None:
574 osd = random.choice(self.dead_osds)
575 self.log("Reviving osd %s" % (str(osd),))
576 self.ceph_manager.revive_osd(
577 osd,
578 self.revive_timeout,
579 skip_admin_check=skip_admin_check)
580 self.dead_osds.remove(osd)
581 self.live_osds.append(osd)
582 if self.random_eio > 0 and osd == self.rerrosd:
583 self.ceph_manager.set_config(self.rerrosd,
584 filestore_debug_random_read_err = self.random_eio)
585 self.ceph_manager.set_config(self.rerrosd,
586 bluestore_debug_random_read_err = self.random_eio)
587
588
589 def out_osd(self, osd=None):
590 """
591 Mark the osd out
592 :param osd: Osd to be marked.
593 """
594 if osd is None:
595 osd = random.choice(self.in_osds)
596 self.log("Removing osd %s, in_osds are: %s" %
597 (str(osd), str(self.in_osds)))
598 self.ceph_manager.mark_out_osd(osd)
599 self.in_osds.remove(osd)
600 self.out_osds.append(osd)
601
602 def in_osd(self, osd=None):
603 """
604 Mark the osd in
605 :param osd: Osd to be marked.
606 """
607 if osd is None:
608 osd = random.choice(self.out_osds)
609 if osd in self.dead_osds:
610 return self.revive_osd(osd)
611 self.log("Adding osd %s" % (str(osd),))
612 self.out_osds.remove(osd)
613 self.in_osds.append(osd)
614 self.ceph_manager.mark_in_osd(osd)
615 self.log("Added osd %s" % (str(osd),))
616
617 def reweight_osd_or_by_util(self, osd=None):
618 """
619 Reweight an osd that is in
620 :param osd: Osd to be marked.
621 """
622 if osd is not None or random.choice([True, False]):
623 if osd is None:
624 osd = random.choice(self.in_osds)
625 val = random.uniform(.1, 1.0)
626 self.log("Reweighting osd %s to %s" % (str(osd), str(val)))
627 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
628 str(osd), str(val))
629 else:
630 # do it several times, the option space is large
631 for i in range(5):
632 options = {
633 'max_change': random.choice(['0.05', '1.0', '3.0']),
634 'overage': random.choice(['110', '1000']),
635 'type': random.choice([
636 'reweight-by-utilization',
637 'test-reweight-by-utilization']),
638 }
639 self.log("Reweighting by: %s"%(str(options),))
640 self.ceph_manager.raw_cluster_cmd(
641 'osd',
642 options['type'],
643 options['overage'],
644 options['max_change'])
645
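# Illustrative effect (a sketch, with random example values): the two branches
# above end up issuing commands such as
#
#   ceph osd reweight 3 0.62
#   ceph osd test-reweight-by-utilization 110 0.05
#
# where the osd id, weight, overage and max_change values are chosen randomly.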
646 def primary_affinity(self, osd=None):
647 if osd is None:
648 osd = random.choice(self.in_osds)
649 if random.random() >= .5:
650 pa = random.random()
651 elif random.random() >= .5:
652 pa = 1
653 else:
654 pa = 0
655 self.log('Setting osd %s primary_affinity to %f' % (str(osd), pa))
656 self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
657 str(osd), str(pa))
658
659 def thrash_cluster_full(self):
660 """
661 Set and unset cluster full condition
662 """
663 self.log('Setting full ratio to .001')
664 self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.001')
665 time.sleep(1)
666 self.log('Setting full ratio back to .95')
667 self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.95')
668
669 def thrash_pg_upmap(self):
670 """
671 Install or remove random pg_upmap entries in OSDMap
672 """
673 from random import shuffle
674 out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
675 j = json.loads(out)
676 self.log('j is %s' % j)
677 try:
678 if random.random() >= .3:
679 pgs = self.ceph_manager.get_pg_stats()
680 if not pgs:
681 return
682 pg = random.choice(pgs)
683 pgid = str(pg['pgid'])
684 poolid = int(pgid.split('.')[0])
685 sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
686 if len(sizes) == 0:
687 return
688 n = sizes[0]
689 osds = self.in_osds + self.out_osds
690 shuffle(osds)
691 osds = osds[0:n]
692 self.log('Setting %s to %s' % (pgid, osds))
693 cmd = ['osd', 'pg-upmap', pgid] + [str(x) for x in osds]
694 self.log('cmd %s' % cmd)
695 self.ceph_manager.raw_cluster_cmd(*cmd)
696 else:
697 m = j['pg_upmap']
698 if len(m) > 0:
699 shuffle(m)
700 pg = m[0]['pgid']
701 self.log('Clearing pg_upmap on %s' % pg)
702 self.ceph_manager.raw_cluster_cmd(
703 'osd',
704 'rm-pg-upmap',
705 pg)
706 else:
707 self.log('No pg_upmap entries; doing nothing')
708 except CommandFailedError:
709 self.log('Failed to rm-pg-upmap, ignoring')
710
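# Illustrative effect (a sketch, with a hypothetical pgid): for a 3-replica
# pool the first branch above issues something like
#
#   ceph osd pg-upmap 2.7 4 1 0
#
# remapping pg 2.7 onto osds [4, 1, 0], while the other branch clears an
# existing entry with `ceph osd rm-pg-upmap 2.7`.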
711 def thrash_pg_upmap_items(self):
712 """
713 Install or remove random pg_upmap_items entries in OSDMap
714 """
715 from random import shuffle
716 out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
717 j = json.loads(out)
718 self.log('j is %s' % j)
719 try:
720 if random.random() >= .3:
721 pgs = self.ceph_manager.get_pg_stats()
722 if not pgs:
723 return
724 pg = random.choice(pgs)
725 pgid = str(pg['pgid'])
726 poolid = int(pgid.split('.')[0])
727 sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
728 if len(sizes) == 0:
729 return
730 n = sizes[0]
731 osds = self.in_osds + self.out_osds
732 shuffle(osds)
733 osds = osds[0:n*2]
734 self.log('Setting %s to %s' % (pgid, osds))
735 cmd = ['osd', 'pg-upmap-items', pgid] + [str(x) for x in osds]
736 self.log('cmd %s' % cmd)
737 self.ceph_manager.raw_cluster_cmd(*cmd)
738 else:
739 m = j['pg_upmap_items']
740 if len(m) > 0:
741 shuffle(m)
742 pg = m[0]['pgid']
743 self.log('Clearing pg_upmap_items on %s' % pg)
744 self.ceph_manager.raw_cluster_cmd(
745 'osd',
746 'rm-pg-upmap-items',
747 pg)
748 else:
749 self.log('No pg_upmap_items entries; doing nothing')
750 except CommandFailedError:
751 self.log('Failed to rm-pg-upmap-items, ignoring')
752
753 def force_recovery(self):
754 """
755 Force recovery or backfill on a subset of PGs
756 """
757 backfill = random.random() >= 0.5
758 j = self.ceph_manager.get_pgids_to_force(backfill)
759 if j:
760 try:
761 if backfill:
762 self.ceph_manager.raw_cluster_cmd('pg', 'force-backfill', *j)
763 else:
764 self.ceph_manager.raw_cluster_cmd('pg', 'force-recovery', *j)
765 except CommandFailedError:
766 self.log('Failed to force backfill|recovery, ignoring')
767
768
769 def cancel_force_recovery(self):
770 """
771 Cancel forced recovery or backfill on a subset of PGs
772 """
773 backfill = random.random() >= 0.5
774 j = self.ceph_manager.get_pgids_to_cancel_force(backfill)
775 if j:
776 try:
777 if backfill:
778 self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-backfill', *j)
779 else:
780 self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-recovery', *j)
781 except CommandFailedError:
782 self.log('Failed to cancel force backfill|recovery, ignoring')
783
784 def force_cancel_recovery(self):
785 """
786 Force or cancel forcing recovery
787 """
788 if random.random() >= 0.4:
789 self.force_recovery()
790 else:
791 self.cancel_force_recovery()
792
793 def all_up(self):
794 """
795 Make sure all osds are up and not out.
796 """
797 while len(self.dead_osds) > 0:
798 self.log("reviving osd")
799 self.revive_osd()
800 while len(self.out_osds) > 0:
801 self.log("inning osd")
802 self.in_osd()
803
804 def all_up_in(self):
805 """
806 Make sure all osds are up and fully in.
807 """
808 self.all_up()
809 for osd in self.live_osds:
810 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
811 str(osd), str(1))
812 self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
813 str(osd), str(1))
814
815 def do_join(self):
816 """
817 Break out of this Ceph loop
818 """
819 self.stopping = True
820 self.thread.get()
821 if self.sighup_delay:
822 self.log("joining the do_sighup greenlet")
823 self.sighup_thread.get()
824 if self.optrack_toggle_delay:
825 self.log("joining the do_optrack_toggle greenlet")
826 self.optrack_toggle_thread.join()
827 if self.dump_ops_enable == "true":
828 self.log("joining the do_dump_ops greenlet")
829 self.dump_ops_thread.join()
830 if self.noscrub_toggle_delay:
831 self.log("joining the do_noscrub_toggle greenlet")
832 self.noscrub_toggle_thread.join()
833
834 def grow_pool(self):
835 """
836 Increase the size of the pool
837 """
838 pool = self.ceph_manager.get_pool()
839 if pool is None:
840 return
841 self.log("Growing pool %s" % (pool,))
842 if self.ceph_manager.expand_pool(pool,
843 self.config.get('pool_grow_by', 10),
844 self.max_pgs):
845 self.pools_to_fix_pgp_num.add(pool)
846
847 def shrink_pool(self):
848 """
849 Decrease the size of the pool
850 """
851 pool = self.ceph_manager.get_pool()
852 if pool is None:
853 return
854 _ = self.ceph_manager.get_pool_pg_num(pool)
855 self.log("Shrinking pool %s" % (pool,))
856 if self.ceph_manager.contract_pool(
857 pool,
858 self.config.get('pool_shrink_by', 10),
859 self.min_pgs):
860 self.pools_to_fix_pgp_num.add(pool)
861
862 def fix_pgp_num(self, pool=None):
863 """
864 Fix number of pgs in pool.
865 """
866 if pool is None:
867 pool = self.ceph_manager.get_pool()
868 if not pool:
869 return
870 force = False
871 else:
872 force = True
873 self.log("fixing pg num pool %s" % (pool,))
874 if self.ceph_manager.set_pool_pgpnum(pool, force):
875 self.pools_to_fix_pgp_num.discard(pool)
876
877 def test_pool_min_size(self):
878 """
879 Loop to selectively push PGs below their min_size and test that recovery
880 still occurs.
881 """
882 self.log("test_pool_min_size")
883 self.all_up()
884 self.ceph_manager.wait_for_recovery(
885 timeout=self.config.get('timeout')
886 )
887
888 minout = int(self.config.get("min_out", 1))
889 minlive = int(self.config.get("min_live", 2))
890 mindead = int(self.config.get("min_dead", 1))
891 self.log("doing min_size thrashing")
892 self.ceph_manager.wait_for_clean(timeout=60)
893 assert self.ceph_manager.is_clean(), \
894 'not clean before minsize thrashing starts'
895 while not self.stopping:
896 # look up k and m from all the pools on each loop, in case it
897 # changes as the cluster runs
898 k = 0
899 m = 99
900 has_pools = False
901 pools_json = self.ceph_manager.get_osd_dump_json()['pools']
902
903 for pool_json in pools_json:
904 pool = pool_json['pool_name']
905 has_pools = True
906 pool_type = pool_json['type'] # 1 for rep, 3 for ec
907 min_size = pool_json['min_size']
908 self.log("pool {pool} min_size is {min_size}".format(pool=pool,min_size=min_size))
909 try:
910 ec_profile = self.ceph_manager.get_pool_property(pool, 'erasure_code_profile')
911 if pool_type != PoolType.ERASURE_CODED:
912 continue
913 ec_profile = pool_json['erasure_code_profile']
914 ec_profile_json = self.ceph_manager.raw_cluster_cmd(
915 'osd',
916 'erasure-code-profile',
917 'get',
918 ec_profile,
919 '--format=json')
920 ec_json = json.loads(ec_profile_json)
921 local_k = int(ec_json['k'])
922 local_m = int(ec_json['m'])
923 self.log("pool {pool} local_k={k} local_m={m}".format(pool=pool,
924 k=local_k, m=local_m))
925 if local_k > k:
926 self.log("setting k={local_k} from previous {k}".format(local_k=local_k, k=k))
927 k = local_k
928 if local_m < m:
929 self.log("setting m={local_m} from previous {m}".format(local_m=local_m, m=m))
930 m = local_m
931 except CommandFailedError:
932 self.log("failed to read erasure_code_profile. %s was likely removed", pool)
933 continue
934
935 if has_pools:
936 self.log("using k={k}, m={m}".format(k=k,m=m))
937 else:
938 self.log("No pools yet, waiting")
939 time.sleep(5)
940 continue
941
942 if minout > len(self.out_osds): # kill OSDs and mark out
943 self.log("forced to out an osd")
944 self.kill_osd(mark_out=True)
945 continue
946 elif mindead > len(self.dead_osds): # kill OSDs but force timeout
947 self.log("forced to kill an osd")
948 self.kill_osd()
949 continue
950 else: # make mostly-random choice to kill or revive OSDs
951 minup = max(minlive, k)
952 rand_val = random.uniform(0, 1)
953 self.log("choosing based on number of live OSDs and rand val {rand}".\
954 format(rand=rand_val))
955 if len(self.live_osds) > minup+1 and rand_val < 0.5:
956 # chose to knock out as many OSDs as we can w/out downing PGs
957
958 most_killable = min(len(self.live_osds) - minup, m)
959 self.log("chose to kill {n} OSDs".format(n=most_killable))
960 for i in range(1, most_killable):
961 self.kill_osd(mark_out=True)
962 time.sleep(10)
963 # try a few times since there might be a concurrent pool
964 # creation or deletion
965 with safe_while(
966 sleep=5, tries=5,
967 action='check for active or peered') as proceed:
968 while proceed():
969 if self.ceph_manager.all_active_or_peered():
970 break
971 self.log('not all PGs are active or peered')
972 else: # chose to revive OSDs, bring up a random fraction of the dead ones
973 self.log("chose to revive osds")
974 for i in range(1, int(rand_val * len(self.dead_osds))):
975 self.revive_osd(i)
976
977 # let PGs repair themselves or our next knockout might kill one
978 self.ceph_manager.wait_for_clean(timeout=self.config.get('timeout'))
979
980 # / while not self.stopping
981 self.all_up_in()
982
983 self.ceph_manager.wait_for_recovery(
984 timeout=self.config.get('timeout')
985 )
986
987 def inject_pause(self, conf_key, duration, check_after, should_be_down):
988 """
989 Pause injection testing. Check for osd being down when finished.
990 """
991 the_one = random.choice(self.live_osds)
992 self.log("inject_pause on {osd}".format(osd=the_one))
993 self.log(
994 "Testing {key} pause injection for duration {duration}".format(
995 key=conf_key,
996 duration=duration
997 ))
998 self.log(
999 "Checking after {after}, should_be_down={shouldbedown}".format(
1000 after=check_after,
1001 shouldbedown=should_be_down
1002 ))
1003 self.ceph_manager.set_config(the_one, **{conf_key: duration})
1004 if not should_be_down:
1005 return
1006 time.sleep(check_after)
1007 status = self.ceph_manager.get_osd_status()
1008 assert the_one in status['down']
1009 time.sleep(duration - check_after + 20)
1010 status = self.ceph_manager.get_osd_status()
1011 assert the_one not in status['down']
1012
1013 def test_backfill_full(self):
1014 """
1015 Test backfills stopping when the replica fills up.
1016
1017 First, use the injectfull admin command to simulate a
1018 backfillfull condition on all of the OSDs.
1019
1020 Second, on a random subset, set
1021 osd_debug_skip_full_check_in_backfill_reservation to force
1022 the more complicated check in do_scan to be exercised.
1023
1024 Then, verify that all backfillings stop.
1025 """
1026 self.log("injecting backfill full")
1027 for i in self.live_osds:
1028 self.ceph_manager.set_config(
1029 i,
1030 osd_debug_skip_full_check_in_backfill_reservation=
1031 random.choice(['false', 'true']))
1032 self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'backfillfull'],
1033 check_status=True, timeout=30, stdout=DEVNULL)
1034 for i in range(30):
1035 status = self.ceph_manager.compile_pg_status()
1036 if 'backfilling' not in status.keys():
1037 break
1038 self.log(
1039 "waiting for {still_going} backfillings".format(
1040 still_going=status.get('backfilling')))
1041 time.sleep(1)
1042 assert('backfilling' not in self.ceph_manager.compile_pg_status().keys())
1043 for i in self.live_osds:
1044 self.ceph_manager.set_config(
1045 i,
1046 osd_debug_skip_full_check_in_backfill_reservation='false')
1047 self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'none'],
1048 check_status=True, timeout=30, stdout=DEVNULL)
1049
1050
1051 def generate_random_sharding(self):
1052 prefixes = [
1053 'm','O','P','L'
1054 ]
1055 new_sharding = ''
1056 for prefix in prefixes:
1057 choose = random.choice([False, True])
1058 if not choose:
1059 continue
1060 if new_sharding != '':
1061 new_sharding = new_sharding + ' '
1062 columns = random.randint(1, 5)
1063 do_hash = random.choice([False, True])
1064 if do_hash:
1065 low_hash = random.choice([0, 5, 8])
1066 do_high_hash = random.choice([False, True])
1067 if do_high_hash:
1068 high_hash = random.choice([8, 16, 30]) + low_hash
1069 new_sharding = new_sharding + prefix + '(' + str(columns) + ',' + str(low_hash) + '-' + str(high_hash) + ')'
1070 else:
1071 new_sharding = new_sharding + prefix + '(' + str(columns) + ',' + str(low_hash) + '-)'
1072 else:
1073 if columns == 1:
1074 new_sharding = new_sharding + prefix
1075 else:
1076 new_sharding = new_sharding + prefix + '(' + str(columns) + ')'
1077 return new_sharding
1078
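# Illustrative output (a sketch): the generator above produces specs such as
# 'm(3) O(2,0-8) L' or 'P(4,5-)', i.e. space-separated column prefixes, each
# optionally carrying a column count and a hash range, in the form later
# passed to `ceph-bluestore-tool --sharding <spec> reshard`.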
1079 def test_bluestore_reshard_action(self):
1080 """
1081 Test if resharding of bluestore works properly.
1082 If bluestore is not used, or bluestore is in version that
1083 does not support sharding, skip.
1084 """
1085
1086 osd = random.choice(self.dead_osds)
1087 remote = self.ceph_manager.find_remote('osd', osd)
1088 FSPATH = self.ceph_manager.get_filepath()
1089
1090 prefix = [
1091 '--no-mon-config',
1092 '--log-file=/var/log/ceph/bluestore_tool.$pid.log',
1093 '--log-level=10',
1094 '--path', FSPATH.format(id=osd)
1095 ]
1096
1097 # sanity check if bluestore-tool accessible
1098 self.log('checking if target objectstore is bluestore on osd.%s' % osd)
1099 cmd = prefix + [
1100 'show-label'
1101 ]
1102 proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
1103 if proc.exitstatus != 0:
1104 raise Exception("ceph-bluestore-tool access failed.")
1105
1106 # check if sharding is possible
1107 self.log('checking if target bluestore supports sharding on osd.%s' % osd)
1108 cmd = prefix + [
1109 'show-sharding'
1110 ]
1111 proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
1112 if proc.exitstatus != 0:
1113 self.log("Unable to test resharding, "
1114 "ceph-bluestore-tool does not support it.")
1115 return
1116
1117 # now go for reshard to something else
1118 self.log('applying new sharding to bluestore on osd.%s' % osd)
1119 new_sharding = self.config.get('bluestore_new_sharding','random')
1120
1121 if new_sharding == 'random':
1122 self.log('generate random sharding')
1123 new_sharding = self.generate_random_sharding()
1124
1125 self.log("applying new sharding: " + new_sharding)
1126 cmd = prefix + [
1127 '--sharding', new_sharding,
1128 'reshard'
1129 ]
1130 proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
1131 if proc.exitstatus != 0:
1132 raise Exception("ceph-bluestore-tool resharding failed.")
1133
1134 # now run fsck to verify the new sharding
1135 self.log('running fsck to verify new sharding on osd.%s' % osd)
1136 cmd = prefix + [
1137 'fsck'
1138 ]
1139 proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
1140 if proc.exitstatus != 0:
1141 raise Exception("ceph-bluestore-tool fsck failed.")
1142 self.log('resharding successfully completed')
1143
1144 def test_bluestore_reshard(self):
1145 """
1146 1) kills an osd
1147 2) reshards bluestore on killed osd
1148 3) revives the osd
1149 """
1150 self.log('test_bluestore_reshard started')
1151 self.kill_osd(mark_down=True, mark_out=True)
1152 self.test_bluestore_reshard_action()
1153 self.revive_osd()
1154 self.log('test_bluestore_reshard completed')
1155
1156
1157 def test_map_discontinuity(self):
1158 """
1159 1) Allows the osds to recover
1160 2) kills an osd
1161 3) allows the remaining osds to recover
1162 4) waits for some time
1163 5) revives the osd
1164 This sequence should cause the revived osd to have to handle
1165 a map gap since the mons would have trimmed
1166 """
1167 while len(self.in_osds) < (self.minin + 1):
1168 self.in_osd()
1169 self.log("Waiting for recovery")
1170 self.ceph_manager.wait_for_all_osds_up(
1171 timeout=self.config.get('timeout')
1172 )
1173 # now we wait 20s for the pg status to change, if it takes longer,
1174 # the test *should* fail!
1175 time.sleep(20)
1176 self.ceph_manager.wait_for_clean(
1177 timeout=self.config.get('timeout')
1178 )
1179
1180 # now we wait 20s for the backfill replicas to hear about the clean
1181 time.sleep(20)
1182 self.log("Recovered, killing an osd")
1183 self.kill_osd(mark_down=True, mark_out=True)
1184 self.log("Waiting for clean again")
1185 self.ceph_manager.wait_for_clean(
1186 timeout=self.config.get('timeout')
1187 )
1188 self.log("Waiting for trim")
1189 time.sleep(int(self.config.get("map_discontinuity_sleep_time", 40)))
1190 self.revive_osd()
1191
1192 def choose_action(self):
1193 """
1194 Random action selector.
1195 """
1196 chance_down = self.config.get('chance_down', 0.4)
1197 _ = self.config.get('chance_test_min_size', 0)
1198 chance_test_backfill_full = \
1199 self.config.get('chance_test_backfill_full', 0)
1200 if isinstance(chance_down, int):
1201 chance_down = float(chance_down) / 100
1202 minin = self.minin
1203 minout = int(self.config.get("min_out", 0))
1204 minlive = int(self.config.get("min_live", 2))
1205 mindead = int(self.config.get("min_dead", 0))
1206
1207 self.log('choose_action: min_in %d min_out '
1208 '%d min_live %d min_dead %d' %
1209 (minin, minout, minlive, mindead))
1210 actions = []
1211 if len(self.in_osds) > minin:
1212 actions.append((self.out_osd, 1.0,))
1213 if len(self.live_osds) > minlive and chance_down > 0:
1214 actions.append((self.kill_osd, chance_down,))
1215 if len(self.out_osds) > minout:
1216 actions.append((self.in_osd, 1.7,))
1217 if len(self.dead_osds) > mindead:
1218 actions.append((self.revive_osd, 1.0,))
1219 if self.config.get('thrash_primary_affinity', True):
1220 actions.append((self.primary_affinity, 1.0,))
1221 actions.append((self.reweight_osd_or_by_util,
1222 self.config.get('reweight_osd', .5),))
1223 actions.append((self.grow_pool,
1224 self.config.get('chance_pgnum_grow', 0),))
1225 actions.append((self.shrink_pool,
1226 self.config.get('chance_pgnum_shrink', 0),))
1227 actions.append((self.fix_pgp_num,
1228 self.config.get('chance_pgpnum_fix', 0),))
1229 actions.append((self.test_pool_min_size,
1230 self.config.get('chance_test_min_size', 0),))
1231 actions.append((self.test_backfill_full,
1232 chance_test_backfill_full,))
1233 if self.chance_thrash_cluster_full > 0:
1234 actions.append((self.thrash_cluster_full, self.chance_thrash_cluster_full,))
1235 if self.chance_thrash_pg_upmap > 0:
1236 actions.append((self.thrash_pg_upmap, self.chance_thrash_pg_upmap,))
1237 if self.chance_thrash_pg_upmap_items > 0:
1238 actions.append((self.thrash_pg_upmap_items, self.chance_thrash_pg_upmap_items,))
1239 if self.chance_force_recovery > 0:
1240 actions.append((self.force_cancel_recovery, self.chance_force_recovery))
1241
1242 for key in ['heartbeat_inject_failure', 'filestore_inject_stall']:
1243 for scenario in [
1244 (lambda:
1245 self.inject_pause(key,
1246 self.config.get('pause_short', 3),
1247 0,
1248 False),
1249 self.config.get('chance_inject_pause_short', 1),),
1250 (lambda:
1251 self.inject_pause(key,
1252 self.config.get('pause_long', 80),
1253 self.config.get('pause_check_after', 70),
1254 True),
1255 self.config.get('chance_inject_pause_long', 0),)]:
1256 actions.append(scenario)
1257
1258 # only consider resharding if objectstore is bluestore
1259 cluster_name = self.ceph_manager.cluster
1260 cluster = self.ceph_manager.ctx.ceph[cluster_name]
1261 if cluster.conf.get('osd', {}).get('osd objectstore', 'bluestore') == 'bluestore':
1262 actions.append((self.test_bluestore_reshard,
1263 self.config.get('chance_bluestore_reshard', 0),))
1264
1265 total = sum([y for (x, y) in actions])
1266 val = random.uniform(0, total)
1267 for (action, prob) in actions:
1268 if val < prob:
1269 return action
1270 val -= prob
1271 return None
1272
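# Illustrative note: choose_action() is a standard weighted random draw.
# With, say,
#
#   actions = [(self.out_osd, 1.0), (self.kill_osd, 0.4), (self.in_osd, 1.7)]
#
# the total weight is 3.1, so kill_osd is returned with probability
# 0.4 / 3.1 (about 13%), out_osd with 1.0 / 3.1, and in_osd with 1.7 / 3.1.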
1273 def do_thrash(self):
1274 """
1275 _do_thrash() wrapper.
1276 """
1277 try:
1278 self._do_thrash()
1279 except Exception as e:
1280 # See _run exception comment for MDSThrasher
1281 self.set_thrasher_exception(e)
1282 self.logger.exception("exception:")
1283 # Allow successful completion so gevent doesn't see an exception.
1284 # The DaemonWatchdog will observe the error and tear down the test.
1285
1286 @log_exc
1287 def do_sighup(self):
1288 """
1289 Loops and sends signal.SIGHUP to a random live osd.
1290
1291 Loop delay is controlled by the config value sighup_delay.
1292 """
1293 delay = float(self.sighup_delay)
1294 self.log("starting do_sighup with a delay of {0}".format(delay))
1295 while not self.stopping:
1296 osd = random.choice(self.live_osds)
1297 self.ceph_manager.signal_osd(osd, signal.SIGHUP, silent=True)
1298 time.sleep(delay)
1299
1300 @log_exc
1301 def do_optrack_toggle(self):
1302 """
1303 Loops and toggle op tracking to all osds.
1304
1305 Loop delay is controlled by the config value optrack_toggle_delay.
1306 """
1307 delay = float(self.optrack_toggle_delay)
1308 osd_state = "true"
1309 self.log("starting do_optrack_toggle with a delay of {0}".format(delay))
1310 while not self.stopping:
1311 if osd_state == "true":
1312 osd_state = "false"
1313 else:
1314 osd_state = "true"
1315 try:
1316 self.ceph_manager.inject_args('osd', '*',
1317 'osd_enable_op_tracker',
1318 osd_state)
1319 except CommandFailedError:
1320 self.log('Failed to tell all osds, ignoring')
1321 gevent.sleep(delay)
1322
1323 @log_exc
1324 def do_dump_ops(self):
1325 """
1326 Loops and does op dumps on all osds
1327 """
1328 self.log("starting do_dump_ops")
1329 while not self.stopping:
1330 for osd in self.live_osds:
1331 # Ignore errors because live_osds is in flux
1332 self.ceph_manager.osd_admin_socket(osd, command=['dump_ops_in_flight'],
1333 check_status=False, timeout=30, stdout=DEVNULL)
1334 self.ceph_manager.osd_admin_socket(osd, command=['dump_blocked_ops'],
1335 check_status=False, timeout=30, stdout=DEVNULL)
1336 self.ceph_manager.osd_admin_socket(osd, command=['dump_historic_ops'],
1337 check_status=False, timeout=30, stdout=DEVNULL)
1338 gevent.sleep(0)
1339
1340 @log_exc
1341 def do_noscrub_toggle(self):
1342 """
1343 Loops and toggle noscrub flags
1344
1345 Loop delay is controlled by the config value noscrub_toggle_delay.
1346 """
1347 delay = float(self.noscrub_toggle_delay)
1348 scrub_state = "none"
1349 self.log("starting do_noscrub_toggle with a delay of {0}".format(delay))
1350 while not self.stopping:
1351 if scrub_state == "none":
1352 self.ceph_manager.raw_cluster_cmd('osd', 'set', 'noscrub')
1353 scrub_state = "noscrub"
1354 elif scrub_state == "noscrub":
1355 self.ceph_manager.raw_cluster_cmd('osd', 'set', 'nodeep-scrub')
1356 scrub_state = "both"
1357 elif scrub_state == "both":
1358 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
1359 scrub_state = "nodeep-scrub"
1360 else:
1361 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
1362 scrub_state = "none"
1363 gevent.sleep(delay)
1364 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
1365 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
1366
1367 @log_exc
1368 def _do_thrash(self):
1369 """
1370 Loop to select random actions to thrash ceph manager with.
1371 """
1372 cleanint = self.config.get("clean_interval", 60)
1373 scrubint = self.config.get("scrub_interval", -1)
1374 maxdead = self.config.get("max_dead", 0)
1375 delay = self.config.get("op_delay", 5)
1376 self.rerrosd = self.live_osds[0]
1377 if self.random_eio > 0:
1378 self.ceph_manager.inject_args('osd', self.rerrosd,
1379 'filestore_debug_random_read_err',
1380 self.random_eio)
1381 self.ceph_manager.inject_args('osd', self.rerrosd,
1382 'bluestore_debug_random_read_err',
1383 self.random_eio)
1384 self.log("starting do_thrash")
1385 while not self.stopping:
1386 to_log = [str(x) for x in ["in_osds: ", self.in_osds,
1387 "out_osds: ", self.out_osds,
1388 "dead_osds: ", self.dead_osds,
1389 "live_osds: ", self.live_osds]]
1390 self.log(" ".join(to_log))
1391 if random.uniform(0, 1) < (float(delay) / cleanint):
1392 while len(self.dead_osds) > maxdead:
1393 self.revive_osd()
1394 for osd in self.in_osds:
1395 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
1396 str(osd), str(1))
1397 if random.uniform(0, 1) < float(
1398 self.config.get('chance_test_map_discontinuity', 0)) \
1399 and len(self.live_osds) > 5: # avoid m=2,k=2 stall, w/ some buffer for crush being picky
1400 self.test_map_discontinuity()
1401 else:
1402 self.ceph_manager.wait_for_recovery(
1403 timeout=self.config.get('timeout')
1404 )
1405 time.sleep(self.clean_wait)
1406 if scrubint > 0:
1407 if random.uniform(0, 1) < (float(delay) / scrubint):
1408 self.log('Scrubbing while thrashing being performed')
1409 Scrubber(self.ceph_manager, self.config)
1410 self.choose_action()()
1411 time.sleep(delay)
1412 self.all_up()
1413 if self.random_eio > 0:
1414 self.ceph_manager.inject_args('osd', self.rerrosd,
1415 'filestore_debug_random_read_err', '0.0')
1416 self.ceph_manager.inject_args('osd', self.rerrosd,
1417 'bluestore_debug_random_read_err', '0.0')
1418 for pool in list(self.pools_to_fix_pgp_num):
1419 if self.ceph_manager.get_pool_pg_num(pool) > 0:
1420 self.fix_pgp_num(pool)
1421 self.pools_to_fix_pgp_num.clear()
1422 for service, opt, saved_value in self.saved_options:
1423 self.ceph_manager.inject_args(service, '*', opt, saved_value)
1424 self.saved_options = []
1425 self.all_up_in()
1426
1427
1428 class ObjectStoreTool:
1429
1430 def __init__(self, manager, pool, **kwargs):
1431 self.manager = manager
1432 self.pool = pool
1433 self.osd = kwargs.get('osd', None)
1434 self.object_name = kwargs.get('object_name', None)
1435 self.do_revive = kwargs.get('do_revive', True)
1436 if self.osd and self.pool and self.object_name:
1437 if self.osd == "primary":
1438 self.osd = self.manager.get_object_primary(self.pool,
1439 self.object_name)
1440 assert self.osd
1441 if self.object_name:
1442 self.pgid = self.manager.get_object_pg_with_shard(self.pool,
1443 self.object_name,
1444 self.osd)
1445 self.remote = next(iter(self.manager.ctx.\
1446 cluster.only('osd.{o}'.format(o=self.osd)).remotes.keys()))
1447 path = self.manager.get_filepath().format(id=self.osd)
1448 self.paths = ("--data-path {path} --journal-path {path}/journal".
1449 format(path=path))
1450
1451 def build_cmd(self, options, args, stdin):
1452 lines = []
1453 if self.object_name:
1454 lines.append("object=$(sudo adjust-ulimits ceph-objectstore-tool "
1455 "{paths} --pgid {pgid} --op list |"
1456 "grep '\"oid\":\"{name}\"')".
1457 format(paths=self.paths,
1458 pgid=self.pgid,
1459 name=self.object_name))
1460 args = '"$object" ' + args
1461 options += " --pgid {pgid}".format(pgid=self.pgid)
1462 cmd = ("sudo adjust-ulimits ceph-objectstore-tool {paths} {options} {args}".
1463 format(paths=self.paths,
1464 args=args,
1465 options=options))
1466 if stdin:
1467 cmd = ("echo {payload} | base64 --decode | {cmd}".
1468 format(payload=base64.b64encode(stdin.encode()).decode(),
1469 cmd=cmd))
1470 lines.append(cmd)
1471 return "\n".join(lines)
1472
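# Illustrative output (a sketch, assuming the default
# /var/lib/ceph/osd/<cluster>-<id> data path): for object_name 'obj1' the
# generated script looks roughly like
#
#   object=$(sudo adjust-ulimits ceph-objectstore-tool --data-path ... \
#       --journal-path ... --pgid <pgid> --op list | grep '"oid":"obj1"')
#   sudo adjust-ulimits ceph-objectstore-tool --data-path ... \
#       --journal-path ... <options> --pgid <pgid> "$object" <args>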
1473 def run(self, options, args):
1474 self.manager.kill_osd(self.osd)
1475 cmd = self.build_cmd(options, args, None)
1476 self.manager.log(cmd)
1477 try:
1478 proc = self.remote.run(args=['bash', '-e', '-x', '-c', cmd],
1479 check_status=False,
1480 stdout=BytesIO(),
1481 stderr=BytesIO())
1482 proc.wait()
1483 if proc.exitstatus != 0:
1484 self.manager.log("failed with " + str(proc.exitstatus))
1485 error = proc.stdout.getvalue().decode() + " " + \
1486 proc.stderr.getvalue().decode()
1487 raise Exception(error)
1488 finally:
1489 if self.do_revive:
1490 self.manager.revive_osd(self.osd)
1491 self.manager.wait_till_osd_is_up(self.osd, 300)
1492
1493
1494 # XXX: this class has nothing to do with the Ceph daemon (ceph-mgr) of
1495 # the same name.
1496 class CephManager:
1497 """
1498 Ceph manager object.
1499 Contains several local functions that form a bulk of this module.
1500
1501 :param controller: the remote machine where the Ceph commands should be
1502 executed
1503 :param ctx: the cluster context
1504 :param config: path to Ceph config file
1505 :param logger: for logging messages
1506 :param cluster: name of the Ceph cluster
1507 """
1508
1509 def __init__(self, controller, ctx=None, config=None, logger=None,
1510 cluster='ceph', cephadm=False, rook=False) -> None:
1511 self.lock = threading.RLock()
1512 self.ctx = ctx
1513 self.config = config
1514 self.controller = controller
1515 self.next_pool_id = 0
1516 self.cluster = cluster
1517 self.cephadm = cephadm
1518 self.rook = rook
1519 if logger:
1520 self.log = lambda x: logger.info(x)
1521 else:
1522 def tmp(x):
1523 """
1524 implement log behavior.
1525 """
1526 print(x)
1527 self.log = tmp
1528 if self.config is None:
1529 self.config = dict()
1530 pools = self.list_pools()
1531 self.pools = {}
1532 for pool in pools:
1533 # we may race with a pool deletion; ignore failures here
1534 try:
1535 self.pools[pool] = self.get_pool_int_property(pool, 'pg_num')
1536 except CommandFailedError:
1537 self.log('Failed to get pg_num from pool %s, ignoring' % pool)
1538
1539 def ceph(self, cmd, **kwargs):
1540 """
1541 Simple Ceph admin command wrapper around run_cluster_cmd.
1542 """
1543
1544 kwargs.pop('args', None)
1545 args = shlex.split(cmd)
1546 stdout = kwargs.pop('stdout', StringIO())
1547 stderr = kwargs.pop('stderr', StringIO())
1548 return self.run_cluster_cmd(args=args, stdout=stdout, stderr=stderr, **kwargs)
1549
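# A minimal usage sketch (illustrative only):
#
#   out = manager.ceph('osd dump --format=json').stdout.getvalue()
#
# is equivalent to run_cluster_cmd(args=['osd', 'dump', '--format=json'], ...)
# with StringIO objects capturing stdout and stderr.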
1550 def run_cluster_cmd(self, **kwargs):
1551 """
1552 Run a Ceph command and return the object representing the process
1553 for the command.
1554
1555 Accepts arguments same as that of teuthology.orchestra.run.run()
1556 """
1557 if self.cephadm:
1558 return shell(self.ctx, self.cluster, self.controller,
1559 args=['ceph'] + list(kwargs['args']),
1560 stdout=StringIO(),
1561 check_status=kwargs.get('check_status', True))
1562 if self.rook:
1563 return toolbox(self.ctx, self.cluster,
1564 args=['ceph'] + list(kwargs['args']),
1565 stdout=StringIO(),
1566 check_status=kwargs.get('check_status', True))
1567
1568 testdir = teuthology.get_testdir(self.ctx)
1569 prefix = ['sudo', 'adjust-ulimits', 'ceph-coverage',
1570 f'{testdir}/archive/coverage', 'timeout', '120', 'ceph',
1571 '--cluster', self.cluster]
1572 kwargs['args'] = prefix + list(kwargs['args'])
1573 return self.controller.run(**kwargs)
1574
1575 def raw_cluster_cmd(self, *args, **kwargs) -> str:
1576 """
1577 Run a ceph cluster command and return its stdout output.
1578 """
1579 stdout = kwargs.pop('stdout', StringIO())
1580 p = self.run_cluster_cmd(args=args, stdout=stdout, **kwargs)
1581 return p.stdout.getvalue()
1582
1583 def raw_cluster_cmd_result(self, *args, **kwargs):
1584 """
1585 Run a ceph cluster command and return its exit status.
1586 """
1587 kwargs['args'], kwargs['check_status'] = args, False
1588 return self.run_cluster_cmd(**kwargs).exitstatus
1589
1590 def run_ceph_w(self, watch_channel=None):
1591 """
1592 Execute "ceph -w" in the background with stdout connected to a BytesIO,
1593 and return the RemoteProcess.
1594
1595 :param watch_channel: Specifies the channel to be watched. This can be
1596 'cluster', 'audit', ...
1597 :type watch_channel: str
1598 """
1599 args = ["sudo",
1600 "daemon-helper",
1601 "kill",
1602 "ceph",
1603 '--cluster',
1604 self.cluster,
1605 "-w"]
1606 if watch_channel is not None:
1607 args.append("--watch-channel")
1608 args.append(watch_channel)
1609 return self.controller.run(args=args, wait=False, stdout=StringIO(), stdin=run.PIPE)
1610
1611 def get_mon_socks(self):
1612 """
1613 Get monitor sockets.
1614
1615 :return socks: tuple of strings; strings are individual sockets.
1616 """
1617 from json import loads
1618
1619 output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
1620 socks = []
1621 for mon in output['mons']:
1622 for addrvec_mem in mon['public_addrs']['addrvec']:
1623 socks.append(addrvec_mem['addr'])
1624 return tuple(socks)
1625
1626 def get_msgrv1_mon_socks(self):
1627 """
1628 Get monitor sockets that use msgrv1 to operate.
1629
1630 :return socks: tuple of strings; strings are individual sockets.
1631 """
1632 from json import loads
1633
1634 output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
1635 socks = []
1636 for mon in output['mons']:
1637 for addrvec_mem in mon['public_addrs']['addrvec']:
1638 if addrvec_mem['type'] == 'v1':
1639 socks.append(addrvec_mem['addr'])
1640 return tuple(socks)
1641
1642 def get_msgrv2_mon_socks(self):
1643 """
1644 Get monitor sockets that use msgrv2 to operate.
1645
1646 :return socks: tuple of strings; strings are individual sockets.
1647 """
1648 from json import loads
1649
1650 output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
1651 socks = []
1652 for mon in output['mons']:
1653 for addrvec_mem in mon['public_addrs']['addrvec']:
1654 if addrvec_mem['type'] == 'v2':
1655 socks.append(addrvec_mem['addr'])
1656 return tuple(socks)
1657
1658 def flush_pg_stats(self, osds, no_wait=None, wait_for_mon=300):
1659 """
1660 Flush pg stats from a list of OSD ids, ensuring they are reflected
1661 all the way to the monitor. Luminous and later only.
1662
1663 :param osds: list of OSDs to flush
1664 :param no_wait: list of OSDs not to wait for seq id. by default, we
1665 wait for all specified osds, but some of them could be
1666 moved out of osdmap, so we cannot get their updated
1667 stat seq from monitor anymore. in that case, you need
1668 to pass a blocklist.
1669 :param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
1670 it. (5 min by default)
1671 """
1672 seq = {osd: int(self.raw_cluster_cmd('tell', 'osd.%d' % osd, 'flush_pg_stats'))
1673 for osd in osds}
1674 if not wait_for_mon:
1675 return
1676 if no_wait is None:
1677 no_wait = []
1678 for osd, need in seq.items():
1679 if osd in no_wait:
1680 continue
1681 got = 0
1682 while wait_for_mon > 0:
1683 got = int(self.raw_cluster_cmd('osd', 'last-stat-seq', 'osd.%d' % osd))
1684 self.log('need seq {need} got {got} for osd.{osd}'.format(
1685 need=need, got=got, osd=osd))
1686 if got >= need:
1687 break
1688 A_WHILE = 1
1689 time.sleep(A_WHILE)
1690 wait_for_mon -= A_WHILE
1691 else:
1692 raise Exception('timed out waiting for mon to be updated with '
1693 'osd.{osd}: {got} < {need}'.
1694 format(osd=osd, got=got, need=need))
1695
1696 def flush_all_pg_stats(self):
1697 self.flush_pg_stats(range(len(self.get_osd_dump())))
1698
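# A minimal usage sketch (illustrative only): flush stats for osd.0 and osd.1
# but skip waiting on osd.1 (e.g. if it was just removed from the osdmap):
#
#   manager.flush_pg_stats([0, 1], no_wait=[1])
#
# Each OSD is told `flush_pg_stats`; the monitor's `osd last-stat-seq` is then
# polled until it reaches the sequence number the OSD reported.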
1699 def do_rados(self, cmd, pool=None, namespace=None, remote=None, **kwargs):
1700 """
1701 Execute a remote rados command.
1702 """
1703 if remote is None:
1704 remote = self.controller
1705
1706 testdir = teuthology.get_testdir(self.ctx)
1707 pre = [
1708 'adjust-ulimits',
1709 'ceph-coverage',
1710 '{tdir}/archive/coverage'.format(tdir=testdir),
1711 'rados',
1712 '--cluster',
1713 self.cluster,
1714 ]
1715 if pool is not None:
1716 pre += ['--pool', pool]
1717 if namespace is not None:
1718 pre += ['--namespace', namespace]
1719 pre.extend(cmd)
1720 proc = remote.run(
1721 args=pre,
1722 wait=True,
1723 **kwargs
1724 )
1725 return proc
1726
1727 def rados_write_objects(self, pool, num_objects, size,
1728 timelimit, threads, cleanup=False):
1729 """
1730 Write rados objects via `rados bench`.
1731 The threads argument is not used yet.
1732 """
1733 args = [
1734 '--num-objects', num_objects,
1735 '-b', size,
1736 'bench', timelimit,
1737 'write'
1738 ]
1739 if not cleanup:
1740 args.append('--no-cleanup')
1741 return self.do_rados(map(str, args), pool=pool)
1742
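# rados_write_objects() above ends up running roughly the following command
# (the adjust-ulimits/ceph-coverage preamble added by do_rados is omitted;
# values are illustrative):
#
#   rados --cluster ceph --pool <pool> --num-objects <num_objects> \
#         -b <size> bench <timelimit> write --no-cleanup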
1743 def do_put(self, pool, obj, fname, namespace=None):
1744 """
1745 Implement rados put operation
1746 """
1747 args = ['put', obj, fname]
1748 return self.do_rados(
1749 args,
1750 check_status=False,
1751 pool=pool,
1752 namespace=namespace
1753 ).exitstatus
1754
1755 def do_get(self, pool, obj, fname='/dev/null', namespace=None):
1756 """
1757 Implement rados get operation
1758 """
1759 args = ['get', obj, fname]
1760 return self.do_rados(
1761 args,
1762 check_status=False,
1763 pool=pool,
1764 namespace=namespace,
1765 ).exitstatus
1766
1767 def do_rm(self, pool, obj, namespace=None):
1768 """
1769 Implement rados rm operation
1770 """
1771 args = ['rm', obj]
1772 return self.do_rados(
1773 args,
1774 check_status=False,
1775 pool=pool,
1776 namespace=namespace
1777 ).exitstatus
1778
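# Minimal usage sketch for the object helpers above, assuming `manager` is a
# CephManager instance and 'mypool' already exists (names are illustrative):
#
#     assert manager.do_put('mypool', 'obj1', '/etc/hostname') == 0
#     assert manager.do_get('mypool', 'obj1') == 0
#     assert manager.do_rm('mypool', 'obj1') == 0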
1779 def osd_admin_socket(self, osd_id, command, check_status=True, timeout=0, stdout=None):
1780 if stdout is None:
1781 stdout = StringIO()
1782 return self.admin_socket('osd', osd_id, command, check_status, timeout, stdout)
1783
1784 def find_remote(self, service_type, service_id):
1785 """
1786 Get the Remote for the host where a particular service runs.
1787
1788 :param service_type: 'mds', 'osd', 'client'
1789 :param service_id: The second part of a role, e.g. '0' for
1790 the role 'client.0'
1791 :return: a Remote instance for the host where the
1792 requested role is placed
1793 """
1794 return get_remote(self.ctx, self.cluster,
1795 service_type, service_id)
1796
1797 def admin_socket(self, service_type, service_id,
1798 command, check_status=True, timeout=0, stdout=None):
1799 """
1800 Remotely start up ceph specifying the admin socket
1801 :param command: a list of words to use as the command
1802 to the admin socket
1803 """
1804 if stdout is None:
1805 stdout = StringIO()
1806
1807 remote = self.find_remote(service_type, service_id)
1808
1809 if self.cephadm:
1810 return shell(
1811 self.ctx, self.cluster, remote,
1812 args=[
1813 'ceph', 'daemon', '%s.%s' % (service_type, service_id),
1814 ] + command,
1815 stdout=stdout,
1816 wait=True,
1817 check_status=check_status,
1818 )
1819 if self.rook:
1820 assert False, 'not implemented'
1821
1822 testdir = teuthology.get_testdir(self.ctx)
1823 args = [
1824 'sudo',
1825 'adjust-ulimits',
1826 'ceph-coverage',
1827 '{tdir}/archive/coverage'.format(tdir=testdir),
1828 'timeout',
1829 str(timeout),
1830 'ceph',
1831 '--cluster',
1832 self.cluster,
1833 '--admin-daemon',
1834 '/var/run/ceph/{cluster}-{type}.{id}.asok'.format(
1835 cluster=self.cluster,
1836 type=service_type,
1837 id=service_id),
1838 ]
1839 args.extend(command)
1840 return remote.run(
1841 args=args,
1842 stdout=stdout,
1843 wait=True,
1844 check_status=check_status
1845 )
1846
1847 def objectstore_tool(self, pool, options, args, **kwargs):
1848 return ObjectStoreTool(self, pool, **kwargs).run(options, args)
1849
1850 def get_pgid(self, pool, pgnum):
1851 """
1852 :param pool: pool name
1853 :param pgnum: pg number
1854 :returns: a string representing this pg.
1855 """
1856 poolnum = self.get_pool_num(pool)
1857 pg_str = "{poolnum}.{pgnum}".format(
1858 poolnum=poolnum,
1859 pgnum=pgnum)
1860 return pg_str
1861
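# get_pgid() above: e.g. get_pgid('mypool', 3) returns "2.3" if pool
# 'mypool' has pool id 2.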
1862 def get_pg_replica(self, pool, pgnum):
1863 """
1864 get a replica osd for pool, pgnum (e.g. ('data', 0) -> 0)
1865 """
1866 pg_str = self.get_pgid(pool, pgnum)
1867 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1868 j = json.loads('\n'.join(output.split('\n')[1:]))
1869 return int(j['acting'][-1])
1871
1872 def wait_for_pg_stats(func):
1873 # both osd_mon_report_interval and mgr_stats_period are 5 seconds
1874 # by default; together with the injected message delays, the
1875 # Fibonacci backoff below (roughly 33 seconds in total) is more than enough
1876 delays = [1, 1, 2, 3, 5, 8, 13, 0]
1877 @wraps(func)
1878 def wrapper(self, *args, **kwargs):
1879 exc = None
1880 for delay in delays:
1881 try:
1882 return func(self, *args, **kwargs)
1883 except AssertionError as e:
1884 time.sleep(delay)
1885 exc = e
1886 raise exc
1887 return wrapper
1888
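# wait_for_pg_stats above is used as a decorator (see with_pg_state() and
# with_pg() further below): the wrapped check is retried with the Fibonacci
# backoff until it stops raising AssertionError, and the last AssertionError
# is re-raised if every attempt fails.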
1889 def get_pg_primary(self, pool, pgnum):
1890 """
1891 get the primary osd for pool, pgnum (e.g. ('data', 0) -> 0)
1892 """
1893 pg_str = self.get_pgid(pool, pgnum)
1894 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1895 j = json.loads('\n'.join(output.split('\n')[1:]))
1896 return int(j['acting'][0])
1898
1899 def get_pool_num(self, pool):
1900 """
1901 get number for pool (e.g., data -> 2)
1902 """
1903 return int(self.get_pool_dump(pool)['pool'])
1904
1905 def list_pools(self):
1906 """
1907 list all pool names
1908 """
1909 osd_dump = self.get_osd_dump_json()
1910 self.log(osd_dump['pools'])
1911 return [str(i['pool_name']) for i in osd_dump['pools']]
1912
1913 def clear_pools(self):
1914 """
1915 remove all pools
1916 """
1917 [self.remove_pool(i) for i in self.list_pools()]
1918
1919 def kick_recovery_wq(self, osdnum):
1920 """
1921 Run kick_recovery_wq on cluster.
1922 """
1923 return self.raw_cluster_cmd(
1924 'tell', "osd.%d" % (int(osdnum),),
1925 'debug',
1926 'kick_recovery_wq',
1927 '0')
1928
1929 def wait_run_admin_socket(self, service_type,
1930 service_id, args=['version'], timeout=75, stdout=None):
1931 """
1932 If the admin_socket call succeeds, return the process. Otherwise wait
1933 five seconds and try again, giving up once `timeout` seconds have passed.
1934 """
1935 if stdout is None:
1936 stdout = StringIO()
1937 tries = 0
1938 while True:
1939 proc = self.admin_socket(service_type, service_id,
1940 args, check_status=False, stdout=stdout)
1941 if proc.exitstatus == 0:
1942 return proc
1943 else:
1944 tries += 1
1945 if (tries * 5) > timeout:
1946 raise Exception('timed out waiting for admin_socket '
1947 'to appear after {type}.{id} restart'.
1948 format(type=service_type,
1949 id=service_id))
1950 self.log("waiting on admin_socket for {type}-{id}, "
1951 "{command}".format(type=service_type,
1952 id=service_id,
1953 command=args))
1954 time.sleep(5)
1955
1956 def get_pool_dump(self, pool):
1957 """
1958 get the osd dump part of a pool
1959 """
1960 osd_dump = self.get_osd_dump_json()
1961 for i in osd_dump['pools']:
1962 if i['pool_name'] == pool:
1963 return i
1964 assert False
1965
1966 def get_config(self, service_type, service_id, name):
1967 """
1968 :param service_type: daemon type, e.g. 'mon'
:param service_id: daemon id, e.g. 'a'
1969 :param name: the option name
1970 """
1971 proc = self.wait_run_admin_socket(service_type, service_id,
1972 ['config', 'show'])
1973 j = json.loads(proc.stdout.getvalue())
1974 return j[name]
1975
1976 def inject_args(self, service_type, service_id, name, value):
1977 whom = '{0}.{1}'.format(service_type, service_id)
1978 if isinstance(value, bool):
1979 value = 'true' if value else 'false'
1980 opt_arg = '--{name}={value}'.format(name=name, value=value)
1981 self.raw_cluster_cmd('--', 'tell', whom, 'injectargs', opt_arg)
1982
1983 def set_config(self, osdnum, **argdict):
1984 """
1985 :param osdnum: osd number
1986 :param argdict: dictionary containing values to set.
1987 """
1988 for k, v in argdict.items():
1989 self.wait_run_admin_socket(
1990 'osd', osdnum,
1991 ['config', 'set', str(k), str(v)])
1992
1993 def raw_cluster_status(self):
1994 """
1995 Get status from cluster
1996 """
1997 status = self.raw_cluster_cmd('status', '--format=json')
1998 return json.loads(status)
1999
2000 def raw_osd_status(self):
2001 """
2002 Get osd status from cluster
2003 """
2004 return self.raw_cluster_cmd('osd', 'dump')
2005
2006 def get_osd_status(self):
2007 """
2008 Get osd statuses sorted by states that the osds are in.
2009 """
2010 osd_lines = list(filter(
2011 lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
2012 self.raw_osd_status().split('\n')))
2013 self.log(osd_lines)
2014 in_osds = [int(i[4:].split()[0])
2015 for i in filter(lambda x: " in " in x, osd_lines)]
2016 out_osds = [int(i[4:].split()[0])
2017 for i in filter(lambda x: " out " in x, osd_lines)]
2018 up_osds = [int(i[4:].split()[0])
2019 for i in filter(lambda x: " up " in x, osd_lines)]
2020 down_osds = [int(i[4:].split()[0])
2021 for i in filter(lambda x: " down " in x, osd_lines)]
2022 dead_osds = [int(x.id_)
2023 for x in filter(lambda x:
2024 not x.running(),
2025 self.ctx.daemons.
2026 iter_daemons_of_role('osd', self.cluster))]
2027 live_osds = [int(x.id_) for x in
2028 filter(lambda x:
2029 x.running(),
2030 self.ctx.daemons.iter_daemons_of_role('osd',
2031 self.cluster))]
2032 return {'in': in_osds, 'out': out_osds, 'up': up_osds,
2033 'down': down_osds, 'dead': dead_osds, 'live': live_osds,
2034 'raw': osd_lines}
2035
2036 def get_num_pgs(self):
2037 """
2038 Check cluster status for the number of pgs
2039 """
2040 status = self.raw_cluster_status()
2041 self.log(status)
2042 return status['pgmap']['num_pgs']
2043
2044 def create_erasure_code_profile(self, profile_name, profile):
2045 """
2046 Create an erasure code profile whose name can then be passed
2047 when creating an erasure coded pool.
2048 """
2049 with self.lock:
2050 args = cmd_erasure_code_profile(profile_name, profile)
2051 self.raw_cluster_cmd(*args)
2052
2053 def create_pool_with_unique_name(self, pg_num=16,
2054 erasure_code_profile_name=None,
2055 min_size=None,
2056 erasure_code_use_overwrites=False):
2057 """
2058 Create a pool named unique_pool_X where X is unique.
2059 """
2060 name = ""
2061 with self.lock:
2062 name = "unique_pool_%s" % (str(self.next_pool_id),)
2063 self.next_pool_id += 1
2064 self.create_pool(
2065 name,
2066 pg_num,
2067 erasure_code_profile_name=erasure_code_profile_name,
2068 min_size=min_size,
2069 erasure_code_use_overwrites=erasure_code_use_overwrites)
2070 return name
2071
2072 @contextlib.contextmanager
2073 def pool(self, pool_name, pg_num=16, erasure_code_profile_name=None):
2074 self.create_pool(pool_name, pg_num, erasure_code_profile_name)
2075 yield
2076 self.remove_pool(pool_name)
2077
2078 def create_pool(self, pool_name, pg_num=16,
2079 erasure_code_profile_name=None,
2080 min_size=None,
2081 erasure_code_use_overwrites=False):
2082 """
2083 Create a pool named from the pool_name parameter.
2084 :param pool_name: name of the pool being created.
2085 :param pg_num: initial number of pgs.
2086 :param erasure_code_profile_name: if set (not None), create an
2087 erasure coded pool using this profile
2088 :param erasure_code_use_overwrites: if true, allow overwrites
2089 """
2090 with self.lock:
2091 assert isinstance(pool_name, str)
2092 assert isinstance(pg_num, int)
2093 assert pool_name not in self.pools
2094 self.log("creating pool_name %s" % (pool_name,))
2095 if erasure_code_profile_name:
2096 self.raw_cluster_cmd('osd', 'pool', 'create',
2097 pool_name, str(pg_num), str(pg_num),
2098 'erasure', erasure_code_profile_name)
2099 else:
2100 self.raw_cluster_cmd('osd', 'pool', 'create',
2101 pool_name, str(pg_num))
2102 if min_size is not None:
2103 self.raw_cluster_cmd(
2104 'osd', 'pool', 'set', pool_name,
2105 'min_size',
2106 str(min_size))
2107 if erasure_code_use_overwrites:
2108 self.raw_cluster_cmd(
2109 'osd', 'pool', 'set', pool_name,
2110 'allow_ec_overwrites',
2111 'true')
2112 self.raw_cluster_cmd(
2113 'osd', 'pool', 'application', 'enable',
2114 pool_name, 'rados', '--yes-i-really-mean-it',
2115 run.Raw('||'), 'true')
2116 self.pools[pool_name] = pg_num
2117 time.sleep(1)
2118
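# Example (illustrative names and profile values): create a small erasure
# coded pool with overwrites enabled, exercising the branches above:
#
#     manager.create_erasure_code_profile('ecprofile', {'k': 2, 'm': 1})
#     manager.create_pool('ecpool', pg_num=8,
#                         erasure_code_profile_name='ecprofile',
#                         erasure_code_use_overwrites=True)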
2119 def add_pool_snap(self, pool_name, snap_name):
2120 """
2121 Add pool snapshot
2122 :param pool_name: name of pool to snapshot
2123 :param snap_name: name of snapshot to take
2124 """
2125 self.raw_cluster_cmd('osd', 'pool', 'mksnap',
2126 str(pool_name), str(snap_name))
2127
2128 def remove_pool_snap(self, pool_name, snap_name):
2129 """
2130 Remove pool snapshot
2131 :param pool_name: name of pool to snapshot
2132 :param snap_name: name of snapshot to remove
2133 """
2134 self.raw_cluster_cmd('osd', 'pool', 'rmsnap',
2135 str(pool_name), str(snap_name))
2136
2137 def remove_pool(self, pool_name):
2138 """
2139 Remove the indicated pool
2140 :param pool_name: Pool to be removed
2141 """
2142 with self.lock:
2143 assert isinstance(pool_name, str)
2144 assert pool_name in self.pools
2145 self.log("removing pool_name %s" % (pool_name,))
2146 del self.pools[pool_name]
2147 self.raw_cluster_cmd('osd', 'pool', 'rm', pool_name, pool_name,
2148 "--yes-i-really-really-mean-it")
2149
2150 def get_pool(self):
2151 """
2152 Pick a random pool
2153 """
2154 with self.lock:
2155 if self.pools:
2156 return random.choice(list(self.pools))
2157
2158 def get_pool_pg_num(self, pool_name):
2159 """
2160 Return the number of pgs in the pool specified.
2161 """
2162 with self.lock:
2163 assert isinstance(pool_name, str)
2164 if pool_name in self.pools:
2165 return self.pools[pool_name]
2166 return 0
2167
2168 def get_pool_property(self, pool_name, prop):
2169 """
2170 :param pool_name: pool
2171 :param prop: property to be checked.
2172 :returns: property as string
2173 """
2174 with self.lock:
2175 assert isinstance(pool_name, str)
2176 assert isinstance(prop, str)
2177 output = self.raw_cluster_cmd(
2178 'osd',
2179 'pool',
2180 'get',
2181 pool_name,
2182 prop)
2183 return output.split()[1]
2184
2185 def get_pool_int_property(self, pool_name, prop):
2186 return int(self.get_pool_property(pool_name, prop))
2187
2188 def set_pool_property(self, pool_name, prop, val):
2189 """
2190 :param pool_name: pool
2191 :param prop: property to be set.
2192 :param val: value to set.
2193
2194 This routine retries if set operation fails.
2195 """
2196 with self.lock:
2197 assert isinstance(pool_name, str)
2198 assert isinstance(prop, str)
2199 assert isinstance(val, int)
2200 tries = 0
2201 while True:
2202 r = self.raw_cluster_cmd_result(
2203 'osd',
2204 'pool',
2205 'set',
2206 pool_name,
2207 prop,
2208 str(val))
2209 if r != 11: # EAGAIN
2210 break
2211 tries += 1
2212 if tries > 50:
2213 raise Exception('timed out getting EAGAIN '
2214 'when setting pool property %s %s = %s' %
2215 (pool_name, prop, val))
2216 self.log('got EAGAIN setting pool property, '
2217 'waiting a few seconds...')
2218 time.sleep(2)
2219
2220 def expand_pool(self, pool_name, by, max_pgs):
2221 """
2222 Increase the number of pgs in a pool
2223 """
2224 with self.lock:
2225 assert isinstance(pool_name, str)
2226 assert isinstance(by, int)
2227 assert pool_name in self.pools
2228 if self.get_num_creating() > 0:
2229 return False
2230 if (self.pools[pool_name] + by) > max_pgs:
2231 return False
2232 self.log("increase pool size by %d" % (by,))
2233 new_pg_num = self.pools[pool_name] + by
2234 self.set_pool_property(pool_name, "pg_num", new_pg_num)
2235 self.pools[pool_name] = new_pg_num
2236 return True
2237
2238 def contract_pool(self, pool_name, by, min_pgs):
2239 """
2240 Decrease the number of pgs in a pool
2241 """
2242 with self.lock:
2243 self.log('contract_pool %s by %s min %s' % (
2244 pool_name, str(by), str(min_pgs)))
2245 assert isinstance(pool_name, str)
2246 assert isinstance(by, int)
2247 assert pool_name in self.pools
2248 if self.get_num_creating() > 0:
2249 self.log('too many creating')
2250 return False
2251 proj = self.pools[pool_name] - by
2252 if proj < min_pgs:
2253 self.log('would drop below min_pgs, proj %d, currently %d' % (proj, self.pools[pool_name]))
2254 return False
2255 self.log("decrease pool size by %d" % (by,))
2256 new_pg_num = self.pools[pool_name] - by
2257 self.set_pool_property(pool_name, "pg_num", new_pg_num)
2258 self.pools[pool_name] = new_pg_num
2259 return True
2260
2261 def stop_pg_num_changes(self):
2262 """
2263 Reset all pg_num_targets back to pg_num, canceling splits and merges
2264 """
2265 self.log('Canceling any pending splits or merges...')
2266 osd_dump = self.get_osd_dump_json()
2267 try:
2268 for pool in osd_dump['pools']:
2269 if pool['pg_num'] != pool['pg_num_target']:
2270 self.log('Setting pool %s (%d) pg_num %d -> %d' %
2271 (pool['pool_name'], pool['pool'],
2272 pool['pg_num_target'],
2273 pool['pg_num']))
2274 self.raw_cluster_cmd('osd', 'pool', 'set', pool['pool_name'],
2275 'pg_num', str(pool['pg_num']))
2276 except KeyError:
2277 # we don't support pg_num_target before nautilus
2278 pass
2279
2280 def set_pool_pgpnum(self, pool_name, force):
2281 """
2282 Set pgpnum property of pool_name pool.
2283 """
2284 with self.lock:
2285 assert isinstance(pool_name, str)
2286 assert pool_name in self.pools
2287 if not force and self.get_num_creating() > 0:
2288 return False
2289 self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name])
2290 return True
2291
2292 def list_pg_unfound(self, pgid):
2293 """
2294 return list of unfound pgs with the id specified
2295 """
2296 r = None
2297 offset = {}
2298 while True:
2299 out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_unfound',
2300 json.dumps(offset))
2301 j = json.loads(out)
2302 if r is None:
2303 r = j
2304 else:
2305 r['objects'].extend(j['objects'])
2306 if 'more' not in j:
2307 break
2308 if j['more'] == 0:
2309 break
2310 offset = j['objects'][-1]['oid']
2311 if 'more' in r:
2312 del r['more']
2313 return r
2314
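# list_pg_unfound() above pages through `pg <pgid> list_unfound`, passing
# the oid of the last object returned as the offset for the next call and
# concatenating the 'objects' lists until the cluster reports no 'more'.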
2315 def get_pg_stats(self):
2316 """
2317 Dump the cluster and get pg stats
2318 """
2319 out = self.raw_cluster_cmd('pg', 'dump', '--format=json')
2320 j = json.loads('\n'.join(out.split('\n')[1:]))
2321 try:
2322 return j['pg_map']['pg_stats']
2323 except KeyError:
2324 return j['pg_stats']
2325
2326 def get_pgids_to_force(self, backfill):
2327 """
2328 Return the randomized list of PGs that can have their recovery/backfill forced
2329 """
2330 j = self.get_pg_stats()
2331 pgids = []
2332 if backfill:
2333 wanted = ['degraded', 'backfilling', 'backfill_wait']
2334 else:
2335 wanted = ['recovering', 'degraded', 'recovery_wait']
2336 for pg in j:
2337 status = pg['state'].split('+')
2338 for t in wanted:
2339 if (random.random() > 0.5 and t in status
and 'forced_backfill' not in status
and 'forced_recovery' not in status):
2340 pgids.append(pg['pgid'])
2341 break
2342 return pgids
2343
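# get_pgids_to_force() above randomly picks roughly half of the PGs that are
# currently degraded/recovering/backfilling and whose priority has not
# already been forced; the returned pgids are suitable arguments for
# `ceph pg force-recovery` / `ceph pg force-backfill` (presumably issued by
# the caller, e.g. a thrasher).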
2344 def get_pgids_to_cancel_force(self, backfill):
2345 """
2346 Return the randomized list of PGs whose recovery/backfill priority is forced
2347 """
2348 j = self.get_pg_stats()
2349 pgids = []
2350 if backfill:
2351 wanted = 'forced_backfill'
2352 else:
2353 wanted = 'forced_recovery'
2354 for pg in j:
2355 status = pg['state'].split('+')
2356 if wanted in status and random.random() > 0.5:
2357 pgids.append(pg['pgid'])
2358 return pgids
2359
2360 def compile_pg_status(self):
2361 """
2362 Return a histogram of pg state values
2363 """
2364 ret = {}
2365 j = self.get_pg_stats()
2366 for pg in j:
2367 for status in pg['state'].split('+'):
2368 if status not in ret:
2369 ret[status] = 0
2370 ret[status] += 1
2371 return ret
2372
2373 @wait_for_pg_stats # type: ignore
2374 def with_pg_state(self, pool, pgnum, check):
2375 pgstr = self.get_pgid(pool, pgnum)
2376 stats = self.get_single_pg_stats(pgstr)
2377 assert(check(stats['state']))
2378
2379 @wait_for_pg_stats # type: ignore
2380 def with_pg(self, pool, pgnum, check):
2381 pgstr = self.get_pgid(pool, pgnum)
2382 stats = self.get_single_pg_stats(pgstr)
2383 return check(stats)
2384
2385 def get_last_scrub_stamp(self, pool, pgnum):
2386 """
2387 Get the timestamp of the last scrub.
2388 """
2389 stats = self.get_single_pg_stats(self.get_pgid(pool, pgnum))
2390 return stats["last_scrub_stamp"]
2391
2392 def do_pg_scrub(self, pool, pgnum, stype):
2393 """
2394 Scrub pg and wait for scrubbing to finish
2395 """
2396 init = self.get_last_scrub_stamp(pool, pgnum)
2397 RESEND_TIMEOUT = 120 # Must be a multiple of SLEEP_TIME
2398 FATAL_TIMEOUT = RESEND_TIMEOUT * 3
2399 SLEEP_TIME = 10
2400 timer = 0
2401 while init == self.get_last_scrub_stamp(pool, pgnum):
2402 assert timer < FATAL_TIMEOUT, "fatal timeout trying to " + stype
2403 self.log("waiting for scrub type %s" % (stype,))
2404 if (timer % RESEND_TIMEOUT) == 0:
2405 self.raw_cluster_cmd('pg', stype, self.get_pgid(pool, pgnum))
2406 # The first time in this loop is the actual request
2407 if timer != 0 and stype == "repair":
2408 self.log("WARNING: Resubmitted a non-idempotent repair")
2409 time.sleep(SLEEP_TIME)
2410 timer += SLEEP_TIME
2411
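# do_pg_scrub() above polls last_scrub_stamp and re-issues the scrub request
# every RESEND_TIMEOUT seconds in case the first request was dropped; since
# a repair is not idempotent, a resent "repair" is flagged with a warning.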
2412 def wait_snap_trimming_complete(self, pool):
2413 """
2414 Wait for snap trimming on pool to end
2415 """
2416 POLL_PERIOD = 10
2417 FATAL_TIMEOUT = 600
2418 start = time.time()
2419 poolnum = self.get_pool_num(pool)
2420 poolnumstr = "%s." % (poolnum,)
2421 while True:
2422 now = time.time()
2423 assert (now - start) <= FATAL_TIMEOUT, \
2424 'failed to complete snap trimming before timeout'
2426 all_stats = self.get_pg_stats()
2427 trimming = False
2428 for pg in all_stats:
2429 if pg['pgid'].startswith(poolnumstr) and ('snaptrim' in pg['state']):
2430 self.log("pg {pg} in trimming, state: {state}".format(
2431 pg=pg['pgid'],
2432 state=pg['state']))
2433 trimming = True
2434 if not trimming:
2435 break
2436 self.log("{pool} still trimming, waiting".format(pool=pool))
2437 time.sleep(POLL_PERIOD)
2438
2439 def get_single_pg_stats(self, pgid):
2440 """
2441 Return pg for the pgid specified.
2442 """
2443 all_stats = self.get_pg_stats()
2444
2445 for pg in all_stats:
2446 if pg['pgid'] == pgid:
2447 return pg
2448
2449 return None
2450
2451 def get_object_pg_with_shard(self, pool, name, osdid):
2452 """
2453 """
2454 pool_dump = self.get_pool_dump(pool)
2455 object_map = self.get_object_map(pool, name)
2456 if pool_dump["type"] == PoolType.ERASURE_CODED:
2457 shard = object_map['acting'].index(osdid)
2458 return "{pgid}s{shard}".format(pgid=object_map['pgid'],
2459 shard=shard)
2460 else:
2461 return object_map['pgid']
2462
2463 def get_object_primary(self, pool, name):
2464 """
2465 """
2466 object_map = self.get_object_map(pool, name)
2467 return object_map['acting_primary']
2468
2469 def get_object_map(self, pool, name):
2470 """
2471 osd map --format=json converted to a python object
2472 :returns: the python object
2473 """
2474 out = self.raw_cluster_cmd('--format=json', 'osd', 'map', pool, name)
2475 return json.loads('\n'.join(out.split('\n')[1:]))
2476
2477 def get_osd_dump_json(self):
2478 """
2479 osd dump --format=json converted to a python object
2480 :returns: the python object
2481 """
2482 out = self.raw_cluster_cmd('osd', 'dump', '--format=json')
2483 return json.loads('\n'.join(out.split('\n')[1:]))
2484
2485 def get_osd_dump(self):
2486 """
2487 Dump osds
2488 :returns: all osds
2489 """
2490 return self.get_osd_dump_json()['osds']
2491
2492 def get_osd_metadata(self):
2493 """
2494 osd metadata --format=json converted to a python object
2495 :returns: the python object containing osd metadata information
2496 """
2497 out = self.raw_cluster_cmd('osd', 'metadata', '--format=json')
2498 return json.loads('\n'.join(out.split('\n')[1:]))
2499
2500 def get_mgr_dump(self):
2501 out = self.raw_cluster_cmd('mgr', 'dump', '--format=json')
2502 return json.loads(out)
2503
2504 def get_stuck_pgs(self, type_, threshold):
2505 """
2506 :returns: stuck pg information from the cluster
2507 """
2508 out = self.raw_cluster_cmd('pg', 'dump_stuck', type_, str(threshold),
2509 '--format=json')
2510 return json.loads(out).get('stuck_pg_stats',[])
2511
2512 def get_num_unfound_objects(self):
2513 """
2514 Check cluster status to get the number of unfound objects
2515 """
2516 status = self.raw_cluster_status()
2517 self.log(status)
2518 return status['pgmap'].get('unfound_objects', 0)
2519
2520 def get_num_creating(self):
2521 """
2522 Find the number of pgs in creating mode.
2523 """
2524 pgs = self.get_pg_stats()
2525 num = 0
2526 for pg in pgs:
2527 if 'creating' in pg['state']:
2528 num += 1
2529 return num
2530
2531 def get_num_active_clean(self):
2532 """
2533 Find the number of active and clean pgs.
2534 """
2535 pgs = self.get_pg_stats()
2536 return self._get_num_active_clean(pgs)
2537
2538 def _get_num_active_clean(self, pgs):
2539 num = 0
2540 for pg in pgs:
2541 if (pg['state'].count('active') and
2542 pg['state'].count('clean') and
2543 not pg['state'].count('stale')):
2544 num += 1
2545 return num
2546
2547 def get_num_active_recovered(self):
2548 """
2549 Find the number of active and recovered pgs.
2550 """
2551 pgs = self.get_pg_stats()
2552 return self._get_num_active_recovered(pgs)
2553
2554 def _get_num_active_recovered(self, pgs):
2555 num = 0
2556 for pg in pgs:
2557 if (pg['state'].count('active') and
2558 not pg['state'].count('recover') and
2559 not pg['state'].count('backfilling') and
2560 not pg['state'].count('stale')):
2561 num += 1
2562 return num
2563
2564 def get_is_making_recovery_progress(self):
2565 """
2566 Return whether there is recovery progress discernable in the
2567 raw cluster status
2568 """
2569 status = self.raw_cluster_status()
2570 kps = status['pgmap'].get('recovering_keys_per_sec', 0)
2571 bps = status['pgmap'].get('recovering_bytes_per_sec', 0)
2572 ops = status['pgmap'].get('recovering_objects_per_sec', 0)
2573 return kps > 0 or bps > 0 or ops > 0
2574
2575 def get_num_active(self):
2576 """
2577 Find the number of active pgs.
2578 """
2579 pgs = self.get_pg_stats()
2580 return self._get_num_active(pgs)
2581
2582 def _get_num_active(self, pgs):
2583 num = 0
2584 for pg in pgs:
2585 if pg['state'].count('active') and not pg['state'].count('stale'):
2586 num += 1
2587 return num
2588
2589 def get_num_down(self):
2590 """
2591 Find the number of pgs that are down.
2592 """
2593 pgs = self.get_pg_stats()
2594 num = 0
2595 for pg in pgs:
2596 if ((pg['state'].count('down') and not
2597 pg['state'].count('stale')) or
2598 (pg['state'].count('incomplete') and not
2599 pg['state'].count('stale'))):
2600 num += 1
2601 return num
2602
2603 def get_num_active_down(self):
2604 """
2605 Find the number of pgs that are either active or down.
2606 """
2607 pgs = self.get_pg_stats()
2608 return self._get_num_active_down(pgs)
2609
2610 def _get_num_active_down(self, pgs):
2611 num = 0
2612 for pg in pgs:
2613 if ((pg['state'].count('active') and not
2614 pg['state'].count('stale')) or
2615 (pg['state'].count('down') and not
2616 pg['state'].count('stale')) or
2617 (pg['state'].count('incomplete') and not
2618 pg['state'].count('stale'))):
2619 num += 1
2620 return num
2621
2622 def get_num_peered(self):
2623 """
2624 Find the number of PGs that are peered
2625 """
2626 pgs = self.get_pg_stats()
2627 return self._get_num_peered(pgs)
2628
2629 def _get_num_peered(self, pgs):
2630 num = 0
2631 for pg in pgs:
2632 if pg['state'].count('peered') and not pg['state'].count('stale'):
2633 num += 1
2634 return num
2635
2636 def is_clean(self):
2637 """
2638 True if all pgs are clean
2639 """
2640 pgs = self.get_pg_stats()
2641 return self._get_num_active_clean(pgs) == len(pgs)
2642
2643 def is_recovered(self):
2644 """
2645 True if all pgs have recovered
2646 """
2647 pgs = self.get_pg_stats()
2648 return self._get_num_active_recovered(pgs) == len(pgs)
2649
2650 def is_active_or_down(self):
2651 """
2652 True if all pgs are active or down
2653 """
2654 pgs = self.get_pg_stats()
2655 return self._get_num_active_down(pgs) == len(pgs)
2656
2657 def dump_pgs_not_active_clean(self):
2658 """
2659 Dumps all pgs that are not active+clean
2660 """
2661 pgs = self.get_pg_stats()
2662 for pg in pgs:
2663 if pg['state'] != 'active+clean':
2664 self.log('PG %s is not active+clean' % pg['pgid'])
2665 self.log(pg)
2666
2667 def dump_pgs_not_active_down(self):
2668 """
2669 Dumps all pgs that are not active or down
2670 """
2671 pgs = self.get_pg_stats()
2672 for pg in pgs:
2673 if 'active' not in pg['state'] and 'down' not in pg['state']:
2674 self.log('PG %s is not active or down' % pg['pgid'])
2675 self.log(pg)
2676
2677 def dump_pgs_not_active(self):
2678 """
2679 Dumps all pgs that are not active
2680 """
2681 pgs = self.get_pg_stats()
2682 for pg in pgs:
2683 if 'active' not in pg['state']:
2684 self.log('PG %s is not active' % pg['pgid'])
2685 self.log(pg)
2686
2687 def wait_for_clean(self, timeout=1200):
2688 """
2689 Returns true when all pgs are clean.
2690 """
2691 self.log("waiting for clean")
2692 start = time.time()
2693 num_active_clean = self.get_num_active_clean()
2694 while not self.is_clean():
2695 if timeout is not None:
2696 if self.get_is_making_recovery_progress():
2697 self.log("making progress, resetting timeout")
2698 start = time.time()
2699 else:
2700 self.log("no progress seen, keeping timeout for now")
2701 if time.time() - start >= timeout:
2702 self.log('dumping pgs not clean')
2703 self.dump_pgs_not_active_clean()
2704 assert time.time() - start < timeout, \
2705 'wait_for_clean: failed before timeout expired'
2706 cur_active_clean = self.get_num_active_clean()
2707 if cur_active_clean != num_active_clean:
2708 start = time.time()
2709 num_active_clean = cur_active_clean
2710 time.sleep(3)
2711 self.log("clean!")
2712
2713 def are_all_osds_up(self):
2714 """
2715 Returns true if all osds are up.
2716 """
2717 x = self.get_osd_dump()
2718 return all(y['up'] > 0 for y in x)
2719
2720 def wait_for_all_osds_up(self, timeout=None):
2721 """
2722 When this exits, either the timeout has expired, or all
2723 osds are up.
2724 """
2725 self.log("waiting for all up")
2726 start = time.time()
2727 while not self.are_all_osds_up():
2728 if timeout is not None:
2729 assert time.time() - start < timeout, \
2730 'timeout expired in wait_for_all_osds_up'
2731 time.sleep(3)
2732 self.log("all up!")
2733
2734 def pool_exists(self, pool):
2735 if pool in self.list_pools():
2736 return True
2737 return False
2738
2739 def wait_for_pool(self, pool, timeout=300):
2740 """
2741 Wait for a pool to exist
2742 """
2743 self.log('waiting for pool %s to exist' % pool)
2744 start = time.time()
2745 while not self.pool_exists(pool):
2746 if timeout is not None:
2747 assert time.time() - start < timeout, \
2748 'timeout expired in wait_for_pool'
2749 time.sleep(3)
2750
2751 def wait_for_pools(self, pools):
2752 for pool in pools:
2753 self.wait_for_pool(pool)
2754
2755 def is_mgr_available(self):
2756 x = self.get_mgr_dump()
2757 return x.get('available', False)
2758
2759 def wait_for_mgr_available(self, timeout=None):
2760 self.log("waiting for mgr available")
2761 start = time.time()
2762 while not self.is_mgr_available():
2763 if timeout is not None:
2764 assert time.time() - start < timeout, \
2765 'timeout expired in wait_for_mgr_available'
2766 time.sleep(3)
2767 self.log("mgr available!")
2768
2769 def wait_for_recovery(self, timeout=None):
2770 """
2771 Check peering. When this exits, we have recovered.
2772 """
2773 self.log("waiting for recovery to complete")
2774 start = time.time()
2775 num_active_recovered = self.get_num_active_recovered()
2776 while not self.is_recovered():
2777 now = time.time()
2778 if timeout is not None:
2779 if self.get_is_making_recovery_progress():
2780 self.log("making progress, resetting timeout")
2781 start = time.time()
2782 else:
2783 self.log("no progress seen, keeping timeout for now")
2784 if now - start >= timeout:
2785 if self.is_recovered():
2786 break
2787 self.log('dumping pgs not recovered yet')
2788 self.dump_pgs_not_active_clean()
2789 assert now - start < timeout, \
2790 'wait_for_recovery: failed before timeout expired'
2791 cur_active_recovered = self.get_num_active_recovered()
2792 if cur_active_recovered != num_active_recovered:
2793 start = time.time()
2794 num_active_recovered = cur_active_recovered
2795 time.sleep(3)
2796 self.log("recovered!")
2797
2798 def wait_for_active(self, timeout=None):
2799 """
2800 Check peering. When this exits, we are definitely active.
2801 """
2802 self.log("waiting for peering to complete")
2803 start = time.time()
2804 num_active = self.get_num_active()
2805 while not self.is_active():
2806 if timeout is not None:
2807 if time.time() - start >= timeout:
2808 self.log('dumping pgs not active')
2809 self.dump_pgs_not_active()
2810 assert time.time() - start < timeout, \
2811 'wait_for_active: failed before timeout expired'
2812 cur_active = self.get_num_active()
2813 if cur_active != num_active:
2814 start = time.time()
2815 num_active = cur_active
2816 time.sleep(3)
2817 self.log("active!")
2818
2819 def wait_for_active_or_down(self, timeout=None):
2820 """
2821 Check peering. When this exits, we are definitely either
2822 active or down.
2823 """
2824 self.log("waiting for peering to complete or become blocked")
2825 start = time.time()
2826 num_active_down = self.get_num_active_down()
2827 while not self.is_active_or_down():
2828 if timeout is not None:
2829 if time.time() - start >= timeout:
2830 self.log('dumping pgs not active or down')
2831 self.dump_pgs_not_active_down()
2832 assert time.time() - start < timeout, \
2833 'wait_for_active_or_down: failed before timeout expired'
2834 cur_active_down = self.get_num_active_down()
2835 if cur_active_down != num_active_down:
2836 start = time.time()
2837 num_active_down = cur_active_down
2838 time.sleep(3)
2839 self.log("active or down!")
2840
2841 def osd_is_up(self, osd):
2842 """
2843 Wrapper for osd check
2844 """
2845 osds = self.get_osd_dump()
2846 return osds[osd]['up'] > 0
2847
2848 def wait_till_osd_is_up(self, osd, timeout=None):
2849 """
2850 Loop waiting for osd.
2851 """
2852 self.log('waiting for osd.%d to be up' % osd)
2853 start = time.time()
2854 while not self.osd_is_up(osd):
2855 if timeout is not None:
2856 assert time.time() - start < timeout, \
2857 'osd.%d failed to come up before timeout expired' % osd
2858 time.sleep(3)
2859 self.log('osd.%d is up' % osd)
2860
2861 def is_active(self):
2862 """
2863 Wrapper to check if all pgs are active
2864 """
2865 return self.get_num_active() == self.get_num_pgs()
2866
2867 def all_active_or_peered(self):
2868 """
2869 Wrapper to check if all PGs are active or peered
2870 """
2871 pgs = self.get_pg_stats()
2872 return self._get_num_active(pgs) + self._get_num_peered(pgs) == len(pgs)
2873
2874 def wait_till_active(self, timeout=None):
2875 """
2876 Wait until all pgs are active.
2877 """
2878 self.log("waiting till active")
2879 start = time.time()
2880 while not self.is_active():
2881 if timeout is not None:
2882 if time.time() - start >= timeout:
2883 self.log('dumping pgs not active')
2884 self.dump_pgs_not_active()
2885 assert time.time() - start < timeout, \
2886 'wait_till_active: failed before timeout expired'
2887 time.sleep(3)
2888 self.log("active!")
2889
2890 def wait_till_pg_convergence(self, timeout=None):
2891 start = time.time()
2892 old_stats = None
2893 active_osds = [osd['osd'] for osd in self.get_osd_dump()
2894 if osd['in'] and osd['up']]
2895 while True:
2896 # strictly speaking, there is no need to wait for the mon. but due to
2897 # the "ms inject socket failures" setting, the osdmap can be delayed,
2898 # so the mgr is likely to ignore pg-stat messages for pgs of newly
2899 # created pools that it does not yet know about. to make sure the mgr
2900 # is updated with the latest pg-stats, waiting for the mon/mgr is
2901 # necessary.
2902 self.flush_pg_stats(active_osds)
2903 new_stats = dict((stat['pgid'], stat['state'])
2904 for stat in self.get_pg_stats())
2905 if old_stats == new_stats:
2906 return old_stats
2907 if timeout is not None:
2908 assert time.time() - start < timeout, \
2909 'failed to reach convergence before %d secs' % timeout
2910 old_stats = new_stats
2911 # longer than mgr_stats_period
2912 time.sleep(5 + 1)
2913
2914 def mark_out_osd(self, osd):
2915 """
2916 Wrapper to mark osd out.
2917 """
2918 self.raw_cluster_cmd('osd', 'out', str(osd))
2919
2920 def kill_osd(self, osd):
2921 """
2922 Kill osds by either power cycling (if indicated by the config)
2923 or by stopping.
2924 """
2925 if self.config.get('powercycle'):
2926 remote = self.find_remote('osd', osd)
2927 self.log('kill_osd on osd.{o} '
2928 'doing powercycle of {s}'.format(o=osd, s=remote.name))
2929 self._assert_ipmi(remote)
2930 remote.console.power_off()
2931 elif self.config.get('bdev_inject_crash') and self.config.get('bdev_inject_crash_probability'):
2932 if random.uniform(0, 1) < self.config.get('bdev_inject_crash_probability', .5):
2933 self.inject_args(
2934 'osd', osd,
2935 'bdev-inject-crash', self.config.get('bdev_inject_crash'))
2936 try:
2937 self.ctx.daemons.get_daemon('osd', osd, self.cluster).wait()
2938 except Exception:
2939 pass
2940 else:
2941 raise RuntimeError('osd.%s did not fail' % osd)
2942 else:
2943 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2944 else:
2945 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2946
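# kill_osd() above supports three failure modes driven by the task config:
# powercycling the host over IPMI ('powercycle'), injecting a bluestore
# crash via bdev-inject-crash and expecting the daemon to die, or simply
# stopping the daemon as the default.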
2947 @staticmethod
2948 def _assert_ipmi(remote):
2949 assert remote.console.has_ipmi_credentials, (
2950 "powercycling requested but RemoteConsole is not "
2951 "initialized. Check ipmi config.")
2952
2953 def blackhole_kill_osd(self, osd):
2954 """
2955 Stop osd if nothing else works.
2956 """
2957 self.inject_args('osd', osd,
2958 'objectstore-blackhole', True)
2959 time.sleep(2)
2960 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2961
2962 def revive_osd(self, osd, timeout=360, skip_admin_check=False):
2963 """
2964 Revive osds by either power cycling (if indicated by the config)
2965 or by restarting.
2966 """
2967 if self.config.get('powercycle'):
2968 remote = self.find_remote('osd', osd)
2969 self.log('revive_osd on osd.{o} doing powercycle of {s}'.
2970 format(o=osd, s=remote.name))
2971 self._assert_ipmi(remote)
2972 remote.console.power_on()
2973 if not remote.console.check_status(300):
2974 raise Exception('Failed to revive osd.{o} via ipmi'.
2975 format(o=osd))
2976 teuthology.reconnect(self.ctx, 60, [remote])
2977 mount_osd_data(self.ctx, remote, self.cluster, str(osd))
2978 self.make_admin_daemon_dir(remote)
2979 self.ctx.daemons.get_daemon('osd', osd, self.cluster).reset()
2980 self.ctx.daemons.get_daemon('osd', osd, self.cluster).restart()
2981
2982 if not skip_admin_check:
2983 # wait for dump_ops_in_flight; this command doesn't appear
2984 # until after the signal handler is installed and it is safe
2985 # to stop the osd again without making valgrind leak checks
2986 # unhappy. see #5924.
2987 self.wait_run_admin_socket('osd', osd,
2988 args=['dump_ops_in_flight'],
2989 timeout=timeout, stdout=DEVNULL)
2990
2991 def mark_down_osd(self, osd):
2992 """
2993 Cluster command wrapper
2994 """
2995 self.raw_cluster_cmd('osd', 'down', str(osd))
2996
2997 def mark_in_osd(self, osd):
2998 """
2999 Cluster command wrapper
3000 """
3001 self.raw_cluster_cmd('osd', 'in', str(osd))
3002
3003 def signal_osd(self, osd, sig, silent=False):
3004 """
3005 Wrapper to local get_daemon call which sends the given
3006 signal to the given osd.
3007 """
3008 self.ctx.daemons.get_daemon('osd', osd,
3009 self.cluster).signal(sig, silent=silent)
3010
3011 ## monitors
3012 def signal_mon(self, mon, sig, silent=False):
3013 """
3014 Wrapper to local get_daemon call
3015 """
3016 self.ctx.daemons.get_daemon('mon', mon,
3017 self.cluster).signal(sig, silent=silent)
3018
3019 def kill_mon(self, mon):
3020 """
3021 Kill the monitor by either power cycling (if the config says so),
3022 or by doing a stop.
3023 """
3024 if self.config.get('powercycle'):
3025 remote = self.find_remote('mon', mon)
3026 self.log('kill_mon on mon.{m} doing powercycle of {s}'.
3027 format(m=mon, s=remote.name))
3028 self._assert_ipmi(remote)
3029 remote.console.power_off()
3030 else:
3031 self.ctx.daemons.get_daemon('mon', mon, self.cluster).stop()
3032
3033 def revive_mon(self, mon):
3034 """
3035 Restart by either power cycling (if the config says so),
3036 or by doing a normal restart.
3037 """
3038 if self.config.get('powercycle'):
3039 remote = self.find_remote('mon', mon)
3040 self.log('revive_mon on mon.{m} doing powercycle of {s}'.
3041 format(m=mon, s=remote.name))
3042 self._assert_ipmi(remote)
3043 remote.console.power_on()
3044 self.make_admin_daemon_dir(remote)
3045 self.ctx.daemons.get_daemon('mon', mon, self.cluster).restart()
3046
3047 def revive_mgr(self, mgr):
3048 """
3049 Restart by either power cycling (if the config says so),
3050 or by doing a normal restart.
3051 """
3052 if self.config.get('powercycle'):
3053 remote = self.find_remote('mgr', mgr)
3054 self.log('revive_mgr on mgr.{m} doing powercycle of {s}'.
3055 format(m=mgr, s=remote.name))
3056 self._assert_ipmi(remote)
3057 remote.console.power_on()
3058 self.make_admin_daemon_dir(remote)
3059 self.ctx.daemons.get_daemon('mgr', mgr, self.cluster).restart()
3060
3061 def get_mon_status(self, mon):
3062 """
3063 Extract all the monitor status information from the cluster
3064 """
3065 out = self.raw_cluster_cmd('tell', 'mon.%s' % mon, 'mon_status')
3066 return json.loads(out)
3067
3068 def get_mon_quorum(self):
3069 """
3070 Extract monitor quorum information from the cluster
3071 """
3072 out = self.raw_cluster_cmd('quorum_status')
3073 j = json.loads(out)
3074 return j['quorum']
3075
3076 def wait_for_mon_quorum_size(self, size, timeout=300):
3077 """
3078 Loop until quorum size is reached.
3079 """
3080 self.log('waiting for quorum size %d' % size)
3081 start = time.time()
3082 while not len(self.get_mon_quorum()) == size:
3083 if timeout is not None:
3084 assert time.time() - start < timeout, \
3085 ('failed to reach quorum size %d '
3086 'before timeout expired' % size)
3087 time.sleep(3)
3088 self.log("quorum is size %d" % size)
3089
3090 def get_mon_health(self, debug=False):
3091 """
3092 Extract all the monitor health information.
3093 """
3094 out = self.raw_cluster_cmd('health', '--format=json')
3095 if debug:
3096 self.log('health:\n{h}'.format(h=out))
3097 return json.loads(out)
3098
3099 def wait_until_healthy(self, timeout=None):
3100 self.log("wait_until_healthy")
3101 start = time.time()
3102 while self.get_mon_health()['status'] != 'HEALTH_OK':
3103 if timeout is not None:
3104 assert time.time() - start < timeout, \
3105 'timeout expired in wait_until_healthy'
3106 time.sleep(3)
3107 self.log("wait_until_healthy done")
3108
3109 def get_filepath(self):
3110 """
3111 Return path to osd data with {id} needing to be replaced
3112 """
3113 return '/var/lib/ceph/osd/' + self.cluster + '-{id}'
3114
3115 def make_admin_daemon_dir(self, remote):
3116 """
3117 Create /var/run/ceph directory on remote site.
3118
3119 :param ctx: Context
3120 :param remote: Remote site
3121 """
3122 remote.run(args=['sudo',
3123 'install', '-d', '-m0777', '--', '/var/run/ceph', ], )
3124
3125 def get_service_task_status(self, service, status_key):
3126 """
3127 Return daemon task status for a given ceph service.
3128
3129 :param service: ceph service (mds, osd, etc...)
3130 :param status_key: matching task status key
3131 """
3132 task_status = {}
3133 status = self.raw_cluster_status()
3134 try:
3135 for k,v in status['servicemap']['services'][service]['daemons'].items():
3136 ts = dict(v).get('task_status', None)
3137 if ts:
3138 task_status[k] = ts[status_key]
3139 except KeyError: # catches missing service and status key
3140 return {}
3141 self.log(task_status)
3142 return task_status
3143
3144 def utility_task(name):
3145 """
3146 Generate ceph_manager subtask corresponding to ceph_manager
3147 method name
3148 """
3149 def task(ctx, config):
3150 if config is None:
3151 config = {}
3152 args = config.get('args', [])
3153 kwargs = config.get('kwargs', {})
3154 cluster = config.get('cluster', 'ceph')
3155 fn = getattr(ctx.managers[cluster], name)
3156 fn(*args, **kwargs)
3157 return task
3158
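# The module-level tasks defined below expose selected CephManager methods
# to teuthology jobs. A sketch of how they might be referenced from a job
# YAML (assuming the job loader resolves this module as `ceph_manager`;
# pool names and arguments are illustrative):
#
#   tasks:
#   - ceph_manager.wait_for_clean:
#   - ceph_manager.do_pg_scrub:
#       args: ['mypool', 0, 'scrub']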
3159 revive_osd = utility_task("revive_osd")
3160 revive_mon = utility_task("revive_mon")
3161 kill_osd = utility_task("kill_osd")
3162 kill_mon = utility_task("kill_mon")
3163 create_pool = utility_task("create_pool")
3164 remove_pool = utility_task("remove_pool")
3165 wait_for_clean = utility_task("wait_for_clean")
3166 flush_all_pg_stats = utility_task("flush_all_pg_stats")
3167 set_pool_property = utility_task("set_pool_property")
3168 do_pg_scrub = utility_task("do_pg_scrub")
3169 wait_for_pool = utility_task("wait_for_pool")
3170 wait_for_pools = utility_task("wait_for_pools")