2 ceph manager -- Thrasher and CephManager objects
4 from functools
import wraps
18 from io
import BytesIO
19 from six
import StringIO
20 from teuthology
import misc
as teuthology
21 from tasks
.scrub
import Scrubber
22 from tasks
.util
.rados
import cmd_erasure_code_profile
23 from tasks
.util
import get_remote
24 from teuthology
.contextutil
import safe_while
25 from teuthology
.orchestra
.remote
import Remote
26 from teuthology
.orchestra
import run
27 from teuthology
.exceptions
import CommandFailedError
28 from tasks
.thrasher
import Thrasher
29 from six
import StringIO
32 from subprocess
import DEVNULL
# py3k
34 DEVNULL
= open(os
.devnull
, 'r+') # type: ignore
36 DEFAULT_CONF_PATH
= '/etc/ceph/ceph.conf'
38 log
= logging
.getLogger(__name__
)
40 # this is for cephadm clusters
41 def shell(ctx
, cluster_name
, remote
, args
, name
=None, **kwargs
):
42 testdir
= teuthology
.get_testdir(ctx
)
45 extra_args
= ['-n', name
]
50 '--image', ctx
.ceph
[cluster_name
].image
,
53 '--fsid', ctx
.ceph
[cluster_name
].fsid
,
59 def write_conf(ctx
, conf_path
=DEFAULT_CONF_PATH
, cluster
='ceph'):
61 ctx
.ceph
[cluster
].conf
.write(conf_fp
)
63 writes
= ctx
.cluster
.run(
65 'sudo', 'mkdir', '-p', '/etc/ceph', run
.Raw('&&'),
66 'sudo', 'chmod', '0755', '/etc/ceph', run
.Raw('&&'),
67 'sudo', 'tee', conf_path
, run
.Raw('&&'),
68 'sudo', 'chmod', '0644', conf_path
,
69 run
.Raw('>'), '/dev/null',
74 teuthology
.feed_many_stdins_and_close(conf_fp
, writes
)
78 def mount_osd_data(ctx
, remote
, cluster
, osd
):
83 :param remote: Remote site
84 :param cluster: name of ceph cluster
87 log
.debug('Mounting data for osd.{o} on {r}'.format(o
=osd
, r
=remote
))
88 role
= "{0}.osd.{1}".format(cluster
, osd
)
89 alt_role
= role
if cluster
!= 'ceph' else "osd.{0}".format(osd
)
90 if remote
in ctx
.disk_config
.remote_to_roles_to_dev
:
91 if alt_role
in ctx
.disk_config
.remote_to_roles_to_dev
[remote
]:
93 if role
not in ctx
.disk_config
.remote_to_roles_to_dev
[remote
]:
95 dev
= ctx
.disk_config
.remote_to_roles_to_dev
[remote
][role
]
96 mount_options
= ctx
.disk_config
.\
97 remote_to_roles_to_dev_mount_options
[remote
][role
]
98 fstype
= ctx
.disk_config
.remote_to_roles_to_dev_fstype
[remote
][role
]
99 mnt
= os
.path
.join('/var/lib/ceph/osd', '{0}-{1}'.format(cluster
, osd
))
101 log
.info('Mounting osd.{o}: dev: {n}, cluster: {c}'
102 'mountpoint: {p}, type: {t}, options: {v}'.format(
103 o
=osd
, n
=remote
.name
, p
=mnt
, t
=fstype
, v
=mount_options
,
111 '-o', ','.join(mount_options
),
124 self
.log(traceback
.format_exc())
134 class OSDThrasher(Thrasher
):
136 Object used to thrash Ceph
138 def __init__(self
, manager
, config
, name
, logger
):
139 super(OSDThrasher
, self
).__init
__()
141 self
.ceph_manager
= manager
142 self
.cluster
= manager
.cluster
143 self
.ceph_manager
.wait_for_clean()
144 osd_status
= self
.ceph_manager
.get_osd_status()
145 self
.in_osds
= osd_status
['in']
146 self
.live_osds
= osd_status
['live']
147 self
.out_osds
= osd_status
['out']
148 self
.dead_osds
= osd_status
['dead']
149 self
.stopping
= False
153 self
.revive_timeout
= self
.config
.get("revive_timeout", 360)
154 self
.pools_to_fix_pgp_num
= set()
155 if self
.config
.get('powercycle'):
156 self
.revive_timeout
+= 120
157 self
.clean_wait
= self
.config
.get('clean_wait', 0)
158 self
.minin
= self
.config
.get("min_in", 4)
159 self
.chance_move_pg
= self
.config
.get('chance_move_pg', 1.0)
160 self
.sighup_delay
= self
.config
.get('sighup_delay')
161 self
.optrack_toggle_delay
= self
.config
.get('optrack_toggle_delay')
162 self
.dump_ops_enable
= self
.config
.get('dump_ops_enable')
163 self
.noscrub_toggle_delay
= self
.config
.get('noscrub_toggle_delay')
164 self
.chance_thrash_cluster_full
= self
.config
.get('chance_thrash_cluster_full', .05)
165 self
.chance_thrash_pg_upmap
= self
.config
.get('chance_thrash_pg_upmap', 1.0)
166 self
.chance_thrash_pg_upmap_items
= self
.config
.get('chance_thrash_pg_upmap', 1.0)
167 self
.random_eio
= self
.config
.get('random_eio')
168 self
.chance_force_recovery
= self
.config
.get('chance_force_recovery', 0.3)
170 num_osds
= self
.in_osds
+ self
.out_osds
171 self
.max_pgs
= self
.config
.get("max_pgs_per_pool_osd", 1200) * len(num_osds
)
172 self
.min_pgs
= self
.config
.get("min_pgs_per_pool_osd", 1) * len(num_osds
)
173 if self
.config
is None:
175 # prevent monitor from auto-marking things out while thrasher runs
176 # try both old and new tell syntax, in case we are testing old code
177 self
.saved_options
= []
178 # assuming that the default settings do not vary from one daemon to
180 first_mon
= teuthology
.get_first_mon(manager
.ctx
, self
.config
).split('.')
181 opts
= [('mon', 'mon_osd_down_out_interval', 0)]
182 #why do we disable marking an OSD out automatically? :/
183 for service
, opt
, new_value
in opts
:
184 old_value
= manager
.get_config(first_mon
[0],
187 self
.saved_options
.append((service
, opt
, old_value
))
188 manager
.inject_args(service
, '*', opt
, new_value
)
189 # initialize ceph_objectstore_tool property - must be done before
190 # do_thrash is spawned - http://tracker.ceph.com/issues/18799
191 if (self
.config
.get('powercycle') or
192 not self
.cmd_exists_on_osds("ceph-objectstore-tool") or
193 self
.config
.get('disable_objectstore_tool_tests', False)):
194 self
.ceph_objectstore_tool
= False
195 if self
.config
.get('powercycle'):
196 self
.log("Unable to test ceph-objectstore-tool, "
197 "powercycle testing")
199 self
.log("Unable to test ceph-objectstore-tool, "
200 "not available on all OSD nodes")
202 self
.ceph_objectstore_tool
= \
203 self
.config
.get('ceph_objectstore_tool', True)
205 self
.thread
= gevent
.spawn(self
.do_thrash
)
206 if self
.sighup_delay
:
207 self
.sighup_thread
= gevent
.spawn(self
.do_sighup
)
208 if self
.optrack_toggle_delay
:
209 self
.optrack_toggle_thread
= gevent
.spawn(self
.do_optrack_toggle
)
210 if self
.dump_ops_enable
== "true":
211 self
.dump_ops_thread
= gevent
.spawn(self
.do_dump_ops
)
212 if self
.noscrub_toggle_delay
:
213 self
.noscrub_toggle_thread
= gevent
.spawn(self
.do_noscrub_toggle
)
def log(self, msg, *args, **kwargs):
    """Log *msg* at INFO level via this thrasher's logger.

    Extra positional/keyword arguments are forwarded unchanged to
    logging.Logger.info(), so callers may use lazy %-style formatting
    (e.g. ``self.log("osd %s", osd)``).
    """
    self.logger.info(msg, *args, **kwargs)
218 def cmd_exists_on_osds(self
, cmd
):
219 if self
.ceph_manager
.cephadm
:
221 allremotes
= self
.ceph_manager
.ctx
.cluster
.only(\
222 teuthology
.is_type('osd', self
.cluster
)).remotes
.keys()
223 allremotes
= list(set(allremotes
))
224 for remote
in allremotes
:
225 proc
= remote
.run(args
=['type', cmd
], wait
=True,
226 check_status
=False, stdout
=BytesIO(),
228 if proc
.exitstatus
!= 0:
232 def run_ceph_objectstore_tool(self
, remote
, osd
, cmd
):
233 if self
.ceph_manager
.cephadm
:
235 self
.ceph_manager
.ctx
, self
.ceph_manager
.cluster
, remote
,
236 args
=['ceph-objectstore-tool'] + cmd
,
238 wait
=True, check_status
=False,
243 args
=['sudo', 'adjust-ulimits', 'ceph-objectstore-tool'] + cmd
,
244 wait
=True, check_status
=False,
248 def kill_osd(self
, osd
=None, mark_down
=False, mark_out
=False):
250 :param osd: Osd to be killed.
251 :mark_down: Mark down if true.
252 :mark_out: Mark out if true.
255 osd
= random
.choice(self
.live_osds
)
256 self
.log("Killing osd %s, live_osds are %s" % (str(osd
),
257 str(self
.live_osds
)))
258 self
.live_osds
.remove(osd
)
259 self
.dead_osds
.append(osd
)
260 self
.ceph_manager
.kill_osd(osd
)
262 self
.ceph_manager
.mark_down_osd(osd
)
263 if mark_out
and osd
in self
.in_osds
:
265 if self
.ceph_objectstore_tool
:
266 self
.log("Testing ceph-objectstore-tool on down osd.%s" % osd
)
267 remote
= self
.ceph_manager
.find_remote('osd', osd
)
268 FSPATH
= self
.ceph_manager
.get_filepath()
269 JPATH
= os
.path
.join(FSPATH
, "journal")
270 exp_osd
= imp_osd
= osd
271 self
.log('remote for osd %s is %s' % (osd
, remote
))
272 exp_remote
= imp_remote
= remote
273 # If an older osd is available we'll move a pg from there
274 if (len(self
.dead_osds
) > 1 and
275 random
.random() < self
.chance_move_pg
):
276 exp_osd
= random
.choice(self
.dead_osds
[:-1])
277 exp_remote
= self
.ceph_manager
.find_remote('osd', exp_osd
)
278 self
.log('remote for exp osd %s is %s' % (exp_osd
, exp_remote
))
281 '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
284 if not self
.ceph_manager
.cephadm
:
285 # ceph-objectstore-tool might be temporarily absent during an
286 # upgrade - see http://tracker.ceph.com/issues/18014
287 with
safe_while(sleep
=15, tries
=40, action
="type ceph-objectstore-tool") as proceed
:
289 proc
= exp_remote
.run(args
=['type', 'ceph-objectstore-tool'],
290 wait
=True, check_status
=False, stdout
=BytesIO(),
292 if proc
.exitstatus
== 0:
294 log
.debug("ceph-objectstore-tool binary not present, trying again")
296 # ceph-objectstore-tool might bogusly fail with "OSD has the store locked"
297 # see http://tracker.ceph.com/issues/19556
298 with
safe_while(sleep
=15, tries
=40, action
="ceph-objectstore-tool --op list-pgs") as proceed
:
300 proc
= self
.run_ceph_objectstore_tool(
301 exp_remote
, 'osd.%s' % exp_osd
,
303 '--data-path', FSPATH
.format(id=exp_osd
),
304 '--journal-path', JPATH
.format(id=exp_osd
),
307 if proc
.exitstatus
== 0:
309 elif (proc
.exitstatus
== 1 and
310 proc
.stderr
.getvalue() == "OSD has the store locked"):
313 raise Exception("ceph-objectstore-tool: "
314 "exp list-pgs failure with status {ret}".
315 format(ret
=proc
.exitstatus
))
317 pgs
= six
.ensure_str(proc
.stdout
.getvalue()).split('\n')[:-1]
319 self
.log("No PGs found for osd.{osd}".format(osd
=exp_osd
))
321 pg
= random
.choice(pgs
)
322 #exp_path = teuthology.get_testdir(self.ceph_manager.ctx)
323 #exp_path = os.path.join(exp_path, '{0}.data'.format(self.cluster))
324 exp_path
= os
.path
.join('/var/log/ceph', # available inside 'shell' container
325 "exp.{pg}.{id}".format(
328 if self
.ceph_manager
.cephadm
:
329 exp_host_path
= os
.path
.join(
331 self
.ceph_manager
.ctx
.ceph
[self
.ceph_manager
.cluster
].fsid
,
332 "exp.{pg}.{id}".format(
336 exp_host_path
= exp_path
339 # Can't use new export-remove op since this is part of upgrade testing
340 proc
= self
.run_ceph_objectstore_tool(
341 exp_remote
, 'osd.%s' % exp_osd
,
343 '--data-path', FSPATH
.format(id=exp_osd
),
344 '--journal-path', JPATH
.format(id=exp_osd
),
350 raise Exception("ceph-objectstore-tool: "
351 "export failure with status {ret}".
352 format(ret
=proc
.exitstatus
))
354 proc
= self
.run_ceph_objectstore_tool(
355 exp_remote
, 'osd.%s' % exp_osd
,
357 '--data-path', FSPATH
.format(id=exp_osd
),
358 '--journal-path', JPATH
.format(id=exp_osd
),
364 raise Exception("ceph-objectstore-tool: "
365 "remove failure with status {ret}".
366 format(ret
=proc
.exitstatus
))
367 # If there are at least 2 dead osds we might move the pg
368 if exp_osd
!= imp_osd
:
369 # If pg isn't already on this osd, then we will move it there
370 proc
= self
.run_ceph_objectstore_tool(
374 '--data-path', FSPATH
.format(id=imp_osd
),
375 '--journal-path', JPATH
.format(id=imp_osd
),
379 raise Exception("ceph-objectstore-tool: "
380 "imp list-pgs failure with status {ret}".
381 format(ret
=proc
.exitstatus
))
382 pgs
= six
.ensure_str(proc
.stdout
.getvalue()).split('\n')[:-1]
384 self
.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
385 format(pg
=pg
, fosd
=exp_osd
, tosd
=imp_osd
))
386 if imp_remote
!= exp_remote
:
387 # Copy export file to the other machine
388 self
.log("Transfer export file from {srem} to {trem}".
389 format(srem
=exp_remote
, trem
=imp_remote
))
390 # just in case an upgrade make /var/log/ceph unreadable by non-root,
391 exp_remote
.run(args
=['sudo', 'chmod', '777',
393 imp_remote
.run(args
=['sudo', 'chmod', '777',
395 tmpexport
= Remote
.get_file(exp_remote
, exp_host_path
,
397 if exp_host_path
!= exp_path
:
398 # push to /var/log/ceph, then rename (we can't
399 # chmod 777 the /var/log/ceph/$fsid mountpoint)
400 Remote
.put_file(imp_remote
, tmpexport
, exp_path
)
401 imp_remote
.run(args
=[
402 'sudo', 'mv', exp_path
, exp_host_path
])
404 Remote
.put_file(imp_remote
, tmpexport
, exp_host_path
)
407 # Can't move the pg after all
409 imp_remote
= exp_remote
411 proc
= self
.run_ceph_objectstore_tool(
412 imp_remote
, 'osd.%s' % imp_osd
,
414 '--data-path', FSPATH
.format(id=imp_osd
),
415 '--journal-path', JPATH
.format(id=imp_osd
),
416 '--log-file=/var/log/ceph/objectstore_tool.$pid.log',
420 if proc
.exitstatus
== 1:
421 bogosity
= "The OSD you are using is older than the exported PG"
422 if bogosity
in proc
.stderr
.getvalue():
423 self
.log("OSD older than exported PG"
425 elif proc
.exitstatus
== 10:
426 self
.log("Pool went away before processing an import"
428 elif proc
.exitstatus
== 11:
429 self
.log("Attempt to import an incompatible export"
431 elif proc
.exitstatus
== 12:
432 # this should be safe to ignore because we only ever move 1
433 # copy of the pg at a time, and merge is only initiated when
434 # all replicas are peered and happy. /me crosses fingers
435 self
.log("PG merged on target"
437 elif proc
.exitstatus
:
438 raise Exception("ceph-objectstore-tool: "
439 "import failure with status {ret}".
440 format(ret
=proc
.exitstatus
))
441 cmd
= "sudo rm -f {file}".format(file=exp_host_path
)
442 exp_remote
.run(args
=cmd
)
443 if imp_remote
!= exp_remote
:
444 imp_remote
.run(args
=cmd
)
446 # apply low split settings to each pool
447 if not self
.ceph_manager
.cephadm
:
448 for pool
in self
.ceph_manager
.list_pools():
449 cmd
= ("CEPH_ARGS='--filestore-merge-threshold 1 "
450 "--filestore-split-multiple 1' sudo -E "
451 + 'ceph-objectstore-tool '
452 + ' '.join(prefix
+ [
453 '--data-path', FSPATH
.format(id=imp_osd
),
454 '--journal-path', JPATH
.format(id=imp_osd
),
456 + " --op apply-layout-settings --pool " + pool
).format(id=osd
)
457 proc
= imp_remote
.run(args
=cmd
,
458 wait
=True, check_status
=False,
460 if 'Couldn\'t find pool' in proc
.stderr
.getvalue():
463 raise Exception("ceph-objectstore-tool apply-layout-settings"
464 " failed with {status}".format(status
=proc
.exitstatus
))
467 def blackhole_kill_osd(self
, osd
=None):
469 If all else fails, kill the osd.
470 :param osd: Osd to be killed.
473 osd
= random
.choice(self
.live_osds
)
474 self
.log("Blackholing and then killing osd %s, live_osds are %s" %
475 (str(osd
), str(self
.live_osds
)))
476 self
.live_osds
.remove(osd
)
477 self
.dead_osds
.append(osd
)
478 self
.ceph_manager
.blackhole_kill_osd(osd
)
480 def revive_osd(self
, osd
=None, skip_admin_check
=False):
483 :param osd: Osd to be revived.
486 osd
= random
.choice(self
.dead_osds
)
487 self
.log("Reviving osd %s" % (str(osd
),))
488 self
.ceph_manager
.revive_osd(
491 skip_admin_check
=skip_admin_check
)
492 self
.dead_osds
.remove(osd
)
493 self
.live_osds
.append(osd
)
494 if self
.random_eio
> 0 and osd
== self
.rerrosd
:
495 self
.ceph_manager
.set_config(self
.rerrosd
,
496 filestore_debug_random_read_err
= self
.random_eio
)
497 self
.ceph_manager
.set_config(self
.rerrosd
,
498 bluestore_debug_random_read_err
= self
.random_eio
)
501 def out_osd(self
, osd
=None):
504 :param osd: Osd to be marked.
507 osd
= random
.choice(self
.in_osds
)
508 self
.log("Removing osd %s, in_osds are: %s" %
509 (str(osd
), str(self
.in_osds
)))
510 self
.ceph_manager
.mark_out_osd(osd
)
511 self
.in_osds
.remove(osd
)
512 self
.out_osds
.append(osd
)
514 def in_osd(self
, osd
=None):
517 :param osd: Osd to be marked.
520 osd
= random
.choice(self
.out_osds
)
521 if osd
in self
.dead_osds
:
522 return self
.revive_osd(osd
)
523 self
.log("Adding osd %s" % (str(osd
),))
524 self
.out_osds
.remove(osd
)
525 self
.in_osds
.append(osd
)
526 self
.ceph_manager
.mark_in_osd(osd
)
527 self
.log("Added osd %s" % (str(osd
),))
529 def reweight_osd_or_by_util(self
, osd
=None):
531 Reweight an osd that is in
532 :param osd: Osd to be marked.
534 if osd
is not None or random
.choice([True, False]):
536 osd
= random
.choice(self
.in_osds
)
537 val
= random
.uniform(.1, 1.0)
538 self
.log("Reweighting osd %s to %s" % (str(osd
), str(val
)))
539 self
.ceph_manager
.raw_cluster_cmd('osd', 'reweight',
542 # do it several times, the option space is large
545 'max_change': random
.choice(['0.05', '1.0', '3.0']),
546 'overage': random
.choice(['110', '1000']),
547 'type': random
.choice([
548 'reweight-by-utilization',
549 'test-reweight-by-utilization']),
551 self
.log("Reweighting by: %s"%(str(options
),))
552 self
.ceph_manager
.raw_cluster_cmd(
556 options
['max_change'])
558 def primary_affinity(self
, osd
=None):
560 osd
= random
.choice(self
.in_osds
)
561 if random
.random() >= .5:
563 elif random
.random() >= .5:
567 self
.log('Setting osd %s primary_affinity to %f' % (str(osd
), pa
))
568 self
.ceph_manager
.raw_cluster_cmd('osd', 'primary-affinity',
571 def thrash_cluster_full(self
):
573 Set and unset cluster full condition
575 self
.log('Setting full ratio to .001')
576 self
.ceph_manager
.raw_cluster_cmd('osd', 'set-full-ratio', '.001')
578 self
.log('Setting full ratio back to .95')
579 self
.ceph_manager
.raw_cluster_cmd('osd', 'set-full-ratio', '.95')
581 def thrash_pg_upmap(self
):
583 Install or remove random pg_upmap entries in OSDMap
585 from random
import shuffle
586 out
= self
.ceph_manager
.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
588 self
.log('j is %s' % j
)
590 if random
.random() >= .3:
591 pgs
= self
.ceph_manager
.get_pg_stats()
594 pg
= random
.choice(pgs
)
595 pgid
= str(pg
['pgid'])
596 poolid
= int(pgid
.split('.')[0])
597 sizes
= [x
['size'] for x
in j
['pools'] if x
['pool'] == poolid
]
601 osds
= self
.in_osds
+ self
.out_osds
604 self
.log('Setting %s to %s' % (pgid
, osds
))
605 cmd
= ['osd', 'pg-upmap', pgid
] + [str(x
) for x
in osds
]
606 self
.log('cmd %s' % cmd
)
607 self
.ceph_manager
.raw_cluster_cmd(*cmd
)
613 self
.log('Clearing pg_upmap on %s' % pg
)
614 self
.ceph_manager
.raw_cluster_cmd(
619 self
.log('No pg_upmap entries; doing nothing')
620 except CommandFailedError
:
621 self
.log('Failed to rm-pg-upmap, ignoring')
623 def thrash_pg_upmap_items(self
):
625 Install or remove random pg_upmap_items entries in OSDMap
627 from random
import shuffle
628 out
= self
.ceph_manager
.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
630 self
.log('j is %s' % j
)
632 if random
.random() >= .3:
633 pgs
= self
.ceph_manager
.get_pg_stats()
636 pg
= random
.choice(pgs
)
637 pgid
= str(pg
['pgid'])
638 poolid
= int(pgid
.split('.')[0])
639 sizes
= [x
['size'] for x
in j
['pools'] if x
['pool'] == poolid
]
643 osds
= self
.in_osds
+ self
.out_osds
646 self
.log('Setting %s to %s' % (pgid
, osds
))
647 cmd
= ['osd', 'pg-upmap-items', pgid
] + [str(x
) for x
in osds
]
648 self
.log('cmd %s' % cmd
)
649 self
.ceph_manager
.raw_cluster_cmd(*cmd
)
651 m
= j
['pg_upmap_items']
655 self
.log('Clearing pg_upmap on %s' % pg
)
656 self
.ceph_manager
.raw_cluster_cmd(
661 self
.log('No pg_upmap entries; doing nothing')
662 except CommandFailedError
:
663 self
.log('Failed to rm-pg-upmap-items, ignoring')
665 def force_recovery(self
):
667 Force recovery on some of PGs
669 backfill
= random
.random() >= 0.5
670 j
= self
.ceph_manager
.get_pgids_to_force(backfill
)
674 self
.ceph_manager
.raw_cluster_cmd('pg', 'force-backfill', *j
)
676 self
.ceph_manager
.raw_cluster_cmd('pg', 'force-recovery', *j
)
677 except CommandFailedError
:
678 self
.log('Failed to force backfill|recovery, ignoring')
681 def cancel_force_recovery(self
):
683 Force recovery on some of PGs
685 backfill
= random
.random() >= 0.5
686 j
= self
.ceph_manager
.get_pgids_to_cancel_force(backfill
)
690 self
.ceph_manager
.raw_cluster_cmd('pg', 'cancel-force-backfill', *j
)
692 self
.ceph_manager
.raw_cluster_cmd('pg', 'cancel-force-recovery', *j
)
693 except CommandFailedError
:
694 self
.log('Failed to force backfill|recovery, ignoring')
696 def force_cancel_recovery(self
):
698 Force or cancel forcing recovery
700 if random
.random() >= 0.4:
701 self
.force_recovery()
703 self
.cancel_force_recovery()
707 Make sure all osds are up and not out.
709 while len(self
.dead_osds
) > 0:
710 self
.log("reviving osd")
712 while len(self
.out_osds
) > 0:
713 self
.log("inning osd")
718 Make sure all osds are up and fully in.
721 for osd
in self
.live_osds
:
722 self
.ceph_manager
.raw_cluster_cmd('osd', 'reweight',
724 self
.ceph_manager
.raw_cluster_cmd('osd', 'primary-affinity',
729 Break out of this Ceph loop
733 if self
.sighup_delay
:
734 self
.log("joining the do_sighup greenlet")
735 self
.sighup_thread
.get()
736 if self
.optrack_toggle_delay
:
737 self
.log("joining the do_optrack_toggle greenlet")
738 self
.optrack_toggle_thread
.join()
739 if self
.dump_ops_enable
== "true":
740 self
.log("joining the do_dump_ops greenlet")
741 self
.dump_ops_thread
.join()
742 if self
.noscrub_toggle_delay
:
743 self
.log("joining the do_noscrub_toggle greenlet")
744 self
.noscrub_toggle_thread
.join()
748 Increase the size of the pool
750 pool
= self
.ceph_manager
.get_pool()
753 self
.log("Growing pool %s" % (pool
,))
754 if self
.ceph_manager
.expand_pool(pool
,
755 self
.config
.get('pool_grow_by', 10),
757 self
.pools_to_fix_pgp_num
.add(pool
)
759 def shrink_pool(self
):
761 Decrease the size of the pool
763 pool
= self
.ceph_manager
.get_pool()
766 _
= self
.ceph_manager
.get_pool_pg_num(pool
)
767 self
.log("Shrinking pool %s" % (pool
,))
768 if self
.ceph_manager
.contract_pool(
770 self
.config
.get('pool_shrink_by', 10),
772 self
.pools_to_fix_pgp_num
.add(pool
)
774 def fix_pgp_num(self
, pool
=None):
776 Fix number of pgs in pool.
779 pool
= self
.ceph_manager
.get_pool()
785 self
.log("fixing pg num pool %s" % (pool
,))
786 if self
.ceph_manager
.set_pool_pgpnum(pool
, force
):
787 self
.pools_to_fix_pgp_num
.discard(pool
)
789 def test_pool_min_size(self
):
791 Loop to selectively push PGs below their min_size and test that recovery
794 self
.log("test_pool_min_size")
796 self
.ceph_manager
.wait_for_recovery(
797 timeout
=self
.config
.get('timeout')
800 minout
= int(self
.config
.get("min_out", 1))
801 minlive
= int(self
.config
.get("min_live", 2))
802 mindead
= int(self
.config
.get("min_dead", 1))
803 self
.log("doing min_size thrashing")
804 self
.ceph_manager
.wait_for_clean(timeout
=60)
805 assert self
.ceph_manager
.is_clean(), \
806 'not clean before minsize thrashing starts'
807 while not self
.stopping
:
808 # look up k and m from all the pools on each loop, in case it
809 # changes as the cluster runs
813 pools_json
= self
.ceph_manager
.get_osd_dump_json()['pools']
815 for pool_json
in pools_json
:
816 pool
= pool_json
['pool_name']
818 pool_type
= pool_json
['type'] # 1 for rep, 3 for ec
819 min_size
= pool_json
['min_size']
820 self
.log("pool {pool} min_size is {min_size}".format(pool
=pool
,min_size
=min_size
))
822 ec_profile
= self
.ceph_manager
.get_pool_property(pool
, 'erasure_code_profile')
823 if pool_type
!= PoolType
.ERASURE_CODED
:
825 ec_profile
= pool_json
['erasure_code_profile']
826 ec_profile_json
= self
.ceph_manager
.raw_cluster_cmd(
828 'erasure-code-profile',
832 ec_json
= json
.loads(ec_profile_json
)
833 local_k
= int(ec_json
['k'])
834 local_m
= int(ec_json
['m'])
835 self
.log("pool {pool} local_k={k} local_m={m}".format(pool
=pool
,
836 k
=local_k
, m
=local_m
))
838 self
.log("setting k={local_k} from previous {k}".format(local_k
=local_k
, k
=k
))
841 self
.log("setting m={local_m} from previous {m}".format(local_m
=local_m
, m
=m
))
843 except CommandFailedError
:
844 self
.log("failed to read erasure_code_profile. %s was likely removed", pool
)
848 self
.log("using k={k}, m={m}".format(k
=k
,m
=m
))
850 self
.log("No pools yet, waiting")
854 if minout
> len(self
.out_osds
): # kill OSDs and mark out
855 self
.log("forced to out an osd")
856 self
.kill_osd(mark_out
=True)
858 elif mindead
> len(self
.dead_osds
): # kill OSDs but force timeout
859 self
.log("forced to kill an osd")
862 else: # make mostly-random choice to kill or revive OSDs
863 minup
= max(minlive
, k
)
864 rand_val
= random
.uniform(0, 1)
865 self
.log("choosing based on number of live OSDs and rand val {rand}".\
866 format(rand
=rand_val
))
867 if len(self
.live_osds
) > minup
+1 and rand_val
< 0.5:
868 # chose to knock out as many OSDs as we can w/out downing PGs
870 most_killable
= min(len(self
.live_osds
) - minup
, m
)
871 self
.log("chose to kill {n} OSDs".format(n
=most_killable
))
872 for i
in range(1, most_killable
):
873 self
.kill_osd(mark_out
=True)
875 # try a few times since there might be a concurrent pool
876 # creation or deletion
879 action
='check for active or peered') as proceed
:
881 if self
.ceph_manager
.all_active_or_peered():
883 self
.log('not all PGs are active or peered')
884 else: # chose to revive OSDs, bring up a random fraction of the dead ones
885 self
.log("chose to revive osds")
886 for i
in range(1, int(rand_val
* len(self
.dead_osds
))):
889 # let PGs repair themselves or our next knockout might kill one
890 self
.ceph_manager
.wait_for_clean(timeout
=self
.config
.get('timeout'))
892 # / while not self.stopping
895 self
.ceph_manager
.wait_for_recovery(
896 timeout
=self
.config
.get('timeout')
899 def inject_pause(self
, conf_key
, duration
, check_after
, should_be_down
):
901 Pause injection testing. Check for osd being down when finished.
903 the_one
= random
.choice(self
.live_osds
)
904 self
.log("inject_pause on {osd}".format(osd
=the_one
))
906 "Testing {key} pause injection for duration {duration}".format(
911 "Checking after {after}, should_be_down={shouldbedown}".format(
913 shouldbedown
=should_be_down
915 self
.ceph_manager
.set_config(the_one
, **{conf_key
: duration
})
916 if not should_be_down
:
918 time
.sleep(check_after
)
919 status
= self
.ceph_manager
.get_osd_status()
920 assert the_one
in status
['down']
921 time
.sleep(duration
- check_after
+ 20)
922 status
= self
.ceph_manager
.get_osd_status()
923 assert not the_one
in status
['down']
925 def test_backfill_full(self
):
927 Test backfills stopping when the replica fills up.
929 First, use injectfull admin command to simulate a now full
930 osd by setting it to 0 on all of the OSDs.
932 Second, on a random subset, set
933 osd_debug_skip_full_check_in_backfill_reservation to force
934 the more complicated check in do_scan to be exercised.
936 Then, verify that all backfillings stop.
938 self
.log("injecting backfill full")
939 for i
in self
.live_osds
:
940 self
.ceph_manager
.set_config(
942 osd_debug_skip_full_check_in_backfill_reservation
=
943 random
.choice(['false', 'true']))
944 self
.ceph_manager
.osd_admin_socket(i
, command
=['injectfull', 'backfillfull'],
945 check_status
=True, timeout
=30, stdout
=DEVNULL
)
947 status
= self
.ceph_manager
.compile_pg_status()
948 if 'backfilling' not in status
.keys():
951 "waiting for {still_going} backfillings".format(
952 still_going
=status
.get('backfilling')))
954 assert('backfilling' not in self
.ceph_manager
.compile_pg_status().keys())
955 for i
in self
.live_osds
:
956 self
.ceph_manager
.set_config(
958 osd_debug_skip_full_check_in_backfill_reservation
='false')
959 self
.ceph_manager
.osd_admin_socket(i
, command
=['injectfull', 'none'],
960 check_status
=True, timeout
=30, stdout
=DEVNULL
)
962 def test_map_discontinuity(self
):
964 1) Allows the osds to recover
966 3) allows the remaining osds to recover
967 4) waits for some time
969 This sequence should cause the revived osd to have to handle
970 a map gap since the mons would have trimmed
972 while len(self
.in_osds
) < (self
.minin
+ 1):
974 self
.log("Waiting for recovery")
975 self
.ceph_manager
.wait_for_all_osds_up(
976 timeout
=self
.config
.get('timeout')
978 # now we wait 20s for the pg status to change, if it takes longer,
979 # the test *should* fail!
981 self
.ceph_manager
.wait_for_clean(
982 timeout
=self
.config
.get('timeout')
985 # now we wait 20s for the backfill replicas to hear about the clean
987 self
.log("Recovered, killing an osd")
988 self
.kill_osd(mark_down
=True, mark_out
=True)
989 self
.log("Waiting for clean again")
990 self
.ceph_manager
.wait_for_clean(
991 timeout
=self
.config
.get('timeout')
993 self
.log("Waiting for trim")
994 time
.sleep(int(self
.config
.get("map_discontinuity_sleep_time", 40)))
997 def choose_action(self
):
999 Random action selector.
1001 chance_down
= self
.config
.get('chance_down', 0.4)
1002 _
= self
.config
.get('chance_test_min_size', 0)
1003 chance_test_backfill_full
= \
1004 self
.config
.get('chance_test_backfill_full', 0)
1005 if isinstance(chance_down
, int):
1006 chance_down
= float(chance_down
) / 100
1008 minout
= int(self
.config
.get("min_out", 0))
1009 minlive
= int(self
.config
.get("min_live", 2))
1010 mindead
= int(self
.config
.get("min_dead", 0))
1012 self
.log('choose_action: min_in %d min_out '
1013 '%d min_live %d min_dead %d' %
1014 (minin
, minout
, minlive
, mindead
))
1016 if len(self
.in_osds
) > minin
:
1017 actions
.append((self
.out_osd
, 1.0,))
1018 if len(self
.live_osds
) > minlive
and chance_down
> 0:
1019 actions
.append((self
.kill_osd
, chance_down
,))
1020 if len(self
.out_osds
) > minout
:
1021 actions
.append((self
.in_osd
, 1.7,))
1022 if len(self
.dead_osds
) > mindead
:
1023 actions
.append((self
.revive_osd
, 1.0,))
1024 if self
.config
.get('thrash_primary_affinity', True):
1025 actions
.append((self
.primary_affinity
, 1.0,))
1026 actions
.append((self
.reweight_osd_or_by_util
,
1027 self
.config
.get('reweight_osd', .5),))
1028 actions
.append((self
.grow_pool
,
1029 self
.config
.get('chance_pgnum_grow', 0),))
1030 actions
.append((self
.shrink_pool
,
1031 self
.config
.get('chance_pgnum_shrink', 0),))
1032 actions
.append((self
.fix_pgp_num
,
1033 self
.config
.get('chance_pgpnum_fix', 0),))
1034 actions
.append((self
.test_pool_min_size
,
1035 self
.config
.get('chance_test_min_size', 0),))
1036 actions
.append((self
.test_backfill_full
,
1037 chance_test_backfill_full
,))
1038 if self
.chance_thrash_cluster_full
> 0:
1039 actions
.append((self
.thrash_cluster_full
, self
.chance_thrash_cluster_full
,))
1040 if self
.chance_thrash_pg_upmap
> 0:
1041 actions
.append((self
.thrash_pg_upmap
, self
.chance_thrash_pg_upmap
,))
1042 if self
.chance_thrash_pg_upmap_items
> 0:
1043 actions
.append((self
.thrash_pg_upmap_items
, self
.chance_thrash_pg_upmap_items
,))
1044 if self
.chance_force_recovery
> 0:
1045 actions
.append((self
.force_cancel_recovery
, self
.chance_force_recovery
))
1047 for key
in ['heartbeat_inject_failure', 'filestore_inject_stall']:
1050 self
.inject_pause(key
,
1051 self
.config
.get('pause_short', 3),
1054 self
.config
.get('chance_inject_pause_short', 1),),
1056 self
.inject_pause(key
,
1057 self
.config
.get('pause_long', 80),
1058 self
.config
.get('pause_check_after', 70),
1060 self
.config
.get('chance_inject_pause_long', 0),)]:
1061 actions
.append(scenario
)
1063 total
= sum([y
for (x
, y
) in actions
])
1064 val
= random
.uniform(0, total
)
1065 for (action
, prob
) in actions
:
1071 def do_thrash(self
):
1073 _do_thrash() wrapper.
1077 except Exception as e
:
1078 # See _run exception comment for MDSThrasher
1079 self
.set_thrasher_exception(e
)
1080 self
.logger
.exception("exception:")
1081 # Allow successful completion so gevent doesn't see an exception.
1082 # The DaemonWatchdog will observe the error and tear down the test.
1085 def do_sighup(self
):
1087 Loops and sends signal.SIGHUP to a random live osd.
1089 Loop delay is controlled by the config value sighup_delay.
1091 delay
= float(self
.sighup_delay
)
1092 self
.log("starting do_sighup with a delay of {0}".format(delay
))
1093 while not self
.stopping
:
1094 osd
= random
.choice(self
.live_osds
)
1095 self
.ceph_manager
.signal_osd(osd
, signal
.SIGHUP
, silent
=True)
1099 def do_optrack_toggle(self
):
1101 Loops and toggle op tracking to all osds.
1103 Loop delay is controlled by the config value optrack_toggle_delay.
1105 delay
= float(self
.optrack_toggle_delay
)
1107 self
.log("starting do_optrack_toggle with a delay of {0}".format(delay
))
1108 while not self
.stopping
:
1109 if osd_state
== "true":
1114 self
.ceph_manager
.inject_args('osd', '*',
1115 'osd_enable_op_tracker',
1117 except CommandFailedError
:
1118 self
.log('Failed to tell all osds, ignoring')
1122 def do_dump_ops(self
):
1124 Loops and does op dumps on all osds
1126 self
.log("starting do_dump_ops")
1127 while not self
.stopping
:
1128 for osd
in self
.live_osds
:
1129 # Ignore errors because live_osds is in flux
1130 self
.ceph_manager
.osd_admin_socket(osd
, command
=['dump_ops_in_flight'],
1131 check_status
=False, timeout
=30, stdout
=DEVNULL
)
1132 self
.ceph_manager
.osd_admin_socket(osd
, command
=['dump_blocked_ops'],
1133 check_status
=False, timeout
=30, stdout
=DEVNULL
)
1134 self
.ceph_manager
.osd_admin_socket(osd
, command
=['dump_historic_ops'],
1135 check_status
=False, timeout
=30, stdout
=DEVNULL
)
1139 def do_noscrub_toggle(self
):
1141 Loops and toggle noscrub flags
1143 Loop delay is controlled by the config value noscrub_toggle_delay.
1145 delay
= float(self
.noscrub_toggle_delay
)
1146 scrub_state
= "none"
1147 self
.log("starting do_noscrub_toggle with a delay of {0}".format(delay
))
1148 while not self
.stopping
:
1149 if scrub_state
== "none":
1150 self
.ceph_manager
.raw_cluster_cmd('osd', 'set', 'noscrub')
1151 scrub_state
= "noscrub"
1152 elif scrub_state
== "noscrub":
1153 self
.ceph_manager
.raw_cluster_cmd('osd', 'set', 'nodeep-scrub')
1154 scrub_state
= "both"
1155 elif scrub_state
== "both":
1156 self
.ceph_manager
.raw_cluster_cmd('osd', 'unset', 'noscrub')
1157 scrub_state
= "nodeep-scrub"
1159 self
.ceph_manager
.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
1160 scrub_state
= "none"
1162 self
.ceph_manager
.raw_cluster_cmd('osd', 'unset', 'noscrub')
1163 self
.ceph_manager
.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
1166 def _do_thrash(self
):
1168 Loop to select random actions to thrash ceph manager with.
1170 cleanint
= self
.config
.get("clean_interval", 60)
1171 scrubint
= self
.config
.get("scrub_interval", -1)
1172 maxdead
= self
.config
.get("max_dead", 0)
1173 delay
= self
.config
.get("op_delay", 5)
1174 self
.rerrosd
= self
.live_osds
[0]
1175 if self
.random_eio
> 0:
1176 self
.ceph_manager
.inject_args('osd', self
.rerrosd
,
1177 'filestore_debug_random_read_err',
1179 self
.ceph_manager
.inject_args('osd', self
.rerrosd
,
1180 'bluestore_debug_random_read_err',
1182 self
.log("starting do_thrash")
1183 while not self
.stopping
:
1184 to_log
= [str(x
) for x
in ["in_osds: ", self
.in_osds
,
1185 "out_osds: ", self
.out_osds
,
1186 "dead_osds: ", self
.dead_osds
,
1187 "live_osds: ", self
.live_osds
]]
1188 self
.log(" ".join(to_log
))
1189 if random
.uniform(0, 1) < (float(delay
) / cleanint
):
1190 while len(self
.dead_osds
) > maxdead
:
1192 for osd
in self
.in_osds
:
1193 self
.ceph_manager
.raw_cluster_cmd('osd', 'reweight',
1195 if random
.uniform(0, 1) < float(
1196 self
.config
.get('chance_test_map_discontinuity', 0)) \
1197 and len(self
.live_osds
) > 5: # avoid m=2,k=2 stall, w/ some buffer for crush being picky
1198 self
.test_map_discontinuity()
1200 self
.ceph_manager
.wait_for_recovery(
1201 timeout
=self
.config
.get('timeout')
1203 time
.sleep(self
.clean_wait
)
1205 if random
.uniform(0, 1) < (float(delay
) / scrubint
):
1206 self
.log('Scrubbing while thrashing being performed')
1207 Scrubber(self
.ceph_manager
, self
.config
)
1208 self
.choose_action()()
1211 if self
.random_eio
> 0:
1212 self
.ceph_manager
.inject_args('osd', self
.rerrosd
,
1213 'filestore_debug_random_read_err', '0.0')
1214 self
.ceph_manager
.inject_args('osd', self
.rerrosd
,
1215 'bluestore_debug_random_read_err', '0.0')
1216 for pool
in list(self
.pools_to_fix_pgp_num
):
1217 if self
.ceph_manager
.get_pool_pg_num(pool
) > 0:
1218 self
.fix_pgp_num(pool
)
1219 self
.pools_to_fix_pgp_num
.clear()
1220 for service
, opt
, saved_value
in self
.saved_options
:
1221 self
.ceph_manager
.inject_args(service
, '*', opt
, saved_value
)
1222 self
.saved_options
= []
1226 class ObjectStoreTool
:
1228 def __init__(self
, manager
, pool
, **kwargs
):
1229 self
.manager
= manager
1231 self
.osd
= kwargs
.get('osd', None)
1232 self
.object_name
= kwargs
.get('object_name', None)
1233 self
.do_revive
= kwargs
.get('do_revive', True)
1234 if self
.osd
and self
.pool
and self
.object_name
:
1235 if self
.osd
== "primary":
1236 self
.osd
= self
.manager
.get_object_primary(self
.pool
,
1239 if self
.object_name
:
1240 self
.pgid
= self
.manager
.get_object_pg_with_shard(self
.pool
,
1243 self
.remote
= next(iter(self
.manager
.ctx
.\
1244 cluster
.only('osd.{o}'.format(o
=self
.osd
)).remotes
.keys()))
1245 path
= self
.manager
.get_filepath().format(id=self
.osd
)
1246 self
.paths
= ("--data-path {path} --journal-path {path}/journal".
1249 def build_cmd(self
, options
, args
, stdin
):
1251 if self
.object_name
:
1252 lines
.append("object=$(sudo adjust-ulimits ceph-objectstore-tool "
1253 "{paths} --pgid {pgid} --op list |"
1254 "grep '\"oid\":\"{name}\"')".
1255 format(paths
=self
.paths
,
1257 name
=self
.object_name
))
1258 args
= '"$object" ' + args
1259 options
+= " --pgid {pgid}".format(pgid
=self
.pgid
)
1260 cmd
= ("sudo adjust-ulimits ceph-objectstore-tool {paths} {options} {args}".
1261 format(paths
=self
.paths
,
1265 cmd
= ("echo {payload} | base64 --decode | {cmd}".
1266 format(payload
=base64
.encode(stdin
),
1269 return "\n".join(lines
)
1271 def run(self
, options
, args
, stdin
=None, stdout
=None):
1274 self
.manager
.kill_osd(self
.osd
)
1275 cmd
= self
.build_cmd(options
, args
, stdin
)
1276 self
.manager
.log(cmd
)
1278 proc
= self
.remote
.run(args
=['bash', '-e', '-x', '-c', cmd
],
1283 if proc
.exitstatus
!= 0:
1284 self
.manager
.log("failed with " + str(proc
.exitstatus
))
1285 error
= six
.ensure_str(proc
.stdout
.getvalue()) + " " + \
1286 six
.ensure_str(proc
.stderr
.getvalue())
1287 raise Exception(error
)
1290 self
.manager
.revive_osd(self
.osd
)
1291 self
.manager
.wait_till_osd_is_up(self
.osd
, 300)
1294 # XXX: this class has nothing to do with the Ceph daemon (ceph-mgr) of
1298 Ceph manager object.
1299 Contains several local functions that form a bulk of this module.
1301 :param controller: the remote machine where the Ceph commands should be
1303 :param ctx: the cluster context
1304 :param config: path to Ceph config file
1305 :param logger: for logging messages
1306 :param cluster: name of the Ceph cluster
1309 def __init__(self
, controller
, ctx
=None, config
=None, logger
=None,
1310 cluster
='ceph', cephadm
=False):
1311 self
.lock
= threading
.RLock()
1313 self
.config
= config
1314 self
.controller
= controller
1315 self
.next_pool_id
= 0
1316 self
.cluster
= cluster
1317 self
.cephadm
= cephadm
1319 self
.log
= lambda x
: logger
.info(x
)
1323 implement log behavior.
1327 if self
.config
is None:
1328 self
.config
= dict()
1329 pools
= self
.list_pools()
1332 # we may race with a pool deletion; ignore failures here
1334 self
.pools
[pool
] = self
.get_pool_int_property(pool
, 'pg_num')
1335 except CommandFailedError
:
1336 self
.log('Failed to get pg_num from pool %s, ignoring' % pool
)
1338 def raw_cluster_cmd(self
, *args
):
1340 Start ceph on a raw cluster. Return count
1343 proc
= shell(self
.ctx
, self
.cluster
, self
.controller
,
1344 args
=['ceph'] + list(args
),
1347 testdir
= teuthology
.get_testdir(self
.ctx
)
1352 '{tdir}/archive/coverage'.format(tdir
=testdir
),
1360 ceph_args
.extend(args
)
1361 proc
= self
.controller
.run(
1365 return six
.ensure_str(proc
.stdout
.getvalue())
1367 def raw_cluster_cmd_result(self
, *args
, **kwargs
):
1369 Start ceph on a cluster. Return success or failure information.
1372 proc
= shell(self
.ctx
, self
.cluster
, self
.controller
,
1373 args
=['ceph'] + list(args
),
1376 testdir
= teuthology
.get_testdir(self
.ctx
)
1381 '{tdir}/archive/coverage'.format(tdir
=testdir
),
1388 ceph_args
.extend(args
)
1389 kwargs
['args'] = ceph_args
1390 kwargs
['check_status'] = False
1391 proc
= self
.controller
.run(**kwargs
)
1392 return proc
.exitstatus
1394 def run_ceph_w(self
, watch_channel
=None):
1396 Execute "ceph -w" in the background with stdout connected to a BytesIO,
1397 and return the RemoteProcess.
1399 :param watch_channel: Specifies the channel to be watched. This can be
1400 'cluster', 'audit', ...
1401 :type watch_channel: str
1410 if watch_channel
is not None:
1411 args
.append("--watch-channel")
1412 args
.append(watch_channel
)
1413 return self
.controller
.run(args
=args
, wait
=False, stdout
=StringIO(), stdin
=run
.PIPE
)
def get_mon_socks(self):
    """
    Get monitor sockets.

    :return socks: tuple of strings; strings are individual sockets.
    """
    from json import loads

    # BUG FIX: raw_cluster_cmd takes *args; the original passed a single
    # list, which would be forwarded as one broken token.  Pass the
    # arguments individually, matching get_msgrv1/v2_mon_socks.
    output = loads(self.raw_cluster_cmd('--format=json', 'mon', 'dump'))
    socks = []
    for mon in output['mons']:
        for addrvec_mem in mon['public_addrs']['addrvec']:
            socks.append(addrvec_mem['addr'])
    return tuple(socks)
1430 def get_msgrv1_mon_socks(self
):
1432 Get monitor sockets that use msgrv1 to operate.
1434 :return socks: tuple of strings; strings are individual sockets.
1436 from json
import loads
1438 output
= loads(self
.raw_cluster_cmd('--format=json', 'mon', 'dump'))
1440 for mon
in output
['mons']:
1441 for addrvec_mem
in mon
['public_addrs']['addrvec']:
1442 if addrvec_mem
['type'] == 'v1':
1443 socks
.append(addrvec_mem
['addr'])
1446 def get_msgrv2_mon_socks(self
):
1448 Get monitor sockets that use msgrv2 to operate.
1450 :return socks: tuple of strings; strings are individual sockets.
1452 from json
import loads
1454 output
= loads(self
.raw_cluster_cmd('--format=json', 'mon', 'dump'))
1456 for mon
in output
['mons']:
1457 for addrvec_mem
in mon
['public_addrs']['addrvec']:
1458 if addrvec_mem
['type'] == 'v2':
1459 socks
.append(addrvec_mem
['addr'])
1462 def flush_pg_stats(self
, osds
, no_wait
=None, wait_for_mon
=300):
1464 Flush pg stats from a list of OSD ids, ensuring they are reflected
1465 all the way to the monitor. Luminous and later only.
1467 :param osds: list of OSDs to flush
1468 :param no_wait: list of OSDs not to wait for seq id. by default, we
1469 wait for all specified osds, but some of them could be
1470 moved out of osdmap, so we cannot get their updated
1471 stat seq from monitor anymore. in that case, you need
1472 to pass a blacklist.
1473 :param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
1474 it. (5 min by default)
1476 seq
= {osd
: int(self
.raw_cluster_cmd('tell', 'osd.%d' % osd
, 'flush_pg_stats'))
1478 if not wait_for_mon
:
1482 for osd
, need
in seq
.items():
1486 while wait_for_mon
> 0:
1487 got
= int(self
.raw_cluster_cmd('osd', 'last-stat-seq', 'osd.%d' % osd
))
1488 self
.log('need seq {need} got {got} for osd.{osd}'.format(
1489 need
=need
, got
=got
, osd
=osd
))
1494 wait_for_mon
-= A_WHILE
1496 raise Exception('timed out waiting for mon to be updated with '
1497 'osd.{osd}: {got} < {need}'.
1498 format(osd
=osd
, got
=got
, need
=need
))
def flush_all_pg_stats(self):
    """
    Flush pg stats for every osd id present in the osd dump.
    """
    all_osd_ids = range(len(self.get_osd_dump()))
    self.flush_pg_stats(all_osd_ids)
1503 def do_rados(self
, remote
, cmd
, check_status
=True):
1505 Execute a remote rados command.
1507 testdir
= teuthology
.get_testdir(self
.ctx
)
1511 '{tdir}/archive/coverage'.format(tdir
=testdir
),
1520 check_status
=check_status
1524 def rados_write_objects(self
, pool
, num_objects
, size
,
1525 timelimit
, threads
, cleanup
=False):
1528 Threads not used yet.
1532 '--num-objects', num_objects
,
1538 args
.append('--no-cleanup')
1539 return self
.do_rados(self
.controller
, map(str, args
))
1541 def do_put(self
, pool
, obj
, fname
, namespace
=None):
1543 Implement rados put operation
1546 if namespace
is not None:
1547 args
+= ['-N', namespace
]
1553 return self
.do_rados(
1559 def do_get(self
, pool
, obj
, fname
='/dev/null', namespace
=None):
1561 Implement rados get operation
1564 if namespace
is not None:
1565 args
+= ['-N', namespace
]
1571 return self
.do_rados(
1577 def do_rm(self
, pool
, obj
, namespace
=None):
1579 Implement rados rm operation
1582 if namespace
is not None:
1583 args
+= ['-N', namespace
]
1588 return self
.do_rados(
def osd_admin_socket(self, osd_id, command, check_status=True, timeout=0, stdout=None):
    """
    Thin convenience wrapper: run an admin-socket command against a
    single osd via admin_socket().
    """
    return self.admin_socket('osd', osd_id, command, check_status,
                             timeout, stdout)
def find_remote(self, service_type, service_id):
    """
    Get the Remote for the host where a particular service runs.

    :param service_type: 'mds', 'osd', 'client'
    :param service_id: The second part of a role, e.g. '0' for a
                       role like 'client.0'
    :return: a Remote instance for the host where the requested
             role is placed
    """
    return get_remote(self.ctx, self.cluster, service_type, service_id)
1612 def admin_socket(self
, service_type
, service_id
,
1613 command
, check_status
=True, timeout
=0, stdout
=None):
1615 Remotely start up ceph specifying the admin socket
1616 :param command: a list of words to use as the command
1622 remote
= self
.find_remote(service_type
, service_id
)
1626 self
.ctx
, self
.cluster
, remote
,
1628 'ceph', 'daemon', '%s.%s' % (service_type
, service_id
),
1632 check_status
=check_status
,
1635 testdir
= teuthology
.get_testdir(self
.ctx
)
1640 '{tdir}/archive/coverage'.format(tdir
=testdir
),
1647 '/var/run/ceph/{cluster}-{type}.{id}.asok'.format(
1648 cluster
=self
.cluster
,
1652 args
.extend(command
)
1657 check_status
=check_status
def objectstore_tool(self, pool, options, args, **kwargs):
    """
    Construct an ObjectStoreTool for the given pool and immediately run
    it with the supplied options and arguments.
    """
    tool = ObjectStoreTool(self, pool, **kwargs)
    return tool.run(options, args)
1663 def get_pgid(self
, pool
, pgnum
):
1665 :param pool: pool name
1666 :param pgnum: pg number
1667 :returns: a string representing this pg.
1669 poolnum
= self
.get_pool_num(pool
)
1670 pg_str
= "{poolnum}.{pgnum}".format(
def get_pg_replica(self, pool, pgnum):
    """
    Return the OSD id of the last replica in the acting set of the pg
    (e.g. (data, 0) -> 0).
    """
    pgid = self.get_pgid(pool, pgnum)
    raw = self.raw_cluster_cmd("pg", "map", pgid, '--format=json')
    # The first line of the command output is not JSON; drop it.
    pg_map = json.loads('\n'.join(raw.split('\n')[1:]))
    return int(pg_map['acting'][-1])
1685 def wait_for_pg_stats(func
):
1686 # both osd_mon_report_interval and mgr_stats_period are 5 seconds
1687 # by default, and take the faulty injection in ms into consideration,
1688 # 12 seconds are more than enough
1689 delays
= [1, 1, 2, 3, 5, 8, 13, 0]
1691 def wrapper(self
, *args
, **kwargs
):
1693 for delay
in delays
:
1695 return func(self
, *args
, **kwargs
)
1696 except AssertionError as e
:
def get_pg_primary(self, pool, pgnum):
    """
    Return the OSD id of the primary (first acting OSD) of the pg
    (e.g. (data, 0) -> 0).
    """
    pgid = self.get_pgid(pool, pgnum)
    raw = self.raw_cluster_cmd("pg", "map", pgid, '--format=json')
    # Skip the non-JSON first line of the command output.
    pg_map = json.loads('\n'.join(raw.split('\n')[1:]))
    return int(pg_map['acting'][0])
def get_pool_num(self, pool):
    """
    Look up the numeric id of the named pool (e.g. data -> 2).
    """
    pool_info = self.get_pool_dump(pool)
    return int(pool_info['pool'])
def list_pools(self):
    """
    Return the names of all pools as a list of strings.
    """
    dump = self.get_osd_dump_json()
    pool_entries = dump['pools']
    self.log(pool_entries)
    names = []
    for entry in pool_entries:
        names.append(str(entry['pool_name']))
    return names
def clear_pools(self):
    """
    Remove every pool in the cluster.

    Calls remove_pool() once per name returned by list_pools().
    """
    # Use a plain loop instead of a throwaway list comprehension:
    # remove_pool() is called purely for its side effect.
    for pool_name in self.list_pools():
        self.remove_pool(pool_name)
1732 def kick_recovery_wq(self
, osdnum
):
1734 Run kick_recovery_wq on cluster.
1736 return self
.raw_cluster_cmd(
1737 'tell', "osd.%d" % (int(osdnum
),),
1742 def wait_run_admin_socket(self
, service_type
,
1743 service_id
, args
=['version'], timeout
=75, stdout
=None):
1745 If osd_admin_socket call succeeds, return. Otherwise wait
1746 five seconds and try again.
1752 proc
= self
.admin_socket(service_type
, service_id
,
1753 args
, check_status
=False, stdout
=stdout
)
1754 if proc
.exitstatus
== 0:
1758 if (tries
* 5) > timeout
:
1759 raise Exception('timed out waiting for admin_socket '
1760 'to appear after {type}.{id} restart'.
1761 format(type=service_type
,
1763 self
.log("waiting on admin_socket for {type}-{id}, "
1764 "{command}".format(type=service_type
,
1769 def get_pool_dump(self
, pool
):
1771 get the osd dump part of a pool
1773 osd_dump
= self
.get_osd_dump_json()
1774 for i
in osd_dump
['pools']:
1775 if i
['pool_name'] == pool
:
1779 def get_config(self
, service_type
, service_id
, name
):
1781 :param node: like 'mon.a'
1782 :param name: the option name
1784 proc
= self
.wait_run_admin_socket(service_type
, service_id
,
1786 j
= json
.loads(proc
.stdout
.getvalue())
def inject_args(self, service_type, service_id, name, value):
    """
    Set a config option on running daemons via 'tell ... injectargs'.

    :param service_type: daemon type, e.g. 'osd' or 'mon'
    :param service_id: daemon id, or '*' for all daemons of that type
    :param name: option name
    :param value: option value; Python bools become 'true'/'false'
    """
    target = '{0}.{1}'.format(service_type, service_id)
    # injectargs takes string values only; translate booleans explicitly.
    if isinstance(value, bool):
        value = 'true' if value else 'false'
    option = '--{name}={value}'.format(name=name, value=value)
    self.raw_cluster_cmd('--', 'tell', target, 'injectargs', option)
1796 def set_config(self
, osdnum
, **argdict
):
1798 :param osdnum: osd number
1799 :param argdict: dictionary containing values to set.
1801 for k
, v
in argdict
.items():
1802 self
.wait_run_admin_socket(
1804 ['config', 'set', str(k
), str(v
)])
def raw_cluster_status(self):
    """
    Get status from the cluster, parsed from JSON.
    """
    return json.loads(self.raw_cluster_cmd('status', '--format=json'))
def raw_osd_status(self):
    """
    Get osd status from the cluster as raw command output.
    """
    out = self.raw_cluster_cmd('osd', 'dump')
    return out
1819 def get_osd_status(self
):
1821 Get osd statuses sorted by states that the osds are in.
1823 osd_lines
= list(filter(
1824 lambda x
: x
.startswith('osd.') and (("up" in x
) or ("down" in x
)),
1825 self
.raw_osd_status().split('\n')))
1827 in_osds
= [int(i
[4:].split()[0])
1828 for i
in filter(lambda x
: " in " in x
, osd_lines
)]
1829 out_osds
= [int(i
[4:].split()[0])
1830 for i
in filter(lambda x
: " out " in x
, osd_lines
)]
1831 up_osds
= [int(i
[4:].split()[0])
1832 for i
in filter(lambda x
: " up " in x
, osd_lines
)]
1833 down_osds
= [int(i
[4:].split()[0])
1834 for i
in filter(lambda x
: " down " in x
, osd_lines
)]
1835 dead_osds
= [int(x
.id_
)
1836 for x
in filter(lambda x
:
1839 iter_daemons_of_role('osd', self
.cluster
))]
1840 live_osds
= [int(x
.id_
) for x
in
1843 self
.ctx
.daemons
.iter_daemons_of_role('osd',
1845 return {'in': in_osds
, 'out': out_osds
, 'up': up_osds
,
1846 'down': down_osds
, 'dead': dead_osds
, 'live': live_osds
,
1849 def get_num_pgs(self
):
1851 Check cluster status for the number of pgs
1853 status
= self
.raw_cluster_status()
1855 return status
['pgmap']['num_pgs']
def create_erasure_code_profile(self, profile_name, profile):
    """
    Create an erasure code profile name that can be used as a parameter
    when creating an erasure coded pool.
    """
    cmd = cmd_erasure_code_profile(profile_name, profile)
    self.raw_cluster_cmd(*cmd)
1866 def create_pool_with_unique_name(self
, pg_num
=16,
1867 erasure_code_profile_name
=None,
1869 erasure_code_use_overwrites
=False):
1871 Create a pool named unique_pool_X where X is unique.
1875 name
= "unique_pool_%s" % (str(self
.next_pool_id
),)
1876 self
.next_pool_id
+= 1
1880 erasure_code_profile_name
=erasure_code_profile_name
,
1882 erasure_code_use_overwrites
=erasure_code_use_overwrites
)
1885 @contextlib.contextmanager
1886 def pool(self
, pool_name
, pg_num
=16, erasure_code_profile_name
=None):
1887 self
.create_pool(pool_name
, pg_num
, erasure_code_profile_name
)
1889 self
.remove_pool(pool_name
)
1891 def create_pool(self
, pool_name
, pg_num
=16,
1892 erasure_code_profile_name
=None,
1894 erasure_code_use_overwrites
=False):
1896 Create a pool named from the pool_name parameter.
1897 :param pool_name: name of the pool being created.
1898 :param pg_num: initial number of pgs.
1899 :param erasure_code_profile_name: if set and !None create an
1900 erasure coded pool using the profile
1901 :param erasure_code_use_overwrites: if true, allow overwrites
1904 assert isinstance(pool_name
, six
.string_types
)
1905 assert isinstance(pg_num
, int)
1906 assert pool_name
not in self
.pools
1907 self
.log("creating pool_name %s" % (pool_name
,))
1908 if erasure_code_profile_name
:
1909 self
.raw_cluster_cmd('osd', 'pool', 'create',
1910 pool_name
, str(pg_num
), str(pg_num
),
1911 'erasure', erasure_code_profile_name
)
1913 self
.raw_cluster_cmd('osd', 'pool', 'create',
1914 pool_name
, str(pg_num
))
1915 if min_size
is not None:
1916 self
.raw_cluster_cmd(
1917 'osd', 'pool', 'set', pool_name
,
1920 if erasure_code_use_overwrites
:
1921 self
.raw_cluster_cmd(
1922 'osd', 'pool', 'set', pool_name
,
1923 'allow_ec_overwrites',
1925 self
.raw_cluster_cmd(
1926 'osd', 'pool', 'application', 'enable',
1927 pool_name
, 'rados', '--yes-i-really-mean-it',
1928 run
.Raw('||'), 'true')
1929 self
.pools
[pool_name
] = pg_num
def add_pool_snap(self, pool_name, snap_name):
    """
    Create a snapshot of a pool.

    :param pool_name: name of pool to snapshot
    :param snap_name: name of snapshot to take
    """
    self.raw_cluster_cmd('osd', 'pool', 'mksnap', str(pool_name),
                         str(snap_name))
def remove_pool_snap(self, pool_name, snap_name):
    """
    Remove a pool snapshot.

    :param pool_name: name of the pool
    :param snap_name: name of the snapshot to remove
    """
    self.raw_cluster_cmd('osd', 'pool', 'rmsnap', str(pool_name),
                         str(snap_name))
1950 def remove_pool(self
, pool_name
):
1952 Remove the indicated pool
1953 :param pool_name: Pool to be removed
1956 assert isinstance(pool_name
, six
.string_types
)
1957 assert pool_name
in self
.pools
1958 self
.log("removing pool_name %s" % (pool_name
,))
1959 del self
.pools
[pool_name
]
1960 self
.raw_cluster_cmd('osd', 'pool', 'rm', pool_name
, pool_name
,
1961 "--yes-i-really-really-mean-it")
1969 return random
.sample(self
.pools
.keys(), 1)[0]
1971 def get_pool_pg_num(self
, pool_name
):
1973 Return the number of pgs in the pool specified.
1976 assert isinstance(pool_name
, six
.string_types
)
1977 if pool_name
in self
.pools
:
1978 return self
.pools
[pool_name
]
1981 def get_pool_property(self
, pool_name
, prop
):
1983 :param pool_name: pool
1984 :param prop: property to be checked.
1985 :returns: property as string
1988 assert isinstance(pool_name
, six
.string_types
)
1989 assert isinstance(prop
, six
.string_types
)
1990 output
= self
.raw_cluster_cmd(
1996 return output
.split()[1]
def get_pool_int_property(self, pool_name, prop):
    """
    Fetch a pool property and coerce it to an integer.
    """
    raw = self.get_pool_property(pool_name, prop)
    return int(raw)
2001 def set_pool_property(self
, pool_name
, prop
, val
):
2003 :param pool_name: pool
2004 :param prop: property to be set.
2005 :param val: value to set.
2007 This routine retries if set operation fails.
2010 assert isinstance(pool_name
, six
.string_types
)
2011 assert isinstance(prop
, six
.string_types
)
2012 assert isinstance(val
, int)
2015 r
= self
.raw_cluster_cmd_result(
2022 if r
!= 11: # EAGAIN
2026 raise Exception('timed out getting EAGAIN '
2027 'when setting pool property %s %s = %s' %
2028 (pool_name
, prop
, val
))
2029 self
.log('got EAGAIN setting pool property, '
2030 'waiting a few seconds...')
2033 def expand_pool(self
, pool_name
, by
, max_pgs
):
2035 Increase the number of pgs in a pool
2038 assert isinstance(pool_name
, six
.string_types
)
2039 assert isinstance(by
, int)
2040 assert pool_name
in self
.pools
2041 if self
.get_num_creating() > 0:
2043 if (self
.pools
[pool_name
] + by
) > max_pgs
:
2045 self
.log("increase pool size by %d" % (by
,))
2046 new_pg_num
= self
.pools
[pool_name
] + by
2047 self
.set_pool_property(pool_name
, "pg_num", new_pg_num
)
2048 self
.pools
[pool_name
] = new_pg_num
2051 def contract_pool(self
, pool_name
, by
, min_pgs
):
2053 Decrease the number of pgs in a pool
2056 self
.log('contract_pool %s by %s min %s' % (
2057 pool_name
, str(by
), str(min_pgs
)))
2058 assert isinstance(pool_name
, six
.string_types
)
2059 assert isinstance(by
, int)
2060 assert pool_name
in self
.pools
2061 if self
.get_num_creating() > 0:
2062 self
.log('too many creating')
2064 proj
= self
.pools
[pool_name
] - by
2066 self
.log('would drop below min_pgs, proj %d, currently %d' % (proj
,self
.pools
[pool_name
],))
2068 self
.log("decrease pool size by %d" % (by
,))
2069 new_pg_num
= self
.pools
[pool_name
] - by
2070 self
.set_pool_property(pool_name
, "pg_num", new_pg_num
)
2071 self
.pools
[pool_name
] = new_pg_num
2074 def stop_pg_num_changes(self
):
2076 Reset all pg_num_targets back to pg_num, canceling splits and merges
2078 self
.log('Canceling any pending splits or merges...')
2079 osd_dump
= self
.get_osd_dump_json()
2081 for pool
in osd_dump
['pools']:
2082 if pool
['pg_num'] != pool
['pg_num_target']:
2083 self
.log('Setting pool %s (%d) pg_num %d -> %d' %
2084 (pool
['pool_name'], pool
['pool'],
2085 pool
['pg_num_target'],
2087 self
.raw_cluster_cmd('osd', 'pool', 'set', pool
['pool_name'],
2088 'pg_num', str(pool
['pg_num']))
2090 # we don't support pg_num_target before nautilus
2093 def set_pool_pgpnum(self
, pool_name
, force
):
2095 Set pgpnum property of pool_name pool.
2098 assert isinstance(pool_name
, six
.string_types
)
2099 assert pool_name
in self
.pools
2100 if not force
and self
.get_num_creating() > 0:
2102 self
.set_pool_property(pool_name
, 'pgp_num', self
.pools
[pool_name
])
2105 def list_pg_unfound(self
, pgid
):
2107 return list of unfound pgs with the id specified
2112 out
= self
.raw_cluster_cmd('--', 'pg', pgid
, 'list_unfound',
2118 r
['objects'].extend(j
['objects'])
2123 offset
= j
['objects'][-1]['oid']
2128 def get_pg_stats(self
):
2130 Dump the cluster and get pg stats
2132 out
= self
.raw_cluster_cmd('pg', 'dump', '--format=json')
2133 j
= json
.loads('\n'.join(out
.split('\n')[1:]))
2135 return j
['pg_map']['pg_stats']
2137 return j
['pg_stats']
2139 def get_pgids_to_force(self
, backfill
):
2141 Return the randomized list of PGs that can have their recovery/backfill forced
2143 j
= self
.get_pg_stats();
2146 wanted
= ['degraded', 'backfilling', 'backfill_wait']
2148 wanted
= ['recovering', 'degraded', 'recovery_wait']
2150 status
= pg
['state'].split('+')
2152 if random
.random() > 0.5 and not ('forced_backfill' in status
or 'forced_recovery' in status
) and t
in status
:
2153 pgids
.append(pg
['pgid'])
2157 def get_pgids_to_cancel_force(self
, backfill
):
2159 Return the randomized list of PGs whose recovery/backfill priority is forced
2161 j
= self
.get_pg_stats();
2164 wanted
= 'forced_backfill'
2166 wanted
= 'forced_recovery'
2168 status
= pg
['state'].split('+')
2169 if wanted
in status
and random
.random() > 0.5:
2170 pgids
.append(pg
['pgid'])
2173 def compile_pg_status(self
):
2175 Return a histogram of pg state values
2178 j
= self
.get_pg_stats()
2180 for status
in pg
['state'].split('+'):
2181 if status
not in ret
:
2186 @wait_for_pg_stats # type: ignore
2187 def with_pg_state(self
, pool
, pgnum
, check
):
2188 pgstr
= self
.get_pgid(pool
, pgnum
)
2189 stats
= self
.get_single_pg_stats(pgstr
)
2190 assert(check(stats
['state']))
2192 @wait_for_pg_stats # type: ignore
2193 def with_pg(self
, pool
, pgnum
, check
):
2194 pgstr
= self
.get_pgid(pool
, pgnum
)
2195 stats
= self
.get_single_pg_stats(pgstr
)
def get_last_scrub_stamp(self, pool, pgnum):
    """
    Get the timestamp of the last scrub of the given pg.
    """
    pgid = self.get_pgid(pool, pgnum)
    return self.get_single_pg_stats(pgid)["last_scrub_stamp"]
2205 def do_pg_scrub(self
, pool
, pgnum
, stype
):
2207 Scrub pg and wait for scrubbing to finish
2209 init
= self
.get_last_scrub_stamp(pool
, pgnum
)
2210 RESEND_TIMEOUT
= 120 # Must be a multiple of SLEEP_TIME
2211 FATAL_TIMEOUT
= RESEND_TIMEOUT
* 3
2214 while init
== self
.get_last_scrub_stamp(pool
, pgnum
):
2215 assert timer
< FATAL_TIMEOUT
, "fatal timeout trying to " + stype
2216 self
.log("waiting for scrub type %s" % (stype
,))
2217 if (timer
% RESEND_TIMEOUT
) == 0:
2218 self
.raw_cluster_cmd('pg', stype
, self
.get_pgid(pool
, pgnum
))
2219 # The first time in this loop is the actual request
2220 if timer
!= 0 and stype
== "repair":
2221 self
.log("WARNING: Resubmitted a non-idempotent repair")
2222 time
.sleep(SLEEP_TIME
)
2225 def wait_snap_trimming_complete(self
, pool
):
2227 Wait for snap trimming on pool to end
2232 poolnum
= self
.get_pool_num(pool
)
2233 poolnumstr
= "%s." % (poolnum
,)
2236 if (now
- start
) > FATAL_TIMEOUT
:
2237 assert (now
- start
) < FATAL_TIMEOUT
, \
2238 'failed to complete snap trimming before timeout'
2239 all_stats
= self
.get_pg_stats()
2241 for pg
in all_stats
:
2242 if (poolnumstr
in pg
['pgid']) and ('snaptrim' in pg
['state']):
2243 self
.log("pg {pg} in trimming, state: {state}".format(
2249 self
.log("{pool} still trimming, waiting".format(pool
=pool
))
2250 time
.sleep(POLL_PERIOD
)
2252 def get_single_pg_stats(self
, pgid
):
2254 Return pg for the pgid specified.
2256 all_stats
= self
.get_pg_stats()
2258 for pg
in all_stats
:
2259 if pg
['pgid'] == pgid
:
2264 def get_object_pg_with_shard(self
, pool
, name
, osdid
):
2267 pool_dump
= self
.get_pool_dump(pool
)
2268 object_map
= self
.get_object_map(pool
, name
)
2269 if pool_dump
["type"] == PoolType
.ERASURE_CODED
:
2270 shard
= object_map
['acting'].index(osdid
)
2271 return "{pgid}s{shard}".format(pgid
=object_map
['pgid'],
2274 return object_map
['pgid']
def get_object_primary(self, pool, name):
    """
    Return the acting-primary OSD id for the given object.
    """
    omap = self.get_object_map(pool, name)
    return omap['acting_primary']
def get_object_map(self, pool, name):
    """
    osd map --format=json converted to a python object
    :returns: the python object
    """
    raw = self.raw_cluster_cmd('--format=json', 'osd', 'map', pool, name)
    # Discard the non-JSON first line before decoding.
    payload = '\n'.join(raw.split('\n')[1:])
    return json.loads(payload)
def get_osd_dump_json(self):
    """
    osd dump --format=json converted to a python object
    :returns: the python object
    """
    raw = self.raw_cluster_cmd('osd', 'dump', '--format=json')
    # Discard the non-JSON first line before decoding.
    return json.loads('\n'.join(raw.split('\n')[1:]))
def get_osd_dump(self):
    """
    Return the 'osds' list from the parsed osd dump.
    """
    dump = self.get_osd_dump_json()
    return dump['osds']
def get_osd_metadata(self):
    """
    osd metadata --format=json converted to a python object
    :returns: the python object containing osd metadata information
    """
    raw = self.raw_cluster_cmd('osd', 'metadata', '--format=json')
    # Discard the non-JSON first line before decoding.
    return json.loads('\n'.join(raw.split('\n')[1:]))
def get_mgr_dump(self):
    """
    Return the parsed output of 'mgr dump --format=json'.
    """
    return json.loads(self.raw_cluster_cmd('mgr', 'dump', '--format=json'))
2317 def get_stuck_pgs(self
, type_
, threshold
):
2319 :returns: stuck pg information from the cluster
2321 out
= self
.raw_cluster_cmd('pg', 'dump_stuck', type_
, str(threshold
),
2323 return json
.loads(out
).get('stuck_pg_stats',[])
2325 def get_num_unfound_objects(self
):
2327 Check cluster status to get the number of unfound objects
2329 status
= self
.raw_cluster_status()
2331 return status
['pgmap'].get('unfound_objects', 0)
2333 def get_num_creating(self
):
2335 Find the number of pgs in creating mode.
2337 pgs
= self
.get_pg_stats()
2340 if 'creating' in pg
['state']:
def get_num_active_clean(self):
    """
    Find the number of active and clean pgs.
    """
    return self._get_num_active_clean(self.get_pg_stats())
2351 def _get_num_active_clean(self
, pgs
):
2354 if (pg
['state'].count('active') and
2355 pg
['state'].count('clean') and
2356 not pg
['state'].count('stale')):
def get_num_active_recovered(self):
    """
    Find the number of active and recovered pgs.
    """
    return self._get_num_active_recovered(self.get_pg_stats())
2367 def _get_num_active_recovered(self
, pgs
):
2370 if (pg
['state'].count('active') and
2371 not pg
['state'].count('recover') and
2372 not pg
['state'].count('backfilling') and
2373 not pg
['state'].count('stale')):
def get_is_making_recovery_progress(self):
    """
    Return whether there is recovery progress discernable in the
    cluster status: true if any recovery rate (keys, bytes or objects
    per second) is nonzero.
    """
    pgmap = self.raw_cluster_status()['pgmap']
    kps = pgmap.get('recovering_keys_per_sec', 0)
    bps = pgmap.get('recovering_bytes_per_sec', 0)
    ops = pgmap.get('recovering_objects_per_sec', 0)
    return kps > 0 or bps > 0 or ops > 0
def get_num_active(self):
    """
    Find the number of active pgs.
    """
    return self._get_num_active(self.get_pg_stats())
2395 def _get_num_active(self
, pgs
):
2398 if pg
['state'].count('active') and not pg
['state'].count('stale'):
def get_num_down(self):
    """
    Find the number of pgs that are down.
    """
    num = 0
    for pg in self.get_pg_stats():
        # down or incomplete counts, but not while also stale
        if ((pg['state'].count('down') and not
             pg['state'].count('stale')) or
            (pg['state'].count('incomplete') and not
             pg['state'].count('stale'))):
            num += 1
    return num
def get_num_active_down(self):
    """
    Find the number of pgs that are either active or down.
    """
    return self._get_num_active_down(self.get_pg_stats())
2423 def _get_num_active_down(self
, pgs
):
2426 if ((pg
['state'].count('active') and not
2427 pg
['state'].count('stale')) or
2428 (pg
['state'].count('down') and not
2429 pg
['state'].count('stale')) or
2430 (pg
['state'].count('incomplete') and not
2431 pg
['state'].count('stale'))):
def get_num_peered(self):
    """
    Find the number of PGs that are peered
    """
    return self._get_num_peered(self.get_pg_stats())
2442 def _get_num_peered(self
, pgs
):
2445 if pg
['state'].count('peered') and not pg
['state'].count('stale'):
def is_clean(self):
    """
    True if all pgs are clean
    """
    # NOTE: the 'def' line was lost in the garbled source; reconstructed
    # to match the sibling is_recovered/is_active_or_down wrappers.
    pgs = self.get_pg_stats()
    return self._get_num_active_clean(pgs) == len(pgs)
def is_recovered(self):
    """
    True if all pgs have recovered
    """
    pgs = self.get_pg_stats()
    return len(pgs) == self._get_num_active_recovered(pgs)
def is_active_or_down(self):
    """
    True if all pgs are active or down
    """
    pgs = self.get_pg_stats()
    return len(pgs) == self._get_num_active_down(pgs)
def wait_for_clean(self, timeout=1200):
    """
    Loop until all pgs are clean, asserting that *timeout* seconds have
    not elapsed without progress.  The timeout window restarts whenever
    the number of active+clean pgs changes or recovery rates are nonzero.
    """
    self.log("waiting for clean")
    start = time.time()
    num_active_clean = self.get_num_active_clean()
    while not self.is_clean():
        if timeout is not None:
            if self.get_is_making_recovery_progress():
                self.log("making progress, resetting timeout")
                start = time.time()
            else:
                self.log("no progress seen, keeping timeout for now")
                if time.time() - start >= timeout:
                    self.log('dumping pgs')
                    out = self.raw_cluster_cmd('pg', 'dump')
                    self.log(out)
                    assert time.time() - start < timeout, \
                        'failed to become clean before timeout expired'
        cur_active_clean = self.get_num_active_clean()
        if cur_active_clean != num_active_clean:
            # progress was made: restart the timeout window
            start = time.time()
            num_active_clean = cur_active_clean
        time.sleep(3)
    self.log("clean!")
def are_all_osds_up(self):
    """
    Returns true if all osds are up.
    """
    return all(osd['up'] > 0 for osd in self.get_osd_dump())
def wait_for_all_osds_up(self, timeout=None):
    """
    When this exits, either the timeout has expired, or all
    osds are up.
    """
    self.log("waiting for all up")
    start = time.time()
    while not self.are_all_osds_up():
        if timeout is not None:
            assert time.time() - start < timeout, \
                'timeout expired in wait_for_all_osds_up'
        time.sleep(3)
    self.log("all up!")
def pool_exists(self, pool):
    """
    Return True if the named pool exists in the cluster, else False.
    """
    if pool in self.list_pools():
        return True
    return False
def wait_for_pool(self, pool, timeout=300):
    """
    Wait for a pool to exist
    """
    self.log('waiting for pool %s to exist' % pool)
    start = time.time()
    while not self.pool_exists(pool):
        if timeout is not None:
            assert time.time() - start < timeout, \
                'timeout expired in wait_for_pool'
        time.sleep(3)
def wait_for_pools(self, pools):
    """
    Wait until every pool in *pools* exists.
    """
    for pool in pools:
        self.wait_for_pool(pool)
def is_mgr_available(self):
    """Return True when the mgr map reports an available (active) mgr."""
    dump = self.get_mgr_dump()
    return dump.get('available', False)
def wait_for_mgr_available(self, timeout=None):
    """
    Loop until a mgr is available, or *timeout* seconds expire.
    """
    self.log("waiting for mgr available")
    start = time.time()
    while not self.is_mgr_available():
        if timeout is not None:
            assert time.time() - start < timeout, \
                'timeout expired in wait_for_mgr_available'
        time.sleep(3)
    self.log("mgr available!")
def wait_for_recovery(self, timeout=None):
    """
    Check peering. When this exists, we have recovered.
    """
    self.log("waiting for recovery to complete")
    start = time.time()
    num_active_recovered = self.get_num_active_recovered()
    while not self.is_recovered():
        now = time.time()
        if timeout is not None:
            if self.get_is_making_recovery_progress():
                self.log("making progress, resetting timeout")
                start = time.time()
            else:
                self.log("no progress seen, keeping timeout for now")
                if now - start >= timeout:
                    # re-check once more before failing; recovery may
                    # have finished while we were logging
                    if self.is_recovered():
                        break
                    self.log('dumping pgs')
                    out = self.raw_cluster_cmd('pg', 'dump')
                    self.log(out)
                    assert now - start < timeout, \
                        'failed to recover before timeout expired'
        cur_active_recovered = self.get_num_active_recovered()
        if cur_active_recovered != num_active_recovered:
            # progress was made: restart the timeout window
            start = time.time()
            num_active_recovered = cur_active_recovered
        time.sleep(3)
    self.log("recovered!")
def wait_for_active(self, timeout=None):
    """
    Check peering. When this exists, we are definitely active
    """
    self.log("waiting for peering to complete")
    start = time.time()
    num_active = self.get_num_active()
    while not self.is_active():
        if timeout is not None:
            if time.time() - start >= timeout:
                self.log('dumping pgs')
                out = self.raw_cluster_cmd('pg', 'dump')
                self.log(out)
                assert time.time() - start < timeout, \
                    'failed to recover before timeout expired'
        cur_active = self.get_num_active()
        if cur_active != num_active:
            # progress was made: restart the timeout window
            start = time.time()
            num_active = cur_active
        time.sleep(3)
    self.log("active!")
def wait_for_active_or_down(self, timeout=None):
    """
    Check peering. When this exists, we are definitely either
    active or down
    """
    self.log("waiting for peering to complete or become blocked")
    start = time.time()
    num_active_down = self.get_num_active_down()
    while not self.is_active_or_down():
        if timeout is not None:
            if time.time() - start >= timeout:
                self.log('dumping pgs')
                out = self.raw_cluster_cmd('pg', 'dump')
                self.log(out)
                assert time.time() - start < timeout, \
                    'failed to recover before timeout expired'
        cur_active_down = self.get_num_active_down()
        if cur_active_down != num_active_down:
            # progress was made: restart the timeout window
            start = time.time()
            num_active_down = cur_active_down
        time.sleep(3)
    self.log("active or down!")
def osd_is_up(self, osd):
    """
    Wrapper for osd check
    """
    dump = self.get_osd_dump()
    return dump[osd]['up'] > 0
def wait_till_osd_is_up(self, osd, timeout=None):
    """
    Loop waiting for osd.
    """
    self.log('waiting for osd.%d to be up' % osd)
    start = time.time()
    while not self.osd_is_up(osd):
        if timeout is not None:
            assert time.time() - start < timeout, \
                'osd.%d failed to come up before timeout expired' % osd
        time.sleep(3)
    self.log('osd.%d is up' % osd)
def is_active(self):
    """
    Wrapper to check if all pgs are active
    """
    active = self.get_num_active()
    return active == self.get_num_pgs()
def all_active_or_peered(self):
    """
    Wrapper to check if all PGs are active or peered
    """
    pgs = self.get_pg_stats()
    total = self._get_num_active(pgs) + self._get_num_peered(pgs)
    return total == len(pgs)
def wait_till_active(self, timeout=None):
    """
    Wait until all pgs are active.
    """
    self.log("waiting till active")
    start = time.time()
    while not self.is_active():
        if timeout is not None:
            if time.time() - start >= timeout:
                self.log('dumping pgs')
                out = self.raw_cluster_cmd('pg', 'dump')
                self.log(out)
                assert time.time() - start < timeout, \
                    'failed to become active before timeout expired'
        time.sleep(3)
    self.log("active!")
def wait_till_pg_convergence(self, timeout=None):
    """
    Poll pg states until two consecutive samples are identical, then
    return the converged {pgid: state} mapping.  Asserts if *timeout*
    seconds pass without convergence.
    """
    start = time.time()
    old_stats = None
    active_osds = [osd['osd'] for osd in self.get_osd_dump()
                   if osd['in'] and osd['up']]
    while True:
        # strictly speaking, no need to wait for mon. but due to the
        # "ms inject socket failures" setting, the osdmap could be delayed,
        # so mgr is likely to ignore the pg-stat messages with pgs serving
        # newly created pools which is not yet known by mgr. so, to make sure
        # the mgr is updated with the latest pg-stats, waiting for mon/mgr is
        # necessary.
        self.flush_pg_stats(active_osds)
        new_stats = dict((stat['pgid'], stat['state'])
                         for stat in self.get_pg_stats())
        if old_stats == new_stats:
            return old_stats
        if timeout is not None:
            assert time.time() - start < timeout, \
                'failed to reach convergence before %d secs' % timeout
        old_stats = new_stats
        # longer than mgr_stats_period
        time.sleep(5 + 1)
def mark_out_osd(self, osd):
    """
    Wrapper to mark osd out.
    """
    cmd = ('osd', 'out', str(osd))
    self.raw_cluster_cmd(*cmd)
def kill_osd(self, osd):
    """
    Kill osds by either power cycling (if indicated by the config)
    or by injecting a bdev crash, or by stopping the daemon normally.
    """
    if self.config.get('powercycle'):
        remote = self.find_remote('osd', osd)
        self.log('kill_osd on osd.{o} '
                 'doing powercycle of {s}'.format(o=osd, s=remote.name))
        self._assert_ipmi(remote)
        remote.console.power_off()
    elif self.config.get('bdev_inject_crash') and self.config.get('bdev_inject_crash_probability'):
        if random.uniform(0, 1) < self.config.get('bdev_inject_crash_probability', .5):
            self.inject_args(
                'osd', osd,
                'bdev-inject-crash', self.config.get('bdev_inject_crash'))
            try:
                # a crashed daemon makes wait() raise; that is success here
                self.ctx.daemons.get_daemon('osd', osd, self.cluster).wait()
            except Exception:
                pass
            else:
                raise RuntimeError('osd.%s did not fail' % osd)
        else:
            self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
    else:
        self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
2736 def _assert_ipmi(remote
):
2737 assert remote
.console
.has_ipmi_credentials
, (
2738 "powercycling requested but RemoteConsole is not "
2739 "initialized. Check ipmi config.")
def blackhole_kill_osd(self, osd):
    """
    Stop osd if nothing else works.
    """
    self.inject_args('osd', osd,
                     'objectstore-blackhole', True)
    # give the objectstore a moment to start dropping IO before stopping
    time.sleep(2)
    self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
def revive_osd(self, osd, timeout=360, skip_admin_check=False):
    """
    Revive osds by either power cycling (if indicated by the config)
    or by restarting.

    :param osd: osd id
    :param timeout: seconds to wait for the admin socket to respond
    :param skip_admin_check: skip the post-restart admin-socket probe
    """
    if self.config.get('powercycle'):
        remote = self.find_remote('osd', osd)
        self.log('kill_osd on osd.{o} doing powercycle of {s}'.
                 format(o=osd, s=remote.name))
        self._assert_ipmi(remote)
        remote.console.power_on()
        if not remote.console.check_status(300):
            raise Exception('Failed to revive osd.{o} via ipmi'.
                            format(o=osd))
        teuthology.reconnect(self.ctx, 60, [remote])
        mount_osd_data(self.ctx, remote, self.cluster, str(osd))
        self.make_admin_daemon_dir(remote)
        self.ctx.daemons.get_daemon('osd', osd, self.cluster).reset()
    self.ctx.daemons.get_daemon('osd', osd, self.cluster).restart()

    if not skip_admin_check:
        # wait for dump_ops_in_flight; this command doesn't appear
        # until after the signal handler is installed and it is safe
        # to stop the osd again without making valgrind leak checks
        # unhappy. see #5924.
        self.wait_run_admin_socket('osd', osd,
                                   args=['dump_ops_in_flight'],
                                   timeout=timeout, stdout=DEVNULL)
def mark_down_osd(self, osd):
    """
    Cluster command wrapper
    """
    cmd = ('osd', 'down', str(osd))
    self.raw_cluster_cmd(*cmd)
def mark_in_osd(self, osd):
    """
    Cluster command wrapper
    """
    cmd = ('osd', 'in', str(osd))
    self.raw_cluster_cmd(*cmd)
def signal_osd(self, osd, sig, silent=False):
    """
    Wrapper to local get_daemon call which sends the given
    signal to the given osd.
    """
    daemon = self.ctx.daemons.get_daemon('osd', osd, self.cluster)
    daemon.signal(sig, silent=silent)
def signal_mon(self, mon, sig, silent=False):
    """
    Wrapper to local get_daemon call
    """
    daemon = self.ctx.daemons.get_daemon('mon', mon, self.cluster)
    daemon.signal(sig, silent=silent)
def kill_mon(self, mon):
    """
    Kill the monitor by either power cycling (if the config says so),
    or by stopping the daemon normally.
    """
    if self.config.get('powercycle'):
        remote = self.find_remote('mon', mon)
        self.log('kill_mon on mon.{m} doing powercycle of {s}'.
                 format(m=mon, s=remote.name))
        self._assert_ipmi(remote)
        remote.console.power_off()
    else:
        self.ctx.daemons.get_daemon('mon', mon, self.cluster).stop()
def revive_mon(self, mon):
    """
    Restart by either power cycling (if the config says so),
    or by doing a normal restart.
    """
    if self.config.get('powercycle'):
        remote = self.find_remote('mon', mon)
        self.log('revive_mon on mon.{m} doing powercycle of {s}'.
                 format(m=mon, s=remote.name))
        self._assert_ipmi(remote)
        remote.console.power_on()
        self.make_admin_daemon_dir(remote)
    # restart is needed after a powercycle too, so it is unconditional
    self.ctx.daemons.get_daemon('mon', mon, self.cluster).restart()
def revive_mgr(self, mgr):
    """
    Restart by either power cycling (if the config says so),
    or by doing a normal restart.
    """
    if self.config.get('powercycle'):
        remote = self.find_remote('mgr', mgr)
        self.log('revive_mgr on mgr.{m} doing powercycle of {s}'.
                 format(m=mgr, s=remote.name))
        self._assert_ipmi(remote)
        remote.console.power_on()
        self.make_admin_daemon_dir(remote)
    # restart is needed after a powercycle too, so it is unconditional
    self.ctx.daemons.get_daemon('mgr', mgr, self.cluster).restart()
def get_mon_status(self, mon):
    """
    Extract all the monitor status information from the cluster
    """
    raw = self.raw_cluster_cmd('tell', 'mon.%s' % mon, 'mon_status')
    return json.loads(raw)
def get_mon_quorum(self):
    """
    Extract monitor quorum information from the cluster

    :returns: list of monitor ranks currently in quorum
    """
    out = self.raw_cluster_cmd('quorum_status')
    j = json.loads(out)
    self.log('quorum_status is %s' % out)
    return j['quorum']
def wait_for_mon_quorum_size(self, size, timeout=300):
    """
    Loop until quorum size is reached.
    """
    self.log('waiting for quorum size %d' % size)
    start = time.time()
    while not len(self.get_mon_quorum()) == size:
        if timeout is not None:
            assert time.time() - start < timeout, \
                ('failed to reach quorum size %d '
                 'before timeout expired' % size)
        time.sleep(3)
    self.log("quorum is size %d" % size)
def get_mon_health(self, debug=False):
    """
    Extract all the monitor health information.

    :param debug: when True, log the raw health output
    """
    out = self.raw_cluster_cmd('health', '--format=json')
    if debug:
        self.log('health:\n{h}'.format(h=out))
    return json.loads(out)
def wait_until_healthy(self, timeout=None):
    """
    Loop until cluster health status is HEALTH_OK, or *timeout* expires.
    """
    self.log("wait_until_healthy")
    start = time.time()
    while self.get_mon_health()['status'] != 'HEALTH_OK':
        if timeout is not None:
            assert time.time() - start < timeout, \
                'timeout expired in wait_until_healthy'
        time.sleep(3)
    self.log("wait_until_healthy done")
def get_filepath(self):
    """
    Return path to osd data with {id} needing to be replaced
    """
    return '/var/lib/ceph/osd/%s-{id}' % self.cluster
def make_admin_daemon_dir(self, remote):
    """
    Create /var/run/ceph directory on remote site.

    :param ctx: Context
    :param remote: Remote site
    """
    mkdir_cmd = ['sudo', 'install', '-d', '-m0777', '--', '/var/run/ceph', ]
    remote.run(args=mkdir_cmd)
def get_service_task_status(self, service, status_key):
    """
    Return daemon task status for a given ceph service.

    :param service: ceph service (mds, osd, etc...)
    :param status_key: matching task status key
    :returns: {daemon_name: status_value}, or {} if the service or key
              is missing from the servicemap
    """
    task_status = {}
    status = self.raw_cluster_status()
    try:
        for k, v in status['servicemap']['services'][service]['daemons'].items():
            ts = dict(v).get('task_status', None)
            if ts is not None:
                task_status[k] = ts[status_key]
    except KeyError: # catches missing service and status key
        return {}
    self.log(task_status)
    return task_status
def utility_task(name):
    """
    Generate ceph_manager subtask corresponding to ceph_manager
    method name
    """
    def task(ctx, config):
        # task bodies in teuthology may receive config=None from YAML
        if config is None:
            config = {}
        args = config.get('args', [])
        kwargs = config.get('kwargs', {})
        cluster = config.get('cluster', 'ceph')
        fn = getattr(ctx.managers[cluster], name)
        fn(*args, **kwargs)
    return task
# Expose common CephManager operations as standalone teuthology tasks so a
# suite YAML can invoke them directly (e.g. "- kill_osd: ...").  Each bound
# name resolves the method of the same name on ctx.managers[cluster] at
# call time via utility_task().
revive_osd = utility_task("revive_osd")
revive_mon = utility_task("revive_mon")
kill_osd = utility_task("kill_osd")
kill_mon = utility_task("kill_mon")
create_pool = utility_task("create_pool")
remove_pool = utility_task("remove_pool")
wait_for_clean = utility_task("wait_for_clean")
flush_all_pg_stats = utility_task("flush_all_pg_stats")
set_pool_property = utility_task("set_pool_property")
do_pg_scrub = utility_task("do_pg_scrub")
wait_for_pool = utility_task("wait_for_pool")
wait_for_pools = utility_task("wait_for_pools")