1 """
2 ceph manager -- Thrasher and CephManager objects
3 """
4 from functools import wraps
5 import contextlib
6 import random
7 import signal
8 import time
9 import gevent
10 import base64
11 import json
12 import logging
13 import threading
14 import traceback
15 import os
16 import shlex
17
18 from io import BytesIO, StringIO
19 from subprocess import DEVNULL
20 from teuthology import misc as teuthology
21 from tasks.scrub import Scrubber
22 from tasks.util.rados import cmd_erasure_code_profile
23 from tasks.util import get_remote
24 from teuthology.contextutil import safe_while
25 from teuthology.orchestra.remote import Remote
26 from teuthology.orchestra import run
27 from teuthology.exceptions import CommandFailedError
28 from tasks.thrasher import Thrasher
29
30
31 DEFAULT_CONF_PATH = '/etc/ceph/ceph.conf'
32
33 log = logging.getLogger(__name__)
34
35 # this is for cephadm clusters
36 def shell(ctx, cluster_name, remote, args, name=None, **kwargs):
37 extra_args = []
38 if name:
39 extra_args = ['-n', name]
40 return remote.run(
41 args=[
42 'sudo',
43 ctx.cephadm,
44 '--image', ctx.ceph[cluster_name].image,
45 'shell',
46 ] + extra_args + [
47 '--fsid', ctx.ceph[cluster_name].fsid,
48 '--',
49 ] + args,
50 **kwargs
51 )
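# Roughly, the helper above assembles a command of the following shape
# (values are illustrative placeholders, not taken from a real run):
#   sudo <cephadm> --image <image> shell -n osd.3 --fsid <fsid> -- ceph osd dump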
52
53 def write_conf(ctx, conf_path=DEFAULT_CONF_PATH, cluster='ceph'):
54 conf_fp = BytesIO()
55 ctx.ceph[cluster].conf.write(conf_fp)
56 conf_fp.seek(0)
57 writes = ctx.cluster.run(
58 args=[
59 'sudo', 'mkdir', '-p', '/etc/ceph', run.Raw('&&'),
60 'sudo', 'chmod', '0755', '/etc/ceph', run.Raw('&&'),
61 'sudo', 'tee', conf_path, run.Raw('&&'),
62 'sudo', 'chmod', '0644', conf_path,
63 run.Raw('>'), '/dev/null',
64
65 ],
66 stdin=run.PIPE,
67 wait=False)
68 teuthology.feed_many_stdins_and_close(conf_fp, writes)
69 run.wait(writes)
70
71 def get_valgrind_args(testdir, name, preamble, v, exit_on_first_error=True, cd=True):
72 """
73 Build a command line for running valgrind.
74
75 testdir - test results directory
76 name - name of daemon (for naming the log file)
77 preamble - stuff we should run before valgrind
78 v - valgrind arguments
79 """
80 if v is None:
81 return preamble
82 if not isinstance(v, list):
83 v = [v]
84
85 # https://tracker.ceph.com/issues/44362
86 preamble.extend([
87 'env', 'OPENSSL_ia32cap=~0x1000000000000000',
88 ])
89
90 val_path = '/var/log/ceph/valgrind'
91 if '--tool=memcheck' in v or '--tool=helgrind' in v:
92 extra_args = [
93 'valgrind',
94 '--trace-children=no',
95 '--child-silent-after-fork=yes',
96 '--soname-synonyms=somalloc=*tcmalloc*',
97 '--num-callers=50',
98 '--suppressions={tdir}/valgrind.supp'.format(tdir=testdir),
99 '--xml=yes',
100 '--xml-file={vdir}/{n}.log'.format(vdir=val_path, n=name),
101 '--time-stamp=yes',
102 '--vgdb=yes',
103 ]
104 else:
105 extra_args = [
106 'valgrind',
107 '--trace-children=no',
108 '--child-silent-after-fork=yes',
109 '--soname-synonyms=somalloc=*tcmalloc*',
110 '--suppressions={tdir}/valgrind.supp'.format(tdir=testdir),
111 '--log-file={vdir}/{n}.log'.format(vdir=val_path, n=name),
112 '--time-stamp=yes',
113 '--vgdb=yes',
114 ]
115 if exit_on_first_error:
116 extra_args.extend([
117 # at least Valgrind 3.14 is required
118 '--exit-on-first-error=yes',
119 '--error-exitcode=42',
120 ])
121 args = []
122 if cd:
123 args += ['cd', testdir, run.Raw('&&')]
124 args += preamble + extra_args + v
125 log.debug('running %s under valgrind with args %s', name, args)
126 return args
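# Sketch of the returned list, flattened (hypothetical daemon 'osd.0', memcheck case):
#   cd <testdir> && <preamble...> env OPENSSL_ia32cap=~0x1000000000000000 \
#     valgrind --trace-children=no --child-silent-after-fork=yes ... \
#     --xml-file=/var/log/ceph/valgrind/osd.0.log --time-stamp=yes --vgdb=yes --tool=memcheck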
127
128
129 def mount_osd_data(ctx, remote, cluster, osd):
130 """
131 Mount a remote OSD
132
133 :param ctx: Context
134 :param remote: Remote site
135 :param cluster: name of ceph cluster
136 :param osd: Osd name
137 """
138 log.debug('Mounting data for osd.{o} on {r}'.format(o=osd, r=remote))
139 role = "{0}.osd.{1}".format(cluster, osd)
140 alt_role = role if cluster != 'ceph' else "osd.{0}".format(osd)
141 if remote in ctx.disk_config.remote_to_roles_to_dev:
142 if alt_role in ctx.disk_config.remote_to_roles_to_dev[remote]:
143 role = alt_role
144 if role not in ctx.disk_config.remote_to_roles_to_dev[remote]:
145 return
146 dev = ctx.disk_config.remote_to_roles_to_dev[remote][role]
147 mount_options = ctx.disk_config.\
148 remote_to_roles_to_dev_mount_options[remote][role]
149 fstype = ctx.disk_config.remote_to_roles_to_dev_fstype[remote][role]
150 mnt = os.path.join('/var/lib/ceph/osd', '{0}-{1}'.format(cluster, osd))
151
152 log.info('Mounting osd.{o}: dev: {d} on {n}, cluster: {c}, '
153 'mountpoint: {p}, type: {t}, options: {v}'.format(
154 o=osd, d=dev, n=remote.name, p=mnt, t=fstype,
155 v=mount_options, c=cluster))
156
157 remote.run(
158 args=[
159 'sudo',
160 'mount',
161 '-t', fstype,
162 '-o', ','.join(mount_options),
163 dev,
164 mnt,
165 ]
166 )
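# The resulting mount is equivalent to something like (illustrative only):
#   sudo mount -t xfs -o noatime /dev/vdb /var/lib/ceph/osd/ceph-3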
167
168
169 def log_exc(func):
170 @wraps(func)
171 def wrapper(self):
172 try:
173 return func(self)
174 except:
175 self.log(traceback.format_exc())
176 raise
177 return wrapper
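# Usage note: log_exc is applied below to the long-running greenlet bodies
# (do_sighup, do_optrack_toggle, do_dump_ops, do_noscrub_toggle, _do_thrash)
# so that any exception is logged via self.log() before being re-raised.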
178
179
180 class PoolType:
181 REPLICATED = 1
182 ERASURE_CODED = 3
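# Note: these match the numeric pool 'type' codes returned by the osd dump
# (1 = replicated, 3 = erasure coded), as used in test_pool_min_size() below.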
183
184
185 class OSDThrasher(Thrasher):
186 """
187 Object used to thrash Ceph
188 """
189 def __init__(self, manager, config, name, logger):
190 super(OSDThrasher, self).__init__()
191
192 self.ceph_manager = manager
193 self.cluster = manager.cluster
194 self.ceph_manager.wait_for_clean()
195 osd_status = self.ceph_manager.get_osd_status()
196 self.in_osds = osd_status['in']
197 self.live_osds = osd_status['live']
198 self.out_osds = osd_status['out']
199 self.dead_osds = osd_status['dead']
200 self.stopping = False
201 self.logger = logger
202 self.config = config
203 self.name = name
204 self.revive_timeout = self.config.get("revive_timeout", 360)
205 self.pools_to_fix_pgp_num = set()
206 if self.config.get('powercycle'):
207 self.revive_timeout += 120
208 self.clean_wait = self.config.get('clean_wait', 0)
209 self.minin = self.config.get("min_in", 4)
210 self.chance_move_pg = self.config.get('chance_move_pg', 1.0)
211 self.sighup_delay = self.config.get('sighup_delay')
212 self.optrack_toggle_delay = self.config.get('optrack_toggle_delay')
213 self.dump_ops_enable = self.config.get('dump_ops_enable')
214 self.noscrub_toggle_delay = self.config.get('noscrub_toggle_delay')
215 self.chance_thrash_cluster_full = self.config.get('chance_thrash_cluster_full', .05)
216 self.chance_thrash_pg_upmap = self.config.get('chance_thrash_pg_upmap', 1.0)
217 self.chance_thrash_pg_upmap_items = self.config.get('chance_thrash_pg_upmap_items', 1.0)
218 self.random_eio = self.config.get('random_eio')
219 self.chance_force_recovery = self.config.get('chance_force_recovery', 0.3)
220
221 num_osds = self.in_osds + self.out_osds
222 self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * len(num_osds)
223 self.min_pgs = self.config.get("min_pgs_per_pool_osd", 1) * len(num_osds)
224 if self.config is None:
225 self.config = dict()
226 # prevent monitor from auto-marking things out while thrasher runs
227 # try both old and new tell syntax, in case we are testing old code
228 self.saved_options = []
229 # assuming that the default settings do not vary from one daemon to
230 # another
231 first_mon = teuthology.get_first_mon(manager.ctx, self.config).split('.')
232 opts = [('mon', 'mon_osd_down_out_interval', 0)]
233 #why do we disable marking an OSD out automatically? :/
234 for service, opt, new_value in opts:
235 old_value = manager.get_config(first_mon[0],
236 first_mon[1],
237 opt)
238 self.saved_options.append((service, opt, old_value))
239 manager.inject_args(service, '*', opt, new_value)
240 # initialize ceph_objectstore_tool property - must be done before
241 # do_thrash is spawned - http://tracker.ceph.com/issues/18799
242 if (self.config.get('powercycle') or
243 not self.cmd_exists_on_osds("ceph-objectstore-tool") or
244 self.config.get('disable_objectstore_tool_tests', False)):
245 self.ceph_objectstore_tool = False
246 if self.config.get('powercycle'):
247 self.log("Unable to test ceph-objectstore-tool, "
248 "powercycle testing")
249 else:
250 self.log("Unable to test ceph-objectstore-tool, "
251 "not available on all OSD nodes")
252 else:
253 self.ceph_objectstore_tool = \
254 self.config.get('ceph_objectstore_tool', True)
255 # spawn do_thrash
256 self.thread = gevent.spawn(self.do_thrash)
257 if self.sighup_delay:
258 self.sighup_thread = gevent.spawn(self.do_sighup)
259 if self.optrack_toggle_delay:
260 self.optrack_toggle_thread = gevent.spawn(self.do_optrack_toggle)
261 if self.dump_ops_enable == "true":
262 self.dump_ops_thread = gevent.spawn(self.do_dump_ops)
263 if self.noscrub_toggle_delay:
264 self.noscrub_toggle_thread = gevent.spawn(self.do_noscrub_toggle)
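# For reference, a minimal (hypothetical) thrasher config; every key below is
# consumed via self.config.get() in __init__ above, with these defaults:
#   {'revive_timeout': 360, 'clean_wait': 0, 'min_in': 4,
#    'chance_move_pg': 1.0, 'chance_thrash_cluster_full': 0.05,
#    'chance_thrash_pg_upmap': 1.0, 'chance_force_recovery': 0.3}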
265
266 def log(self, msg, *args, **kwargs):
267 self.logger.info(msg, *args, **kwargs)
268
269 def cmd_exists_on_osds(self, cmd):
270 if self.ceph_manager.cephadm:
271 return True
272 allremotes = self.ceph_manager.ctx.cluster.only(\
273 teuthology.is_type('osd', self.cluster)).remotes.keys()
274 allremotes = list(set(allremotes))
275 for remote in allremotes:
276 proc = remote.run(args=['type', cmd], wait=True,
277 check_status=False, stdout=BytesIO(),
278 stderr=BytesIO())
279 if proc.exitstatus != 0:
280 return False
281 return True
282
283 def run_ceph_objectstore_tool(self, remote, osd, cmd):
284 if self.ceph_manager.cephadm:
285 return shell(
286 self.ceph_manager.ctx, self.ceph_manager.cluster, remote,
287 args=['ceph-objectstore-tool'] + cmd,
288 name=osd,
289 wait=True, check_status=False,
290 stdout=StringIO(),
291 stderr=StringIO())
292 else:
293 return remote.run(
294 args=['sudo', 'adjust-ulimits', 'ceph-objectstore-tool'] + cmd,
295 wait=True, check_status=False,
296 stdout=StringIO(),
297 stderr=StringIO())
298
299 def run_ceph_bluestore_tool(self, remote, osd, cmd):
300 if self.ceph_manager.cephadm:
301 return shell(
302 self.ceph_manager.ctx, self.ceph_manager.cluster, remote,
303 args=['ceph-bluestore-tool', '--err-to-stderr'] + cmd,
304 name=osd,
305 wait=True, check_status=False,
306 stdout=StringIO(),
307 stderr=StringIO())
308 else:
309 return remote.run(
310 args=['sudo', 'ceph-bluestore-tool', '--err-to-stderr'] + cmd,
311 wait=True, check_status=False,
312 stdout=StringIO(),
313 stderr=StringIO())
314
315 def kill_osd(self, osd=None, mark_down=False, mark_out=False):
316 """
317 :param osd: Osd to be killed.
318 :param mark_down: Mark down if true.
319 :param mark_out: Mark out if true.
320 """
321 if osd is None:
322 osd = random.choice(self.live_osds)
323 self.log("Killing osd %s, live_osds are %s" % (str(osd),
324 str(self.live_osds)))
325 self.live_osds.remove(osd)
326 self.dead_osds.append(osd)
327 self.ceph_manager.kill_osd(osd)
328 if mark_down:
329 self.ceph_manager.mark_down_osd(osd)
330 if mark_out and osd in self.in_osds:
331 self.out_osd(osd)
332 if self.ceph_objectstore_tool:
333 self.log("Testing ceph-objectstore-tool on down osd.%s" % osd)
334 remote = self.ceph_manager.find_remote('osd', osd)
335 FSPATH = self.ceph_manager.get_filepath()
336 JPATH = os.path.join(FSPATH, "journal")
337 exp_osd = imp_osd = osd
338 self.log('remote for osd %s is %s' % (osd, remote))
339 exp_remote = imp_remote = remote
340 # If an older osd is available we'll move a pg from there
341 if (len(self.dead_osds) > 1 and
342 random.random() < self.chance_move_pg):
343 exp_osd = random.choice(self.dead_osds[:-1])
344 exp_remote = self.ceph_manager.find_remote('osd', exp_osd)
345 self.log('remote for exp osd %s is %s' % (exp_osd, exp_remote))
346 prefix = [
347 '--no-mon-config',
348 '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
349 ]
350
351 if not self.ceph_manager.cephadm:
352 # ceph-objectstore-tool might be temporarily absent during an
353 # upgrade - see http://tracker.ceph.com/issues/18014
354 with safe_while(sleep=15, tries=40, action="type ceph-objectstore-tool") as proceed:
355 while proceed():
356 proc = exp_remote.run(args=['type', 'ceph-objectstore-tool'],
357 wait=True, check_status=False, stdout=BytesIO(),
358 stderr=BytesIO())
359 if proc.exitstatus == 0:
360 break
361 log.debug("ceph-objectstore-tool binary not present, trying again")
362
363 # ceph-objectstore-tool might bogusly fail with "OSD has the store locked"
364 # see http://tracker.ceph.com/issues/19556
365 with safe_while(sleep=15, tries=40, action="ceph-objectstore-tool --op list-pgs") as proceed:
366 while proceed():
367 proc = self.run_ceph_objectstore_tool(
368 exp_remote, 'osd.%s' % exp_osd,
369 prefix + [
370 '--data-path', FSPATH.format(id=exp_osd),
371 '--journal-path', JPATH.format(id=exp_osd),
372 '--op', 'list-pgs',
373 ])
374 if proc.exitstatus == 0:
375 break
376 elif (proc.exitstatus == 1 and
377 proc.stderr.getvalue() == "OSD has the store locked"):
378 continue
379 else:
380 raise Exception("ceph-objectstore-tool: "
381 "exp list-pgs failure with status {ret}".
382 format(ret=proc.exitstatus))
383
384 pgs = proc.stdout.getvalue().split('\n')[:-1]
385 if len(pgs) == 0:
386 self.log("No PGs found for osd.{osd}".format(osd=exp_osd))
387 return
388 pg = random.choice(pgs)
389 #exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
390 #exp_path = os.path.join(exp_path, '{0}.data'.format(self.cluster))
391 exp_path = os.path.join('/var/log/ceph', # available inside 'shell' container
392 "exp.{pg}.{id}".format(
393 pg=pg,
394 id=exp_osd))
395 if self.ceph_manager.cephadm:
396 exp_host_path = os.path.join(
397 '/var/log/ceph',
398 self.ceph_manager.ctx.ceph[self.ceph_manager.cluster].fsid,
399 "exp.{pg}.{id}".format(
400 pg=pg,
401 id=exp_osd))
402 else:
403 exp_host_path = exp_path
404
405 # export
406 # Can't use new export-remove op since this is part of upgrade testing
407 proc = self.run_ceph_objectstore_tool(
408 exp_remote, 'osd.%s' % exp_osd,
409 prefix + [
410 '--data-path', FSPATH.format(id=exp_osd),
411 '--journal-path', JPATH.format(id=exp_osd),
412 '--op', 'export',
413 '--pgid', pg,
414 '--file', exp_path,
415 ])
416 if proc.exitstatus:
417 raise Exception("ceph-objectstore-tool: "
418 "export failure with status {ret}".
419 format(ret=proc.exitstatus))
420 # remove
421 proc = self.run_ceph_objectstore_tool(
422 exp_remote, 'osd.%s' % exp_osd,
423 prefix + [
424 '--data-path', FSPATH.format(id=exp_osd),
425 '--journal-path', JPATH.format(id=exp_osd),
426 '--force',
427 '--op', 'remove',
428 '--pgid', pg,
429 ])
430 if proc.exitstatus:
431 raise Exception("ceph-objectstore-tool: "
432 "remove failure with status {ret}".
433 format(ret=proc.exitstatus))
434 # If there are at least 2 dead osds we might move the pg
435 if exp_osd != imp_osd:
436 # If pg isn't already on this osd, then we will move it there
437 proc = self.run_ceph_objectstore_tool(
438 imp_remote,
439 'osd.%s' % imp_osd,
440 prefix + [
441 '--data-path', FSPATH.format(id=imp_osd),
442 '--journal-path', JPATH.format(id=imp_osd),
443 '--op', 'list-pgs',
444 ])
445 if proc.exitstatus:
446 raise Exception("ceph-objectstore-tool: "
447 "imp list-pgs failure with status {ret}".
448 format(ret=proc.exitstatus))
449 pgs = proc.stdout.getvalue().split('\n')[:-1]
450 if pg not in pgs:
451 self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
452 format(pg=pg, fosd=exp_osd, tosd=imp_osd))
453 if imp_remote != exp_remote:
454 # Copy export file to the other machine
455 self.log("Transfer export file from {srem} to {trem}".
456 format(srem=exp_remote, trem=imp_remote))
457 # just in case an upgrade makes /var/log/ceph unreadable by non-root
458 exp_remote.run(args=['sudo', 'chmod', '777',
459 '/var/log/ceph'])
460 imp_remote.run(args=['sudo', 'chmod', '777',
461 '/var/log/ceph'])
462 tmpexport = Remote.get_file(exp_remote, exp_host_path,
463 sudo=True)
464 if exp_host_path != exp_path:
465 # push to /var/log/ceph, then rename (we can't
466 # chmod 777 the /var/log/ceph/$fsid mountpoint)
467 Remote.put_file(imp_remote, tmpexport, exp_path)
468 imp_remote.run(args=[
469 'sudo', 'mv', exp_path, exp_host_path])
470 else:
471 Remote.put_file(imp_remote, tmpexport, exp_host_path)
472 os.remove(tmpexport)
473 else:
474 # Can't move the pg after all
475 imp_osd = exp_osd
476 imp_remote = exp_remote
477 # import
478 proc = self.run_ceph_objectstore_tool(
479 imp_remote, 'osd.%s' % imp_osd,
480 [
481 '--data-path', FSPATH.format(id=imp_osd),
482 '--journal-path', JPATH.format(id=imp_osd),
483 '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
484 '--op', 'import',
485 '--file', exp_path,
486 ])
487 if proc.exitstatus == 1:
488 bogosity = "The OSD you are using is older than the exported PG"
489 if bogosity in proc.stderr.getvalue():
490 self.log("OSD older than exported PG"
491 "...ignored")
492 elif proc.exitstatus == 10:
493 self.log("Pool went away before processing an import"
494 "...ignored")
495 elif proc.exitstatus == 11:
496 self.log("Attempt to import an incompatible export"
497 "...ignored")
498 elif proc.exitstatus == 12:
499 # this should be safe to ignore because we only ever move 1
500 # copy of the pg at a time, and merge is only initiated when
501 # all replicas are peered and happy. /me crosses fingers
502 self.log("PG merged on target"
503 "...ignored")
504 elif proc.exitstatus:
505 raise Exception("ceph-objectstore-tool: "
506 "import failure with status {ret}".
507 format(ret=proc.exitstatus))
508 cmd = "sudo rm -f {file}".format(file=exp_host_path)
509 exp_remote.run(args=cmd)
510 if imp_remote != exp_remote:
511 imp_remote.run(args=cmd)
512
513 # apply low split settings to each pool
514 if not self.ceph_manager.cephadm:
515 for pool in self.ceph_manager.list_pools():
516 cmd = ("CEPH_ARGS='--filestore-merge-threshold 1 "
517 "--filestore-split-multiple 1' sudo -E "
518 + 'ceph-objectstore-tool '
519 + ' '.join(prefix + [
520 '--data-path', FSPATH.format(id=imp_osd),
521 '--journal-path', JPATH.format(id=imp_osd),
522 ])
523 + " --op apply-layout-settings --pool " + pool).format(id=osd)
524 proc = imp_remote.run(args=cmd,
525 wait=True, check_status=False,
526 stderr=StringIO())
527 if 'Couldn\'t find pool' in proc.stderr.getvalue():
528 continue
529 if proc.exitstatus:
530 raise Exception("ceph-objectstore-tool apply-layout-settings"
531 " failed with {status}".format(status=proc.exitstatus))
532
533
534 def blackhole_kill_osd(self, osd=None):
535 """
536 If all else fails, kill the osd.
537 :param osd: Osd to be killed.
538 """
539 if osd is None:
540 osd = random.choice(self.live_osds)
541 self.log("Blackholing and then killing osd %s, live_osds are %s" %
542 (str(osd), str(self.live_osds)))
543 self.live_osds.remove(osd)
544 self.dead_osds.append(osd)
545 self.ceph_manager.blackhole_kill_osd(osd)
546
547 def revive_osd(self, osd=None, skip_admin_check=False):
548 """
549 Revive the osd.
550 :param osd: Osd to be revived.
551 """
552 if osd is None:
553 osd = random.choice(self.dead_osds)
554 self.log("Reviving osd %s" % (str(osd),))
555 self.ceph_manager.revive_osd(
556 osd,
557 self.revive_timeout,
558 skip_admin_check=skip_admin_check)
559 self.dead_osds.remove(osd)
560 self.live_osds.append(osd)
561 if self.random_eio > 0 and osd == self.rerrosd:
562 self.ceph_manager.set_config(self.rerrosd,
563 filestore_debug_random_read_err = self.random_eio)
564 self.ceph_manager.set_config(self.rerrosd,
565 bluestore_debug_random_read_err = self.random_eio)
566
567
568 def out_osd(self, osd=None):
569 """
570 Mark the osd out
571 :param osd: Osd to be marked.
572 """
573 if osd is None:
574 osd = random.choice(self.in_osds)
575 self.log("Removing osd %s, in_osds are: %s" %
576 (str(osd), str(self.in_osds)))
577 self.ceph_manager.mark_out_osd(osd)
578 self.in_osds.remove(osd)
579 self.out_osds.append(osd)
580
581 def in_osd(self, osd=None):
582 """
583 Mark the osd in
584 :param osd: Osd to be marked.
585 """
586 if osd is None:
587 osd = random.choice(self.out_osds)
588 if osd in self.dead_osds:
589 return self.revive_osd(osd)
590 self.log("Adding osd %s" % (str(osd),))
591 self.out_osds.remove(osd)
592 self.in_osds.append(osd)
593 self.ceph_manager.mark_in_osd(osd)
594 self.log("Added osd %s" % (str(osd),))
595
596 def reweight_osd_or_by_util(self, osd=None):
597 """
598 Reweight an osd that is in
599 :param osd: Osd to be reweighted.
600 """
601 if osd is not None or random.choice([True, False]):
602 if osd is None:
603 osd = random.choice(self.in_osds)
604 val = random.uniform(.1, 1.0)
605 self.log("Reweighting osd %s to %s" % (str(osd), str(val)))
606 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
607 str(osd), str(val))
608 else:
609 # do it several times, the option space is large
610 for i in range(5):
611 options = {
612 'max_change': random.choice(['0.05', '1.0', '3.0']),
613 'overage': random.choice(['110', '1000']),
614 'type': random.choice([
615 'reweight-by-utilization',
616 'test-reweight-by-utilization']),
617 }
618 self.log("Reweighting by: %s"%(str(options),))
619 self.ceph_manager.raw_cluster_cmd(
620 'osd',
621 options['type'],
622 options['overage'],
623 options['max_change'])
624
625 def primary_affinity(self, osd=None):
626 if osd is None:
627 osd = random.choice(self.in_osds)
628 if random.random() >= .5:
629 pa = random.random()
630 elif random.random() >= .5:
631 pa = 1
632 else:
633 pa = 0
634 self.log('Setting osd %s primary_affinity to %f' % (str(osd), pa))
635 self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
636 str(osd), str(pa))
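# Note on primary_affinity() above: the nested random checks give roughly a
# 50% chance of a random affinity in [0, 1), 25% of exactly 1 and 25% of 0.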
637
638 def thrash_cluster_full(self):
639 """
640 Set and unset cluster full condition
641 """
642 self.log('Setting full ratio to .001')
643 self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.001')
644 time.sleep(1)
645 self.log('Setting full ratio back to .95')
646 self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.95')
647
648 def thrash_pg_upmap(self):
649 """
650 Install or remove random pg_upmap entries in OSDMap
651 """
652 from random import shuffle
653 out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
654 j = json.loads(out)
655 self.log('j is %s' % j)
656 try:
657 if random.random() >= .3:
658 pgs = self.ceph_manager.get_pg_stats()
659 if not pgs:
660 return
661 pg = random.choice(pgs)
662 pgid = str(pg['pgid'])
663 poolid = int(pgid.split('.')[0])
664 sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
665 if len(sizes) == 0:
666 return
667 n = sizes[0]
668 osds = self.in_osds + self.out_osds
669 shuffle(osds)
670 osds = osds[0:n]
671 self.log('Setting %s to %s' % (pgid, osds))
672 cmd = ['osd', 'pg-upmap', pgid] + [str(x) for x in osds]
673 self.log('cmd %s' % cmd)
674 self.ceph_manager.raw_cluster_cmd(*cmd)
675 else:
676 m = j['pg_upmap']
677 if len(m) > 0:
678 shuffle(m)
679 pg = m[0]['pgid']
680 self.log('Clearing pg_upmap on %s' % pg)
681 self.ceph_manager.raw_cluster_cmd(
682 'osd',
683 'rm-pg-upmap',
684 pg)
685 else:
686 self.log('No pg_upmap entries; doing nothing')
687 except CommandFailedError:
688 self.log('Failed to rm-pg-upmap, ignoring')
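# Note on thrash_pg_upmap() above: the install path issues, e.g. (hypothetical
# ids), 'ceph osd pg-upmap 2.7 0 3 5' to remap pg 2.7 onto osds [0, 3, 5], and
# the cleanup path issues 'ceph osd rm-pg-upmap <pgid>'.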
689
690 def thrash_pg_upmap_items(self):
691 """
692 Install or remove random pg_upmap_items entries in OSDMap
693 """
694 from random import shuffle
695 out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
696 j = json.loads(out)
697 self.log('j is %s' % j)
698 try:
699 if random.random() >= .3:
700 pgs = self.ceph_manager.get_pg_stats()
701 if not pgs:
702 return
703 pg = random.choice(pgs)
704 pgid = str(pg['pgid'])
705 poolid = int(pgid.split('.')[0])
706 sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
707 if len(sizes) == 0:
708 return
709 n = sizes[0]
710 osds = self.in_osds + self.out_osds
711 shuffle(osds)
712 osds = osds[0:n*2]
713 self.log('Setting %s to %s' % (pgid, osds))
714 cmd = ['osd', 'pg-upmap-items', pgid] + [str(x) for x in osds]
715 self.log('cmd %s' % cmd)
716 self.ceph_manager.raw_cluster_cmd(*cmd)
717 else:
718 m = j['pg_upmap_items']
719 if len(m) > 0:
720 shuffle(m)
721 pg = m[0]['pgid']
722 self.log('Clearing pg_upmap_items on %s' % pg)
723 self.ceph_manager.raw_cluster_cmd(
724 'osd',
725 'rm-pg-upmap-items',
726 pg)
727 else:
728 self.log('No pg_upmap_items entries; doing nothing')
729 except CommandFailedError:
730 self.log('Failed to rm-pg-upmap-items, ignoring')
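# Note on thrash_pg_upmap_items() above: 'osd pg-upmap-items' takes
# (from_osd, to_osd) pairs, which is why 2*n osd ids are passed for a pool of
# size n, e.g. (hypothetical): ceph osd pg-upmap-items 2.7 0 3 1 5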
731
732 def force_recovery(self):
733 """
734 Force recovery on some PGs
735 """
736 backfill = random.random() >= 0.5
737 j = self.ceph_manager.get_pgids_to_force(backfill)
738 if j:
739 try:
740 if backfill:
741 self.ceph_manager.raw_cluster_cmd('pg', 'force-backfill', *j)
742 else:
743 self.ceph_manager.raw_cluster_cmd('pg', 'force-recovery', *j)
744 except CommandFailedError:
745 self.log('Failed to force backfill|recovery, ignoring')
746
747
748 def cancel_force_recovery(self):
749 """
750 Cancel forced recovery on some PGs
751 """
752 backfill = random.random() >= 0.5
753 j = self.ceph_manager.get_pgids_to_cancel_force(backfill)
754 if j:
755 try:
756 if backfill:
757 self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-backfill', *j)
758 else:
759 self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-recovery', *j)
760 except CommandFailedError:
761 self.log('Failed to cancel force backfill|recovery, ignoring')
762
763 def force_cancel_recovery(self):
764 """
765 Force or cancel forcing recovery
766 """
767 if random.random() >= 0.4:
768 self.force_recovery()
769 else:
770 self.cancel_force_recovery()
771
772 def all_up(self):
773 """
774 Make sure all osds are up and not out.
775 """
776 while len(self.dead_osds) > 0:
777 self.log("reviving osd")
778 self.revive_osd()
779 while len(self.out_osds) > 0:
780 self.log("inning osd")
781 self.in_osd()
782
783 def all_up_in(self):
784 """
785 Make sure all osds are up and fully in.
786 """
787 self.all_up()
788 for osd in self.live_osds:
789 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
790 str(osd), str(1))
791 self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
792 str(osd), str(1))
793
794 def do_join(self):
795 """
796 Stop the thrasher and wait for its greenlets to finish
797 """
798 self.stopping = True
799 self.thread.get()
800 if self.sighup_delay:
801 self.log("joining the do_sighup greenlet")
802 self.sighup_thread.get()
803 if self.optrack_toggle_delay:
804 self.log("joining the do_optrack_toggle greenlet")
805 self.optrack_toggle_thread.join()
806 if self.dump_ops_enable == "true":
807 self.log("joining the do_dump_ops greenlet")
808 self.dump_ops_thread.join()
809 if self.noscrub_toggle_delay:
810 self.log("joining the do_noscrub_toggle greenlet")
811 self.noscrub_toggle_thread.join()
812
813 def grow_pool(self):
814 """
815 Increase the size of the pool
816 """
817 pool = self.ceph_manager.get_pool()
818 if pool is None:
819 return
820 self.log("Growing pool %s" % (pool,))
821 if self.ceph_manager.expand_pool(pool,
822 self.config.get('pool_grow_by', 10),
823 self.max_pgs):
824 self.pools_to_fix_pgp_num.add(pool)
825
826 def shrink_pool(self):
827 """
828 Decrease the size of the pool
829 """
830 pool = self.ceph_manager.get_pool()
831 if pool is None:
832 return
833 _ = self.ceph_manager.get_pool_pg_num(pool)
834 self.log("Shrinking pool %s" % (pool,))
835 if self.ceph_manager.contract_pool(
836 pool,
837 self.config.get('pool_shrink_by', 10),
838 self.min_pgs):
839 self.pools_to_fix_pgp_num.add(pool)
840
841 def fix_pgp_num(self, pool=None):
842 """
843 Fix pgp_num for a pool so it matches pg_num.
844 """
845 if pool is None:
846 pool = self.ceph_manager.get_pool()
847 if not pool:
848 return
849 force = False
850 else:
851 force = True
852 self.log("fixing pg num pool %s" % (pool,))
853 if self.ceph_manager.set_pool_pgpnum(pool, force):
854 self.pools_to_fix_pgp_num.discard(pool)
855
856 def test_pool_min_size(self):
857 """
858 Loop to selectively push PGs below their min_size and test that recovery
859 still occurs.
860 """
861 self.log("test_pool_min_size")
862 self.all_up()
863 self.ceph_manager.wait_for_recovery(
864 timeout=self.config.get('timeout')
865 )
866
867 minout = int(self.config.get("min_out", 1))
868 minlive = int(self.config.get("min_live", 2))
869 mindead = int(self.config.get("min_dead", 1))
870 self.log("doing min_size thrashing")
871 self.ceph_manager.wait_for_clean(timeout=60)
872 assert self.ceph_manager.is_clean(), \
873 'not clean before minsize thrashing starts'
874 while not self.stopping:
875 # look up k and m from all the pools on each loop, in case it
876 # changes as the cluster runs
877 k = 0
878 m = 99
879 has_pools = False
880 pools_json = self.ceph_manager.get_osd_dump_json()['pools']
881
882 for pool_json in pools_json:
883 pool = pool_json['pool_name']
884 has_pools = True
885 pool_type = pool_json['type'] # 1 for rep, 3 for ec
886 min_size = pool_json['min_size']
887 self.log("pool {pool} min_size is {min_size}".format(pool=pool,min_size=min_size))
888 try:
889 ec_profile = self.ceph_manager.get_pool_property(pool, 'erasure_code_profile')
890 if pool_type != PoolType.ERASURE_CODED:
891 continue
892 ec_profile = pool_json['erasure_code_profile']
893 ec_profile_json = self.ceph_manager.raw_cluster_cmd(
894 'osd',
895 'erasure-code-profile',
896 'get',
897 ec_profile,
898 '--format=json')
899 ec_json = json.loads(ec_profile_json)
900 local_k = int(ec_json['k'])
901 local_m = int(ec_json['m'])
902 self.log("pool {pool} local_k={k} local_m={m}".format(pool=pool,
903 k=local_k, m=local_m))
904 if local_k > k:
905 self.log("setting k={local_k} from previous {k}".format(local_k=local_k, k=k))
906 k = local_k
907 if local_m < m:
908 self.log("setting m={local_m} from previous {m}".format(local_m=local_m, m=m))
909 m = local_m
910 except CommandFailedError:
911 self.log("failed to read erasure_code_profile. %s was likely removed", pool)
912 continue
913
914 if has_pools:
915 self.log("using k={k}, m={m}".format(k=k,m=m))
916 else:
917 self.log("No pools yet, waiting")
918 time.sleep(5)
919 continue
920
921 if minout > len(self.out_osds): # kill OSDs and mark out
922 self.log("forced to out an osd")
923 self.kill_osd(mark_out=True)
924 continue
925 elif mindead > len(self.dead_osds): # kill OSDs but force timeout
926 self.log("forced to kill an osd")
927 self.kill_osd()
928 continue
929 else: # make mostly-random choice to kill or revive OSDs
930 minup = max(minlive, k)
931 rand_val = random.uniform(0, 1)
932 self.log("choosing based on number of live OSDs and rand val {rand}".\
933 format(rand=rand_val))
934 if len(self.live_osds) > minup+1 and rand_val < 0.5:
935 # chose to knock out as many OSDs as we can w/out downing PGs
936
937 most_killable = min(len(self.live_osds) - minup, m)
938 self.log("chose to kill {n} OSDs".format(n=most_killable))
939 for i in range(1, most_killable):
940 self.kill_osd(mark_out=True)
941 time.sleep(10)
942 # try a few times since there might be a concurrent pool
943 # creation or deletion
944 with safe_while(
945 sleep=5, tries=5,
946 action='check for active or peered') as proceed:
947 while proceed():
948 if self.ceph_manager.all_active_or_peered():
949 break
950 self.log('not all PGs are active or peered')
951 else: # chose to revive OSDs, bring up a random fraction of the dead ones
952 self.log("chose to revive osds")
953 for i in range(1, int(rand_val * len(self.dead_osds))):
954 self.revive_osd(i)
955
956 # let PGs repair themselves or our next knockout might kill one
957 self.ceph_manager.wait_for_clean(timeout=self.config.get('timeout'))
958
959 # / while not self.stopping
960 self.all_up_in()
961
962 self.ceph_manager.wait_for_recovery(
963 timeout=self.config.get('timeout')
964 )
965
966 def inject_pause(self, conf_key, duration, check_after, should_be_down):
967 """
968 Pause injection testing. Check for osd being down when finished.
969 """
970 the_one = random.choice(self.live_osds)
971 self.log("inject_pause on {osd}".format(osd=the_one))
972 self.log(
973 "Testing {key} pause injection for duration {duration}".format(
974 key=conf_key,
975 duration=duration
976 ))
977 self.log(
978 "Checking after {after}, should_be_down={shouldbedown}".format(
979 after=check_after,
980 shouldbedown=should_be_down
981 ))
982 self.ceph_manager.set_config(the_one, **{conf_key: duration})
983 if not should_be_down:
984 return
985 time.sleep(check_after)
986 status = self.ceph_manager.get_osd_status()
987 assert the_one in status['down']
988 time.sleep(duration - check_after + 20)
989 status = self.ceph_manager.get_osd_status()
990 assert the_one not in status['down']
991
992 def test_backfill_full(self):
993 """
994 Test backfills stopping when the replica fills up.
995
996 First, use the injectfull admin command to simulate a
997 backfillfull condition on all of the OSDs.
998
999 Second, on a random subset, set
1000 osd_debug_skip_full_check_in_backfill_reservation to force
1001 the more complicated check in do_scan to be exercised.
1002
1003 Then, verify that all backfillings stop.
1004 """
1005 self.log("injecting backfill full")
1006 for i in self.live_osds:
1007 self.ceph_manager.set_config(
1008 i,
1009 osd_debug_skip_full_check_in_backfill_reservation=
1010 random.choice(['false', 'true']))
1011 self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'backfillfull'],
1012 check_status=True, timeout=30, stdout=DEVNULL)
1013 for i in range(30):
1014 status = self.ceph_manager.compile_pg_status()
1015 if 'backfilling' not in status.keys():
1016 break
1017 self.log(
1018 "waiting for {still_going} backfillings".format(
1019 still_going=status.get('backfilling')))
1020 time.sleep(1)
1021 assert('backfilling' not in self.ceph_manager.compile_pg_status().keys())
1022 for i in self.live_osds:
1023 self.ceph_manager.set_config(
1024 i,
1025 osd_debug_skip_full_check_in_backfill_reservation='false')
1026 self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'none'],
1027 check_status=True, timeout=30, stdout=DEVNULL)
1028
1029
1030 def generate_random_sharding(self):
1031 prefixes = [
1032 'm','O','P','L'
1033 ]
1034 new_sharding = ''
1035 for prefix in prefixes:
1036 choose = random.choice([False, True])
1037 if not choose:
1038 continue
1039 if new_sharding != '':
1040 new_sharding = new_sharding + ' '
1041 columns = random.randint(1, 5)
1042 do_hash = random.choice([False, True])
1043 if do_hash:
1044 low_hash = random.choice([0, 5, 8])
1045 do_high_hash = random.choice([False, True])
1046 if do_high_hash:
1047 high_hash = random.choice([8, 16, 30]) + low_hash
1048 new_sharding = new_sharding + prefix + '(' + str(columns) + ',' + str(low_hash) + '-' + str(high_hash) + ')'
1049 else:
1050 new_sharding = new_sharding + prefix + '(' + str(columns) + ',' + str(low_hash) + '-)'
1051 else:
1052 if columns == 1:
1053 new_sharding = new_sharding + prefix
1054 else:
1055 new_sharding = new_sharding + prefix + '(' + str(columns) + ')'
1056 return new_sharding
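# Example outputs of generate_random_sharding() (illustrative only):
#   'm(3,5-13) O P(2)'  or  'O L(4,8-)'
# i.e. per-prefix column counts with optional hash ranges, space-separated,
# in the form passed to 'ceph-bluestore-tool --sharding ... reshard' below.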
1057
1058 def test_bluestore_reshard_action(self):
1059 """
1060 Test if resharding of bluestore works properly.
1061 If bluestore is not used, or bluestore is in version that
1062 does not support sharding, skip.
1063 """
1064
1065 osd = random.choice(self.dead_osds)
1066 remote = self.ceph_manager.find_remote('osd', osd)
1067 FSPATH = self.ceph_manager.get_filepath()
1068
1069 prefix = [
1070 '--no-mon-config',
1071 '--log-file=/var/log/ceph/bluestore_tool.$pid.log',
1072 '--log-level=10',
1073 '--path', FSPATH.format(id=osd)
1074 ]
1075
1076 # sanity check if bluestore-tool accessible
1077 self.log('checking if target objectstore is bluestore on osd.%s' % osd)
1078 cmd = prefix + [
1079 'show-label'
1080 ]
1081 proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
1082 if proc.exitstatus != 0:
1083 raise Exception("ceph-bluestore-tool access failed.")
1084
1085 # check if sharding is possible
1086 self.log('checking if target bluestore supports sharding on osd.%s' % osd)
1087 cmd = prefix + [
1088 'show-sharding'
1089 ]
1090 proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
1091 if proc.exitstatus != 0:
1092 self.log("Unable to test resharding, "
1093 "ceph-bluestore-tool does not support it.")
1094 return
1095
1096 # now go for reshard to something else
1097 self.log('applying new sharding to bluestore on osd.%s' % osd)
1098 new_sharding = self.config.get('bluestore_new_sharding','random')
1099
1100 if new_sharding == 'random':
1101 self.log('generate random sharding')
1102 new_sharding = self.generate_random_sharding()
1103
1104 self.log("applying new sharding: " + new_sharding)
1105 cmd = prefix + [
1106 '--sharding', new_sharding,
1107 'reshard'
1108 ]
1109 proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
1110 if proc.exitstatus != 0:
1111 raise Exception("ceph-bluestore-tool resharding failed.")
1112
1113 # now run fsck to verify the new sharding is readable
1114 self.log('running fsck to verify new sharding on osd.%s' % osd)
1115 cmd = prefix + [
1116 'fsck'
1117 ]
1118 proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
1119 if proc.exitstatus != 0:
1120 raise Exception("ceph-bluestore-tool fsck failed.")
1121 self.log('resharding successfully completed')
1122
1123 def test_bluestore_reshard(self):
1124 """
1125 1) kills an osd
1126 2) reshards bluestore on killed osd
1127 3) revives the osd
1128 """
1129 self.log('test_bluestore_reshard started')
1130 self.kill_osd(mark_down=True, mark_out=True)
1131 self.test_bluestore_reshard_action()
1132 self.revive_osd()
1133 self.log('test_bluestore_reshard completed')
1134
1135
1136 def test_map_discontinuity(self):
1137 """
1138 1) Allows the osds to recover
1139 2) kills an osd
1140 3) allows the remaining osds to recover
1141 4) waits for some time
1142 5) revives the osd
1143 This sequence should cause the revived osd to have to handle
1144 a map gap since the mons would have trimmed
1145 """
1146 while len(self.in_osds) < (self.minin + 1):
1147 self.in_osd()
1148 self.log("Waiting for recovery")
1149 self.ceph_manager.wait_for_all_osds_up(
1150 timeout=self.config.get('timeout')
1151 )
1152 # now we wait 20s for the pg status to change, if it takes longer,
1153 # the test *should* fail!
1154 time.sleep(20)
1155 self.ceph_manager.wait_for_clean(
1156 timeout=self.config.get('timeout')
1157 )
1158
1159 # now we wait 20s for the backfill replicas to hear about the clean
1160 time.sleep(20)
1161 self.log("Recovered, killing an osd")
1162 self.kill_osd(mark_down=True, mark_out=True)
1163 self.log("Waiting for clean again")
1164 self.ceph_manager.wait_for_clean(
1165 timeout=self.config.get('timeout')
1166 )
1167 self.log("Waiting for trim")
1168 time.sleep(int(self.config.get("map_discontinuity_sleep_time", 40)))
1169 self.revive_osd()
1170
1171 def choose_action(self):
1172 """
1173 Random action selector.
1174 """
1175 chance_down = self.config.get('chance_down', 0.4)
1176 _ = self.config.get('chance_test_min_size', 0)
1177 chance_test_backfill_full = \
1178 self.config.get('chance_test_backfill_full', 0)
1179 if isinstance(chance_down, int):
1180 chance_down = float(chance_down) / 100
1181 minin = self.minin
1182 minout = int(self.config.get("min_out", 0))
1183 minlive = int(self.config.get("min_live", 2))
1184 mindead = int(self.config.get("min_dead", 0))
1185
1186 self.log('choose_action: min_in %d min_out '
1187 '%d min_live %d min_dead %d' %
1188 (minin, minout, minlive, mindead))
1189 actions = []
1190 if len(self.in_osds) > minin:
1191 actions.append((self.out_osd, 1.0,))
1192 if len(self.live_osds) > minlive and chance_down > 0:
1193 actions.append((self.kill_osd, chance_down,))
1194 if len(self.out_osds) > minout:
1195 actions.append((self.in_osd, 1.7,))
1196 if len(self.dead_osds) > mindead:
1197 actions.append((self.revive_osd, 1.0,))
1198 if self.config.get('thrash_primary_affinity', True):
1199 actions.append((self.primary_affinity, 1.0,))
1200 actions.append((self.reweight_osd_or_by_util,
1201 self.config.get('reweight_osd', .5),))
1202 actions.append((self.grow_pool,
1203 self.config.get('chance_pgnum_grow', 0),))
1204 actions.append((self.shrink_pool,
1205 self.config.get('chance_pgnum_shrink', 0),))
1206 actions.append((self.fix_pgp_num,
1207 self.config.get('chance_pgpnum_fix', 0),))
1208 actions.append((self.test_pool_min_size,
1209 self.config.get('chance_test_min_size', 0),))
1210 actions.append((self.test_backfill_full,
1211 chance_test_backfill_full,))
1212 if self.chance_thrash_cluster_full > 0:
1213 actions.append((self.thrash_cluster_full, self.chance_thrash_cluster_full,))
1214 if self.chance_thrash_pg_upmap > 0:
1215 actions.append((self.thrash_pg_upmap, self.chance_thrash_pg_upmap,))
1216 if self.chance_thrash_pg_upmap_items > 0:
1217 actions.append((self.thrash_pg_upmap_items, self.chance_thrash_pg_upmap_items,))
1218 if self.chance_force_recovery > 0:
1219 actions.append((self.force_cancel_recovery, self.chance_force_recovery))
1220
1221 for key in ['heartbeat_inject_failure', 'filestore_inject_stall']:
1222 for scenario in [
1223 (lambda:
1224 self.inject_pause(key,
1225 self.config.get('pause_short', 3),
1226 0,
1227 False),
1228 self.config.get('chance_inject_pause_short', 1),),
1229 (lambda:
1230 self.inject_pause(key,
1231 self.config.get('pause_long', 80),
1232 self.config.get('pause_check_after', 70),
1233 True),
1234 self.config.get('chance_inject_pause_long', 0),)]:
1235 actions.append(scenario)
1236
1237 # only consider resharding if objectstore is bluestore
1238 cluster_name = self.ceph_manager.cluster
1239 cluster = self.ceph_manager.ctx.ceph[cluster_name]
1240 if cluster.conf.get('osd', {}).get('osd objectstore', 'bluestore') == 'bluestore':
1241 actions.append((self.test_bluestore_reshard,
1242 self.config.get('chance_bluestore_reshard', 0),))
1243
1244 total = sum([y for (x, y) in actions])
1245 val = random.uniform(0, total)
1246 for (action, prob) in actions:
1247 if val < prob:
1248 return action
1249 val -= prob
1250 return None
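# Selection above is weight-proportional: with weights [1.0, 0.4, 1.7] the
# total is 3.1, and a draw of val = 1.2 walks past the first action
# (1.2 >= 1.0), subtracts it, and lands on the second (0.2 < 0.4).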
1251
1252 def do_thrash(self):
1253 """
1254 _do_thrash() wrapper.
1255 """
1256 try:
1257 self._do_thrash()
1258 except Exception as e:
1259 # See _run exception comment for MDSThrasher
1260 self.set_thrasher_exception(e)
1261 self.logger.exception("exception:")
1262 # Allow successful completion so gevent doesn't see an exception.
1263 # The DaemonWatchdog will observe the error and tear down the test.
1264
1265 @log_exc
1266 def do_sighup(self):
1267 """
1268 Loops and sends signal.SIGHUP to a random live osd.
1269
1270 Loop delay is controlled by the config value sighup_delay.
1271 """
1272 delay = float(self.sighup_delay)
1273 self.log("starting do_sighup with a delay of {0}".format(delay))
1274 while not self.stopping:
1275 osd = random.choice(self.live_osds)
1276 self.ceph_manager.signal_osd(osd, signal.SIGHUP, silent=True)
1277 time.sleep(delay)
1278
1279 @log_exc
1280 def do_optrack_toggle(self):
1281 """
1282 Loops and toggle op tracking to all osds.
1283
1284 Loop delay is controlled by the config value optrack_toggle_delay.
1285 """
1286 delay = float(self.optrack_toggle_delay)
1287 osd_state = "true"
1288 self.log("starting do_optrack_toggle with a delay of {0}".format(delay))
1289 while not self.stopping:
1290 if osd_state == "true":
1291 osd_state = "false"
1292 else:
1293 osd_state = "true"
1294 try:
1295 self.ceph_manager.inject_args('osd', '*',
1296 'osd_enable_op_tracker',
1297 osd_state)
1298 except CommandFailedError:
1299 self.log('Failed to tell all osds, ignoring')
1300 gevent.sleep(delay)
1301
1302 @log_exc
1303 def do_dump_ops(self):
1304 """
1305 Loops and does op dumps on all osds
1306 """
1307 self.log("starting do_dump_ops")
1308 while not self.stopping:
1309 for osd in self.live_osds:
1310 # Ignore errors because live_osds is in flux
1311 self.ceph_manager.osd_admin_socket(osd, command=['dump_ops_in_flight'],
1312 check_status=False, timeout=30, stdout=DEVNULL)
1313 self.ceph_manager.osd_admin_socket(osd, command=['dump_blocked_ops'],
1314 check_status=False, timeout=30, stdout=DEVNULL)
1315 self.ceph_manager.osd_admin_socket(osd, command=['dump_historic_ops'],
1316 check_status=False, timeout=30, stdout=DEVNULL)
1317 gevent.sleep(0)
1318
1319 @log_exc
1320 def do_noscrub_toggle(self):
1321 """
1322 Loops and toggle noscrub flags
1323
1324 Loop delay is controlled by the config value noscrub_toggle_delay.
1325 """
1326 delay = float(self.noscrub_toggle_delay)
1327 scrub_state = "none"
1328 self.log("starting do_noscrub_toggle with a delay of {0}".format(delay))
1329 while not self.stopping:
1330 if scrub_state == "none":
1331 self.ceph_manager.raw_cluster_cmd('osd', 'set', 'noscrub')
1332 scrub_state = "noscrub"
1333 elif scrub_state == "noscrub":
1334 self.ceph_manager.raw_cluster_cmd('osd', 'set', 'nodeep-scrub')
1335 scrub_state = "both"
1336 elif scrub_state == "both":
1337 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
1338 scrub_state = "nodeep-scrub"
1339 else:
1340 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
1341 scrub_state = "none"
1342 gevent.sleep(delay)
1343 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
1344 self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
1345
1346 @log_exc
1347 def _do_thrash(self):
1348 """
1349 Loop to select random actions to thrash ceph manager with.
1350 """
1351 cleanint = self.config.get("clean_interval", 60)
1352 scrubint = self.config.get("scrub_interval", -1)
1353 maxdead = self.config.get("max_dead", 0)
1354 delay = self.config.get("op_delay", 5)
1355 self.rerrosd = self.live_osds[0]
1356 if self.random_eio > 0:
1357 self.ceph_manager.inject_args('osd', self.rerrosd,
1358 'filestore_debug_random_read_err',
1359 self.random_eio)
1360 self.ceph_manager.inject_args('osd', self.rerrosd,
1361 'bluestore_debug_random_read_err',
1362 self.random_eio)
1363 self.log("starting do_thrash")
1364 while not self.stopping:
1365 to_log = [str(x) for x in ["in_osds: ", self.in_osds,
1366 "out_osds: ", self.out_osds,
1367 "dead_osds: ", self.dead_osds,
1368 "live_osds: ", self.live_osds]]
1369 self.log(" ".join(to_log))
1370 if random.uniform(0, 1) < (float(delay) / cleanint):
1371 while len(self.dead_osds) > maxdead:
1372 self.revive_osd()
1373 for osd in self.in_osds:
1374 self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
1375 str(osd), str(1))
1376 if random.uniform(0, 1) < float(
1377 self.config.get('chance_test_map_discontinuity', 0)) \
1378 and len(self.live_osds) > 5: # avoid m=2,k=2 stall, w/ some buffer for crush being picky
1379 self.test_map_discontinuity()
1380 else:
1381 self.ceph_manager.wait_for_recovery(
1382 timeout=self.config.get('timeout')
1383 )
1384 time.sleep(self.clean_wait)
1385 if scrubint > 0:
1386 if random.uniform(0, 1) < (float(delay) / scrubint):
1387 self.log('Scrubbing while thrashing being performed')
1388 Scrubber(self.ceph_manager, self.config)
1389 self.choose_action()()
1390 time.sleep(delay)
1391 self.all_up()
1392 if self.random_eio > 0:
1393 self.ceph_manager.inject_args('osd', self.rerrosd,
1394 'filestore_debug_random_read_err', '0.0')
1395 self.ceph_manager.inject_args('osd', self.rerrosd,
1396 'bluestore_debug_random_read_err', '0.0')
1397 for pool in list(self.pools_to_fix_pgp_num):
1398 if self.ceph_manager.get_pool_pg_num(pool) > 0:
1399 self.fix_pgp_num(pool)
1400 self.pools_to_fix_pgp_num.clear()
1401 for service, opt, saved_value in self.saved_options:
1402 self.ceph_manager.inject_args(service, '*', opt, saved_value)
1403 self.saved_options = []
1404 self.all_up_in()
1405
1406
1407 class ObjectStoreTool:
1408
1409 def __init__(self, manager, pool, **kwargs):
1410 self.manager = manager
1411 self.pool = pool
1412 self.osd = kwargs.get('osd', None)
1413 self.object_name = kwargs.get('object_name', None)
1414 self.do_revive = kwargs.get('do_revive', True)
1415 if self.osd and self.pool and self.object_name:
1416 if self.osd == "primary":
1417 self.osd = self.manager.get_object_primary(self.pool,
1418 self.object_name)
1419 assert self.osd
1420 if self.object_name:
1421 self.pgid = self.manager.get_object_pg_with_shard(self.pool,
1422 self.object_name,
1423 self.osd)
1424 self.remote = next(iter(self.manager.ctx.\
1425 cluster.only('osd.{o}'.format(o=self.osd)).remotes.keys()))
1426 path = self.manager.get_filepath().format(id=self.osd)
1427 self.paths = ("--data-path {path} --journal-path {path}/journal".
1428 format(path=path))
1429
1430 def build_cmd(self, options, args, stdin):
1431 lines = []
1432 if self.object_name:
1433 lines.append("object=$(sudo adjust-ulimits ceph-objectstore-tool "
1434 "{paths} --pgid {pgid} --op list |"
1435 "grep '\"oid\":\"{name}\"')".
1436 format(paths=self.paths,
1437 pgid=self.pgid,
1438 name=self.object_name))
1439 args = '"$object" ' + args
1440 options += " --pgid {pgid}".format(pgid=self.pgid)
1441 cmd = ("sudo adjust-ulimits ceph-objectstore-tool {paths} {options} {args}".
1442 format(paths=self.paths,
1443 args=args,
1444 options=options))
1445 if stdin:
1446 cmd = ("echo {payload} | base64 --decode | {cmd}".
1447 format(payload=base64.b64encode(stdin.encode()).decode(),
1448 cmd=cmd))
1449 lines.append(cmd)
1450 return "\n".join(lines)
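# Roughly, build_cmd() above emits a small shell script such as
# (hypothetical paths, pgid and object name):
#   object=$(sudo adjust-ulimits ceph-objectstore-tool --data-path <path> \
#            --journal-path <path>/journal --pgid 2.7 --op list | grep '"oid":"foo"')
#   sudo adjust-ulimits ceph-objectstore-tool --data-path <path> \
#            --journal-path <path>/journal <options> --pgid 2.7 "$object" <args>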
1451
1452 def run(self, options, args):
1453 self.manager.kill_osd(self.osd)
1454 cmd = self.build_cmd(options, args, None)
1455 self.manager.log(cmd)
1456 try:
1457 proc = self.remote.run(args=['bash', '-e', '-x', '-c', cmd],
1458 check_status=False,
1459 stdout=BytesIO(),
1460 stderr=BytesIO())
1461 proc.wait()
1462 if proc.exitstatus != 0:
1463 self.manager.log("failed with " + str(proc.exitstatus))
1464 error = proc.stdout.getvalue().decode() + " " + \
1465 proc.stderr.getvalue().decode()
1466 raise Exception(error)
1467 finally:
1468 if self.do_revive:
1469 self.manager.revive_osd(self.osd)
1470 self.manager.wait_till_osd_is_up(self.osd, 300)
1471
1472
1473 # XXX: this class has nothing to do with the Ceph daemon (ceph-mgr) of
1474 # the same name.
1475 class CephManager:
1476 """
1477 Ceph manager object.
1478 Contains several local functions that form a bulk of this module.
1479
1480 :param controller: the remote machine where the Ceph commands should be
1481 executed
1482 :param ctx: the cluster context
1483 :param config: path to Ceph config file
1484 :param logger: for logging messages
1485 :param cluster: name of the Ceph cluster
1486 """
1487
1488 def __init__(self, controller, ctx=None, config=None, logger=None,
1489 cluster='ceph', cephadm=False) -> None:
1490 self.lock = threading.RLock()
1491 self.ctx = ctx
1492 self.config = config
1493 self.controller = controller
1494 self.next_pool_id = 0
1495 self.cluster = cluster
1496 self.cephadm = cephadm
1497 if logger:
1498 self.log = lambda x: logger.info(x)
1499 else:
1500 def tmp(x):
1501 """
1502 implement log behavior.
1503 """
1504 print(x)
1505 self.log = tmp
1506 if self.config is None:
1507 self.config = dict()
1508 pools = self.list_pools()
1509 self.pools = {}
1510 for pool in pools:
1511 # we may race with a pool deletion; ignore failures here
1512 try:
1513 self.pools[pool] = self.get_pool_int_property(pool, 'pg_num')
1514 except CommandFailedError:
1515 self.log('Failed to get pg_num from pool %s, ignoring' % pool)
1516
1517 def ceph(self, cmd, **kwargs):
1518 """
1519 Simple Ceph admin command wrapper around run_cluster_cmd.
1520 """
1521
1522 kwargs.pop('args', None)
1523 args = shlex.split(cmd)
1524 stdout = kwargs.pop('stdout', StringIO())
1525 stderr = kwargs.pop('stderr', StringIO())
1526 return self.run_cluster_cmd(args=args, stdout=stdout, stderr=stderr, **kwargs)
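# Example (hypothetical): self.ceph('osd set noout') is shlex-split and
# dispatched through run_cluster_cmd(), equivalent to
# self.run_cluster_cmd(args=['osd', 'set', 'noout']).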
1527
1528 def run_cluster_cmd(self, **kwargs):
1529 """
1530 Run a Ceph command and return the object representing the process
1531 for the command.
1532
1533 Accepts arguments same as that of teuthology.orchestra.run.run()
1534 """
1535 if self.cephadm:
1536 return shell(self.ctx, self.cluster, self.controller,
1537 args=['ceph'] + list(kwargs['args']),
1538 stdout=StringIO(),
1539 check_status=kwargs.get('check_status', True))
1540
1541 testdir = teuthology.get_testdir(self.ctx)
1542 prefix = ['sudo', 'adjust-ulimits', 'ceph-coverage',
1543 f'{testdir}/archive/coverage', 'timeout', '120', 'ceph',
1544 '--cluster', self.cluster]
1545 kwargs['args'] = prefix + list(kwargs['args'])
1546 return self.controller.run(**kwargs)
1547
1548 def raw_cluster_cmd(self, *args, **kwargs) -> str:
1549 """
1550 Run a ceph command against the cluster and return its stdout.
1551 """
1552 stdout = kwargs.pop('stdout', StringIO())
1553 p = self.run_cluster_cmd(args=args, stdout=stdout, **kwargs)
1554 return p.stdout.getvalue()
1555
1556 def raw_cluster_cmd_result(self, *args, **kwargs):
1557 """
1558 Run a ceph command against the cluster and return its exit status.
1559 """
1560 kwargs['args'], kwargs['check_status'] = args, False
1561 return self.run_cluster_cmd(**kwargs).exitstatus
1562
1563 def run_ceph_w(self, watch_channel=None):
1564 """
1565 Execute "ceph -w" in the background with stdout connected to a BytesIO,
1566 and return the RemoteProcess.
1567
1568 :param watch_channel: Specifies the channel to be watched. This can be
1569 'cluster', 'audit', ...
1570 :type watch_channel: str
1571 """
1572 args = ["sudo",
1573 "daemon-helper",
1574 "kill",
1575 "ceph",
1576 '--cluster',
1577 self.cluster,
1578 "-w"]
1579 if watch_channel is not None:
1580 args.append("--watch-channel")
1581 args.append(watch_channel)
1582 return self.controller.run(args=args, wait=False, stdout=StringIO(), stdin=run.PIPE)
1583
1584 def get_mon_socks(self):
1585 """
1586 Get monitor sockets.
1587
1588 :return socks: tuple of strings; strings are individual sockets.
1589 """
1590 from json import loads
1591
1592 output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
1593 socks = []
1594 for mon in output['mons']:
1595 for addrvec_mem in mon['public_addrs']['addrvec']:
1596 socks.append(addrvec_mem['addr'])
1597 return tuple(socks)
1598
1599 def get_msgrv1_mon_socks(self):
1600 """
1601 Get monitor sockets that use msgrv1 to operate.
1602
1603 :return socks: tuple of strings; strings are individual sockets.
1604 """
1605 from json import loads
1606
1607 output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
1608 socks = []
1609 for mon in output['mons']:
1610 for addrvec_mem in mon['public_addrs']['addrvec']:
1611 if addrvec_mem['type'] == 'v1':
1612 socks.append(addrvec_mem['addr'])
1613 return tuple(socks)
1614
1615 def get_msgrv2_mon_socks(self):
1616 """
1617 Get monitor sockets that use msgrv2 to operate.
1618
1619 :return socks: tuple of strings; strings are individual sockets.
1620 """
1621 from json import loads
1622
1623 output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
1624 socks = []
1625 for mon in output['mons']:
1626 for addrvec_mem in mon['public_addrs']['addrvec']:
1627 if addrvec_mem['type'] == 'v2':
1628 socks.append(addrvec_mem['addr'])
1629 return tuple(socks)
1630
1631 def flush_pg_stats(self, osds, no_wait=None, wait_for_mon=300):
1632 """
1633 Flush pg stats from a list of OSD ids, ensuring they are reflected
1634 all the way to the monitor. Luminous and later only.
1635
1636 :param osds: list of OSDs to flush
1637 :param no_wait: list of OSDs not to wait for seq id. by default, we
1638 wait for all specified osds, but some of them could be
1639 moved out of osdmap, so we cannot get their updated
1640 stat seq from monitor anymore. in that case, you need
1641 to pass a blocklist.
1642 :param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
1643 it. (5 min by default)
1644 """
1645 seq = {osd: int(self.raw_cluster_cmd('tell', 'osd.%d' % osd, 'flush_pg_stats'))
1646 for osd in osds}
1647 if not wait_for_mon:
1648 return
1649 if no_wait is None:
1650 no_wait = []
1651 for osd, need in seq.items():
1652 if osd in no_wait:
1653 continue
1654 got = 0
1655 while wait_for_mon > 0:
1656 got = int(self.raw_cluster_cmd('osd', 'last-stat-seq', 'osd.%d' % osd))
1657 self.log('need seq {need} got {got} for osd.{osd}'.format(
1658 need=need, got=got, osd=osd))
1659 if got >= need:
1660 break
1661 A_WHILE = 1
1662 time.sleep(A_WHILE)
1663 wait_for_mon -= A_WHILE
1664 else:
1665 raise Exception('timed out waiting for mon to be updated with '
1666 'osd.{osd}: {got} < {need}'.
1667 format(osd=osd, got=got, need=need))
1668
1669 def flush_all_pg_stats(self):
1670 self.flush_pg_stats(range(len(self.get_osd_dump())))
1671
1672 def do_rados(self, cmd, pool=None, namespace=None, remote=None, **kwargs):
1673 """
1674 Execute a remote rados command.
1675 """
1676 if remote is None:
1677 remote = self.controller
1678
1679 testdir = teuthology.get_testdir(self.ctx)
1680 pre = [
1681 'adjust-ulimits',
1682 'ceph-coverage',
1683 '{tdir}/archive/coverage'.format(tdir=testdir),
1684 'rados',
1685 '--cluster',
1686 self.cluster,
1687 ]
1688 if pool is not None:
1689 pre += ['--pool', pool]
1690 if namespace is not None:
1691 pre += ['--namespace', namespace]
1692 pre.extend(cmd)
1693 proc = remote.run(
1694 args=pre,
1695 wait=True,
1696 **kwargs
1697 )
1698 return proc
1699
1700 def rados_write_objects(self, pool, num_objects, size,
1701 timelimit, threads, cleanup=False):
1702 """
1703 Write rados objects
1704 Threads not used yet.
1705 """
1706 args = [
1707 '--num-objects', num_objects,
1708 '-b', size,
1709 'bench', timelimit,
1710 'write'
1711 ]
1712 if not cleanup:
1713 args.append('--no-cleanup')
1714 return self.do_rados(map(str, args), pool=pool)
1715
1716 def do_put(self, pool, obj, fname, namespace=None):
1717 """
1718 Implement rados put operation
1719 """
1720 args = ['put', obj, fname]
1721 return self.do_rados(
1722 args,
1723 check_status=False,
1724 pool=pool,
1725 namespace=namespace
1726 ).exitstatus
1727
1728 def do_get(self, pool, obj, fname='/dev/null', namespace=None):
1729 """
1730 Implement rados get operation
1731 """
1732 args = ['get', obj, fname]
1733 return self.do_rados(
1734 args,
1735 check_status=False,
1736 pool=pool,
1737 namespace=namespace,
1738 ).exitstatus
1739
1740 def do_rm(self, pool, obj, namespace=None):
1741 """
1742 Implement rados rm operation
1743 """
1744 args = ['rm', obj]
1745 return self.do_rados(
1746 args,
1747 check_status=False,
1748 pool=pool,
1749 namespace=namespace
1750 ).exitstatus
1751
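# Roundtrip sketch using the three wrappers above (hypothetical `manager`;
# names and paths are illustrative). Each call returns the rados exit
# status, so 0 means success:
#
#     assert manager.do_put('mypool', 'obj1', '/etc/hostname') == 0
#     assert manager.do_get('mypool', 'obj1', '/tmp/obj1.out') == 0
#     assert manager.do_rm('mypool', 'obj1') == 0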
1752 def osd_admin_socket(self, osd_id, command, check_status=True, timeout=0, stdout=None):
1753 if stdout is None:
1754 stdout = StringIO()
1755 return self.admin_socket('osd', osd_id, command, check_status, timeout, stdout)
1756
1757 def find_remote(self, service_type, service_id):
1758 """
1759 Get the Remote for the host where a particular service runs.
1760
1761 :param service_type: 'mds', 'osd', 'client'
1762 :param service_id: The second part of a role, e.g. '0' for
1763 the role 'client.0'
1764 :return: a Remote instance for the host where the
1765 requested role is placed
1766 """
1767 return get_remote(self.ctx, self.cluster,
1768 service_type, service_id)
1769
1770 def admin_socket(self, service_type, service_id,
1771 command, check_status=True, timeout=0, stdout=None):
1772 """
1773 Run a command against the given daemon's admin socket on its remote host.
1774 :param command: a list of words to use as the command
1775 to the admin socket
1776 """
1777 if stdout is None:
1778 stdout = StringIO()
1779
1780 remote = self.find_remote(service_type, service_id)
1781
1782 if self.cephadm:
1783 return shell(
1784 self.ctx, self.cluster, remote,
1785 args=[
1786 'ceph', 'daemon', '%s.%s' % (service_type, service_id),
1787 ] + command,
1788 stdout=stdout,
1789 wait=True,
1790 check_status=check_status,
1791 )
1792
1793 testdir = teuthology.get_testdir(self.ctx)
1794 args = [
1795 'sudo',
1796 'adjust-ulimits',
1797 'ceph-coverage',
1798 '{tdir}/archive/coverage'.format(tdir=testdir),
1799 'timeout',
1800 str(timeout),
1801 'ceph',
1802 '--cluster',
1803 self.cluster,
1804 '--admin-daemon',
1805 '/var/run/ceph/{cluster}-{type}.{id}.asok'.format(
1806 cluster=self.cluster,
1807 type=service_type,
1808 id=service_id),
1809 ]
1810 args.extend(command)
1811 return remote.run(
1812 args=args,
1813 stdout=stdout,
1814 wait=True,
1815 check_status=check_status
1816 )
1817
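# Usage sketch (hypothetical `manager`): ask osd.0 for its in-flight ops via
# the admin socket; the same call works for mon/mds/mgr daemons as well.
#
#     proc = manager.admin_socket('osd', 0, ['dump_ops_in_flight'])
#     ops = json.loads(proc.stdout.getvalue())
#
# With check_status=False, a missing socket shows up as a nonzero
# proc.exitstatus instead of raising an exception.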
1818 def objectstore_tool(self, pool, options, args, **kwargs):
1819 return ObjectStoreTool(self, pool, **kwargs).run(options, args)
1820
1821 def get_pgid(self, pool, pgnum):
1822 """
1823 :param pool: pool name
1824 :param pgnum: pg number
1825 :returns: a string representing this pg.
1826 """
1827 poolnum = self.get_pool_num(pool)
1828 pg_str = "{poolnum}.{pgnum}".format(
1829 poolnum=poolnum,
1830 pgnum=pgnum)
1831 return pg_str
1832
1833 def get_pg_replica(self, pool, pgnum):
1834 """
1835 get a replica osd for pool, pgnum (e.g. (data, 0) -> 0)
1836 """
1837 pg_str = self.get_pgid(pool, pgnum)
1838 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1839 j = json.loads('\n'.join(output.split('\n')[1:]))
1840 return int(j['acting'][-1])
1842
1843 def wait_for_pg_stats(func):
1844 # both osd_mon_report_interval and mgr_stats_period are 5 seconds
1845 # by default; taking fault injection in ms into consideration,
1846 # 12 seconds is more than enough
1847 delays = [1, 1, 2, 3, 5, 8, 13, 0]
1848 @wraps(func)
1849 def wrapper(self, *args, **kwargs):
1850 exc = None
1851 for delay in delays:
1852 try:
1853 return func(self, *args, **kwargs)
1854 except AssertionError as e:
1855 time.sleep(delay)
1856 exc = e
1857 raise exc
1858 return wrapper
1859
1860 def get_pg_primary(self, pool, pgnum):
1861 """
1862 get the primary osd for pool, pgnum (e.g. (data, 0) -> 0)
1863 """
1864 pg_str = self.get_pgid(pool, pgnum)
1865 output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
1866 j = json.loads('\n'.join(output.split('\n')[1:]))
1867 return int(j['acting'][0])
1869
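# Lookup sketch (hypothetical `manager`; pool name and pg number are made up):
#
#     pgid = manager.get_pgid('mypool', 0)          # e.g. '2.0'
#     primary = manager.get_pg_primary('mypool', 0)
#     replica = manager.get_pg_replica('mypool', 0)
#
# Both helpers read `ceph pg map <pgid> --format=json` and pick the first
# (primary) or last (replica) entry of the acting set.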
1870 def get_pool_num(self, pool):
1871 """
1872 get number for pool (e.g., data -> 2)
1873 """
1874 return int(self.get_pool_dump(pool)['pool'])
1875
1876 def list_pools(self):
1877 """
1878 list all pool names
1879 """
1880 osd_dump = self.get_osd_dump_json()
1881 self.log(osd_dump['pools'])
1882 return [str(i['pool_name']) for i in osd_dump['pools']]
1883
1884 def clear_pools(self):
1885 """
1886 remove all pools
1887 """
1888 [self.remove_pool(i) for i in self.list_pools()]
1889
1890 def kick_recovery_wq(self, osdnum):
1891 """
1892 Run kick_recovery_wq on cluster.
1893 """
1894 return self.raw_cluster_cmd(
1895 'tell', "osd.%d" % (int(osdnum),),
1896 'debug',
1897 'kick_recovery_wq',
1898 '0')
1899
1900 def wait_run_admin_socket(self, service_type,
1901 service_id, args=['version'], timeout=75, stdout=None):
1902 """
1903 If the admin_socket call succeeds, return its result. Otherwise
1904 wait five seconds and try again, up to the given timeout.
1905 """
1906 if stdout is None:
1907 stdout = StringIO()
1908 tries = 0
1909 while True:
1910 proc = self.admin_socket(service_type, service_id,
1911 args, check_status=False, stdout=stdout)
1912 if proc.exitstatus == 0:
1913 return proc
1914 else:
1915 tries += 1
1916 if (tries * 5) > timeout:
1917 raise Exception('timed out waiting for admin_socket '
1918 'to appear after {type}.{id} restart'.
1919 format(type=service_type,
1920 id=service_id))
1921 self.log("waiting on admin_socket for {type}-{id}, "
1922 "{command}".format(type=service_type,
1923 id=service_id,
1924 command=args))
1925 time.sleep(5)
1926
1927 def get_pool_dump(self, pool):
1928 """
1929 get the osd dump part of a pool
1930 """
1931 osd_dump = self.get_osd_dump_json()
1932 for i in osd_dump['pools']:
1933 if i['pool_name'] == pool:
1934 return i
1935 assert False
1936
1937 def get_config(self, service_type, service_id, name):
1938 """
1939 :param service_type, service_id: the daemon to query, e.g. 'mon', 'a'
1940 :param name: the option name
1941 """
1942 proc = self.wait_run_admin_socket(service_type, service_id,
1943 ['config', 'show'])
1944 j = json.loads(proc.stdout.getvalue())
1945 return j[name]
1946
1947 def inject_args(self, service_type, service_id, name, value):
1948 whom = '{0}.{1}'.format(service_type, service_id)
1949 if isinstance(value, bool):
1950 value = 'true' if value else 'false'
1951 opt_arg = '--{name}={value}'.format(name=name, value=value)
1952 self.raw_cluster_cmd('--', 'tell', whom, 'injectargs', opt_arg)
1953
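# Sketch (hypothetical `manager`; the option names/values are chosen for
# illustration): bump runtime options without restarting the daemon.
#
#     manager.inject_args('osd', 3, 'osd_max_backfills', 3)
#     manager.inject_args('mon', 'a', 'mon_osd_down_out_interval', 600)
#
# A bool value is converted to 'true'/'false' before being passed to injectargs.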
1954 def set_config(self, osdnum, **argdict):
1955 """
1956 :param osdnum: osd number
1957 :param argdict: dictionary containing values to set.
1958 """
1959 for k, v in argdict.items():
1960 self.wait_run_admin_socket(
1961 'osd', osdnum,
1962 ['config', 'set', str(k), str(v)])
1963
1964 def raw_cluster_status(self):
1965 """
1966 Get status from cluster
1967 """
1968 status = self.raw_cluster_cmd('status', '--format=json')
1969 return json.loads(status)
1970
1971 def raw_osd_status(self):
1972 """
1973 Get osd status from cluster
1974 """
1975 return self.raw_cluster_cmd('osd', 'dump')
1976
1977 def get_osd_status(self):
1978 """
1979 Get osd statuses sorted by states that the osds are in.
1980 """
1981 osd_lines = list(filter(
1982 lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
1983 self.raw_osd_status().split('\n')))
1984 self.log(osd_lines)
1985 in_osds = [int(i[4:].split()[0])
1986 for i in filter(lambda x: " in " in x, osd_lines)]
1987 out_osds = [int(i[4:].split()[0])
1988 for i in filter(lambda x: " out " in x, osd_lines)]
1989 up_osds = [int(i[4:].split()[0])
1990 for i in filter(lambda x: " up " in x, osd_lines)]
1991 down_osds = [int(i[4:].split()[0])
1992 for i in filter(lambda x: " down " in x, osd_lines)]
1993 dead_osds = [int(x.id_)
1994 for x in filter(lambda x:
1995 not x.running(),
1996 self.ctx.daemons.
1997 iter_daemons_of_role('osd', self.cluster))]
1998 live_osds = [int(x.id_) for x in
1999 filter(lambda x:
2000 x.running(),
2001 self.ctx.daemons.iter_daemons_of_role('osd',
2002 self.cluster))]
2003 return {'in': in_osds, 'out': out_osds, 'up': up_osds,
2004 'down': down_osds, 'dead': dead_osds, 'live': live_osds,
2005 'raw': osd_lines}
2006
2007 def get_num_pgs(self):
2008 """
2009 Check cluster status for the number of pgs
2010 """
2011 status = self.raw_cluster_status()
2012 self.log(status)
2013 return status['pgmap']['num_pgs']
2014
2015 def create_erasure_code_profile(self, profile_name, profile):
2016 """
2017 Create an erasure code profile that can be referenced by name
2018 when creating an erasure coded pool.
2019 """
2020 with self.lock:
2021 args = cmd_erasure_code_profile(profile_name, profile)
2022 self.raw_cluster_cmd(*args)
2023
2024 def create_pool_with_unique_name(self, pg_num=16,
2025 erasure_code_profile_name=None,
2026 min_size=None,
2027 erasure_code_use_overwrites=False):
2028 """
2029 Create a pool named unique_pool_X where X is unique.
2030 """
2031 name = ""
2032 with self.lock:
2033 name = "unique_pool_%s" % (str(self.next_pool_id),)
2034 self.next_pool_id += 1
2035 self.create_pool(
2036 name,
2037 pg_num,
2038 erasure_code_profile_name=erasure_code_profile_name,
2039 min_size=min_size,
2040 erasure_code_use_overwrites=erasure_code_use_overwrites)
2041 return name
2042
2043 @contextlib.contextmanager
2044 def pool(self, pool_name, pg_num=16, erasure_code_profile_name=None):
2045 self.create_pool(pool_name, pg_num, erasure_code_profile_name)
2046 try:
2047     yield
2048 finally:
2049     self.remove_pool(pool_name)
2048
2049 def create_pool(self, pool_name, pg_num=16,
2050 erasure_code_profile_name=None,
2051 min_size=None,
2052 erasure_code_use_overwrites=False):
2053 """
2054 Create a pool with the given name.
2055 :param pool_name: name of the pool being created.
2056 :param pg_num: initial number of pgs.
2057 :param erasure_code_profile_name: if not None, create an erasure coded pool using this profile
2058 :param min_size: if not None, set the pool's min_size to this value
2059 :param erasure_code_use_overwrites: if true, allow overwrites
2060 """
2061 with self.lock:
2062 assert isinstance(pool_name, str)
2063 assert isinstance(pg_num, int)
2064 assert pool_name not in self.pools
2065 self.log("creating pool_name %s" % (pool_name,))
2066 if erasure_code_profile_name:
2067 self.raw_cluster_cmd('osd', 'pool', 'create',
2068 pool_name, str(pg_num), str(pg_num),
2069 'erasure', erasure_code_profile_name)
2070 else:
2071 self.raw_cluster_cmd('osd', 'pool', 'create',
2072 pool_name, str(pg_num))
2073 if min_size is not None:
2074 self.raw_cluster_cmd(
2075 'osd', 'pool', 'set', pool_name,
2076 'min_size',
2077 str(min_size))
2078 if erasure_code_use_overwrites:
2079 self.raw_cluster_cmd(
2080 'osd', 'pool', 'set', pool_name,
2081 'allow_ec_overwrites',
2082 'true')
2083 self.raw_cluster_cmd(
2084 'osd', 'pool', 'application', 'enable',
2085 pool_name, 'rados', '--yes-i-really-mean-it',
2086 run.Raw('||'), 'true')
2087 self.pools[pool_name] = pg_num
2088 time.sleep(1)
2089
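# Sketch: create an EC pool with overwrites enabled (hypothetical `manager`;
# the profile name and settings are illustrative):
#
#     manager.create_erasure_code_profile('testprofile',
#                                         {'k': 2, 'm': 1,
#                                          'crush-failure-domain': 'osd'})
#     manager.create_pool('ecpool', pg_num=16,
#                         erasure_code_profile_name='testprofile',
#                         erasure_code_use_overwrites=True)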
2090 def add_pool_snap(self, pool_name, snap_name):
2091 """
2092 Add pool snapshot
2093 :param pool_name: name of pool to snapshot
2094 :param snap_name: name of snapshot to take
2095 """
2096 self.raw_cluster_cmd('osd', 'pool', 'mksnap',
2097 str(pool_name), str(snap_name))
2098
2099 def remove_pool_snap(self, pool_name, snap_name):
2100 """
2101 Remove pool snapshot
2102 :param pool_name: name of pool to snapshot
2103 :param snap_name: name of snapshot to remove
2104 """
2105 self.raw_cluster_cmd('osd', 'pool', 'rmsnap',
2106 str(pool_name), str(snap_name))
2107
2108 def remove_pool(self, pool_name):
2109 """
2110 Remove the indicated pool
2111 :param pool_name: Pool to be removed
2112 """
2113 with self.lock:
2114 assert isinstance(pool_name, str)
2115 assert pool_name in self.pools
2116 self.log("removing pool_name %s" % (pool_name,))
2117 del self.pools[pool_name]
2118 self.raw_cluster_cmd('osd', 'pool', 'rm', pool_name, pool_name,
2119 "--yes-i-really-really-mean-it")
2120
2121 def get_pool(self):
2122 """
2123 Pick a random pool
2124 """
2125 with self.lock:
2126 if self.pools:
2127 return random.sample(list(self.pools.keys()), 1)[0]
2128
2129 def get_pool_pg_num(self, pool_name):
2130 """
2131 Return the number of pgs in the pool specified.
2132 """
2133 with self.lock:
2134 assert isinstance(pool_name, str)
2135 if pool_name in self.pools:
2136 return self.pools[pool_name]
2137 return 0
2138
2139 def get_pool_property(self, pool_name, prop):
2140 """
2141 :param pool_name: pool
2142 :param prop: property to be checked.
2143 :returns: property as string
2144 """
2145 with self.lock:
2146 assert isinstance(pool_name, str)
2147 assert isinstance(prop, str)
2148 output = self.raw_cluster_cmd(
2149 'osd',
2150 'pool',
2151 'get',
2152 pool_name,
2153 prop)
2154 return output.split()[1]
2155
2156 def get_pool_int_property(self, pool_name, prop):
2157 return int(self.get_pool_property(pool_name, prop))
2158
2159 def set_pool_property(self, pool_name, prop, val):
2160 """
2161 :param pool_name: pool
2162 :param prop: property to be set.
2163 :param val: value to set.
2164
2165 This routine retries if set operation fails.
2166 """
2167 with self.lock:
2168 assert isinstance(pool_name, str)
2169 assert isinstance(prop, str)
2170 assert isinstance(val, int)
2171 tries = 0
2172 while True:
2173 r = self.raw_cluster_cmd_result(
2174 'osd',
2175 'pool',
2176 'set',
2177 pool_name,
2178 prop,
2179 str(val))
2180 if r != 11: # EAGAIN
2181 break
2182 tries += 1
2183 if tries > 50:
2184 raise Exception('gave up after repeated EAGAIN '
2185 'when setting pool property %s %s = %s' %
2186 (pool_name, prop, val))
2187 self.log('got EAGAIN setting pool property, '
2188 'waiting a few seconds...')
2189 time.sleep(2)
2190
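# Sketch (hypothetical `manager` and pool): set_pool_property retries on
# EAGAIN, and it is the building block used by expand_pool/contract_pool
# below when stepping pg_num/pgp_num:
#
#     manager.set_pool_property('mypool', 'min_size', 1)
#     manager.expand_pool('mypool', by=16, max_pgs=256)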
2191 def expand_pool(self, pool_name, by, max_pgs):
2192 """
2193 Increase the number of pgs in a pool
2194 """
2195 with self.lock:
2196 assert isinstance(pool_name, str)
2197 assert isinstance(by, int)
2198 assert pool_name in self.pools
2199 if self.get_num_creating() > 0:
2200 return False
2201 if (self.pools[pool_name] + by) > max_pgs:
2202 return False
2203 self.log("increase pool size by %d" % (by,))
2204 new_pg_num = self.pools[pool_name] + by
2205 self.set_pool_property(pool_name, "pg_num", new_pg_num)
2206 self.pools[pool_name] = new_pg_num
2207 return True
2208
2209 def contract_pool(self, pool_name, by, min_pgs):
2210 """
2211 Decrease the number of pgs in a pool
2212 """
2213 with self.lock:
2214 self.log('contract_pool %s by %s min %s' % (
2215 pool_name, str(by), str(min_pgs)))
2216 assert isinstance(pool_name, str)
2217 assert isinstance(by, int)
2218 assert pool_name in self.pools
2219 if self.get_num_creating() > 0:
2220 self.log('too many creating')
2221 return False
2222 proj = self.pools[pool_name] - by
2223 if proj < min_pgs:
2224 self.log('would drop below min_pgs, proj %d, currently %d' % (proj,self.pools[pool_name],))
2225 return False
2226 self.log("decrease pool size by %d" % (by,))
2227 new_pg_num = self.pools[pool_name] - by
2228 self.set_pool_property(pool_name, "pg_num", new_pg_num)
2229 self.pools[pool_name] = new_pg_num
2230 return True
2231
2232 def stop_pg_num_changes(self):
2233 """
2234 Reset all pg_num_targets back to pg_num, canceling splits and merges
2235 """
2236 self.log('Canceling any pending splits or merges...')
2237 osd_dump = self.get_osd_dump_json()
2238 try:
2239 for pool in osd_dump['pools']:
2240 if pool['pg_num'] != pool['pg_num_target']:
2241 self.log('Setting pool %s (%d) pg_num %d -> %d' %
2242 (pool['pool_name'], pool['pool'],
2243 pool['pg_num_target'],
2244 pool['pg_num']))
2245 self.raw_cluster_cmd('osd', 'pool', 'set', pool['pool_name'],
2246 'pg_num', str(pool['pg_num']))
2247 except KeyError:
2248 # we don't support pg_num_target before nautilus
2249 pass
2250
2251 def set_pool_pgpnum(self, pool_name, force):
2252 """
2253 Set pgpnum property of pool_name pool.
2254 """
2255 with self.lock:
2256 assert isinstance(pool_name, str)
2257 assert pool_name in self.pools
2258 if not force and self.get_num_creating() > 0:
2259 return False
2260 self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name])
2261 return True
2262
2263 def list_pg_unfound(self, pgid):
2264 """
2265 return the list of unfound objects for the pg specified
2266 """
2267 r = None
2268 offset = {}
2269 while True:
2270 out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_unfound',
2271 json.dumps(offset))
2272 j = json.loads(out)
2273 if r is None:
2274 r = j
2275 else:
2276 r['objects'].extend(j['objects'])
2277 if 'more' not in j:
2278 break
2279 if j['more'] == 0:
2280 break
2281 offset = j['objects'][-1]['oid']
2282 if 'more' in r:
2283 del r['more']
2284 return r
2285
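# Sketch (hypothetical `manager`; the pgid is illustrative): list_pg_unfound
# pages through `pg <pgid> list_unfound`, feeding the last returned oid back
# as the offset until 'more' is 0, and returns the merged result:
#
#     unfound = manager.list_pg_unfound('2.0')
#     missing = [o['oid'] for o in unfound['objects']]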
2286 def get_pg_stats(self):
2287 """
2288 Dump the cluster and get pg stats
2289 """
2290 out = self.raw_cluster_cmd('pg', 'dump', '--format=json')
2291 j = json.loads('\n'.join(out.split('\n')[1:]))
2292 try:
2293 return j['pg_map']['pg_stats']
2294 except KeyError:
2295 return j['pg_stats']
2296
2297 def get_pgids_to_force(self, backfill):
2298 """
2299 Return the randomized list of PGs that can have their recovery/backfill forced
2300 """
2301 j = self.get_pg_stats()
2302 pgids = []
2303 if backfill:
2304 wanted = ['degraded', 'backfilling', 'backfill_wait']
2305 else:
2306 wanted = ['recovering', 'degraded', 'recovery_wait']
2307 for pg in j:
2308 status = pg['state'].split('+')
2309 for t in wanted:
2310 if random.random() > 0.5 and not ('forced_backfill' in status or 'forced_recovery' in status) and t in status:
2311 pgids.append(pg['pgid'])
2312 break
2313 return pgids
2314
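# Sketch (hypothetical `manager`): the pgids returned here are typically fed
# straight into `ceph pg force-recovery` / `ceph pg force-backfill`, e.g.:
#
#     pgids = manager.get_pgids_to_force(backfill=False)
#     if pgids:
#         manager.raw_cluster_cmd('pg', 'force-recovery', *pgids)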
2315 def get_pgids_to_cancel_force(self, backfill):
2316 """
2317 Return the randomized list of PGs whose recovery/backfill priority is forced
2318 """
2319 j = self.get_pg_stats()
2320 pgids = []
2321 if backfill:
2322 wanted = 'forced_backfill'
2323 else:
2324 wanted = 'forced_recovery'
2325 for pg in j:
2326 status = pg['state'].split('+')
2327 if wanted in status and random.random() > 0.5:
2328 pgids.append(pg['pgid'])
2329 return pgids
2330
2331 def compile_pg_status(self):
2332 """
2333 Return a histogram of pg state values
2334 """
2335 ret = {}
2336 j = self.get_pg_stats()
2337 for pg in j:
2338 for status in pg['state'].split('+'):
2339 if status not in ret:
2340 ret[status] = 0
2341 ret[status] += 1
2342 return ret
2343
2344 @wait_for_pg_stats # type: ignore
2345 def with_pg_state(self, pool, pgnum, check):
2346 pgstr = self.get_pgid(pool, pgnum)
2347 stats = self.get_single_pg_stats(pgstr)
2348 assert(check(stats['state']))
2349
2350 @wait_for_pg_stats # type: ignore
2351 def with_pg(self, pool, pgnum, check):
2352 pgstr = self.get_pgid(pool, pgnum)
2353 stats = self.get_single_pg_stats(pgstr)
2354 return check(stats)
2355
2356 def get_last_scrub_stamp(self, pool, pgnum):
2357 """
2358 Get the timestamp of the last scrub.
2359 """
2360 stats = self.get_single_pg_stats(self.get_pgid(pool, pgnum))
2361 return stats["last_scrub_stamp"]
2362
2363 def do_pg_scrub(self, pool, pgnum, stype):
2364 """
2365 Scrub pg and wait for scrubbing to finish
2366 """
2367 init = self.get_last_scrub_stamp(pool, pgnum)
2368 RESEND_TIMEOUT = 120 # Must be a multiple of SLEEP_TIME
2369 FATAL_TIMEOUT = RESEND_TIMEOUT * 3
2370 SLEEP_TIME = 10
2371 timer = 0
2372 while init == self.get_last_scrub_stamp(pool, pgnum):
2373 assert timer < FATAL_TIMEOUT, "fatal timeout trying to " + stype
2374 self.log("waiting for scrub type %s" % (stype,))
2375 if (timer % RESEND_TIMEOUT) == 0:
2376 self.raw_cluster_cmd('pg', stype, self.get_pgid(pool, pgnum))
2377 # The first time in this loop is the actual request
2378 if timer != 0 and stype == "repair":
2379 self.log("WARNING: Resubmitted a non-idempotent repair")
2380 time.sleep(SLEEP_TIME)
2381 timer += SLEEP_TIME
2382
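# Sketch (hypothetical `manager`): stype is the literal `ceph pg` subcommand,
# so the usual values are 'scrub', 'deep-scrub' or 'repair':
#
#     manager.do_pg_scrub('mypool', 0, 'deep-scrub')
#
# The call blocks until last_scrub_stamp changes, re-sending the request
# every RESEND_TIMEOUT seconds.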
2383 def wait_snap_trimming_complete(self, pool):
2384 """
2385 Wait for snap trimming on pool to end
2386 """
2387 POLL_PERIOD = 10
2388 FATAL_TIMEOUT = 600
2389 start = time.time()
2390 poolnum = self.get_pool_num(pool)
2391 poolnumstr = "%s." % (poolnum,)
2392 while True:
2393 now = time.time()
2394 assert (now - start) < FATAL_TIMEOUT, \
2395 'failed to complete snap trimming before timeout'
2397 all_stats = self.get_pg_stats()
2398 trimming = False
2399 for pg in all_stats:
2400 if (poolnumstr in pg['pgid']) and ('snaptrim' in pg['state']):
2401 self.log("pg {pg} in trimming, state: {state}".format(
2402 pg=pg['pgid'],
2403 state=pg['state']))
2404 trimming = True
2405 if not trimming:
2406 break
2407 self.log("{pool} still trimming, waiting".format(pool=pool))
2408 time.sleep(POLL_PERIOD)
2409
2410 def get_single_pg_stats(self, pgid):
2411 """
2412 Return pg for the pgid specified.
2413 """
2414 all_stats = self.get_pg_stats()
2415
2416 for pg in all_stats:
2417 if pg['pgid'] == pgid:
2418 return pg
2419
2420 return None
2421
2422 def get_object_pg_with_shard(self, pool, name, osdid):
2423 """
2424 """
2425 pool_dump = self.get_pool_dump(pool)
2426 object_map = self.get_object_map(pool, name)
2427 if pool_dump["type"] == PoolType.ERASURE_CODED:
2428 shard = object_map['acting'].index(osdid)
2429 return "{pgid}s{shard}".format(pgid=object_map['pgid'],
2430 shard=shard)
2431 else:
2432 return object_map['pgid']
2433
2434 def get_object_primary(self, pool, name):
2435 """
2436 """
2437 object_map = self.get_object_map(pool, name)
2438 return object_map['acting_primary']
2439
2440 def get_object_map(self, pool, name):
2441 """
2442 osd map --format=json converted to a python object
2443 :returns: the python object
2444 """
2445 out = self.raw_cluster_cmd('--format=json', 'osd', 'map', pool, name)
2446 return json.loads('\n'.join(out.split('\n')[1:]))
2447
2448 def get_osd_dump_json(self):
2449 """
2450 osd dump --format=json converted to a python object
2451 :returns: the python object
2452 """
2453 out = self.raw_cluster_cmd('osd', 'dump', '--format=json')
2454 return json.loads('\n'.join(out.split('\n')[1:]))
2455
2456 def get_osd_dump(self):
2457 """
2458 Dump osds
2459 :returns: all osds
2460 """
2461 return self.get_osd_dump_json()['osds']
2462
2463 def get_osd_metadata(self):
2464 """
2465 osd metadata --format=json converted to a python object
2466 :returns: the python object containing osd metadata information
2467 """
2468 out = self.raw_cluster_cmd('osd', 'metadata', '--format=json')
2469 return json.loads('\n'.join(out.split('\n')[1:]))
2470
2471 def get_mgr_dump(self):
2472 out = self.raw_cluster_cmd('mgr', 'dump', '--format=json')
2473 return json.loads(out)
2474
2475 def get_stuck_pgs(self, type_, threshold):
2476 """
2477 :returns: stuck pg information from the cluster
2478 """
2479 out = self.raw_cluster_cmd('pg', 'dump_stuck', type_, str(threshold),
2480 '--format=json')
2481 return json.loads(out).get('stuck_pg_stats',[])
2482
2483 def get_num_unfound_objects(self):
2484 """
2485 Check cluster status to get the number of unfound objects
2486 """
2487 status = self.raw_cluster_status()
2488 self.log(status)
2489 return status['pgmap'].get('unfound_objects', 0)
2490
2491 def get_num_creating(self):
2492 """
2493 Find the number of pgs in creating mode.
2494 """
2495 pgs = self.get_pg_stats()
2496 num = 0
2497 for pg in pgs:
2498 if 'creating' in pg['state']:
2499 num += 1
2500 return num
2501
2502 def get_num_active_clean(self):
2503 """
2504 Find the number of active and clean pgs.
2505 """
2506 pgs = self.get_pg_stats()
2507 return self._get_num_active_clean(pgs)
2508
2509 def _get_num_active_clean(self, pgs):
2510 num = 0
2511 for pg in pgs:
2512 if (pg['state'].count('active') and
2513 pg['state'].count('clean') and
2514 not pg['state'].count('stale')):
2515 num += 1
2516 return num
2517
2518 def get_num_active_recovered(self):
2519 """
2520 Find the number of active and recovered pgs.
2521 """
2522 pgs = self.get_pg_stats()
2523 return self._get_num_active_recovered(pgs)
2524
2525 def _get_num_active_recovered(self, pgs):
2526 num = 0
2527 for pg in pgs:
2528 if (pg['state'].count('active') and
2529 not pg['state'].count('recover') and
2530 not pg['state'].count('backfilling') and
2531 not pg['state'].count('stale')):
2532 num += 1
2533 return num
2534
2535 def get_is_making_recovery_progress(self):
2536 """
2537 Return whether there is recovery progress discernable in the
2538 raw cluster status
2539 """
2540 status = self.raw_cluster_status()
2541 kps = status['pgmap'].get('recovering_keys_per_sec', 0)
2542 bps = status['pgmap'].get('recovering_bytes_per_sec', 0)
2543 ops = status['pgmap'].get('recovering_objects_per_sec', 0)
2544 return kps > 0 or bps > 0 or ops > 0
2545
2546 def get_num_active(self):
2547 """
2548 Find the number of active pgs.
2549 """
2550 pgs = self.get_pg_stats()
2551 return self._get_num_active(pgs)
2552
2553 def _get_num_active(self, pgs):
2554 num = 0
2555 for pg in pgs:
2556 if pg['state'].count('active') and not pg['state'].count('stale'):
2557 num += 1
2558 return num
2559
2560 def get_num_down(self):
2561 """
2562 Find the number of pgs that are down.
2563 """
2564 pgs = self.get_pg_stats()
2565 num = 0
2566 for pg in pgs:
2567 if ((pg['state'].count('down') and not
2568 pg['state'].count('stale')) or
2569 (pg['state'].count('incomplete') and not
2570 pg['state'].count('stale'))):
2571 num += 1
2572 return num
2573
2574 def get_num_active_down(self):
2575 """
2576 Find the number of pgs that are either active or down.
2577 """
2578 pgs = self.get_pg_stats()
2579 return self._get_num_active_down(pgs)
2580
2581 def _get_num_active_down(self, pgs):
2582 num = 0
2583 for pg in pgs:
2584 if ((pg['state'].count('active') and not
2585 pg['state'].count('stale')) or
2586 (pg['state'].count('down') and not
2587 pg['state'].count('stale')) or
2588 (pg['state'].count('incomplete') and not
2589 pg['state'].count('stale'))):
2590 num += 1
2591 return num
2592
2593 def get_num_peered(self):
2594 """
2595 Find the number of PGs that are peered
2596 """
2597 pgs = self.get_pg_stats()
2598 return self._get_num_peered(pgs)
2599
2600 def _get_num_peered(self, pgs):
2601 num = 0
2602 for pg in pgs:
2603 if pg['state'].count('peered') and not pg['state'].count('stale'):
2604 num += 1
2605 return num
2606
2607 def is_clean(self):
2608 """
2609 True if all pgs are clean
2610 """
2611 pgs = self.get_pg_stats()
2612 return self._get_num_active_clean(pgs) == len(pgs)
2613
2614 def is_recovered(self):
2615 """
2616 True if all pgs have recovered
2617 """
2618 pgs = self.get_pg_stats()
2619 return self._get_num_active_recovered(pgs) == len(pgs)
2620
2621 def is_active_or_down(self):
2622 """
2623 True if all pgs are active or down
2624 """
2625 pgs = self.get_pg_stats()
2626 return self._get_num_active_down(pgs) == len(pgs)
2627
2628 def dump_pgs_not_active_clean(self):
2629 """
2630 Dumps all pgs that are not active+clean
2631 """
2632 pgs = self.get_pg_stats()
2633 for pg in pgs:
2634 if pg['state'] != 'active+clean':
2635 self.log('PG %s is not active+clean' % pg['pgid'])
2636 self.log(pg)
2637
2638 def dump_pgs_not_active_down(self):
2639 """
2640 Dumps all pgs that are not active or down
2641 """
2642 pgs = self.get_pg_stats()
2643 for pg in pgs:
2644 if 'active' not in pg['state'] and 'down' not in pg['state']:
2645 self.log('PG %s is not active or down' % pg['pgid'])
2646 self.log(pg)
2647
2648 def dump_pgs_not_active(self):
2649 """
2650 Dumps all pgs that are not active
2651 """
2652 pgs = self.get_pg_stats()
2653 for pg in pgs:
2654 if 'active' not in pg['state']:
2655 self.log('PG %s is not active' % pg['pgid'])
2656 self.log(pg)
2657
2658 def wait_for_clean(self, timeout=1200):
2659 """
2660 Block until all pgs are clean; assert if the timeout expires.
2661 """
2662 self.log("waiting for clean")
2663 start = time.time()
2664 num_active_clean = self.get_num_active_clean()
2665 while not self.is_clean():
2666 if timeout is not None:
2667 if self.get_is_making_recovery_progress():
2668 self.log("making progress, resetting timeout")
2669 start = time.time()
2670 else:
2671 self.log("no progress seen, keeping timeout for now")
2672 if time.time() - start >= timeout:
2673 self.log('dumping pgs not clean')
2674 self.dump_pgs_not_active_clean()
2675 assert time.time() - start < timeout, \
2676 'wait_for_clean: failed before timeout expired'
2677 cur_active_clean = self.get_num_active_clean()
2678 if cur_active_clean != num_active_clean:
2679 start = time.time()
2680 num_active_clean = cur_active_clean
2681 time.sleep(3)
2682 self.log("clean!")
2683
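# Thrash-cycle sketch (hypothetical `manager`, osd id made up): kill an OSD,
# bring it back, then block until the cluster is fully clean again:
#
#     manager.kill_osd(2)
#     manager.revive_osd(2)
#     manager.wait_till_osd_is_up(2)
#     manager.wait_for_clean(timeout=900)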
2684 def are_all_osds_up(self):
2685 """
2686 Returns true if all osds are up.
2687 """
2688 x = self.get_osd_dump()
2689 return (len(x) == sum([(y['up'] > 0) for y in x]))
2690
2691 def wait_for_all_osds_up(self, timeout=None):
2692 """
2693 When this exits, either the timeout has expired, or all
2694 osds are up.
2695 """
2696 self.log("waiting for all up")
2697 start = time.time()
2698 while not self.are_all_osds_up():
2699 if timeout is not None:
2700 assert time.time() - start < timeout, \
2701 'timeout expired in wait_for_all_osds_up'
2702 time.sleep(3)
2703 self.log("all up!")
2704
2705 def pool_exists(self, pool):
2706 return pool in self.list_pools()
2709
2710 def wait_for_pool(self, pool, timeout=300):
2711 """
2712 Wait for a pool to exist
2713 """
2714 self.log('waiting for pool %s to exist' % pool)
2715 start = time.time()
2716 while not self.pool_exists(pool):
2717 if timeout is not None:
2718 assert time.time() - start < timeout, \
2719 'timeout expired in wait_for_pool'
2720 time.sleep(3)
2721
2722 def wait_for_pools(self, pools):
2723 for pool in pools:
2724 self.wait_for_pool(pool)
2725
2726 def is_mgr_available(self):
2727 x = self.get_mgr_dump()
2728 return x.get('available', False)
2729
2730 def wait_for_mgr_available(self, timeout=None):
2731 self.log("waiting for mgr available")
2732 start = time.time()
2733 while not self.is_mgr_available():
2734 if timeout is not None:
2735 assert time.time() - start < timeout, \
2736 'timeout expired in wait_for_mgr_available'
2737 time.sleep(3)
2738 self.log("mgr available!")
2739
2740 def wait_for_recovery(self, timeout=None):
2741 """
2742 Check peering. When this exits, we have recovered.
2743 """
2744 self.log("waiting for recovery to complete")
2745 start = time.time()
2746 num_active_recovered = self.get_num_active_recovered()
2747 while not self.is_recovered():
2748 now = time.time()
2749 if timeout is not None:
2750 if self.get_is_making_recovery_progress():
2751 self.log("making progress, resetting timeout")
2752 start = time.time()
2753 else:
2754 self.log("no progress seen, keeping timeout for now")
2755 if now - start >= timeout:
2756 if self.is_recovered():
2757 break
2758 self.log('dumping pgs not recovered yet')
2759 self.dump_pgs_not_active_clean()
2760 assert now - start < timeout, \
2761 'wait_for_recovery: failed before timeout expired'
2762 cur_active_recovered = self.get_num_active_recovered()
2763 if cur_active_recovered != num_active_recovered:
2764 start = time.time()
2765 num_active_recovered = cur_active_recovered
2766 time.sleep(3)
2767 self.log("recovered!")
2768
2769 def wait_for_active(self, timeout=None):
2770 """
2771 Check peering. When this exits, we are definitely active
2772 """
2773 self.log("waiting for peering to complete")
2774 start = time.time()
2775 num_active = self.get_num_active()
2776 while not self.is_active():
2777 if timeout is not None:
2778 if time.time() - start >= timeout:
2779 self.log('dumping pgs not active')
2780 self.dump_pgs_not_active()
2781 assert time.time() - start < timeout, \
2782 'wait_for_active: failed before timeout expired'
2783 cur_active = self.get_num_active()
2784 if cur_active != num_active:
2785 start = time.time()
2786 num_active = cur_active
2787 time.sleep(3)
2788 self.log("active!")
2789
2790 def wait_for_active_or_down(self, timeout=None):
2791 """
2792 Check peering. When this exits, we are definitely either
2793 active or down
2794 """
2795 self.log("waiting for peering to complete or become blocked")
2796 start = time.time()
2797 num_active_down = self.get_num_active_down()
2798 while not self.is_active_or_down():
2799 if timeout is not None:
2800 if time.time() - start >= timeout:
2801 self.log('dumping pgs not active or down')
2802 self.dump_pgs_not_active_down()
2803 assert time.time() - start < timeout, \
2804 'wait_for_active_or_down: failed before timeout expired'
2805 cur_active_down = self.get_num_active_down()
2806 if cur_active_down != num_active_down:
2807 start = time.time()
2808 num_active_down = cur_active_down
2809 time.sleep(3)
2810 self.log("active or down!")
2811
2812 def osd_is_up(self, osd):
2813 """
2814 Wrapper for osd check
2815 """
2816 osds = self.get_osd_dump()
2817 return osds[osd]['up'] > 0
2818
2819 def wait_till_osd_is_up(self, osd, timeout=None):
2820 """
2821 Loop waiting for osd.
2822 """
2823 self.log('waiting for osd.%d to be up' % osd)
2824 start = time.time()
2825 while not self.osd_is_up(osd):
2826 if timeout is not None:
2827 assert time.time() - start < timeout, \
2828 'osd.%d failed to come up before timeout expired' % osd
2829 time.sleep(3)
2830 self.log('osd.%d is up' % osd)
2831
2832 def is_active(self):
2833 """
2834 Wrapper to check if all pgs are active
2835 """
2836 return self.get_num_active() == self.get_num_pgs()
2837
2838 def all_active_or_peered(self):
2839 """
2840 Wrapper to check if all PGs are active or peered
2841 """
2842 pgs = self.get_pg_stats()
2843 return self._get_num_active(pgs) + self._get_num_peered(pgs) == len(pgs)
2844
2845 def wait_till_active(self, timeout=None):
2846 """
2847 Wait until all pgs are active.
2848 """
2849 self.log("waiting till active")
2850 start = time.time()
2851 while not self.is_active():
2852 if timeout is not None:
2853 if time.time() - start >= timeout:
2854 self.log('dumping pgs not active')
2855 self.dump_pgs_not_active()
2856 assert time.time() - start < timeout, \
2857 'wait_till_active: failed before timeout expired'
2858 time.sleep(3)
2859 self.log("active!")
2860
2861 def wait_till_pg_convergence(self, timeout=None):
2862 start = time.time()
2863 old_stats = None
2864 active_osds = [osd['osd'] for osd in self.get_osd_dump()
2865 if osd['in'] and osd['up']]
2866 while True:
2867 # strictly speaking, there is no need to wait for the mon. but due to
2868 # the "ms inject socket failures" setting, the osdmap could be delayed,
2869 # so the mgr is likely to ignore pg-stat messages for pgs serving newly
2870 # created pools that it does not yet know about. to make sure the mgr
2871 # is updated with the latest pg-stats, waiting for mon/mgr is
2872 # necessary.
2873 self.flush_pg_stats(active_osds)
2874 new_stats = dict((stat['pgid'], stat['state'])
2875 for stat in self.get_pg_stats())
2876 if old_stats == new_stats:
2877 return old_stats
2878 if timeout is not None:
2879 assert time.time() - start < timeout, \
2880 'failed to reach convergence before %d secs' % timeout
2881 old_stats = new_stats
2882 # longer than mgr_stats_period
2883 time.sleep(5 + 1)
2884
2885 def mark_out_osd(self, osd):
2886 """
2887 Wrapper to mark osd out.
2888 """
2889 self.raw_cluster_cmd('osd', 'out', str(osd))
2890
2891 def kill_osd(self, osd):
2892 """
2893 Kill osds by either power cycling (if indicated by the config)
2894 or by stopping.
2895 """
2896 if self.config.get('powercycle'):
2897 remote = self.find_remote('osd', osd)
2898 self.log('kill_osd on osd.{o} '
2899 'doing powercycle of {s}'.format(o=osd, s=remote.name))
2900 self._assert_ipmi(remote)
2901 remote.console.power_off()
2902 elif self.config.get('bdev_inject_crash') and self.config.get('bdev_inject_crash_probability'):
2903 if random.uniform(0, 1) < self.config.get('bdev_inject_crash_probability', .5):
2904 self.inject_args(
2905 'osd', osd,
2906 'bdev-inject-crash', self.config.get('bdev_inject_crash'))
2907 try:
2908 self.ctx.daemons.get_daemon('osd', osd, self.cluster).wait()
2909 except:
2910 pass
2911 else:
2912 raise RuntimeError('osd.%s did not fail' % osd)
2913 else:
2914 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2915 else:
2916 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2917
2918 @staticmethod
2919 def _assert_ipmi(remote):
2920 assert remote.console.has_ipmi_credentials, (
2921 "powercycling requested but RemoteConsole is not "
2922 "initialized. Check ipmi config.")
2923
2924 def blackhole_kill_osd(self, osd):
2925 """
2926 Stop osd if nothing else works.
2927 """
2928 self.inject_args('osd', osd,
2929 'objectstore-blackhole', True)
2930 time.sleep(2)
2931 self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2932
2933 def revive_osd(self, osd, timeout=360, skip_admin_check=False):
2934 """
2935 Revive osds by either power cycling (if indicated by the config)
2936 or by restarting.
2937 """
2938 if self.config.get('powercycle'):
2939 remote = self.find_remote('osd', osd)
2940 self.log('revive_osd on osd.{o} doing powercycle of {s}'.
2941 format(o=osd, s=remote.name))
2942 self._assert_ipmi(remote)
2943 remote.console.power_on()
2944 if not remote.console.check_status(300):
2945 raise Exception('Failed to revive osd.{o} via ipmi'.
2946 format(o=osd))
2947 teuthology.reconnect(self.ctx, 60, [remote])
2948 mount_osd_data(self.ctx, remote, self.cluster, str(osd))
2949 self.make_admin_daemon_dir(remote)
2950 self.ctx.daemons.get_daemon('osd', osd, self.cluster).reset()
2951 self.ctx.daemons.get_daemon('osd', osd, self.cluster).restart()
2952
2953 if not skip_admin_check:
2954 # wait for dump_ops_in_flight; this command doesn't appear
2955 # until after the signal handler is installed and it is safe
2956 # to stop the osd again without making valgrind leak checks
2957 # unhappy. see #5924.
2958 self.wait_run_admin_socket('osd', osd,
2959 args=['dump_ops_in_flight'],
2960 timeout=timeout, stdout=DEVNULL)
2961
2962 def mark_down_osd(self, osd):
2963 """
2964 Cluster command wrapper
2965 """
2966 self.raw_cluster_cmd('osd', 'down', str(osd))
2967
2968 def mark_in_osd(self, osd):
2969 """
2970 Cluster command wrapper
2971 """
2972 self.raw_cluster_cmd('osd', 'in', str(osd))
2973
2974 def signal_osd(self, osd, sig, silent=False):
2975 """
2976 Wrapper to local get_daemon call which sends the given
2977 signal to the given osd.
2978 """
2979 self.ctx.daemons.get_daemon('osd', osd,
2980 self.cluster).signal(sig, silent=silent)
2981
2982 ## monitors
2983 def signal_mon(self, mon, sig, silent=False):
2984 """
2985 Wrapper to local get_daemon call
2986 """
2987 self.ctx.daemons.get_daemon('mon', mon,
2988 self.cluster).signal(sig, silent=silent)
2989
2990 def kill_mon(self, mon):
2991 """
2992 Kill the monitor by either power cycling (if the config says so),
2993 or by doing a stop.
2994 """
2995 if self.config.get('powercycle'):
2996 remote = self.find_remote('mon', mon)
2997 self.log('kill_mon on mon.{m} doing powercycle of {s}'.
2998 format(m=mon, s=remote.name))
2999 self._assert_ipmi(remote)
3000 remote.console.power_off()
3001 else:
3002 self.ctx.daemons.get_daemon('mon', mon, self.cluster).stop()
3003
3004 def revive_mon(self, mon):
3005 """
3006 Restart by either power cycling (if the config says so),
3007 or by doing a normal restart.
3008 """
3009 if self.config.get('powercycle'):
3010 remote = self.find_remote('mon', mon)
3011 self.log('revive_mon on mon.{m} doing powercycle of {s}'.
3012 format(m=mon, s=remote.name))
3013 self._assert_ipmi(remote)
3014 remote.console.power_on()
3015 self.make_admin_daemon_dir(remote)
3016 self.ctx.daemons.get_daemon('mon', mon, self.cluster).restart()
3017
3018 def revive_mgr(self, mgr):
3019 """
3020 Restart by either power cycling (if the config says so),
3021 or by doing a normal restart.
3022 """
3023 if self.config.get('powercycle'):
3024 remote = self.find_remote('mgr', mgr)
3025 self.log('revive_mgr on mgr.{m} doing powercycle of {s}'.
3026 format(m=mgr, s=remote.name))
3027 self._assert_ipmi(remote)
3028 remote.console.power_on()
3029 self.make_admin_daemon_dir(remote)
3030 self.ctx.daemons.get_daemon('mgr', mgr, self.cluster).restart()
3031
3032 def get_mon_status(self, mon):
3033 """
3034 Extract all the monitor status information from the cluster
3035 """
3036 out = self.raw_cluster_cmd('tell', 'mon.%s' % mon, 'mon_status')
3037 return json.loads(out)
3038
3039 def get_mon_quorum(self):
3040 """
3041 Extract monitor quorum information from the cluster
3042 """
3043 out = self.raw_cluster_cmd('quorum_status')
3044 j = json.loads(out)
3045 return j['quorum']
3046
3047 def wait_for_mon_quorum_size(self, size, timeout=300):
3048 """
3049 Loop until quorum size is reached.
3050 """
3051 self.log('waiting for quorum size %d' % size)
3052 start = time.time()
3053 while len(self.get_mon_quorum()) != size:
3054 if timeout is not None:
3055 assert time.time() - start < timeout, \
3056 ('failed to reach quorum size %d '
3057 'before timeout expired' % size)
3058 time.sleep(3)
3059 self.log("quorum is size %d" % size)
3060
3061 def get_mon_health(self, debug=False):
3062 """
3063 Extract all the monitor health information.
3064 """
3065 out = self.raw_cluster_cmd('health', '--format=json')
3066 if debug:
3067 self.log('health:\n{h}'.format(h=out))
3068 return json.loads(out)
3069
3070 def wait_until_healthy(self, timeout=None):
3071 self.log("wait_until_healthy")
3072 start = time.time()
3073 while self.get_mon_health()['status'] != 'HEALTH_OK':
3074 if timeout is not None:
3075 assert time.time() - start < timeout, \
3076 'timeout expired in wait_until_healthy'
3077 time.sleep(3)
3078 self.log("wait_until_healthy done")
3079
3080 def get_filepath(self):
3081 """
3082 Return path to osd data with {id} needing to be replaced
3083 """
3084 return '/var/lib/ceph/osd/' + self.cluster + '-{id}'
3085
3086 def make_admin_daemon_dir(self, remote):
3087 """
3088 Create /var/run/ceph directory on remote site.
3089
3090 :param ctx: Context
3091 :param remote: Remote site
3092 """
3093 remote.run(args=['sudo',
3094 'install', '-d', '-m0777', '--', '/var/run/ceph', ], )
3095
3096 def get_service_task_status(self, service, status_key):
3097 """
3098 Return daemon task status for a given ceph service.
3099
3100 :param service: ceph service (mds, osd, etc...)
3101 :param status_key: matching task status key
3102 """
3103 task_status = {}
3104 status = self.raw_cluster_status()
3105 try:
3106 for k,v in status['servicemap']['services'][service]['daemons'].items():
3107 ts = dict(v).get('task_status', None)
3108 if ts:
3109 task_status[k] = ts[status_key]
3110 except KeyError: # catches missing service and status key
3111 return {}
3112 self.log(task_status)
3113 return task_status
3114
3115 def utility_task(name):
3116 """
3117 Generate ceph_manager subtask corresponding to ceph_manager
3118 method name
3119 """
3120 def task(ctx, config):
3121 if config is None:
3122 config = {}
3123 args = config.get('args', [])
3124 kwargs = config.get('kwargs', {})
3125 cluster = config.get('cluster', 'ceph')
3126 fn = getattr(ctx.managers[cluster], name)
3127 fn(*args, **kwargs)
3128 return task
3129
3130 revive_osd = utility_task("revive_osd")
3131 revive_mon = utility_task("revive_mon")
3132 kill_osd = utility_task("kill_osd")
3133 kill_mon = utility_task("kill_mon")
3134 create_pool = utility_task("create_pool")
3135 remove_pool = utility_task("remove_pool")
3136 wait_for_clean = utility_task("wait_for_clean")
3137 flush_all_pg_stats = utility_task("flush_all_pg_stats")
3138 set_pool_property = utility_task("set_pool_property")
3139 do_pg_scrub = utility_task("do_pg_scrub")
3140 wait_for_pool = utility_task("wait_for_pool")
3141 wait_for_pools = utility_task("wait_for_pools")
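# Sketch of how one of the generated tasks behaves (the ctx/config values are
# hypothetical; in practice teuthology supplies them from the job yaml):
#
#     config = {'cluster': 'ceph', 'kwargs': {'timeout': 900}}
#     wait_for_clean(ctx, config)
#     # ...dispatches to:
#     ctx.managers['ceph'].wait_for_clean(timeout=900)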