"""
ceph manager -- Thrasher and CephManager objects
"""
# Standard-library and gevent modules used throughout this module.
import base64
import contextlib
import json
import logging
import os
import random
import shlex
import signal
import threading
import time
import traceback

import gevent

from functools import wraps
from io import BytesIO, StringIO
from subprocess import DEVNULL
from teuthology import misc as teuthology
from tasks.scrub import Scrubber
from tasks.util.rados import cmd_erasure_code_profile
from tasks.util import get_remote
from teuthology.contextutil import safe_while
from teuthology.orchestra.remote import Remote
from teuthology.orchestra import run
from teuthology.parallel import parallel
from teuthology.exceptions import CommandFailedError
from tasks.thrasher import Thrasher

DEFAULT_CONF_PATH = '/etc/ceph/ceph.conf'

log = logging.getLogger(__name__)
# this is for cephadm clusters
def shell(ctx, cluster_name, remote, args, name=None, **kwargs):
    extra_args = ['-n', name]
            '--image', ctx.ceph[cluster_name].image,
            '--fsid', ctx.ceph[cluster_name].fsid,


# this is for rook clusters
def toolbox(ctx, cluster_name, args, **kwargs):
    return ctx.rook[cluster_name].remote.run(
            ctx.rook[cluster_name].toolbox,
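
# Illustrative sketch (not part of the original module): how a task might call
# the helpers above to run a ceph command either inside the cephadm shell
# container or via the rook toolbox. The ctx/remote objects come from the
# teuthology run; the cluster name and command are example values only.
#
#   shell(ctx, 'ceph', remote, args=['ceph', 'osd', 'tree'])
#   toolbox(ctx, 'ceph', args=['ceph', 'osd', 'tree'])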
def write_conf(ctx, conf_path=DEFAULT_CONF_PATH, cluster='ceph'):
    ctx.ceph[cluster].conf.write(conf_fp)
    writes = ctx.cluster.run(
            'sudo', 'mkdir', '-p', '/etc/ceph', run.Raw('&&'),
            'sudo', 'chmod', '0755', '/etc/ceph', run.Raw('&&'),
            'sudo', 'tee', conf_path, run.Raw('&&'),
            'sudo', 'chmod', '0644', conf_path,
            run.Raw('>'), '/dev/null',
    teuthology.feed_many_stdins_and_close(conf_fp, writes)
def get_valgrind_args(testdir, name, preamble, v, exit_on_first_error=True, cd=True):
    """
    Build a command line for running valgrind.

    testdir - test results directory
    name - name of daemon (for naming the log file)
    preamble - stuff we should run before valgrind
    v - valgrind arguments
    """
    if not isinstance(v, list):
    # https://tracker.ceph.com/issues/44362
        'env', 'OPENSSL_ia32cap=~0x1000000000000000',
    val_path = '/var/log/ceph/valgrind'
    if '--tool=memcheck' in v or '--tool=helgrind' in v:
            '--trace-children=no',
            '--child-silent-after-fork=yes',
            '--soname-synonyms=somalloc=*tcmalloc*',
            '--suppressions={tdir}/valgrind.supp'.format(tdir=testdir),
            '--xml-file={vdir}/{n}.log'.format(vdir=val_path, n=name),
            '--trace-children=no',
            '--child-silent-after-fork=yes',
            '--soname-synonyms=somalloc=*tcmalloc*',
            '--suppressions={tdir}/valgrind.supp'.format(tdir=testdir),
            '--log-file={vdir}/{n}.log'.format(vdir=val_path, n=name),
    if exit_on_first_error:
            # at least Valgrind 3.14 is required
            '--exit-on-first-error=yes',
            '--error-exitcode=42',
    if cd:
        args += ['cd', testdir, run.Raw('&&')]
    args += preamble + extra_args + v
    log.debug('running %s under valgrind with args %s', name, args)
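
# A hedged usage sketch (values illustrative, not from the original module):
# wrap a daemon command line with valgrind memcheck.
#
#   cmd = get_valgrind_args('/home/ubuntu/cephtest', 'osd.0',
#                           ['daemon-helper', 'term'],
#                           ['--tool=memcheck'])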
def mount_osd_data(ctx, remote, cluster, osd):
    """
    :param remote: Remote site
    :param cluster: name of ceph cluster
    """
    log.debug('Mounting data for osd.{o} on {r}'.format(o=osd, r=remote))
    role = "{0}.osd.{1}".format(cluster, osd)
    alt_role = role if cluster != 'ceph' else "osd.{0}".format(osd)
    if remote in ctx.disk_config.remote_to_roles_to_dev:
        if alt_role in ctx.disk_config.remote_to_roles_to_dev[remote]:
            role = alt_role
        if role not in ctx.disk_config.remote_to_roles_to_dev[remote]:
            return
        dev = ctx.disk_config.remote_to_roles_to_dev[remote][role]
        mount_options = ctx.disk_config.\
            remote_to_roles_to_dev_mount_options[remote][role]
        fstype = ctx.disk_config.remote_to_roles_to_dev_fstype[remote][role]
        mnt = os.path.join('/var/lib/ceph/osd', '{0}-{1}'.format(cluster, osd))

        log.info('Mounting osd.{o}: dev: {n}, cluster: {c} '
                 'mountpoint: {p}, type: {t}, options: {v}'.format(
                     o=osd, n=remote.name, p=mnt, t=fstype, v=mount_options,
            '-o', ','.join(mount_options),


        self.log(traceback.format_exc())
class OSDThrasher(Thrasher):
    """
    Object used to thrash Ceph
    """
    def __init__(self, manager, config, name, logger):
        super(OSDThrasher, self).__init__()

        self.ceph_manager = manager
        self.cluster = manager.cluster
        self.ceph_manager.wait_for_clean()
        osd_status = self.ceph_manager.get_osd_status()
        self.in_osds = osd_status['in']
        self.live_osds = osd_status['live']
        self.out_osds = osd_status['out']
        self.dead_osds = osd_status['dead']
        self.stopping = False

        self.revive_timeout = self.config.get("revive_timeout", 360)
        self.pools_to_fix_pgp_num = set()
        if self.config.get('powercycle'):
            self.revive_timeout += 120
        self.clean_wait = self.config.get('clean_wait', 0)
        self.minin = self.config.get("min_in", 4)
        self.chance_move_pg = self.config.get('chance_move_pg', 1.0)
        self.sighup_delay = self.config.get('sighup_delay')
        self.optrack_toggle_delay = self.config.get('optrack_toggle_delay')
        self.dump_ops_enable = self.config.get('dump_ops_enable')
        self.noscrub_toggle_delay = self.config.get('noscrub_toggle_delay')
        self.chance_thrash_cluster_full = self.config.get('chance_thrash_cluster_full', .05)
        self.chance_thrash_pg_upmap = self.config.get('chance_thrash_pg_upmap', 1.0)
        self.chance_thrash_pg_upmap_items = self.config.get('chance_thrash_pg_upmap', 1.0)
        self.random_eio = self.config.get('random_eio')
        self.chance_force_recovery = self.config.get('chance_force_recovery', 0.3)

        num_osds = self.in_osds + self.out_osds
        self.max_pgs = self.config.get("max_pgs_per_pool_osd", 1200) * len(num_osds)
        self.min_pgs = self.config.get("min_pgs_per_pool_osd", 1) * len(num_osds)
        if self.config is None:
            self.config = dict()
        # prevent monitor from auto-marking things out while thrasher runs
        # try both old and new tell syntax, in case we are testing old code
        self.saved_options = []
        # assuming that the default settings do not vary from one daemon to
        # another
        first_mon = teuthology.get_first_mon(manager.ctx, self.config).split('.')
        opts = [('mon', 'mon_osd_down_out_interval', 0)]
        # why do we disable marking an OSD out automatically? :/
        for service, opt, new_value in opts:
            old_value = manager.get_config(first_mon[0],
            self.saved_options.append((service, opt, old_value))
            manager.inject_args(service, '*', opt, new_value)
        # initialize ceph_objectstore_tool property - must be done before
        # do_thrash is spawned - http://tracker.ceph.com/issues/18799
        if (self.config.get('powercycle') or
                not self.cmd_exists_on_osds("ceph-objectstore-tool") or
                self.config.get('disable_objectstore_tool_tests', False)):
            self.ceph_objectstore_tool = False
            if self.config.get('powercycle'):
                self.log("Unable to test ceph-objectstore-tool, "
                         "powercycle testing")
            else:
                self.log("Unable to test ceph-objectstore-tool, "
                         "not available on all OSD nodes")
        else:
            self.ceph_objectstore_tool = \
                self.config.get('ceph_objectstore_tool', True)

        self.thread = gevent.spawn(self.do_thrash)
        if self.sighup_delay:
            self.sighup_thread = gevent.spawn(self.do_sighup)
        if self.optrack_toggle_delay:
            self.optrack_toggle_thread = gevent.spawn(self.do_optrack_toggle)
        if self.dump_ops_enable == "true":
            self.dump_ops_thread = gevent.spawn(self.do_dump_ops)
        if self.noscrub_toggle_delay:
            self.noscrub_toggle_thread = gevent.spawn(self.do_noscrub_toggle)
    def log(self, msg, *args, **kwargs):
        self.logger.info(msg, *args, **kwargs)

    def cmd_exists_on_osds(self, cmd):
        if self.ceph_manager.cephadm or self.ceph_manager.rook:
            return True
        allremotes = self.ceph_manager.ctx.cluster.only(\
            teuthology.is_type('osd', self.cluster)).remotes.keys()
        allremotes = list(set(allremotes))
        for remote in allremotes:
            proc = remote.run(args=['type', cmd], wait=True,
                              check_status=False, stdout=BytesIO(),
            if proc.exitstatus != 0:
                return False
        return True
    def run_ceph_objectstore_tool(self, remote, osd, cmd):
        if self.ceph_manager.cephadm:
            return shell(
                self.ceph_manager.ctx, self.ceph_manager.cluster, remote,
                args=['ceph-objectstore-tool', '--err-to-stderr'] + cmd,
                wait=True, check_status=False,
        elif self.ceph_manager.rook:
            assert False, 'not implemented'
        else:
            return remote.run(
                args=['sudo', 'adjust-ulimits', 'ceph-objectstore-tool', '--err-to-stderr'] + cmd,
                wait=True, check_status=False,

    def run_ceph_bluestore_tool(self, remote, osd, cmd):
        if self.ceph_manager.cephadm:
            return shell(
                self.ceph_manager.ctx, self.ceph_manager.cluster, remote,
                args=['ceph-bluestore-tool', '--err-to-stderr'] + cmd,
                wait=True, check_status=False,
        elif self.ceph_manager.rook:
            assert False, 'not implemented'
        else:
            return remote.run(
                args=['sudo', 'ceph-bluestore-tool', '--err-to-stderr'] + cmd,
                wait=True, check_status=False,
    def kill_osd(self, osd=None, mark_down=False, mark_out=False):
        """
        :param osd: Osd to be killed.
        :mark_down: Mark down if true.
        :mark_out: Mark out if true.
        """
        if osd is None:
            osd = random.choice(self.live_osds)
        self.log("Killing osd %s, live_osds are %s" % (str(osd),
                                                       str(self.live_osds)))
        self.live_osds.remove(osd)
        self.dead_osds.append(osd)
        self.ceph_manager.kill_osd(osd)
        if mark_down:
            self.ceph_manager.mark_down_osd(osd)
        if mark_out and osd in self.in_osds:
            self.out_osd(osd)
        if self.ceph_objectstore_tool:
            self.log("Testing ceph-objectstore-tool on down osd.%s" % osd)
            remote = self.ceph_manager.find_remote('osd', osd)
            FSPATH = self.ceph_manager.get_filepath()
            JPATH = os.path.join(FSPATH, "journal")
            exp_osd = imp_osd = osd
            self.log('remote for osd %s is %s' % (osd, remote))
            exp_remote = imp_remote = remote
            # If an older osd is available we'll move a pg from there
            if (len(self.dead_osds) > 1 and
                    random.random() < self.chance_move_pg):
                exp_osd = random.choice(self.dead_osds[:-1])
                exp_remote = self.ceph_manager.find_remote('osd', exp_osd)
                self.log('remote for exp osd %s is %s' % (exp_osd, exp_remote))
                '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
            if self.ceph_manager.rook:
                assert False, 'not implemented'
            if not self.ceph_manager.cephadm:
                # ceph-objectstore-tool might be temporarily absent during an
                # upgrade - see http://tracker.ceph.com/issues/18014
                with safe_while(sleep=15, tries=40, action="type ceph-objectstore-tool") as proceed:
                    proc = exp_remote.run(args=['type', 'ceph-objectstore-tool'],
                                          wait=True, check_status=False, stdout=BytesIO(),
                    if proc.exitstatus == 0:
                        break
                    log.debug("ceph-objectstore-tool binary not present, trying again")

            # ceph-objectstore-tool might bogusly fail with "OSD has the store locked"
            # see http://tracker.ceph.com/issues/19556
            with safe_while(sleep=15, tries=40, action="ceph-objectstore-tool --op list-pgs") as proceed:
                proc = self.run_ceph_objectstore_tool(
                    exp_remote, 'osd.%s' % exp_osd,
                    '--data-path', FSPATH.format(id=exp_osd),
                    '--journal-path', JPATH.format(id=exp_osd),
                if proc.exitstatus == 0:
                    break
                elif (proc.exitstatus == 1 and
                      proc.stderr.getvalue() == "OSD has the store locked"):
                    continue
                else:
                    raise Exception("ceph-objectstore-tool: "
                                    "exp list-pgs failure with status {ret}".
                                    format(ret=proc.exitstatus))

            pgs = proc.stdout.getvalue().split('\n')[:-1]
            self.log("No PGs found for osd.{osd}".format(osd=exp_osd))
            pg = random.choice(pgs)
            # exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
            # exp_path = os.path.join(exp_path, '{0}.data'.format(self.cluster))
            exp_path = os.path.join('/var/log/ceph',  # available inside 'shell' container
                                    "exp.{pg}.{id}".format(
            if self.ceph_manager.cephadm:
                exp_host_path = os.path.join(
                    self.ceph_manager.ctx.ceph[self.ceph_manager.cluster].fsid,
                    "exp.{pg}.{id}".format(
            else:
                exp_host_path = exp_path

            # Can't use new export-remove op since this is part of upgrade testing
            proc = self.run_ceph_objectstore_tool(
                exp_remote, 'osd.%s' % exp_osd,
                '--data-path', FSPATH.format(id=exp_osd),
                '--journal-path', JPATH.format(id=exp_osd),
            if proc.exitstatus:
                raise Exception("ceph-objectstore-tool: "
                                "export failure with status {ret}".
                                format(ret=proc.exitstatus))
            proc = self.run_ceph_objectstore_tool(
                exp_remote, 'osd.%s' % exp_osd,
                '--data-path', FSPATH.format(id=exp_osd),
                '--journal-path', JPATH.format(id=exp_osd),
            if proc.exitstatus:
                raise Exception("ceph-objectstore-tool: "
                                "remove failure with status {ret}".
                                format(ret=proc.exitstatus))
            # If there are at least 2 dead osds we might move the pg
            if exp_osd != imp_osd:
                # If pg isn't already on this osd, then we will move it there
                proc = self.run_ceph_objectstore_tool(
                    '--data-path', FSPATH.format(id=imp_osd),
                    '--journal-path', JPATH.format(id=imp_osd),
                if proc.exitstatus:
                    raise Exception("ceph-objectstore-tool: "
                                    "imp list-pgs failure with status {ret}".
                                    format(ret=proc.exitstatus))
                pgs = proc.stdout.getvalue().split('\n')[:-1]
                self.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
                         format(pg=pg, fosd=exp_osd, tosd=imp_osd))
                if imp_remote != exp_remote:
                    # Copy export file to the other machine
                    self.log("Transfer export file from {srem} to {trem}".
                             format(srem=exp_remote, trem=imp_remote))
                    # just in case an upgrade makes /var/log/ceph unreadable by non-root,
                    exp_remote.run(args=['sudo', 'chmod', '777',
                    imp_remote.run(args=['sudo', 'chmod', '777',
                    tmpexport = Remote.get_file(exp_remote, exp_host_path,
                    if exp_host_path != exp_path:
                        # push to /var/log/ceph, then rename (we can't
                        # chmod 777 the /var/log/ceph/$fsid mountpoint)
                        Remote.put_file(imp_remote, tmpexport, exp_path)
                        imp_remote.run(args=[
                            'sudo', 'mv', exp_path, exp_host_path])
                    else:
                        Remote.put_file(imp_remote, tmpexport, exp_host_path)
                else:
                    # Can't move the pg after all
                    imp_remote = exp_remote
            proc = self.run_ceph_objectstore_tool(
                imp_remote, 'osd.%s' % imp_osd,
                '--data-path', FSPATH.format(id=imp_osd),
                '--journal-path', JPATH.format(id=imp_osd),
                '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
            if proc.exitstatus == 1:
                bogosity = "The OSD you are using is older than the exported PG"
                if bogosity in proc.stderr.getvalue():
                    self.log("OSD older than exported PG"
            elif proc.exitstatus == 10:
                self.log("Pool went away before processing an import"
            elif proc.exitstatus == 11:
                self.log("Attempt to import an incompatible export"
            elif proc.exitstatus == 12:
                # this should be safe to ignore because we only ever move 1
                # copy of the pg at a time, and merge is only initiated when
                # all replicas are peered and happy. /me crosses fingers
                self.log("PG merged on target"
            elif proc.exitstatus:
                raise Exception("ceph-objectstore-tool: "
                                "import failure with status {ret}".
                                format(ret=proc.exitstatus))
            cmd = "sudo rm -f {file}".format(file=exp_host_path)
            exp_remote.run(args=cmd)
            if imp_remote != exp_remote:
                imp_remote.run(args=cmd)
    def blackhole_kill_osd(self, osd=None):
        """
        If all else fails, kill the osd.
        :param osd: Osd to be killed.
        """
        if osd is None:
            osd = random.choice(self.live_osds)
        self.log("Blackholing and then killing osd %s, live_osds are %s" %
                 (str(osd), str(self.live_osds)))
        self.live_osds.remove(osd)
        self.dead_osds.append(osd)
        self.ceph_manager.blackhole_kill_osd(osd)
    def revive_osd(self, osd=None, skip_admin_check=False):
        """
        :param osd: Osd to be revived.
        """
        if osd is None:
            osd = random.choice(self.dead_osds)
        self.log("Reviving osd %s" % (str(osd),))
        self.ceph_manager.revive_osd(
            skip_admin_check=skip_admin_check)
        self.dead_osds.remove(osd)
        self.live_osds.append(osd)
        if self.random_eio > 0 and osd == self.rerrosd:
            self.ceph_manager.set_config(self.rerrosd,
                filestore_debug_random_read_err=self.random_eio)
            self.ceph_manager.set_config(self.rerrosd,
                bluestore_debug_random_read_err=self.random_eio)
    def out_osd(self, osd=None):
        """
        :param osd: Osd to be marked.
        """
        if osd is None:
            osd = random.choice(self.in_osds)
        self.log("Removing osd %s, in_osds are: %s" %
                 (str(osd), str(self.in_osds)))
        self.ceph_manager.mark_out_osd(osd)
        self.in_osds.remove(osd)
        self.out_osds.append(osd)

    def in_osd(self, osd=None):
        """
        :param osd: Osd to be marked.
        """
        if osd is None:
            osd = random.choice(self.out_osds)
        if osd in self.dead_osds:
            return self.revive_osd(osd)
        self.log("Adding osd %s" % (str(osd),))
        self.out_osds.remove(osd)
        self.in_osds.append(osd)
        self.ceph_manager.mark_in_osd(osd)
        self.log("Added osd %s" % (str(osd),))
    def reweight_osd_or_by_util(self, osd=None):
        """
        Reweight an osd that is in
        :param osd: Osd to be marked.
        """
        if osd is not None or random.choice([True, False]):
            if osd is None:
                osd = random.choice(self.in_osds)
            val = random.uniform(.1, 1.0)
            self.log("Reweighting osd %s to %s" % (str(osd), str(val)))
            self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
        else:
            # do it several times, the option space is large
                'max_change': random.choice(['0.05', '1.0', '3.0']),
                'overage': random.choice(['110', '1000']),
                'type': random.choice([
                    'reweight-by-utilization',
                    'test-reweight-by-utilization']),
            self.log("Reweighting by: %s" % (str(options),))
            self.ceph_manager.raw_cluster_cmd(
                options['max_change'])

    def primary_affinity(self, osd=None):
        self.log("primary_affinity")
        if osd is None:
            osd = random.choice(self.in_osds)
        if random.random() >= .5:
            pa = 1
        elif random.random() >= .5:
            pa = 0
        else:
            pa = random.random()
        self.log('Setting osd %s primary_affinity to %f' % (str(osd), pa))
        self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',
    def thrash_cluster_full(self):
        """
        Set and unset cluster full condition
        """
        self.log('Setting full ratio to .001')
        self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.001')
        self.log('Setting full ratio back to .95')
        self.ceph_manager.raw_cluster_cmd('osd', 'set-full-ratio', '.95')
    def thrash_pg_upmap(self):
        """
        Install or remove random pg_upmap entries in OSDMap
        """
        self.log("thrash_pg_upmap")
        from random import shuffle
        out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
        j = json.loads(out)
        self.log('j is %s' % j)
        try:
            if random.random() >= .3:
                pgs = self.ceph_manager.get_pg_stats()
                if not pgs:
                    self.log('No pgs; doing nothing')
                    return
                pg = random.choice(pgs)
                pgid = str(pg['pgid'])
                poolid = int(pgid.split('.')[0])
                sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
                if len(sizes) == 0:
                    self.log('No pools; doing nothing')
                    return
                osds = self.in_osds + self.out_osds
                self.log('Setting %s to %s' % (pgid, osds))
                cmd = ['osd', 'pg-upmap', pgid] + [str(x) for x in osds]
                self.log('cmd %s' % cmd)
                self.ceph_manager.raw_cluster_cmd(*cmd)
            else:
                self.log('Clearing pg_upmap on %s' % pg)
                self.ceph_manager.raw_cluster_cmd(
                self.log('No pg_upmap entries; doing nothing')
        except CommandFailedError:
            self.log('Failed to rm-pg-upmap, ignoring')
    def thrash_pg_upmap_items(self):
        """
        Install or remove random pg_upmap_items entries in OSDMap
        """
        self.log("thrash_pg_upmap_items")
        from random import shuffle
        out = self.ceph_manager.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
        j = json.loads(out)
        self.log('j is %s' % j)
        try:
            if random.random() >= .3:
                pgs = self.ceph_manager.get_pg_stats()
                if not pgs:
                    self.log('No pgs; doing nothing')
                    return
                pg = random.choice(pgs)
                pgid = str(pg['pgid'])
                poolid = int(pgid.split('.')[0])
                sizes = [x['size'] for x in j['pools'] if x['pool'] == poolid]
                if len(sizes) == 0:
                    self.log('No pools; doing nothing')
                    return
                osds = self.in_osds + self.out_osds
                self.log('Setting %s to %s' % (pgid, osds))
                cmd = ['osd', 'pg-upmap-items', pgid] + [str(x) for x in osds]
                self.log('cmd %s' % cmd)
                self.ceph_manager.raw_cluster_cmd(*cmd)
            else:
                m = j['pg_upmap_items']
                self.log('Clearing pg_upmap on %s' % pg)
                self.ceph_manager.raw_cluster_cmd(
                self.log('No pg_upmap entries; doing nothing')
        except CommandFailedError:
            self.log('Failed to rm-pg-upmap-items, ignoring')
    def force_recovery(self):
        """
        Force recovery on some PGs
        """
        backfill = random.random() >= 0.5
        j = self.ceph_manager.get_pgids_to_force(backfill)
        if j:
            try:
                if backfill:
                    self.ceph_manager.raw_cluster_cmd('pg', 'force-backfill', *j)
                else:
                    self.ceph_manager.raw_cluster_cmd('pg', 'force-recovery', *j)
            except CommandFailedError:
                self.log('Failed to force backfill|recovery, ignoring')

    def cancel_force_recovery(self):
        """
        Cancel forced recovery on some PGs
        """
        backfill = random.random() >= 0.5
        j = self.ceph_manager.get_pgids_to_cancel_force(backfill)
        if j:
            try:
                if backfill:
                    self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-backfill', *j)
                else:
                    self.ceph_manager.raw_cluster_cmd('pg', 'cancel-force-recovery', *j)
            except CommandFailedError:
                self.log('Failed to force backfill|recovery, ignoring')

    def force_cancel_recovery(self):
        """
        Force or cancel forcing recovery
        """
        if random.random() >= 0.4:
            self.force_recovery()
        else:
            self.cancel_force_recovery()
        """
        Make sure all osds are up and not out.
        """
        while len(self.dead_osds) > 0:
            self.log("reviving osd")
            self.revive_osd()
        while len(self.out_osds) > 0:
            self.log("inning osd")
            self.in_osd()

        """
        Make sure all osds are up and fully in.
        """
        for osd in self.live_osds:
            self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
            self.ceph_manager.raw_cluster_cmd('osd', 'primary-affinity',

        """
        Break out of this Ceph loop
        """
        if self.sighup_delay:
            self.log("joining the do_sighup greenlet")
            self.sighup_thread.get()
        if self.optrack_toggle_delay:
            self.log("joining the do_optrack_toggle greenlet")
            self.optrack_toggle_thread.join()
        if self.dump_ops_enable == "true":
            self.log("joining the do_dump_ops greenlet")
            self.dump_ops_thread.join()
        if self.noscrub_toggle_delay:
            self.log("joining the do_noscrub_toggle greenlet")
            self.noscrub_toggle_thread.join()
        """
        Increase the size of the pool
        """
        pool = self.ceph_manager.get_pool()
        self.log("Growing pool %s" % (pool,))
        if self.ceph_manager.expand_pool(pool,
                                         self.config.get('pool_grow_by', 10),
            self.pools_to_fix_pgp_num.add(pool)

    def shrink_pool(self):
        """
        Decrease the size of the pool
        """
        pool = self.ceph_manager.get_pool()
        _ = self.ceph_manager.get_pool_pg_num(pool)
        self.log("Shrinking pool %s" % (pool,))
        if self.ceph_manager.contract_pool(
                self.config.get('pool_shrink_by', 10),
            self.pools_to_fix_pgp_num.add(pool)

    def fix_pgp_num(self, pool=None):
        """
        Fix number of pgs in pool.
        """
        if pool is None:
            pool = self.ceph_manager.get_pool()
        self.log("fixing pg num pool %s" % (pool,))
        if self.ceph_manager.set_pool_pgpnum(pool, force):
            self.pools_to_fix_pgp_num.discard(pool)
    def test_pool_min_size(self):
        """
        Loop to selectively push PGs below their min_size and test that
        recovery still happens.
        """
        self.log("test_pool_min_size")
        time.sleep(60)  # buffer time for recovery to start.
        self.ceph_manager.wait_for_recovery(
            timeout=self.config.get('timeout')
        minout = int(self.config.get("min_out", 1))
        minlive = int(self.config.get("min_live", 2))
        mindead = int(self.config.get("min_dead", 1))
        self.log("doing min_size thrashing")
        self.ceph_manager.wait_for_clean(timeout=180)
        assert self.ceph_manager.is_clean(), \
            'not clean before minsize thrashing starts'
        while not self.stopping:
            # look up k and m from all the pools on each loop, in case it
            # changes as the cluster runs
            try:
                pools_json = self.ceph_manager.get_osd_dump_json()['pools']

                for pool_json in pools_json:
                    pool = pool_json['pool_name']
                    pool_type = pool_json['type']  # 1 for rep, 3 for ec
                    min_size = pool_json['min_size']
                    self.log("pool {pool} min_size is {min_size}".format(pool=pool, min_size=min_size))
                    ec_profile = self.ceph_manager.get_pool_property(pool, 'erasure_code_profile')
                    if pool_type != PoolType.ERASURE_CODED:
                        continue
                    ec_profile = pool_json['erasure_code_profile']
                    ec_profile_json = self.ceph_manager.raw_cluster_cmd(
                        'erasure-code-profile',
                    ec_json = json.loads(ec_profile_json)
                    local_k = int(ec_json['k'])
                    local_m = int(ec_json['m'])
                    self.log("pool {pool} local_k={k} local_m={m}".format(pool=pool,
                                                                          k=local_k, m=local_m))
                    self.log("setting k={local_k} from previous {k}".format(local_k=local_k, k=k))
                    self.log("setting m={local_m} from previous {m}".format(local_m=local_m, m=m))
            except CommandFailedError:
                self.log("failed to read erasure_code_profile. %s was likely removed", pool)

            self.log("using k={k}, m={m}".format(k=k, m=m))
            self.log("No pools yet, waiting")

            if minout > len(self.out_osds):  # kill OSDs and mark out
                self.log("forced to out an osd")
                self.kill_osd(mark_out=True)
            elif mindead > len(self.dead_osds):  # kill OSDs but force timeout
                self.log("forced to kill an osd")
            else:  # make mostly-random choice to kill or revive OSDs
                minup = max(minlive, k)
                rand_val = random.uniform(0, 1)
                self.log("choosing based on number of live OSDs and rand val {rand}".\
                         format(rand=rand_val))
                if len(self.live_osds) > minup+1 and rand_val < 0.5:
                    # chose to knock out as many OSDs as we can w/out downing PGs
                    most_killable = min(len(self.live_osds) - minup, m)
                    self.log("chose to kill {n} OSDs".format(n=most_killable))
                    for i in range(1, most_killable):
                        self.kill_osd(mark_out=True)
                    # try a few times since there might be a concurrent pool
                    # creation or deletion
                    with safe_while(
                            action='check for active or peered') as proceed:
                        if self.ceph_manager.all_active_or_peered():
                            break
                        self.log('not all PGs are active or peered')
                else:  # chose to revive OSDs, bring up a random fraction of the dead ones
                    self.log("chose to revive osds")
                    for i in range(1, int(rand_val * len(self.dead_osds))):

                # let PGs repair themselves or our next knockout might kill one
                self.ceph_manager.wait_for_clean(timeout=self.config.get('timeout'))

        # / while not self.stopping

        self.ceph_manager.wait_for_recovery(
            timeout=self.config.get('timeout')
    def inject_pause(self, conf_key, duration, check_after, should_be_down):
        """
        Pause injection testing. Check for osd being down when finished.
        """
        the_one = random.choice(self.live_osds)
        self.log("inject_pause on osd.{osd}".format(osd=the_one))
        self.log(
            "Testing {key} pause injection for duration {duration}".format(
        self.log(
            "Checking after {after}, should_be_down={shouldbedown}".format(
                shouldbedown=should_be_down
        self.ceph_manager.set_config(the_one, **{conf_key: duration})
        if not should_be_down:
            return
        time.sleep(check_after)
        status = self.ceph_manager.get_osd_status()
        assert the_one in status['down']
        time.sleep(duration - check_after + 20)
        status = self.ceph_manager.get_osd_status()
        assert the_one not in status['down']
    def test_backfill_full(self):
        """
        Test backfills stopping when the replica fills up.

        First, use injectfull admin command to simulate a now-full
        osd by setting it to 0 on all of the OSDs.

        Second, on a random subset, set
        osd_debug_skip_full_check_in_backfill_reservation to force
        the more complicated check in do_scan to be exercised.

        Then, verify that all backfillings stop.
        """
        self.log("injecting backfill full")
        for i in self.live_osds:
            self.ceph_manager.set_config(
                osd_debug_skip_full_check_in_backfill_reservation=
                random.choice(['false', 'true']))
            self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'backfillfull'],
                                               check_status=True, timeout=30, stdout=DEVNULL)
            status = self.ceph_manager.compile_pg_status()
            if 'backfilling' not in status.keys():
                break
            self.log(
                "waiting for {still_going} backfillings".format(
                    still_going=status.get('backfilling')))
        assert('backfilling' not in self.ceph_manager.compile_pg_status().keys())
        for i in self.live_osds:
            self.ceph_manager.set_config(
                osd_debug_skip_full_check_in_backfill_reservation='false')
            self.ceph_manager.osd_admin_socket(i, command=['injectfull', 'none'],
                                               check_status=True, timeout=30, stdout=DEVNULL)
    def generate_random_sharding(self):
        for prefix in prefixes:
            choose = random.choice([False, True])
            if new_sharding != '':
                new_sharding = new_sharding + ' '
            columns = random.randint(1, 5)
            do_hash = random.choice([False, True])
            low_hash = random.choice([0, 5, 8])
            do_high_hash = random.choice([False, True])
            high_hash = random.choice([8, 16, 30]) + low_hash
            new_sharding = new_sharding + prefix + '(' + str(columns) + ',' + str(low_hash) + '-' + str(high_hash) + ')'
            new_sharding = new_sharding + prefix + '(' + str(columns) + ',' + str(low_hash) + '-)'
            new_sharding = new_sharding + prefix
            new_sharding = new_sharding + prefix + '(' + str(columns) + ')'
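
    # For reference: the concatenations above yield a space-separated spec of
    # the form "<prefix>(<columns>,<low>-<high>)", so a generated value might
    # look like "m(3,0-8) p(2)". That string shape is an illustrative example
    # inferred from the code, not output recorded from a real run.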
    def test_bluestore_reshard_action(self):
        """
        Test if resharding of bluestore works properly.
        If bluestore is not used, or bluestore is in a version that
        does not support sharding, skip.
        """
        osd = random.choice(self.dead_osds)
        remote = self.ceph_manager.find_remote('osd', osd)
        FSPATH = self.ceph_manager.get_filepath()

            '--log-file=/var/log/ceph/bluestore_tool.$pid.log',
            '--path', FSPATH.format(id=osd)

        # sanity check if bluestore-tool accessible
        self.log('checking if target objectstore is bluestore on osd.%s' % osd)
        proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
        if proc.exitstatus != 0:
            raise Exception("ceph-bluestore-tool access failed.")

        # check if sharding is possible
        self.log('checking if target bluestore supports sharding on osd.%s' % osd)
        proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
        if proc.exitstatus != 0:
            self.log("Unable to test resharding, "
                     "ceph-bluestore-tool does not support it.")
            return

        # now go for reshard to something else
        self.log('applying new sharding to bluestore on osd.%s' % osd)
        new_sharding = self.config.get('bluestore_new_sharding', 'random')

        if new_sharding == 'random':
            self.log('generate random sharding')
            new_sharding = self.generate_random_sharding()

        self.log("applying new sharding: " + new_sharding)
            '--sharding', new_sharding,
        proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
        if proc.exitstatus != 0:
            raise Exception("ceph-bluestore-tool resharding failed.")

        self.log('running fsck to verify new sharding on osd.%s' % osd)
        proc = self.run_ceph_bluestore_tool(remote, 'osd.%s' % osd, cmd)
        if proc.exitstatus != 0:
            raise Exception("ceph-bluestore-tool fsck failed.")
        self.log('resharding successfully completed')
    def test_bluestore_reshard(self):
        """
        2) reshards bluestore on killed osd
        """
        self.log('test_bluestore_reshard started')
        self.kill_osd(mark_down=True, mark_out=True)
        self.test_bluestore_reshard_action()
        self.log('test_bluestore_reshard completed')
    def test_map_discontinuity(self):
        """
        1) Allows the osds to recover
        3) allows the remaining osds to recover
        4) waits for some time

        This sequence should cause the revived osd to have to handle
        a map gap since the mons would have trimmed
        """
        self.log("test_map_discontinuity")
        while len(self.in_osds) < (self.minin + 1):
            self.in_osd()
        self.log("Waiting for recovery")
        self.ceph_manager.wait_for_all_osds_up(
            timeout=self.config.get('timeout')
        # now we wait 20s for the pg status to change, if it takes longer,
        # the test *should* fail!
        time.sleep(20)
        self.ceph_manager.wait_for_clean(
            timeout=self.config.get('timeout')

        # now we wait 20s for the backfill replicas to hear about the clean
        time.sleep(20)
        self.log("Recovered, killing an osd")
        self.kill_osd(mark_down=True, mark_out=True)
        self.log("Waiting for clean again")
        self.ceph_manager.wait_for_clean(
            timeout=self.config.get('timeout')
        self.log("Waiting for trim")
        time.sleep(int(self.config.get("map_discontinuity_sleep_time", 40)))
    def choose_action(self):
        """
        Random action selector.
        """
        chance_down = self.config.get('chance_down', 0.4)
        _ = self.config.get('chance_test_min_size', 0)
        chance_test_backfill_full = \
            self.config.get('chance_test_backfill_full', 0)
        if isinstance(chance_down, int):
            chance_down = float(chance_down) / 100
        minout = int(self.config.get("min_out", 0))
        minlive = int(self.config.get("min_live", 2))
        mindead = int(self.config.get("min_dead", 0))

        self.log('choose_action: min_in %d min_out '
                 '%d min_live %d min_dead %d '
                 'chance_down %.2f' %
                 (minin, minout, minlive, mindead, chance_down))
        actions = []
        if len(self.in_osds) > minin:
            actions.append((self.out_osd, 1.0,))
        if len(self.live_osds) > minlive and chance_down > 0:
            actions.append((self.kill_osd, chance_down,))
        if len(self.out_osds) > minout:
            actions.append((self.in_osd, 1.7,))
        if len(self.dead_osds) > mindead:
            actions.append((self.revive_osd, 1.0,))
        if self.config.get('thrash_primary_affinity', True):
            actions.append((self.primary_affinity, 1.0,))
        actions.append((self.reweight_osd_or_by_util,
                        self.config.get('reweight_osd', .5),))
        actions.append((self.grow_pool,
                        self.config.get('chance_pgnum_grow', 0),))
        actions.append((self.shrink_pool,
                        self.config.get('chance_pgnum_shrink', 0),))
        actions.append((self.fix_pgp_num,
                        self.config.get('chance_pgpnum_fix', 0),))
        actions.append((self.test_pool_min_size,
                        self.config.get('chance_test_min_size', 0),))
        actions.append((self.test_backfill_full,
                        chance_test_backfill_full,))
        if self.chance_thrash_cluster_full > 0:
            actions.append((self.thrash_cluster_full, self.chance_thrash_cluster_full,))
        if self.chance_thrash_pg_upmap > 0:
            actions.append((self.thrash_pg_upmap, self.chance_thrash_pg_upmap,))
        if self.chance_thrash_pg_upmap_items > 0:
            actions.append((self.thrash_pg_upmap_items, self.chance_thrash_pg_upmap_items,))
        if self.chance_force_recovery > 0:
            actions.append((self.force_cancel_recovery, self.chance_force_recovery))

        for key in ['heartbeat_inject_failure', 'filestore_inject_stall']:
            for scenario in [
                 self.inject_pause(key,
                                   self.config.get('pause_short', 3),
                 self.config.get('chance_inject_pause_short', 1),),
                 self.inject_pause(key,
                                   self.config.get('pause_long', 80),
                                   self.config.get('pause_check_after', 70),
                 self.config.get('chance_inject_pause_long', 0),)]:
                actions.append(scenario)

        # only consider resharding if objectstore is bluestore
        cluster_name = self.ceph_manager.cluster
        cluster = self.ceph_manager.ctx.ceph[cluster_name]
        if cluster.conf.get('osd', {}).get('osd objectstore', 'bluestore') == 'bluestore':
            actions.append((self.test_bluestore_reshard,
                            self.config.get('chance_bluestore_reshard', 0),))

        total = sum([y for (x, y) in actions])
        val = random.uniform(0, total)
        for (action, prob) in actions:
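        # The loop above implements a standard weighted random choice: each
        # entry is (callable, weight), val is drawn uniformly from [0, total),
        # and the first action whose cumulative weight exceeds val is picked.
        # A minimal standalone sketch of the same idea (names illustrative):
        #
        #   total = sum(w for _, w in actions)
        #   val = random.uniform(0, total)
        #   for action, w in actions:
        #       if val < w:
        #           return action
        #       val -= w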
    def do_thrash(self):
        """
        _do_thrash() wrapper.
        """
        try:
            self._do_thrash()
        except Exception as e:
            # See _run exception comment for MDSThrasher
            self.set_thrasher_exception(e)
            self.logger.exception("exception:")
            # Allow successful completion so gevent doesn't see an exception.
            # The DaemonWatchdog will observe the error and tear down the test.

    def do_sighup(self):
        """
        Loops and sends signal.SIGHUP to a random live osd.

        Loop delay is controlled by the config value sighup_delay.
        """
        delay = float(self.sighup_delay)
        self.log("starting do_sighup with a delay of {0}".format(delay))
        while not self.stopping:
            osd = random.choice(self.live_osds)
            self.ceph_manager.signal_osd(osd, signal.SIGHUP, silent=True)
    def do_optrack_toggle(self):
        """
        Loops and toggles op tracking on all osds.

        Loop delay is controlled by the config value optrack_toggle_delay.
        """
        delay = float(self.optrack_toggle_delay)
        osd_state = "true"
        self.log("starting do_optrack_toggle with a delay of {0}".format(delay))
        while not self.stopping:
            if osd_state == "true":
                osd_state = "false"
            else:
                osd_state = "true"
            try:
                self.ceph_manager.inject_args('osd', '*',
                                              'osd_enable_op_tracker',
                                              osd_state)
            except CommandFailedError:
                self.log('Failed to tell all osds, ignoring')

    def do_dump_ops(self):
        """
        Loops and does op dumps on all osds
        """
        self.log("starting do_dump_ops")
        while not self.stopping:
            for osd in self.live_osds:
                # Ignore errors because live_osds is in flux
                self.ceph_manager.osd_admin_socket(osd, command=['dump_ops_in_flight'],
                                                   check_status=False, timeout=30, stdout=DEVNULL)
                self.ceph_manager.osd_admin_socket(osd, command=['dump_blocked_ops'],
                                                   check_status=False, timeout=30, stdout=DEVNULL)
                self.ceph_manager.osd_admin_socket(osd, command=['dump_historic_ops'],
                                                   check_status=False, timeout=30, stdout=DEVNULL)
    def do_noscrub_toggle(self):
        """
        Loops and toggles noscrub flags.

        Loop delay is controlled by the config value noscrub_toggle_delay.
        """
        delay = float(self.noscrub_toggle_delay)
        scrub_state = "none"
        self.log("starting do_noscrub_toggle with a delay of {0}".format(delay))
        while not self.stopping:
            if scrub_state == "none":
                self.ceph_manager.raw_cluster_cmd('osd', 'set', 'noscrub')
                scrub_state = "noscrub"
            elif scrub_state == "noscrub":
                self.ceph_manager.raw_cluster_cmd('osd', 'set', 'nodeep-scrub')
                scrub_state = "both"
            elif scrub_state == "both":
                self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
                scrub_state = "nodeep-scrub"
            else:
                self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
                scrub_state = "none"
        self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'noscrub')
        self.ceph_manager.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
    def _do_thrash(self):
        """
        Loop to select random actions to thrash ceph manager with.
        """
        cleanint = self.config.get("clean_interval", 60)
        scrubint = self.config.get("scrub_interval", -1)
        maxdead = self.config.get("max_dead", 0)
        delay = self.config.get("op_delay", 5)
        self.rerrosd = self.live_osds[0]
        if self.random_eio > 0:
            self.ceph_manager.inject_args('osd', self.rerrosd,
                                          'filestore_debug_random_read_err',
                                          self.random_eio)
            self.ceph_manager.inject_args('osd', self.rerrosd,
                                          'bluestore_debug_random_read_err',
                                          self.random_eio)
        self.log("starting do_thrash")
        while not self.stopping:
            to_log = [str(x) for x in ["in_osds: ", self.in_osds,
                                       "out_osds: ", self.out_osds,
                                       "dead_osds: ", self.dead_osds,
                                       "live_osds: ", self.live_osds]]
            self.log(" ".join(to_log))
            if random.uniform(0, 1) < (float(delay) / cleanint):
                while len(self.dead_osds) > maxdead:
                    self.revive_osd()
                for osd in self.in_osds:
                    self.ceph_manager.raw_cluster_cmd('osd', 'reweight',
                if random.uniform(0, 1) < float(
                        self.config.get('chance_test_map_discontinuity', 0)) \
                        and len(self.live_osds) > 5:  # avoid m=2,k=2 stall, w/ some buffer for crush being picky
                    self.test_map_discontinuity()
                else:
                    self.ceph_manager.wait_for_recovery(
                        timeout=self.config.get('timeout')
                time.sleep(self.clean_wait)
            if random.uniform(0, 1) < (float(delay) / scrubint):
                self.log('Scrubbing while thrashing being performed')
                Scrubber(self.ceph_manager, self.config)
            self.choose_action()()
        if self.random_eio > 0:
            self.ceph_manager.inject_args('osd', self.rerrosd,
                                          'filestore_debug_random_read_err', '0.0')
            self.ceph_manager.inject_args('osd', self.rerrosd,
                                          'bluestore_debug_random_read_err', '0.0')
        for pool in list(self.pools_to_fix_pgp_num):
            if self.ceph_manager.get_pool_pg_num(pool) > 0:
                self.fix_pgp_num(pool)
        self.pools_to_fix_pgp_num.clear()
        for service, opt, saved_value in self.saved_options:
            self.ceph_manager.inject_args(service, '*', opt, saved_value)
        self.saved_options = []
class ObjectStoreTool:

    def __init__(self, manager, pool, **kwargs):
        self.manager = manager
        self.osd = kwargs.get('osd', None)
        self.object_name = kwargs.get('object_name', None)
        self.do_revive = kwargs.get('do_revive', True)
        if self.osd and self.pool and self.object_name:
            if self.osd == "primary":
                self.osd = self.manager.get_object_primary(self.pool,
        assert self.osd is not None
        if self.object_name:
            self.pgid = self.manager.get_object_pg_with_shard(self.pool,
        self.remote = next(iter(self.manager.ctx.\
            cluster.only('osd.{o}'.format(o=self.osd)).remotes.keys()))
        path = self.manager.get_filepath().format(id=self.osd)
        self.paths = ("--data-path {path} --journal-path {path}/journal".

    def build_cmd(self, options, args, stdin):
        if self.object_name:
            lines.append("object=$(sudo adjust-ulimits ceph-objectstore-tool "
                         "{paths} --pgid {pgid} --op list |"
                         "grep '\"oid\":\"{name}\"')".
                         format(paths=self.paths,
                                name=self.object_name))
            args = '"$object" ' + args
            options += " --pgid {pgid}".format(pgid=self.pgid)
        cmd = ("sudo adjust-ulimits ceph-objectstore-tool {paths} {options} {args}".
               format(paths=self.paths,
            cmd = ("echo {payload} | base64 --decode | {cmd}".
                   format(payload=base64.encode(stdin),
        return "\n".join(lines)

    def run(self, options, args):
        self.manager.kill_osd(self.osd)
        cmd = self.build_cmd(options, args, None)
        self.manager.log(cmd)
        proc = self.remote.run(args=['bash', '-e', '-x', '-c', cmd],
        if proc.exitstatus != 0:
            self.manager.log("failed with " + str(proc.exitstatus))
            error = proc.stdout.getvalue().decode() + " " + \
                proc.stderr.getvalue().decode()
            raise Exception(error)
        self.manager.revive_osd(self.osd)
        self.manager.wait_till_osd_is_up(self.osd, 300)
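
    # Typical entry point is CephManager.objectstore_tool() below; a hedged
    # usage sketch with illustrative values only:
    #
    #   manager.objectstore_tool('rbd', options='', args='remove',
    #                            object_name='myobject', osd='primary')
    #
    # which kills the target OSD, runs ceph-objectstore-tool against its data
    # path, and revives the OSD afterwards.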
# XXX: this class has nothing to do with the Ceph daemon (ceph-mgr) of
# the same name.
class CephManager:
    """
    Ceph manager object.
    Contains several local functions that form a bulk of this module.

    :param controller: the remote machine where the Ceph commands should be
                       executed
    :param ctx: the cluster context
    :param config: path to Ceph config file
    :param logger: for logging messages
    :param cluster: name of the Ceph cluster
    """

    def __init__(self, controller, ctx=None, config=None, logger=None,
                 cluster='ceph', cephadm=False, rook=False) -> None:
        self.lock = threading.RLock()
        self.config = config
        self.controller = controller
        self.next_pool_id = 0
        self.cluster = cluster

        self.log = lambda x: logger.info(x)
            """
            implement log behavior.
            """

        if self.config is None:
            self.config = dict()

        # NOTE: These variables are meant to be overridden by vstart_runner.py.
        self.cephadm = cephadm
        self.testdir = teuthology.get_testdir(self.ctx)
        # prefix args for ceph cmds to be executed
        pre = ['adjust-ulimits', 'ceph-coverage',
               f'{self.testdir}/archive/coverage']
        self.CEPH_CMD = ['sudo'] + pre + ['timeout', '120', 'ceph',
                                          '--cluster', self.cluster]
        self.RADOS_CMD = pre + ['rados', '--cluster', self.cluster]
        self.run_ceph_w_prefix = ['sudo', 'daemon-helper', 'kill', 'ceph',
                                  '--cluster', self.cluster]

        pools = self.list_pools()
        for pool in pools:
            # we may race with a pool deletion; ignore failures here
            try:
                self.pools[pool] = self.get_pool_int_property(pool, 'pg_num')
            except CommandFailedError:
                self.log('Failed to get pg_num from pool %s, ignoring' % pool)
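
    # Note on the command prefixes built above: CEPH_CMD and RADOS_CMD wrap
    # every invocation with adjust-ulimits and ceph-coverage so coverage data
    # lands under {testdir}/archive/coverage. For example, CEPH_CMD expands to
    # something like ['sudo', 'adjust-ulimits', 'ceph-coverage',
    # '<testdir>/archive/coverage', 'timeout', '120', 'ceph', '--cluster',
    # 'ceph'] (an illustrative expansion, not captured output).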
    def ceph(self, cmd, **kwargs):
        """
        Simple Ceph admin command wrapper around run_cluster_cmd.
        """
        kwargs.pop('args', None)
        args = shlex.split(cmd)
        stdout = kwargs.pop('stdout', StringIO())
        stderr = kwargs.pop('stderr', StringIO())
        return self.run_cluster_cmd(args=args, stdout=stdout, stderr=stderr, **kwargs)

    def run_cluster_cmd(self, **kwargs):
        """
        Run a Ceph command and return the object representing the process
        for the command.

        Accepts the same arguments as teuthology.orchestra.run.run().
        """
        if isinstance(kwargs['args'], str):
            kwargs['args'] = shlex.split(kwargs['args'])
        elif isinstance(kwargs['args'], tuple):
            kwargs['args'] = list(kwargs['args'])

        prefixcmd = []
        timeoutcmd = kwargs.pop('timeoutcmd', None)
        if timeoutcmd is not None:
            prefixcmd += ['timeout', str(timeoutcmd)]

        if self.cephadm:
            prefixcmd += ['ceph']
            cmd = prefixcmd + list(kwargs['args'])
            return shell(self.ctx, self.cluster, self.controller,
                         check_status=kwargs.get('check_status', True))
        elif self.rook:
            prefixcmd += ['ceph']
            cmd = prefixcmd + list(kwargs['args'])
            return toolbox(self.ctx, self.cluster,
                           check_status=kwargs.get('check_status', True))
        else:
            kwargs['args'] = prefixcmd + self.CEPH_CMD + kwargs['args']
            return self.controller.run(**kwargs)
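
    # Hedged usage sketch for the two wrappers above (values illustrative):
    #
    #   manager.ceph('osd dump --format=json')        # shlex-split string
    #   manager.run_cluster_cmd(args=['osd', 'dump'],  # explicit arg list
    #                           stdout=StringIO())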
    def raw_cluster_cmd(self, *args, **kwargs) -> str:
        """
        Run a ceph command against the cluster and return its stdout.
        """
        if kwargs.get('args') is None and args:
            kwargs['args'] = args
        kwargs['stdout'] = kwargs.pop('stdout', StringIO())
        return self.run_cluster_cmd(**kwargs).stdout.getvalue()

    def raw_cluster_cmd_result(self, *args, **kwargs):
        """
        Run a ceph command against the cluster and return its exit status.
        """
        if kwargs.get('args') is None and args:
            kwargs['args'] = args
        kwargs['check_status'] = False
        return self.run_cluster_cmd(**kwargs).exitstatus

    def get_keyring(self, client_id):
        """
        Return keyring for the given client.

        :param client_id: str
        :return keyring: str
        """
        if client_id.find('client.') != -1:
            client_id = client_id.replace('client.', '')
        keyring = self.run_cluster_cmd(args=f'auth get client.{client_id}',
                                       stdout=StringIO()).\
            stdout.getvalue().strip()

        assert isinstance(keyring, str) and keyring != ''
        return keyring

    def run_ceph_w(self, watch_channel=None):
        """
        Execute "ceph -w" in the background with stdout connected to a StringIO,
        and return the RemoteProcess.

        :param watch_channel: Specifies the channel to be watched. This can be
                              'cluster', 'audit', ...
        :type watch_channel: str
        """
        args = self.run_ceph_w_prefix + ['-w']
        if watch_channel is not None:
            args.append("--watch-channel")
            args.append(watch_channel)
        return self.controller.run(args=args, wait=False, stdout=StringIO(), stdin=run.PIPE)
    def get_mon_socks(self):
        """
        Get monitor sockets.

        :return socks: tuple of strings; strings are individual sockets.
        """
        from json import loads

        output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
        socks = []
        for mon in output['mons']:
            for addrvec_mem in mon['public_addrs']['addrvec']:
                socks.append(addrvec_mem['addr'])
        return tuple(socks)

    def get_msgrv1_mon_socks(self):
        """
        Get monitor sockets that use msgrv1 to operate.

        :return socks: tuple of strings; strings are individual sockets.
        """
        from json import loads

        output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
        socks = []
        for mon in output['mons']:
            for addrvec_mem in mon['public_addrs']['addrvec']:
                if addrvec_mem['type'] == 'v1':
                    socks.append(addrvec_mem['addr'])
        return tuple(socks)

    def get_msgrv2_mon_socks(self):
        """
        Get monitor sockets that use msgrv2 to operate.

        :return socks: tuple of strings; strings are individual sockets.
        """
        from json import loads

        output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
        socks = []
        for mon in output['mons']:
            for addrvec_mem in mon['public_addrs']['addrvec']:
                if addrvec_mem['type'] == 'v2':
                    socks.append(addrvec_mem['addr'])
        return tuple(socks)
    def flush_pg_stats(self, osds, no_wait=None, wait_for_mon=300):
        """
        Flush pg stats from a list of OSD ids, ensuring they are reflected
        all the way to the monitor.  Luminous and later only.

        :param osds: list of OSDs to flush
        :param no_wait: list of OSDs not to wait for seq id. by default, we
                        wait for all specified osds, but some of them could be
                        moved out of osdmap, so we cannot get their updated
                        stat seq from monitor anymore. in that case, you need
                        to pass a blocklist.
        :param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
                             it. (5 min by default)
        """
        def flush_one_osd(osd: int, wait_for_mon: int):
            need = int(self.raw_cluster_cmd('tell', 'osd.%d' % osd, 'flush_pg_stats'))
            if not wait_for_mon:
                return
            while wait_for_mon > 0:
                got = int(self.raw_cluster_cmd('osd', 'last-stat-seq', 'osd.%d' % osd))
                self.log('need seq {need} got {got} for osd.{osd}'.format(
                    need=need, got=got, osd=osd))
                wait_for_mon -= A_WHILE
            raise Exception('timed out waiting for mon to be updated with '
                            'osd.{osd}: {got} < {need}'.
                            format(osd=osd, got=got, need=need))

        with parallel() as p:
            for osd in osds:
                p.spawn(flush_one_osd, osd, wait_for_mon)

    def flush_all_pg_stats(self):
        self.flush_pg_stats(range(len(self.get_osd_dump())))
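
    # Hedged usage sketch: flush stats for two specific OSDs and wait up to
    # 300 s for the mon to catch up (OSD ids are illustrative):
    #
    #   manager.flush_pg_stats([0, 1], wait_for_mon=300)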
    def do_rados(self, cmd, pool=None, namespace=None, remote=None, **kwargs):
        """
        Execute a remote rados command.
        """
        if remote is None:
            remote = self.controller

        pre = self.RADOS_CMD + []  # deep-copying!
        if pool is not None:
            pre += ['--pool', pool]
        if namespace is not None:
            pre += ['--namespace', namespace]

    def rados_write_objects(self, pool, num_objects, size,
                            timelimit, threads, cleanup=False):
        """
        Threads not used yet.
        """
            '--num-objects', num_objects,
        if not cleanup:
            args.append('--no-cleanup')
        return self.do_rados(map(str, args), pool=pool)

    def do_put(self, pool, obj, fname, namespace=None):
        """
        Implement rados put operation
        """
        args = ['put', obj, fname]
        return self.do_rados(

    def do_get(self, pool, obj, fname='/dev/null', namespace=None):
        """
        Implement rados get operation
        """
        args = ['get', obj, fname]
        return self.do_rados(
            namespace=namespace,

    def do_rm(self, pool, obj, namespace=None):
        """
        Implement rados rm operation
        """
        return self.do_rados(
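
    # Hedged usage sketch for the rados helpers above (pool and object names
    # are illustrative):
    #
    #   manager.do_put('rbd', 'obj0', '/etc/hosts')
    #   manager.do_get('rbd', 'obj0', '/tmp/obj0.out')
    #   manager.do_rm('rbd', 'obj0')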
    def osd_admin_socket(self, osd_id, command, check_status=True, timeout=0, stdout=None):
        return self.admin_socket('osd', osd_id, command, check_status, timeout, stdout)

    def find_remote(self, service_type, service_id):
        """
        Get the Remote for the host where a particular service runs.

        :param service_type: 'mds', 'osd', 'client'
        :param service_id: The second part of a role, e.g. '0' for
                           the role 'client.0'
        :return: a Remote instance for the host where the
                 requested role is placed
        """
        return get_remote(self.ctx, self.cluster,
                          service_type, service_id)

    def admin_socket(self, service_type, service_id,
                     command, check_status=True, timeout=0, stdout=None):
        """
        Remotely start up ceph specifying the admin socket
        :param command: a list of words to use as the command
        """
        remote = self.find_remote(service_type, service_id)

        if self.cephadm:
            return shell(
                self.ctx, self.cluster, remote,
                'ceph', 'daemon', '%s.%s' % (service_type, service_id),
                check_status=check_status,
        elif self.rook:
            assert False, 'not implemented'

            f'{self.testdir}/archive/coverage',
            '/var/run/ceph/{cluster}-{type}.{id}.asok'.format(
                cluster=self.cluster,
        args.extend(command)
            check_status=check_status

    def objectstore_tool(self, pool, options, args, **kwargs):
        return ObjectStoreTool(self, pool, **kwargs).run(options, args)
    def get_pgid(self, pool, pgnum):
        """
        :param pool: pool name
        :param pgnum: pg number
        :returns: a string representing this pg.
        """
        poolnum = self.get_pool_num(pool)
        pg_str = "{poolnum}.{pgnum}".format(

    def get_pg_replica(self, pool, pgnum):
        """
        get replica for pool, pgnum (e.g. (data, 0) -> 0)
        """
        pg_str = self.get_pgid(pool, pgnum)
        output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
        j = json.loads('\n'.join(output.split('\n')[1:]))
        return int(j['acting'][-1])

    def wait_for_pg_stats(func):
        # both osd_mon_report_interval and mgr_stats_period are 5 seconds
        # by default, and take the faulty injection in ms into consideration,
        # 12 seconds are more than enough
        delays = [1, 1, 2, 3, 5, 8, 13, 0]

        def wrapper(self, *args, **kwargs):
            for delay in delays:
                try:
                    return func(self, *args, **kwargs)
                except AssertionError as e:

    def get_pg_primary(self, pool, pgnum):
        """
        get primary for pool, pgnum (e.g. (data, 0) -> 0)
        """
        pg_str = self.get_pgid(pool, pgnum)
        output = self.raw_cluster_cmd("pg", "map", pg_str, '--format=json')
        j = json.loads('\n'.join(output.split('\n')[1:]))
        return int(j['acting'][0])
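
    # The delays list above gives wait_for_pg_stats-decorated getters a
    # Fibonacci-style backoff (1+1+2+3+5+8+13 = 33 s in total, comfortably
    # more than the ~12 s worst case mentioned in the comment) before the
    # final attempt is made with no extra delay.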
    def get_pool_num(self, pool):
        """
        get number for pool (e.g., data -> 2)
        """
        return int(self.get_pool_dump(pool)['pool'])

    def list_pools(self):
        osd_dump = self.get_osd_dump_json()
        self.log(osd_dump['pools'])
        return [str(i['pool_name']) for i in osd_dump['pools']]

    def clear_pools(self):
        [self.remove_pool(i) for i in self.list_pools()]

    def kick_recovery_wq(self, osdnum):
        """
        Run kick_recovery_wq on cluster.
        """
        return self.raw_cluster_cmd(
            'tell', "osd.%d" % (int(osdnum),),

    def wait_run_admin_socket(self, service_type,
                              service_id, args=['version'], timeout=75, stdout=None):
        """
        If osd_admin_socket call succeeds, return. Otherwise wait
        five seconds and try again.
        """
        proc = self.admin_socket(service_type, service_id,
                                 args, check_status=False, stdout=stdout)
        if proc.exitstatus == 0:
            return proc
        if (tries * 5) > timeout:
            raise Exception('timed out waiting for admin_socket '
                            'to appear after {type}.{id} restart'.
                            format(type=service_type,
        self.log("waiting on admin_socket for {type}-{id}, "
                 "{command}".format(type=service_type,

    def get_pool_dump(self, pool):
        """
        get the osd dump part of a pool
        """
        osd_dump = self.get_osd_dump_json()
        for i in osd_dump['pools']:
            if i['pool_name'] == pool:
                return i
    def get_config(self, service_type, service_id, name):
        """
        :param node: like 'mon.a'
        :param name: the option name
        """
        proc = self.wait_run_admin_socket(service_type, service_id,
        j = json.loads(proc.stdout.getvalue())
        return j[name]

    def inject_args(self, service_type, service_id, name, value):
        whom = '{0}.{1}'.format(service_type, service_id)
        if isinstance(value, bool):
            value = 'true' if value else 'false'
        opt_arg = '--{name}={value}'.format(name=name, value=value)
        self.raw_cluster_cmd('--', 'tell', whom, 'injectargs', opt_arg)

    def set_config(self, osdnum, **argdict):
        """
        :param osdnum: osd number
        :param argdict: dictionary containing values to set.
        """
        for k, v in argdict.items():
            self.wait_run_admin_socket(
                'osd', osdnum,
                ['config', 'set', str(k), str(v)])
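
    # Hedged usage sketch (option names and values are illustrative):
    #
    #   manager.set_config(0, osd_recovery_delay_start='10')
    #   manager.inject_args('osd', '*', 'osd_max_backfills', 1)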
2017 def raw_cluster_status(self
):
2019 Get status from cluster
2021 status
= self
.raw_cluster_cmd('status', '--format=json')
2022 return json
.loads(status
)
2024 def raw_osd_status(self
):
2026 Get osd status from cluster
2028 return self
.raw_cluster_cmd('osd', 'dump')

    def get_osd_status(self):
        """
        Get osd statuses sorted by states that the osds are in.
        """
        osd_lines = list(filter(
            lambda x: x.startswith('osd.') and (("up" in x) or ("down" in x)),
            self.raw_osd_status().split('\n')))
        self.log(osd_lines)
        in_osds = [int(i[4:].split()[0])
                   for i in filter(lambda x: " in " in x, osd_lines)]
        out_osds = [int(i[4:].split()[0])
                    for i in filter(lambda x: " out " in x, osd_lines)]
        up_osds = [int(i[4:].split()[0])
                   for i in filter(lambda x: " up " in x, osd_lines)]
        down_osds = [int(i[4:].split()[0])
                     for i in filter(lambda x: " down " in x, osd_lines)]
        dead_osds = [int(x.id_)
                     for x in filter(lambda x:
                                     not x.running(),
                                     self.ctx.daemons.
                                     iter_daemons_of_role('osd', self.cluster))]
        live_osds = [int(x.id_) for x in
                     filter(lambda x: x.running(),
                            self.ctx.daemons.iter_daemons_of_role('osd',
                                                                  self.cluster))]
        return {'in': in_osds, 'out': out_osds, 'up': up_osds,
                'down': down_osds, 'dead': dead_osds, 'live': live_osds,
                'raw': osd_lines}
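
    # Example (sketch only): the dict returned above keys OSD ids by state, so
    # a caller could, for instance, revive everything that is not running
    # (``manager`` is a hypothetical CephManager instance):
    #
    #     status = manager.get_osd_status()
    #     for osd in status['dead']:
    #         manager.revive_osd(osd)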

    def get_num_pgs(self):
        """
        Check cluster status for the number of pgs
        """
        status = self.raw_cluster_status()
        return status['pgmap']['num_pgs']

    def create_erasure_code_profile(self, profile_name, profile):
        """
        Create an erasure code profile name that can be used as a parameter
        when creating an erasure coded pool.
        """
        args = cmd_erasure_code_profile(profile_name, profile)
        self.raw_cluster_cmd(*args)

    def create_pool_with_unique_name(self, pg_num=16,
                                     erasure_code_profile_name=None,
                                     min_size=None,
                                     erasure_code_use_overwrites=False):
        """
        Create a pool named unique_pool_X where X is unique.
        """
        name = ""
        with self.lock:
            name = "unique_pool_%s" % (str(self.next_pool_id),)
            self.next_pool_id += 1
            self.create_pool(name,
                             pg_num,
                             erasure_code_profile_name=erasure_code_profile_name,
                             min_size=min_size,
                             erasure_code_use_overwrites=erasure_code_use_overwrites)
        return name

    @contextlib.contextmanager
    def pool(self, pool_name, pg_num=16, erasure_code_profile_name=None):
        self.create_pool(pool_name, pg_num, erasure_code_profile_name)
        yield
        self.remove_pool(pool_name)
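
    # Example (sketch only): the context manager above is handy for tests that
    # need a throwaway pool; ``manager`` is a hypothetical CephManager instance.
    #
    #     with manager.pool('scratch_pool', pg_num=8):
    #         pass  # run the workload against 'scratch_pool' here
    #
    # The pool is created on entry and removed again on exit.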

    def create_pool(self, pool_name, pg_num=16,
                    erasure_code_profile_name=None,
                    min_size=None,
                    erasure_code_use_overwrites=False):
        """
        Create a pool named from the pool_name parameter.
        :param pool_name: name of the pool being created.
        :param pg_num: initial number of pgs.
        :param erasure_code_profile_name: if set and !None create an
                                          erasure coded pool using the profile
        :param erasure_code_use_overwrites: if true, allow overwrites
        """
        with self.lock:
            assert isinstance(pool_name, str)
            assert isinstance(pg_num, int)
            assert pool_name not in self.pools
            self.log("creating pool_name %s" % (pool_name,))
            if erasure_code_profile_name:
                self.raw_cluster_cmd('osd', 'pool', 'create',
                                     pool_name, str(pg_num), str(pg_num),
                                     'erasure', erasure_code_profile_name)
            else:
                self.raw_cluster_cmd('osd', 'pool', 'create',
                                     pool_name, str(pg_num))
            if min_size is not None:
                self.raw_cluster_cmd(
                    'osd', 'pool', 'set', pool_name,
                    'min_size',
                    str(min_size))
            if erasure_code_use_overwrites:
                self.raw_cluster_cmd(
                    'osd', 'pool', 'set', pool_name,
                    'allow_ec_overwrites',
                    'true')
            self.raw_cluster_cmd(
                'osd', 'pool', 'application', 'enable',
                pool_name, 'rados', '--yes-i-really-mean-it',
                run.Raw('||'), 'true')
            self.pools[pool_name] = pg_num
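
    # Example (sketch only, hypothetical names and profile contents): creating
    # an erasure coded pool with overwrites enabled using the helpers above:
    #
    #     manager.create_erasure_code_profile('myprofile', {'k': 2, 'm': 1})
    #     manager.create_pool('ecpool', pg_num=8,
    #                         erasure_code_profile_name='myprofile',
    #                         erasure_code_use_overwrites=True)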

    def add_pool_snap(self, pool_name, snap_name):
        """
        Add pool snapshot
        :param pool_name: name of pool to snapshot
        :param snap_name: name of snapshot to take
        """
        self.raw_cluster_cmd('osd', 'pool', 'mksnap',
                             str(pool_name), str(snap_name))

    def remove_pool_snap(self, pool_name, snap_name):
        """
        Remove pool snapshot
        :param pool_name: name of pool to snapshot
        :param snap_name: name of snapshot to remove
        """
        self.raw_cluster_cmd('osd', 'pool', 'rmsnap',
                             str(pool_name), str(snap_name))

    def remove_pool(self, pool_name):
        """
        Remove the indicated pool
        :param pool_name: Pool to be removed
        """
        with self.lock:
            assert isinstance(pool_name, str)
            assert pool_name in self.pools
            self.log("removing pool_name %s" % (pool_name,))
            del self.pools[pool_name]
            self.raw_cluster_cmd('osd', 'pool', 'rm', pool_name, pool_name,
                                 "--yes-i-really-really-mean-it")

    def get_a_random_pool(self):
        """
        Return the name of a randomly chosen existing pool.
        """
        with self.lock:
            if len(self.pools) > 0:
                return random.sample(list(self.pools.keys()), 1)[0]

    def get_pool_pg_num(self, pool_name):
        """
        Return the number of pgs in the pool specified.
        """
        with self.lock:
            assert isinstance(pool_name, str)
            if pool_name in self.pools:
                return self.pools[pool_name]
            return 0

    def get_pool_property(self, pool_name, prop):
        """
        :param pool_name: pool
        :param prop: property to be checked.
        :returns: property as string
        """
        with self.lock:
            assert isinstance(pool_name, str)
            assert isinstance(prop, str)
            output = self.raw_cluster_cmd(
                'osd',
                'pool',
                'get',
                pool_name,
                prop)
            return output.split()[1]

    def get_pool_int_property(self, pool_name, prop):
        return int(self.get_pool_property(pool_name, prop))

    def set_pool_property(self, pool_name, prop, val):
        """
        :param pool_name: pool
        :param prop: property to be set.
        :param val: value to set.

        This routine retries if set operation fails.
        """
        with self.lock:
            assert isinstance(pool_name, str)
            assert isinstance(prop, str)
            assert isinstance(val, int)
            tries = 0
            while True:
                r = self.raw_cluster_cmd_result(
                    'osd',
                    'pool',
                    'set',
                    pool_name,
                    prop,
                    str(val))
                if r != 11:  # EAGAIN
                    break
                tries += 1
                if tries > 30:
                    raise Exception('timed out getting EAGAIN '
                                    'when setting pool property %s %s = %s' %
                                    (pool_name, prop, val))
                self.log('got EAGAIN setting pool property, '
                         'waiting a few seconds...')
                time.sleep(2)
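
    # Example (sketch only): pg_num is an integer property, so doubling a
    # pool's pg_num could look like this (``manager`` and the pool name are
    # hypothetical):
    #
    #     cur = manager.get_pool_int_property('rbd', 'pg_num')
    #     manager.set_pool_property('rbd', 'pg_num', cur * 2)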

    def expand_pool(self, pool_name, by, max_pgs):
        """
        Increase the number of pgs in a pool
        """
        with self.lock:
            assert isinstance(pool_name, str)
            assert isinstance(by, int)
            assert pool_name in self.pools
            if self.get_num_creating() > 0:
                return False
            if (self.pools[pool_name] + by) > max_pgs:
                return False
            self.log("increase pool size by %d" % (by,))
            new_pg_num = self.pools[pool_name] + by
            self.set_pool_property(pool_name, "pg_num", new_pg_num)
            self.pools[pool_name] = new_pg_num
            return True

    def contract_pool(self, pool_name, by, min_pgs):
        """
        Decrease the number of pgs in a pool
        """
        with self.lock:
            self.log('contract_pool %s by %s min %s' % (
                     pool_name, str(by), str(min_pgs)))
            assert isinstance(pool_name, str)
            assert isinstance(by, int)
            assert pool_name in self.pools
            if self.get_num_creating() > 0:
                self.log('too many creating')
                return False
            proj = self.pools[pool_name] - by
            if proj < min_pgs:
                self.log('would drop below min_pgs, proj %d, currently %d' %
                         (proj, self.pools[pool_name],))
                return False
            self.log("decrease pool size by %d" % (by,))
            new_pg_num = self.pools[pool_name] - by
            self.set_pool_property(pool_name, "pg_num", new_pg_num)
            self.pools[pool_name] = new_pg_num
            return True

    def stop_pg_num_changes(self):
        """
        Reset all pg_num_targets back to pg_num, canceling splits and merges
        """
        self.log('Canceling any pending splits or merges...')
        osd_dump = self.get_osd_dump_json()
        try:
            for pool in osd_dump['pools']:
                if pool['pg_num'] != pool['pg_num_target']:
                    self.log('Setting pool %s (%d) pg_num %d -> %d' %
                             (pool['pool_name'], pool['pool'],
                              pool['pg_num_target'],
                              pool['pg_num']))
                    self.raw_cluster_cmd('osd', 'pool', 'set',
                                         pool['pool_name'],
                                         'pg_num', str(pool['pg_num']))
        except KeyError:
            # we don't support pg_num_target before nautilus
            pass

    def set_pool_pgpnum(self, pool_name, force):
        """
        Set pgpnum property of pool_name pool.
        """
        with self.lock:
            assert isinstance(pool_name, str)
            assert pool_name in self.pools
            if not force and self.get_num_creating() > 0:
                return False
            self.set_pool_property(pool_name, 'pgp_num', self.pools[pool_name])
            return True

    def list_pg_unfound(self, pgid):
        """
        return list of unfound pgs with the id specified
        """
        r = None
        offset = {}
        while True:
            out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_unfound',
                                       json.dumps(offset))
            j = json.loads(out)
            if r is None:
                r = j
            else:
                r['objects'].extend(j['objects'])
            if 'more' not in j or not j['more']:
                break
            offset = j['objects'][-1]['oid']
        if r is not None:
            r.pop('more', None)
        return r

    def get_pg_stats(self):
        """
        Dump the cluster and get pg stats
        """
        out = self.raw_cluster_cmd('pg', 'dump', '--format=json')
        j = json.loads('\n'.join(out.split('\n')[1:]))
        try:
            return j['pg_map']['pg_stats']
        except KeyError:
            return j['pg_stats']

    def get_osd_df(self, osdid):
        """
        Get the osd df stats
        """
        out = self.raw_cluster_cmd('osd', 'df', 'name', 'osd.{}'.format(osdid),
                                   '--format=json')
        j = json.loads('\n'.join(out.split('\n')[1:]))
        return j['nodes'][0]

    def get_pool_df(self, name):
        """
        Get the pool df stats
        """
        out = self.raw_cluster_cmd('df', 'detail', '--format=json')
        j = json.loads('\n'.join(out.split('\n')[1:]))
        return next((p['stats'] for p in j['pools'] if p['name'] == name),
                    None)

    def get_pgids_to_force(self, backfill):
        """
        Return the randomized list of PGs that can have their recovery/backfill forced
        """
        j = self.get_pg_stats()
        pgids = []
        if backfill:
            wanted = ['degraded', 'backfilling', 'backfill_wait']
        else:
            wanted = ['recovering', 'degraded', 'recovery_wait']
        for pg in j:
            status = pg['state'].split('+')
            for t in wanted:
                if random.random() > 0.5 and not ('forced_backfill' in status or 'forced_recovery' in status) and t in status:
                    pgids.append(pg['pgid'])
                    break
        return pgids

    def get_pgids_to_cancel_force(self, backfill):
        """
        Return the randomized list of PGs whose recovery/backfill priority is forced
        """
        j = self.get_pg_stats()
        pgids = []
        if backfill:
            wanted = 'forced_backfill'
        else:
            wanted = 'forced_recovery'
        for pg in j:
            status = pg['state'].split('+')
            if wanted in status and random.random() > 0.5:
                pgids.append(pg['pgid'])
        return pgids

    def compile_pg_status(self):
        """
        Return a histogram of pg state values
        """
        ret = {}
        j = self.get_pg_stats()
        for pg in j:
            for status in pg['state'].split('+'):
                if status not in ret:
                    ret[status] = 1
                else:
                    ret[status] += 1
        return ret
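
    # Example (sketch only): the histogram above maps each state token to a
    # count; on a healthy cluster it might look roughly like
    # ``{'active': 128, 'clean': 128}`` -- values here are illustrative only.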

    @wait_for_pg_stats  # type: ignore
    def with_pg_state(self, pool, pgnum, check):
        pgstr = self.get_pgid(pool, pgnum)
        stats = self.get_single_pg_stats(pgstr)
        assert(check(stats['state']))

    @wait_for_pg_stats  # type: ignore
    def with_pg(self, pool, pgnum, check):
        pgstr = self.get_pgid(pool, pgnum)
        stats = self.get_single_pg_stats(pgstr)
        return check(stats)
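
    # Example (sketch only): thanks to the wait_for_pg_stats retry decorator,
    # a check like the one below is re-evaluated while pg stats catch up
    # (``manager`` and the pool name are hypothetical):
    #
    #     manager.with_pg_state('scratch_pool', 0,
    #                           lambda state: 'active+clean' in state)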

    def get_last_scrub_stamp(self, pool, pgnum):
        """
        Get the timestamp of the last scrub.
        """
        stats = self.get_single_pg_stats(self.get_pgid(pool, pgnum))
        return stats["last_scrub_stamp"]

    def do_pg_scrub(self, pool, pgnum, stype):
        """
        Scrub pg and wait for scrubbing to finish
        """
        init = self.get_last_scrub_stamp(pool, pgnum)
        RESEND_TIMEOUT = 120  # Must be a multiple of SLEEP_TIME
        FATAL_TIMEOUT = RESEND_TIMEOUT * 3
        SLEEP_TIME = 10
        timer = 0
        while init == self.get_last_scrub_stamp(pool, pgnum):
            assert timer < FATAL_TIMEOUT, "fatal timeout trying to " + stype
            self.log("waiting for scrub type %s" % (stype,))
            if (timer % RESEND_TIMEOUT) == 0:
                self.raw_cluster_cmd('pg', stype, self.get_pgid(pool, pgnum))
                # The first time in this loop is the actual request
                if timer != 0 and stype == "repair":
                    self.log("WARNING: Resubmitted a non-idempotent repair")
            time.sleep(SLEEP_TIME)
            timer += SLEEP_TIME
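
    # Example (sketch only): ``stype`` is passed straight to ``ceph pg``, so a
    # deep scrub of pg 0 in a hypothetical pool would be requested as
    #
    #     manager.do_pg_scrub('scratch_pool', 0, 'deep-scrub')
    #
    # and the call returns once the pg's last_scrub_stamp has moved forward.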

    def wait_snap_trimming_complete(self, pool):
        """
        Wait for snap trimming on pool to end
        """
        POLL_PERIOD = 10
        FATAL_TIMEOUT = 600
        start = time.time()
        poolnum = self.get_pool_num(pool)
        poolnumstr = "%s." % (poolnum,)
        while True:
            now = time.time()
            if (now - start) > FATAL_TIMEOUT:
                assert (now - start) < FATAL_TIMEOUT, \
                    'failed to complete snap trimming before timeout'
            all_stats = self.get_pg_stats()
            trimming = False
            for pg in all_stats:
                if (poolnumstr in pg['pgid']) and ('snaptrim' in pg['state']):
                    self.log("pg {pg} in trimming, state: {state}".format(
                        pg=pg['pgid'],
                        state=pg['state']))
                    trimming = True
            if not trimming:
                break
            self.log("{pool} still trimming, waiting".format(pool=pool))
            time.sleep(POLL_PERIOD)

    def get_single_pg_stats(self, pgid):
        """
        Return pg for the pgid specified.
        """
        all_stats = self.get_pg_stats()
        for pg in all_stats:
            if pg['pgid'] == pgid:
                return pg
        return None

    def get_object_pg_with_shard(self, pool, name, osdid):
        """
        Return the pg of the object, with the shard suffix for EC pools.
        """
        pool_dump = self.get_pool_dump(pool)
        object_map = self.get_object_map(pool, name)
        if pool_dump["type"] == PoolType.ERASURE_CODED:
            shard = object_map['acting'].index(osdid)
            return "{pgid}s{shard}".format(pgid=object_map['pgid'],
                                           shard=shard)
        else:
            return object_map['pgid']

    def get_object_primary(self, pool, name):
        """
        Return the acting primary osd for the object.
        """
        object_map = self.get_object_map(pool, name)
        return object_map['acting_primary']

    def get_object_map(self, pool, name):
        """
        osd map --format=json converted to a python object
        :returns: the python object
        """
        out = self.raw_cluster_cmd('--format=json', 'osd', 'map', pool, name)
        return json.loads('\n'.join(out.split('\n')[1:]))

    def get_osd_dump_json(self):
        """
        osd dump --format=json converted to a python object
        :returns: the python object
        """
        out = self.raw_cluster_cmd('osd', 'dump', '--format=json')
        return json.loads('\n'.join(out.split('\n')[1:]))

    def get_osd_dump(self):
        """
        Dump osds
        :returns: all osds
        """
        return self.get_osd_dump_json()['osds']

    def get_osd_metadata(self):
        """
        osd metadata --format=json converted to a python object
        :returns: the python object containing osd metadata information
        """
        out = self.raw_cluster_cmd('osd', 'metadata', '--format=json')
        return json.loads('\n'.join(out.split('\n')[1:]))

    def get_mgr_dump(self):
        out = self.raw_cluster_cmd('mgr', 'dump', '--format=json')
        return json.loads(out)

    def get_stuck_pgs(self, type_, threshold):
        """
        :returns: stuck pg information from the cluster
        """
        out = self.raw_cluster_cmd('pg', 'dump_stuck', type_, str(threshold),
                                   '--format=json')
        return json.loads(out).get('stuck_pg_stats', [])

    def get_num_unfound_objects(self):
        """
        Check cluster status to get the number of unfound objects
        """
        status = self.raw_cluster_status()
        self.log(status)
        return status['pgmap'].get('unfound_objects', 0)

    def get_num_creating(self):
        """
        Find the number of pgs in creating mode.
        """
        pgs = self.get_pg_stats()
        num = 0
        for pg in pgs:
            if 'creating' in pg['state']:
                num += 1
        return num

    def get_num_active_clean(self):
        """
        Find the number of active and clean pgs.
        """
        pgs = self.get_pg_stats()
        return self._get_num_active_clean(pgs)

    def _get_num_active_clean(self, pgs):
        num = 0
        for pg in pgs:
            if (pg['state'].count('active') and
                    pg['state'].count('clean') and
                    not pg['state'].count('stale')):
                num += 1
        return num

    def get_num_active_recovered(self):
        """
        Find the number of active and recovered pgs.
        """
        pgs = self.get_pg_stats()
        return self._get_num_active_recovered(pgs)

    def _get_num_active_recovered(self, pgs):
        num = 0
        for pg in pgs:
            if (pg['state'].count('active') and
                    not pg['state'].count('recover') and
                    not pg['state'].count('backfilling') and
                    not pg['state'].count('stale')):
                num += 1
        return num

    def get_is_making_recovery_progress(self):
        """
        Return whether there is recovery progress discernable in the
        raw cluster status
        """
        status = self.raw_cluster_status()
        kps = status['pgmap'].get('recovering_keys_per_sec', 0)
        bps = status['pgmap'].get('recovering_bytes_per_sec', 0)
        ops = status['pgmap'].get('recovering_objects_per_sec', 0)
        return kps > 0 or bps > 0 or ops > 0

    def get_num_active(self):
        """
        Find the number of active pgs.
        """
        pgs = self.get_pg_stats()
        return self._get_num_active(pgs)

    def _get_num_active(self, pgs):
        num = 0
        for pg in pgs:
            if pg['state'].count('active') and not pg['state'].count('stale'):
                num += 1
        return num

    def get_num_down(self):
        """
        Find the number of pgs that are down.
        """
        pgs = self.get_pg_stats()
        num = 0
        for pg in pgs:
            if ((pg['state'].count('down') and not
                    pg['state'].count('stale')) or
                    (pg['state'].count('incomplete') and not
                     pg['state'].count('stale'))):
                num += 1
        return num

    def get_num_active_down(self):
        """
        Find the number of pgs that are either active or down.
        """
        pgs = self.get_pg_stats()
        return self._get_num_active_down(pgs)

    def _get_num_active_down(self, pgs):
        num = 0
        for pg in pgs:
            if ((pg['state'].count('active') and not
                    pg['state'].count('stale')) or
                    (pg['state'].count('down') and not
                     pg['state'].count('stale')) or
                    (pg['state'].count('incomplete') and not
                     pg['state'].count('stale'))):
                num += 1
        return num

    def get_num_peered(self):
        """
        Find the number of PGs that are peered
        """
        pgs = self.get_pg_stats()
        return self._get_num_peered(pgs)

    def _get_num_peered(self, pgs):
        num = 0
        for pg in pgs:
            if pg['state'].count('peered') and not pg['state'].count('stale'):
                num += 1
        return num

    def is_clean(self):
        """
        True if all pgs are clean
        """
        pgs = self.get_pg_stats()
        if self._get_num_active_clean(pgs) == len(pgs):
            return True
        else:
            self.dump_pgs_not_active_clean()
            return False

    def is_recovered(self):
        """
        True if all pgs have recovered
        """
        pgs = self.get_pg_stats()
        return self._get_num_active_recovered(pgs) == len(pgs)

    def is_active_or_down(self):
        """
        True if all pgs are active or down
        """
        pgs = self.get_pg_stats()
        return self._get_num_active_down(pgs) == len(pgs)

    def dump_pgs_not_active_clean(self):
        """
        Dumps all pgs that are not active+clean
        """
        pgs = self.get_pg_stats()
        for pg in pgs:
            if pg['state'] != 'active+clean':
                self.log('PG %s is not active+clean' % pg['pgid'])
                self.log(pg)

    def dump_pgs_not_active_down(self):
        """
        Dumps all pgs that are not active or down
        """
        pgs = self.get_pg_stats()
        for pg in pgs:
            if 'active' not in pg['state'] and 'down' not in pg['state']:
                self.log('PG %s is not active or down' % pg['pgid'])
                self.log(pg)

    def dump_pgs_not_active(self):
        """
        Dumps all pgs that are not active
        """
        pgs = self.get_pg_stats()
        for pg in pgs:
            if 'active' not in pg['state']:
                self.log('PG %s is not active' % pg['pgid'])
                self.log(pg)

    def dump_pgs_not_active_peered(self, pgs):
        for pg in pgs:
            if (not pg['state'].count('active')) and (not pg['state'].count('peered')):
                self.log('PG %s is not active or peered' % pg['pgid'])
                self.log(pg)

    def wait_for_clean(self, timeout=1200):
        """
        Returns true when all pgs are clean.
        """
        self.log("waiting for clean")
        start = time.time()
        num_active_clean = self.get_num_active_clean()
        while not self.is_clean():
            if timeout is not None:
                if self.get_is_making_recovery_progress():
                    self.log("making progress, resetting timeout")
                    start = time.time()
                else:
                    self.log("no progress seen, keeping timeout for now")
                    if time.time() - start >= timeout:
                        self.log('dumping pgs not clean')
                        self.dump_pgs_not_active_clean()
                        assert time.time() - start < timeout, \
                            'wait_for_clean: failed before timeout expired'
            cur_active_clean = self.get_num_active_clean()
            if cur_active_clean != num_active_clean:
                start = time.time()
                num_active_clean = cur_active_clean
            time.sleep(3)
        self.log("clean!")
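
    # Example (sketch only): a typical teardown check after thrashing combines
    # the wait helpers in this class (``manager`` is a hypothetical instance):
    #
    #     manager.wait_for_all_osds_up(timeout=300)
    #     manager.wait_for_clean(timeout=1200)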

    def are_all_osds_up(self):
        """
        Returns true if all osds are up.
        """
        x = self.get_osd_dump()
        return (len(x) == sum([(y['up'] > 0) for y in x]))

    def wait_for_all_osds_up(self, timeout=None):
        """
        When this exits, either the timeout has expired, or all
        osds are up.
        """
        self.log("waiting for all up")
        start = time.time()
        while not self.are_all_osds_up():
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'timeout expired in wait_for_all_osds_up'
            time.sleep(3)
        self.log("all up!")

    def pool_exists(self, pool):
        if pool in self.list_pools():
            return True
        return False

    def wait_for_pool(self, pool, timeout=300):
        """
        Wait for a pool to exist
        """
        self.log('waiting for pool %s to exist' % pool)
        start = time.time()
        while not self.pool_exists(pool):
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'timeout expired in wait_for_pool'
            time.sleep(3)

    def wait_for_pools(self, pools):
        for pool in pools:
            self.wait_for_pool(pool)

    def is_mgr_available(self):
        x = self.get_mgr_dump()
        return x.get('available', False)

    def wait_for_mgr_available(self, timeout=None):
        self.log("waiting for mgr available")
        start = time.time()
        while not self.is_mgr_available():
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'timeout expired in wait_for_mgr_available'
            time.sleep(3)
        self.log("mgr available!")

    def wait_for_recovery(self, timeout=None):
        """
        Check peering.  When this exits, we have recovered.
        """
        self.log("waiting for recovery to complete")
        start = time.time()
        num_active_recovered = self.get_num_active_recovered()
        while not self.is_recovered():
            now = time.time()
            if timeout is not None:
                if self.get_is_making_recovery_progress():
                    self.log("making progress, resetting timeout")
                    start = time.time()
                else:
                    self.log("no progress seen, keeping timeout for now")
                    if now - start >= timeout:
                        if self.is_recovered():
                            break
                        self.log('dumping pgs not recovered yet')
                        self.dump_pgs_not_active_clean()
                        assert now - start < timeout, \
                            'wait_for_recovery: failed before timeout expired'
            cur_active_recovered = self.get_num_active_recovered()
            if cur_active_recovered != num_active_recovered:
                start = time.time()
                num_active_recovered = cur_active_recovered
            time.sleep(3)
        self.log("recovered!")

    def wait_for_active(self, timeout=None):
        """
        Check peering.  When this exits, we are definitely active.
        """
        self.log("waiting for peering to complete")
        start = time.time()
        num_active = self.get_num_active()
        while not self.is_active():
            if timeout is not None:
                if time.time() - start >= timeout:
                    self.log('dumping pgs not active')
                    self.dump_pgs_not_active()
                    assert time.time() - start < timeout, \
                        'wait_for_active: failed before timeout expired'
            cur_active = self.get_num_active()
            if cur_active != num_active:
                start = time.time()
                num_active = cur_active
            time.sleep(3)
        self.log("active!")

    def wait_for_active_or_down(self, timeout=None):
        """
        Check peering.  When this exits, we are definitely either
        active or down.
        """
        self.log("waiting for peering to complete or become blocked")
        start = time.time()
        num_active_down = self.get_num_active_down()
        while not self.is_active_or_down():
            if timeout is not None:
                if time.time() - start >= timeout:
                    self.log('dumping pgs not active or down')
                    self.dump_pgs_not_active_down()
                    assert time.time() - start < timeout, \
                        'wait_for_active_or_down: failed before timeout expired'
            cur_active_down = self.get_num_active_down()
            if cur_active_down != num_active_down:
                start = time.time()
                num_active_down = cur_active_down
            time.sleep(3)
        self.log("active or down!")

    def osd_is_up(self, osd):
        """
        Wrapper for osd check
        """
        osds = self.get_osd_dump()
        return osds[osd]['up'] > 0

    def wait_till_osd_is_up(self, osd, timeout=None):
        """
        Loop waiting for osd.
        """
        self.log('waiting for osd.%d to be up' % osd)
        start = time.time()
        while not self.osd_is_up(osd):
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'osd.%d failed to come up before timeout expired' % osd
            time.sleep(3)
        self.log('osd.%d is up' % osd)

    def is_active(self):
        """
        Wrapper to check if all pgs are active
        """
        return self.get_num_active() == self.get_num_pgs()

    def all_active_or_peered(self):
        """
        Wrapper to check if all PGs are active or peered
        """
        pgs = self.get_pg_stats()
        if self._get_num_active(pgs) + self._get_num_peered(pgs) == len(pgs):
            return True
        else:
            self.dump_pgs_not_active_peered(pgs)
            return False

    def wait_till_active(self, timeout=None):
        """
        Wait until all pgs are active.
        """
        self.log("waiting till active")
        start = time.time()
        while not self.is_active():
            if timeout is not None:
                if time.time() - start >= timeout:
                    self.log('dumping pgs not active')
                    self.dump_pgs_not_active()
                    assert time.time() - start < timeout, \
                        'wait_till_active: failed before timeout expired'
            time.sleep(3)
        self.log("active!")

    def wait_till_pg_convergence(self, timeout=None):
        start = time.time()
        old_stats = None
        active_osds = [osd['osd'] for osd in self.get_osd_dump()
                       if osd['in'] and osd['up']]
        while True:
            # strictly speaking, no need to wait for mon. but due to the
            # "ms inject socket failures" setting, the osdmap could be delayed,
            # so mgr is likely to ignore the pg-stat messages with pgs serving
            # newly created pools which are not yet known by mgr. so, to make
            # sure the mgr is updated with the latest pg-stats, waiting for
            # mon/mgr is necessary.
            self.flush_pg_stats(active_osds)
            new_stats = dict((stat['pgid'], stat['state'])
                             for stat in self.get_pg_stats())
            if old_stats == new_stats:
                return old_stats
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'failed to reach convergence before %d secs' % timeout
            old_stats = new_stats
            # longer than mgr_stats_period
            time.sleep(5 + 1)

    def mark_out_osd(self, osd):
        """
        Wrapper to mark osd out.
        """
        self.raw_cluster_cmd('osd', 'out', str(osd))

    def kill_osd(self, osd):
        """
        Kill osds by either power cycling (if indicated by the config)
        or by stopping.
        """
        if self.config.get('powercycle'):
            remote = self.find_remote('osd', osd)
            self.log('kill_osd on osd.{o} '
                     'doing powercycle of {s}'.format(o=osd, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_off()
        elif self.config.get('bdev_inject_crash') and self.config.get('bdev_inject_crash_probability'):
            if random.uniform(0, 1) < self.config.get('bdev_inject_crash_probability', .5):
                self.inject_args(
                    'osd', osd,
                    'bdev-inject-crash', self.config.get('bdev_inject_crash'))
                try:
                    self.ctx.daemons.get_daemon('osd', osd, self.cluster).wait()
                except Exception:
                    pass
                else:
                    raise RuntimeError('osd.%s did not fail' % osd)
            else:
                self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
        else:
            self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()

    @staticmethod
    def _assert_ipmi(remote):
        assert remote.console.has_ipmi_credentials, (
            "powercycling requested but RemoteConsole is not "
            "initialized. Check ipmi config.")

    def blackhole_kill_osd(self, osd):
        """
        Stop osd if nothing else works.
        """
        self.inject_args('osd', osd,
                         'objectstore-blackhole', True)
        time.sleep(2)
        self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()

    def revive_osd(self, osd, timeout=360, skip_admin_check=False):
        """
        Revive osds by either power cycling (if indicated by the config)
        or by restarting.
        """
        if self.config.get('powercycle'):
            remote = self.find_remote('osd', osd)
            self.log('revive_osd on osd.{o} doing powercycle of {s}'.
                     format(o=osd, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_on()
            if not remote.console.check_status(300):
                raise Exception('Failed to revive osd.{o} via ipmi'.
                                format(o=osd))
            teuthology.reconnect(self.ctx, 60, [remote])
            mount_osd_data(self.ctx, remote, self.cluster, str(osd))
            self.make_admin_daemon_dir(remote)
            self.ctx.daemons.get_daemon('osd', osd, self.cluster).reset()
        self.ctx.daemons.get_daemon('osd', osd, self.cluster).restart()

        if not skip_admin_check:
            # wait for dump_ops_in_flight; this command doesn't appear
            # until after the signal handler is installed and it is safe
            # to stop the osd again without making valgrind leak checks
            # unhappy.  see #5924.
            self.wait_run_admin_socket('osd', osd,
                                       args=['dump_ops_in_flight'],
                                       timeout=timeout, stdout=DEVNULL)
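
    # Example (sketch only): a minimal kill/revive cycle for osd.0 using the
    # helpers above and below (``manager`` is a hypothetical instance):
    #
    #     manager.kill_osd(0)
    #     manager.mark_down_osd(0)
    #     # ... exercise the cluster while the osd is dead ...
    #     manager.revive_osd(0)
    #     manager.wait_till_osd_is_up(0)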

    def mark_down_osd(self, osd):
        """
        Cluster command wrapper
        """
        self.raw_cluster_cmd('osd', 'down', str(osd))

    def mark_in_osd(self, osd):
        """
        Cluster command wrapper
        """
        self.raw_cluster_cmd('osd', 'in', str(osd))

    def signal_osd(self, osd, sig, silent=False):
        """
        Wrapper to local get_daemon call which sends the given
        signal to the given osd.
        """
        self.ctx.daemons.get_daemon('osd', osd,
                                    self.cluster).signal(sig, silent=silent)

    def signal_mon(self, mon, sig, silent=False):
        """
        Wrapper to local get_daemon call
        """
        self.ctx.daemons.get_daemon('mon', mon,
                                    self.cluster).signal(sig, silent=silent)

    def kill_mon(self, mon):
        """
        Kill the monitor by either power cycling (if the config says so),
        or by stopping it.
        """
        if self.config.get('powercycle'):
            remote = self.find_remote('mon', mon)
            self.log('kill_mon on mon.{m} doing powercycle of {s}'.
                     format(m=mon, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_off()
        else:
            self.ctx.daemons.get_daemon('mon', mon, self.cluster).stop()

    def revive_mon(self, mon):
        """
        Restart by either power cycling (if the config says so),
        or by doing a normal restart.
        """
        if self.config.get('powercycle'):
            remote = self.find_remote('mon', mon)
            self.log('revive_mon on mon.{m} doing powercycle of {s}'.
                     format(m=mon, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_on()
            self.make_admin_daemon_dir(remote)
        self.ctx.daemons.get_daemon('mon', mon, self.cluster).restart()

    def revive_mgr(self, mgr):
        """
        Restart by either power cycling (if the config says so),
        or by doing a normal restart.
        """
        if self.config.get('powercycle'):
            remote = self.find_remote('mgr', mgr)
            self.log('revive_mgr on mgr.{m} doing powercycle of {s}'.
                     format(m=mgr, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_on()
            self.make_admin_daemon_dir(remote)
        self.ctx.daemons.get_daemon('mgr', mgr, self.cluster).restart()

    def get_mon_status(self, mon):
        """
        Extract all the monitor status information from the cluster
        """
        out = self.raw_cluster_cmd('tell', 'mon.%s' % mon, 'mon_status')
        return json.loads(out)

    def get_mon_quorum(self):
        """
        Extract monitor quorum information from the cluster
        """
        out = self.raw_cluster_cmd('quorum_status')
        j = json.loads(out)
        return j['quorum']

    def wait_for_mon_quorum_size(self, size, timeout=300):
        """
        Loop until quorum size is reached.
        """
        self.log('waiting for quorum size %d' % size)
        sleep = 3
        with safe_while(sleep=sleep,
                        tries=timeout // sleep,
                        action=f'wait for quorum size {size}') as proceed:
            while proceed():
                try:
                    if len(self.get_mon_quorum()) == size:
                        break
                except CommandFailedError as e:
                    # could fail instead of blocking if the rotating key of the
                    # connected monitor is not updated yet after they form the
                    # quorum
                    if e.exitstatus == errno.EACCES:
                        pass
                    else:
                        raise
        self.log("quorum is size %d" % size)

    def get_mon_health(self, debug=False):
        """
        Extract all the monitor health information.
        """
        out = self.raw_cluster_cmd('health', '--format=json')
        if debug:
            self.log('health:\n{h}'.format(h=out))
        return json.loads(out)

    def wait_until_healthy(self, timeout=None):
        self.log("wait_until_healthy")
        start = time.time()
        while self.get_mon_health()['status'] != 'HEALTH_OK':
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'timeout expired in wait_until_healthy'
            time.sleep(3)
        self.log("wait_until_healthy done")

    def get_filepath(self):
        """
        Return path to osd data with {id} needing to be replaced
        """
        return '/var/lib/ceph/osd/' + self.cluster + '-{id}'

    def make_admin_daemon_dir(self, remote):
        """
        Create /var/run/ceph directory on remote site.

        :param remote: Remote site
        """
        remote.run(args=['sudo',
                         'install', '-d', '-m0777', '--', '/var/run/ceph', ], )

    def get_service_task_status(self, service, status_key):
        """
        Return daemon task status for a given ceph service.

        :param service: ceph service (mds, osd, etc...)
        :param status_key: matching task status key
        """
        task_status = {}
        status = self.raw_cluster_status()
        try:
            for k, v in status['servicemap']['services'][service]['daemons'].items():
                ts = dict(v).get('task_status', None)
                if ts:
                    task_status[k] = ts[status_key]
        except KeyError:  # catches missing service and status key
            return {}
        self.log(task_status)
        return task_status


def utility_task(name):
    """
    Generate ceph_manager subtask corresponding to ceph_manager
    method name
    """
    def task(ctx, config):
        if config is None:
            config = {}
        args = config.get('args', [])
        kwargs = config.get('kwargs', {})
        cluster = config.get('cluster', 'ceph')
        fn = getattr(ctx.managers[cluster], name)
        fn(*args, **kwargs)
    return task


revive_osd = utility_task("revive_osd")
revive_mon = utility_task("revive_mon")
kill_osd = utility_task("kill_osd")
kill_mon = utility_task("kill_mon")
create_pool = utility_task("create_pool")
remove_pool = utility_task("remove_pool")
wait_for_clean = utility_task("wait_for_clean")
flush_all_pg_stats = utility_task("flush_all_pg_stats")
set_pool_property = utility_task("set_pool_property")
do_pg_scrub = utility_task("do_pg_scrub")
wait_for_pool = utility_task("wait_for_pool")
wait_for_pools = utility_task("wait_for_pools")
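
# Illustrative sketch (not part of the upstream module): each generated task
# expects a teuthology config dict with optional 'args', 'kwargs' and
# 'cluster' keys, so invoking one directly from Python could look like the
# following, where ``ctx`` is a hypothetical teuthology context whose
# ``ctx.managers`` mapping is already populated:
#
#     wait_for_clean(ctx, {'cluster': 'ceph', 'kwargs': {'timeout': 900}})
#
# which resolves to ctx.managers['ceph'].wait_for_clean(timeout=900).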