2 ceph manager -- Thrasher and CephManager objects
4 from functools
import wraps
18 from io
import BytesIO
, StringIO
19 from teuthology
import misc
as teuthology
20 from tasks
.scrub
import Scrubber
21 from tasks
.util
.rados
import cmd_erasure_code_profile
22 from tasks
.util
import get_remote
23 from teuthology
.contextutil
import safe_while
24 from teuthology
.orchestra
.remote
import Remote
25 from teuthology
.orchestra
import run
26 from teuthology
.exceptions
import CommandFailedError
27 from tasks
.thrasher
import Thrasher
28 from six
import StringIO
31 from subprocess
import DEVNULL
# py3k
33 DEVNULL
= open(os
.devnull
, 'r+') # type: ignore
35 DEFAULT_CONF_PATH
= '/etc/ceph/ceph.conf'
37 log
= logging
.getLogger(__name__
)
39 # this is for cephadm clusters
40 def shell(ctx
, cluster_name
, remote
, args
, name
=None, **kwargs
):
41 testdir
= teuthology
.get_testdir(ctx
)
44 extra_args
= ['-n', name
]
49 '--image', ctx
.ceph
[cluster_name
].image
,
52 '--fsid', ctx
.ceph
[cluster_name
].fsid
,
58 def write_conf(ctx
, conf_path
=DEFAULT_CONF_PATH
, cluster
='ceph'):
60 ctx
.ceph
[cluster
].conf
.write(conf_fp
)
62 writes
= ctx
.cluster
.run(
64 'sudo', 'mkdir', '-p', '/etc/ceph', run
.Raw('&&'),
65 'sudo', 'chmod', '0755', '/etc/ceph', run
.Raw('&&'),
66 'sudo', 'tee', conf_path
, run
.Raw('&&'),
67 'sudo', 'chmod', '0644', conf_path
,
68 run
.Raw('>'), '/dev/null',
73 teuthology
.feed_many_stdins_and_close(conf_fp
, writes
)
77 def mount_osd_data(ctx
, remote
, cluster
, osd
):
82 :param remote: Remote site
83 :param cluster: name of ceph cluster
86 log
.debug('Mounting data for osd.{o} on {r}'.format(o
=osd
, r
=remote
))
87 role
= "{0}.osd.{1}".format(cluster
, osd
)
88 alt_role
= role
if cluster
!= 'ceph' else "osd.{0}".format(osd
)
89 if remote
in ctx
.disk_config
.remote_to_roles_to_dev
:
90 if alt_role
in ctx
.disk_config
.remote_to_roles_to_dev
[remote
]:
92 if role
not in ctx
.disk_config
.remote_to_roles_to_dev
[remote
]:
94 dev
= ctx
.disk_config
.remote_to_roles_to_dev
[remote
][role
]
95 mount_options
= ctx
.disk_config
.\
96 remote_to_roles_to_dev_mount_options
[remote
][role
]
97 fstype
= ctx
.disk_config
.remote_to_roles_to_dev_fstype
[remote
][role
]
98 mnt
= os
.path
.join('/var/lib/ceph/osd', '{0}-{1}'.format(cluster
, osd
))
100 log
.info('Mounting osd.{o}: dev: {n}, cluster: {c}'
101 'mountpoint: {p}, type: {t}, options: {v}'.format(
102 o
=osd
, n
=remote
.name
, p
=mnt
, t
=fstype
, v
=mount_options
,
110 '-o', ','.join(mount_options
),
123 self
.log(traceback
.format_exc())
133 class OSDThrasher(Thrasher
):
135 Object used to thrash Ceph
137 def __init__(self
, manager
, config
, name
, logger
):
138 super(OSDThrasher
, self
).__init
__()
140 self
.ceph_manager
= manager
141 self
.cluster
= manager
.cluster
142 self
.ceph_manager
.wait_for_clean()
143 osd_status
= self
.ceph_manager
.get_osd_status()
144 self
.in_osds
= osd_status
['in']
145 self
.live_osds
= osd_status
['live']
146 self
.out_osds
= osd_status
['out']
147 self
.dead_osds
= osd_status
['dead']
148 self
.stopping
= False
152 self
.revive_timeout
= self
.config
.get("revive_timeout", 360)
153 self
.pools_to_fix_pgp_num
= set()
154 if self
.config
.get('powercycle'):
155 self
.revive_timeout
+= 120
156 self
.clean_wait
= self
.config
.get('clean_wait', 0)
157 self
.minin
= self
.config
.get("min_in", 4)
158 self
.chance_move_pg
= self
.config
.get('chance_move_pg', 1.0)
159 self
.sighup_delay
= self
.config
.get('sighup_delay')
160 self
.optrack_toggle_delay
= self
.config
.get('optrack_toggle_delay')
161 self
.dump_ops_enable
= self
.config
.get('dump_ops_enable')
162 self
.noscrub_toggle_delay
= self
.config
.get('noscrub_toggle_delay')
163 self
.chance_thrash_cluster_full
= self
.config
.get('chance_thrash_cluster_full', .05)
164 self
.chance_thrash_pg_upmap
= self
.config
.get('chance_thrash_pg_upmap', 1.0)
165 self
.chance_thrash_pg_upmap_items
= self
.config
.get('chance_thrash_pg_upmap', 1.0)
166 self
.random_eio
= self
.config
.get('random_eio')
167 self
.chance_force_recovery
= self
.config
.get('chance_force_recovery', 0.3)
169 num_osds
= self
.in_osds
+ self
.out_osds
170 self
.max_pgs
= self
.config
.get("max_pgs_per_pool_osd", 1200) * len(num_osds
)
171 self
.min_pgs
= self
.config
.get("min_pgs_per_pool_osd", 1) * len(num_osds
)
172 if self
.config
is None:
174 # prevent monitor from auto-marking things out while thrasher runs
175 # try both old and new tell syntax, in case we are testing old code
176 self
.saved_options
= []
177 # assuming that the default settings do not vary from one daemon to
179 first_mon
= teuthology
.get_first_mon(manager
.ctx
, self
.config
).split('.')
180 opts
= [('mon', 'mon_osd_down_out_interval', 0)]
181 #why do we disable marking an OSD out automatically? :/
182 for service
, opt
, new_value
in opts
:
183 old_value
= manager
.get_config(first_mon
[0],
186 self
.saved_options
.append((service
, opt
, old_value
))
187 manager
.inject_args(service
, '*', opt
, new_value
)
188 # initialize ceph_objectstore_tool property - must be done before
189 # do_thrash is spawned - http://tracker.ceph.com/issues/18799
190 if (self
.config
.get('powercycle') or
191 not self
.cmd_exists_on_osds("ceph-objectstore-tool") or
192 self
.config
.get('disable_objectstore_tool_tests', False)):
193 self
.ceph_objectstore_tool
= False
194 if self
.config
.get('powercycle'):
195 self
.log("Unable to test ceph-objectstore-tool, "
196 "powercycle testing")
198 self
.log("Unable to test ceph-objectstore-tool, "
199 "not available on all OSD nodes")
201 self
.ceph_objectstore_tool
= \
202 self
.config
.get('ceph_objectstore_tool', True)
204 self
.thread
= gevent
.spawn(self
.do_thrash
)
205 if self
.sighup_delay
:
206 self
.sighup_thread
= gevent
.spawn(self
.do_sighup
)
207 if self
.optrack_toggle_delay
:
208 self
.optrack_toggle_thread
= gevent
.spawn(self
.do_optrack_toggle
)
209 if self
.dump_ops_enable
== "true":
210 self
.dump_ops_thread
= gevent
.spawn(self
.do_dump_ops
)
211 if self
.noscrub_toggle_delay
:
212 self
.noscrub_toggle_thread
= gevent
.spawn(self
.do_noscrub_toggle
)
214 def log(self
, msg
, *args
, **kwargs
):
215 self
.logger
.info(msg
, *args
, **kwargs
)
217 def cmd_exists_on_osds(self
, cmd
):
218 if self
.ceph_manager
.cephadm
:
220 allremotes
= self
.ceph_manager
.ctx
.cluster
.only(\
221 teuthology
.is_type('osd', self
.cluster
)).remotes
.keys()
222 allremotes
= list(set(allremotes
))
223 for remote
in allremotes
:
224 proc
= remote
.run(args
=['type', cmd
], wait
=True,
225 check_status
=False, stdout
=BytesIO(),
227 if proc
.exitstatus
!= 0:
231 def run_ceph_objectstore_tool(self
, remote
, osd
, cmd
):
232 if self
.ceph_manager
.cephadm
:
234 self
.ceph_manager
.ctx
, self
.ceph_manager
.cluster
, remote
,
235 args
=['ceph-objectstore-tool'] + cmd
,
237 wait
=True, check_status
=False,
242 args
=['sudo', 'adjust-ulimits', 'ceph-objectstore-tool'] + cmd
,
243 wait
=True, check_status
=False,
247 def kill_osd(self
, osd
=None, mark_down
=False, mark_out
=False):
249 :param osd: Osd to be killed.
250 :mark_down: Mark down if true.
251 :mark_out: Mark out if true.
254 osd
= random
.choice(self
.live_osds
)
255 self
.log("Killing osd %s, live_osds are %s" % (str(osd
),
256 str(self
.live_osds
)))
257 self
.live_osds
.remove(osd
)
258 self
.dead_osds
.append(osd
)
259 self
.ceph_manager
.kill_osd(osd
)
261 self
.ceph_manager
.mark_down_osd(osd
)
262 if mark_out
and osd
in self
.in_osds
:
264 if self
.ceph_objectstore_tool
:
265 self
.log("Testing ceph-objectstore-tool on down osd.%s" % osd
)
266 remote
= self
.ceph_manager
.find_remote('osd', osd
)
267 FSPATH
= self
.ceph_manager
.get_filepath()
268 JPATH
= os
.path
.join(FSPATH
, "journal")
269 exp_osd
= imp_osd
= osd
270 self
.log('remote for osd %s is %s' % (osd
, remote
))
271 exp_remote
= imp_remote
= remote
272 # If an older osd is available we'll move a pg from there
273 if (len(self
.dead_osds
) > 1 and
274 random
.random() < self
.chance_move_pg
):
275 exp_osd
= random
.choice(self
.dead_osds
[:-1])
276 exp_remote
= self
.ceph_manager
.find_remote('osd', exp_osd
)
277 self
.log('remote for exp osd %s is %s' % (exp_osd
, exp_remote
))
280 '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
283 if not self
.ceph_manager
.cephadm
:
284 # ceph-objectstore-tool might be temporarily absent during an
285 # upgrade - see http://tracker.ceph.com/issues/18014
286 with
safe_while(sleep
=15, tries
=40, action
="type ceph-objectstore-tool") as proceed
:
288 proc
= exp_remote
.run(args
=['type', 'ceph-objectstore-tool'],
289 wait
=True, check_status
=False, stdout
=BytesIO(),
291 if proc
.exitstatus
== 0:
293 log
.debug("ceph-objectstore-tool binary not present, trying again")
295 # ceph-objectstore-tool might bogusly fail with "OSD has the store locked"
296 # see http://tracker.ceph.com/issues/19556
297 with
safe_while(sleep
=15, tries
=40, action
="ceph-objectstore-tool --op list-pgs") as proceed
:
299 proc
= self
.run_ceph_objectstore_tool(
300 exp_remote
, 'osd.%s' % exp_osd
,
302 '--data-path', FSPATH
.format(id=exp_osd
),
303 '--journal-path', JPATH
.format(id=exp_osd
),
306 if proc
.exitstatus
== 0:
308 elif (proc
.exitstatus
== 1 and
309 proc
.stderr
.getvalue() == "OSD has the store locked"):
312 raise Exception("ceph-objectstore-tool: "
313 "exp list-pgs failure with status {ret}".
314 format(ret
=proc
.exitstatus
))
316 pgs
= proc
.stdout
.getvalue().split('\n')[:-1]
318 self
.log("No PGs found for osd.{osd}".format(osd
=exp_osd
))
320 pg
= random
.choice(pgs
)
321 #exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
322 #exp_path = os.path.join(exp_path, '{0}.data'.format(self.cluster))
323 exp_path
= os
.path
.join('/var/log/ceph', # available inside 'shell' container
324 "exp.{pg}.{id}".format(
327 if self
.ceph_manager
.cephadm
:
328 exp_host_path
= os
.path
.join(
330 self
.ceph_manager
.ctx
.ceph
[self
.ceph_manager
.cluster
].fsid
,
331 "exp.{pg}.{id}".format(
335 exp_host_path
= exp_path
338 # Can't use new export-remove op since this is part of upgrade testing
339 proc
= self
.run_ceph_objectstore_tool(
340 exp_remote
, 'osd.%s' % exp_osd
,
342 '--data-path', FSPATH
.format(id=exp_osd
),
343 '--journal-path', JPATH
.format(id=exp_osd
),
349 raise Exception("ceph-objectstore-tool: "
350 "export failure with status {ret}".
351 format(ret
=proc
.exitstatus
))
353 proc
= self
.run_ceph_objectstore_tool(
354 exp_remote
, 'osd.%s' % exp_osd
,
356 '--data-path', FSPATH
.format(id=exp_osd
),
357 '--journal-path', JPATH
.format(id=exp_osd
),
363 raise Exception("ceph-objectstore-tool: "
364 "remove failure with status {ret}".
365 format(ret
=proc
.exitstatus
))
366 # If there are at least 2 dead osds we might move the pg
367 if exp_osd
!= imp_osd
:
368 # If pg isn't already on this osd, then we will move it there
369 proc
= self
.run_ceph_objectstore_tool(
373 '--data-path', FSPATH
.format(id=imp_osd
),
374 '--journal-path', JPATH
.format(id=imp_osd
),
378 raise Exception("ceph-objectstore-tool: "
379 "imp list-pgs failure with status {ret}".
380 format(ret
=proc
.exitstatus
))
381 pgs
= proc
.stdout
.getvalue().split('\n')[:-1]
383 self
.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
384 format(pg
=pg
, fosd
=exp_osd
, tosd
=imp_osd
))
385 if imp_remote
!= exp_remote
:
386 # Copy export file to the other machine
387 self
.log("Transfer export file from {srem} to {trem}".
388 format(srem
=exp_remote
, trem
=imp_remote
))
389 # just in case an upgrade make /var/log/ceph unreadable by non-root,
390 exp_remote
.run(args
=['sudo', 'chmod', '777',
392 imp_remote
.run(args
=['sudo', 'chmod', '777',
394 tmpexport
= Remote
.get_file(exp_remote
, exp_host_path
,
396 if exp_host_path
!= exp_path
:
397 # push to /var/log/ceph, then rename (we can't
398 # chmod 777 the /var/log/ceph/$fsid mountpoint)
399 Remote
.put_file(imp_remote
, tmpexport
, exp_path
)
400 imp_remote
.run(args
=[
401 'sudo', 'mv', exp_path
, exp_host_path
])
403 Remote
.put_file(imp_remote
, tmpexport
, exp_host_path
)
406 # Can't move the pg after all
408 imp_remote
= exp_remote
410 proc
= self
.run_ceph_objectstore_tool(
411 imp_remote
, 'osd.%s' % imp_osd
,
413 '--data-path', FSPATH
.format(id=imp_osd
),
414 '--journal-path', JPATH
.format(id=imp_osd
),
415 '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
419 if proc
.exitstatus
== 1:
420 bogosity
= "The OSD you are using is older than the exported PG"
421 if bogosity
in proc
.stderr
.getvalue():
422 self
.log("OSD older than exported PG"
424 elif proc
.exitstatus
== 10:
425 self
.log("Pool went away before processing an import"
427 elif proc
.exitstatus
== 11:
428 self
.log("Attempt to import an incompatible export"
430 elif proc
.exitstatus
== 12:
431 # this should be safe to ignore because we only ever move 1
432 # copy of the pg at a time, and merge is only initiated when
433 # all replicas are peered and happy. /me crosses fingers
434 self
.log("PG merged on target"
436 elif proc
.exitstatus
:
437 raise Exception("ceph-objectstore-tool: "
438 "import failure with status {ret}".
439 format(ret
=proc
.exitstatus
))
440 cmd
= "sudo rm -f {file}".format(file=exp_host_path
)
441 exp_remote
.run(args
=cmd
)
442 if imp_remote
!= exp_remote
:
443 imp_remote
.run(args
=cmd
)
445 # apply low split settings to each pool
446 if not self
.ceph_manager
.cephadm
:
447 for pool
in self
.ceph_manager
.list_pools():
448 cmd
= ("CEPH_ARGS='--filestore-merge-threshold 1 "
449 "--filestore-split-multiple 1' sudo -E "
450 + 'ceph-objectstore-tool '
451 + ' '.join(prefix
+ [
452 '--data-path', FSPATH
.format(id=imp_osd
),
453 '--journal-path', JPATH
.format(id=imp_osd
),
455 + " --op apply-layout-settings --pool " + pool
).format(id=osd
)
456 proc
= imp_remote
.run(args
=cmd
,
457 wait
=True, check_status
=False,
459 if 'Couldn\'t find pool' in proc
.stderr
.getvalue():
462 raise Exception("ceph-objectstore-tool apply-layout-settings"
463 " failed with {status}".format(status
=proc
.exitstatus
))
466 def blackhole_kill_osd(self
, osd
=None):
468 If all else fails, kill the osd.
469 :param osd: Osd to be killed.
472 osd
= random
.choice(self
.live_osds
)
473 self
.log("Blackholing and then killing osd %s, live_osds are %s" %
474 (str(osd
), str(self
.live_osds
)))
475 self
.live_osds
.remove(osd
)
476 self
.dead_osds
.append(osd
)
477 self
.ceph_manager
.blackhole_kill_osd(osd
)
479 def revive_osd(self
, osd
=None, skip_admin_check
=False):
482 :param osd: Osd to be revived.
485 osd
= random
.choice(self
.dead_osds
)
486 self
.log("Reviving osd %s" % (str(osd
),))
487 self
.ceph_manager
.revive_osd(
490 skip_admin_check
=skip_admin_check
)
491 self
.dead_osds
.remove(osd
)
492 self
.live_osds
.append(osd
)
493 if self
.random_eio
> 0 and osd
== self
.rerrosd
:
494 self
.ceph_manager
.set_config(self
.rerrosd
,
495 filestore_debug_random_read_err
= self
.random_eio
)
496 self
.ceph_manager
.set_config(self
.rerrosd
,
497 bluestore_debug_random_read_err
= self
.random_eio
)
500 def out_osd(self
, osd
=None):
503 :param osd: Osd to be marked.
506 osd
= random
.choice(self
.in_osds
)
507 self
.log("Removing osd %s, in_osds are: %s" %
508 (str(osd
), str(self
.in_osds
)))
509 self
.ceph_manager
.mark_out_osd(osd
)
510 self
.in_osds
.remove(osd
)
511 self
.out_osds
.append(osd
)
513 def in_osd(self
, osd
=None):
516 :param osd: Osd to be marked.
519 osd
= random
.choice(self
.out_osds
)
520 if osd
in self
.dead_osds
:
521 return self
.revive_osd(osd
)
522 self
.log("Adding osd %s" % (str(osd
),))
523 self
.out_osds
.remove(osd
)
524 self
.in_osds
.append(osd
)
525 self
.ceph_manager
.mark_in_osd(osd
)
526 self
.log("Added osd %s" % (str(osd
),))
528 def reweight_osd_or_by_util(self
, osd
=None):
530 Reweight an osd that is in
531 :param osd: Osd to be marked.
533 if osd
is not None or random
.choice([True, False]):
535 osd
= random
.choice(self
.in_osds
)
536 val
= random
.uniform(.1, 1.0)
537 self
.log("Reweighting osd %s to %s" % (str(osd
), str(val
)))
538 self
.ceph_manager
.raw_cluster_cmd('osd', 'reweight',
541 # do it several times, the option space is large
544 'max_change': random
.choice(['0.05', '1.0', '3.0']),
545 'overage': random
.choice(['110', '1000']),
546 'type': random
.choice([
547 'reweight-by-utilization',
548 'test-reweight-by-utilization']),
550 self
.log("Reweighting by: %s"%(str(options
),))
551 self
.ceph_manager
.raw_cluster_cmd(
555 options
['max_change'])
557 def primary_affinity(self
, osd
=None):
559 osd
= random
.choice(self
.in_osds
)
560 if random
.random() >= .5:
562 elif random
.random() >= .5:
566 self
.log('Setting osd %s primary_affinity to %f' % (str(osd
), pa
))
567 self
.ceph_manager
.raw_cluster_cmd('osd', 'primary-affinity',
570 def thrash_cluster_full(self
):
572 Set and unset cluster full condition
574 self
.log('Setting full ratio to .001')
575 self
.ceph_manager
.raw_cluster_cmd('osd', 'set-full-ratio', '.001')
577 self
.log('Setting full ratio back to .95')
578 self
.ceph_manager
.raw_cluster_cmd('osd', 'set-full-ratio', '.95')
580 def thrash_pg_upmap(self
):
582 Install or remove random pg_upmap entries in OSDMap
584 from random
import shuffle
585 out
= self
.ceph_manager
.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
587 self
.log('j is %s' % j
)
589 if random
.random() >= .3:
590 pgs
= self
.ceph_manager
.get_pg_stats()
593 pg
= random
.choice(pgs
)
594 pgid
= str(pg
['pgid'])
595 poolid
= int(pgid
.split('.')[0])
596 sizes
= [x
['size'] for x
in j
['pools'] if x
['pool'] == poolid
]
600 osds
= self
.in_osds
+ self
.out_osds
603 self
.log('Setting %s to %s' % (pgid
, osds
))
604 cmd
= ['osd', 'pg-upmap', pgid
] + [str(x
) for x
in osds
]
605 self
.log('cmd %s' % cmd
)
606 self
.ceph_manager
.raw_cluster_cmd(*cmd
)
612 self
.log('Clearing pg_upmap on %s' % pg
)
613 self
.ceph_manager
.raw_cluster_cmd(
618 self
.log('No pg_upmap entries; doing nothing')
619 except CommandFailedError
:
620 self
.log('Failed to rm-pg-upmap, ignoring')
622 def thrash_pg_upmap_items(self
):
624 Install or remove random pg_upmap_items entries in OSDMap
626 from random
import shuffle
627 out
= self
.ceph_manager
.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
629 self
.log('j is %s' % j
)
631 if random
.random() >= .3:
632 pgs
= self
.ceph_manager
.get_pg_stats()
635 pg
= random
.choice(pgs
)
636 pgid
= str(pg
['pgid'])
637 poolid
= int(pgid
.split('.')[0])
638 sizes
= [x
['size'] for x
in j
['pools'] if x
['pool'] == poolid
]
642 osds
= self
.in_osds
+ self
.out_osds
645 self
.log('Setting %s to %s' % (pgid
, osds
))
646 cmd
= ['osd', 'pg-upmap-items', pgid
] + [str(x
) for x
in osds
]
647 self
.log('cmd %s' % cmd
)
648 self
.ceph_manager
.raw_cluster_cmd(*cmd
)
650 m
= j
['pg_upmap_items']
654 self
.log('Clearing pg_upmap on %s' % pg
)
655 self
.ceph_manager
.raw_cluster_cmd(
660 self
.log('No pg_upmap entries; doing nothing')
661 except CommandFailedError
:
662 self
.log('Failed to rm-pg-upmap-items, ignoring')
664 def force_recovery(self
):
666 Force recovery on some of PGs
668 backfill
= random
.random() >= 0.5
669 j
= self
.ceph_manager
.get_pgids_to_force(backfill
)
673 self
.ceph_manager
.raw_cluster_cmd('pg', 'force-backfill', *j
)
675 self
.ceph_manager
.raw_cluster_cmd('pg', 'force-recovery', *j
)
676 except CommandFailedError
:
677 self
.log('Failed to force backfill|recovery, ignoring')
680 def cancel_force_recovery(self
):
682 Force recovery on some of PGs
684 backfill
= random
.random() >= 0.5
685 j
= self
.ceph_manager
.get_pgids_to_cancel_force(backfill
)
689 self
.ceph_manager
.raw_cluster_cmd('pg', 'cancel-force-backfill', *j
)
691 self
.ceph_manager
.raw_cluster_cmd('pg', 'cancel-force-recovery', *j
)
692 except CommandFailedError
:
693 self
.log('Failed to force backfill|recovery, ignoring')
695 def force_cancel_recovery(self
):
697 Force or cancel forcing recovery
699 if random
.random() >= 0.4:
700 self
.force_recovery()
702 self
.cancel_force_recovery()
706 Make sure all osds are up and not out.
708 while len(self
.dead_osds
) > 0:
709 self
.log("reviving osd")
711 while len(self
.out_osds
) > 0:
712 self
.log("inning osd")
717 Make sure all osds are up and fully in.
720 for osd
in self
.live_osds
:
721 self
.ceph_manager
.raw_cluster_cmd('osd', 'reweight',
723 self
.ceph_manager
.raw_cluster_cmd('osd', 'primary-affinity',
728 Break out of this Ceph loop
732 if self
.sighup_delay
:
733 self
.log("joining the do_sighup greenlet")
734 self
.sighup_thread
.get()
735 if self
.optrack_toggle_delay
:
736 self
.log("joining the do_optrack_toggle greenlet")
737 self
.optrack_toggle_thread
.join()
738 if self
.dump_ops_enable
== "true":
739 self
.log("joining the do_dump_ops greenlet")
740 self
.dump_ops_thread
.join()
741 if self
.noscrub_toggle_delay
:
742 self
.log("joining the do_noscrub_toggle greenlet")
743 self
.noscrub_toggle_thread
.join()
747 Increase the size of the pool
749 pool
= self
.ceph_manager
.get_pool()
752 self
.log("Growing pool %s" % (pool
,))
753 if self
.ceph_manager
.expand_pool(pool
,
754 self
.config
.get('pool_grow_by', 10),
756 self
.pools_to_fix_pgp_num
.add(pool
)
758 def shrink_pool(self
):
760 Decrease the size of the pool
762 pool
= self
.ceph_manager
.get_pool()
765 _
= self
.ceph_manager
.get_pool_pg_num(pool
)
766 self
.log("Shrinking pool %s" % (pool
,))
767 if self
.ceph_manager
.contract_pool(
769 self
.config
.get('pool_shrink_by', 10),
771 self
.pools_to_fix_pgp_num
.add(pool
)
773 def fix_pgp_num(self
, pool
=None):
775 Fix number of pgs in pool.
778 pool
= self
.ceph_manager
.get_pool()
784 self
.log("fixing pg num pool %s" % (pool
,))
785 if self
.ceph_manager
.set_pool_pgpnum(pool
, force
):
786 self
.pools_to_fix_pgp_num
.discard(pool
)
788 def test_pool_min_size(self
):
790 Loop to selectively push PGs below their min_size and test that recovery
793 self
.log("test_pool_min_size")
795 self
.ceph_manager
.wait_for_recovery(
796 timeout
=self
.config
.get('timeout')
799 minout
= int(self
.config
.get("min_out", 1))
800 minlive
= int(self
.config
.get("min_live", 2))
801 mindead
= int(self
.config
.get("min_dead", 1))
802 self
.log("doing min_size thrashing")
803 self
.ceph_manager
.wait_for_clean(timeout
=60)
804 assert self
.ceph_manager
.is_clean(), \
805 'not clean before minsize thrashing starts'
806 while not self
.stopping
:
807 # look up k and m from all the pools on each loop, in case it
808 # changes as the cluster runs
812 pools_json
= self
.ceph_manager
.get_osd_dump_json()['pools']
814 for pool_json
in pools_json
:
815 pool
= pool_json
['pool_name']
817 pool_type
= pool_json
['type'] # 1 for rep, 3 for ec
818 min_size
= pool_json
['min_size']
819 self
.log("pool {pool} min_size is {min_size}".format(pool
=pool
,min_size
=min_size
))
821 ec_profile
= self
.ceph_manager
.get_pool_property(pool
, 'erasure_code_profile')
822 if pool_type
!= PoolType
.ERASURE_CODED
:
824 ec_profile
= pool_json
['erasure_code_profile']
825 ec_profile_json
= self
.ceph_manager
.raw_cluster_cmd(
827 'erasure-code-profile',
831 ec_json
= json
.loads(ec_profile_json
)
832 local_k
= int(ec_json
['k'])
833 local_m
= int(ec_json
['m'])
834 self
.log("pool {pool} local_k={k} local_m={m}".format(pool
=pool
,
835 k
=local_k
, m
=local_m
))
837 self
.log("setting k={local_k} from previous {k}".format(local_k
=local_k
, k
=k
))
840 self
.log("setting m={local_m} from previous {m}".format(local_m
=local_m
, m
=m
))
842 except CommandFailedError
:
843 self
.log("failed to read erasure_code_profile. %s was likely removed", pool
)
847 self
.log("using k={k}, m={m}".format(k
=k
,m
=m
))
849 self
.log("No pools yet, waiting")
853 if minout
> len(self
.out_osds
): # kill OSDs and mark out
854 self
.log("forced to out an osd")
855 self
.kill_osd(mark_out
=True)
857 elif mindead
> len(self
.dead_osds
): # kill OSDs but force timeout
858 self
.log("forced to kill an osd")
861 else: # make mostly-random choice to kill or revive OSDs
862 minup
= max(minlive
, k
)
863 rand_val
= random
.uniform(0, 1)
864 self
.log("choosing based on number of live OSDs and rand val {rand}".\
865 format(rand
=rand_val
))
866 if len(self
.live_osds
) > minup
+1 and rand_val
< 0.5:
867 # chose to knock out as many OSDs as we can w/out downing PGs
869 most_killable
= min(len(self
.live_osds
) - minup
, m
)
870 self
.log("chose to kill {n} OSDs".format(n
=most_killable
))
871 for i
in range(1, most_killable
):
872 self
.kill_osd(mark_out
=True)
874 # try a few times since there might be a concurrent pool
875 # creation or deletion
878 action
='check for active or peered') as proceed
:
880 if self
.ceph_manager
.all_active_or_peered():
882 self
.log('not all PGs are active or peered')
883 else: # chose to revive OSDs, bring up a random fraction of the dead ones
884 self
.log("chose to revive osds")
885 for i
in range(1, int(rand_val
* len(self
.dead_osds
))):
888 # let PGs repair themselves or our next knockout might kill one
889 self
.ceph_manager
.wait_for_clean(timeout
=self
.config
.get('timeout'))
891 # / while not self.stopping
894 self
.ceph_manager
.wait_for_recovery(
895 timeout
=self
.config
.get('timeout')
898 def inject_pause(self
, conf_key
, duration
, check_after
, should_be_down
):
900 Pause injection testing. Check for osd being down when finished.
902 the_one
= random
.choice(self
.live_osds
)
903 self
.log("inject_pause on {osd}".format(osd
=the_one
))
905 "Testing {key} pause injection for duration {duration}".format(
910 "Checking after {after}, should_be_down={shouldbedown}".format(
912 shouldbedown
=should_be_down
914 self
.ceph_manager
.set_config(the_one
, **{conf_key
: duration
})
915 if not should_be_down
:
917 time
.sleep(check_after
)
918 status
= self
.ceph_manager
.get_osd_status()
919 assert the_one
in status
['down']
920 time
.sleep(duration
- check_after
+ 20)
921 status
= self
.ceph_manager
.get_osd_status()
922 assert not the_one
in status
['down']
924 def test_backfill_full(self
):
926 Test backfills stopping when the replica fills up.
928 First, use injectfull admin command to simulate a now full
929 osd by setting it to 0 on all of the OSDs.
931 Second, on a random subset, set
932 osd_debug_skip_full_check_in_backfill_reservation to force
933 the more complicated check in do_scan to be exercised.
935 Then, verify that all backfillings stop.
937 self
.log("injecting backfill full")
938 for i
in self
.live_osds
:
939 self
.ceph_manager
.set_config(
941 osd_debug_skip_full_check_in_backfill_reservation
=
942 random
.choice(['false', 'true']))
943 self
.ceph_manager
.osd_admin_socket(i
, command
=['injectfull', 'backfillfull'],
944 check_status
=True, timeout
=30, stdout
=DEVNULL
)
946 status
= self
.ceph_manager
.compile_pg_status()
947 if 'backfilling' not in status
.keys():
950 "waiting for {still_going} backfillings".format(
951 still_going
=status
.get('backfilling')))
953 assert('backfilling' not in self
.ceph_manager
.compile_pg_status().keys())
954 for i
in self
.live_osds
:
955 self
.ceph_manager
.set_config(
957 osd_debug_skip_full_check_in_backfill_reservation
='false')
958 self
.ceph_manager
.osd_admin_socket(i
, command
=['injectfull', 'none'],
959 check_status
=True, timeout
=30, stdout
=DEVNULL
)
961 def test_map_discontinuity(self
):
963 1) Allows the osds to recover
965 3) allows the remaining osds to recover
966 4) waits for some time
968 This sequence should cause the revived osd to have to handle
969 a map gap since the mons would have trimmed
971 while len(self
.in_osds
) < (self
.minin
+ 1):
973 self
.log("Waiting for recovery")
974 self
.ceph_manager
.wait_for_all_osds_up(
975 timeout
=self
.config
.get('timeout')
977 # now we wait 20s for the pg status to change, if it takes longer,
978 # the test *should* fail!
980 self
.ceph_manager
.wait_for_clean(
981 timeout
=self
.config
.get('timeout')
984 # now we wait 20s for the backfill replicas to hear about the clean
986 self
.log("Recovered, killing an osd")
987 self
.kill_osd(mark_down
=True, mark_out
=True)
988 self
.log("Waiting for clean again")
989 self
.ceph_manager
.wait_for_clean(
990 timeout
=self
.config
.get('timeout')
992 self
.log("Waiting for trim")
993 time
.sleep(int(self
.config
.get("map_discontinuity_sleep_time", 40)))
996 def choose_action(self
):
998 Random action selector.
1000 chance_down
= self
.config
.get('chance_down', 0.4)
1001 _
= self
.config
.get('chance_test_min_size', 0)
1002 chance_test_backfill_full
= \
1003 self
.config
.get('chance_test_backfill_full', 0)
1004 if isinstance(chance_down
, int):
1005 chance_down
= float(chance_down
) / 100
1007 minout
= int(self
.config
.get("min_out", 0))
1008 minlive
= int(self
.config
.get("min_live", 2))
1009 mindead
= int(self
.config
.get("min_dead", 0))
1011 self
.log('choose_action: min_in %d min_out '
1012 '%d min_live %d min_dead %d' %
1013 (minin
, minout
, minlive
, mindead
))
1015 if len(self
.in_osds
) > minin
:
1016 actions
.append((self
.out_osd
, 1.0,))
1017 if len(self
.live_osds
) > minlive
and chance_down
> 0:
1018 actions
.append((self
.kill_osd
, chance_down
,))
1019 if len(self
.out_osds
) > minout
:
1020 actions
.append((self
.in_osd
, 1.7,))
1021 if len(self
.dead_osds
) > mindead
:
1022 actions
.append((self
.revive_osd
, 1.0,))
1023 if self
.config
.get('thrash_primary_affinity', True):
1024 actions
.append((self
.primary_affinity
, 1.0,))
1025 actions
.append((self
.reweight_osd_or_by_util
,
1026 self
.config
.get('reweight_osd', .5),))
1027 actions
.append((self
.grow_pool
,
1028 self
.config
.get('chance_pgnum_grow', 0),))
1029 actions
.append((self
.shrink_pool
,
1030 self
.config
.get('chance_pgnum_shrink', 0),))
1031 actions
.append((self
.fix_pgp_num
,
1032 self
.config
.get('chance_pgpnum_fix', 0),))
1033 actions
.append((self
.test_pool_min_size
,
1034 self
.config
.get('chance_test_min_size', 0),))
1035 actions
.append((self
.test_backfill_full
,
1036 chance_test_backfill_full
,))
1037 if self
.chance_thrash_cluster_full
> 0:
1038 actions
.append((self
.thrash_cluster_full
, self
.chance_thrash_cluster_full
,))
1039 if self
.chance_thrash_pg_upmap
> 0:
1040 actions
.append((self
.thrash_pg_upmap
, self
.chance_thrash_pg_upmap
,))
1041 if self
.chance_thrash_pg_upmap_items
> 0:
1042 actions
.append((self
.thrash_pg_upmap_items
, self
.chance_thrash_pg_upmap_items
,))
1043 if self
.chance_force_recovery
> 0:
1044 actions
.append((self
.force_cancel_recovery
, self
.chance_force_recovery
))
1046 for key
in ['heartbeat_inject_failure', 'filestore_inject_stall']:
1049 self
.inject_pause(key
,
1050 self
.config
.get('pause_short', 3),
1053 self
.config
.get('chance_inject_pause_short', 1),),
1055 self
.inject_pause(key
,
1056 self
.config
.get('pause_long', 80),
1057 self
.config
.get('pause_check_after', 70),
1059 self
.config
.get('chance_inject_pause_long', 0),)]:
1060 actions
.append(scenario
)
1062 total
= sum([y
for (x
, y
) in actions
])
1063 val
= random
.uniform(0, total
)
1064 for (action
, prob
) in actions
:
1070 def do_thrash(self
):
1072 _do_thrash() wrapper.
1076 except Exception as e
:
1077 # See _run exception comment for MDSThrasher
1078 self
.set_thrasher_exception(e
)
1079 self
.logger
.exception("exception:")
1080 # Allow successful completion so gevent doesn't see an exception.
1081 # The DaemonWatchdog will observe the error and tear down the test.
1084 def do_sighup(self
):
1086 Loops and sends signal.SIGHUP to a random live osd.
1088 Loop delay is controlled by the config value sighup_delay.
1090 delay
= float(self
.sighup_delay
)
1091 self
.log("starting do_sighup with a delay of {0}".format(delay
))
1092 while not self
.stopping
:
1093 osd
= random
.choice(self
.live_osds
)
1094 self
.ceph_manager
.signal_osd(osd
, signal
.SIGHUP
, silent
=True)
1098 def do_optrack_toggle(self
):
1100 Loops and toggle op tracking to all osds.
1102 Loop delay is controlled by the config value optrack_toggle_delay.
1104 delay
= float(self
.optrack_toggle_delay
)
1106 self
.log("starting do_optrack_toggle with a delay of {0}".format(delay
))
1107 while not self
.stopping
:
1108 if osd_state
== "true":
1113 self
.ceph_manager
.inject_args('osd', '*',
1114 'osd_enable_op_tracker',
1116 except CommandFailedError
:
1117 self
.log('Failed to tell all osds, ignoring')
1121 def do_dump_ops(self
):
1123 Loops and does op dumps on all osds
1125 self
.log("starting do_dump_ops")
1126 while not self
.stopping
:
1127 for osd
in self
.live_osds
:
1128 # Ignore errors because live_osds is in flux
1129 self
.ceph_manager
.osd_admin_socket(osd
, command
=['dump_ops_in_flight'],
1130 check_status
=False, timeout
=30, stdout
=DEVNULL
)
1131 self
.ceph_manager
.osd_admin_socket(osd
, command
=['dump_blocked_ops'],
1132 check_status
=False, timeout
=30, stdout
=DEVNULL
)
1133 self
.ceph_manager
.osd_admin_socket(osd
, command
=['dump_historic_ops'],
1134 check_status
=False, timeout
=30, stdout
=DEVNULL
)
1138 def do_noscrub_toggle(self
):
1140 Loops and toggle noscrub flags
1142 Loop delay is controlled by the config value noscrub_toggle_delay.
1144 delay
= float(self
.noscrub_toggle_delay
)
1145 scrub_state
= "none"
1146 self
.log("starting do_noscrub_toggle with a delay of {0}".format(delay
))
1147 while not self
.stopping
:
1148 if scrub_state
== "none":
1149 self
.ceph_manager
.raw_cluster_cmd('osd', 'set', 'noscrub')
1150 scrub_state
= "noscrub"
1151 elif scrub_state
== "noscrub":
1152 self
.ceph_manager
.raw_cluster_cmd('osd', 'set', 'nodeep-scrub')
1153 scrub_state
= "both"
1154 elif scrub_state
== "both":
1155 self
.ceph_manager
.raw_cluster_cmd('osd', 'unset', 'noscrub')
1156 scrub_state
= "nodeep-scrub"
1158 self
.ceph_manager
.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
1159 scrub_state
= "none"
1161 self
.ceph_manager
.raw_cluster_cmd('osd', 'unset', 'noscrub')
1162 self
.ceph_manager
.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
1165 def _do_thrash(self
):
1167 Loop to select random actions to thrash ceph manager with.
1169 cleanint
= self
.config
.get("clean_interval", 60)
1170 scrubint
= self
.config
.get("scrub_interval", -1)
1171 maxdead
= self
.config
.get("max_dead", 0)
1172 delay
= self
.config
.get("op_delay", 5)
1173 self
.rerrosd
= self
.live_osds
[0]
1174 if self
.random_eio
> 0:
1175 self
.ceph_manager
.inject_args('osd', self
.rerrosd
,
1176 'filestore_debug_random_read_err',
1178 self
.ceph_manager
.inject_args('osd', self
.rerrosd
,
1179 'bluestore_debug_random_read_err',
1181 self
.log("starting do_thrash")
1182 while not self
.stopping
:
1183 to_log
= [str(x
) for x
in ["in_osds: ", self
.in_osds
,
1184 "out_osds: ", self
.out_osds
,
1185 "dead_osds: ", self
.dead_osds
,
1186 "live_osds: ", self
.live_osds
]]
1187 self
.log(" ".join(to_log
))
1188 if random
.uniform(0, 1) < (float(delay
) / cleanint
):
1189 while len(self
.dead_osds
) > maxdead
:
1191 for osd
in self
.in_osds
:
1192 self
.ceph_manager
.raw_cluster_cmd('osd', 'reweight',
1194 if random
.uniform(0, 1) < float(
1195 self
.config
.get('chance_test_map_discontinuity', 0)) \
1196 and len(self
.live_osds
) > 5: # avoid m=2,k=2 stall, w/ some buffer for crush being picky
1197 self
.test_map_discontinuity()
1199 self
.ceph_manager
.wait_for_recovery(
1200 timeout
=self
.config
.get('timeout')
1202 time
.sleep(self
.clean_wait
)
1204 if random
.uniform(0, 1) < (float(delay
) / scrubint
):
1205 self
.log('Scrubbing while thrashing being performed')
1206 Scrubber(self
.ceph_manager
, self
.config
)
1207 self
.choose_action()()
1210 if self
.random_eio
> 0:
1211 self
.ceph_manager
.inject_args('osd', self
.rerrosd
,
1212 'filestore_debug_random_read_err', '0.0')
1213 self
.ceph_manager
.inject_args('osd', self
.rerrosd
,
1214 'bluestore_debug_random_read_err', '0.0')
1215 for pool
in list(self
.pools_to_fix_pgp_num
):
1216 if self
.ceph_manager
.get_pool_pg_num(pool
) > 0:
1217 self
.fix_pgp_num(pool
)
1218 self
.pools_to_fix_pgp_num
.clear()
1219 for service
, opt
, saved_value
in self
.saved_options
:
1220 self
.ceph_manager
.inject_args(service
, '*', opt
, saved_value
)
1221 self
.saved_options
= []
1225 class ObjectStoreTool
:
1227 def __init__(self
, manager
, pool
, **kwargs
):
1228 self
.manager
= manager
1230 self
.osd
= kwargs
.get('osd', None)
1231 self
.object_name
= kwargs
.get('object_name', None)
1232 self
.do_revive
= kwargs
.get('do_revive', True)
1233 if self
.osd
and self
.pool
and self
.object_name
:
1234 if self
.osd
== "primary":
1235 self
.osd
= self
.manager
.get_object_primary(self
.pool
,
1238 if self
.object_name
:
1239 self
.pgid
= self
.manager
.get_object_pg_with_shard(self
.pool
,
1242 self
.remote
= next(iter(self
.manager
.ctx
.\
1243 cluster
.only('osd.{o}'.format(o
=self
.osd
)).remotes
.keys()))
1244 path
= self
.manager
.get_filepath().format(id=self
.osd
)
1245 self
.paths
= ("--data-path {path} --journal-path {path}/journal".
1248 def build_cmd(self
, options
, args
, stdin
):
1250 if self
.object_name
:
1251 lines
.append("object=$(sudo adjust-ulimits ceph-objectstore-tool "
1252 "{paths} --pgid {pgid} --op list |"
1253 "grep '\"oid\":\"{name}\"')".
1254 format(paths
=self
.paths
,
1256 name
=self
.object_name
))
1257 args
= '"$object" ' + args
1258 options
+= " --pgid {pgid}".format(pgid
=self
.pgid
)
1259 cmd
= ("sudo adjust-ulimits ceph-objectstore-tool {paths} {options} {args}".
1260 format(paths
=self
.paths
,
1264 cmd
= ("echo {payload} | base64 --decode | {cmd}".
1265 format(payload
=base64
.encode(stdin
),
1268 return "\n".join(lines
)
1270 def run(self
, options
, args
):
1271 self
.manager
.kill_osd(self
.osd
)
1272 cmd
= self
.build_cmd(options
, args
, None)
1273 self
.manager
.log(cmd
)
1275 proc
= self
.remote
.run(args
=['bash', '-e', '-x', '-c', cmd
],
1280 if proc
.exitstatus
!= 0:
1281 self
.manager
.log("failed with " + str(proc
.exitstatus
))
1282 error
= proc
.stdout
.getvalue().decode() + " " + \
1283 proc
.stderr
.getvalue().decode()
1284 raise Exception(error
)
1287 self
.manager
.revive_osd(self
.osd
)
1288 self
.manager
.wait_till_osd_is_up(self
.osd
, 300)
1291 # XXX: this class has nothing to do with the Ceph daemon (ceph-mgr) of
1295 Ceph manager object.
1296 Contains several local functions that form a bulk of this module.
1298 :param controller: the remote machine where the Ceph commands should be
1300 :param ctx: the cluster context
1301 :param config: path to Ceph config file
1302 :param logger: for logging messages
1303 :param cluster: name of the Ceph cluster
1306 def __init__(self
, controller
, ctx
=None, config
=None, logger
=None,
1307 cluster
='ceph', cephadm
=False):
1308 self
.lock
= threading
.RLock()
1310 self
.config
= config
1311 self
.controller
= controller
1312 self
.next_pool_id
= 0
1313 self
.cluster
= cluster
1314 self
.cephadm
= cephadm
1316 self
.log
= lambda x
: logger
.info(x
)
1320 implement log behavior.
1324 if self
.config
is None:
1325 self
.config
= dict()
1326 pools
= self
.list_pools()
1329 # we may race with a pool deletion; ignore failures here
1331 self
.pools
[pool
] = self
.get_pool_int_property(pool
, 'pg_num')
1332 except CommandFailedError
:
1333 self
.log('Failed to get pg_num from pool %s, ignoring' % pool
)
1335 def raw_cluster_cmd(self
, *args
):
1337 Start ceph on a raw cluster. Return count
1340 proc
= shell(self
.ctx
, self
.cluster
, self
.controller
,
1341 args
=['ceph'] + list(args
),
1344 testdir
= teuthology
.get_testdir(self
.ctx
)
1349 '{tdir}/archive/coverage'.format(tdir
=testdir
),
1357 ceph_args
.extend(args
)
1358 proc
= self
.controller
.run(
1362 return six
.ensure_str(proc
.stdout
.getvalue())
1364 def raw_cluster_cmd_result(self
, *args
, **kwargs
):
1366 Start ceph on a cluster. Return success or failure information.
1369 proc
= shell(self
.ctx
, self
.cluster
, self
.controller
,
1370 args
=['ceph'] + list(args
),
1373 testdir
= teuthology
.get_testdir(self
.ctx
)
1378 '{tdir}/archive/coverage'.format(tdir
=testdir
),
1385 ceph_args
.extend(args
)
1386 kwargs
['args'] = ceph_args
1387 kwargs
['check_status'] = False
1388 proc
= self
.controller
.run(**kwargs
)
1389 return proc
.exitstatus
1391 def run_ceph_w(self
, watch_channel
=None):
1393 Execute "ceph -w" in the background with stdout connected to a BytesIO,
1394 and return the RemoteProcess.
1396 :param watch_channel: Specifies the channel to be watched. This can be
1397 'cluster', 'audit', ...
1398 :type watch_channel: str
1407 if watch_channel
is not None:
1408 args
.append("--watch-channel")
1409 args
.append(watch_channel
)
1410 return self
.controller
.run(args
=args
, wait
=False, stdout
=StringIO(), stdin
=run
.PIPE
)
1412 def get_mon_socks(self
):
1414 Get monitor sockets.
1416 :return socks: tuple of strings; strings are individual sockets.
1418 from json
import loads
1420 output
= loads(self
.raw_cluster_cmd(['--format=json', 'mon', 'dump']))
1422 for mon
in output
['mons']:
1423 for addrvec_mem
in mon
['public_addrs']['addrvec']:
1424 socks
.append(addrvec_mem
['addr'])
1427 def get_msgrv1_mon_socks(self
):
1429 Get monitor sockets that use msgrv1 to operate.
1431 :return socks: tuple of strings; strings are individual sockets.
1433 from json
import loads
1435 output
= loads(self
.raw_cluster_cmd('--format=json', 'mon', 'dump'))
1437 for mon
in output
['mons']:
1438 for addrvec_mem
in mon
['public_addrs']['addrvec']:
1439 if addrvec_mem
['type'] == 'v1':
1440 socks
.append(addrvec_mem
['addr'])
1443 def get_msgrv2_mon_socks(self
):
1445 Get monitor sockets that use msgrv2 to operate.
1447 :return socks: tuple of strings; strings are individual sockets.
1449 from json
import loads
1451 output
= loads(self
.raw_cluster_cmd('--format=json', 'mon', 'dump'))
1453 for mon
in output
['mons']:
1454 for addrvec_mem
in mon
['public_addrs']['addrvec']:
1455 if addrvec_mem
['type'] == 'v2':
1456 socks
.append(addrvec_mem
['addr'])
1459 def flush_pg_stats(self
, osds
, no_wait
=None, wait_for_mon
=300):
1461 Flush pg stats from a list of OSD ids, ensuring they are reflected
1462 all the way to the monitor. Luminous and later only.
1464 :param osds: list of OSDs to flush
1465 :param no_wait: list of OSDs not to wait for seq id. by default, we
1466 wait for all specified osds, but some of them could be
1467 moved out of osdmap, so we cannot get their updated
1468 stat seq from monitor anymore. in that case, you need
1469 to pass a blacklist.
1470 :param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
1471 it. (5 min by default)
1473 seq
= {osd
: int(self
.raw_cluster_cmd('tell', 'osd.%d' % osd
, 'flush_pg_stats'))
1475 if not wait_for_mon
:
1479 for osd
, need
in seq
.items():
1483 while wait_for_mon
> 0:
1484 got
= int(self
.raw_cluster_cmd('osd', 'last-stat-seq', 'osd.%d' % osd
))
1485 self
.log('need seq {need} got {got} for osd.{osd}'.format(
1486 need
=need
, got
=got
, osd
=osd
))
1491 wait_for_mon
-= A_WHILE
1493 raise Exception('timed out waiting for mon to be updated with '
1494 'osd.{osd}: {got} < {need}'.
1495 format(osd
=osd
, got
=got
, need
=need
))
1497 def flush_all_pg_stats(self
):
1498 self
.flush_pg_stats(range(len(self
.get_osd_dump())))
1500 def do_rados(self
, remote
, cmd
, check_status
=True):
1502 Execute a remote rados command.
1504 testdir
= teuthology
.get_testdir(self
.ctx
)
1508 '{tdir}/archive/coverage'.format(tdir
=testdir
),
1517 check_status
=check_status
1521 def rados_write_objects(self
, pool
, num_objects
, size
,
1522 timelimit
, threads
, cleanup
=False):
1525 Threads not used yet.
1529 '--num-objects', num_objects
,
1535 args
.append('--no-cleanup')
1536 return self
.do_rados(self
.controller
, map(str, args
))
1538 def do_put(self
, pool
, obj
, fname
, namespace
=None):
1540 Implement rados put operation
1543 if namespace
is not None:
1544 args
+= ['-N', namespace
]
1550 return self
.do_rados(
1556 def do_get(self
, pool
, obj
, fname
='/dev/null', namespace
=None):
1558 Implement rados get operation
1561 if namespace
is not None:
1562 args
+= ['-N', namespace
]
1568 return self
.do_rados(
1574 def do_rm(self
, pool
, obj
, namespace
=None):
1576 Implement rados rm operation
1579 if namespace
is not None:
1580 args
+= ['-N', namespace
]
1585 return self
.do_rados(
1591 def osd_admin_socket(self
, osd_id
, command
, check_status
=True, timeout
=0, stdout
=None):
1594 return self
.admin_socket('osd', osd_id
, command
, check_status
, timeout
, stdout
)
1596 def find_remote(self
, service_type
, service_id
):
1598 Get the Remote for the host where a particular service runs.
1600 :param service_type: 'mds', 'osd', 'client'
1601 :param service_id: The second part of a role, e.g. '0' for
1603 :return: a Remote instance for the host where the
1604 requested role is placed
1606 return get_remote(self
.ctx
, self
.cluster
,
1607 service_type
, service_id
)
1609 def admin_socket(self
, service_type
, service_id
,
1610 command
, check_status
=True, timeout
=0, stdout
=None):
1612 Remotely start up ceph specifying the admin socket
1613 :param command: a list of words to use as the command
1619 remote
= self
.find_remote(service_type
, service_id
)
1623 self
.ctx
, self
.cluster
, remote
,
1625 'ceph', 'daemon', '%s.%s' % (service_type
, service_id
),
1629 check_status
=check_status
,
1632 testdir
= teuthology
.get_testdir(self
.ctx
)
1637 '{tdir}/archive/coverage'.format(tdir
=testdir
),
1644 '/var/run/ceph/{cluster}-{type}.{id}.asok'.format(
1645 cluster
=self
.cluster
,
1649 args
.extend(command
)
1654 check_status
=check_status
1657 def objectstore_tool(self
, pool
, options
, args
, **kwargs
):
1658 return ObjectStoreTool(self
, pool
, **kwargs
).run(options
, args
)
1660 def get_pgid(self
, pool
, pgnum
):
1662 :param pool: pool name
1663 :param pgnum: pg number
1664 :returns: a string representing this pg.
1666 poolnum
= self
.get_pool_num(pool
)
1667 pg_str
= "{poolnum}.{pgnum}".format(
1672 def get_pg_replica(self
, pool
, pgnum
):
1674 get replica for pool, pgnum (e.g. (data, 0)->0
1676 pg_str
= self
.get_pgid(pool
, pgnum
)
1677 output
= self
.raw_cluster_cmd("pg", "map", pg_str
, '--format=json')
1678 j
= json
.loads('\n'.join(output
.split('\n')[1:]))
1679 return int(j
['acting'][-1])
1682 def wait_for_pg_stats(func
):
1683 # both osd_mon_report_interval and mgr_stats_period are 5 seconds
1684 # by default, and take the faulty injection in ms into consideration,
1685 # 12 seconds are more than enough
1686 delays
= [1, 1, 2, 3, 5, 8, 13, 0]
1688 def wrapper(self
, *args
, **kwargs
):
1690 for delay
in delays
:
1692 return func(self
, *args
, **kwargs
)
1693 except AssertionError as e
:
1699 def get_pg_primary(self
, pool
, pgnum
):
1701 get primary for pool, pgnum (e.g. (data, 0)->0
1703 pg_str
= self
.get_pgid(pool
, pgnum
)
1704 output
= self
.raw_cluster_cmd("pg", "map", pg_str
, '--format=json')
1705 j
= json
.loads('\n'.join(output
.split('\n')[1:]))
1706 return int(j
['acting'][0])
1709 def get_pool_num(self
, pool
):
1711 get number for pool (e.g., data -> 2)
1713 return int(self
.get_pool_dump(pool
)['pool'])
1715 def list_pools(self
):
1719 osd_dump
= self
.get_osd_dump_json()
1720 self
.log(osd_dump
['pools'])
1721 return [str(i
['pool_name']) for i
in osd_dump
['pools']]
1723 def clear_pools(self
):
1727 [self
.remove_pool(i
) for i
in self
.list_pools()]
1729 def kick_recovery_wq(self
, osdnum
):
1731 Run kick_recovery_wq on cluster.
1733 return self
.raw_cluster_cmd(
1734 'tell', "osd.%d" % (int(osdnum
),),
1739 def wait_run_admin_socket(self
, service_type
,
1740 service_id
, args
=['version'], timeout
=75, stdout
=None):
1742 If osd_admin_socket call succeeds, return. Otherwise wait
1743 five seconds and try again.
1749 proc
= self
.admin_socket(service_type
, service_id
,
1750 args
, check_status
=False, stdout
=stdout
)
1751 if proc
.exitstatus
== 0:
1755 if (tries
* 5) > timeout
:
1756 raise Exception('timed out waiting for admin_socket '
1757 'to appear after {type}.{id} restart'.
1758 format(type=service_type
,
1760 self
.log("waiting on admin_socket for {type}-{id}, "
1761 "{command}".format(type=service_type
,
1766 def get_pool_dump(self
, pool
):
1768 get the osd dump part of a pool
1770 osd_dump
= self
.get_osd_dump_json()
1771 for i
in osd_dump
['pools']:
1772 if i
['pool_name'] == pool
:
1776 def get_config(self
, service_type
, service_id
, name
):
1778 :param node: like 'mon.a'
1779 :param name: the option name
1781 proc
= self
.wait_run_admin_socket(service_type
, service_id
,
1783 j
= json
.loads(proc
.stdout
.getvalue())
1786 def inject_args(self
, service_type
, service_id
, name
, value
):
1787 whom
= '{0}.{1}'.format(service_type
, service_id
)
1788 if isinstance(value
, bool):
1789 value
= 'true' if value
else 'false'
1790 opt_arg
= '--{name}={value}'.format(name
=name
, value
=value
)
1791 self
.raw_cluster_cmd('--', 'tell', whom
, 'injectargs', opt_arg
)
1793 def set_config(self
, osdnum
, **argdict
):
1795 :param osdnum: osd number
1796 :param argdict: dictionary containing values to set.
1798 for k
, v
in argdict
.items():
1799 self
.wait_run_admin_socket(
1801 ['config', 'set', str(k
), str(v
)])
1803 def raw_cluster_status(self
):
1805 Get status from cluster
1807 status
= self
.raw_cluster_cmd('status', '--format=json')
1808 return json
.loads(status
)
1810 def raw_osd_status(self
):
1812 Get osd status from cluster
1814 return self
.raw_cluster_cmd('osd', 'dump')
1816 def get_osd_status(self
):
1818 Get osd statuses sorted by states that the osds are in.
1820 osd_lines
= list(filter(
1821 lambda x
: x
.startswith('osd.') and (("up" in x
) or ("down" in x
)),
1822 self
.raw_osd_status().split('\n')))
1824 in_osds
= [int(i
[4:].split()[0])
1825 for i
in filter(lambda x
: " in " in x
, osd_lines
)]
1826 out_osds
= [int(i
[4:].split()[0])
1827 for i
in filter(lambda x
: " out " in x
, osd_lines
)]
1828 up_osds
= [int(i
[4:].split()[0])
1829 for i
in filter(lambda x
: " up " in x
, osd_lines
)]
1830 down_osds
= [int(i
[4:].split()[0])
1831 for i
in filter(lambda x
: " down " in x
, osd_lines
)]
1832 dead_osds
= [int(x
.id_
)
1833 for x
in filter(lambda x
:
1836 iter_daemons_of_role('osd', self
.cluster
))]
1837 live_osds
= [int(x
.id_
) for x
in
1840 self
.ctx
.daemons
.iter_daemons_of_role('osd',
1842 return {'in': in_osds
, 'out': out_osds
, 'up': up_osds
,
1843 'down': down_osds
, 'dead': dead_osds
, 'live': live_osds
,
1846 def get_num_pgs(self
):
1848 Check cluster status for the number of pgs
1850 status
= self
.raw_cluster_status()
1852 return status
['pgmap']['num_pgs']
1854 def create_erasure_code_profile(self
, profile_name
, profile
):
1856 Create an erasure code profile name that can be used as a parameter
1857 when creating an erasure coded pool.
1860 args
= cmd_erasure_code_profile(profile_name
, profile
)
1861 self
.raw_cluster_cmd(*args
)
1863 def create_pool_with_unique_name(self
, pg_num
=16,
1864 erasure_code_profile_name
=None,
1866 erasure_code_use_overwrites
=False):
1868 Create a pool named unique_pool_X where X is unique.
1872 name
= "unique_pool_%s" % (str(self
.next_pool_id
),)
1873 self
.next_pool_id
+= 1
1877 erasure_code_profile_name
=erasure_code_profile_name
,
1879 erasure_code_use_overwrites
=erasure_code_use_overwrites
)
1882 @contextlib.contextmanager
1883 def pool(self
, pool_name
, pg_num
=16, erasure_code_profile_name
=None):
1884 self
.create_pool(pool_name
, pg_num
, erasure_code_profile_name
)
1886 self
.remove_pool(pool_name
)
1888 def create_pool(self
, pool_name
, pg_num
=16,
1889 erasure_code_profile_name
=None,
1891 erasure_code_use_overwrites
=False):
1893 Create a pool named from the pool_name parameter.
1894 :param pool_name: name of the pool being created.
1895 :param pg_num: initial number of pgs.
1896 :param erasure_code_profile_name: if set and !None create an
1897 erasure coded pool using the profile
1898 :param erasure_code_use_overwrites: if true, allow overwrites
1901 assert isinstance(pool_name
, str)
1902 assert isinstance(pg_num
, int)
1903 assert pool_name
not in self
.pools
1904 self
.log("creating pool_name %s" % (pool_name
,))
1905 if erasure_code_profile_name
:
1906 self
.raw_cluster_cmd('osd', 'pool', 'create',
1907 pool_name
, str(pg_num
), str(pg_num
),
1908 'erasure', erasure_code_profile_name
)
1910 self
.raw_cluster_cmd('osd', 'pool', 'create',
1911 pool_name
, str(pg_num
))
1912 if min_size
is not None:
1913 self
.raw_cluster_cmd(
1914 'osd', 'pool', 'set', pool_name
,
1917 if erasure_code_use_overwrites
:
1918 self
.raw_cluster_cmd(
1919 'osd', 'pool', 'set', pool_name
,
1920 'allow_ec_overwrites',
1922 self
.raw_cluster_cmd(
1923 'osd', 'pool', 'application', 'enable',
1924 pool_name
, 'rados', '--yes-i-really-mean-it',
1925 run
.Raw('||'), 'true')
1926 self
.pools
[pool_name
] = pg_num
1929 def add_pool_snap(self
, pool_name
, snap_name
):
1932 :param pool_name: name of pool to snapshot
1933 :param snap_name: name of snapshot to take
1935 self
.raw_cluster_cmd('osd', 'pool', 'mksnap',
1936 str(pool_name
), str(snap_name
))
1938 def remove_pool_snap(self
, pool_name
, snap_name
):
1940 Remove pool snapshot
1941 :param pool_name: name of pool to snapshot
1942 :param snap_name: name of snapshot to remove
1944 self
.raw_cluster_cmd('osd', 'pool', 'rmsnap',
1945 str(pool_name
), str(snap_name
))
1947 def remove_pool(self
, pool_name
):
1949 Remove the indicated pool
1950 :param pool_name: Pool to be removed
1953 assert isinstance(pool_name
, str)
1954 assert pool_name
in self
.pools
1955 self
.log("removing pool_name %s" % (pool_name
,))
1956 del self
.pools
[pool_name
]
1957 self
.raw_cluster_cmd('osd', 'pool', 'rm', pool_name
, pool_name
,
1958 "--yes-i-really-really-mean-it")
1966 return random
.sample(self
.pools
.keys(), 1)[0]
1968 def get_pool_pg_num(self
, pool_name
):
1970 Return the number of pgs in the pool specified.
1973 assert isinstance(pool_name
, str)
1974 if pool_name
in self
.pools
:
1975 return self
.pools
[pool_name
]
1978 def get_pool_property(self
, pool_name
, prop
):
1980 :param pool_name: pool
1981 :param prop: property to be checked.
1982 :returns: property as string
1985 assert isinstance(pool_name
, str)
1986 assert isinstance(prop
, str)
1987 output
= self
.raw_cluster_cmd(
1993 return output
.split()[1]
1995 def get_pool_int_property(self
, pool_name
, prop
):
1996 return int(self
.get_pool_property(pool_name
, prop
))
1998 def set_pool_property(self
, pool_name
, prop
, val
):
2000 :param pool_name: pool
2001 :param prop: property to be set.
2002 :param val: value to set.
2004 This routine retries if set operation fails.
2007 assert isinstance(pool_name
, str)
2008 assert isinstance(prop
, str)
2009 assert isinstance(val
, int)
2012 r
= self
.raw_cluster_cmd_result(
2019 if r
!= 11: # EAGAIN
2023 raise Exception('timed out getting EAGAIN '
2024 'when setting pool property %s %s = %s' %
2025 (pool_name
, prop
, val
))
2026 self
.log('got EAGAIN setting pool property, '
2027 'waiting a few seconds...')
2030 def expand_pool(self
, pool_name
, by
, max_pgs
):
2032 Increase the number of pgs in a pool
2035 assert isinstance(pool_name
, str)
2036 assert isinstance(by
, int)
2037 assert pool_name
in self
.pools
2038 if self
.get_num_creating() > 0:
2040 if (self
.pools
[pool_name
] + by
) > max_pgs
:
2042 self
.log("increase pool size by %d" % (by
,))
2043 new_pg_num
= self
.pools
[pool_name
] + by
2044 self
.set_pool_property(pool_name
, "pg_num", new_pg_num
)
2045 self
.pools
[pool_name
] = new_pg_num
2048 def contract_pool(self
, pool_name
, by
, min_pgs
):
2050 Decrease the number of pgs in a pool
2053 self
.log('contract_pool %s by %s min %s' % (
2054 pool_name
, str(by
), str(min_pgs
)))
2055 assert isinstance(pool_name
, str)
2056 assert isinstance(by
, int)
2057 assert pool_name
in self
.pools
2058 if self
.get_num_creating() > 0:
2059 self
.log('too many creating')
2061 proj
= self
.pools
[pool_name
] - by
2063 self
.log('would drop below min_pgs, proj %d, currently %d' % (proj
,self
.pools
[pool_name
],))
2065 self
.log("decrease pool size by %d" % (by
,))
2066 new_pg_num
= self
.pools
[pool_name
] - by
2067 self
.set_pool_property(pool_name
, "pg_num", new_pg_num
)
2068 self
.pools
[pool_name
] = new_pg_num
2071 def stop_pg_num_changes(self
):
2073 Reset all pg_num_targets back to pg_num, canceling splits and merges
2075 self
.log('Canceling any pending splits or merges...')
2076 osd_dump
= self
.get_osd_dump_json()
2078 for pool
in osd_dump
['pools']:
2079 if pool
['pg_num'] != pool
['pg_num_target']:
2080 self
.log('Setting pool %s (%d) pg_num %d -> %d' %
2081 (pool
['pool_name'], pool
['pool'],
2082 pool
['pg_num_target'],
2084 self
.raw_cluster_cmd('osd', 'pool', 'set', pool
['pool_name'],
2085 'pg_num', str(pool
['pg_num']))
2087 # we don't support pg_num_target before nautilus
2090 def set_pool_pgpnum(self
, pool_name
, force
):
2092 Set pgpnum property of pool_name pool.
2095 assert isinstance(pool_name
, str)
2096 assert pool_name
in self
.pools
2097 if not force
and self
.get_num_creating() > 0:
2099 self
.set_pool_property(pool_name
, 'pgp_num', self
.pools
[pool_name
])
    def list_pg_unfound(self, pgid):
        """
        return list of unfound pgs with the id specified
        """
        r = None
        offset = {}
        while True:
            out = self.raw_cluster_cmd('--', 'pg', pgid, 'list_unfound',
                                       json.dumps(offset))
            j = json.loads(out)
            if r is None:
                r = j
            else:
                r['objects'].extend(j['objects'])
            if len(j['objects']) == 0:
                break
            offset = j['objects'][-1]['oid']
        return r
    def get_pg_stats(self):
        """
        Dump the cluster and get pg stats
        """
        out = self.raw_cluster_cmd('pg', 'dump', '--format=json')
        j = json.loads('\n'.join(out.split('\n')[1:]))
        try:
            return j['pg_map']['pg_stats']
        except KeyError:
            return j['pg_stats']
    def get_pgids_to_force(self, backfill):
        """
        Return the randomized list of PGs that can have their recovery/backfill forced
        """
        j = self.get_pg_stats()
        pgids = []
        if backfill:
            wanted = ['degraded', 'backfilling', 'backfill_wait']
        else:
            wanted = ['recovering', 'degraded', 'recovery_wait']
        for pg in j:
            status = pg['state'].split('+')
            for t in wanted:
                if random.random() > 0.5 and not ('forced_backfill' in status or 'forced_recovery' in status) and t in status:
                    pgids.append(pg['pgid'])
                    break
        return pgids
    def get_pgids_to_cancel_force(self, backfill):
        """
        Return the randomized list of PGs whose recovery/backfill priority is forced
        """
        j = self.get_pg_stats()
        pgids = []
        if backfill:
            wanted = 'forced_backfill'
        else:
            wanted = 'forced_recovery'
        for pg in j:
            status = pg['state'].split('+')
            if wanted in status and random.random() > 0.5:
                pgids.append(pg['pgid'])
        return pgids
    def compile_pg_status(self):
        """
        Return a histogram of pg state values
        """
        ret = {}
        j = self.get_pg_stats()
        for pg in j:
            for status in pg['state'].split('+'):
                if status not in ret:
                    ret[status] = 0
                ret[status] += 1
        return ret
    @wait_for_pg_stats  # type: ignore
    def with_pg_state(self, pool, pgnum, check):
        pgstr = self.get_pgid(pool, pgnum)
        stats = self.get_single_pg_stats(pgstr)
        assert(check(stats['state']))

    @wait_for_pg_stats  # type: ignore
    def with_pg(self, pool, pgnum, check):
        pgstr = self.get_pgid(pool, pgnum)
        stats = self.get_single_pg_stats(pgstr)
        return check(stats)
    def get_last_scrub_stamp(self, pool, pgnum):
        """
        Get the timestamp of the last scrub.
        """
        stats = self.get_single_pg_stats(self.get_pgid(pool, pgnum))
        return stats["last_scrub_stamp"]
    def do_pg_scrub(self, pool, pgnum, stype):
        """
        Scrub pg and wait for scrubbing to finish
        """
        init = self.get_last_scrub_stamp(pool, pgnum)
        RESEND_TIMEOUT = 120    # Must be a multiple of SLEEP_TIME
        FATAL_TIMEOUT = RESEND_TIMEOUT * 3
        SLEEP_TIME = 10
        timer = 0
        while init == self.get_last_scrub_stamp(pool, pgnum):
            assert timer < FATAL_TIMEOUT, "fatal timeout trying to " + stype
            self.log("waiting for scrub type %s" % (stype,))
            if (timer % RESEND_TIMEOUT) == 0:
                self.raw_cluster_cmd('pg', stype, self.get_pgid(pool, pgnum))
                # The first time in this loop is the actual request
                if timer != 0 and stype == "repair":
                    self.log("WARNING: Resubmitted a non-idempotent repair")
            time.sleep(SLEEP_TIME)
            timer += SLEEP_TIME
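    # Illustrative usage (hypothetical pool name and pg number): request a deep
    # scrub of one pg and block until its last_scrub_stamp changes, e.g.:
    #   manager.do_pg_scrub('unique_pool_0', 0, 'deep-scrub')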
    def wait_snap_trimming_complete(self, pool):
        """
        Wait for snap trimming on pool to end
        """
        POLL_PERIOD = 10
        FATAL_TIMEOUT = 600
        start = time.time()
        poolnum = self.get_pool_num(pool)
        poolnumstr = "%s." % (poolnum,)
        while (True):
            now = time.time()
            if (now - start) > FATAL_TIMEOUT:
                assert (now - start) < FATAL_TIMEOUT, \
                    'failed to complete snap trimming before timeout'
            all_stats = self.get_pg_stats()
            trimming = False
            for pg in all_stats:
                if (poolnumstr in pg['pgid']) and ('snaptrim' in pg['state']):
                    self.log("pg {pg} in trimming, state: {state}".format(
                        pg=pg['pgid'],
                        state=pg['state']))
                    trimming = True
            if not trimming:
                break
            self.log("{pool} still trimming, waiting".format(pool=pool))
            time.sleep(POLL_PERIOD)
    def get_single_pg_stats(self, pgid):
        """
        Return pg for the pgid specified.
        """
        all_stats = self.get_pg_stats()

        for pg in all_stats:
            if pg['pgid'] == pgid:
                return pg

        return None
    def get_object_pg_with_shard(self, pool, name, osdid):
        pool_dump = self.get_pool_dump(pool)
        object_map = self.get_object_map(pool, name)
        if pool_dump["type"] == PoolType.ERASURE_CODED:
            shard = object_map['acting'].index(osdid)
            return "{pgid}s{shard}".format(pgid=object_map['pgid'],
                                           shard=shard)
        else:
            return object_map['pgid']

    def get_object_primary(self, pool, name):
        object_map = self.get_object_map(pool, name)
        return object_map['acting_primary']
    def get_object_map(self, pool, name):
        """
        osd map --format=json converted to a python object
        :returns: the python object
        """
        out = self.raw_cluster_cmd('--format=json', 'osd', 'map', pool, name)
        return json.loads('\n'.join(out.split('\n')[1:]))

    def get_osd_dump_json(self):
        """
        osd dump --format=json converted to a python object
        :returns: the python object
        """
        out = self.raw_cluster_cmd('osd', 'dump', '--format=json')
        return json.loads('\n'.join(out.split('\n')[1:]))

    def get_osd_dump(self):
        return self.get_osd_dump_json()['osds']

    def get_osd_metadata(self):
        """
        osd metadata --format=json converted to a python object
        :returns: the python object containing osd metadata information
        """
        out = self.raw_cluster_cmd('osd', 'metadata', '--format=json')
        return json.loads('\n'.join(out.split('\n')[1:]))
    def get_mgr_dump(self):
        out = self.raw_cluster_cmd('mgr', 'dump', '--format=json')
        return json.loads(out)
    def get_stuck_pgs(self, type_, threshold):
        """
        :returns: stuck pg information from the cluster
        """
        out = self.raw_cluster_cmd('pg', 'dump_stuck', type_, str(threshold),
                                   '--format=json')
        return json.loads(out).get('stuck_pg_stats', [])

    def get_num_unfound_objects(self):
        """
        Check cluster status to get the number of unfound objects
        """
        status = self.raw_cluster_status()
        self.log(status)
        return status['pgmap'].get('unfound_objects', 0)
    def get_num_creating(self):
        """
        Find the number of pgs in creating mode.
        """
        pgs = self.get_pg_stats()
        num = 0
        for pg in pgs:
            if 'creating' in pg['state']:
                num += 1
        return num
    def get_num_active_clean(self):
        """
        Find the number of active and clean pgs.
        """
        pgs = self.get_pg_stats()
        return self._get_num_active_clean(pgs)

    def _get_num_active_clean(self, pgs):
        num = 0
        for pg in pgs:
            if (pg['state'].count('active') and
                    pg['state'].count('clean') and
                    not pg['state'].count('stale')):
                num += 1
        return num

    def get_num_active_recovered(self):
        """
        Find the number of active and recovered pgs.
        """
        pgs = self.get_pg_stats()
        return self._get_num_active_recovered(pgs)

    def _get_num_active_recovered(self, pgs):
        num = 0
        for pg in pgs:
            if (pg['state'].count('active') and
                    not pg['state'].count('recover') and
                    not pg['state'].count('backfilling') and
                    not pg['state'].count('stale')):
                num += 1
        return num
    def get_is_making_recovery_progress(self):
        """
        Return whether there is recovery progress discernable in the
        raw cluster status
        """
        status = self.raw_cluster_status()
        kps = status['pgmap'].get('recovering_keys_per_sec', 0)
        bps = status['pgmap'].get('recovering_bytes_per_sec', 0)
        ops = status['pgmap'].get('recovering_objects_per_sec', 0)
        return kps > 0 or bps > 0 or ops > 0
    def get_num_active(self):
        """
        Find the number of active pgs.
        """
        pgs = self.get_pg_stats()
        return self._get_num_active(pgs)

    def _get_num_active(self, pgs):
        num = 0
        for pg in pgs:
            if pg['state'].count('active') and not pg['state'].count('stale'):
                num += 1
        return num

    def get_num_down(self):
        """
        Find the number of pgs that are down.
        """
        pgs = self.get_pg_stats()
        num = 0
        for pg in pgs:
            if ((pg['state'].count('down') and not
                 pg['state'].count('stale')) or
                (pg['state'].count('incomplete') and not
                 pg['state'].count('stale'))):
                num += 1
        return num
    def get_num_active_down(self):
        """
        Find the number of pgs that are either active or down.
        """
        pgs = self.get_pg_stats()
        return self._get_num_active_down(pgs)

    def _get_num_active_down(self, pgs):
        num = 0
        for pg in pgs:
            if ((pg['state'].count('active') and not
                 pg['state'].count('stale')) or
                (pg['state'].count('down') and not
                 pg['state'].count('stale')) or
                (pg['state'].count('incomplete') and not
                 pg['state'].count('stale'))):
                num += 1
        return num

    def get_num_peered(self):
        """
        Find the number of PGs that are peered
        """
        pgs = self.get_pg_stats()
        return self._get_num_peered(pgs)

    def _get_num_peered(self, pgs):
        num = 0
        for pg in pgs:
            if pg['state'].count('peered') and not pg['state'].count('stale'):
                num += 1
        return num
    def is_clean(self):
        """
        True if all pgs are clean
        """
        pgs = self.get_pg_stats()
        return self._get_num_active_clean(pgs) == len(pgs)

    def is_recovered(self):
        """
        True if all pgs have recovered
        """
        pgs = self.get_pg_stats()
        return self._get_num_active_recovered(pgs) == len(pgs)

    def is_active_or_down(self):
        """
        True if all pgs are active or down
        """
        pgs = self.get_pg_stats()
        return self._get_num_active_down(pgs) == len(pgs)
    def dump_pgs_not_active_clean(self):
        """
        Dumps all pgs that are not active+clean
        """
        pgs = self.get_pg_stats()
        for pg in pgs:
            if pg['state'] != 'active+clean':
                self.log('PG %s is not active+clean' % pg['pgid'])
                self.log(pg)

    def dump_pgs_not_active_down(self):
        """
        Dumps all pgs that are not active or down
        """
        pgs = self.get_pg_stats()
        for pg in pgs:
            if 'active' not in pg['state'] and 'down' not in pg['state']:
                self.log('PG %s is not active or down' % pg['pgid'])
                self.log(pg)

    def dump_pgs_not_active(self):
        """
        Dumps all pgs that are not active
        """
        pgs = self.get_pg_stats()
        for pg in pgs:
            if 'active' not in pg['state']:
                self.log('PG %s is not active' % pg['pgid'])
                self.log(pg)
    def wait_for_clean(self, timeout=1200):
        """
        Returns true when all pgs are clean.
        """
        self.log("waiting for clean")
        start = time.time()
        num_active_clean = self.get_num_active_clean()
        while not self.is_clean():
            if timeout is not None:
                if self.get_is_making_recovery_progress():
                    self.log("making progress, resetting timeout")
                    start = time.time()
                else:
                    self.log("no progress seen, keeping timeout for now")
                    if time.time() - start >= timeout:
                        self.log('dumping pgs not clean')
                        self.dump_pgs_not_active_clean()
                        assert time.time() - start < timeout, \
                            'wait_for_clean: failed before timeout expired'
            cur_active_clean = self.get_num_active_clean()
            if cur_active_clean != num_active_clean:
                start = time.time()
                num_active_clean = cur_active_clean
            time.sleep(3)
        self.log("clean!")
    def are_all_osds_up(self):
        """
        Returns true if all osds are up.
        """
        x = self.get_osd_dump()
        return (len(x) == sum([(y['up'] > 0) for y in x]))

    def wait_for_all_osds_up(self, timeout=None):
        """
        When this exits, either the timeout has expired, or all
        osds are up.
        """
        self.log("waiting for all up")
        start = time.time()
        while not self.are_all_osds_up():
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'timeout expired in wait_for_all_osds_up'
            time.sleep(3)
        self.log("all up!")

    def pool_exists(self, pool):
        if pool in self.list_pools():
            return True
        return False
    def wait_for_pool(self, pool, timeout=300):
        """
        Wait for a pool to exist
        """
        self.log('waiting for pool %s to exist' % pool)
        start = time.time()
        while not self.pool_exists(pool):
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'timeout expired in wait_for_pool'
            time.sleep(3)

    def wait_for_pools(self, pools):
        for pool in pools:
            self.wait_for_pool(pool)

    def is_mgr_available(self):
        x = self.get_mgr_dump()
        return x.get('available', False)
    def wait_for_mgr_available(self, timeout=None):
        self.log("waiting for mgr available")
        start = time.time()
        while not self.is_mgr_available():
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'timeout expired in wait_for_mgr_available'
            time.sleep(3)
        self.log("mgr available!")

    def wait_for_recovery(self, timeout=None):
        """
        Check peering. When this exits, we have recovered.
        """
        self.log("waiting for recovery to complete")
        start = time.time()
        num_active_recovered = self.get_num_active_recovered()
        while not self.is_recovered():
            now = time.time()
            if timeout is not None:
                if self.get_is_making_recovery_progress():
                    self.log("making progress, resetting timeout")
                    start = time.time()
                else:
                    self.log("no progress seen, keeping timeout for now")
                    if now - start >= timeout:
                        if self.is_recovered():
                            break
                        self.log('dumping pgs not recovered yet')
                        self.dump_pgs_not_active_clean()
                        assert now - start < timeout, \
                            'wait_for_recovery: failed before timeout expired'
            cur_active_recovered = self.get_num_active_recovered()
            if cur_active_recovered != num_active_recovered:
                start = time.time()
                num_active_recovered = cur_active_recovered
            time.sleep(3)
        self.log("recovered!")
    def wait_for_active(self, timeout=None):
        """
        Check peering. When this exits, we are definitely active
        """
        self.log("waiting for peering to complete")
        start = time.time()
        num_active = self.get_num_active()
        while not self.is_active():
            if timeout is not None:
                if time.time() - start >= timeout:
                    self.log('dumping pgs not active')
                    self.dump_pgs_not_active()
                    assert time.time() - start < timeout, \
                        'wait_for_active: failed before timeout expired'
            cur_active = self.get_num_active()
            if cur_active != num_active:
                start = time.time()
                num_active = cur_active
            time.sleep(3)
        self.log("active!")

    def wait_for_active_or_down(self, timeout=None):
        """
        Check peering. When this exits, we are definitely either
        active or down
        """
        self.log("waiting for peering to complete or become blocked")
        start = time.time()
        num_active_down = self.get_num_active_down()
        while not self.is_active_or_down():
            if timeout is not None:
                if time.time() - start >= timeout:
                    self.log('dumping pgs not active or down')
                    self.dump_pgs_not_active_down()
                    assert time.time() - start < timeout, \
                        'wait_for_active_or_down: failed before timeout expired'
            cur_active_down = self.get_num_active_down()
            if cur_active_down != num_active_down:
                start = time.time()
                num_active_down = cur_active_down
            time.sleep(3)
        self.log("active or down!")
    def osd_is_up(self, osd):
        """
        Wrapper for osd check
        """
        osds = self.get_osd_dump()
        return osds[osd]['up'] > 0

    def wait_till_osd_is_up(self, osd, timeout=None):
        """
        Loop waiting for osd.
        """
        self.log('waiting for osd.%d to be up' % osd)
        start = time.time()
        while not self.osd_is_up(osd):
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'osd.%d failed to come up before timeout expired' % osd
            time.sleep(3)
        self.log('osd.%d is up' % osd)

    def is_active(self):
        """
        Wrapper to check if all pgs are active
        """
        return self.get_num_active() == self.get_num_pgs()
    def all_active_or_peered(self):
        """
        Wrapper to check if all PGs are active or peered
        """
        pgs = self.get_pg_stats()
        return self._get_num_active(pgs) + self._get_num_peered(pgs) == len(pgs)

    def wait_till_active(self, timeout=None):
        """
        Wait until all pgs are active.
        """
        self.log("waiting till active")
        start = time.time()
        while not self.is_active():
            if timeout is not None:
                if time.time() - start >= timeout:
                    self.log('dumping pgs not active')
                    self.dump_pgs_not_active()
                    assert time.time() - start < timeout, \
                        'wait_till_active: failed before timeout expired'
            time.sleep(3)
        self.log("active!")
    def wait_till_pg_convergence(self, timeout=None):
        start = time.time()
        old_stats = None
        active_osds = [osd['osd'] for osd in self.get_osd_dump()
                       if osd['in'] and osd['up']]
        while True:
            # strictly speaking, no need to wait for mon. but due to the
            # "ms inject socket failures" setting, the osdmap could be delayed,
            # so mgr is likely to ignore the pg-stat messages with pgs serving
            # newly created pools which is not yet known by mgr. so, to make sure
            # the mgr is updated with the latest pg-stats, waiting for mon/mgr is
            # necessary.
            self.flush_pg_stats(active_osds)
            new_stats = dict((stat['pgid'], stat['state'])
                             for stat in self.get_pg_stats())
            if old_stats == new_stats:
                return old_stats
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'failed to reach convergence before %d secs' % timeout
            old_stats = new_stats
            # longer than mgr_stats_period
            time.sleep(5 + 1)
    def mark_out_osd(self, osd):
        """
        Wrapper to mark osd out.
        """
        self.raw_cluster_cmd('osd', 'out', str(osd))
    def kill_osd(self, osd):
        """
        Kill osds by either power cycling (if indicated by the config)
        or by stopping the daemon.
        """
        if self.config.get('powercycle'):
            remote = self.find_remote('osd', osd)
            self.log('kill_osd on osd.{o} '
                     'doing powercycle of {s}'.format(o=osd, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_off()
        elif self.config.get('bdev_inject_crash') and self.config.get('bdev_inject_crash_probability'):
            if random.uniform(0, 1) < self.config.get('bdev_inject_crash_probability', .5):
                self.inject_args(
                    'osd', osd,
                    'bdev-inject-crash', self.config.get('bdev_inject_crash'))
                try:
                    self.ctx.daemons.get_daemon('osd', osd, self.cluster).wait()
                except Exception:
                    pass
                else:
                    raise RuntimeError('osd.%s did not fail' % osd)
            else:
                self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
        else:
            self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()

    @staticmethod
    def _assert_ipmi(remote):
        assert remote.console.has_ipmi_credentials, (
            "powercycling requested but RemoteConsole is not "
            "initialized. Check ipmi config.")
    def blackhole_kill_osd(self, osd):
        """
        Stop osd if nothing else works.
        """
        self.inject_args('osd', osd,
                         'objectstore-blackhole', True)
        self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
    def revive_osd(self, osd, timeout=360, skip_admin_check=False):
        """
        Revive osds by either power cycling (if indicated by the config)
        or by restarting.
        """
        if self.config.get('powercycle'):
            remote = self.find_remote('osd', osd)
            self.log('kill_osd on osd.{o} doing powercycle of {s}'.
                     format(o=osd, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_on()
            if not remote.console.check_status(300):
                raise Exception('Failed to revive osd.{o} via ipmi'.
                                format(o=osd))
            teuthology.reconnect(self.ctx, 60, [remote])
            mount_osd_data(self.ctx, remote, self.cluster, str(osd))
            self.make_admin_daemon_dir(remote)
            self.ctx.daemons.get_daemon('osd', osd, self.cluster).reset()
        self.ctx.daemons.get_daemon('osd', osd, self.cluster).restart()

        if not skip_admin_check:
            # wait for dump_ops_in_flight; this command doesn't appear
            # until after the signal handler is installed and it is safe
            # to stop the osd again without making valgrind leak checks
            # unhappy.  see #5924.
            self.wait_run_admin_socket('osd', osd,
                                       args=['dump_ops_in_flight'],
                                       timeout=timeout, stdout=DEVNULL)
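    # Illustrative thrashing sequence (hypothetical osd id): stop an OSD, mark
    # it down, then bring it back and block until its admin socket answers:
    #   manager.kill_osd(0)
    #   manager.mark_down_osd(0)
    #   manager.revive_osd(0, timeout=360)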
    def mark_down_osd(self, osd):
        """
        Cluster command wrapper
        """
        self.raw_cluster_cmd('osd', 'down', str(osd))

    def mark_in_osd(self, osd):
        """
        Cluster command wrapper
        """
        self.raw_cluster_cmd('osd', 'in', str(osd))

    def signal_osd(self, osd, sig, silent=False):
        """
        Wrapper to local get_daemon call which sends the given
        signal to the given osd.
        """
        self.ctx.daemons.get_daemon('osd', osd,
                                    self.cluster).signal(sig, silent=silent)

    def signal_mon(self, mon, sig, silent=False):
        """
        Wrapper to local get_daemon call
        """
        self.ctx.daemons.get_daemon('mon', mon,
                                    self.cluster).signal(sig, silent=silent)
    def kill_mon(self, mon):
        """
        Kill the monitor by either power cycling (if the config says so),
        or by doing a normal stop.
        """
        if self.config.get('powercycle'):
            remote = self.find_remote('mon', mon)
            self.log('kill_mon on mon.{m} doing powercycle of {s}'.
                     format(m=mon, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_off()
        else:
            self.ctx.daemons.get_daemon('mon', mon, self.cluster).stop()

    def revive_mon(self, mon):
        """
        Restart by either power cycling (if the config says so),
        or by doing a normal restart.
        """
        if self.config.get('powercycle'):
            remote = self.find_remote('mon', mon)
            self.log('revive_mon on mon.{m} doing powercycle of {s}'.
                     format(m=mon, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_on()
            self.make_admin_daemon_dir(remote)
        self.ctx.daemons.get_daemon('mon', mon, self.cluster).restart()
    def revive_mgr(self, mgr):
        """
        Restart by either power cycling (if the config says so),
        or by doing a normal restart.
        """
        if self.config.get('powercycle'):
            remote = self.find_remote('mgr', mgr)
            self.log('revive_mgr on mgr.{m} doing powercycle of {s}'.
                     format(m=mgr, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_on()
            self.make_admin_daemon_dir(remote)
        self.ctx.daemons.get_daemon('mgr', mgr, self.cluster).restart()

    def get_mon_status(self, mon):
        """
        Extract all the monitor status information from the cluster
        """
        out = self.raw_cluster_cmd('tell', 'mon.%s' % mon, 'mon_status')
        return json.loads(out)
    def get_mon_quorum(self):
        """
        Extract monitor quorum information from the cluster
        """
        out = self.raw_cluster_cmd('quorum_status')
        j = json.loads(out)
        self.log('quorum_status is %s' % out)
        return j['quorum']

    def wait_for_mon_quorum_size(self, size, timeout=300):
        """
        Loop until quorum size is reached.
        """
        self.log('waiting for quorum size %d' % size)
        start = time.time()
        while not len(self.get_mon_quorum()) == size:
            if timeout is not None:
                assert time.time() - start < timeout, \
                    ('failed to reach quorum size %d '
                     'before timeout expired' % size)
            time.sleep(3)
        self.log("quorum is size %d" % size)
    def get_mon_health(self, debug=False):
        """
        Extract all the monitor health information.
        """
        out = self.raw_cluster_cmd('health', '--format=json')
        if debug:
            self.log('health:\n{h}'.format(h=out))
        return json.loads(out)

    def wait_until_healthy(self, timeout=None):
        self.log("wait_until_healthy")
        start = time.time()
        while self.get_mon_health()['status'] != 'HEALTH_OK':
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'timeout expired in wait_until_healthy'
            time.sleep(3)
        self.log("wait_until_healthy done")
    def get_filepath(self):
        """
        Return path to osd data with {id} needing to be replaced
        """
        return '/var/lib/ceph/osd/' + self.cluster + '-{id}'

    def make_admin_daemon_dir(self, remote):
        """
        Create /var/run/ceph directory on remote site.

        :param remote: Remote site
        """
        remote.run(args=['sudo',
                         'install', '-d', '-m0777', '--', '/var/run/ceph', ], )
    def get_service_task_status(self, service, status_key):
        """
        Return daemon task status for a given ceph service.

        :param service: ceph service (mds, osd, etc...)
        :param status_key: matching task status key
        """
        task_status = {}
        status = self.raw_cluster_status()
        try:
            for k, v in status['servicemap']['services'][service]['daemons'].items():
                ts = dict(v).get('task_status', None)
                if ts:
                    task_status[k] = ts[status_key]
        except KeyError:  # catches missing service and status key
            return {}
        self.log(task_status)
        return task_status
def utility_task(name):
    """
    Generate ceph_manager subtask corresponding to ceph_manager
    method name
    """
    def task(ctx, config):
        if config is None:
            config = {}
        args = config.get('args', [])
        kwargs = config.get('kwargs', {})
        cluster = config.get('cluster', 'ceph')
        fn = getattr(ctx.managers[cluster], name)
        fn(*args, **kwargs)
    return task
revive_osd = utility_task("revive_osd")
revive_mon = utility_task("revive_mon")
kill_osd = utility_task("kill_osd")
kill_mon = utility_task("kill_mon")
create_pool = utility_task("create_pool")
remove_pool = utility_task("remove_pool")
wait_for_clean = utility_task("wait_for_clean")
flush_all_pg_stats = utility_task("flush_all_pg_stats")
set_pool_property = utility_task("set_pool_property")
do_pg_scrub = utility_task("do_pg_scrub")
wait_for_pool = utility_task("wait_for_pool")
wait_for_pools = utility_task("wait_for_pools")
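# Illustrative teuthology job snippet (hypothetical pool name and values)
# showing how the generated subtasks above could be referenced from a job
# YAML; each entry is dispatched to the matching CephManager method by the
# task() closure returned from utility_task():
#
#   tasks:
#   - ceph_manager.wait_for_clean:
#   - ceph_manager.set_pool_property:
#       args: ['unique_pool_0', 'min_size', 2]
#   - ceph_manager.do_pg_scrub:
#       args: ['unique_pool_0', 0, 'deep-scrub']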