2 ceph manager -- Thrasher and CephManager objects
4 from functools
import wraps
18 from io
import BytesIO
19 from teuthology
import misc
as teuthology
20 from tasks
.scrub
import Scrubber
21 from tasks
.util
.rados
import cmd_erasure_code_profile
22 from tasks
.util
import get_remote
23 from teuthology
.contextutil
import safe_while
24 from teuthology
.orchestra
.remote
import Remote
25 from teuthology
.orchestra
import run
26 from teuthology
.exceptions
import CommandFailedError
27 from tasks
.thrasher
import Thrasher
28 from six
import StringIO
31 from subprocess
import DEVNULL
# py3k
33 DEVNULL
= open(os
.devnull
, 'r+') # type: ignore
35 DEFAULT_CONF_PATH
= '/etc/ceph/ceph.conf'
37 log
= logging
.getLogger(__name__
)
39 # this is for cephadm clusters
40 def shell(ctx
, cluster_name
, remote
, args
, name
=None, **kwargs
):
41 testdir
= teuthology
.get_testdir(ctx
)
44 extra_args
= ['-n', name
]
49 '--image', ctx
.ceph
[cluster_name
].image
,
52 '--fsid', ctx
.ceph
[cluster_name
].fsid
,
58 def write_conf(ctx
, conf_path
=DEFAULT_CONF_PATH
, cluster
='ceph'):
60 ctx
.ceph
[cluster
].conf
.write(conf_fp
)
62 writes
= ctx
.cluster
.run(
64 'sudo', 'mkdir', '-p', '/etc/ceph', run
.Raw('&&'),
65 'sudo', 'chmod', '0755', '/etc/ceph', run
.Raw('&&'),
66 'sudo', 'tee', conf_path
, run
.Raw('&&'),
67 'sudo', 'chmod', '0644', conf_path
,
68 run
.Raw('>'), '/dev/null',
73 teuthology
.feed_many_stdins_and_close(conf_fp
, writes
)
77 def mount_osd_data(ctx
, remote
, cluster
, osd
):
82 :param remote: Remote site
83 :param cluster: name of ceph cluster
86 log
.debug('Mounting data for osd.{o} on {r}'.format(o
=osd
, r
=remote
))
87 role
= "{0}.osd.{1}".format(cluster
, osd
)
88 alt_role
= role
if cluster
!= 'ceph' else "osd.{0}".format(osd
)
89 if remote
in ctx
.disk_config
.remote_to_roles_to_dev
:
90 if alt_role
in ctx
.disk_config
.remote_to_roles_to_dev
[remote
]:
92 if role
not in ctx
.disk_config
.remote_to_roles_to_dev
[remote
]:
94 dev
= ctx
.disk_config
.remote_to_roles_to_dev
[remote
][role
]
95 mount_options
= ctx
.disk_config
.\
96 remote_to_roles_to_dev_mount_options
[remote
][role
]
97 fstype
= ctx
.disk_config
.remote_to_roles_to_dev_fstype
[remote
][role
]
98 mnt
= os
.path
.join('/var/lib/ceph/osd', '{0}-{1}'.format(cluster
, osd
))
100 log
.info('Mounting osd.{o}: dev: {n}, cluster: {c}'
101 'mountpoint: {p}, type: {t}, options: {v}'.format(
102 o
=osd
, n
=remote
.name
, p
=mnt
, t
=fstype
, v
=mount_options
,
110 '-o', ','.join(mount_options
),
123 self
.log(traceback
.format_exc())
133 class OSDThrasher(Thrasher
):
135 Object used to thrash Ceph
137 def __init__(self
, manager
, config
, name
, logger
):
138 super(OSDThrasher
, self
).__init
__()
140 self
.ceph_manager
= manager
141 self
.cluster
= manager
.cluster
142 self
.ceph_manager
.wait_for_clean()
143 osd_status
= self
.ceph_manager
.get_osd_status()
144 self
.in_osds
= osd_status
['in']
145 self
.live_osds
= osd_status
['live']
146 self
.out_osds
= osd_status
['out']
147 self
.dead_osds
= osd_status
['dead']
148 self
.stopping
= False
152 self
.revive_timeout
= self
.config
.get("revive_timeout", 360)
153 self
.pools_to_fix_pgp_num
= set()
154 if self
.config
.get('powercycle'):
155 self
.revive_timeout
+= 120
156 self
.clean_wait
= self
.config
.get('clean_wait', 0)
157 self
.minin
= self
.config
.get("min_in", 4)
158 self
.chance_move_pg
= self
.config
.get('chance_move_pg', 1.0)
159 self
.sighup_delay
= self
.config
.get('sighup_delay')
160 self
.optrack_toggle_delay
= self
.config
.get('optrack_toggle_delay')
161 self
.dump_ops_enable
= self
.config
.get('dump_ops_enable')
162 self
.noscrub_toggle_delay
= self
.config
.get('noscrub_toggle_delay')
163 self
.chance_thrash_cluster_full
= self
.config
.get('chance_thrash_cluster_full', .05)
164 self
.chance_thrash_pg_upmap
= self
.config
.get('chance_thrash_pg_upmap', 1.0)
165 self
.chance_thrash_pg_upmap_items
= self
.config
.get('chance_thrash_pg_upmap', 1.0)
166 self
.random_eio
= self
.config
.get('random_eio')
167 self
.chance_force_recovery
= self
.config
.get('chance_force_recovery', 0.3)
169 num_osds
= self
.in_osds
+ self
.out_osds
170 self
.max_pgs
= self
.config
.get("max_pgs_per_pool_osd", 1200) * len(num_osds
)
171 self
.min_pgs
= self
.config
.get("min_pgs_per_pool_osd", 1) * len(num_osds
)
172 if self
.config
is None:
174 # prevent monitor from auto-marking things out while thrasher runs
175 # try both old and new tell syntax, in case we are testing old code
176 self
.saved_options
= []
177 # assuming that the default settings do not vary from one daemon to
179 first_mon
= teuthology
.get_first_mon(manager
.ctx
, self
.config
).split('.')
180 opts
= [('mon', 'mon_osd_down_out_interval', 0)]
181 #why do we disable marking an OSD out automatically? :/
182 for service
, opt
, new_value
in opts
:
183 old_value
= manager
.get_config(first_mon
[0],
186 self
.saved_options
.append((service
, opt
, old_value
))
187 manager
.inject_args(service
, '*', opt
, new_value
)
188 # initialize ceph_objectstore_tool property - must be done before
189 # do_thrash is spawned - http://tracker.ceph.com/issues/18799
190 if (self
.config
.get('powercycle') or
191 not self
.cmd_exists_on_osds("ceph-objectstore-tool") or
192 self
.config
.get('disable_objectstore_tool_tests', False)):
193 self
.ceph_objectstore_tool
= False
194 if self
.config
.get('powercycle'):
195 self
.log("Unable to test ceph-objectstore-tool, "
196 "powercycle testing")
198 self
.log("Unable to test ceph-objectstore-tool, "
199 "not available on all OSD nodes")
201 self
.ceph_objectstore_tool
= \
202 self
.config
.get('ceph_objectstore_tool', True)
204 self
.thread
= gevent
.spawn(self
.do_thrash
)
205 if self
.sighup_delay
:
206 self
.sighup_thread
= gevent
.spawn(self
.do_sighup
)
207 if self
.optrack_toggle_delay
:
208 self
.optrack_toggle_thread
= gevent
.spawn(self
.do_optrack_toggle
)
209 if self
.dump_ops_enable
== "true":
210 self
.dump_ops_thread
= gevent
.spawn(self
.do_dump_ops
)
211 if self
.noscrub_toggle_delay
:
212 self
.noscrub_toggle_thread
= gevent
.spawn(self
.do_noscrub_toggle
)
214 def log(self
, msg
, *args
, **kwargs
):
215 self
.logger
.info(msg
, *args
, **kwargs
)
217 def cmd_exists_on_osds(self
, cmd
):
218 if self
.ceph_manager
.cephadm
:
220 allremotes
= self
.ceph_manager
.ctx
.cluster
.only(\
221 teuthology
.is_type('osd', self
.cluster
)).remotes
.keys()
222 allremotes
= list(set(allremotes
))
223 for remote
in allremotes
:
224 proc
= remote
.run(args
=['type', cmd
], wait
=True,
225 check_status
=False, stdout
=BytesIO(),
227 if proc
.exitstatus
!= 0:
231 def run_ceph_objectstore_tool(self
, remote
, osd
, cmd
):
232 if self
.ceph_manager
.cephadm
:
234 self
.ceph_manager
.ctx
, self
.ceph_manager
.cluster
, remote
,
235 args
=['ceph-objectstore-tool'] + cmd
,
237 wait
=True, check_status
=False,
242 args
=['sudo', 'adjust-ulimits', 'ceph-objectstore-tool'] + cmd
,
243 wait
=True, check_status
=False,
247 def kill_osd(self
, osd
=None, mark_down
=False, mark_out
=False):
249 :param osd: Osd to be killed.
250 :mark_down: Mark down if true.
251 :mark_out: Mark out if true.
254 osd
= random
.choice(self
.live_osds
)
255 self
.log("Killing osd %s, live_osds are %s" % (str(osd
),
256 str(self
.live_osds
)))
257 self
.live_osds
.remove(osd
)
258 self
.dead_osds
.append(osd
)
259 self
.ceph_manager
.kill_osd(osd
)
261 self
.ceph_manager
.mark_down_osd(osd
)
262 if mark_out
and osd
in self
.in_osds
:
264 if self
.ceph_objectstore_tool
:
265 self
.log("Testing ceph-objectstore-tool on down osd.%s" % osd
)
266 remote
= self
.ceph_manager
.find_remote('osd', osd
)
267 FSPATH
= self
.ceph_manager
.get_filepath()
268 JPATH
= os
.path
.join(FSPATH
, "journal")
269 exp_osd
= imp_osd
= osd
270 self
.log('remote for osd %s is %s' % (osd
, remote
))
271 exp_remote
= imp_remote
= remote
272 # If an older osd is available we'll move a pg from there
273 if (len(self
.dead_osds
) > 1 and
274 random
.random() < self
.chance_move_pg
):
275 exp_osd
= random
.choice(self
.dead_osds
[:-1])
276 exp_remote
= self
.ceph_manager
.find_remote('osd', exp_osd
)
277 self
.log('remote for exp osd %s is %s' % (exp_osd
, exp_remote
))
280 '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
283 if not self
.ceph_manager
.cephadm
:
284 # ceph-objectstore-tool might be temporarily absent during an
285 # upgrade - see http://tracker.ceph.com/issues/18014
286 with
safe_while(sleep
=15, tries
=40, action
="type ceph-objectstore-tool") as proceed
:
288 proc
= exp_remote
.run(args
=['type', 'ceph-objectstore-tool'],
289 wait
=True, check_status
=False, stdout
=BytesIO(),
291 if proc
.exitstatus
== 0:
293 log
.debug("ceph-objectstore-tool binary not present, trying again")
295 # ceph-objectstore-tool might bogusly fail with "OSD has the store locked"
296 # see http://tracker.ceph.com/issues/19556
297 with
safe_while(sleep
=15, tries
=40, action
="ceph-objectstore-tool --op list-pgs") as proceed
:
299 proc
= self
.run_ceph_objectstore_tool(
300 exp_remote
, 'osd.%s' % exp_osd
,
302 '--data-path', FSPATH
.format(id=exp_osd
),
303 '--journal-path', JPATH
.format(id=exp_osd
),
306 if proc
.exitstatus
== 0:
308 elif (proc
.exitstatus
== 1 and
309 proc
.stderr
.getvalue() == "OSD has the store locked"):
312 raise Exception("ceph-objectstore-tool: "
313 "exp list-pgs failure with status {ret}".
314 format(ret
=proc
.exitstatus
))
316 pgs
= six
.ensure_str(proc
.stdout
.getvalue()).split('\n')[:-1]
318 self
.log("No PGs found for osd.{osd}".format(osd
=exp_osd
))
320 pg
= random
.choice(pgs
)
321 #exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
322 #exp_path = os.path.join(exp_path, '{0}.data'.format(self.cluster))
323 exp_path
= os
.path
.join('/var/log/ceph', # available inside 'shell' container
324 "exp.{pg}.{id}".format(
327 if self
.ceph_manager
.cephadm
:
328 exp_host_path
= os
.path
.join(
330 self
.ceph_manager
.ctx
.ceph
[self
.ceph_manager
.cluster
].fsid
,
331 "exp.{pg}.{id}".format(
335 exp_host_path
= exp_path
338 # Can't use new export-remove op since this is part of upgrade testing
339 proc
= self
.run_ceph_objectstore_tool(
340 exp_remote
, 'osd.%s' % exp_osd
,
342 '--data-path', FSPATH
.format(id=exp_osd
),
343 '--journal-path', JPATH
.format(id=exp_osd
),
349 raise Exception("ceph-objectstore-tool: "
350 "export failure with status {ret}".
351 format(ret
=proc
.exitstatus
))
353 proc
= self
.run_ceph_objectstore_tool(
354 exp_remote
, 'osd.%s' % exp_osd
,
356 '--data-path', FSPATH
.format(id=exp_osd
),
357 '--journal-path', JPATH
.format(id=exp_osd
),
363 raise Exception("ceph-objectstore-tool: "
364 "remove failure with status {ret}".
365 format(ret
=proc
.exitstatus
))
366 # If there are at least 2 dead osds we might move the pg
367 if exp_osd
!= imp_osd
:
368 # If pg isn't already on this osd, then we will move it there
369 proc
= self
.run_ceph_objectstore_tool(
373 '--data-path', FSPATH
.format(id=imp_osd
),
374 '--journal-path', JPATH
.format(id=imp_osd
),
378 raise Exception("ceph-objectstore-tool: "
379 "imp list-pgs failure with status {ret}".
380 format(ret
=proc
.exitstatus
))
381 pgs
= six
.ensure_str(proc
.stdout
.getvalue()).split('\n')[:-1]
383 self
.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
384 format(pg
=pg
, fosd
=exp_osd
, tosd
=imp_osd
))
385 if imp_remote
!= exp_remote
:
386 # Copy export file to the other machine
387 self
.log("Transfer export file from {srem} to {trem}".
388 format(srem
=exp_remote
, trem
=imp_remote
))
389 # just in case an upgrade make /var/log/ceph unreadable by non-root,
390 exp_remote
.run(args
=['sudo', 'chmod', '777',
392 imp_remote
.run(args
=['sudo', 'chmod', '777',
394 tmpexport
= Remote
.get_file(exp_remote
, exp_host_path
,
396 if exp_host_path
!= exp_path
:
397 # push to /var/log/ceph, then rename (we can't
398 # chmod 777 the /var/log/ceph/$fsid mountpoint)
399 Remote
.put_file(imp_remote
, tmpexport
, exp_path
)
400 imp_remote
.run(args
=[
401 'sudo', 'mv', exp_path
, exp_host_path
])
403 Remote
.put_file(imp_remote
, tmpexport
, exp_host_path
)
406 # Can't move the pg after all
408 imp_remote
= exp_remote
410 proc
= self
.run_ceph_objectstore_tool(
411 imp_remote
, 'osd.%s' % imp_osd
,
413 '--data-path', FSPATH
.format(id=imp_osd
),
414 '--journal-path', JPATH
.format(id=imp_osd
),
415 '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
419 if proc
.exitstatus
== 1:
420 bogosity
= "The OSD you are using is older than the exported PG"
421 if bogosity
in proc
.stderr
.getvalue():
422 self
.log("OSD older than exported PG"
424 elif proc
.exitstatus
== 10:
425 self
.log("Pool went away before processing an import"
427 elif proc
.exitstatus
== 11:
428 self
.log("Attempt to import an incompatible export"
430 elif proc
.exitstatus
== 12:
431 # this should be safe to ignore because we only ever move 1
432 # copy of the pg at a time, and merge is only initiated when
433 # all replicas are peered and happy. /me crosses fingers
434 self
.log("PG merged on target"
436 elif proc
.exitstatus
:
437 raise Exception("ceph-objectstore-tool: "
438 "import failure with status {ret}".
439 format(ret
=proc
.exitstatus
))
440 cmd
= "sudo rm -f {file}".format(file=exp_host_path
)
441 exp_remote
.run(args
=cmd
)
442 if imp_remote
!= exp_remote
:
443 imp_remote
.run(args
=cmd
)
445 # apply low split settings to each pool
446 if not self
.ceph_manager
.cephadm
:
447 for pool
in self
.ceph_manager
.list_pools():
448 cmd
= ("CEPH_ARGS='--filestore-merge-threshold 1 "
449 "--filestore-split-multiple 1' sudo -E "
450 + 'ceph-objectstore-tool '
451 + ' '.join(prefix
+ [
452 '--data-path', FSPATH
.format(id=imp_osd
),
453 '--journal-path', JPATH
.format(id=imp_osd
),
455 + " --op apply-layout-settings --pool " + pool
).format(id=osd
)
456 proc
= imp_remote
.run(args
=cmd
,
457 wait
=True, check_status
=False,
459 if 'Couldn\'t find pool' in proc
.stderr
.getvalue():
462 raise Exception("ceph-objectstore-tool apply-layout-settings"
463 " failed with {status}".format(status
=proc
.exitstatus
))
466 def blackhole_kill_osd(self
, osd
=None):
468 If all else fails, kill the osd.
469 :param osd: Osd to be killed.
472 osd
= random
.choice(self
.live_osds
)
473 self
.log("Blackholing and then killing osd %s, live_osds are %s" %
474 (str(osd
), str(self
.live_osds
)))
475 self
.live_osds
.remove(osd
)
476 self
.dead_osds
.append(osd
)
477 self
.ceph_manager
.blackhole_kill_osd(osd
)
479 def revive_osd(self
, osd
=None, skip_admin_check
=False):
482 :param osd: Osd to be revived.
485 osd
= random
.choice(self
.dead_osds
)
486 self
.log("Reviving osd %s" % (str(osd
),))
487 self
.ceph_manager
.revive_osd(
490 skip_admin_check
=skip_admin_check
)
491 self
.dead_osds
.remove(osd
)
492 self
.live_osds
.append(osd
)
493 if self
.random_eio
> 0 and osd
== self
.rerrosd
:
494 self
.ceph_manager
.set_config(self
.rerrosd
,
495 filestore_debug_random_read_err
= self
.random_eio
)
496 self
.ceph_manager
.set_config(self
.rerrosd
,
497 bluestore_debug_random_read_err
= self
.random_eio
)
500 def out_osd(self
, osd
=None):
503 :param osd: Osd to be marked.
506 osd
= random
.choice(self
.in_osds
)
507 self
.log("Removing osd %s, in_osds are: %s" %
508 (str(osd
), str(self
.in_osds
)))
509 self
.ceph_manager
.mark_out_osd(osd
)
510 self
.in_osds
.remove(osd
)
511 self
.out_osds
.append(osd
)
513 def in_osd(self
, osd
=None):
516 :param osd: Osd to be marked.
519 osd
= random
.choice(self
.out_osds
)
520 if osd
in self
.dead_osds
:
521 return self
.revive_osd(osd
)
522 self
.log("Adding osd %s" % (str(osd
),))
523 self
.out_osds
.remove(osd
)
524 self
.in_osds
.append(osd
)
525 self
.ceph_manager
.mark_in_osd(osd
)
526 self
.log("Added osd %s" % (str(osd
),))
528 def reweight_osd_or_by_util(self
, osd
=None):
530 Reweight an osd that is in
531 :param osd: Osd to be marked.
533 if osd
is not None or random
.choice([True, False]):
535 osd
= random
.choice(self
.in_osds
)
536 val
= random
.uniform(.1, 1.0)
537 self
.log("Reweighting osd %s to %s" % (str(osd
), str(val
)))
538 self
.ceph_manager
.raw_cluster_cmd('osd', 'reweight',
541 # do it several times, the option space is large
544 'max_change': random
.choice(['0.05', '1.0', '3.0']),
545 'overage': random
.choice(['110', '1000']),
546 'type': random
.choice([
547 'reweight-by-utilization',
548 'test-reweight-by-utilization']),
550 self
.log("Reweighting by: %s"%(str(options
),))
551 self
.ceph_manager
.raw_cluster_cmd(
555 options
['max_change'])
557 def primary_affinity(self
, osd
=None):
559 osd
= random
.choice(self
.in_osds
)
560 if random
.random() >= .5:
562 elif random
.random() >= .5:
566 self
.log('Setting osd %s primary_affinity to %f' % (str(osd
), pa
))
567 self
.ceph_manager
.raw_cluster_cmd('osd', 'primary-affinity',
570 def thrash_cluster_full(self
):
572 Set and unset cluster full condition
574 self
.log('Setting full ratio to .001')
575 self
.ceph_manager
.raw_cluster_cmd('osd', 'set-full-ratio', '.001')
577 self
.log('Setting full ratio back to .95')
578 self
.ceph_manager
.raw_cluster_cmd('osd', 'set-full-ratio', '.95')
580 def thrash_pg_upmap(self
):
582 Install or remove random pg_upmap entries in OSDMap
584 from random
import shuffle
585 out
= self
.ceph_manager
.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
587 self
.log('j is %s' % j
)
589 if random
.random() >= .3:
590 pgs
= self
.ceph_manager
.get_pg_stats()
593 pg
= random
.choice(pgs
)
594 pgid
= str(pg
['pgid'])
595 poolid
= int(pgid
.split('.')[0])
596 sizes
= [x
['size'] for x
in j
['pools'] if x
['pool'] == poolid
]
600 osds
= self
.in_osds
+ self
.out_osds
603 self
.log('Setting %s to %s' % (pgid
, osds
))
604 cmd
= ['osd', 'pg-upmap', pgid
] + [str(x
) for x
in osds
]
605 self
.log('cmd %s' % cmd
)
606 self
.ceph_manager
.raw_cluster_cmd(*cmd
)
612 self
.log('Clearing pg_upmap on %s' % pg
)
613 self
.ceph_manager
.raw_cluster_cmd(
618 self
.log('No pg_upmap entries; doing nothing')
619 except CommandFailedError
:
620 self
.log('Failed to rm-pg-upmap, ignoring')
622 def thrash_pg_upmap_items(self
):
624 Install or remove random pg_upmap_items entries in OSDMap
626 from random
import shuffle
627 out
= self
.ceph_manager
.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
629 self
.log('j is %s' % j
)
631 if random
.random() >= .3:
632 pgs
= self
.ceph_manager
.get_pg_stats()
635 pg
= random
.choice(pgs
)
636 pgid
= str(pg
['pgid'])
637 poolid
= int(pgid
.split('.')[0])
638 sizes
= [x
['size'] for x
in j
['pools'] if x
['pool'] == poolid
]
642 osds
= self
.in_osds
+ self
.out_osds
645 self
.log('Setting %s to %s' % (pgid
, osds
))
646 cmd
= ['osd', 'pg-upmap-items', pgid
] + [str(x
) for x
in osds
]
647 self
.log('cmd %s' % cmd
)
648 self
.ceph_manager
.raw_cluster_cmd(*cmd
)
650 m
= j
['pg_upmap_items']
654 self
.log('Clearing pg_upmap on %s' % pg
)
655 self
.ceph_manager
.raw_cluster_cmd(
660 self
.log('No pg_upmap entries; doing nothing')
661 except CommandFailedError
:
662 self
.log('Failed to rm-pg-upmap-items, ignoring')
664 def force_recovery(self
):
666 Force recovery on some of PGs
668 backfill
= random
.random() >= 0.5
669 j
= self
.ceph_manager
.get_pgids_to_force(backfill
)
673 self
.ceph_manager
.raw_cluster_cmd('pg', 'force-backfill', *j
)
675 self
.ceph_manager
.raw_cluster_cmd('pg', 'force-recovery', *j
)
676 except CommandFailedError
:
677 self
.log('Failed to force backfill|recovery, ignoring')
680 def cancel_force_recovery(self
):
682 Force recovery on some of PGs
684 backfill
= random
.random() >= 0.5
685 j
= self
.ceph_manager
.get_pgids_to_cancel_force(backfill
)
689 self
.ceph_manager
.raw_cluster_cmd('pg', 'cancel-force-backfill', *j
)
691 self
.ceph_manager
.raw_cluster_cmd('pg', 'cancel-force-recovery', *j
)
692 except CommandFailedError
:
693 self
.log('Failed to force backfill|recovery, ignoring')
695 def force_cancel_recovery(self
):
697 Force or cancel forcing recovery
699 if random
.random() >= 0.4:
700 self
.force_recovery()
702 self
.cancel_force_recovery()
706 Make sure all osds are up and not out.
708 while len(self
.dead_osds
) > 0:
709 self
.log("reviving osd")
711 while len(self
.out_osds
) > 0:
712 self
.log("inning osd")
717 Make sure all osds are up and fully in.
720 for osd
in self
.live_osds
:
721 self
.ceph_manager
.raw_cluster_cmd('osd', 'reweight',
723 self
.ceph_manager
.raw_cluster_cmd('osd', 'primary-affinity',
728 Break out of this Ceph loop
732 if self
.sighup_delay
:
733 self
.log("joining the do_sighup greenlet")
734 self
.sighup_thread
.get()
735 if self
.optrack_toggle_delay
:
736 self
.log("joining the do_optrack_toggle greenlet")
737 self
.optrack_toggle_thread
.join()
738 if self
.dump_ops_enable
== "true":
739 self
.log("joining the do_dump_ops greenlet")
740 self
.dump_ops_thread
.join()
741 if self
.noscrub_toggle_delay
:
742 self
.log("joining the do_noscrub_toggle greenlet")
743 self
.noscrub_toggle_thread
.join()
747 Increase the size of the pool
749 pool
= self
.ceph_manager
.get_pool()
752 self
.log("Growing pool %s" % (pool
,))
753 if self
.ceph_manager
.expand_pool(pool
,
754 self
.config
.get('pool_grow_by', 10),
756 self
.pools_to_fix_pgp_num
.add(pool
)
758 def shrink_pool(self
):
760 Decrease the size of the pool
762 pool
= self
.ceph_manager
.get_pool()
765 _
= self
.ceph_manager
.get_pool_pg_num(pool
)
766 self
.log("Shrinking pool %s" % (pool
,))
767 if self
.ceph_manager
.contract_pool(
769 self
.config
.get('pool_shrink_by', 10),
771 self
.pools_to_fix_pgp_num
.add(pool
)
773 def fix_pgp_num(self
, pool
=None):
775 Fix number of pgs in pool.
778 pool
= self
.ceph_manager
.get_pool()
784 self
.log("fixing pg num pool %s" % (pool
,))
785 if self
.ceph_manager
.set_pool_pgpnum(pool
, force
):
786 self
.pools_to_fix_pgp_num
.discard(pool
)
788 def test_pool_min_size(self
):
790 Loop to selectively push PGs below their min_size and test that recovery
793 self
.log("test_pool_min_size")
795 self
.ceph_manager
.wait_for_recovery(
796 timeout
=self
.config
.get('timeout')
799 minout
= int(self
.config
.get("min_out", 1))
800 minlive
= int(self
.config
.get("min_live", 2))
801 mindead
= int(self
.config
.get("min_dead", 1))
802 self
.log("doing min_size thrashing")
803 self
.ceph_manager
.wait_for_clean(timeout
=60)
804 assert self
.ceph_manager
.is_clean(), \
805 'not clean before minsize thrashing starts'
806 while not self
.stopping
:
807 # look up k and m from all the pools on each loop, in case it
808 # changes as the cluster runs
812 pools_json
= self
.ceph_manager
.get_osd_dump_json()['pools']
814 for pool_json
in pools_json
:
815 pool
= pool_json
['pool_name']
817 pool_type
= pool_json
['type'] # 1 for rep, 3 for ec
818 min_size
= pool_json
['min_size']
819 self
.log("pool {pool} min_size is {min_size}".format(pool
=pool
,min_size
=min_size
))
821 ec_profile
= self
.ceph_manager
.get_pool_property(pool
, 'erasure_code_profile')
822 if pool_type
!= PoolType
.ERASURE_CODED
:
824 ec_profile
= pool_json
['erasure_code_profile']
825 ec_profile_json
= self
.ceph_manager
.raw_cluster_cmd(
827 'erasure-code-profile',
831 ec_json
= json
.loads(ec_profile_json
)
832 local_k
= int(ec_json
['k'])
833 local_m
= int(ec_json
['m'])
834 self
.log("pool {pool} local_k={k} local_m={m}".format(pool
=pool
,
835 k
=local_k
, m
=local_m
))
837 self
.log("setting k={local_k} from previous {k}".format(local_k
=local_k
, k
=k
))
840 self
.log("setting m={local_m} from previous {m}".format(local_m
=local_m
, m
=m
))
842 except CommandFailedError
:
843 self
.log("failed to read erasure_code_profile. %s was likely removed", pool
)
847 self
.log("using k={k}, m={m}".format(k
=k
,m
=m
))
849 self
.log("No pools yet, waiting")
853 if minout
> len(self
.out_osds
): # kill OSDs and mark out
854 self
.log("forced to out an osd")
855 self
.kill_osd(mark_out
=True)
857 elif mindead
> len(self
.dead_osds
): # kill OSDs but force timeout
858 self
.log("forced to kill an osd")
861 else: # make mostly-random choice to kill or revive OSDs
862 minup
= max(minlive
, k
)
863 rand_val
= random
.uniform(0, 1)
864 self
.log("choosing based on number of live OSDs and rand val {rand}".\
865 format(rand
=rand_val
))
866 if len(self
.live_osds
) > minup
+1 and rand_val
< 0.5:
867 # chose to knock out as many OSDs as we can w/out downing PGs
869 most_killable
= min(len(self
.live_osds
) - minup
, m
)
870 self
.log("chose to kill {n} OSDs".format(n
=most_killable
))
871 for i
in range(1, most_killable
):
872 self
.kill_osd(mark_out
=True)
874 # try a few times since there might be a concurrent pool
875 # creation or deletion
878 action
='check for active or peered') as proceed
:
880 if self
.ceph_manager
.all_active_or_peered():
882 self
.log('not all PGs are active or peered')
883 else: # chose to revive OSDs, bring up a random fraction of the dead ones
884 self
.log("chose to revive osds")
885 for i
in range(1, int(rand_val
* len(self
.dead_osds
))):
888 # let PGs repair themselves or our next knockout might kill one
889 self
.ceph_manager
.wait_for_clean(timeout
=self
.config
.get('timeout'))
891 # / while not self.stopping
894 self
.ceph_manager
.wait_for_recovery(
895 timeout
=self
.config
.get('timeout')
898 def inject_pause(self
, conf_key
, duration
, check_after
, should_be_down
):
900 Pause injection testing. Check for osd being down when finished.
902 the_one
= random
.choice(self
.live_osds
)
903 self
.log("inject_pause on {osd}".format(osd
=the_one
))
905 "Testing {key} pause injection for duration {duration}".format(
910 "Checking after {after}, should_be_down={shouldbedown}".format(
912 shouldbedown
=should_be_down
914 self
.ceph_manager
.set_config(the_one
, **{conf_key
: duration
})
915 if not should_be_down
:
917 time
.sleep(check_after
)
918 status
= self
.ceph_manager
.get_osd_status()
919 assert the_one
in status
['down']
920 time
.sleep(duration
- check_after
+ 20)
921 status
= self
.ceph_manager
.get_osd_status()
922 assert not the_one
in status
['down']
924 def test_backfill_full(self
):
926 Test backfills stopping when the replica fills up.
928 First, use injectfull admin command to simulate a now full
929 osd by setting it to 0 on all of the OSDs.
931 Second, on a random subset, set
932 osd_debug_skip_full_check_in_backfill_reservation to force
933 the more complicated check in do_scan to be exercised.
935 Then, verify that all backfillings stop.
937 self
.log("injecting backfill full")
938 for i
in self
.live_osds
:
939 self
.ceph_manager
.set_config(
941 osd_debug_skip_full_check_in_backfill_reservation
=
942 random
.choice(['false', 'true']))
943 self
.ceph_manager
.osd_admin_socket(i
, command
=['injectfull', 'backfillfull'],
944 check_status
=True, timeout
=30, stdout
=DEVNULL
)
946 status
= self
.ceph_manager
.compile_pg_status()
947 if 'backfilling' not in status
.keys():
950 "waiting for {still_going} backfillings".format(
951 still_going
=status
.get('backfilling')))
953 assert('backfilling' not in self
.ceph_manager
.compile_pg_status().keys())
954 for i
in self
.live_osds
:
955 self
.ceph_manager
.set_config(
957 osd_debug_skip_full_check_in_backfill_reservation
='false')
958 self
.ceph_manager
.osd_admin_socket(i
, command
=['injectfull', 'none'],
959 check_status
=True, timeout
=30, stdout
=DEVNULL
)
961 def test_map_discontinuity(self
):
963 1) Allows the osds to recover
965 3) allows the remaining osds to recover
966 4) waits for some time
968 This sequence should cause the revived osd to have to handle
969 a map gap since the mons would have trimmed
971 while len(self
.in_osds
) < (self
.minin
+ 1):
973 self
.log("Waiting for recovery")
974 self
.ceph_manager
.wait_for_all_osds_up(
975 timeout
=self
.config
.get('timeout')
977 # now we wait 20s for the pg status to change, if it takes longer,
978 # the test *should* fail!
980 self
.ceph_manager
.wait_for_clean(
981 timeout
=self
.config
.get('timeout')
984 # now we wait 20s for the backfill replicas to hear about the clean
986 self
.log("Recovered, killing an osd")
987 self
.kill_osd(mark_down
=True, mark_out
=True)
988 self
.log("Waiting for clean again")
989 self
.ceph_manager
.wait_for_clean(
990 timeout
=self
.config
.get('timeout')
992 self
.log("Waiting for trim")
993 time
.sleep(int(self
.config
.get("map_discontinuity_sleep_time", 40)))
996 def choose_action(self
):
998 Random action selector.
1000 chance_down
= self
.config
.get('chance_down', 0.4)
1001 _
= self
.config
.get('chance_test_min_size', 0)
1002 chance_test_backfill_full
= \
1003 self
.config
.get('chance_test_backfill_full', 0)
1004 if isinstance(chance_down
, int):
1005 chance_down
= float(chance_down
) / 100
1007 minout
= int(self
.config
.get("min_out", 0))
1008 minlive
= int(self
.config
.get("min_live", 2))
1009 mindead
= int(self
.config
.get("min_dead", 0))
1011 self
.log('choose_action: min_in %d min_out '
1012 '%d min_live %d min_dead %d' %
1013 (minin
, minout
, minlive
, mindead
))
1015 if len(self
.in_osds
) > minin
:
1016 actions
.append((self
.out_osd
, 1.0,))
1017 if len(self
.live_osds
) > minlive
and chance_down
> 0:
1018 actions
.append((self
.kill_osd
, chance_down
,))
1019 if len(self
.out_osds
) > minout
:
1020 actions
.append((self
.in_osd
, 1.7,))
1021 if len(self
.dead_osds
) > mindead
:
1022 actions
.append((self
.revive_osd
, 1.0,))
1023 if self
.config
.get('thrash_primary_affinity', True):
1024 actions
.append((self
.primary_affinity
, 1.0,))
1025 actions
.append((self
.reweight_osd_or_by_util
,
1026 self
.config
.get('reweight_osd', .5),))
1027 actions
.append((self
.grow_pool
,
1028 self
.config
.get('chance_pgnum_grow', 0),))
1029 actions
.append((self
.shrink_pool
,
1030 self
.config
.get('chance_pgnum_shrink', 0),))
1031 actions
.append((self
.fix_pgp_num
,
1032 self
.config
.get('chance_pgpnum_fix', 0),))
1033 actions
.append((self
.test_pool_min_size
,
1034 self
.config
.get('chance_test_min_size', 0),))
1035 actions
.append((self
.test_backfill_full
,
1036 chance_test_backfill_full
,))
1037 if self
.chance_thrash_cluster_full
> 0:
1038 actions
.append((self
.thrash_cluster_full
, self
.chance_thrash_cluster_full
,))
1039 if self
.chance_thrash_pg_upmap
> 0:
1040 actions
.append((self
.thrash_pg_upmap
, self
.chance_thrash_pg_upmap
,))
1041 if self
.chance_thrash_pg_upmap_items
> 0:
1042 actions
.append((self
.thrash_pg_upmap_items
, self
.chance_thrash_pg_upmap_items
,))
1043 if self
.chance_force_recovery
> 0:
1044 actions
.append((self
.force_cancel_recovery
, self
.chance_force_recovery
))
1046 for key
in ['heartbeat_inject_failure', 'filestore_inject_stall']:
1049 self
.inject_pause(key
,
1050 self
.config
.get('pause_short', 3),
1053 self
.config
.get('chance_inject_pause_short', 1),),
1055 self
.inject_pause(key
,
1056 self
.config
.get('pause_long', 80),
1057 self
.config
.get('pause_check_after', 70),
1059 self
.config
.get('chance_inject_pause_long', 0),)]:
1060 actions
.append(scenario
)
1062 total
= sum([y
for (x
, y
) in actions
])
1063 val
= random
.uniform(0, total
)
1064 for (action
, prob
) in actions
:
1070 def do_thrash(self
):
1072 _do_thrash() wrapper.
1076 except Exception as e
:
1077 # See _run exception comment for MDSThrasher
1078 self
.set_thrasher_exception(e
)
1079 self
.logger
.exception("exception:")
1080 # Allow successful completion so gevent doesn't see an exception.
1081 # The DaemonWatchdog will observe the error and tear down the test.
1084 def do_sighup(self
):
1086 Loops and sends signal.SIGHUP to a random live osd.
1088 Loop delay is controlled by the config value sighup_delay.
1090 delay
= float(self
.sighup_delay
)
1091 self
.log("starting do_sighup with a delay of {0}".format(delay
))
1092 while not self
.stopping
:
1093 osd
= random
.choice(self
.live_osds
)
1094 self
.ceph_manager
.signal_osd(osd
, signal
.SIGHUP
, silent
=True)
1098 def do_optrack_toggle(self
):
1100 Loops and toggle op tracking to all osds.
1102 Loop delay is controlled by the config value optrack_toggle_delay.
1104 delay
= float(self
.optrack_toggle_delay
)
1106 self
.log("starting do_optrack_toggle with a delay of {0}".format(delay
))
1107 while not self
.stopping
:
1108 if osd_state
== "true":
1113 self
.ceph_manager
.inject_args('osd', '*',
1114 'osd_enable_op_tracker',
1116 except CommandFailedError
:
1117 self
.log('Failed to tell all osds, ignoring')
1121 def do_dump_ops(self
):
1123 Loops and does op dumps on all osds
1125 self
.log("starting do_dump_ops")
1126 while not self
.stopping
:
1127 for osd
in self
.live_osds
:
1128 # Ignore errors because live_osds is in flux
1129 self
.ceph_manager
.osd_admin_socket(osd
, command
=['dump_ops_in_flight'],
1130 check_status
=False, timeout
=30, stdout
=DEVNULL
)
1131 self
.ceph_manager
.osd_admin_socket(osd
, command
=['dump_blocked_ops'],
1132 check_status
=False, timeout
=30, stdout
=DEVNULL
)
1133 self
.ceph_manager
.osd_admin_socket(osd
, command
=['dump_historic_ops'],
1134 check_status
=False, timeout
=30, stdout
=DEVNULL
)
1138 def do_noscrub_toggle(self
):
1140 Loops and toggle noscrub flags
1142 Loop delay is controlled by the config value noscrub_toggle_delay.
1144 delay
= float(self
.noscrub_toggle_delay
)
1145 scrub_state
= "none"
1146 self
.log("starting do_noscrub_toggle with a delay of {0}".format(delay
))
1147 while not self
.stopping
:
1148 if scrub_state
== "none":
1149 self
.ceph_manager
.raw_cluster_cmd('osd', 'set', 'noscrub')
1150 scrub_state
= "noscrub"
1151 elif scrub_state
== "noscrub":
1152 self
.ceph_manager
.raw_cluster_cmd('osd', 'set', 'nodeep-scrub')
1153 scrub_state
= "both"
1154 elif scrub_state
== "both":
1155 self
.ceph_manager
.raw_cluster_cmd('osd', 'unset', 'noscrub')
1156 scrub_state
= "nodeep-scrub"
1158 self
.ceph_manager
.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
1159 scrub_state
= "none"
1161 self
.ceph_manager
.raw_cluster_cmd('osd', 'unset', 'noscrub')
1162 self
.ceph_manager
.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
1165 def _do_thrash(self
):
1167 Loop to select random actions to thrash ceph manager with.
1169 cleanint
= self
.config
.get("clean_interval", 60)
1170 scrubint
= self
.config
.get("scrub_interval", -1)
1171 maxdead
= self
.config
.get("max_dead", 0)
1172 delay
= self
.config
.get("op_delay", 5)
1173 self
.rerrosd
= self
.live_osds
[0]
1174 if self
.random_eio
> 0:
1175 self
.ceph_manager
.inject_args('osd', self
.rerrosd
,
1176 'filestore_debug_random_read_err',
1178 self
.ceph_manager
.inject_args('osd', self
.rerrosd
,
1179 'bluestore_debug_random_read_err',
1181 self
.log("starting do_thrash")
1182 while not self
.stopping
:
1183 to_log
= [str(x
) for x
in ["in_osds: ", self
.in_osds
,
1184 "out_osds: ", self
.out_osds
,
1185 "dead_osds: ", self
.dead_osds
,
1186 "live_osds: ", self
.live_osds
]]
1187 self
.log(" ".join(to_log
))
1188 if random
.uniform(0, 1) < (float(delay
) / cleanint
):
1189 while len(self
.dead_osds
) > maxdead
:
1191 for osd
in self
.in_osds
:
1192 self
.ceph_manager
.raw_cluster_cmd('osd', 'reweight',
1194 if random
.uniform(0, 1) < float(
1195 self
.config
.get('chance_test_map_discontinuity', 0)) \
1196 and len(self
.live_osds
) > 5: # avoid m=2,k=2 stall, w/ some buffer for crush being picky
1197 self
.test_map_discontinuity()
1199 self
.ceph_manager
.wait_for_recovery(
1200 timeout
=self
.config
.get('timeout')
1202 time
.sleep(self
.clean_wait
)
1204 if random
.uniform(0, 1) < (float(delay
) / scrubint
):
1205 self
.log('Scrubbing while thrashing being performed')
1206 Scrubber(self
.ceph_manager
, self
.config
)
1207 self
.choose_action()()
1210 if self
.random_eio
> 0:
1211 self
.ceph_manager
.inject_args('osd', self
.rerrosd
,
1212 'filestore_debug_random_read_err', '0.0')
1213 self
.ceph_manager
.inject_args('osd', self
.rerrosd
,
1214 'bluestore_debug_random_read_err', '0.0')
1215 for pool
in list(self
.pools_to_fix_pgp_num
):
1216 if self
.ceph_manager
.get_pool_pg_num(pool
) > 0:
1217 self
.fix_pgp_num(pool
)
1218 self
.pools_to_fix_pgp_num
.clear()
1219 for service
, opt
, saved_value
in self
.saved_options
:
1220 self
.ceph_manager
.inject_args(service
, '*', opt
, saved_value
)
1221 self
.saved_options
= []
1225 class ObjectStoreTool
:
1227 def __init__(self
, manager
, pool
, **kwargs
):
1228 self
.manager
= manager
1230 self
.osd
= kwargs
.get('osd', None)
1231 self
.object_name
= kwargs
.get('object_name', None)
1232 self
.do_revive
= kwargs
.get('do_revive', True)
1233 if self
.osd
and self
.pool
and self
.object_name
:
1234 if self
.osd
== "primary":
1235 self
.osd
= self
.manager
.get_object_primary(self
.pool
,
1238 if self
.object_name
:
1239 self
.pgid
= self
.manager
.get_object_pg_with_shard(self
.pool
,
1242 self
.remote
= self
.manager
.ctx
.\
1243 cluster
.only('osd.{o}'.format(o
=self
.osd
)).remotes
.keys()[0]
1244 path
= self
.manager
.get_filepath().format(id=self
.osd
)
1245 self
.paths
= ("--data-path {path} --journal-path {path}/journal".
1248 def build_cmd(self
, options
, args
, stdin
):
1250 if self
.object_name
:
1251 lines
.append("object=$(sudo adjust-ulimits ceph-objectstore-tool "
1252 "{paths} --pgid {pgid} --op list |"
1253 "grep '\"oid\":\"{name}\"')".
1254 format(paths
=self
.paths
,
1256 name
=self
.object_name
))
1257 args
= '"$object" ' + args
1258 options
+= " --pgid {pgid}".format(pgid
=self
.pgid
)
1259 cmd
= ("sudo adjust-ulimits ceph-objectstore-tool {paths} {options} {args}".
1260 format(paths
=self
.paths
,
1264 cmd
= ("echo {payload} | base64 --decode | {cmd}".
1265 format(payload
=base64
.encode(stdin
),
1268 return "\n".join(lines
)
1270 def run(self
, options
, args
, stdin
=None, stdout
=None):
1273 self
.manager
.kill_osd(self
.osd
)
1274 cmd
= self
.build_cmd(options
, args
, stdin
)
1275 self
.manager
.log(cmd
)
1277 proc
= self
.remote
.run(args
=['bash', '-e', '-x', '-c', cmd
],
1282 if proc
.exitstatus
!= 0:
1283 self
.manager
.log("failed with " + str(proc
.exitstatus
))
1284 error
= six
.ensure_str(proc
.stdout
.getvalue()) + " " + \
1285 six
.ensure_str(proc
.stderr
.getvalue())
1286 raise Exception(error
)
1289 self
.manager
.revive_osd(self
.osd
)
1290 self
.manager
.wait_till_osd_is_up(self
.osd
, 300)
1293 # XXX: this class has nothing to do with the Ceph daemon (ceph-mgr) of
1297 Ceph manager object.
1298 Contains several local functions that form a bulk of this module.
1300 :param controller: the remote machine where the Ceph commands should be
1302 :param ctx: the cluster context
1303 :param config: path to Ceph config file
1304 :param logger: for logging messages
1305 :param cluster: name of the Ceph cluster
1308 def __init__(self
, controller
, ctx
=None, config
=None, logger
=None,
1309 cluster
='ceph', cephadm
=False):
1310 self
.lock
= threading
.RLock()
1312 self
.config
= config
1313 self
.controller
= controller
1314 self
.next_pool_id
= 0
1315 self
.cluster
= cluster
1316 self
.cephadm
= cephadm
1318 self
.log
= lambda x
: logger
.info(x
)
1322 implement log behavior.
1326 if self
.config
is None:
1327 self
.config
= dict()
1328 pools
= self
.list_pools()
1331 # we may race with a pool deletion; ignore failures here
1333 self
.pools
[pool
] = self
.get_pool_int_property(pool
, 'pg_num')
1334 except CommandFailedError
:
1335 self
.log('Failed to get pg_num from pool %s, ignoring' % pool
)
1337 def raw_cluster_cmd(self
, *args
):
1339 Start ceph on a raw cluster. Return count
1342 proc
= shell(self
.ctx
, self
.cluster
, self
.controller
,
1343 args
=['ceph'] + list(args
),
1346 testdir
= teuthology
.get_testdir(self
.ctx
)
1351 '{tdir}/archive/coverage'.format(tdir
=testdir
),
1359 ceph_args
.extend(args
)
1360 proc
= self
.controller
.run(
1364 return six
.ensure_str(proc
.stdout
.getvalue())
1366 def raw_cluster_cmd_result(self
, *args
, **kwargs
):
1368 Start ceph on a cluster. Return success or failure information.
1371 proc
= shell(self
.ctx
, self
.cluster
, self
.controller
,
1372 args
=['ceph'] + list(args
),
1375 testdir
= teuthology
.get_testdir(self
.ctx
)
1380 '{tdir}/archive/coverage'.format(tdir
=testdir
),
1387 ceph_args
.extend(args
)
1388 kwargs
['args'] = ceph_args
1389 kwargs
['check_status'] = False
1390 proc
= self
.controller
.run(**kwargs
)
1391 return proc
.exitstatus
1393 def run_ceph_w(self
, watch_channel
=None):
1395 Execute "ceph -w" in the background with stdout connected to a BytesIO,
1396 and return the RemoteProcess.
1398 :param watch_channel: Specifies the channel to be watched. This can be
1399 'cluster', 'audit', ...
1400 :type watch_channel: str
1409 if watch_channel
is not None:
1410 args
.append("--watch-channel")
1411 args
.append(watch_channel
)
1412 return self
.controller
.run(args
=args
, wait
=False, stdout
=BytesIO(), stdin
=run
.PIPE
)
def get_mon_socks(self):
    """
    Get monitor sockets.

    :return socks: tuple of strings; strings are individual sockets.
    """
    from json import loads

    # Pass the command words as separate *args; wrapping them in a list
    # was a bug -- raw_cluster_cmd(*args) would then receive one list
    # argument and build a malformed 'ceph' command line.
    output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
    socks = []
    for mon in output['mons']:
        for addrvec_mem in mon['public_addrs']['addrvec']:
            socks.append(addrvec_mem['addr'])
    return tuple(socks)
def get_msgrv1_mon_socks(self):
    """
    Get monitor sockets that use msgrv1 to operate.

    :return socks: tuple of strings; strings are individual sockets.
    """
    from json import loads

    dump = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
    return tuple(member['addr']
                 for mon in dump['mons']
                 for member in mon['public_addrs']['addrvec']
                 if member['type'] == 'v1')
def get_msgrv2_mon_socks(self):
    """
    Get monitor sockets that use msgrv2 to operate.

    :return socks: tuple of strings; strings are individual sockets.
    """
    from json import loads

    dump = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
    return tuple(member['addr']
                 for mon in dump['mons']
                 for member in mon['public_addrs']['addrvec']
                 if member['type'] == 'v2')
1461 def flush_pg_stats(self
, osds
, no_wait
=None, wait_for_mon
=300):
1463 Flush pg stats from a list of OSD ids, ensuring they are reflected
1464 all the way to the monitor. Luminous and later only.
1466 :param osds: list of OSDs to flush
1467 :param no_wait: list of OSDs not to wait for seq id. by default, we
1468 wait for all specified osds, but some of them could be
1469 moved out of osdmap, so we cannot get their updated
1470 stat seq from monitor anymore. in that case, you need
1471 to pass a blacklist.
1472 :param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
1473 it. (5 min by default)
1475 seq
= {osd
: int(self
.raw_cluster_cmd('tell', 'osd.%d' % osd
, 'flush_pg_stats'))
1477 if not wait_for_mon
:
1481 for osd
, need
in seq
.items():
1485 while wait_for_mon
> 0:
1486 got
= int(self
.raw_cluster_cmd('osd', 'last-stat-seq', 'osd.%d' % osd
))
1487 self
.log('need seq {need} got {got} for osd.{osd}'.format(
1488 need
=need
, got
=got
, osd
=osd
))
1493 wait_for_mon
-= A_WHILE
1495 raise Exception('timed out waiting for mon to be updated with '
1496 'osd.{osd}: {got} < {need}'.
1497 format(osd
=osd
, got
=got
, need
=need
))
def flush_all_pg_stats(self):
    """Flush pg stats for every OSD id present in the osd dump."""
    osd_ids = range(len(self.get_osd_dump()))
    self.flush_pg_stats(osd_ids)
1502 def do_rados(self
, remote
, cmd
, check_status
=True):
1504 Execute a remote rados command.
1506 testdir
= teuthology
.get_testdir(self
.ctx
)
1510 '{tdir}/archive/coverage'.format(tdir
=testdir
),
1519 check_status
=check_status
1523 def rados_write_objects(self
, pool
, num_objects
, size
,
1524 timelimit
, threads
, cleanup
=False):
1527 Threads not used yet.
1531 '--num-objects', num_objects
,
1537 args
.append('--no-cleanup')
1538 return self
.do_rados(self
.controller
, map(str, args
))
1540 def do_put(self
, pool
, obj
, fname
, namespace
=None):
1542 Implement rados put operation
1545 if namespace
is not None:
1546 args
+= ['-N', namespace
]
1552 return self
.do_rados(
1558 def do_get(self
, pool
, obj
, fname
='/dev/null', namespace
=None):
1560 Implement rados get operation
1563 if namespace
is not None:
1564 args
+= ['-N', namespace
]
1570 return self
.do_rados(
1576 def do_rm(self
, pool
, obj
, namespace
=None):
1578 Implement rados rm operation
1581 if namespace
is not None:
1582 args
+= ['-N', namespace
]
1587 return self
.do_rados(
def osd_admin_socket(self, osd_id, command, check_status=True, timeout=0, stdout=None):
    """Delegate to admin_socket() for the 'osd' service type."""
    return self.admin_socket('osd', osd_id, command,
                             check_status=check_status,
                             timeout=timeout,
                             stdout=stdout)
def find_remote(self, service_type, service_id):
    """
    Get the Remote for the host where a particular service runs.

    :param service_type: 'mds', 'osd', 'client'
    :param service_id: The second part of a role, e.g. '0' for 'client.0'
    :return: a Remote instance for the host where the
             requested role is placed
    """
    ctx, cluster = self.ctx, self.cluster
    return get_remote(ctx, cluster, service_type, service_id)
1611 def admin_socket(self
, service_type
, service_id
,
1612 command
, check_status
=True, timeout
=0, stdout
=None):
1614 Remotely start up ceph specifying the admin socket
1615 :param command: a list of words to use as the command
1621 remote
= self
.find_remote(service_type
, service_id
)
1625 self
.ctx
, self
.cluster
, remote
,
1627 'ceph', 'daemon', '%s.%s' % (service_type
, service_id
),
1631 check_status
=check_status
,
1634 testdir
= teuthology
.get_testdir(self
.ctx
)
1639 '{tdir}/archive/coverage'.format(tdir
=testdir
),
1646 '/var/run/ceph/{cluster}-{type}.{id}.asok'.format(
1647 cluster
=self
.cluster
,
1651 args
.extend(command
)
1656 check_status
=check_status
def objectstore_tool(self, pool, options, args, **kwargs):
    """Run ceph-objectstore-tool against `pool` via an ObjectStoreTool helper."""
    tool = ObjectStoreTool(self, pool, **kwargs)
    return tool.run(options, args)
def get_pgid(self, pool, pgnum):
    """
    :param pool: pool name
    :param pgnum: pg number
    :returns: a string representing this pg.
    """
    return "{0}.{1}".format(self.get_pool_num(pool), pgnum)
def get_pg_replica(self, pool, pgnum):
    """
    get replica for pool, pgnum (e.g. (data, 0)->0
    """
    pgid = self.get_pgid(pool, pgnum)
    raw = self.raw_cluster_cmd("pg", "map", pgid, '--format=json')
    # skip the first line of output before parsing JSON
    pg_map = json.loads('\n'.join(raw.split('\n')[1:]))
    return int(pg_map['acting'][-1])
1684 def wait_for_pg_stats(func
):
1685 # both osd_mon_report_interval and mgr_stats_period are 5 seconds
1686 # by default, and take the faulty injection in ms into consideration,
1687 # 12 seconds are more than enough
1688 delays
= [1, 1, 2, 3, 5, 8, 13, 0]
1690 def wrapper(self
, *args
, **kwargs
):
1692 for delay
in delays
:
1694 return func(self
, *args
, **kwargs
)
1695 except AssertionError as e
:
def get_pg_primary(self, pool, pgnum):
    """
    get primary for pool, pgnum (e.g. (data, 0)->0
    """
    pgid = self.get_pgid(pool, pgnum)
    raw = self.raw_cluster_cmd("pg", "map", pgid, '--format=json')
    # skip the first line of output before parsing JSON
    pg_map = json.loads('\n'.join(raw.split('\n')[1:]))
    return int(pg_map['acting'][0])
def get_pool_num(self, pool):
    """
    get number for pool (e.g., data -> 2)
    """
    pool_info = self.get_pool_dump(pool)
    return int(pool_info['pool'])
def list_pools(self):
    """
    list all pool names
    """
    osd_dump = self.get_osd_dump_json()
    self.log(osd_dump['pools'])
    return [str(entry['pool_name']) for entry in osd_dump['pools']]
def clear_pools(self):
    """
    Remove all pools in the cluster.
    """
    # Plain loop: the original used a list comprehension purely for its
    # side effects, building a throwaway list of None values.
    for pool in self.list_pools():
        self.remove_pool(pool)
1731 def kick_recovery_wq(self
, osdnum
):
1733 Run kick_recovery_wq on cluster.
1735 return self
.raw_cluster_cmd(
1736 'tell', "osd.%d" % (int(osdnum
),),
1741 def wait_run_admin_socket(self
, service_type
,
1742 service_id
, args
=['version'], timeout
=75, stdout
=None):
1744 If osd_admin_socket call succeeds, return. Otherwise wait
1745 five seconds and try again.
1751 proc
= self
.admin_socket(service_type
, service_id
,
1752 args
, check_status
=False, stdout
=stdout
)
1753 if proc
.exitstatus
== 0:
1757 if (tries
* 5) > timeout
:
1758 raise Exception('timed out waiting for admin_socket '
1759 'to appear after {type}.{id} restart'.
1760 format(type=service_type
,
1762 self
.log("waiting on admin_socket for {type}-{id}, "
1763 "{command}".format(type=service_type
,
def get_pool_dump(self, pool):
    """
    get the osd dump part of a pool
    """
    for entry in self.get_osd_dump_json()['pools']:
        if entry['pool_name'] == pool:
            return entry
    assert False
1778 def get_config(self
, service_type
, service_id
, name
):
1780 :param node: like 'mon.a'
1781 :param name: the option name
1783 proc
= self
.wait_run_admin_socket(service_type
, service_id
,
1785 j
= json
.loads(proc
.stdout
.getvalue())
def inject_args(self, service_type, service_id, name, value):
    """
    Set a config option on running daemon(s) via 'ceph tell ... injectargs'.
    Boolean values are rendered as the literal strings 'true'/'false'.
    """
    target = '%s.%s' % (service_type, service_id)
    if value is True:
        rendered = 'true'
    elif value is False:
        rendered = 'false'
    else:
        rendered = value
    option = '--{0}={1}'.format(name, rendered)
    self.raw_cluster_cmd('--', 'tell', target, 'injectargs', option)
def set_config(self, osdnum, **argdict):
    """
    :param osdnum: osd number
    :param argdict: dictionary containing values to set.
    """
    for key, value in argdict.items():
        self.wait_run_admin_socket(
            'osd', osdnum,
            ['config', 'set', str(key), str(value)])
def raw_cluster_status(self):
    """
    Get status from cluster
    """
    return json.loads(self.raw_cluster_cmd('status', '--format=json'))
def raw_osd_status(self):
    """
    Get osd status from cluster
    """
    dump = self.raw_cluster_cmd('osd', 'dump')
    return dump
1818 def get_osd_status(self
):
1820 Get osd statuses sorted by states that the osds are in.
1823 lambda x
: x
.startswith('osd.') and (("up" in x
) or ("down" in x
)),
1824 self
.raw_osd_status().split('\n'))
1826 in_osds
= [int(i
[4:].split()[0])
1827 for i
in filter(lambda x
: " in " in x
, osd_lines
)]
1828 out_osds
= [int(i
[4:].split()[0])
1829 for i
in filter(lambda x
: " out " in x
, osd_lines
)]
1830 up_osds
= [int(i
[4:].split()[0])
1831 for i
in filter(lambda x
: " up " in x
, osd_lines
)]
1832 down_osds
= [int(i
[4:].split()[0])
1833 for i
in filter(lambda x
: " down " in x
, osd_lines
)]
1834 dead_osds
= [int(x
.id_
)
1835 for x
in filter(lambda x
:
1838 iter_daemons_of_role('osd', self
.cluster
))]
1839 live_osds
= [int(x
.id_
) for x
in
1842 self
.ctx
.daemons
.iter_daemons_of_role('osd',
1844 return {'in': in_osds
, 'out': out_osds
, 'up': up_osds
,
1845 'down': down_osds
, 'dead': dead_osds
, 'live': live_osds
,
def get_num_pgs(self):
    """
    Check cluster status for the number of pgs
    """
    return self.raw_cluster_status()['pgmap']['num_pgs']
def create_erasure_code_profile(self, profile_name, profile):
    """
    Create an erasure code profile name that can be used as a parameter
    when creating an erasure coded pool.
    """
    self.raw_cluster_cmd(*cmd_erasure_code_profile(profile_name, profile))
1865 def create_pool_with_unique_name(self
, pg_num
=16,
1866 erasure_code_profile_name
=None,
1868 erasure_code_use_overwrites
=False):
1870 Create a pool named unique_pool_X where X is unique.
1874 name
= "unique_pool_%s" % (str(self
.next_pool_id
),)
1875 self
.next_pool_id
+= 1
1879 erasure_code_profile_name
=erasure_code_profile_name
,
1881 erasure_code_use_overwrites
=erasure_code_use_overwrites
)
1884 @contextlib.contextmanager
1885 def pool(self
, pool_name
, pg_num
=16, erasure_code_profile_name
=None):
1886 self
.create_pool(pool_name
, pg_num
, erasure_code_profile_name
)
1888 self
.remove_pool(pool_name
)
1890 def create_pool(self
, pool_name
, pg_num
=16,
1891 erasure_code_profile_name
=None,
1893 erasure_code_use_overwrites
=False):
1895 Create a pool named from the pool_name parameter.
1896 :param pool_name: name of the pool being created.
1897 :param pg_num: initial number of pgs.
1898 :param erasure_code_profile_name: if set and !None create an
1899 erasure coded pool using the profile
1900 :param erasure_code_use_overwrites: if true, allow overwrites
1903 assert isinstance(pool_name
, six
.string_types
)
1904 assert isinstance(pg_num
, int)
1905 assert pool_name
not in self
.pools
1906 self
.log("creating pool_name %s" % (pool_name
,))
1907 if erasure_code_profile_name
:
1908 self
.raw_cluster_cmd('osd', 'pool', 'create',
1909 pool_name
, str(pg_num
), str(pg_num
),
1910 'erasure', erasure_code_profile_name
)
1912 self
.raw_cluster_cmd('osd', 'pool', 'create',
1913 pool_name
, str(pg_num
))
1914 if min_size
is not None:
1915 self
.raw_cluster_cmd(
1916 'osd', 'pool', 'set', pool_name
,
1919 if erasure_code_use_overwrites
:
1920 self
.raw_cluster_cmd(
1921 'osd', 'pool', 'set', pool_name
,
1922 'allow_ec_overwrites',
1924 self
.raw_cluster_cmd(
1925 'osd', 'pool', 'application', 'enable',
1926 pool_name
, 'rados', '--yes-i-really-mean-it',
1927 run
.Raw('||'), 'true')
1928 self
.pools
[pool_name
] = pg_num
def add_pool_snap(self, pool_name, snap_name):
    """
    Take a snapshot of a pool.
    :param pool_name: name of pool to snapshot
    :param snap_name: name of snapshot to take
    """
    self.raw_cluster_cmd(
        'osd', 'pool', 'mksnap', str(pool_name), str(snap_name))
def remove_pool_snap(self, pool_name, snap_name):
    """
    Remove pool snapshot
    :param pool_name: name of pool to snapshot
    :param snap_name: name of snapshot to remove
    """
    self.raw_cluster_cmd(
        'osd', 'pool', 'rmsnap', str(pool_name), str(snap_name))
1949 def remove_pool(self
, pool_name
):
1951 Remove the indicated pool
1952 :param pool_name: Pool to be removed
1955 assert isinstance(pool_name
, six
.string_types
)
1956 assert pool_name
in self
.pools
1957 self
.log("removing pool_name %s" % (pool_name
,))
1958 del self
.pools
[pool_name
]
1959 self
.raw_cluster_cmd('osd', 'pool', 'rm', pool_name
, pool_name
,
1960 "--yes-i-really-really-mean-it")
1968 return random
.choice(self
.pools
.keys())
def get_pool_pg_num(self, pool_name):
    """
    Return the number of pgs in the pool specified.
    """
    assert isinstance(pool_name, six.string_types)
    # Unknown pools report 0 rather than raising.
    return self.pools.get(pool_name, 0)
def get_pool_property(self, pool_name, prop):
    """
    :param pool_name: pool
    :param prop: property to be checked.
    :returns: property as string
    """
    assert isinstance(pool_name, six.string_types)
    assert isinstance(prop, six.string_types)
    output = self.raw_cluster_cmd(
        'osd', 'pool', 'get', pool_name, prop)
    # output looks like "<prop>: <value>"; return the value token
    return output.split()[1]
def get_pool_int_property(self, pool_name, prop):
    """Return the named pool property coerced to int."""
    value = self.get_pool_property(pool_name, prop)
    return int(value)
2000 def set_pool_property(self
, pool_name
, prop
, val
):
2002 :param pool_name: pool
2003 :param prop: property to be set.
2004 :param val: value to set.
2006 This routine retries if set operation fails.
2009 assert isinstance(pool_name
, six
.string_types
)
2010 assert isinstance(prop
, six
.string_types
)
2011 assert isinstance(val
, int)
2014 r
= self
.raw_cluster_cmd_result(
2021 if r
!= 11: # EAGAIN
2025 raise Exception('timed out getting EAGAIN '
2026 'when setting pool property %s %s = %s' %
2027 (pool_name
, prop
, val
))
2028 self
.log('got EAGAIN setting pool property, '
2029 'waiting a few seconds...')
2032 def expand_pool(self
, pool_name
, by
, max_pgs
):
2034 Increase the number of pgs in a pool
2037 assert isinstance(pool_name
, six
.string_types
)
2038 assert isinstance(by
, int)
2039 assert pool_name
in self
.pools
2040 if self
.get_num_creating() > 0:
2042 if (self
.pools
[pool_name
] + by
) > max_pgs
:
2044 self
.log("increase pool size by %d" % (by
,))
2045 new_pg_num
= self
.pools
[pool_name
] + by
2046 self
.set_pool_property(pool_name
, "pg_num", new_pg_num
)
2047 self
.pools
[pool_name
] = new_pg_num
2050 def contract_pool(self
, pool_name
, by
, min_pgs
):
2052 Decrease the number of pgs in a pool
2055 self
.log('contract_pool %s by %s min %s' % (
2056 pool_name
, str(by
), str(min_pgs
)))
2057 assert isinstance(pool_name
, six
.string_types
)
2058 assert isinstance(by
, int)
2059 assert pool_name
in self
.pools
2060 if self
.get_num_creating() > 0:
2061 self
.log('too many creating')
2063 proj
= self
.pools
[pool_name
] - by
2065 self
.log('would drop below min_pgs, proj %d, currently %d' % (proj
,self
.pools
[pool_name
],))
2067 self
.log("decrease pool size by %d" % (by
,))
2068 new_pg_num
= self
.pools
[pool_name
] - by
2069 self
.set_pool_property(pool_name
, "pg_num", new_pg_num
)
2070 self
.pools
[pool_name
] = new_pg_num
2073 def stop_pg_num_changes(self
):
2075 Reset all pg_num_targets back to pg_num, canceling splits and merges
2077 self
.log('Canceling any pending splits or merges...')
2078 osd_dump
= self
.get_osd_dump_json()
2080 for pool
in osd_dump
['pools']:
2081 if pool
['pg_num'] != pool
['pg_num_target']:
2082 self
.log('Setting pool %s (%d) pg_num %d -> %d' %
2083 (pool
['pool_name'], pool
['pool'],
2084 pool
['pg_num_target'],
2086 self
.raw_cluster_cmd('osd', 'pool', 'set', pool
['pool_name'],
2087 'pg_num', str(pool
['pg_num']))
2089 # we don't support pg_num_target before nautilus
2092 def set_pool_pgpnum(self
, pool_name
, force
):
2094 Set pgpnum property of pool_name pool.
2097 assert isinstance(pool_name
, six
.string_types
)
2098 assert pool_name
in self
.pools
2099 if not force
and self
.get_num_creating() > 0:
2101 self
.set_pool_property(pool_name
, 'pgp_num', self
.pools
[pool_name
])
2104 def list_pg_unfound(self
, pgid
):
2106 return list of unfound pgs with the id specified
2111 out
= self
.raw_cluster_cmd('--', 'pg', pgid
, 'list_unfound',
2117 r
['objects'].extend(j
['objects'])
2122 offset
= j
['objects'][-1]['oid']
def get_pg_stats(self):
    """
    Dump the cluster and get pg stats
    """
    raw = self.raw_cluster_cmd('pg', 'dump', '--format=json')
    dump = json.loads('\n'.join(raw.split('\n')[1:]))
    try:
        # newer releases nest the stats under 'pg_map'
        return dump['pg_map']['pg_stats']
    except KeyError:
        return dump['pg_stats']
2138 def get_pgids_to_force(self
, backfill
):
2140 Return the randomized list of PGs that can have their recovery/backfill forced
2142 j
= self
.get_pg_stats();
2145 wanted
= ['degraded', 'backfilling', 'backfill_wait']
2147 wanted
= ['recovering', 'degraded', 'recovery_wait']
2149 status
= pg
['state'].split('+')
2151 if random
.random() > 0.5 and not ('forced_backfill' in status
or 'forced_recovery' in status
) and t
in status
:
2152 pgids
.append(pg
['pgid'])
2156 def get_pgids_to_cancel_force(self
, backfill
):
2158 Return the randomized list of PGs whose recovery/backfill priority is forced
2160 j
= self
.get_pg_stats();
2163 wanted
= 'forced_backfill'
2165 wanted
= 'forced_recovery'
2167 status
= pg
['state'].split('+')
2168 if wanted
in status
and random
.random() > 0.5:
2169 pgids
.append(pg
['pgid'])
def compile_pg_status(self):
    """
    Return a histogram of pg state values
    """
    histogram = {}
    for pg in self.get_pg_stats():
        for status in pg['state'].split('+'):
            histogram[status] = histogram.get(status, 0) + 1
    return histogram
@wait_for_pg_stats  # type: ignore
def with_pg_state(self, pool, pgnum, check):
    """Assert that `check` holds for the pg's current state string."""
    pg_id = self.get_pgid(pool, pgnum)
    pg_stats = self.get_single_pg_stats(pg_id)
    assert check(pg_stats['state'])
@wait_for_pg_stats  # type: ignore
def with_pg(self, pool, pgnum, check):
    """Apply `check` to the pg's full stats dict and return its result."""
    pg_id = self.get_pgid(pool, pgnum)
    return check(self.get_single_pg_stats(pg_id))
def get_last_scrub_stamp(self, pool, pgnum):
    """
    Get the timestamp of the last scrub.
    """
    pg = self.get_single_pg_stats(self.get_pgid(pool, pgnum))
    return pg["last_scrub_stamp"]
2204 def do_pg_scrub(self
, pool
, pgnum
, stype
):
2206 Scrub pg and wait for scrubbing to finish
2208 init
= self
.get_last_scrub_stamp(pool
, pgnum
)
2209 RESEND_TIMEOUT
= 120 # Must be a multiple of SLEEP_TIME
2210 FATAL_TIMEOUT
= RESEND_TIMEOUT
* 3
2213 while init
== self
.get_last_scrub_stamp(pool
, pgnum
):
2214 assert timer
< FATAL_TIMEOUT
, "fatal timeout trying to " + stype
2215 self
.log("waiting for scrub type %s" % (stype
,))
2216 if (timer
% RESEND_TIMEOUT
) == 0:
2217 self
.raw_cluster_cmd('pg', stype
, self
.get_pgid(pool
, pgnum
))
2218 # The first time in this loop is the actual request
2219 if timer
!= 0 and stype
== "repair":
2220 self
.log("WARNING: Resubmitted a non-idempotent repair")
2221 time
.sleep(SLEEP_TIME
)
2224 def wait_snap_trimming_complete(self
, pool
):
2226 Wait for snap trimming on pool to end
2231 poolnum
= self
.get_pool_num(pool
)
2232 poolnumstr
= "%s." % (poolnum
,)
2235 if (now
- start
) > FATAL_TIMEOUT
:
2236 assert (now
- start
) < FATAL_TIMEOUT
, \
2237 'failed to complete snap trimming before timeout'
2238 all_stats
= self
.get_pg_stats()
2240 for pg
in all_stats
:
2241 if (poolnumstr
in pg
['pgid']) and ('snaptrim' in pg
['state']):
2242 self
.log("pg {pg} in trimming, state: {state}".format(
2248 self
.log("{pool} still trimming, waiting".format(pool
=pool
))
2249 time
.sleep(POLL_PERIOD
)
def get_single_pg_stats(self, pgid):
    """
    Return pg for the pgid specified.
    """
    for pg in self.get_pg_stats():
        if pg['pgid'] == pgid:
            return pg
    return None
def get_object_pg_with_shard(self, pool, name, osdid):
    """
    Return the pgid for the object, with an 's<shard>' suffix for
    erasure-coded pools.
    """
    pool_dump = self.get_pool_dump(pool)
    object_map = self.get_object_map(pool, name)
    if pool_dump["type"] != PoolType.ERASURE_CODED:
        return object_map['pgid']
    shard = object_map['acting'].index(osdid)
    return "{pgid}s{shard}".format(pgid=object_map['pgid'], shard=shard)
def get_object_primary(self, pool, name):
    """
    Return the acting primary osd for the named object.
    """
    return self.get_object_map(pool, name)['acting_primary']
def get_object_map(self, pool, name):
    """
    osd map --format=json converted to a python object
    :returns: the python object
    """
    raw = self.raw_cluster_cmd('--format=json', 'osd', 'map', pool, name)
    # drop the leading non-JSON line before parsing
    return json.loads('\n'.join(raw.split('\n')[1:]))
def get_osd_dump_json(self):
    """
    osd dump --format=json converted to a python object
    :returns: the python object
    """
    raw = self.raw_cluster_cmd('osd', 'dump', '--format=json')
    # drop the leading non-JSON line before parsing
    return json.loads('\n'.join(raw.split('\n')[1:]))
def get_osd_dump(self):
    """
    Dump osds
    :returns: all osds
    """
    return self.get_osd_dump_json()['osds']
def get_osd_metadata(self):
    """
    osd metadata --format=json converted to a python object
    :returns: the python object containing osd metadata information
    """
    raw = self.raw_cluster_cmd('osd', 'metadata', '--format=json')
    # drop the leading non-JSON line before parsing
    return json.loads('\n'.join(raw.split('\n')[1:]))
def get_mgr_dump(self):
    """mgr dump --format=json parsed into a python object."""
    return json.loads(self.raw_cluster_cmd('mgr', 'dump', '--format=json'))
def get_stuck_pgs(self, type_, threshold):
    """
    :returns: stuck pg information from the cluster
    """
    raw = self.raw_cluster_cmd('pg', 'dump_stuck', type_, str(threshold),
                               '--format=json')
    return json.loads(raw).get('stuck_pg_stats', [])
def get_num_unfound_objects(self):
    """
    Check cluster status to get the number of unfound objects
    """
    status = self.raw_cluster_status()
    self.log(status)
    return status['pgmap'].get('unfound_objects', 0)
def get_num_creating(self):
    """
    Find the number of pgs in creating mode.
    """
    # sum() over a generator replaces the manual counter loop
    return sum(1 for pg in self.get_pg_stats()
               if 'creating' in pg['state'])
def get_num_active_clean(self):
    """
    Find the number of active and clean pgs.
    """
    return self._get_num_active_clean(self.get_pg_stats())
2350 def _get_num_active_clean(self
, pgs
):
2353 if (pg
['state'].count('active') and
2354 pg
['state'].count('clean') and
2355 not pg
['state'].count('stale')):
def get_num_active_recovered(self):
    """
    Find the number of active and recovered pgs.
    """
    return self._get_num_active_recovered(self.get_pg_stats())
2366 def _get_num_active_recovered(self
, pgs
):
2369 if (pg
['state'].count('active') and
2370 not pg
['state'].count('recover') and
2371 not pg
['state'].count('backfilling') and
2372 not pg
['state'].count('stale')):
def get_is_making_recovery_progress(self):
    """
    Return whether there is recovery progress discernable in the
    raw cluster status.
    """
    pgmap = self.raw_cluster_status()['pgmap']
    rates = (pgmap.get('recovering_keys_per_sec', 0),
             pgmap.get('recovering_bytes_per_sec', 0),
             pgmap.get('recovering_objects_per_sec', 0))
    return any(rate > 0 for rate in rates)
def get_num_active(self):
    """
    Find the number of active pgs.
    """
    return self._get_num_active(self.get_pg_stats())
2394 def _get_num_active(self
, pgs
):
2397 if pg
['state'].count('active') and not pg
['state'].count('stale'):
def get_num_down(self):
    """
    Find the number of pgs that are down.

    :returns: count of non-stale PGs whose state contains 'down' or
              'incomplete'
    """
    pgs = self.get_pg_stats()
    num = 0
    for pg in pgs:
        if ((pg['state'].count('down') and not
             pg['state'].count('stale')) or
            (pg['state'].count('incomplete') and not
             pg['state'].count('stale'))):
            num += 1
    return num
2415 def get_num_active_down(self
):
2417 Find the number of pgs that are either active or down.
2419 pgs
= self
.get_pg_stats()
2420 return self
._get
_num
_active
_down
(pgs
)
2422 def _get_num_active_down(self
, pgs
):
2425 if ((pg
['state'].count('active') and not
2426 pg
['state'].count('stale')) or
2427 (pg
['state'].count('down') and not
2428 pg
['state'].count('stale')) or
2429 (pg
['state'].count('incomplete') and not
2430 pg
['state'].count('stale'))):
2434 def get_num_peered(self
):
2436 Find the number of PGs that are peered
2438 pgs
= self
.get_pg_stats()
2439 return self
._get
_num
_peered
(pgs
)
2441 def _get_num_peered(self
, pgs
):
2444 if pg
['state'].count('peered') and not pg
['state'].count('stale'):
def is_clean(self):
    """
    True if all pgs are clean.
    """
    pgs = self.get_pg_stats()
    return self._get_num_active_clean(pgs) == len(pgs)

def is_recovered(self):
    """
    True if all pgs have recovered.
    """
    pgs = self.get_pg_stats()
    return self._get_num_active_recovered(pgs) == len(pgs)

def is_active_or_down(self):
    """
    True if all pgs are active or down.
    """
    pgs = self.get_pg_stats()
    return self._get_num_active_down(pgs) == len(pgs)
def wait_for_clean(self, timeout=1200):
    """
    Loop until all pgs are clean.

    The timeout is reset whenever recovery progress is observed or the
    active+clean count changes, so a slow-but-moving cluster is not killed.

    :param timeout: seconds without progress before asserting failure;
                    None disables the timeout.
    """
    self.log("waiting for clean")
    start = time.time()
    num_active_clean = self.get_num_active_clean()
    while not self.is_clean():
        if timeout is not None:
            if self.get_is_making_recovery_progress():
                self.log("making progress, resetting timeout")
                start = time.time()
            else:
                self.log("no progress seen, keeping timeout for now")
                if time.time() - start >= timeout:
                    self.log('dumping pgs')
                    out = self.raw_cluster_cmd('pg', 'dump')
                    self.log(out)
                    assert time.time() - start < timeout, \
                        'failed to become clean before timeout expired'
        cur_active_clean = self.get_num_active_clean()
        if cur_active_clean != num_active_clean:
            # progress: reset the timeout clock
            start = time.time()
            num_active_clean = cur_active_clean
        time.sleep(3)
    self.log("clean!")
def are_all_osds_up(self):
    """
    Returns true if all osds are up.
    """
    x = self.get_osd_dump()
    return (len(x) == sum([(y['up'] > 0) for y in x]))

def wait_for_all_osds_up(self, timeout=None):
    """
    When this exits, either the timeout has expired, or all
    osds are up.

    :param timeout: seconds to wait; None waits forever.
    """
    self.log("waiting for all up")
    start = time.time()
    while not self.are_all_osds_up():
        if timeout is not None:
            assert time.time() - start < timeout, \
                'timeout expired in wait_for_all_osds_up'
        time.sleep(3)
    self.log("all up!")
def pool_exists(self, pool):
    """
    :returns: True if the named pool exists in the cluster.
    """
    # direct boolean return instead of if/return True/return False
    return pool in self.list_pools()

def wait_for_pool(self, pool, timeout=300):
    """
    Wait for a pool to exist.

    :param pool: pool name
    :param timeout: seconds to wait; None waits forever.
    """
    self.log('waiting for pool %s to exist' % pool)
    start = time.time()
    while not self.pool_exists(pool):
        if timeout is not None:
            assert time.time() - start < timeout, \
                'timeout expired in wait_for_pool'
        time.sleep(3)

def wait_for_pools(self, pools):
    """
    Wait for each pool in ``pools`` to exist.
    """
    for pool in pools:
        self.wait_for_pool(pool)
def is_mgr_available(self):
    """
    :returns: True when the mgr map reports an active mgr available.
    """
    x = self.get_mgr_dump()
    return x.get('available', False)

def wait_for_mgr_available(self, timeout=None):
    """
    Loop until a mgr is available.

    :param timeout: seconds to wait; None waits forever.
    """
    self.log("waiting for mgr available")
    start = time.time()
    while not self.is_mgr_available():
        if timeout is not None:
            assert time.time() - start < timeout, \
                'timeout expired in wait_for_mgr_available'
        time.sleep(3)
    self.log("mgr available!")
def wait_for_recovery(self, timeout=None):
    """
    Check peering.  When this exits, we have recovered.

    The timeout is reset whenever recovery progress is observed or the
    active+recovered count changes.

    :param timeout: seconds without progress before asserting failure;
                    None disables the timeout.
    """
    self.log("waiting for recovery to complete")
    start = time.time()
    num_active_recovered = self.get_num_active_recovered()
    while not self.is_recovered():
        now = time.time()
        if timeout is not None:
            if self.get_is_making_recovery_progress():
                self.log("making progress, resetting timeout")
                start = time.time()
            else:
                self.log("no progress seen, keeping timeout for now")
                if now - start >= timeout:
                    # re-check: recovery may have completed while we slept
                    if self.is_recovered():
                        break
                    self.log('dumping pgs')
                    out = self.raw_cluster_cmd('pg', 'dump')
                    self.log(out)
                    assert now - start < timeout, \
                        'failed to recover before timeout expired'
        cur_active_recovered = self.get_num_active_recovered()
        if cur_active_recovered != num_active_recovered:
            # progress: reset the timeout clock
            start = time.time()
            num_active_recovered = cur_active_recovered
        time.sleep(3)
    self.log("recovered!")
def wait_for_active(self, timeout=None):
    """
    Check peering.  When this exits, we are definitely active.

    :param timeout: seconds without progress before asserting failure;
                    None disables the timeout.
    """
    self.log("waiting for peering to complete")
    start = time.time()
    num_active = self.get_num_active()
    while not self.is_active():
        if timeout is not None:
            if time.time() - start >= timeout:
                self.log('dumping pgs')
                out = self.raw_cluster_cmd('pg', 'dump')
                self.log(out)
                assert time.time() - start < timeout, \
                    'failed to recover before timeout expired'
        cur_active = self.get_num_active()
        if cur_active != num_active:
            # progress: reset the timeout clock
            start = time.time()
            num_active = cur_active
        time.sleep(3)
    self.log("active!")
def wait_for_active_or_down(self, timeout=None):
    """
    Check peering.  When this exits, we are definitely either
    active or blocked (down/incomplete).

    :param timeout: seconds without progress before asserting failure;
                    None disables the timeout.
    """
    self.log("waiting for peering to complete or become blocked")
    start = time.time()
    num_active_down = self.get_num_active_down()
    while not self.is_active_or_down():
        if timeout is not None:
            if time.time() - start >= timeout:
                self.log('dumping pgs')
                out = self.raw_cluster_cmd('pg', 'dump')
                self.log(out)
                assert time.time() - start < timeout, \
                    'failed to recover before timeout expired'
        cur_active_down = self.get_num_active_down()
        if cur_active_down != num_active_down:
            # progress: reset the timeout clock
            start = time.time()
            num_active_down = cur_active_down
        time.sleep(3)
    self.log("active or down!")
def osd_is_up(self, osd):
    """
    Wrapper for osd check: True when the given osd id is up.
    """
    osds = self.get_osd_dump()
    return osds[osd]['up'] > 0

def wait_till_osd_is_up(self, osd, timeout=None):
    """
    Loop waiting for osd.

    :param osd: osd id
    :param timeout: seconds to wait; None waits forever.
    """
    self.log('waiting for osd.%d to be up' % osd)
    start = time.time()
    while not self.osd_is_up(osd):
        if timeout is not None:
            assert time.time() - start < timeout, \
                'osd.%d failed to come up before timeout expired' % osd
        time.sleep(3)
    self.log('osd.%d is up' % osd)

def is_active(self):
    """
    Wrapper to check if all pgs are active.
    """
    return self.get_num_active() == self.get_num_pgs()
def all_active_or_peered(self):
    """
    Wrapper to check if all PGs are active or peered.
    """
    pgs = self.get_pg_stats()
    return self._get_num_active(pgs) + self._get_num_peered(pgs) == len(pgs)
def wait_till_active(self, timeout=None):
    """
    Wait until all pgs are active.

    :param timeout: seconds to wait; None waits forever.
    """
    self.log("waiting till active")
    start = time.time()
    while not self.is_active():
        if timeout is not None:
            if time.time() - start >= timeout:
                self.log('dumping pgs')
                out = self.raw_cluster_cmd('pg', 'dump')
                self.log(out)
                assert time.time() - start < timeout, \
                    'failed to become active before timeout expired'
        time.sleep(3)
    self.log("active!")
def wait_till_pg_convergence(self, timeout=None):
    """
    Loop until two consecutive pg-stat snapshots are identical.

    :param timeout: seconds to wait; None waits forever.
    :returns: the converged {pgid: state} mapping.
    """
    start = time.time()
    old_stats = None
    active_osds = [osd['osd'] for osd in self.get_osd_dump()
                   if osd['in'] and osd['up']]
    while True:
        # strictly speaking, no need to wait for mon. but due to the
        # "ms inject socket failures" setting, the osdmap could be delayed,
        # so mgr is likely to ignore the pg-stat messages with pgs serving
        # newly created pools which is not yet known by mgr. so, to make sure
        # the mgr is updated with the latest pg-stats, waiting for mon/mgr is
        # necessary.
        self.flush_pg_stats(active_osds)
        new_stats = dict((stat['pgid'], stat['state'])
                         for stat in self.get_pg_stats())
        if old_stats == new_stats:
            return old_stats
        if timeout is not None:
            assert time.time() - start < timeout, \
                'failed to reach convergence before %d secs' % timeout
        old_stats = new_stats
        # longer than mgr_stats_period
        time.sleep(5 + 1)
def mark_out_osd(self, osd):
    """
    Wrapper to mark osd out.
    """
    self.raw_cluster_cmd('osd', 'out', str(osd))
def kill_osd(self, osd):
    """
    Kill osds by either power cycling (if indicated by the config)
    or by stopping the daemon.

    :param osd: osd id to kill
    :raises RuntimeError: in bdev-inject-crash mode, when the osd daemon
                          does not actually fail after the injection.
    """
    if self.config.get('powercycle'):
        remote = self.find_remote('osd', osd)
        self.log('kill_osd on osd.{o} '
                 'doing powercycle of {s}'.format(o=osd, s=remote.name))
        self._assert_ipmi(remote)
        remote.console.power_off()
    elif self.config.get('bdev_inject_crash') and self.config.get('bdev_inject_crash_probability'):
        if random.uniform(0, 1) < self.config.get('bdev_inject_crash_probability', .5):
            self.inject_args(
                'osd', osd,
                'bdev-inject-crash', self.config.get('bdev_inject_crash'))
            try:
                self.ctx.daemons.get_daemon('osd', osd, self.cluster).wait()
            except Exception:
                # expected: the daemon dies from the injected crash
                # (narrowed from a bare `except:` so KeyboardInterrupt and
                # SystemExit still propagate)
                pass
            else:
                raise RuntimeError('osd.%s did not fail' % osd)
        else:
            self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
    else:
        self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2735 def _assert_ipmi(remote
):
2736 assert remote
.console
.has_ipmi_credentials
, (
2737 "powercycling requested but RemoteConsole is not "
2738 "initialized. Check ipmi config.")
def blackhole_kill_osd(self, osd):
    """
    Stop osd if nothing else works: black-hole the objectstore so pending
    writes vanish, give it a moment, then stop the daemon.
    """
    self.inject_args('osd', osd,
                     'objectstore-blackhole', True)
    time.sleep(2)
    self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
def revive_osd(self, osd, timeout=360, skip_admin_check=False):
    """
    Revive osds by either power cycling (if indicated by the config)
    or by restarting.

    :param osd: osd id to revive
    :param timeout: seconds to wait for the admin socket after restart
    :param skip_admin_check: skip the post-restart admin-socket probe
    :raises Exception: when an IPMI power-on does not bring the host back.
    """
    if self.config.get('powercycle'):
        remote = self.find_remote('osd', osd)
        self.log('kill_osd on osd.{o} doing powercycle of {s}'.
                 format(o=osd, s=remote.name))
        self._assert_ipmi(remote)
        remote.console.power_on()
        if not remote.console.check_status(300):
            raise Exception('Failed to revive osd.{o} via ipmi'.
                            format(o=osd))
        teuthology.reconnect(self.ctx, 60, [remote])
        mount_osd_data(self.ctx, remote, self.cluster, str(osd))
        self.make_admin_daemon_dir(remote)
        self.ctx.daemons.get_daemon('osd', osd, self.cluster).reset()
    self.ctx.daemons.get_daemon('osd', osd, self.cluster).restart()

    if not skip_admin_check:
        # wait for dump_ops_in_flight; this command doesn't appear
        # until after the signal handler is installed and it is safe
        # to stop the osd again without making valgrind leak checks
        # unhappy.  see #5924.
        self.wait_run_admin_socket('osd', osd,
                                   args=['dump_ops_in_flight'],
                                   timeout=timeout, stdout=DEVNULL)
def mark_down_osd(self, osd):
    """
    Cluster command wrapper: mark the given osd down.
    """
    self.raw_cluster_cmd('osd', 'down', str(osd))

def mark_in_osd(self, osd):
    """
    Cluster command wrapper: mark the given osd in.
    """
    self.raw_cluster_cmd('osd', 'in', str(osd))
def signal_osd(self, osd, sig, silent=False):
    """
    Wrapper to local get_daemon call which sends the given
    signal to the given osd.
    """
    self.ctx.daemons.get_daemon('osd', osd,
                                self.cluster).signal(sig, silent=silent)

def signal_mon(self, mon, sig, silent=False):
    """
    Wrapper to local get_daemon call which sends the given
    signal to the given mon.
    """
    self.ctx.daemons.get_daemon('mon', mon,
                                self.cluster).signal(sig, silent=silent)
def kill_mon(self, mon):
    """
    Kill the monitor by either power cycling (if the config says so),
    or by doing a normal stop.
    """
    if self.config.get('powercycle'):
        remote = self.find_remote('mon', mon)
        self.log('kill_mon on mon.{m} doing powercycle of {s}'.
                 format(m=mon, s=remote.name))
        self._assert_ipmi(remote)
        remote.console.power_off()
    else:
        self.ctx.daemons.get_daemon('mon', mon, self.cluster).stop()
def revive_mon(self, mon):
    """
    Restart by either power cycling (if the config says so),
    or by doing a normal restart.
    """
    if self.config.get('powercycle'):
        remote = self.find_remote('mon', mon)
        self.log('revive_mon on mon.{m} doing powercycle of {s}'.
                 format(m=mon, s=remote.name))
        self._assert_ipmi(remote)
        remote.console.power_on()
        self.make_admin_daemon_dir(remote)
    self.ctx.daemons.get_daemon('mon', mon, self.cluster).restart()

def revive_mgr(self, mgr):
    """
    Restart by either power cycling (if the config says so),
    or by doing a normal restart.
    """
    if self.config.get('powercycle'):
        remote = self.find_remote('mgr', mgr)
        self.log('revive_mgr on mgr.{m} doing powercycle of {s}'.
                 format(m=mgr, s=remote.name))
        self._assert_ipmi(remote)
        remote.console.power_on()
        self.make_admin_daemon_dir(remote)
    self.ctx.daemons.get_daemon('mgr', mgr, self.cluster).restart()
def get_mon_status(self, mon):
    """
    Extract all the monitor status information from the cluster.

    :param mon: monitor id (the part after 'mon.')
    """
    out = self.raw_cluster_cmd('tell', 'mon.%s' % mon, 'mon_status')
    return json.loads(out)

def get_mon_quorum(self):
    """
    Extract monitor quorum information from the cluster.

    :returns: the 'quorum' list from quorum_status.
    """
    out = self.raw_cluster_cmd('quorum_status')
    j = json.loads(out)
    self.log('quorum_status is %s' % out)
    return j['quorum']
def wait_for_mon_quorum_size(self, size, timeout=300):
    """
    Loop until quorum size is reached.

    :param size: expected number of monitors in quorum
    :param timeout: seconds to wait; None waits forever.
    """
    self.log('waiting for quorum size %d' % size)
    start = time.time()
    while not len(self.get_mon_quorum()) == size:
        if timeout is not None:
            assert time.time() - start < timeout, \
                ('failed to reach quorum size %d '
                 'before timeout expired' % size)
        time.sleep(3)
    self.log("quorum is size %d" % size)
def get_mon_health(self, debug=False):
    """
    Extract all the monitor health information.

    :param debug: when True, log the raw health output as well.
    """
    out = self.raw_cluster_cmd('health', '--format=json')
    if debug:
        self.log('health:\n{h}'.format(h=out))
    return json.loads(out)

def wait_until_healthy(self, timeout=None):
    """
    Loop until the cluster health status is HEALTH_OK.

    :param timeout: seconds to wait; None waits forever.
    """
    self.log("wait_until_healthy")
    start = time.time()
    while self.get_mon_health()['status'] != 'HEALTH_OK':
        if timeout is not None:
            assert time.time() - start < timeout, \
                'timeout expired in wait_until_healthy'
        time.sleep(3)
    self.log("wait_until_healthy done")
def get_filepath(self):
    """
    Return path to osd data with {id} needing to be replaced.
    """
    return '/var/lib/ceph/osd/' + self.cluster + '-{id}'

def make_admin_daemon_dir(self, remote):
    """
    Create /var/run/ceph directory on remote site.

    :param remote: Remote site
    """
    remote.run(args=['sudo',
                     'install', '-d', '-m0777', '--', '/var/run/ceph', ], )
def get_service_task_status(self, service, status_key):
    """
    Return daemon task status for a given ceph service.

    :param service: ceph service (mds, osd, etc...)
    :param status_key: matching task status key
    :returns: {daemon_name: status_value}; empty dict when the service or
              key is missing from the service map.
    """
    task_status = {}
    status = self.raw_cluster_status()
    try:
        for k, v in status['servicemap']['services'][service]['daemons'].items():
            ts = dict(v).get('task_status', None)
            if ts:  # daemons without a task_status entry are skipped
                task_status[k] = ts[status_key]
    except KeyError:  # catches missing service and status key
        return {}
    self.log(task_status)
    return task_status
def utility_task(name):
    """
    Generate a ceph_manager subtask corresponding to the ceph_manager
    method ``name``: the returned task looks up the named method on the
    cluster's manager and calls it with args/kwargs from the task config.
    """
    def task(ctx, config):
        if config is None:
            config = {}
        args = config.get('args', [])
        kwargs = config.get('kwargs', {})
        cluster = config.get('cluster', 'ceph')
        fn = getattr(ctx.managers[cluster], name)
        fn(*args, **kwargs)
    return task
# Convenience bindings: each exposes the CephManager method of the same
# name as a standalone teuthology task.
revive_osd = utility_task("revive_osd")
revive_mon = utility_task("revive_mon")
kill_osd = utility_task("kill_osd")
kill_mon = utility_task("kill_mon")
create_pool = utility_task("create_pool")
remove_pool = utility_task("remove_pool")
wait_for_clean = utility_task("wait_for_clean")
flush_all_pg_stats = utility_task("flush_all_pg_stats")
set_pool_property = utility_task("set_pool_property")
do_pg_scrub = utility_task("do_pg_scrub")
wait_for_pool = utility_task("wait_for_pool")
wait_for_pools = utility_task("wait_for_pools")