"""
ceph manager -- Thrasher and CephManager objects
"""
from functools import wraps
import contextlib
import random
import signal
import time
import gevent
import base64
import json
import logging
import threading
import traceback
import os
import shlex

from io import BytesIO, StringIO
from subprocess import DEVNULL
from teuthology import misc as teuthology
from tasks.scrub import Scrubber
from tasks.util.rados import cmd_erasure_code_profile
from tasks.util import get_remote
from teuthology.contextutil import safe_while
from teuthology.orchestra.remote import Remote
from teuthology.orchestra import run
from teuthology.exceptions import CommandFailedError
from tasks.thrasher import Thrasher


DEFAULT_CONF_PATH = '/etc/ceph/ceph.conf'

log = logging.getLogger(__name__)

# this is for cephadm clusters
def shell(ctx, cluster_name, remote, args, name=None, **kwargs):
    extra_args = []
    if name:
        extra_args = ['-n', name]
    return remote.run(
        args=[
            'sudo',
            ctx.cephadm,
            '--image', ctx.ceph[cluster_name].image,
            'shell',
        ] + extra_args + [
            '--fsid', ctx.ceph[cluster_name].fsid,
            '--',
        ] + args,
        **kwargs
    )
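
# Illustrative expansion, assuming a cephadm deployment whose image and fsid
# were recorded in ctx (the daemon name 'osd.0' is hypothetical):
#   shell(ctx, 'ceph', remote, args=['ceph', 'status'], name='osd.0')
# runs on the target host roughly:
#   sudo cephadm --image <image> shell -n osd.0 --fsid <fsid> -- ceph status
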
# this is for rook clusters
def toolbox(ctx, cluster_name, args, **kwargs):
    return ctx.rook[cluster_name].remote.run(
        args=[
            'kubectl',
            '-n', 'rook-ceph',
            'exec',
            ctx.rook[cluster_name].toolbox,
            '--',
        ] + args,
        **kwargs
    )

def write_conf(ctx, conf_path=DEFAULT_CONF_PATH, cluster='ceph'):
    conf_fp = BytesIO()
    ctx.ceph[cluster].conf.write(conf_fp)
    conf_fp.seek(0)
    writes = ctx.cluster.run(
        args=[
            'sudo', 'mkdir', '-p', '/etc/ceph', run.Raw('&&'),
            'sudo', 'chmod', '0755', '/etc/ceph', run.Raw('&&'),
            'sudo', 'tee', conf_path, run.Raw('&&'),
            'sudo', 'chmod', '0644', conf_path,
            run.Raw('>'), '/dev/null',
        ],
        stdin=run.PIPE,
        wait=False)
    teuthology.feed_many_stdins_and_close(conf_fp, writes)
    run.wait(writes)

def get_valgrind_args(testdir, name, preamble, v, exit_on_first_error=True, cd=True):
    """
    Build a command line for running valgrind.

    testdir - test results directory
    name - name of daemon (for naming the log file)
    preamble - stuff we should run before valgrind
    v - valgrind arguments
    """
    if v is None:
        return preamble
    if not isinstance(v, list):
        v = [v]

    # https://tracker.ceph.com/issues/44362
    preamble.extend([
        'env', 'OPENSSL_ia32cap=~0x1000000000000000',
    ])

    val_path = '/var/log/ceph/valgrind'
    if '--tool=memcheck' in v or '--tool=helgrind' in v:
        extra_args = [
            'valgrind',
            '--trace-children=no',
            '--child-silent-after-fork=yes',
            '--soname-synonyms=somalloc=*tcmalloc*',
            '--suppressions={tdir}/valgrind.supp'.format(tdir=testdir),
            '--xml=yes',
            '--xml-file={vdir}/{n}.log'.format(vdir=val_path, n=name),
            '--time-stamp=yes',
            '--vgdb=yes',
        ]
    else:
        extra_args = [
            'valgrind',
            '--trace-children=no',
            '--child-silent-after-fork=yes',
            '--soname-synonyms=somalloc=*tcmalloc*',
            '--suppressions={tdir}/valgrind.supp'.format(tdir=testdir),
            '--log-file={vdir}/{n}.log'.format(vdir=val_path, n=name),
            '--time-stamp=yes',
            '--vgdb=yes',
        ]
    if exit_on_first_error:
        extra_args.extend([
            # at least Valgrind 3.14 is required
            '--exit-on-first-error=yes',
            '--error-exitcode=42',
        ])
    args = []
    if cd:
        args += ['cd', testdir, run.Raw('&&')]
    args += preamble + extra_args + v
    log.debug('running %s under valgrind with args %s', name, args)
    return args
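
# Illustrative result (paths hypothetical): with cd=True the returned list has
# the shape
#   ['cd', testdir, '&&'] + preamble
#   + ['env', 'OPENSSL_ia32cap=~0x1000000000000000', 'valgrind', <flags>] + v
# so the command assembled from it runs under valgrind, with the report
# written to /var/log/ceph/valgrind/<name>.log on the remote node.
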
def mount_osd_data(ctx, remote, cluster, osd):
    """
    Mount an OSD data directory from its backing device.

    :param ctx: Context
    :param remote: Remote site
    :param cluster: name of ceph cluster
    :param osd: OSD id
    """
    log.debug('Mounting data for osd.{o} on {r}'.format(o=osd, r=remote))
    role = "{0}.osd.{1}".format(cluster, osd)
    alt_role = role if cluster != 'ceph' else "osd.{0}".format(osd)
    if remote in ctx.disk_config.remote_to_roles_to_dev:
        if alt_role in ctx.disk_config.remote_to_roles_to_dev[remote]:
            role = alt_role
        if role not in ctx.disk_config.remote_to_roles_to_dev[remote]:
            return
        dev = ctx.disk_config.remote_to_roles_to_dev[remote][role]
        mount_options = ctx.disk_config.\
            remote_to_roles_to_dev_mount_options[remote][role]
        fstype = ctx.disk_config.remote_to_roles_to_dev_fstype[remote][role]
        mnt = os.path.join('/var/lib/ceph/osd', '{0}-{1}'.format(cluster, osd))

        log.info('Mounting osd.{o}: dev: {n}, cluster: {c}, '
                 'mountpoint: {p}, type: {t}, options: {v}'.format(
                     o=osd, n=remote.name, p=mnt, t=fstype, v=mount_options,
                     c=cluster))

        remote.run(
            args=[
                'sudo',
                'mount',
                '-t', fstype,
                '-o', ','.join(mount_options),
                dev,
                mnt,
            ]
        )

def log_exc(func):
    @wraps(func)
    def wrapper(self):
        try:
            return func(self)
        except:
            self.log(traceback.format_exc())
            raise
    return wrapper


class PoolType:
    REPLICATED = 1
    ERASURE_CODED = 3

class OSDThrasher(Thrasher):
    """
    Object used to thrash Ceph
    """
    def __init__(self, manager, config, name, logger):
        super(OSDThrasher, self).__init__()

        self.ceph_manager = manager
        self.cluster = manager.cluster
        self.ceph_manager.wait_for_clean()
        osd_status = self.ceph_manager.get_osd_status()
        self.in_osds = osd_status['in']
        self.live_osds = osd_status['live']
        self.out_osds = osd_status['out']
        self.dead_osds = osd_status['dead']
        self.stopping = False
        self.logger = logger
        self.config = config
        self.name = name
        self.revive_timeout = self.config.get("revive_timeout", 360)
        self.pools_to_fix_pgp_num = set()
        if self.config.get('powercycle'):
            self.revive_timeout += 120
        self.clean_wait = self.config.get('clean_wait', 0)
        self.minin = self.config.get("min_in", 4)
        self.chance_move_pg = self.config.get('chance_move_pg', 1.0)
        self.sighup_delay = self.config.get('sighup_delay')
        self.optrack_toggle_delay = self.config.get('optrack_toggle_delay')
        self.dump_ops_enable = self.config.get('dump_ops_enable')
        self.noscrub_toggle_delay = self.config.get('noscrub_toggle_delay')
        self.chance_thrash_cluster_full = self.config.get('chance_thrash_cluster_full', .05)
        self.chance_thrash_pg_upmap = self.config.get('chance_thrash_pg_upmap', 1.0)
        self.chance_thrash_pg_upmap_items = self.config.get('chance_thrash_pg_upmap_items', 1.0)
        self.random_eio = self.config.get('random_eio')
        self.chance_force_recovery = self.config.get('chance_force_recovery', 0.3)

        num_osds = self.in_osds + self.out_osds
        self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * len(num_osds)
        self.min_pgs = self.config.get("min_pgs_per_pool_osd", 1) * len(num_osds)
        if self.config is None:
            self.config = dict()
        # prevent monitor from auto-marking things out while thrasher runs
        # try both old and new tell syntax, in case we are testing old code
        self.saved_options = []
        # assuming that the default settings do not vary from one daemon to
        # another
        first_mon = teuthology.get_first_mon(manager.ctx, self.config).split('.')
        opts = [('mon', 'mon_osd_down_out_interval', 0)]
        #why do we disable marking an OSD out automatically? :/
        for service, opt, new_value in opts:
            old_value = manager.get_config(first_mon[0],
                                           first_mon[1],
                                           opt)
            self.saved_options.append((service, opt, old_value))
            manager.inject_args(service, '*', opt, new_value)
        # initialize ceph_objectstore_tool property - must be done before
        # do_thrash is spawned - http://tracker.ceph.com/issues/18799
        if (self.config.get('powercycle') or
                not self.cmd_exists_on_osds("ceph-objectstore-tool") or
                self.config.get('disable_objectstore_tool_tests', False)):
            self.ceph_objectstore_tool = False
            if self.config.get('powercycle'):
                self.log("Unable to test ceph-objectstore-tool, "
                         "powercycle testing")
            else:
                self.log("Unable to test ceph-objectstore-tool, "
                         "not available on all OSD nodes")
        else:
            self.ceph_objectstore_tool = \
                self.config.get('ceph_objectstore_tool', True)
        # spawn the thrasher greenlets
        self.thread = gevent.spawn(self.do_thrash)
        if self.sighup_delay:
            self.sighup_thread = gevent.spawn(self.do_sighup)
        if self.optrack_toggle_delay:
            self.optrack_toggle_thread = gevent.spawn(self.do_optrack_toggle)
        if self.dump_ops_enable == "true":
            self.dump_ops_thread = gevent.spawn(self.do_dump_ops)
        if self.noscrub_toggle_delay:
            self.noscrub_toggle_thread = gevent.spawn(self.do_noscrub_toggle)

    def log(self, msg, *args, **kwargs):
        self.logger.info(msg, *args, **kwargs)

    def cmd_exists_on_osds(self, cmd):
        if self.ceph_manager.cephadm or self.ceph_manager.rook:
            return True
        allremotes = self.ceph_manager.ctx.cluster.only(\
            teuthology.is_type('osd', self.cluster)).remotes.keys()
        allremotes = list(set(allremotes))
        for remote in allremotes:
            proc = remote.run(args=['type', cmd], wait=True,
                              check_status=False, stdout=BytesIO(),
                              stderr=BytesIO())
            if proc.exitstatus != 0:
                return False
        return True

    def run_ceph_objectstore_tool(self, remote, osd, cmd):
        if self.ceph_manager.cephadm:
            return shell(
                self.ceph_manager.ctx, self.ceph_manager.cluster, remote,
                args=['ceph-objectstore-tool'] + cmd,
                name=osd,
                wait=True, check_status=False,
                stdout=StringIO(),
                stderr=StringIO())
        elif self.ceph_manager.rook:
            assert False, 'not implemented'
        else:
            return remote.run(
                args=['sudo', 'adjust-ulimits', 'ceph-objectstore-tool'] + cmd,
                wait=True, check_status=False,
                stdout=StringIO(),
                stderr=StringIO())

    def run_ceph_bluestore_tool(self, remote, osd, cmd):
        if self.ceph_manager.cephadm:
            return shell(
                self.ceph_manager.ctx, self.ceph_manager.cluster, remote,
                args=['ceph-bluestore-tool', '--err-to-stderr'] + cmd,
                name=osd,
                wait=True, check_status=False,
                stdout=StringIO(),
                stderr=StringIO())
        elif self.ceph_manager.rook:
            assert False, 'not implemented'
        else:
            return remote.run(
                args=['sudo', 'ceph-bluestore-tool', '--err-to-stderr'] + cmd,
                wait=True, check_status=False,
                stdout=StringIO(),
                stderr=StringIO())

    def kill_osd(self, osd=None, mark_down=False, mark_out=False):
        """
        :param osd: Osd to be killed.
        :mark_down: Mark down if true.
        :mark_out: Mark out if true.
        """
        if osd is None:
            osd = random.choice(self.live_osds)
        self.log("Killing osd %s, live_osds are %s" % (str(osd),
                                                       str(self.live_osds)))
        self.live_osds.remove(osd)
        self.dead_osds.append(osd)
        self.ceph_manager.kill_osd(osd)
        if mark_down:
            self.ceph_manager.mark_down_osd(osd)
        if mark_out and osd in self.in_osds:
            self.out_osd(osd)
        if self.ceph_objectstore_tool:
            self.log("Testing ceph-objectstore-tool on down osd.%s" % osd)
            remote = self.ceph_manager.find_remote('osd', osd)
            FSPATH = self.ceph_manager.get_filepath()
            JPATH = os.path.join(FSPATH, "journal")
            exp_osd = imp_osd = osd
            self.log('remote for osd %s is %s' % (osd, remote))
            exp_remote = imp_remote = remote
            # If an older osd is available we'll move a pg from there
            if (len(self.dead_osds) > 1 and
                    random.random() < self.chance_move_pg):
                exp_osd = random.choice(self.dead_osds[:-1])
                exp_remote = self.ceph_manager.find_remote('osd', exp_osd)
                self.log('remote for exp osd %s is %s' % (exp_osd, exp_remote))
            prefix = [
                '--no-mon-config',
                '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
            ]

            if self.ceph_manager.rook:
                assert False, 'not implemented'

            if not self.ceph_manager.cephadm:
                # ceph-objectstore-tool might be temporarily absent during an
                # upgrade - see http://tracker.ceph.com/issues/18014
                with safe_while(sleep=15, tries=40, action="type ceph-objectstore-tool") as proceed:
                    while proceed():
                        proc = exp_remote.run(args=['type', 'ceph-objectstore-tool'],
                                              wait=True, check_status=False,
                                              stdout=BytesIO(), stderr=BytesIO())
                        if proc.exitstatus == 0:
                            break
                        log.debug("ceph-objectstore-tool binary not present, trying again")

            # ceph-objectstore-tool might bogusly fail with "OSD has the store locked"
            # see http://tracker.ceph.com/issues/19556
            with safe_while(sleep=15, tries=40, action="ceph-objectstore-tool --op list-pgs") as proceed:
                while proceed():
                    proc = self.run_ceph_objectstore_tool(
                        exp_remote, 'osd.%s' % exp_osd,
                        prefix + [
                            '--data-path', FSPATH.format(id=exp_osd),
                            '--journal-path', JPATH.format(id=exp_osd),
                            '--op', 'list-pgs',
                        ])
                    if proc.exitstatus == 0:
                        break
                    elif (proc.exitstatus == 1 and
                          proc.stderr.getvalue() == "OSD has the store locked"):
                        continue
                    else:
                        raise Exception("ceph-objectstore-tool: "
                                        "exp list-pgs failure with status {ret}".
                                        format(ret=proc.exitstatus))

            pgs = proc.stdout.getvalue().split('\n')[:-1]
            if len(pgs) == 0:
                self.log("No PGs found for osd.{osd}".format(osd=exp_osd))
                return
            pg = random.choice(pgs)
            #exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
            #exp_path = os.path.join(exp_path, '{0}.data'.format(self.cluster))
            exp_path = os.path.join('/var/log/ceph', # available inside 'shell' container
                                    "exp.{pg}.{id}".format(
                                        pg=pg,
                                        id=exp_osd))
            if self.ceph_manager.cephadm:
                exp_host_path = os.path.join(
                    '/var/log/ceph',
                    self.ceph_manager.ctx.ceph[self.ceph_manager.cluster].fsid,
                    "exp.{pg}.{id}".format(
                        pg=pg,
                        id=exp_osd))
            else:
                exp_host_path = exp_path

            # export
            # Can't use new export-remove op since this is part of upgrade testing
            proc = self.run_ceph_objectstore_tool(
                exp_remote, 'osd.%s' % exp_osd,
                prefix + [
                    '--data-path', FSPATH.format(id=exp_osd),
                    '--journal-path', JPATH.format(id=exp_osd),
                    '--op', 'export',
                    '--pgid', pg,
                    '--file', exp_path,
                ])
            if proc.exitstatus:
                raise Exception("ceph-objectstore-tool: "
                                "export failure with status {ret}".
                                format(ret=proc.exitstatus))
            # remove
            proc = self.run_ceph_objectstore_tool(
                exp_remote, 'osd.%s' % exp_osd,
                prefix + [
                    '--data-path', FSPATH.format(id=exp_osd),
                    '--journal-path', JPATH.format(id=exp_osd),
                    '--force',
                    '--op', 'remove',
                    '--pgid', pg,
                ])
            if proc.exitstatus:
                raise Exception("ceph-objectstore-tool: "
                                "remove failure with status {ret}".
                                format(ret=proc.exitstatus))
            # If there are at least 2 dead osds we might move the pg
            if exp_osd != imp_osd:
                # If pg isn't already on this osd, then we will move it there
                proc = self.run_ceph_objectstore_tool(
                    imp_remote, 'osd.%s' % imp_osd,
                    prefix + [
                        '--data-path', FSPATH.format(id=imp_osd),
                        '--journal-path', JPATH.format(id=imp_osd),
                        '--op', 'list-pgs',
                    ])
                if proc.exitstatus:
                    raise Exception("ceph-objectstore-tool: "
                                    "imp list-pgs failure with status {ret}".
                                    format(ret=proc.exitstatus))
                pgs = proc.stdout.getvalue().split('\n')[:-1]
                if pg not in pgs:
                    self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
                             format(pg=pg, fosd=exp_osd, tosd=imp_osd))
                    if imp_remote != exp_remote:
                        # Copy export file to the other machine
                        self.log("Transfer export file from {srem} to {trem}".
                                 format(srem=exp_remote, trem=imp_remote))
                        # just in case an upgrade makes /var/log/ceph unreadable
                        # by non-root users
                        exp_remote.run(args=['sudo', 'chmod', '777',
                                             '/var/log/ceph'])
                        imp_remote.run(args=['sudo', 'chmod', '777',
                                             '/var/log/ceph'])
                        tmpexport = Remote.get_file(exp_remote, exp_host_path,
                                                    sudo=True)
                        if exp_host_path != exp_path:
                            # push to /var/log/ceph, then rename (we can't
                            # chmod 777 the /var/log/ceph/$fsid mountpoint)
                            Remote.put_file(imp_remote, tmpexport, exp_path)
                            imp_remote.run(args=[
                                'sudo', 'mv', exp_path, exp_host_path])
                        else:
                            Remote.put_file(imp_remote, tmpexport, exp_host_path)
                        os.remove(tmpexport)
                else:
                    # Can't move the pg after all
                    imp_osd = exp_osd
                    imp_remote = exp_remote
            # import
            proc = self.run_ceph_objectstore_tool(
                imp_remote, 'osd.%s' % imp_osd,
                [
                    '--data-path', FSPATH.format(id=imp_osd),
                    '--journal-path', JPATH.format(id=imp_osd),
                    '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
                    '--op', 'import',
                    '--file', exp_path,
                ])
            if proc.exitstatus == 1:
                bogosity = "The OSD you are using is older than the exported PG"
                if bogosity in proc.stderr.getvalue():
                    self.log("OSD older than exported PG"
                             "...ignored")
            elif proc.exitstatus == 10:
                self.log("Pool went away before processing an import"
                         "...ignored")
            elif proc.exitstatus == 11:
                self.log("Attempt to import an incompatible export"
                         "...ignored")
            elif proc.exitstatus == 12:
                # this should be safe to ignore because we only ever move 1
                # copy of the pg at a time, and merge is only initiated when
                # all replicas are peered and happy.  /me crosses fingers
                self.log("PG merged on target"
                         "...ignored")
            elif proc.exitstatus:
                raise Exception("ceph-objectstore-tool: "
                                "import failure with status {ret}".
                                format(ret=proc.exitstatus))
            cmd = "sudo rm -f {file}".format(file=exp_host_path)
            exp_remote.run(args=cmd)
            if imp_remote != exp_remote:
                imp_remote.run(args=cmd)

            # apply low split settings to each pool
            if not self.ceph_manager.cephadm:
                for pool in self.ceph_manager.list_pools():
                    cmd = ("CEPH_ARGS='--filestore-merge-threshold 1 "
                           "--filestore-split-multiple 1' sudo -E "
                           + 'ceph-objectstore-tool '
                           + ' '.join(prefix + [
                               '--data-path', FSPATH.format(id=imp_osd),
                               '--journal-path', JPATH.format(id=imp_osd),
                           ])
                           + " --op apply-layout-settings --pool " + pool).format(id=osd)
                    proc = imp_remote.run(args=cmd,
                                          wait=True, check_status=False,
                                          stderr=StringIO())
                    if 'Couldn\'t find pool' in proc.stderr.getvalue():
                        continue
                    if proc.exitstatus:
                        raise Exception("ceph-objectstore-tool apply-layout-settings"
                                        " failed with {status}".format(status=proc.exitstatus))

    def blackhole_kill_osd(self, osd=None):
        """
        If all else fails, kill the osd.
        :param osd: Osd to be killed.
        """
        if osd is None:
            osd = random.choice(self.live_osds)
        self.log("Blackholing and then killing osd %s, live_osds are %s" %
                 (str(osd), str(self.live_osds)))
        self.live_osds.remove(osd)
        self.dead_osds.append(osd)
        self.ceph_manager.blackhole_kill_osd(osd)

    def revive_osd(self, osd=None, skip_admin_check=False):
        """
        Revive the osd.
        :param osd: Osd to be revived.
        """
        if osd is None:
            osd = random.choice(self.dead_osds)
        self.log("Reviving osd %s" % (str(osd),))
        self.ceph_manager.revive_osd(
            osd,
            self.revive_timeout,
            skip_admin_check=skip_admin_check)
        self.dead_osds.remove(osd)
        self.live_osds.append(osd)
        if self.random_eio > 0 and osd == self.rerrosd:
            self.ceph_manager.set_config(self.rerrosd,
                                         filestore_debug_random_read_err=self.random_eio)
            self.ceph_manager.set_config(self.rerrosd,
                                         bluestore_debug_random_read_err=self.random_eio)

    def out_osd(self, osd=None):
        """
        Mark the osd out.
        :param osd: Osd to be marked.
        """
        if osd is None:
            osd = random.choice(self.in_osds)
        self.log("Removing osd %s, in_osds are: %s" %
                 (str(osd), str(self.in_osds)))
        self.ceph_manager.mark_out_osd(osd)
        self.in_osds.remove(osd)
        self.out_osds.append(osd)

    def in_osd(self, osd=None):
        """
        Mark the osd in.
        :param osd: Osd to be marked.
        """
        if osd is None:
            osd = random.choice(self.out_osds)
        if osd in self.dead_osds:
            # if the osd is dead, revive it instead of marking it in
            return self.revive_osd(osd)
        self.log("Adding osd %s" % (str(osd),))
        self.out_osds.remove(osd)
        self.in_osds.append(osd)
        self.ceph_manager.mark_in_osd(osd)
        self.log("Added osd %s" % (str(osd),))

    def reweight_osd_or_by_util(self, osd=None):
        """
        Reweight an osd that is in
        :param osd: Osd to be marked.
        """
        if osd is not None or random.choice([True, False]):
            if osd is None:
                osd = random.choice(self.in_osds)
            val = random.uniform(.1, 1.0)
            self.log("Reweighting osd %s to %s" % (str(osd), str(val)))
            self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
                                              str(osd), str(val))
        else:
            # do it several times, the option space is large
            for i in range(5):
                options = {
                    'max_change': random.choice(['0.05', '1.0', '3.0']),
                    'overage': random.choice(['110', '1000']),
                    'type': random.choice([
                        'reweight-by-utilization',
                        'test-reweight-by-utilization']),
                }
                self.log("Reweighting by: %s" % (str(options),))
                self.ceph_manager.raw_cluster_cmd(
                    'osd',
                    options['type'],
                    options['overage'],
                    options['max_change'])

    def primary_affinity(self, osd=None):
        if osd is None:
            osd = random.choice(self.in_osds)
        if random.random() >= .5:
            pa = random.random()
        elif random.random() >= .5:
            pa = 1
        else:
            pa = 0
        self.log('Setting osd %s primary_affinity to %f' % (str(osd), pa))
        self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
                                          str(osd), str(pa))

):
661 Set and unset cluster full condition
663 self
.log('Setting full ratio to .001')
664 self
.ceph_manager
.raw_cluster_cmd('osd', 'set-full-ratio', '.001')
666 self
.log('Setting full ratio back to .95')
667 self
.ceph_manager
.raw_cluster_cmd('osd', 'set-full-ratio', '.95')
    def thrash_pg_upmap(self):
        """
        Install or remove random pg_upmap entries in OSDMap
        """
        from random import shuffle
        out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
        j = json.loads(out)
        self.log('j is %s' % j)
        try:
            if random.random() >= .3:
                pgs = self.ceph_manager.get_pg_stats()
                if not pgs:
                    return
                pg = random.choice(pgs)
                pgid = str(pg['pgid'])
                poolid = int(pgid.split('.')[0])
                sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
                if len(sizes) == 0:
                    return
                n = sizes[0]
                osds = self.in_osds + self.out_osds
                shuffle(osds)
                osds = osds[0:n]
                self.log('Setting %s to %s' % (pgid, osds))
                cmd = ['osd', 'pg-upmap', pgid] + [str(x) for x in osds]
                self.log('cmd %s' % cmd)
                self.ceph_manager.raw_cluster_cmd(*cmd)
            else:
                m = j['pg_upmap']
                if len(m) > 0:
                    shuffle(m)
                    pg = m[0]['pgid']
                    self.log('Clearing pg_upmap on %s' % pg)
                    self.ceph_manager.raw_cluster_cmd(
                        'osd',
                        'rm-pg-upmap',
                        pg)
                else:
                    self.log('No pg_upmap entries; doing nothing')
        except CommandFailedError:
            self.log('Failed to rm-pg-upmap, ignoring')
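
    # The command built above has the shape (pg and osd ids hypothetical):
    #   ceph osd pg-upmap 2.7 3 1 4
    # which pins PG 2.7 to OSDs [3, 1, 4]; the clearing branch issues
    #   ceph osd rm-pg-upmap 2.7
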
    def thrash_pg_upmap_items(self):
        """
        Install or remove random pg_upmap_items entries in OSDMap
        """
        from random import shuffle
        out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
        j = json.loads(out)
        self.log('j is %s' % j)
        try:
            if random.random() >= .3:
                pgs = self.ceph_manager.get_pg_stats()
                if not pgs:
                    return
                pg = random.choice(pgs)
                pgid = str(pg['pgid'])
                poolid = int(pgid.split('.')[0])
                sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
                if len(sizes) == 0:
                    return
                n = sizes[0]
                osds = self.in_osds + self.out_osds
                shuffle(osds)
                osds = osds[0:n*2]
                self.log('Setting %s to %s' % (pgid, osds))
                cmd = ['osd', 'pg-upmap-items', pgid] + [str(x) for x in osds]
                self.log('cmd %s' % cmd)
                self.ceph_manager.raw_cluster_cmd(*cmd)
            else:
                m = j['pg_upmap_items']
                if len(m) > 0:
                    shuffle(m)
                    pg = m[0]['pgid']
                    self.log('Clearing pg_upmap on %s' % pg)
                    self.ceph_manager.raw_cluster_cmd(
                        'osd',
                        'rm-pg-upmap-items',
                        pg)
                else:
                    self.log('No pg_upmap entries; doing nothing')
        except CommandFailedError:
            self.log('Failed to rm-pg-upmap-items, ignoring')

    def force_recovery(self):
        """
        Force recovery on some of PGs
        """
        backfill = random.random() >= 0.5
        j = self.ceph_manager.get_pgids_to_force(backfill)
        if j:
            try:
                if backfill:
                    self.ceph_manager.raw_cluster_cmd('pg', 'force-backfill', *j)
                else:
                    self.ceph_manager.raw_cluster_cmd('pg', 'force-recovery', *j)
            except CommandFailedError:
                self.log('Failed to force backfill|recovery, ignoring')

    def cancel_force_recovery(self):
        """
        Cancel forced recovery on some of the PGs
        """
        backfill = random.random() >= 0.5
        j = self.ceph_manager.get_pgids_to_cancel_force(backfill)
        if j:
            try:
                if backfill:
                    self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-backfill', *j)
                else:
                    self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-recovery', *j)
            except CommandFailedError:
                self.log('Failed to cancel force backfill|recovery, ignoring')

    def force_cancel_recovery(self):
        """
        Force or cancel forcing recovery
        """
        if random.random() >= 0.4:
            self.force_recovery()
        else:
            self.cancel_force_recovery()

    def all_up(self):
        """
        Make sure all osds are up and not out.
        """
        while len(self.dead_osds) > 0:
            self.log("reviving osd")
            self.revive_osd()
        while len(self.out_osds) > 0:
            self.log("inning osd")
            self.in_osd()

    def all_up_in(self):
        """
        Make sure all osds are up and fully in.
        """
        self.all_up()
        for osd in self.live_osds:
            self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
                                              str(osd), str(1))
            self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
                                              str(osd), str(1))

    def do_join(self):
        """
        Break out of this Ceph loop
        """
        self.stopping = True
        self.thread.get()
        if self.sighup_delay:
            self.log("joining the do_sighup greenlet")
            self.sighup_thread.get()
        if self.optrack_toggle_delay:
            self.log("joining the do_optrack_toggle greenlet")
            self.optrack_toggle_thread.join()
        if self.dump_ops_enable == "true":
            self.log("joining the do_dump_ops greenlet")
            self.dump_ops_thread.join()
        if self.noscrub_toggle_delay:
            self.log("joining the do_noscrub_toggle greenlet")
            self.noscrub_toggle_thread.join()

    def grow_pool(self):
        """
        Increase the size of the pool
        """
        pool = self.ceph_manager.get_pool()
        if pool is None:
            return
        self.log("Growing pool %s" % (pool,))
        if self.ceph_manager.expand_pool(pool,
                                         self.config.get('pool_grow_by', 10),
                                         self.max_pgs):
            self.pools_to_fix_pgp_num.add(pool)

    def shrink_pool(self):
        """
        Decrease the size of the pool
        """
        pool = self.ceph_manager.get_pool()
        if pool is None:
            return
        _ = self.ceph_manager.get_pool_pg_num(pool)
        self.log("Shrinking pool %s" % (pool,))
        if self.ceph_manager.contract_pool(
                pool,
                self.config.get('pool_shrink_by', 10),
                self.min_pgs):
            self.pools_to_fix_pgp_num.add(pool)

    def fix_pgp_num(self, pool=None):
        """
        Fix number of pgs in pool.
        """
        if pool is None:
            pool = self.ceph_manager.get_pool()
            if not pool:
                return
            force = False
        else:
            force = True
        self.log("fixing pg num pool %s" % (pool,))
        if self.ceph_manager.set_pool_pgpnum(pool, force):
            self.pools_to_fix_pgp_num.discard(pool)

    def test_pool_min_size(self):
        """
        Loop to selectively push PGs below their min_size and test that
        recovery still occurs.
        """
        self.log("test_pool_min_size")
        self.all_up()
        self.ceph_manager.wait_for_recovery(
            timeout=self.config.get('timeout')
            )
        minout = int(self.config.get("min_out", 1))
        minlive = int(self.config.get("min_live", 2))
        mindead = int(self.config.get("min_dead", 1))
        self.log("doing min_size thrashing")
        self.ceph_manager.wait_for_clean(timeout=60)
        assert self.ceph_manager.is_clean(), \
            'not clean before minsize thrashing starts'
        while not self.stopping:
            # look up k and m from all the pools on each loop, in case it
            # changes as the cluster runs
            k = 0
            m = 99
            has_pools = False
            pools_json = self.ceph_manager.get_osd_dump_json()['pools']

            for pool_json in pools_json:
                pool = pool_json['pool_name']
                has_pools = True
                pool_type = pool_json['type']  # 1 for rep, 3 for ec
                min_size = pool_json['min_size']
                self.log("pool {pool} min_size is {min_size}".format(pool=pool,min_size=min_size))
                try:
                    ec_profile = self.ceph_manager.get_pool_property(pool, 'erasure_code_profile')
                    if pool_type != PoolType.ERASURE_CODED:
                        continue
                    ec_profile = pool_json['erasure_code_profile']
                    ec_profile_json = self.ceph_manager.raw_cluster_cmd(
                        'osd',
                        'erasure-code-profile',
                        'get',
                        ec_profile,
                        '--format=json')
                    ec_json = json.loads(ec_profile_json)
                    local_k = int(ec_json['k'])
                    local_m = int(ec_json['m'])
                    self.log("pool {pool} local_k={k} local_m={m}".format(pool=pool,
                                                                          k=local_k, m=local_m))
                    if local_k > k:
                        self.log("setting k={local_k} from previous {k}".format(local_k=local_k, k=k))
                        k = local_k
                    if local_m < m:
                        self.log("setting m={local_m} from previous {m}".format(local_m=local_m, m=m))
                        m = local_m
                except CommandFailedError:
                    self.log("failed to read erasure_code_profile. %s was likely removed", pool)
                    continue

            if has_pools:
                self.log("using k={k}, m={m}".format(k=k,m=m))
            else:
                self.log("No pools yet, waiting")
                time.sleep(5)
                continue

            if minout > len(self.out_osds): # kill OSDs and mark out
                self.log("forced to out an osd")
                self.kill_osd(mark_out=True)
                continue
            elif mindead > len(self.dead_osds): # kill OSDs but force timeout
                self.log("forced to kill an osd")
                self.kill_osd()
                continue
            else: # make mostly-random choice to kill or revive OSDs
                minup = max(minlive, k)
                rand_val = random.uniform(0, 1)
                self.log("choosing based on number of live OSDs and rand val {rand}".\
                         format(rand=rand_val))
                if len(self.live_osds) > minup+1 and rand_val < 0.5:
                    # chose to knock out as many OSDs as we can w/out downing PGs
                    most_killable = min(len(self.live_osds) - minup, m)
                    self.log("chose to kill {n} OSDs".format(n=most_killable))
                    for i in range(1, most_killable):
                        self.kill_osd(mark_out=True)
                    # try a few times since there might be a concurrent pool
                    # creation or deletion
                    with safe_while(
                            sleep=25, tries=5,
                            action='check for active or peered') as proceed:
                        while proceed():
                            if self.ceph_manager.all_active_or_peered():
                                break
                            self.log('not all PGs are active or peered')
                else: # chose to revive OSDs, bring up a random fraction of the dead ones
                    self.log("chose to revive osds")
                    for i in range(1, int(rand_val * len(self.dead_osds))):
                        self.revive_osd()

            # let PGs repair themselves or our next knockout might kill one
            self.ceph_manager.wait_for_clean(timeout=self.config.get('timeout'))

        # / while not self.stopping
        self.all_up_in()
        self.ceph_manager.wait_for_recovery(
            timeout=self.config.get('timeout')
            )

    def inject_pause(self, conf_key, duration, check_after, should_be_down):
        """
        Pause injection testing. Check for osd being down when finished.
        """
        the_one = random.choice(self.live_osds)
        self.log("inject_pause on {osd}".format(osd=the_one))
        self.log(
            "Testing {key} pause injection for duration {duration}".format(
                key=conf_key,
                duration=duration
                ))
        self.log(
            "Checking after {after}, should_be_down={shouldbedown}".format(
                after=check_after,
                shouldbedown=should_be_down
                ))
        self.ceph_manager.set_config(the_one, **{conf_key: duration})
        if not should_be_down:
            return
        time.sleep(check_after)
        status = self.ceph_manager.get_osd_status()
        assert the_one in status['down']
        time.sleep(duration - check_after + 20)
        status = self.ceph_manager.get_osd_status()
        assert not the_one in status['down']

    def test_backfill_full(self):
        """
        Test backfills stopping when the replica fills up.

        First, use injectfull admin command to simulate a now full
        osd by setting it to 0 on all of the OSDs.

        Second, on a random subset, set
        osd_debug_skip_full_check_in_backfill_reservation to force
        the more complicated check in do_scan to be exercised.

        Then, verify that all backfillings stop.
        """
        self.log("injecting backfill full")
        for i in self.live_osds:
            self.ceph_manager.set_config(
                i,
                osd_debug_skip_full_check_in_backfill_reservation=
                random.choice(['false', 'true']))
            self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'backfillfull'],
                                               check_status=True, timeout=30, stdout=DEVNULL)
        for i in range(30):
            status = self.ceph_manager.compile_pg_status()
            if 'backfilling' not in status.keys():
                break
            self.log(
                "waiting for {still_going} backfillings".format(
                    still_going=status.get('backfilling')))
            time.sleep(1)
        assert('backfilling' not in self.ceph_manager.compile_pg_status().keys())
        for i in self.live_osds:
            self.ceph_manager.set_config(
                i,
                osd_debug_skip_full_check_in_backfill_reservation='false')
            self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'none'],
                                               check_status=True, timeout=30, stdout=DEVNULL)

    def generate_random_sharding(self):
        prefixes = [
            'm', 'O', 'P', 'L'
        ]
        new_sharding = ''
        for prefix in prefixes:
            choose = random.choice([False, True])
            if not choose:
                continue
            if new_sharding != '':
                new_sharding = new_sharding + ' '
            columns = random.randint(1, 5)
            do_hash = random.choice([False, True])
            if do_hash:
                low_hash = random.choice([0, 5, 8])
                do_high_hash = random.choice([False, True])
                if do_high_hash:
                    high_hash = random.choice([8, 16, 30]) + low_hash
                    new_sharding = new_sharding + prefix + '(' + str(columns) + ',' + str(low_hash) + '-' + str(high_hash) + ')'
                else:
                    new_sharding = new_sharding + prefix + '(' + str(columns) + ',' + str(low_hash) + '-)'
            else:
                if columns == 1:
                    new_sharding = new_sharding + prefix
                else:
                    new_sharding = new_sharding + prefix + '(' + str(columns) + ')'
        return new_sharding
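
    # Example shapes this generator can emit (the exact string is random):
    #   'm(3,0-) O P(2,8-24)' or 'L(4)'
    # i.e. a space-separated list of column-family prefixes, each optionally
    # qualified with '(columns[,low_hash-high_hash])' -- the sharding spec
    # syntax consumed by the reshard step below.
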
    def test_bluestore_reshard_action(self):
        """
        Test if resharding of bluestore works properly.
        If bluestore is not used, or bluestore is in version that
        does not support sharding, skip.
        """
        osd = random.choice(self.dead_osds)
        remote = self.ceph_manager.find_remote('osd', osd)
        FSPATH = self.ceph_manager.get_filepath()

        prefix = [
            '--no-mon-config',
            '--log-file=/var/log/ceph/bluestore_tool.$pid.log',
            '--path', FSPATH.format(id=osd)
        ]

        # sanity check if bluestore-tool accessible
        self.log('checking if target objectstore is bluestore on osd.%s' % osd)
        cmd = prefix + [
            'show-label'
        ]
        proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
        if proc.exitstatus != 0:
            raise Exception("ceph-bluestore-tool access failed.")

        # check if sharding is possible
        self.log('checking if target bluestore supports sharding on osd.%s' % osd)
        cmd = prefix + [
            'show-sharding'
        ]
        proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
        if proc.exitstatus != 0:
            self.log("Unable to test resharding, "
                     "ceph-bluestore-tool does not support it.")
            return

        # now go for reshard to something else
        self.log('applying new sharding to bluestore on osd.%s' % osd)
        new_sharding = self.config.get('bluestore_new_sharding','random')

        if new_sharding == 'random':
            self.log('generate random sharding')
            new_sharding = self.generate_random_sharding()

        self.log("applying new sharding: " + new_sharding)
        cmd = prefix + [
            '--sharding', new_sharding,
            'reshard'
        ]
        proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
        if proc.exitstatus != 0:
            raise Exception("ceph-bluestore-tool resharding failed.")

        # now do fsck to verify
        self.log('running fsck to verify new sharding on osd.%s' % osd)
        cmd = prefix + [
            'fsck'
        ]
        proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
        if proc.exitstatus != 0:
            raise Exception("ceph-bluestore-tool fsck failed.")
        self.log('resharding successfully completed')

    def test_bluestore_reshard(self):
        """
        1) kills an osd
        2) reshards bluestore on killed osd
        3) revives the osd
        """
        self.log('test_bluestore_reshard started')
        self.kill_osd(mark_down=True, mark_out=True)
        self.test_bluestore_reshard_action()
        self.revive_osd()
        self.log('test_bluestore_reshard completed')

    def test_map_discontinuity(self):
        """
        1) Allows the osds to recover
        2) kills an osd
        3) allows the remaining osds to recover
        4) waits for some time
        5) revives the osd
        This sequence should cause the revived osd to have to handle
        a map gap since the mons would have trimmed
        """
        while len(self.in_osds) < (self.minin + 1):
            self.in_osd()
        self.log("Waiting for recovery")
        self.ceph_manager.wait_for_all_osds_up(
            timeout=self.config.get('timeout')
            )
        # now we wait 20s for the pg status to change, if it takes longer,
        # the test *should* fail!
        time.sleep(20)
        self.ceph_manager.wait_for_clean(
            timeout=self.config.get('timeout')
            )

        # now we wait 20s for the backfill replicas to hear about the clean
        time.sleep(20)
        self.log("Recovered, killing an osd")
        self.kill_osd(mark_down=True, mark_out=True)
        self.log("Waiting for clean again")
        self.ceph_manager.wait_for_clean(
            timeout=self.config.get('timeout')
            )
        self.log("Waiting for trim")
        time.sleep(int(self.config.get("map_discontinuity_sleep_time", 40)))
        self.revive_osd()

    def choose_action(self):
        """
        Random action selector.
        """
        chance_down = self.config.get('chance_down', 0.4)
        _ = self.config.get('chance_test_min_size', 0)
        chance_test_backfill_full = \
            self.config.get('chance_test_backfill_full', 0)
        if isinstance(chance_down, int):
            chance_down = float(chance_down) / 100
        minin = self.minin
        minout = int(self.config.get("min_out", 0))
        minlive = int(self.config.get("min_live", 2))
        mindead = int(self.config.get("min_dead", 0))

        self.log('choose_action: min_in %d min_out '
                 '%d min_live %d min_dead %d' %
                 (minin, minout, minlive, mindead))
        actions = []
        if len(self.in_osds) > minin:
            actions.append((self.out_osd, 1.0,))
        if len(self.live_osds) > minlive and chance_down > 0:
            actions.append((self.kill_osd, chance_down,))
        if len(self.out_osds) > minout:
            actions.append((self.in_osd, 1.7,))
        if len(self.dead_osds) > mindead:
            actions.append((self.revive_osd, 1.0,))
        if self.config.get('thrash_primary_affinity', True):
            actions.append((self.primary_affinity, 1.0,))
        actions.append((self.reweight_osd_or_by_util,
                        self.config.get('reweight_osd', .5),))
        actions.append((self.grow_pool,
                        self.config.get('chance_pgnum_grow', 0),))
        actions.append((self.shrink_pool,
                        self.config.get('chance_pgnum_shrink', 0),))
        actions.append((self.fix_pgp_num,
                        self.config.get('chance_pgpnum_fix', 0),))
        actions.append((self.test_pool_min_size,
                        self.config.get('chance_test_min_size', 0),))
        actions.append((self.test_backfill_full,
                        chance_test_backfill_full,))
        if self.chance_thrash_cluster_full > 0:
            actions.append((self.thrash_cluster_full, self.chance_thrash_cluster_full,))
        if self.chance_thrash_pg_upmap > 0:
            actions.append((self.thrash_pg_upmap, self.chance_thrash_pg_upmap,))
        if self.chance_thrash_pg_upmap_items > 0:
            actions.append((self.thrash_pg_upmap_items, self.chance_thrash_pg_upmap_items,))
        if self.chance_force_recovery > 0:
            actions.append((self.force_cancel_recovery, self.chance_force_recovery))

        for key in ['heartbeat_inject_failure', 'filestore_inject_stall']:
            for scenario in [
                (lambda:
                 self.inject_pause(key,
                                   self.config.get('pause_short', 3),
                                   0,
                                   False),
                 self.config.get('chance_inject_pause_short', 1),),
                (lambda:
                 self.inject_pause(key,
                                   self.config.get('pause_long', 80),
                                   self.config.get('pause_check_after', 70),
                                   True),
                 self.config.get('chance_inject_pause_long', 0),)]:
                actions.append(scenario)

        # only consider resharding if objectstore is bluestore
        cluster_name = self.ceph_manager.cluster
        cluster = self.ceph_manager.ctx.ceph[cluster_name]
        if cluster.conf.get('osd', {}).get('osd objectstore', 'bluestore') == 'bluestore':
            actions.append((self.test_bluestore_reshard,
                            self.config.get('chance_bluestore_reshard', 0),))

        total = sum([y for (x, y) in actions])
        val = random.uniform(0, total)
        for (action, prob) in actions:
            if val < prob:
                return action
            val -= prob
        return None
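
    # choose_action() uses a standard roulette-wheel draw: a uniform value in
    # [0, total) is walked down the weighted list until it falls inside a
    # slot. A minimal stand-alone sketch of the same idea (illustrative only,
    # not used by the thrasher):
    #
    #   def weighted_choice(actions):
    #       """actions: list of (callable, weight) pairs."""
    #       total = sum(weight for _, weight in actions)
    #       val = random.uniform(0, total)
    #       for action, weight in actions:
    #           if val < weight:
    #               return action
    #           val -= weight
    #       return None
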
    def do_thrash(self):
        """
        _do_thrash() wrapper.
        """
        try:
            self._do_thrash()
        except Exception as e:
            # See _run exception comment for MDSThrasher
            self.set_thrasher_exception(e)
            self.logger.exception("exception:")
            # Allow successful completion so gevent doesn't see an exception.
            # The DaemonWatchdog will observe the error and tear down the test.

    @log_exc
    def do_sighup(self):
        """
        Loops and sends signal.SIGHUP to a random live osd.

        Loop delay is controlled by the config value sighup_delay.
        """
        delay = float(self.sighup_delay)
        self.log("starting do_sighup with a delay of {0}".format(delay))
        while not self.stopping:
            osd = random.choice(self.live_osds)
            self.ceph_manager.signal_osd(osd, signal.SIGHUP, silent=True)
            time.sleep(delay)

    @log_exc
    def do_optrack_toggle(self):
        """
        Loops and toggle op tracking to all osds.

        Loop delay is controlled by the config value optrack_toggle_delay.
        """
        delay = float(self.optrack_toggle_delay)
        osd_state = "true"
        self.log("starting do_optrack_toggle with a delay of {0}".format(delay))
        while not self.stopping:
            if osd_state == "true":
                osd_state = "false"
            else:
                osd_state = "true"
            try:
                self.ceph_manager.inject_args('osd', '*',
                                              'osd_enable_op_tracker',
                                              osd_state)
            except CommandFailedError:
                self.log('Failed to tell all osds, ignoring')
            gevent.sleep(delay)

    @log_exc
    def do_dump_ops(self):
        """
        Loops and does op dumps on all osds
        """
        self.log("starting do_dump_ops")
        while not self.stopping:
            for osd in self.live_osds:
                # Ignore errors because live_osds is in flux
                self.ceph_manager.osd_admin_socket(osd, command=['dump_ops_in_flight'],
                                                   check_status=False, timeout=30, stdout=DEVNULL)
                self.ceph_manager.osd_admin_socket(osd, command=['dump_blocked_ops'],
                                                   check_status=False, timeout=30, stdout=DEVNULL)
                self.ceph_manager.osd_admin_socket(osd, command=['dump_historic_ops'],
                                                   check_status=False, timeout=30, stdout=DEVNULL)
            gevent.sleep(0)

    @log_exc
    def do_noscrub_toggle(self):
        """
        Loops and toggle noscrub flags

        Loop delay is controlled by the config value noscrub_toggle_delay.
        """
        delay = float(self.noscrub_toggle_delay)
        scrub_state = "none"
        self.log("starting do_noscrub_toggle with a delay of {0}".format(delay))
        while not self.stopping:
            if scrub_state == "none":
                self.ceph_manager.raw_cluster_cmd('osd', 'set', 'noscrub')
                scrub_state = "noscrub"
            elif scrub_state == "noscrub":
                self.ceph_manager.raw_cluster_cmd('osd', 'set', 'nodeep-scrub')
                scrub_state = "both"
            elif scrub_state == "both":
                self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
                scrub_state = "nodeep-scrub"
            else:
                self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
                scrub_state = "none"
            gevent.sleep(delay)
        self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
        self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')

    def _do_thrash(self):
        """
        Loop to select random actions to thrash ceph manager with.
        """
        cleanint = self.config.get("clean_interval", 60)
        scrubint = self.config.get("scrub_interval", -1)
        maxdead = self.config.get("max_dead", 0)
        delay = self.config.get("op_delay", 5)
        self.rerrosd = self.live_osds[0]
        if self.random_eio > 0:
            self.ceph_manager.inject_args('osd', self.rerrosd,
                                          'filestore_debug_random_read_err',
                                          self.random_eio)
            self.ceph_manager.inject_args('osd', self.rerrosd,
                                          'bluestore_debug_random_read_err',
                                          self.random_eio)
        self.log("starting do_thrash")
        while not self.stopping:
            to_log = [str(x) for x in ["in_osds: ", self.in_osds,
                                       "out_osds: ", self.out_osds,
                                       "dead_osds: ", self.dead_osds,
                                       "live_osds: ", self.live_osds]]
            self.log(" ".join(to_log))
            if random.uniform(0, 1) < (float(delay) / cleanint):
                while len(self.dead_osds) > maxdead:
                    self.revive_osd()
                for osd in self.in_osds:
                    self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
                                                      str(osd), str(1))
                if random.uniform(0, 1) < float(
                        self.config.get('chance_test_map_discontinuity', 0)) \
                        and len(self.live_osds) > 5: # avoid m=2,k=2 stall, w/ some buffer for crush being picky
                    self.test_map_discontinuity()
                else:
                    self.ceph_manager.wait_for_recovery(
                        timeout=self.config.get('timeout')
                        )
                time.sleep(self.clean_wait)
                if scrubint > 0:
                    if random.uniform(0, 1) < (float(delay) / scrubint):
                        self.log('Scrubbing while thrashing being performed')
                        Scrubber(self.ceph_manager, self.config)
            self.choose_action()()
            time.sleep(delay)
        self.all_up()
        if self.random_eio > 0:
            self.ceph_manager.inject_args('osd', self.rerrosd,
                                          'filestore_debug_random_read_err', '0.0')
            self.ceph_manager.inject_args('osd', self.rerrosd,
                                          'bluestore_debug_random_read_err', '0.0')
        for pool in list(self.pools_to_fix_pgp_num):
            if self.ceph_manager.get_pool_pg_num(pool) > 0:
                self.fix_pgp_num(pool)
        self.pools_to_fix_pgp_num.clear()
        for service, opt, saved_value in self.saved_options:
            self.ceph_manager.inject_args(service, '*', opt, saved_value)
        self.saved_options = []
        self.all_up_in()

class ObjectStoreTool:

    def __init__(self, manager, pool, **kwargs):
        self.manager = manager
        self.pool = pool
        self.osd = kwargs.get('osd', None)
        self.object_name = kwargs.get('object_name', None)
        self.do_revive = kwargs.get('do_revive', True)
        if self.osd and self.pool and self.object_name:
            if self.osd == "primary":
                self.osd = self.manager.get_object_primary(self.pool,
                                                           self.object_name)
        assert self.osd is not None
        if self.object_name:
            self.pgid = self.manager.get_object_pg_with_shard(self.pool,
                                                              self.object_name,
                                                              self.osd)
        self.remote = next(iter(self.manager.ctx.\
            cluster.only('osd.{o}'.format(o=self.osd)).remotes.keys()))
        path = self.manager.get_filepath().format(id=self.osd)
        self.paths = ("--data-path {path} --journal-path {path}/journal".
                      format(path=path))

    def build_cmd(self, options, args, stdin):
        lines = []
        if self.object_name:
            lines.append("object=$(sudo adjust-ulimits ceph-objectstore-tool "
                         "{paths} --pgid {pgid} --op list |"
                         "grep '\"oid\":\"{name}\"')".
                         format(paths=self.paths,
                                pgid=self.pgid,
                                name=self.object_name))
            args = '"$object" ' + args
            options += " --pgid {pgid}".format(pgid=self.pgid)
        cmd = ("sudo adjust-ulimits ceph-objectstore-tool {paths} {options} {args}".
               format(paths=self.paths,
                      options=options,
                      args=args))
        if stdin:
            cmd = ("echo {payload} | base64 --decode | {cmd}".
                   format(payload=base64.b64encode(stdin.encode()).decode(),
                          cmd=cmd))
        lines.append(cmd)
        return "\n".join(lines)

    def run(self, options, args):
        self.manager.kill_osd(self.osd)
        cmd = self.build_cmd(options, args, None)
        self.manager.log(cmd)
        try:
            proc = self.remote.run(args=['bash', '-e', '-x', '-c', cmd],
                                   check_status=False,
                                   stdout=BytesIO(),
                                   stderr=BytesIO())
            proc.wait()
            if proc.exitstatus != 0:
                self.manager.log("failed with " + str(proc.exitstatus))
                error = proc.stdout.getvalue().decode() + " " + \
                        proc.stderr.getvalue().decode()
                raise Exception(error)
        finally:
            if self.do_revive:
                self.manager.revive_osd(self.osd)
                self.manager.wait_till_osd_is_up(self.osd, 300)


# XXX: this class has nothing to do with the Ceph daemon (ceph-mgr) of
# the same name.
class CephManager:
    """
    Ceph manager object.
    Contains several local functions that form a bulk of this module.

    :param controller: the remote machine where the Ceph commands should be
                       executed
    :param ctx: the cluster context
    :param config: path to Ceph config file
    :param logger: for logging messages
    :param cluster: name of the Ceph cluster
    """

    def __init__(self, controller, ctx=None, config=None, logger=None,
                 cluster='ceph', cephadm=False, rook=False) -> None:
        self.lock = threading.RLock()
        self.ctx = ctx
        self.config = config
        self.controller = controller
        self.next_pool_id = 0
        self.cluster = cluster
        self.cephadm = cephadm
        self.rook = rook
        if logger:
            self.log = lambda x: logger.info(x)
        else:
            def tmp(x):
                """
                implement log behavior.
                """
                print(x)
            self.log = tmp
        if self.config is None:
            self.config = dict()
        pools = self.list_pools()
        self.pools = {}
        for pool in pools:
            # we may race with a pool deletion; ignore failures here
            try:
                self.pools[pool] = self.get_pool_int_property(pool, 'pg_num')
            except CommandFailedError:
                self.log('Failed to get pg_num from pool %s, ignoring' % pool)

    def ceph(self, cmd, **kwargs):
        """
        Simple Ceph admin command wrapper around run_cluster_cmd.
        """

        kwargs.pop('args', None)
        args = shlex.split(cmd)
        stdout = kwargs.pop('stdout', StringIO())
        stderr = kwargs.pop('stderr', StringIO())
        return self.run_cluster_cmd(args=args, stdout=stdout, stderr=stderr, **kwargs)

    def run_cluster_cmd(self, **kwargs):
        """
        Run a Ceph command and return the object representing the process
        for the command.

        Accepts arguments same as that of teuthology.orchestra.run.run()
        """
        if self.cephadm:
            return shell(self.ctx, self.cluster, self.controller,
                         args=['ceph'] + list(kwargs['args']),
                         stdout=StringIO(),
                         check_status=kwargs.get('check_status', True))
        if self.rook:
            return toolbox(self.ctx, self.cluster,
                           args=['ceph'] + list(kwargs['args']),
                           stdout=StringIO(),
                           check_status=kwargs.get('check_status', True))

        testdir = teuthology.get_testdir(self.ctx)
        prefix = ['sudo', 'adjust-ulimits', 'ceph-coverage',
                  f'{testdir}/archive/coverage', 'timeout', '120', 'ceph',
                  '--cluster', self.cluster]
        kwargs['args'] = prefix + list(kwargs['args'])
        return self.controller.run(**kwargs)
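
    # Typical call: 'args' is the only required keyword; the remaining kwargs
    # pass through to teuthology.orchestra.run.run(). For example:
    #   proc = manager.run_cluster_cmd(args=['osd', 'dump', '--format=json'],
    #                                  stdout=StringIO())
    #   osd_dump = json.loads(proc.stdout.getvalue())
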
    def raw_cluster_cmd(self, *args, **kwargs) -> str:
        """
        Run a ceph command against the cluster and return its stdout.
        """
        stdout = kwargs.pop('stdout', StringIO())
        p = self.run_cluster_cmd(args=args, stdout=stdout, **kwargs)
        return p.stdout.getvalue()

    def raw_cluster_cmd_result(self, *args, **kwargs):
        """
        Run a ceph command against the cluster and return its exit status.
        """
        kwargs['args'], kwargs['check_status'] = args, False
        return self.run_cluster_cmd(**kwargs).exitstatus

    def run_ceph_w(self, watch_channel=None):
        """
        Execute "ceph -w" in the background with stdout connected to a
        StringIO, and return the RemoteProcess.

        :param watch_channel: Specifies the channel to be watched. This can be
                              'cluster', 'audit', ...
        :type watch_channel: str
        """
        args = ['sudo',
                'daemon-helper',
                'kill',
                'ceph',
                '--cluster',
                self.cluster,
                '-w']
        if watch_channel is not None:
            args.append("--watch-channel")
            args.append(watch_channel)
        return self.controller.run(args=args, wait=False, stdout=StringIO(), stdin=run.PIPE)

    def get_mon_socks(self):
        """
        Get monitor sockets.

        :return socks: tuple of strings; strings are individual sockets.
        """
        from json import loads

        output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
        socks = []
        for mon in output['mons']:
            for addrvec_mem in mon['public_addrs']['addrvec']:
                socks.append(addrvec_mem['addr'])
        return tuple(socks)

    def get_msgrv1_mon_socks(self):
        """
        Get monitor sockets that use msgrv1 to operate.

        :return socks: tuple of strings; strings are individual sockets.
        """
        from json import loads

        output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
        socks = []
        for mon in output['mons']:
            for addrvec_mem in mon['public_addrs']['addrvec']:
                if addrvec_mem['type'] == 'v1':
                    socks.append(addrvec_mem['addr'])
        return tuple(socks)

    def get_msgrv2_mon_socks(self):
        """
        Get monitor sockets that use msgrv2 to operate.

        :return socks: tuple of strings; strings are individual sockets.
        """
        from json import loads

        output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
        socks = []
        for mon in output['mons']:
            for addrvec_mem in mon['public_addrs']['addrvec']:
                if addrvec_mem['type'] == 'v2':
                    socks.append(addrvec_mem['addr'])
        return tuple(socks)

    def flush_pg_stats(self, osds, no_wait=None, wait_for_mon=300):
        """
        Flush pg stats from a list of OSD ids, ensuring they are reflected
        all the way to the monitor.  Luminous and later only.

        :param osds: list of OSDs to flush
        :param no_wait: list of OSDs not to wait for seq id. by default, we
                        wait for all specified osds, but some of them could be
                        moved out of osdmap, so we cannot get their updated
                        stat seq from monitor anymore. in that case, you need
                        to pass a blocklist.
        :param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
                             it. (5 min by default)
        """
        seq = {osd: int(self.raw_cluster_cmd('tell', 'osd.%d' % osd,
                                             'flush_pg_stats'))
               for osd in osds}
        if not wait_for_mon:
            return
        if no_wait is None:
            no_wait = []
        for osd, need in seq.items():
            if osd in no_wait:
                continue
            got = 0
            while wait_for_mon > 0:
                got = int(self.raw_cluster_cmd('osd', 'last-stat-seq', 'osd.%d' % osd))
                self.log('need seq {need} got {got} for osd.{osd}'.format(
                    need=need, got=got, osd=osd))
                if got >= need:
                    break
                A_WHILE = 1
                time.sleep(A_WHILE)
                wait_for_mon -= A_WHILE
            else:
                raise Exception('timed out waiting for mon to be updated with '
                                'osd.{osd}: {got} < {need}'.
                                format(osd=osd, got=got, need=need))

    def flush_all_pg_stats(self):
        self.flush_pg_stats(range(len(self.get_osd_dump())))
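
    # Example: flush stats for osd.0 and osd.1 but skip waiting on osd.1,
    # e.g. because it was just removed from the osdmap and its stat seq can
    # no longer be read back from the monitor:
    #   manager.flush_pg_stats([0, 1], no_wait=[1])
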
    def do_rados(self, cmd, pool=None, namespace=None, remote=None, **kwargs):
        """
        Execute a remote rados command.
        """
        if remote is None:
            remote = self.controller

        testdir = teuthology.get_testdir(self.ctx)
        pre = [
            'adjust-ulimits',
            'ceph-coverage',
            '{tdir}/archive/coverage'.format(tdir=testdir),
            'rados',
            '--cluster',
            self.cluster,
            ]
        if pool is not None:
            pre += ['--pool', pool]
        if namespace is not None:
            pre += ['--namespace', namespace]
        pre.extend(cmd)
        proc = remote.run(
            args=pre,
            wait=True,
            **kwargs
            )
        return proc

    def rados_write_objects(self, pool, num_objects, size,
                            timelimit, threads, cleanup=False):
        """
        Write rados objects
        Threads not used yet.
        """
        args = [
            '--num-objects', num_objects,
            '-b', size,
            'bench', timelimit,
            'write',
            ]
        if not cleanup:
            args.append('--no-cleanup')
        return self.do_rados(map(str, args), pool=pool)

    def do_put(self, pool, obj, fname, namespace=None):
        """
        Implement rados put operation
        """
        args = ['put', obj, fname]
        return self.do_rados(
            args,
            check_status=False,
            pool=pool,
            namespace=namespace).exitstatus

    def do_get(self, pool, obj, fname='/dev/null', namespace=None):
        """
        Implement rados get operation
        """
        args = ['get', obj, fname]
        return self.do_rados(
            args,
            check_status=False,
            pool=pool,
            namespace=namespace,
            ).exitstatus

    def do_rm(self, pool, obj, namespace=None):
        """
        Implement rados rm operation
        """
        args = ['rm', obj]
        return self.do_rados(
            args,
            check_status=False,
            pool=pool,
            namespace=namespace).exitstatus

    def osd_admin_socket(self, osd_id, command, check_status=True, timeout=0, stdout=None):
        """
        Remotely start up ceph specifying the admin socket
        """
        return self.admin_socket('osd', osd_id, command, check_status, timeout, stdout)

    def find_remote(self, service_type, service_id):
        """
        Get the Remote for the host where a particular service runs.

        :param service_type: 'mds', 'osd', 'client'
        :param service_id: The second part of a role, e.g. '0' for
                           the role 'client.0'
        :return: a Remote instance for the host where the
                 requested role is placed
        """
        return get_remote(self.ctx, self.cluster,
                          service_type, service_id)

    def admin_socket(self, service_type, service_id,
                     command, check_status=True, timeout=0, stdout=None):
        """
        Remotely start up ceph specifying the admin socket
        :param command: a list of words to use as the command
                        to the admin socket
        """
        if stdout is None:
            stdout = StringIO()

        remote = self.find_remote(service_type, service_id)

        if self.cephadm:
            return shell(
                self.ctx, self.cluster, remote,
                args=[
                    'ceph', 'daemon', '%s.%s' % (service_type, service_id),
                ] + command,
                stdout=stdout,
                wait=True,
                check_status=check_status,
            )
        if self.rook:
            assert False, 'not implemented'

        testdir = teuthology.get_testdir(self.ctx)
        args = [
            'sudo',
            'adjust-ulimits',
            'ceph-coverage',
            '{tdir}/archive/coverage'.format(tdir=testdir),
            'timeout',
            str(timeout),
            'ceph',
            '--cluster',
            self.cluster,
            '--admin-daemon',
            '/var/run/ceph/{cluster}-{type}.{id}.asok'.format(
                cluster=self.cluster,
                type=service_type,
                id=service_id),
            ]
        args.extend(command)
        return remote.run(
            args=args,
            stdout=stdout,
            wait=True,
            check_status=check_status
            )
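
    # On a non-cephadm cluster the composed command looks roughly like this
    # (testdir and daemon id hypothetical, with the default timeout=0):
    #   sudo adjust-ulimits ceph-coverage <testdir>/archive/coverage \
    #     timeout 0 ceph --cluster ceph --admin-daemon \
    #     /var/run/ceph/ceph-osd.0.asok <command...>
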
    def objectstore_tool(self, pool, options, args, **kwargs):
        return ObjectStoreTool(self, pool, **kwargs).run(options, args)

    def get_pgid(self, pool, pgnum):
        """
        :param pool: pool name
        :param pgnum: pg number
        :returns: a string representing this pg.
        """
        poolnum = self.get_pool_num(pool)
        pg_str = "{poolnum}.{pgnum}".format(
            poolnum=poolnum,
            pgnum=pgnum)
        return pg_str

    def get_pg_replica(self, pool, pgnum):
        """
        get replica for pool, pgnum (e.g. (data, 0) -> 0)
        """
        pg_str = self.get_pgid(pool, pgnum)
        output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
        j = json.loads('\n'.join(output.split('\n')[1:]))
        return int(j['acting'][-1])

    def wait_for_pg_stats(func):
        # both osd_mon_report_interval and mgr_stats_period are 5 seconds
        # by default, and take the faulty injection in ms into consideration,
        # 12 seconds are more than enough
        delays = [1, 1, 2, 3, 5, 8, 13, 0]

        @wraps(func)
        def wrapper(self, *args, **kwargs):
            exc = None
            for delay in delays:
                try:
                    return func(self, *args, **kwargs)
                except AssertionError as e:
                    time.sleep(delay)
                    exc = e
            raise exc
        return wrapper

    @wait_for_pg_stats # type: ignore
    def get_pg_primary(self, pool, pgnum):
        """
        get primary for pool, pgnum (e.g. (data, 0) -> 0)
        """
        pg_str = self.get_pgid(pool, pgnum)
        output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
        j = json.loads('\n'.join(output.split('\n')[1:]))
        return int(j['acting'][0])
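
    # The wait_for_pg_stats decorator retries an assertion-based check on a
    # Fibonacci-style backoff (1+1+2+3+5+8+13 = 33s in total, comfortably
    # above the ~12s worst case noted above), re-raising the last
    # AssertionError if every attempt fails.
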
    def get_pool_num(self, pool):
        """
        get number for pool (e.g., data -> 2)
        """
        return int(self.get_pool_dump(pool)['pool'])

    def list_pools(self):
        """
        list all pool names
        """
        osd_dump = self.get_osd_dump_json()
        self.log(osd_dump['pools'])
        return [str(i['pool_name']) for i in osd_dump['pools']]

    def clear_pools(self):
        """
        remove all pools
        """
        [self.remove_pool(i) for i in self.list_pools()]

    def kick_recovery_wq(self, osdnum):
        """
        Run kick_recovery_wq on cluster.
        """
        return self.raw_cluster_cmd(
            'tell', "osd.%d" % (int(osdnum),),
            'debug',
            'kick_recovery_wq',
            '0')

    def wait_run_admin_socket(self, service_type,
                              service_id, args=['version'], timeout=75, stdout=None):
        """
        If osd_admin_socket call succeeds, return.  Otherwise wait
        five seconds and try again.
        """
        if stdout is None:
            stdout = StringIO()
        tries = 0
        while True:
            proc = self.admin_socket(service_type, service_id,
                                     args, check_status=False, stdout=stdout)
            if proc.exitstatus == 0:
                return proc
            else:
                tries += 1
                if (tries * 5) > timeout:
                    raise Exception('timed out waiting for admin_socket '
                                    'to appear after {type}.{id} restart'.
                                    format(type=service_type,
                                           id=service_id))
                self.log("waiting on admin_socket for {type}-{id}, "
                         "{command}".format(type=service_type,
                                            id=service_id,
                                            command=args))
                time.sleep(5)

    def get_pool_dump(self, pool):
        """
        get the osd dump part of a pool
        """
        osd_dump = self.get_osd_dump_json()
        for i in osd_dump['pools']:
            if i['pool_name'] == pool:
                return i
        assert False

    def get_config(self, service_type, service_id, name):
        """
        :param node: like 'mon.a'
        :param name: the option name
        """
        proc = self.wait_run_admin_socket(service_type, service_id,
                                          ['config', 'show'])
        j = json.loads(proc.stdout.getvalue())
        return j[name]

    def inject_args(self, service_type, service_id, name, value):
        whom = '{0}.{1}'.format(service_type, service_id)
        if isinstance(value, bool):
            value = 'true' if value else 'false'
        opt_arg = '--{name}={value}'.format(name=name, value=value)
        self.raw_cluster_cmd('--', 'tell', whom, 'injectargs', opt_arg)

    def set_config(self, osdnum, **argdict):
        """
        :param osdnum: osd number
        :param argdict: dictionary containing values to set.
        """
        for k, v in argdict.items():
            self.wait_run_admin_socket(
                'osd', osdnum,
                ['config', 'set', str(k), str(v)])

    def raw_cluster_status(self):
        """
        Get status from cluster
        """
        status = self.raw_cluster_cmd('status', '--format=json')
        return json.loads(status)

    def raw_osd_status(self):
        """
        Get osd status from cluster
        """
        return self.raw_cluster_cmd('osd', 'dump')

    def get_osd_status(self):
        """
        Get osd statuses sorted by states that the osds are in.
        """
        osd_lines = list(filter(
            lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
            self.raw_osd_status().split('\n')))
        in_osds = [int(i[4:].split()[0])
                   for i in filter(lambda x: " in " in x, osd_lines)]
        out_osds = [int(i[4:].split()[0])
                    for i in filter(lambda x: " out " in x, osd_lines)]
        up_osds = [int(i[4:].split()[0])
                   for i in filter(lambda x: " up " in x, osd_lines)]
        down_osds = [int(i[4:].split()[0])
                     for i in filter(lambda x: " down " in x, osd_lines)]
        dead_osds = [int(x.id_)
                     for x in filter(lambda x:
                                     not x.running(),
                                     self.ctx.daemons.
                                     iter_daemons_of_role('osd', self.cluster))]
        live_osds = [int(x.id_) for x in
                     filter(lambda x:
                            x.running(),
                            self.ctx.daemons.iter_daemons_of_role('osd',
                                                                  self.cluster))]
        return {'in': in_osds, 'out': out_osds, 'up': up_osds,
                'down': down_osds, 'dead': dead_osds, 'live': live_osds,
                'raw': osd_lines}

    def get_num_pgs(self):
        """
        Check cluster status for the number of pgs
        """
        status = self.raw_cluster_status()
        self.log(status)
        return status['pgmap']['num_pgs']

    def create_erasure_code_profile(self, profile_name, profile):
        """
        Create an erasure code profile name that can be used as a parameter
        when creating an erasure coded pool.
        """
        args = cmd_erasure_code_profile(profile_name, profile)
        self.raw_cluster_cmd(*args)

    def create_pool_with_unique_name(self, pg_num=16,
                                     erasure_code_profile_name=None,
                                     min_size=None,
                                     erasure_code_use_overwrites=False):
        """
        Create a pool named unique_pool_X where X is unique.
        """
        name = ""
        with self.lock:
            name = "unique_pool_%s" % (str(self.next_pool_id),)
            self.next_pool_id += 1
            self.create_pool(
                name,
                pg_num,
                erasure_code_profile_name=erasure_code_profile_name,
                min_size=min_size,
                erasure_code_use_overwrites=erasure_code_use_overwrites)
        return name

2072 @contextlib.contextmanager
2073 def pool(self
, pool_name
, pg_num
=16, erasure_code_profile_name
=None):
2074 self
.create_pool(pool_name
, pg_num
, erasure_code_profile_name
)
2076 self
.remove_pool(pool_name
)
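    # Usage sketch: the pool() context manager guarantees cleanup even if
    # the body raises (hypothetical pool name):
    #
    #   with manager.pool('scratch', pg_num=8):
    #       run_some_workload()
    #   # 'scratch' has been removed here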
    def create_pool(self, pool_name, pg_num=16,
                    erasure_code_profile_name=None,
                    min_size=None,
                    erasure_code_use_overwrites=False):
        """
        Create a pool named from the pool_name parameter.
        :param pool_name: name of the pool being created.
        :param pg_num: initial number of pgs.
        :param erasure_code_profile_name: if set, create an
                                          erasure coded pool using the profile
        :param min_size: if set, override the pool's min_size
        :param erasure_code_use_overwrites: if true, allow overwrites
        """
        with self.lock:
            assert isinstance(pool_name, str)
            assert isinstance(pg_num, int)
            assert pool_name not in self.pools
            self.log("creating pool_name %s" % (pool_name,))
            if erasure_code_profile_name:
                self.raw_cluster_cmd('osd', 'pool', 'create',
                                     pool_name, str(pg_num), str(pg_num),
                                     'erasure', erasure_code_profile_name)
            else:
                self.raw_cluster_cmd('osd', 'pool', 'create',
                                     pool_name, str(pg_num))
            if min_size is not None:
                self.raw_cluster_cmd(
                    'osd', 'pool', 'set', pool_name,
                    'min_size',
                    str(min_size))
            if erasure_code_use_overwrites:
                self.raw_cluster_cmd(
                    'osd', 'pool', 'set', pool_name,
                    'allow_ec_overwrites',
                    'true')
            self.raw_cluster_cmd(
                'osd', 'pool', 'application', 'enable',
                pool_name, 'rados', '--yes-i-really-mean-it',
                run.Raw('||'), 'true')
            self.pools[pool_name] = pg_num
        time.sleep(1)
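    # CLI equivalent of the replicated branch above (illustrative only):
    #
    #   ceph osd pool create <pool_name> <pg_num>
    #   ceph osd pool set <pool_name> min_size <min_size>
    #   ceph osd pool application enable <pool_name> rados
    #
    # The trailing '|| true' on the application-enable call keeps releases
    # that lack the subcommand from failing the whole test.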
    def add_pool_snap(self, pool_name, snap_name):
        """
        Add pool snapshot
        :param pool_name: name of pool to snapshot
        :param snap_name: name of snapshot to take
        """
        self.raw_cluster_cmd('osd', 'pool', 'mksnap',
                             str(pool_name), str(snap_name))

    def remove_pool_snap(self, pool_name, snap_name):
        """
        Remove pool snapshot
        :param pool_name: name of pool to snapshot
        :param snap_name: name of snapshot to remove
        """
        self.raw_cluster_cmd('osd', 'pool', 'rmsnap',
                             str(pool_name), str(snap_name))
    def remove_pool(self, pool_name):
        """
        Remove the indicated pool
        :param pool_name: Pool to be removed
        """
        with self.lock:
            assert isinstance(pool_name, str)
            assert pool_name in self.pools
            self.log("removing pool_name %s" % (pool_name,))
            del self.pools[pool_name]
            self.raw_cluster_cmd('osd', 'pool', 'rm', pool_name, pool_name,
                                 "--yes-i-really-really-mean-it")

    def get_pool(self):
        """
        Pick a random pool
        :returns: pool name
        """
        with self.lock:
            if self.pools:
                # wrap in list(): random.sample() rejects dict views on
                # newer Python versions
                return random.sample(list(self.pools.keys()), 1)[0]
    def get_pool_pg_num(self, pool_name):
        """
        Return the number of pgs in the pool specified.
        """
        with self.lock:
            assert isinstance(pool_name, str)
            if pool_name in self.pools:
                return self.pools[pool_name]
            return 0

    def get_pool_property(self, pool_name, prop):
        """
        :param pool_name: pool
        :param prop: property to be checked.
        :returns: property as string
        """
        with self.lock:
            assert isinstance(pool_name, str)
            assert isinstance(prop, str)
            output = self.raw_cluster_cmd(
                'osd',
                'pool',
                'get',
                pool_name,
                prop)
            return output.split()[1]

    def get_pool_int_property(self, pool_name, prop):
        return int(self.get_pool_property(pool_name, prop))
    def set_pool_property(self, pool_name, prop, val):
        """
        :param pool_name: pool
        :param prop: property to be set.
        :param val: value to set.

        This routine retries if set operation fails.
        """
        with self.lock:
            assert isinstance(pool_name, str)
            assert isinstance(prop, str)
            assert isinstance(val, int)
            tries = 0
            while True:
                r = self.raw_cluster_cmd_result(
                    'osd',
                    'pool',
                    'set',
                    pool_name,
                    prop,
                    str(val))
                if r != 11:  # EAGAIN
                    break
                tries += 1
                if tries > 50:
                    raise Exception('timed out getting EAGAIN '
                                    'when setting pool property %s %s = %s' %
                                    (pool_name, prop, val))
                self.log('got EAGAIN setting pool property, '
                         'waiting a few seconds...')
                time.sleep(2)
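    # Note on the retry loop: `ceph osd pool set` returns EAGAIN (11) while
    # the cluster cannot apply the change yet, so the wrapper retries with a
    # short sleep. A typical call (hypothetical pool):
    #
    #   manager.set_pool_property('scratch', 'pg_num', 32)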
    def expand_pool(self, pool_name, by, max_pgs):
        """
        Increase the number of pgs in a pool
        """
        with self.lock:
            assert isinstance(pool_name, str)
            assert isinstance(by, int)
            assert pool_name in self.pools
            if self.get_num_creating() > 0:
                return False
            if (self.pools[pool_name] + by) > max_pgs:
                return False
            self.log("increase pool size by %d" % (by,))
            new_pg_num = self.pools[pool_name] + by
            self.set_pool_property(pool_name, "pg_num", new_pg_num)
            self.pools[pool_name] = new_pg_num
            return True

    def contract_pool(self, pool_name, by, min_pgs):
        """
        Decrease the number of pgs in a pool
        """
        with self.lock:
            self.log('contract_pool %s by %s min %s' % (
                     pool_name, str(by), str(min_pgs)))
            assert isinstance(pool_name, str)
            assert isinstance(by, int)
            assert pool_name in self.pools
            if self.get_num_creating() > 0:
                self.log('too many creating')
                return False
            proj = self.pools[pool_name] - by
            if proj < min_pgs:
                self.log('would drop below min_pgs, proj %d, currently %d' %
                         (proj, self.pools[pool_name]))
                return False
            self.log("decrease pool size by %d" % (by,))
            new_pg_num = self.pools[pool_name] - by
            self.set_pool_property(pool_name, "pg_num", new_pg_num)
            self.pools[pool_name] = new_pg_num
            return True
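    # Both mutators return False (rather than raising) when the change is
    # unsafe, so a thrasher can call them opportunistically, e.g.:
    #
    #   if not manager.expand_pool('scratch', by=8, max_pgs=128):
    #       manager.log('skipping pg expansion this round')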
    def stop_pg_num_changes(self):
        """
        Reset all pg_num_targets back to pg_num, canceling splits and merges
        """
        self.log('Canceling any pending splits or merges...')
        osd_dump = self.get_osd_dump_json()
        try:
            for pool in osd_dump['pools']:
                if pool['pg_num'] != pool['pg_num_target']:
                    self.log('Setting pool %s (%d) pg_num %d -> %d' %
                             (pool['pool_name'], pool['pool'],
                              pool['pg_num_target'],
                              pool['pg_num']))
                    self.raw_cluster_cmd('osd', 'pool', 'set', pool['pool_name'],
                                         'pg_num', str(pool['pg_num']))
        except KeyError:
            # we don't support pg_num_target before nautilus
            pass

    def set_pool_pgpnum(self, pool_name, force):
        """
        Set pgpnum property of pool_name pool.
        """
        with self.lock:
            assert isinstance(pool_name, str)
            assert pool_name in self.pools
            if not force and self.get_num_creating() > 0:
                return False
            self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name])
            return True
    def list_pg_unfound(self, pgid):
        """
        return list of unfound pgs with the id specified
        """
        r = None
        offset = {}
        while True:
            out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_unfound',
                                       json.dumps(offset))
            j = json.loads(out)
            if r is None:
                r = j
            else:
                r['objects'].extend(j['objects'])
            if not j.get('more', 0):
                break
            offset = j['objects'][-1]['oid']
        return r
    def get_pg_stats(self):
        """
        Dump the cluster and get pg stats
        """
        out = self.raw_cluster_cmd('pg', 'dump', '--format=json')
        j = json.loads('\n'.join(out.split('\n')[1:]))
        try:
            return j['pg_map']['pg_stats']
        except KeyError:
            return j['pg_stats']
    def get_pgids_to_force(self, backfill):
        """
        Return the randomized list of PGs that can have their recovery/backfill forced
        """
        j = self.get_pg_stats()
        pgids = []
        if backfill:
            wanted = ['degraded', 'backfilling', 'backfill_wait']
        else:
            wanted = ['recovering', 'degraded', 'recovery_wait']
        for pg in j:
            status = pg['state'].split('+')
            for t in wanted:
                if (random.random() > 0.5 and
                        not ('forced_backfill' in status or
                             'forced_recovery' in status) and
                        t in status):
                    pgids.append(pg['pgid'])
                    break
        return pgids

    def get_pgids_to_cancel_force(self, backfill):
        """
        Return the randomized list of PGs whose recovery/backfill priority is forced
        """
        j = self.get_pg_stats()
        pgids = []
        if backfill:
            wanted = 'forced_backfill'
        else:
            wanted = 'forced_recovery'
        for pg in j:
            status = pg['state'].split('+')
            if wanted in status and random.random() > 0.5:
                pgids.append(pg['pgid'])
        return pgids

    def compile_pg_status(self):
        """
        Return a histogram of pg state values
        """
        ret = {}
        j = self.get_pg_stats()
        for pg in j:
            for status in pg['state'].split('+'):
                if status not in ret:
                    ret[status] = 0
                ret[status] += 1
        return ret
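    # Sketch of how a thrasher might consume these helpers (hypothetical):
    #
    #   for pgid in manager.get_pgids_to_force(backfill=True):
    #       manager.raw_cluster_cmd('pg', 'force-backfill', pgid)
    #   hist = manager.compile_pg_status()
    #   manager.log('pg state histogram: %s' % hist)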
    @wait_for_pg_stats # type: ignore
    def with_pg_state(self, pool, pgnum, check):
        pgstr = self.get_pgid(pool, pgnum)
        stats = self.get_single_pg_stats(pgstr)
        assert(check(stats['state']))

    @wait_for_pg_stats # type: ignore
    def with_pg(self, pool, pgnum, check):
        pgstr = self.get_pgid(pool, pgnum)
        stats = self.get_single_pg_stats(pgstr)
        return check(stats)

    def get_last_scrub_stamp(self, pool, pgnum):
        """
        Get the timestamp of the last scrub.
        """
        stats = self.get_single_pg_stats(self.get_pgid(pool, pgnum))
        return stats["last_scrub_stamp"]
    def do_pg_scrub(self, pool, pgnum, stype):
        """
        Scrub pg and wait for scrubbing to finish
        """
        init = self.get_last_scrub_stamp(pool, pgnum)
        RESEND_TIMEOUT = 120    # Must be a multiple of SLEEP_TIME
        FATAL_TIMEOUT = RESEND_TIMEOUT * 3
        SLEEP_TIME = 10
        timer = 0
        while init == self.get_last_scrub_stamp(pool, pgnum):
            assert timer < FATAL_TIMEOUT, "fatal timeout trying to " + stype
            self.log("waiting for scrub type %s" % (stype,))
            if (timer % RESEND_TIMEOUT) == 0:
                self.raw_cluster_cmd('pg', stype, self.get_pgid(pool, pgnum))
                # The first time in this loop is the actual request
                if timer != 0 and stype == "repair":
                    self.log("WARNING: Resubmitted a non-idempotent repair")
            time.sleep(SLEEP_TIME)
            timer += SLEEP_TIME
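    # Usage sketch (hypothetical pool/pg number): trigger a deep scrub and
    # block until the scrub stamp advances.
    #
    #   manager.do_pg_scrub('scratch', 0, 'deep-scrub')
    #
    # stype maps directly onto `ceph pg <stype> <pgid>`: 'scrub',
    # 'deep-scrub', or 'repair'.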
    def wait_snap_trimming_complete(self, pool):
        """
        Wait for snap trimming on pool to end
        """
        POLL_PERIOD = 10  # check whether snap trimming is done every 10 seconds
        FATAL_TIMEOUT = 16 * 60  # give up if trimming is not done in time
        start = time.time()
        poolnum = self.get_pool_num(pool)
        poolnumstr = "%s." % (poolnum,)
        while True:
            now = time.time()
            if (now - start) > FATAL_TIMEOUT:
                assert (now - start) < FATAL_TIMEOUT, \
                    'failed to complete snap trimming before timeout'
            all_stats = self.get_pg_stats()
            trimming = False
            for pg in all_stats:
                if (poolnumstr in pg['pgid']) and ('snaptrim' in pg['state']):
                    self.log("pg {pg} in trimming, state: {state}".format(
                        pg=pg['pgid'],
                        state=pg['state']))
                    trimming = True
            if not trimming:
                break
            self.log("{pool} still trimming, waiting".format(pool=pool))
            time.sleep(POLL_PERIOD)
    def get_single_pg_stats(self, pgid):
        """
        Return pg for the pgid specified.
        """
        all_stats = self.get_pg_stats()
        for pg in all_stats:
            if pg['pgid'] == pgid:
                return pg
        return None

    def get_object_pg_with_shard(self, pool, name, osdid):
        """
        Return the pgid holding the named object, including the shard
        suffix when the pool is erasure coded.
        """
        pool_dump = self.get_pool_dump(pool)
        object_map = self.get_object_map(pool, name)
        if pool_dump["type"] == PoolType.ERASURE_CODED:
            shard = object_map['acting'].index(osdid)
            return "{pgid}s{shard}".format(pgid=object_map['pgid'],
                                           shard=shard)
        else:
            return object_map['pgid']
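    # Shard-aware pgids matter for erasure-coded pools: tools that address a
    # specific EC shard (e.g. ceph-objectstore-tool) use the form
    # <pgid>s<shard>, such as "2.1fs0" for shard 0 of pg 2.1, while
    # replicated pools use the bare pgid.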
    def get_object_primary(self, pool, name):
        """
        Return the acting primary osd for the named object.
        """
        object_map = self.get_object_map(pool, name)
        return object_map['acting_primary']

    def get_object_map(self, pool, name):
        """
        osd map --format=json converted to a python object
        :returns: the python object
        """
        out = self.raw_cluster_cmd('--format=json', 'osd', 'map', pool, name)
        return json.loads('\n'.join(out.split('\n')[1:]))

    def get_osd_dump_json(self):
        """
        osd dump --format=json converted to a python object
        :returns: the python object
        """
        out = self.raw_cluster_cmd('osd', 'dump', '--format=json')
        return json.loads('\n'.join(out.split('\n')[1:]))
    def get_osd_dump(self):
        """
        Dump osds
        :returns: all osds
        """
        return self.get_osd_dump_json()['osds']

    def get_osd_metadata(self):
        """
        osd metadata --format=json converted to a python object
        :returns: the python object containing osd metadata information
        """
        out = self.raw_cluster_cmd('osd', 'metadata', '--format=json')
        return json.loads('\n'.join(out.split('\n')[1:]))

    def get_mgr_dump(self):
        out = self.raw_cluster_cmd('mgr', 'dump', '--format=json')
        return json.loads(out)
    def get_stuck_pgs(self, type_, threshold):
        """
        :returns: stuck pg information from the cluster
        """
        out = self.raw_cluster_cmd('pg', 'dump_stuck', type_, str(threshold),
                                   '--format=json')
        return json.loads(out).get('stuck_pg_stats', [])

    def get_num_unfound_objects(self):
        """
        Check cluster status to get the number of unfound objects
        """
        status = self.raw_cluster_status()
        self.log(status)
        return status['pgmap'].get('unfound_objects', 0)

    def get_num_creating(self):
        """
        Find the number of pgs in creating mode.
        """
        pgs = self.get_pg_stats()
        num = 0
        for pg in pgs:
            if 'creating' in pg['state']:
                num += 1
        return num
    def get_num_active_clean(self):
        """
        Find the number of active and clean pgs.
        """
        pgs = self.get_pg_stats()
        return self._get_num_active_clean(pgs)

    def _get_num_active_clean(self, pgs):
        num = 0
        for pg in pgs:
            if (pg['state'].count('active') and
                    pg['state'].count('clean') and
                    not pg['state'].count('stale')):
                num += 1
        return num

    def get_num_active_recovered(self):
        """
        Find the number of active and recovered pgs.
        """
        pgs = self.get_pg_stats()
        return self._get_num_active_recovered(pgs)

    def _get_num_active_recovered(self, pgs):
        num = 0
        for pg in pgs:
            if (pg['state'].count('active') and
                    not pg['state'].count('recover') and
                    not pg['state'].count('backfilling') and
                    not pg['state'].count('stale')):
                num += 1
        return num
    def get_is_making_recovery_progress(self):
        """
        Return whether there is recovery progress discernable in the
        raw cluster status
        """
        status = self.raw_cluster_status()
        kps = status['pgmap'].get('recovering_keys_per_sec', 0)
        bps = status['pgmap'].get('recovering_bytes_per_sec', 0)
        ops = status['pgmap'].get('recovering_objects_per_sec', 0)
        return kps > 0 or bps > 0 or ops > 0
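    # The three pgmap rate fields only appear while recovery is running, so
    # missing keys default to 0 and the helper answers "is any of keys/bytes/
    # objects per second non-zero right now?". The wait_for_* loops below use
    # it to keep resetting their timeout while progress continues.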
    def get_num_active(self):
        """
        Find the number of active pgs.
        """
        pgs = self.get_pg_stats()
        return self._get_num_active(pgs)

    def _get_num_active(self, pgs):
        num = 0
        for pg in pgs:
            if pg['state'].count('active') and not pg['state'].count('stale'):
                num += 1
        return num

    def get_num_down(self):
        """
        Find the number of pgs that are down.
        """
        pgs = self.get_pg_stats()
        num = 0
        for pg in pgs:
            if ((pg['state'].count('down') and not
                 pg['state'].count('stale')) or
                (pg['state'].count('incomplete') and not
                 pg['state'].count('stale'))):
                num += 1
        return num

    def get_num_active_down(self):
        """
        Find the number of pgs that are either active or down.
        """
        pgs = self.get_pg_stats()
        return self._get_num_active_down(pgs)

    def _get_num_active_down(self, pgs):
        num = 0
        for pg in pgs:
            if ((pg['state'].count('active') and not
                 pg['state'].count('stale')) or
                (pg['state'].count('down') and not
                 pg['state'].count('stale')) or
                (pg['state'].count('incomplete') and not
                 pg['state'].count('stale'))):
                num += 1
        return num
    def get_num_peered(self):
        """
        Find the number of PGs that are peered
        """
        pgs = self.get_pg_stats()
        return self._get_num_peered(pgs)

    def _get_num_peered(self, pgs):
        num = 0
        for pg in pgs:
            if pg['state'].count('peered') and not pg['state'].count('stale'):
                num += 1
        return num

    def is_clean(self):
        """
        True if all pgs are clean
        """
        pgs = self.get_pg_stats()
        return self._get_num_active_clean(pgs) == len(pgs)

    def is_recovered(self):
        """
        True if all pgs have recovered
        """
        pgs = self.get_pg_stats()
        return self._get_num_active_recovered(pgs) == len(pgs)

    def is_active_or_down(self):
        """
        True if all pgs are active or down
        """
        pgs = self.get_pg_stats()
        return self._get_num_active_down(pgs) == len(pgs)
    def dump_pgs_not_active_clean(self):
        """
        Dumps all pgs that are not active+clean
        """
        pgs = self.get_pg_stats()
        for pg in pgs:
            if pg['state'] != 'active+clean':
                self.log('PG %s is not active+clean' % pg['pgid'])
                self.log(pg)

    def dump_pgs_not_active_down(self):
        """
        Dumps all pgs that are not active or down
        """
        pgs = self.get_pg_stats()
        for pg in pgs:
            if 'active' not in pg['state'] and 'down' not in pg['state']:
                self.log('PG %s is not active or down' % pg['pgid'])
                self.log(pg)

    def dump_pgs_not_active(self):
        """
        Dumps all pgs that are not active
        """
        pgs = self.get_pg_stats()
        for pg in pgs:
            if 'active' not in pg['state']:
                self.log('PG %s is not active' % pg['pgid'])
                self.log(pg)
    def wait_for_clean(self, timeout=1200):
        """
        Returns true when all pgs are clean.
        """
        self.log("waiting for clean")
        start = time.time()
        num_active_clean = self.get_num_active_clean()
        while not self.is_clean():
            if timeout is not None:
                if self.get_is_making_recovery_progress():
                    self.log("making progress, resetting timeout")
                    start = time.time()
                else:
                    self.log("no progress seen, keeping timeout for now")
                    if time.time() - start >= timeout:
                        self.log('dumping pgs not clean')
                        self.dump_pgs_not_active_clean()
                        assert time.time() - start < timeout, \
                            'wait_for_clean: failed before timeout expired'
            cur_active_clean = self.get_num_active_clean()
            if cur_active_clean != num_active_clean:
                start = time.time()
                num_active_clean = cur_active_clean
            time.sleep(3)
        self.log("clean!")
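    # Typical pattern in a teuthology task (hypothetical osd id): thrash,
    # then gate the next phase on the cluster settling.
    #
    #   manager.kill_osd(1)
    #   manager.mark_down_osd(1)
    #   manager.revive_osd(1)
    #   manager.wait_for_clean(timeout=900)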
    def are_all_osds_up(self):
        """
        Returns true if all osds are up.
        """
        x = self.get_osd_dump()
        return (len(x) == sum([(y['up'] > 0) for y in x]))

    def wait_for_all_osds_up(self, timeout=None):
        """
        When this exits, either the timeout has expired, or all
        osds are up.
        """
        self.log("waiting for all up")
        start = time.time()
        while not self.are_all_osds_up():
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'timeout expired in wait_for_all_osds_up'
            time.sleep(3)
        self.log("all up!")

    def pool_exists(self, pool):
        if pool in self.list_pools():
            return True
        return False

    def wait_for_pool(self, pool, timeout=300):
        """
        Wait for a pool to exist
        """
        self.log('waiting for pool %s to exist' % pool)
        start = time.time()
        while not self.pool_exists(pool):
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'timeout expired in wait_for_pool'
            time.sleep(3)

    def wait_for_pools(self, pools):
        for pool in pools:
            self.wait_for_pool(pool)
    def is_mgr_available(self):
        x = self.get_mgr_dump()
        return x.get('available', False)

    def wait_for_mgr_available(self, timeout=None):
        self.log("waiting for mgr available")
        start = time.time()
        while not self.is_mgr_available():
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'timeout expired in wait_for_mgr_available'
            time.sleep(3)
        self.log("mgr available!")
    def wait_for_recovery(self, timeout=None):
        """
        Check peering. When this exits, we have recovered.
        """
        self.log("waiting for recovery to complete")
        start = time.time()
        num_active_recovered = self.get_num_active_recovered()
        while not self.is_recovered():
            now = time.time()
            if timeout is not None:
                if self.get_is_making_recovery_progress():
                    self.log("making progress, resetting timeout")
                    start = time.time()
                else:
                    self.log("no progress seen, keeping timeout for now")
                    if now - start >= timeout:
                        if self.is_recovered():
                            break
                        self.log('dumping pgs not recovered yet')
                        self.dump_pgs_not_active_clean()
                        assert now - start < timeout, \
                            'wait_for_recovery: failed before timeout expired'
            cur_active_recovered = self.get_num_active_recovered()
            if cur_active_recovered != num_active_recovered:
                start = time.time()
                num_active_recovered = cur_active_recovered
            time.sleep(3)
        self.log("recovered!")
    def wait_for_active(self, timeout=None):
        """
        Check peering. When this exits, we are definitely active
        """
        self.log("waiting for peering to complete")
        start = time.time()
        num_active = self.get_num_active()
        while not self.is_active():
            if timeout is not None:
                if time.time() - start >= timeout:
                    self.log('dumping pgs not active')
                    self.dump_pgs_not_active()
                    assert time.time() - start < timeout, \
                        'wait_for_active: failed before timeout expired'
            cur_active = self.get_num_active()
            if cur_active != num_active:
                start = time.time()
                num_active = cur_active
            time.sleep(3)
        self.log("active!")

    def wait_for_active_or_down(self, timeout=None):
        """
        Check peering. When this exits, we are definitely either
        active or down
        """
        self.log("waiting for peering to complete or become blocked")
        start = time.time()
        num_active_down = self.get_num_active_down()
        while not self.is_active_or_down():
            if timeout is not None:
                if time.time() - start >= timeout:
                    self.log('dumping pgs not active or down')
                    self.dump_pgs_not_active_down()
                    assert time.time() - start < timeout, \
                        'wait_for_active_or_down: failed before timeout expired'
            cur_active_down = self.get_num_active_down()
            if cur_active_down != num_active_down:
                start = time.time()
                num_active_down = cur_active_down
            time.sleep(3)
        self.log("active or down!")
    def osd_is_up(self, osd):
        """
        Wrapper for osd check
        """
        osds = self.get_osd_dump()
        return osds[osd]['up'] > 0

    def wait_till_osd_is_up(self, osd, timeout=None):
        """
        Loop waiting for osd.
        """
        self.log('waiting for osd.%d to be up' % osd)
        start = time.time()
        while not self.osd_is_up(osd):
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'osd.%d failed to come up before timeout expired' % osd
            time.sleep(3)
        self.log('osd.%d is up' % osd)

    def is_active(self):
        """
        Wrapper to check if all pgs are active
        """
        return self.get_num_active() == self.get_num_pgs()
    def all_active_or_peered(self):
        """
        Wrapper to check if all PGs are active or peered
        """
        pgs = self.get_pg_stats()
        return self._get_num_active(pgs) + self._get_num_peered(pgs) == len(pgs)

    def wait_till_active(self, timeout=None):
        """
        Wait until all pgs are active.
        """
        self.log("waiting till active")
        start = time.time()
        while not self.is_active():
            if timeout is not None:
                if time.time() - start >= timeout:
                    self.log('dumping pgs not active')
                    self.dump_pgs_not_active()
                    assert time.time() - start < timeout, \
                        'wait_till_active: failed before timeout expired'
            time.sleep(3)
        self.log("active!")
    def wait_till_pg_convergence(self, timeout=None):
        start = time.time()
        old_stats = None
        active_osds = [osd['osd'] for osd in self.get_osd_dump()
                       if osd['in'] and osd['up']]
        while True:
            # strictly speaking, no need to wait for mon. but due to the
            # "ms inject socket failures" setting, the osdmap could be delayed,
            # so mgr is likely to ignore the pg-stat messages with pgs serving
            # newly created pools which is not yet known by mgr. so, to make sure
            # the mgr is updated with the latest pg-stats, waiting for mon/mgr is
            # necessary.
            self.flush_pg_stats(active_osds)
            new_stats = dict((stat['pgid'], stat['state'])
                             for stat in self.get_pg_stats())
            if old_stats == new_stats:
                return old_stats
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'failed to reach convergence before %d secs' % timeout
            old_stats = new_stats
            # longer than mgr_stats_period
            time.sleep(5 + 1)
    def mark_out_osd(self, osd):
        """
        Wrapper to mark osd out.
        """
        self.raw_cluster_cmd('osd', 'out', str(osd))

    def kill_osd(self, osd):
        """
        Kill osds by either power cycling (if indicated by the config)
        or by stopping the daemon.
        """
        if self.config.get('powercycle'):
            remote = self.find_remote('osd', osd)
            self.log('kill_osd on osd.{o} '
                     'doing powercycle of {s}'.format(o=osd, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_off()
        elif self.config.get('bdev_inject_crash') and self.config.get('bdev_inject_crash_probability'):
            if random.uniform(0, 1) < self.config.get('bdev_inject_crash_probability', .5):
                self.inject_args(
                    'osd', osd,
                    'bdev-inject-crash', self.config.get('bdev_inject_crash'))
                try:
                    self.ctx.daemons.get_daemon('osd', osd, self.cluster).wait()
                except Exception:
                    pass
                else:
                    raise RuntimeError('osd.%s did not fail' % osd)
            else:
                self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
        else:
            self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
    @staticmethod
    def _assert_ipmi(remote):
        assert remote.console.has_ipmi_credentials, (
            "powercycling requested but RemoteConsole is not "
            "initialized. Check ipmi config.")

    def blackhole_kill_osd(self, osd):
        """
        Stop osd if nothing else works.
        """
        self.inject_args('osd', osd,
                         'objectstore-blackhole', True)
        time.sleep(2)
        self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
    def revive_osd(self, osd, timeout=360, skip_admin_check=False):
        """
        Revive osds by either power cycling (if indicated by the config)
        or by restarting.
        """
        if self.config.get('powercycle'):
            remote = self.find_remote('osd', osd)
            self.log('revive_osd on osd.{o} doing powercycle of {s}'.
                     format(o=osd, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_on()
            if not remote.console.check_status(300):
                raise Exception('Failed to revive osd.{o} via ipmi'.
                                format(o=osd))
            teuthology.reconnect(self.ctx, 60, [remote])
            mount_osd_data(self.ctx, remote, self.cluster, str(osd))
            self.make_admin_daemon_dir(remote)
            self.ctx.daemons.get_daemon('osd', osd, self.cluster).reset()
        self.ctx.daemons.get_daemon('osd', osd, self.cluster).restart()

        if not skip_admin_check:
            # wait for dump_ops_in_flight; this command doesn't appear
            # until after the signal handler is installed and it is safe
            # to stop the osd again without making valgrind leak checks
            # unhappy. see #5924.
            self.wait_run_admin_socket('osd', osd,
                                       args=['dump_ops_in_flight'],
                                       timeout=timeout, stdout=DEVNULL)
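    # A common kill/revive round-trip in thrashing tests (hypothetical id):
    #
    #   manager.kill_osd(2)
    #   manager.mark_down_osd(2)
    #   # ... run workload while the osd is down ...
    #   manager.revive_osd(2)
    #   manager.wait_till_osd_is_up(2, timeout=300)
    #
    # skip_admin_check=True bypasses the admin-socket probe, which matters
    # when the OSD is expected to be stopped again immediately.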
    def mark_down_osd(self, osd):
        """
        Cluster command wrapper
        """
        self.raw_cluster_cmd('osd', 'down', str(osd))

    def mark_in_osd(self, osd):
        """
        Cluster command wrapper
        """
        self.raw_cluster_cmd('osd', 'in', str(osd))

    def signal_osd(self, osd, sig, silent=False):
        """
        Wrapper to local get_daemon call which sends the given
        signal to the given osd.
        """
        self.ctx.daemons.get_daemon('osd', osd,
                                    self.cluster).signal(sig, silent=silent)

    def signal_mon(self, mon, sig, silent=False):
        """
        Wrapper to local get_daemon call
        """
        self.ctx.daemons.get_daemon('mon', mon,
                                    self.cluster).signal(sig, silent=silent)
    def kill_mon(self, mon):
        """
        Kill the monitor by either power cycling (if the config says so),
        or by doing a normal stop.
        """
        if self.config.get('powercycle'):
            remote = self.find_remote('mon', mon)
            self.log('kill_mon on mon.{m} doing powercycle of {s}'.
                     format(m=mon, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_off()
        else:
            self.ctx.daemons.get_daemon('mon', mon, self.cluster).stop()

    def revive_mon(self, mon):
        """
        Restart by either power cycling (if the config says so),
        or by doing a normal restart.
        """
        if self.config.get('powercycle'):
            remote = self.find_remote('mon', mon)
            self.log('revive_mon on mon.{m} doing powercycle of {s}'.
                     format(m=mon, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_on()
            self.make_admin_daemon_dir(remote)
        self.ctx.daemons.get_daemon('mon', mon, self.cluster).restart()
    def revive_mgr(self, mgr):
        """
        Restart by either power cycling (if the config says so),
        or by doing a normal restart.
        """
        if self.config.get('powercycle'):
            remote = self.find_remote('mgr', mgr)
            self.log('revive_mgr on mgr.{m} doing powercycle of {s}'.
                     format(m=mgr, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_on()
            self.make_admin_daemon_dir(remote)
        self.ctx.daemons.get_daemon('mgr', mgr, self.cluster).restart()
    def get_mon_status(self, mon):
        """
        Extract all the monitor status information from the cluster
        """
        out = self.raw_cluster_cmd('tell', 'mon.%s' % mon, 'mon_status')
        return json.loads(out)

    def get_mon_quorum(self):
        """
        Extract monitor quorum information from the cluster
        """
        out = self.raw_cluster_cmd('quorum_status')
        j = json.loads(out)
        return j['quorum']

    def wait_for_mon_quorum_size(self, size, timeout=300):
        """
        Loop until quorum size is reached.
        """
        self.log('waiting for quorum size %d' % size)
        start = time.time()
        while len(self.get_mon_quorum()) != size:
            if timeout is not None:
                assert time.time() - start < timeout, \
                    ('failed to reach quorum size %d '
                     'before timeout expired' % size)
            time.sleep(3)
        self.log("quorum is size %d" % size)
    def get_mon_health(self, debug=False):
        """
        Extract all the monitor health information.
        """
        out = self.raw_cluster_cmd('health', '--format=json')
        if debug:
            self.log('health:\n{h}'.format(h=out))
        return json.loads(out)

    def wait_until_healthy(self, timeout=None):
        self.log("wait_until_healthy")
        start = time.time()
        while self.get_mon_health()['status'] != 'HEALTH_OK':
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'timeout expired in wait_until_healthy'
            time.sleep(3)
        self.log("wait_until_healthy done")
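    # Usage sketch: gate teardown on overall cluster health rather than pg
    # state alone, since health also covers mon/mgr/osd level warnings:
    #
    #   manager.wait_for_clean()
    #   manager.wait_until_healthy(timeout=300)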
    def get_filepath(self):
        """
        Return path to osd data with {id} needing to be replaced
        """
        return '/var/lib/ceph/osd/' + self.cluster + '-{id}'

    def make_admin_daemon_dir(self, remote):
        """
        Create /var/run/ceph directory on remote site.

        :param remote: Remote site
        """
        remote.run(args=['sudo', 'install', '-d', '-m0777',
                         '--', '/var/run/ceph'])
    def get_service_task_status(self, service, status_key):
        """
        Return daemon task status for a given ceph service.

        :param service: ceph service (mds, osd, etc...)
        :param status_key: matching task status key
        """
        task_status = {}
        status = self.raw_cluster_status()
        try:
            for k, v in status['servicemap']['services'][service]['daemons'].items():
                ts = dict(v).get('task_status', None)
                if ts:
                    task_status[k] = ts[status_key]
        except KeyError:  # catches missing service and status key
            return {}
        self.log(task_status)
        return task_status
def utility_task(name):
    """
    Generate ceph_manager subtask corresponding to ceph_manager
    method name
    """
    def task(ctx, config):
        if config is None:
            config = {}
        args = config.get('args', [])
        kwargs = config.get('kwargs', {})
        cluster = config.get('cluster', 'ceph')
        fn = getattr(ctx.managers[cluster], name)
        fn(*args, **kwargs)
    return task
revive_osd = utility_task("revive_osd")
revive_mon = utility_task("revive_mon")
kill_osd = utility_task("kill_osd")
kill_mon = utility_task("kill_mon")
create_pool = utility_task("create_pool")
remove_pool = utility_task("remove_pool")
wait_for_clean = utility_task("wait_for_clean")
flush_all_pg_stats = utility_task("flush_all_pg_stats")
set_pool_property = utility_task("set_pool_property")
do_pg_scrub = utility_task("do_pg_scrub")
wait_for_pool = utility_task("wait_for_pool")
wait_for_pools = utility_task("wait_for_pools")
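# These module-level names let job YAML call CephManager methods directly as
# teuthology tasks. A hypothetical job fragment:
#
#   tasks:
#   - kill_osd:
#       kwargs:
#         osd: 0
#   - wait_for_clean:
#   - revive_osd:
#       kwargs:
#         osd: 0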