2 ceph manager -- Thrasher and CephManager objects
4 from cStringIO
import StringIO
5 from functools
import wraps
17 from teuthology
import misc
as teuthology
18 from tasks
.scrub
import Scrubber
19 from util
.rados
import cmd_erasure_code_profile
20 from util
import get_remote
21 from teuthology
.contextutil
import safe_while
22 from teuthology
.orchestra
.remote
import Remote
23 from teuthology
.orchestra
import run
24 from teuthology
.exceptions
import CommandFailedError
27 from subprocess
import DEVNULL
# py3k
29 DEVNULL
= open(os
.devnull
, 'r+')
31 DEFAULT_CONF_PATH
= '/etc/ceph/ceph.conf'
33 log
= logging
.getLogger(__name__
)
36 def write_conf(ctx
, conf_path
=DEFAULT_CONF_PATH
, cluster
='ceph'):
38 ctx
.ceph
[cluster
].conf
.write(conf_fp
)
40 writes
= ctx
.cluster
.run(
42 'sudo', 'mkdir', '-p', '/etc/ceph', run
.Raw('&&'),
43 'sudo', 'chmod', '0755', '/etc/ceph', run
.Raw('&&'),
46 ('import shutil, sys; '
47 'shutil.copyfileobj(sys.stdin, file(sys.argv[1], "wb"))'),
50 'sudo', 'chmod', '0644', conf_path
,
54 teuthology
.feed_many_stdins_and_close(conf_fp
, writes
)
58 def mount_osd_data(ctx
, remote
, cluster
, osd
):
63 :param remote: Remote site
64 :param cluster: name of ceph cluster
67 log
.debug('Mounting data for osd.{o} on {r}'.format(o
=osd
, r
=remote
))
68 role
= "{0}.osd.{1}".format(cluster
, osd
)
69 alt_role
= role
if cluster
!= 'ceph' else "osd.{0}".format(osd
)
70 if remote
in ctx
.disk_config
.remote_to_roles_to_dev
:
71 if alt_role
in ctx
.disk_config
.remote_to_roles_to_dev
[remote
]:
73 if role
not in ctx
.disk_config
.remote_to_roles_to_dev
[remote
]:
75 dev
= ctx
.disk_config
.remote_to_roles_to_dev
[remote
][role
]
76 mount_options
= ctx
.disk_config
.\
77 remote_to_roles_to_dev_mount_options
[remote
][role
]
78 fstype
= ctx
.disk_config
.remote_to_roles_to_dev_fstype
[remote
][role
]
79 mnt
= os
.path
.join('/var/lib/ceph/osd', '{0}-{1}'.format(cluster
, osd
))
81 log
.info('Mounting osd.{o}: dev: {n}, cluster: {c}'
82 'mountpoint: {p}, type: {t}, options: {v}'.format(
83 o
=osd
, n
=remote
.name
, p
=mnt
, t
=fstype
, v
=mount_options
,
91 '-o', ','.join(mount_options
),
100 Object used to thrash Ceph
102 def __init__(self
, manager
, config
, logger
=None):
103 self
.ceph_manager
= manager
104 self
.cluster
= manager
.cluster
105 self
.ceph_manager
.wait_for_clean()
106 osd_status
= self
.ceph_manager
.get_osd_status()
107 self
.in_osds
= osd_status
['in']
108 self
.live_osds
= osd_status
['live']
109 self
.out_osds
= osd_status
['out']
110 self
.dead_osds
= osd_status
['dead']
111 self
.stopping
= False
114 self
.revive_timeout
= self
.config
.get("revive_timeout", 360)
115 self
.pools_to_fix_pgp_num
= set()
116 if self
.config
.get('powercycle'):
117 self
.revive_timeout
+= 120
118 self
.clean_wait
= self
.config
.get('clean_wait', 0)
119 self
.minin
= self
.config
.get("min_in", 4)
120 self
.chance_move_pg
= self
.config
.get('chance_move_pg', 1.0)
121 self
.sighup_delay
= self
.config
.get('sighup_delay')
122 self
.optrack_toggle_delay
= self
.config
.get('optrack_toggle_delay')
123 self
.dump_ops_enable
= self
.config
.get('dump_ops_enable')
124 self
.noscrub_toggle_delay
= self
.config
.get('noscrub_toggle_delay')
125 self
.chance_thrash_cluster_full
= self
.config
.get('chance_thrash_cluster_full', .05)
126 self
.chance_thrash_pg_upmap
= self
.config
.get('chance_thrash_pg_upmap', 1.0)
127 self
.chance_thrash_pg_upmap_items
= self
.config
.get('chance_thrash_pg_upmap', 1.0)
128 self
.random_eio
= self
.config
.get('random_eio')
129 self
.chance_force_recovery
= self
.config
.get('chance_force_recovery', 0.3)
131 num_osds
= self
.in_osds
+ self
.out_osds
132 self
.max_pgs
= self
.config
.get("max_pgs_per_pool_osd", 1200) * len(num_osds
)
133 self
.min_pgs
= self
.config
.get("min_pgs_per_pool_osd", 1) * len(num_osds
)
134 if self
.logger
is not None:
135 self
.log
= lambda x
: self
.logger
.info(x
)
139 Implement log behavior
143 if self
.config
is None:
145 # prevent monitor from auto-marking things out while thrasher runs
146 # try both old and new tell syntax, in case we are testing old code
147 self
.saved_options
= []
148 # assuming that the default settings do not vary from one daemon to
150 first_mon
= teuthology
.get_first_mon(manager
.ctx
, self
.config
).split('.')
151 opts
= [('mon', 'mon_osd_down_out_interval', 0)]
152 for service
, opt
, new_value
in opts
:
153 old_value
= manager
.get_config(first_mon
[0],
156 self
.saved_options
.append((service
, opt
, old_value
))
157 manager
.inject_args(service
, '*', opt
, new_value
)
158 # initialize ceph_objectstore_tool property - must be done before
159 # do_thrash is spawned - http://tracker.ceph.com/issues/18799
160 if (self
.config
.get('powercycle') or
161 not self
.cmd_exists_on_osds("ceph-objectstore-tool") or
162 self
.config
.get('disable_objectstore_tool_tests', False)):
163 self
.ceph_objectstore_tool
= False
164 if self
.config
.get('powercycle'):
165 self
.log("Unable to test ceph-objectstore-tool, "
166 "powercycle testing")
168 self
.log("Unable to test ceph-objectstore-tool, "
169 "not available on all OSD nodes")
171 self
.ceph_objectstore_tool
= \
172 self
.config
.get('ceph_objectstore_tool', True)
174 self
.thread
= gevent
.spawn(self
.do_thrash
)
175 if self
.sighup_delay
:
176 self
.sighup_thread
= gevent
.spawn(self
.do_sighup
)
177 if self
.optrack_toggle_delay
:
178 self
.optrack_toggle_thread
= gevent
.spawn(self
.do_optrack_toggle
)
179 if self
.dump_ops_enable
== "true":
180 self
.dump_ops_thread
= gevent
.spawn(self
.do_dump_ops
)
181 if self
.noscrub_toggle_delay
:
182 self
.noscrub_toggle_thread
= gevent
.spawn(self
.do_noscrub_toggle
)
184 def cmd_exists_on_osds(self
, cmd
):
185 allremotes
= self
.ceph_manager
.ctx
.cluster
.only(\
186 teuthology
.is_type('osd', self
.cluster
)).remotes
.keys()
187 allremotes
= list(set(allremotes
))
188 for remote
in allremotes
:
189 proc
= remote
.run(args
=['type', cmd
], wait
=True,
190 check_status
=False, stdout
=StringIO(),
192 if proc
.exitstatus
!= 0:
196 def kill_osd(self
, osd
=None, mark_down
=False, mark_out
=False):
198 :param osd: Osd to be killed.
199 :mark_down: Mark down if true.
200 :mark_out: Mark out if true.
203 osd
= random
.choice(self
.live_osds
)
204 self
.log("Killing osd %s, live_osds are %s" % (str(osd
),
205 str(self
.live_osds
)))
206 self
.live_osds
.remove(osd
)
207 self
.dead_osds
.append(osd
)
208 self
.ceph_manager
.kill_osd(osd
)
210 self
.ceph_manager
.mark_down_osd(osd
)
211 if mark_out
and osd
in self
.in_osds
:
213 if self
.ceph_objectstore_tool
:
214 self
.log("Testing ceph-objectstore-tool on down osd")
215 remote
= self
.ceph_manager
.find_remote('osd', osd
)
216 FSPATH
= self
.ceph_manager
.get_filepath()
217 JPATH
= os
.path
.join(FSPATH
, "journal")
218 exp_osd
= imp_osd
= osd
219 exp_remote
= imp_remote
= remote
220 # If an older osd is available we'll move a pg from there
221 if (len(self
.dead_osds
) > 1 and
222 random
.random() < self
.chance_move_pg
):
223 exp_osd
= random
.choice(self
.dead_osds
[:-1])
224 exp_remote
= self
.ceph_manager
.find_remote('osd', exp_osd
)
225 if ('keyvaluestore_backend' in
226 self
.ceph_manager
.ctx
.ceph
[self
.cluster
].conf
['osd']):
227 prefix
= ("sudo adjust-ulimits ceph-objectstore-tool "
228 "--data-path {fpath} --journal-path {jpath} "
229 "--type keyvaluestore "
231 "/var/log/ceph/objectstore_tool.\\$pid.log ".
232 format(fpath
=FSPATH
, jpath
=JPATH
))
234 prefix
= ("sudo adjust-ulimits ceph-objectstore-tool "
235 "--data-path {fpath} --journal-path {jpath} "
237 "/var/log/ceph/objectstore_tool.\\$pid.log ".
238 format(fpath
=FSPATH
, jpath
=JPATH
))
239 cmd
= (prefix
+ "--op list-pgs").format(id=exp_osd
)
241 # ceph-objectstore-tool might be temporarily absent during an
242 # upgrade - see http://tracker.ceph.com/issues/18014
243 with
safe_while(sleep
=15, tries
=40, action
="type ceph-objectstore-tool") as proceed
:
245 proc
= exp_remote
.run(args
=['type', 'ceph-objectstore-tool'],
246 wait
=True, check_status
=False, stdout
=StringIO(),
248 if proc
.exitstatus
== 0:
250 log
.debug("ceph-objectstore-tool binary not present, trying again")
252 # ceph-objectstore-tool might bogusly fail with "OSD has the store locked"
253 # see http://tracker.ceph.com/issues/19556
254 with
safe_while(sleep
=15, tries
=40, action
="ceph-objectstore-tool --op list-pgs") as proceed
:
256 proc
= exp_remote
.run(args
=cmd
, wait
=True,
258 stdout
=StringIO(), stderr
=StringIO())
259 if proc
.exitstatus
== 0:
261 elif proc
.exitstatus
== 1 and proc
.stderr
== "OSD has the store locked":
264 raise Exception("ceph-objectstore-tool: "
265 "exp list-pgs failure with status {ret}".
266 format(ret
=proc
.exitstatus
))
268 pgs
= proc
.stdout
.getvalue().split('\n')[:-1]
270 self
.log("No PGs found for osd.{osd}".format(osd
=exp_osd
))
272 pg
= random
.choice(pgs
)
273 exp_path
= teuthology
.get_testdir(self
.ceph_manager
.ctx
)
274 exp_path
= os
.path
.join(exp_path
, '{0}.data'.format(self
.cluster
))
275 exp_path
= os
.path
.join(exp_path
,
276 "exp.{pg}.{id}".format(
280 # Can't use new export-remove op since this is part of upgrade testing
281 cmd
= prefix
+ "--op export --pgid {pg} --file {file}"
282 cmd
= cmd
.format(id=exp_osd
, pg
=pg
, file=exp_path
)
283 proc
= exp_remote
.run(args
=cmd
)
285 raise Exception("ceph-objectstore-tool: "
286 "export failure with status {ret}".
287 format(ret
=proc
.exitstatus
))
289 cmd
= prefix
+ "--force --op remove --pgid {pg}"
290 cmd
= cmd
.format(id=exp_osd
, pg
=pg
)
291 proc
= exp_remote
.run(args
=cmd
)
293 raise Exception("ceph-objectstore-tool: "
294 "remove failure with status {ret}".
295 format(ret
=proc
.exitstatus
))
296 # If there are at least 2 dead osds we might move the pg
297 if exp_osd
!= imp_osd
:
298 # If pg isn't already on this osd, then we will move it there
299 cmd
= (prefix
+ "--op list-pgs").format(id=imp_osd
)
300 proc
= imp_remote
.run(args
=cmd
, wait
=True,
301 check_status
=False, stdout
=StringIO())
303 raise Exception("ceph-objectstore-tool: "
304 "imp list-pgs failure with status {ret}".
305 format(ret
=proc
.exitstatus
))
306 pgs
= proc
.stdout
.getvalue().split('\n')[:-1]
308 self
.log("Moving pg {pg} from osd.{fosd} to osd.{tosd}".
309 format(pg
=pg
, fosd
=exp_osd
, tosd
=imp_osd
))
310 if imp_remote
!= exp_remote
:
311 # Copy export file to the other machine
312 self
.log("Transfer export file from {srem} to {trem}".
313 format(srem
=exp_remote
, trem
=imp_remote
))
314 tmpexport
= Remote
.get_file(exp_remote
, exp_path
)
315 Remote
.put_file(imp_remote
, tmpexport
, exp_path
)
318 # Can't move the pg after all
320 imp_remote
= exp_remote
322 cmd
= (prefix
+ "--op import --file {file}")
323 cmd
= cmd
.format(id=imp_osd
, file=exp_path
)
324 proc
= imp_remote
.run(args
=cmd
, wait
=True, check_status
=False,
326 if proc
.exitstatus
== 1:
327 bogosity
= "The OSD you are using is older than the exported PG"
328 if bogosity
in proc
.stderr
.getvalue():
329 self
.log("OSD older than exported PG"
331 elif proc
.exitstatus
== 10:
332 self
.log("Pool went away before processing an import"
334 elif proc
.exitstatus
== 11:
335 self
.log("Attempt to import an incompatible export"
337 elif proc
.exitstatus
== 12:
338 # this should be safe to ignore because we only ever move 1
339 # copy of the pg at a time, and merge is only initiated when
340 # all replicas are peered and happy. /me crosses fingers
341 self
.log("PG merged on target"
343 elif proc
.exitstatus
:
344 raise Exception("ceph-objectstore-tool: "
345 "import failure with status {ret}".
346 format(ret
=proc
.exitstatus
))
347 cmd
= "rm -f {file}".format(file=exp_path
)
348 exp_remote
.run(args
=cmd
)
349 if imp_remote
!= exp_remote
:
350 imp_remote
.run(args
=cmd
)
352 # apply low split settings to each pool
353 for pool
in self
.ceph_manager
.list_pools():
354 no_sudo_prefix
= prefix
[5:]
355 cmd
= ("CEPH_ARGS='--filestore-merge-threshold 1 "
356 "--filestore-split-multiple 1' sudo -E "
357 + no_sudo_prefix
+ "--op apply-layout-settings --pool " + pool
).format(id=osd
)
358 proc
= remote
.run(args
=cmd
, wait
=True, check_status
=False, stderr
=StringIO())
359 output
= proc
.stderr
.getvalue()
360 if 'Couldn\'t find pool' in output
:
363 raise Exception("ceph-objectstore-tool apply-layout-settings"
364 " failed with {status}".format(status
=proc
.exitstatus
))
367 def blackhole_kill_osd(self
, osd
=None):
369 If all else fails, kill the osd.
370 :param osd: Osd to be killed.
373 osd
= random
.choice(self
.live_osds
)
374 self
.log("Blackholing and then killing osd %s, live_osds are %s" %
375 (str(osd
), str(self
.live_osds
)))
376 self
.live_osds
.remove(osd
)
377 self
.dead_osds
.append(osd
)
378 self
.ceph_manager
.blackhole_kill_osd(osd
)
380 def revive_osd(self
, osd
=None, skip_admin_check
=False):
383 :param osd: Osd to be revived.
386 osd
= random
.choice(self
.dead_osds
)
387 self
.log("Reviving osd %s" % (str(osd
),))
388 self
.ceph_manager
.revive_osd(
391 skip_admin_check
=skip_admin_check
)
392 self
.dead_osds
.remove(osd
)
393 self
.live_osds
.append(osd
)
394 if self
.random_eio
> 0 and osd
== self
.rerrosd
:
395 self
.ceph_manager
.set_config(self
.rerrosd
,
396 filestore_debug_random_read_err
= self
.random_eio
)
397 self
.ceph_manager
.set_config(self
.rerrosd
,
398 bluestore_debug_random_read_err
= self
.random_eio
)
401 def out_osd(self
, osd
=None):
404 :param osd: Osd to be marked.
407 osd
= random
.choice(self
.in_osds
)
408 self
.log("Removing osd %s, in_osds are: %s" %
409 (str(osd
), str(self
.in_osds
)))
410 self
.ceph_manager
.mark_out_osd(osd
)
411 self
.in_osds
.remove(osd
)
412 self
.out_osds
.append(osd
)
414 def in_osd(self
, osd
=None):
417 :param osd: Osd to be marked.
420 osd
= random
.choice(self
.out_osds
)
421 if osd
in self
.dead_osds
:
422 return self
.revive_osd(osd
)
423 self
.log("Adding osd %s" % (str(osd
),))
424 self
.out_osds
.remove(osd
)
425 self
.in_osds
.append(osd
)
426 self
.ceph_manager
.mark_in_osd(osd
)
427 self
.log("Added osd %s" % (str(osd
),))
429 def reweight_osd_or_by_util(self
, osd
=None):
431 Reweight an osd that is in
432 :param osd: Osd to be marked.
434 if osd
is not None or random
.choice([True, False]):
436 osd
= random
.choice(self
.in_osds
)
437 val
= random
.uniform(.1, 1.0)
438 self
.log("Reweighting osd %s to %s" % (str(osd
), str(val
)))
439 self
.ceph_manager
.raw_cluster_cmd('osd', 'reweight',
442 # do it several times, the option space is large
445 'max_change': random
.choice(['0.05', '1.0', '3.0']),
446 'overage': random
.choice(['110', '1000']),
447 'type': random
.choice([
448 'reweight-by-utilization',
449 'test-reweight-by-utilization']),
451 self
.log("Reweighting by: %s"%(str(options
),))
452 self
.ceph_manager
.raw_cluster_cmd(
456 options
['max_change'])
458 def primary_affinity(self
, osd
=None):
460 osd
= random
.choice(self
.in_osds
)
461 if random
.random() >= .5:
463 elif random
.random() >= .5:
467 self
.log('Setting osd %s primary_affinity to %f' % (str(osd
), pa
))
468 self
.ceph_manager
.raw_cluster_cmd('osd', 'primary-affinity',
471 def thrash_cluster_full(self
):
473 Set and unset cluster full condition
475 self
.log('Setting full ratio to .001')
476 self
.ceph_manager
.raw_cluster_cmd('osd', 'set-full-ratio', '.001')
478 self
.log('Setting full ratio back to .95')
479 self
.ceph_manager
.raw_cluster_cmd('osd', 'set-full-ratio', '.95')
481 def thrash_pg_upmap(self
):
483 Install or remove random pg_upmap entries in OSDMap
485 from random
import shuffle
486 out
= self
.ceph_manager
.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
488 self
.log('j is %s' % j
)
490 if random
.random() >= .3:
491 pgs
= self
.ceph_manager
.get_pg_stats()
492 pg
= random
.choice(pgs
)
493 pgid
= str(pg
['pgid'])
494 poolid
= int(pgid
.split('.')[0])
495 sizes
= [x
['size'] for x
in j
['pools'] if x
['pool'] == poolid
]
499 osds
= self
.in_osds
+ self
.out_osds
502 self
.log('Setting %s to %s' % (pgid
, osds
))
503 cmd
= ['osd', 'pg-upmap', pgid
] + [str(x
) for x
in osds
]
504 self
.log('cmd %s' % cmd
)
505 self
.ceph_manager
.raw_cluster_cmd(*cmd
)
511 self
.log('Clearing pg_upmap on %s' % pg
)
512 self
.ceph_manager
.raw_cluster_cmd(
517 self
.log('No pg_upmap entries; doing nothing')
518 except CommandFailedError
:
519 self
.log('Failed to rm-pg-upmap, ignoring')
521 def thrash_pg_upmap_items(self
):
523 Install or remove random pg_upmap_items entries in OSDMap
525 from random
import shuffle
526 out
= self
.ceph_manager
.raw_cluster_cmd('osd', 'dump', '-f', 'json-pretty')
528 self
.log('j is %s' % j
)
530 if random
.random() >= .3:
531 pgs
= self
.ceph_manager
.get_pg_stats()
532 pg
= random
.choice(pgs
)
533 pgid
= str(pg
['pgid'])
534 poolid
= int(pgid
.split('.')[0])
535 sizes
= [x
['size'] for x
in j
['pools'] if x
['pool'] == poolid
]
539 osds
= self
.in_osds
+ self
.out_osds
542 self
.log('Setting %s to %s' % (pgid
, osds
))
543 cmd
= ['osd', 'pg-upmap-items', pgid
] + [str(x
) for x
in osds
]
544 self
.log('cmd %s' % cmd
)
545 self
.ceph_manager
.raw_cluster_cmd(*cmd
)
547 m
= j
['pg_upmap_items']
551 self
.log('Clearing pg_upmap on %s' % pg
)
552 self
.ceph_manager
.raw_cluster_cmd(
557 self
.log('No pg_upmap entries; doing nothing')
558 except CommandFailedError
:
559 self
.log('Failed to rm-pg-upmap-items, ignoring')
561 def force_recovery(self
):
563 Force recovery on some of PGs
565 backfill
= random
.random() >= 0.5
566 j
= self
.ceph_manager
.get_pgids_to_force(backfill
)
570 self
.ceph_manager
.raw_cluster_cmd('pg', 'force-backfill', *j
)
572 self
.ceph_manager
.raw_cluster_cmd('pg', 'force-recovery', *j
)
573 except CommandFailedError
:
574 self
.log('Failed to force backfill|recovery, ignoring')
577 def cancel_force_recovery(self
):
579 Force recovery on some of PGs
581 backfill
= random
.random() >= 0.5
582 j
= self
.ceph_manager
.get_pgids_to_cancel_force(backfill
)
586 self
.ceph_manager
.raw_cluster_cmd('pg', 'cancel-force-backfill', *j
)
588 self
.ceph_manager
.raw_cluster_cmd('pg', 'cancel-force-recovery', *j
)
589 except CommandFailedError
:
590 self
.log('Failed to force backfill|recovery, ignoring')
592 def force_cancel_recovery(self
):
594 Force or cancel forcing recovery
596 if random
.random() >= 0.4:
597 self
.force_recovery()
599 self
.cancel_force_recovery()
603 Make sure all osds are up and not out.
605 while len(self
.dead_osds
) > 0:
606 self
.log("reviving osd")
608 while len(self
.out_osds
) > 0:
609 self
.log("inning osd")
614 Make sure all osds are up and fully in.
617 for osd
in self
.live_osds
:
618 self
.ceph_manager
.raw_cluster_cmd('osd', 'reweight',
620 self
.ceph_manager
.raw_cluster_cmd('osd', 'primary-affinity',
625 Break out of this Ceph loop
629 if self
.sighup_delay
:
630 self
.log("joining the do_sighup greenlet")
631 self
.sighup_thread
.get()
632 if self
.optrack_toggle_delay
:
633 self
.log("joining the do_optrack_toggle greenlet")
634 self
.optrack_toggle_thread
.join()
635 if self
.dump_ops_enable
== "true":
636 self
.log("joining the do_dump_ops greenlet")
637 self
.dump_ops_thread
.join()
638 if self
.noscrub_toggle_delay
:
639 self
.log("joining the do_noscrub_toggle greenlet")
640 self
.noscrub_toggle_thread
.join()
644 Increase the size of the pool
646 pool
= self
.ceph_manager
.get_pool()
647 orig_pg_num
= self
.ceph_manager
.get_pool_pg_num(pool
)
648 self
.log("Growing pool %s" % (pool
,))
649 if self
.ceph_manager
.expand_pool(pool
,
650 self
.config
.get('pool_grow_by', 10),
652 self
.pools_to_fix_pgp_num
.add(pool
)
654 def shrink_pool(self
):
656 Decrease the size of the pool
658 pool
= self
.ceph_manager
.get_pool()
659 orig_pg_num
= self
.ceph_manager
.get_pool_pg_num(pool
)
660 self
.log("Shrinking pool %s" % (pool
,))
661 if self
.ceph_manager
.contract_pool(
663 self
.config
.get('pool_shrink_by', 10),
665 self
.pools_to_fix_pgp_num
.add(pool
)
667 def fix_pgp_num(self
, pool
=None):
669 Fix number of pgs in pool.
672 pool
= self
.ceph_manager
.get_pool()
676 self
.log("fixing pg num pool %s" % (pool
,))
677 if self
.ceph_manager
.set_pool_pgpnum(pool
, force
):
678 self
.pools_to_fix_pgp_num
.discard(pool
)
680 def test_pool_min_size(self
):
682 Kill and revive all osds except one.
684 self
.log("test_pool_min_size")
686 self
.ceph_manager
.wait_for_recovery(
687 timeout
=self
.config
.get('timeout')
689 the_one
= random
.choice(self
.in_osds
)
690 self
.log("Killing everyone but %s", the_one
)
691 to_kill
= filter(lambda x
: x
!= the_one
, self
.in_osds
)
692 [self
.kill_osd(i
) for i
in to_kill
]
693 [self
.out_osd(i
) for i
in to_kill
]
694 time
.sleep(self
.config
.get("test_pool_min_size_time", 10))
695 self
.log("Killing %s" % (the_one
,))
696 self
.kill_osd(the_one
)
697 self
.out_osd(the_one
)
698 self
.log("Reviving everyone but %s" % (the_one
,))
699 [self
.revive_osd(i
) for i
in to_kill
]
700 [self
.in_osd(i
) for i
in to_kill
]
701 self
.log("Revived everyone but %s" % (the_one
,))
702 self
.log("Waiting for clean")
703 self
.ceph_manager
.wait_for_recovery(
704 timeout
=self
.config
.get('timeout')
707 def inject_pause(self
, conf_key
, duration
, check_after
, should_be_down
):
709 Pause injection testing. Check for osd being down when finished.
711 the_one
= random
.choice(self
.live_osds
)
712 self
.log("inject_pause on {osd}".format(osd
=the_one
))
714 "Testing {key} pause injection for duration {duration}".format(
719 "Checking after {after}, should_be_down={shouldbedown}".format(
721 shouldbedown
=should_be_down
723 self
.ceph_manager
.set_config(the_one
, **{conf_key
: duration
})
724 if not should_be_down
:
726 time
.sleep(check_after
)
727 status
= self
.ceph_manager
.get_osd_status()
728 assert the_one
in status
['down']
729 time
.sleep(duration
- check_after
+ 20)
730 status
= self
.ceph_manager
.get_osd_status()
731 assert not the_one
in status
['down']
733 def test_backfill_full(self
):
735 Test backfills stopping when the replica fills up.
737 First, use injectfull admin command to simulate a now full
738 osd by setting it to 0 on all of the OSDs.
740 Second, on a random subset, set
741 osd_debug_skip_full_check_in_backfill_reservation to force
742 the more complicated check in do_scan to be exercised.
744 Then, verify that all backfillings stop.
746 self
.log("injecting backfill full")
747 for i
in self
.live_osds
:
748 self
.ceph_manager
.set_config(
750 osd_debug_skip_full_check_in_backfill_reservation
=
751 random
.choice(['false', 'true']))
752 self
.ceph_manager
.osd_admin_socket(i
, command
=['injectfull', 'backfillfull'],
753 check_status
=True, timeout
=30, stdout
=DEVNULL
)
755 status
= self
.ceph_manager
.compile_pg_status()
756 if 'backfilling' not in status
.keys():
759 "waiting for {still_going} backfillings".format(
760 still_going
=status
.get('backfilling')))
762 assert('backfilling' not in self
.ceph_manager
.compile_pg_status().keys())
763 for i
in self
.live_osds
:
764 self
.ceph_manager
.set_config(
766 osd_debug_skip_full_check_in_backfill_reservation
='false')
767 self
.ceph_manager
.osd_admin_socket(i
, command
=['injectfull', 'none'],
768 check_status
=True, timeout
=30, stdout
=DEVNULL
)
770 def test_map_discontinuity(self
):
772 1) Allows the osds to recover
774 3) allows the remaining osds to recover
775 4) waits for some time
777 This sequence should cause the revived osd to have to handle
778 a map gap since the mons would have trimmed
780 while len(self
.in_osds
) < (self
.minin
+ 1):
782 self
.log("Waiting for recovery")
783 self
.ceph_manager
.wait_for_all_osds_up(
784 timeout
=self
.config
.get('timeout')
786 # now we wait 20s for the pg status to change, if it takes longer,
787 # the test *should* fail!
789 self
.ceph_manager
.wait_for_clean(
790 timeout
=self
.config
.get('timeout')
793 # now we wait 20s for the backfill replicas to hear about the clean
795 self
.log("Recovered, killing an osd")
796 self
.kill_osd(mark_down
=True, mark_out
=True)
797 self
.log("Waiting for clean again")
798 self
.ceph_manager
.wait_for_clean(
799 timeout
=self
.config
.get('timeout')
801 self
.log("Waiting for trim")
802 time
.sleep(int(self
.config
.get("map_discontinuity_sleep_time", 40)))
805 def choose_action(self
):
807 Random action selector.
809 chance_down
= self
.config
.get('chance_down', 0.4)
810 chance_test_min_size
= self
.config
.get('chance_test_min_size', 0)
811 chance_test_backfill_full
= \
812 self
.config
.get('chance_test_backfill_full', 0)
813 if isinstance(chance_down
, int):
814 chance_down
= float(chance_down
) / 100
816 minout
= self
.config
.get("min_out", 0)
817 minlive
= self
.config
.get("min_live", 2)
818 mindead
= self
.config
.get("min_dead", 0)
820 self
.log('choose_action: min_in %d min_out '
821 '%d min_live %d min_dead %d' %
822 (minin
, minout
, minlive
, mindead
))
824 if len(self
.in_osds
) > minin
:
825 actions
.append((self
.out_osd
, 1.0,))
826 if len(self
.live_osds
) > minlive
and chance_down
> 0:
827 actions
.append((self
.kill_osd
, chance_down
,))
828 if len(self
.out_osds
) > minout
:
829 actions
.append((self
.in_osd
, 1.7,))
830 if len(self
.dead_osds
) > mindead
:
831 actions
.append((self
.revive_osd
, 1.0,))
832 if self
.config
.get('thrash_primary_affinity', True):
833 actions
.append((self
.primary_affinity
, 1.0,))
834 actions
.append((self
.reweight_osd_or_by_util
,
835 self
.config
.get('reweight_osd', .5),))
836 actions
.append((self
.grow_pool
,
837 self
.config
.get('chance_pgnum_grow', 0),))
838 actions
.append((self
.shrink_pool
,
839 self
.config
.get('chance_pgnum_shrink', 0),))
840 actions
.append((self
.fix_pgp_num
,
841 self
.config
.get('chance_pgpnum_fix', 0),))
842 actions
.append((self
.test_pool_min_size
,
843 chance_test_min_size
,))
844 actions
.append((self
.test_backfill_full
,
845 chance_test_backfill_full
,))
846 if self
.chance_thrash_cluster_full
> 0:
847 actions
.append((self
.thrash_cluster_full
, self
.chance_thrash_cluster_full
,))
848 if self
.chance_thrash_pg_upmap
> 0:
849 actions
.append((self
.thrash_pg_upmap
, self
.chance_thrash_pg_upmap
,))
850 if self
.chance_thrash_pg_upmap_items
> 0:
851 actions
.append((self
.thrash_pg_upmap_items
, self
.chance_thrash_pg_upmap_items
,))
852 if self
.chance_force_recovery
> 0:
853 actions
.append((self
.force_cancel_recovery
, self
.chance_force_recovery
))
855 for key
in ['heartbeat_inject_failure', 'filestore_inject_stall']:
858 self
.inject_pause(key
,
859 self
.config
.get('pause_short', 3),
862 self
.config
.get('chance_inject_pause_short', 1),),
864 self
.inject_pause(key
,
865 self
.config
.get('pause_long', 80),
866 self
.config
.get('pause_check_after', 70),
868 self
.config
.get('chance_inject_pause_long', 0),)]:
869 actions
.append(scenario
)
871 total
= sum([y
for (x
, y
) in actions
])
872 val
= random
.uniform(0, total
)
873 for (action
, prob
) in actions
:
885 self
.log(traceback
.format_exc())
892 Loops and sends signal.SIGHUP to a random live osd.
894 Loop delay is controlled by the config value sighup_delay.
896 delay
= float(self
.sighup_delay
)
897 self
.log("starting do_sighup with a delay of {0}".format(delay
))
898 while not self
.stopping
:
899 osd
= random
.choice(self
.live_osds
)
900 self
.ceph_manager
.signal_osd(osd
, signal
.SIGHUP
, silent
=True)
904 def do_optrack_toggle(self
):
906 Loops and toggle op tracking to all osds.
908 Loop delay is controlled by the config value optrack_toggle_delay.
910 delay
= float(self
.optrack_toggle_delay
)
912 self
.log("starting do_optrack_toggle with a delay of {0}".format(delay
))
913 while not self
.stopping
:
914 if osd_state
== "true":
919 self
.ceph_manager
.inject_args('osd', '*',
920 'osd_enable_op_tracker',
922 except CommandFailedError
:
923 self
.log('Failed to tell all osds, ignoring')
927 def do_dump_ops(self
):
929 Loops and does op dumps on all osds
931 self
.log("starting do_dump_ops")
932 while not self
.stopping
:
933 for osd
in self
.live_osds
:
934 # Ignore errors because live_osds is in flux
935 self
.ceph_manager
.osd_admin_socket(osd
, command
=['dump_ops_in_flight'],
936 check_status
=False, timeout
=30, stdout
=DEVNULL
)
937 self
.ceph_manager
.osd_admin_socket(osd
, command
=['dump_blocked_ops'],
938 check_status
=False, timeout
=30, stdout
=DEVNULL
)
939 self
.ceph_manager
.osd_admin_socket(osd
, command
=['dump_historic_ops'],
940 check_status
=False, timeout
=30, stdout
=DEVNULL
)
944 def do_noscrub_toggle(self
):
946 Loops and toggle noscrub flags
948 Loop delay is controlled by the config value noscrub_toggle_delay.
950 delay
= float(self
.noscrub_toggle_delay
)
952 self
.log("starting do_noscrub_toggle with a delay of {0}".format(delay
))
953 while not self
.stopping
:
954 if scrub_state
== "none":
955 self
.ceph_manager
.raw_cluster_cmd('osd', 'set', 'noscrub')
956 scrub_state
= "noscrub"
957 elif scrub_state
== "noscrub":
958 self
.ceph_manager
.raw_cluster_cmd('osd', 'set', 'nodeep-scrub')
960 elif scrub_state
== "both":
961 self
.ceph_manager
.raw_cluster_cmd('osd', 'unset', 'noscrub')
962 scrub_state
= "nodeep-scrub"
964 self
.ceph_manager
.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
967 self
.ceph_manager
.raw_cluster_cmd('osd', 'unset', 'noscrub')
968 self
.ceph_manager
.raw_cluster_cmd('osd', 'unset', 'nodeep-scrub')
973 Loop to select random actions to thrash ceph manager with.
975 cleanint
= self
.config
.get("clean_interval", 60)
976 scrubint
= self
.config
.get("scrub_interval", -1)
977 maxdead
= self
.config
.get("max_dead", 0)
978 delay
= self
.config
.get("op_delay", 5)
979 self
.rerrosd
= self
.live_osds
[0]
980 if self
.random_eio
> 0:
981 self
.ceph_manager
.inject_args('osd', self
.rerrosd
,
982 'filestore_debug_random_read_err',
984 self
.ceph_manager
.inject_args('osd', self
.rerrosd
,
985 'bluestore_debug_random_read_err',
987 self
.log("starting do_thrash")
988 while not self
.stopping
:
989 to_log
= [str(x
) for x
in ["in_osds: ", self
.in_osds
,
990 "out_osds: ", self
.out_osds
,
991 "dead_osds: ", self
.dead_osds
,
992 "live_osds: ", self
.live_osds
]]
993 self
.log(" ".join(to_log
))
994 if random
.uniform(0, 1) < (float(delay
) / cleanint
):
995 while len(self
.dead_osds
) > maxdead
:
997 for osd
in self
.in_osds
:
998 self
.ceph_manager
.raw_cluster_cmd('osd', 'reweight',
1000 if random
.uniform(0, 1) < float(
1001 self
.config
.get('chance_test_map_discontinuity', 0)) \
1002 and len(self
.live_osds
) > 5: # avoid m=2,k=2 stall, w/ some buffer for crush being picky
1003 self
.test_map_discontinuity()
1005 self
.ceph_manager
.wait_for_recovery(
1006 timeout
=self
.config
.get('timeout')
1008 time
.sleep(self
.clean_wait
)
1010 if random
.uniform(0, 1) < (float(delay
) / scrubint
):
1011 self
.log('Scrubbing while thrashing being performed')
1012 Scrubber(self
.ceph_manager
, self
.config
)
1013 self
.choose_action()()
1016 if self
.random_eio
> 0:
1017 self
.ceph_manager
.inject_args('osd', self
.rerrosd
,
1018 'filestore_debug_random_read_err', '0.0')
1019 self
.ceph_manager
.inject_args('osd', self
.rerrosd
,
1020 'bluestore_debug_random_read_err', '0.0')
1021 for pool
in list(self
.pools_to_fix_pgp_num
):
1022 if self
.ceph_manager
.get_pool_pg_num(pool
) > 0:
1023 self
.fix_pgp_num(pool
)
1024 self
.pools_to_fix_pgp_num
.clear()
1025 for service
, opt
, saved_value
in self
.saved_options
:
1026 self
.ceph_manager
.inject_args(service
, '*', opt
, saved_value
)
1027 self
.saved_options
= []
1031 class ObjectStoreTool
:
1033 def __init__(self
, manager
, pool
, **kwargs
):
1034 self
.manager
= manager
1036 self
.osd
= kwargs
.get('osd', None)
1037 self
.object_name
= kwargs
.get('object_name', None)
1038 self
.do_revive
= kwargs
.get('do_revive', True)
1039 if self
.osd
and self
.pool
and self
.object_name
:
1040 if self
.osd
== "primary":
1041 self
.osd
= self
.manager
.get_object_primary(self
.pool
,
1044 if self
.object_name
:
1045 self
.pgid
= self
.manager
.get_object_pg_with_shard(self
.pool
,
1048 self
.remote
= self
.manager
.ctx
.\
1049 cluster
.only('osd.{o}'.format(o
=self
.osd
)).remotes
.keys()[0]
1050 path
= self
.manager
.get_filepath().format(id=self
.osd
)
1051 self
.paths
= ("--data-path {path} --journal-path {path}/journal".
1054 def build_cmd(self
, options
, args
, stdin
):
1056 if self
.object_name
:
1057 lines
.append("object=$(sudo adjust-ulimits ceph-objectstore-tool "
1058 "{paths} --pgid {pgid} --op list |"
1059 "grep '\"oid\":\"{name}\"')".
1060 format(paths
=self
.paths
,
1062 name
=self
.object_name
))
1063 args
= '"$object" ' + args
1064 options
+= " --pgid {pgid}".format(pgid
=self
.pgid
)
1065 cmd
= ("sudo adjust-ulimits ceph-objectstore-tool {paths} {options} {args}".
1066 format(paths
=self
.paths
,
1070 cmd
= ("echo {payload} | base64 --decode | {cmd}".
1071 format(payload
=base64
.encode(stdin
),
1074 return "\n".join(lines
)
1076 def run(self
, options
, args
, stdin
=None, stdout
=None):
1079 self
.manager
.kill_osd(self
.osd
)
1080 cmd
= self
.build_cmd(options
, args
, stdin
)
1081 self
.manager
.log(cmd
)
1083 proc
= self
.remote
.run(args
=['bash', '-e', '-x', '-c', cmd
],
1088 if proc
.exitstatus
!= 0:
1089 self
.manager
.log("failed with " + str(proc
.exitstatus
))
1090 error
= proc
.stdout
.getvalue() + " " + proc
.stderr
.getvalue()
1091 raise Exception(error
)
1094 self
.manager
.revive_osd(self
.osd
)
1095 self
.manager
.wait_till_osd_is_up(self
.osd
, 300)
1100 Ceph manager object.
1101 Contains several local functions that form a bulk of this module.
1103 Note: this class has nothing to do with the Ceph daemon (ceph-mgr) of
1108 ERASURE_CODED_POOL
= 3
1110 def __init__(self
, controller
, ctx
=None, config
=None, logger
=None,
1112 self
.lock
= threading
.RLock()
1114 self
.config
= config
1115 self
.controller
= controller
1116 self
.next_pool_id
= 0
1117 self
.cluster
= cluster
1119 self
.log
= lambda x
: logger
.info(x
)
1123 implement log behavior.
1127 if self
.config
is None:
1128 self
.config
= dict()
1129 pools
= self
.list_pools()
1132 # we may race with a pool deletion; ignore failures here
1134 self
.pools
[pool
] = self
.get_pool_property(pool
, 'pg_num')
1135 except CommandFailedError
:
1136 self
.log('Failed to get pg_num from pool %s, ignoring' % pool
)
1138 def raw_cluster_cmd(self
, *args
):
1140 Start ceph on a raw cluster. Return count
1142 testdir
= teuthology
.get_testdir(self
.ctx
)
1147 '{tdir}/archive/coverage'.format(tdir
=testdir
),
1154 ceph_args
.extend(args
)
1155 proc
= self
.controller
.run(
1159 return proc
.stdout
.getvalue()
1161 def raw_cluster_cmd_result(self
, *args
, **kwargs
):
1163 Start ceph on a cluster. Return success or failure information.
1165 testdir
= teuthology
.get_testdir(self
.ctx
)
1170 '{tdir}/archive/coverage'.format(tdir
=testdir
),
1177 ceph_args
.extend(args
)
1178 kwargs
['args'] = ceph_args
1179 kwargs
['check_status'] = False
1180 proc
= self
.controller
.run(**kwargs
)
1181 return proc
.exitstatus
1183 def run_ceph_w(self
, watch_channel
=None):
1185 Execute "ceph -w" in the background with stdout connected to a StringIO,
1186 and return the RemoteProcess.
1188 :param watch_channel: Specifies the channel to be watched. This can be
1189 'cluster', 'audit', ...
1190 :type watch_channel: str
1199 if watch_channel
is not None:
1200 args
.append("--watch-channel")
1201 args
.append(watch_channel
)
1202 return self
.controller
.run(args
=args
, wait
=False, stdout
=StringIO(), stdin
=run
.PIPE
)
1204 def flush_pg_stats(self
, osds
, no_wait
=None, wait_for_mon
=300):
1206 Flush pg stats from a list of OSD ids, ensuring they are reflected
1207 all the way to the monitor. Luminous and later only.
1209 :param osds: list of OSDs to flush
1210 :param no_wait: list of OSDs not to wait for seq id. by default, we
1211 wait for all specified osds, but some of them could be
1212 moved out of osdmap, so we cannot get their updated
1213 stat seq from monitor anymore. in that case, you need
1214 to pass a blacklist.
1215 :param wait_for_mon: wait for mon to be synced with mgr. 0 to disable
1216 it. (5 min by default)
1218 seq
= {osd
: int(self
.raw_cluster_cmd('tell', 'osd.%d' % osd
, 'flush_pg_stats'))
1220 if not wait_for_mon
:
1224 for osd
, need
in seq
.iteritems():
1228 while wait_for_mon
> 0:
1229 got
= int(self
.raw_cluster_cmd('osd', 'last-stat-seq', 'osd.%d' % osd
))
1230 self
.log('need seq {need} got {got} for osd.{osd}'.format(
1231 need
=need
, got
=got
, osd
=osd
))
1236 wait_for_mon
-= A_WHILE
1238 raise Exception('timed out waiting for mon to be updated with '
1239 'osd.{osd}: {got} < {need}'.
1240 format(osd
=osd
, got
=got
, need
=need
))
1242 def flush_all_pg_stats(self
):
1243 self
.flush_pg_stats(range(len(self
.get_osd_dump())))
1245 def do_rados(self
, remote
, cmd
, check_status
=True):
1247 Execute a remote rados command.
1249 testdir
= teuthology
.get_testdir(self
.ctx
)
1253 '{tdir}/archive/coverage'.format(tdir
=testdir
),
1262 check_status
=check_status
1266 def rados_write_objects(self
, pool
, num_objects
, size
,
1267 timelimit
, threads
, cleanup
=False):
1270 Threads not used yet.
1274 '--num-objects', num_objects
,
1280 args
.append('--no-cleanup')
1281 return self
.do_rados(self
.controller
, map(str, args
))
1283 def do_put(self
, pool
, obj
, fname
, namespace
=None):
1285 Implement rados put operation
1288 if namespace
is not None:
1289 args
+= ['-N', namespace
]
1295 return self
.do_rados(
1301 def do_get(self
, pool
, obj
, fname
='/dev/null', namespace
=None):
1303 Implement rados get operation
1306 if namespace
is not None:
1307 args
+= ['-N', namespace
]
1313 return self
.do_rados(
1319 def do_rm(self
, pool
, obj
, namespace
=None):
1321 Implement rados rm operation
1324 if namespace
is not None:
1325 args
+= ['-N', namespace
]
1330 return self
.do_rados(
1336 def osd_admin_socket(self
, osd_id
, command
, check_status
=True, timeout
=0, stdout
=None):
1339 return self
.admin_socket('osd', osd_id
, command
, check_status
, timeout
, stdout
)
1341 def find_remote(self
, service_type
, service_id
):
1343 Get the Remote for the host where a particular service runs.
1345 :param service_type: 'mds', 'osd', 'client'
1346 :param service_id: The second part of a role, e.g. '0' for
1348 :return: a Remote instance for the host where the
1349 requested role is placed
1351 return get_remote(self
.ctx
, self
.cluster
,
1352 service_type
, service_id
)
1354 def admin_socket(self
, service_type
, service_id
,
1355 command
, check_status
=True, timeout
=0, stdout
=None):
1357 Remotely start up ceph specifying the admin socket
1358 :param command: a list of words to use as the command
1363 testdir
= teuthology
.get_testdir(self
.ctx
)
1364 remote
= self
.find_remote(service_type
, service_id
)
1369 '{tdir}/archive/coverage'.format(tdir
=testdir
),
1376 '/var/run/ceph/{cluster}-{type}.{id}.asok'.format(
1377 cluster
=self
.cluster
,
1381 args
.extend(command
)
1386 check_status
=check_status
1389 def objectstore_tool(self
, pool
, options
, args
, **kwargs
):
1390 return ObjectStoreTool(self
, pool
, **kwargs
).run(options
, args
)
1392 def get_pgid(self
, pool
, pgnum
):
1394 :param pool: pool name
1395 :param pgnum: pg number
1396 :returns: a string representing this pg.
1398 poolnum
= self
.get_pool_num(pool
)
1399 pg_str
= "{poolnum}.{pgnum}".format(
1404 def get_pg_replica(self
, pool
, pgnum
):
1406 get replica for pool, pgnum (e.g. (data, 0)->0
1408 pg_str
= self
.get_pgid(pool
, pgnum
)
1409 output
= self
.raw_cluster_cmd("pg", "map", pg_str
, '--format=json')
1410 j
= json
.loads('\n'.join(output
.split('\n')[1:]))
1411 return int(j
['acting'][-1])
1414 def wait_for_pg_stats(func
):
1415 # both osd_mon_report_interval and mgr_stats_period are 5 seconds
1416 # by default, and take the faulty injection in ms into consideration,
1417 # 12 seconds are more than enough
1418 delays
= [1, 1, 2, 3, 5, 8, 13, 0]
1420 def wrapper(self
, *args
, **kwargs
):
1422 for delay
in delays
:
1424 return func(self
, *args
, **kwargs
)
1425 except AssertionError as e
:
1431 def get_pg_primary(self
, pool
, pgnum
):
1433 get primary for pool, pgnum (e.g. (data, 0)->0
1435 pg_str
= self
.get_pgid(pool
, pgnum
)
1436 output
= self
.raw_cluster_cmd("pg", "map", pg_str
, '--format=json')
1437 j
= json
.loads('\n'.join(output
.split('\n')[1:]))
1438 return int(j
['acting'][0])
1441 def get_pool_num(self
, pool
):
1443 get number for pool (e.g., data -> 2)
1445 return int(self
.get_pool_dump(pool
)['pool'])
1447 def list_pools(self
):
1451 osd_dump
= self
.get_osd_dump_json()
1452 self
.log(osd_dump
['pools'])
1453 return [str(i
['pool_name']) for i
in osd_dump
['pools']]
1455 def clear_pools(self
):
1459 [self
.remove_pool(i
) for i
in self
.list_pools()]
1461 def kick_recovery_wq(self
, osdnum
):
1463 Run kick_recovery_wq on cluster.
1465 return self
.raw_cluster_cmd(
1466 'tell', "osd.%d" % (int(osdnum
),),
1471 def wait_run_admin_socket(self
, service_type
,
1472 service_id
, args
=['version'], timeout
=75, stdout
=None):
1474 If osd_admin_socket call succeeds, return. Otherwise wait
1475 five seconds and try again.
1481 proc
= self
.admin_socket(service_type
, service_id
,
1482 args
, check_status
=False, stdout
=stdout
)
1483 if proc
.exitstatus
is 0:
1487 if (tries
* 5) > timeout
:
1488 raise Exception('timed out waiting for admin_socket '
1489 'to appear after {type}.{id} restart'.
1490 format(type=service_type
,
1492 self
.log("waiting on admin_socket for {type}-{id}, "
1493 "{command}".format(type=service_type
,
1498 def get_pool_dump(self
, pool
):
1500 get the osd dump part of a pool
1502 osd_dump
= self
.get_osd_dump_json()
1503 for i
in osd_dump
['pools']:
1504 if i
['pool_name'] == pool
:
1508 def get_config(self
, service_type
, service_id
, name
):
1510 :param node: like 'mon.a'
1511 :param name: the option name
1513 proc
= self
.wait_run_admin_socket(service_type
, service_id
,
1515 j
= json
.loads(proc
.stdout
.getvalue())
1518 def inject_args(self
, service_type
, service_id
, name
, value
):
1519 whom
= '{0}.{1}'.format(service_type
, service_id
)
1520 if isinstance(value
, bool):
1521 value
= 'true' if value
else 'false'
1522 opt_arg
= '--{name}={value}'.format(name
=name
, value
=value
)
1523 self
.raw_cluster_cmd('--', 'tell', whom
, 'injectargs', opt_arg
)
1525 def set_config(self
, osdnum
, **argdict
):
1527 :param osdnum: osd number
1528 :param argdict: dictionary containing values to set.
1530 for k
, v
in argdict
.iteritems():
1531 self
.wait_run_admin_socket(
1533 ['config', 'set', str(k
), str(v
)])
1535 def raw_cluster_status(self
):
1537 Get status from cluster
1539 status
= self
.raw_cluster_cmd('status', '--format=json-pretty')
1540 return json
.loads(status
)
1542 def raw_osd_status(self
):
1544 Get osd status from cluster
1546 return self
.raw_cluster_cmd('osd', 'dump')
1548 def get_osd_status(self
):
1550 Get osd statuses sorted by states that the osds are in.
1553 lambda x
: x
.startswith('osd.') and (("up" in x
) or ("down" in x
)),
1554 self
.raw_osd_status().split('\n'))
1556 in_osds
= [int(i
[4:].split()[0])
1557 for i
in filter(lambda x
: " in " in x
, osd_lines
)]
1558 out_osds
= [int(i
[4:].split()[0])
1559 for i
in filter(lambda x
: " out " in x
, osd_lines
)]
1560 up_osds
= [int(i
[4:].split()[0])
1561 for i
in filter(lambda x
: " up " in x
, osd_lines
)]
1562 down_osds
= [int(i
[4:].split()[0])
1563 for i
in filter(lambda x
: " down " in x
, osd_lines
)]
1564 dead_osds
= [int(x
.id_
)
1565 for x
in filter(lambda x
:
1568 iter_daemons_of_role('osd', self
.cluster
))]
1569 live_osds
= [int(x
.id_
) for x
in
1572 self
.ctx
.daemons
.iter_daemons_of_role('osd',
1574 return {'in': in_osds
, 'out': out_osds
, 'up': up_osds
,
1575 'down': down_osds
, 'dead': dead_osds
, 'live': live_osds
,
1578 def get_num_pgs(self
):
1580 Check cluster status for the number of pgs
1582 status
= self
.raw_cluster_status()
1584 return status
['pgmap']['num_pgs']
1586 def create_erasure_code_profile(self
, profile_name
, profile
):
1588 Create an erasure code profile name that can be used as a parameter
1589 when creating an erasure coded pool.
1592 args
= cmd_erasure_code_profile(profile_name
, profile
)
1593 self
.raw_cluster_cmd(*args
)
1595 def create_pool_with_unique_name(self
, pg_num
=16,
1596 erasure_code_profile_name
=None,
1598 erasure_code_use_overwrites
=False):
1600 Create a pool named unique_pool_X where X is unique.
1604 name
= "unique_pool_%s" % (str(self
.next_pool_id
),)
1605 self
.next_pool_id
+= 1
1609 erasure_code_profile_name
=erasure_code_profile_name
,
1611 erasure_code_use_overwrites
=erasure_code_use_overwrites
)
1614 @contextlib.contextmanager
1615 def pool(self
, pool_name
, pg_num
=16, erasure_code_profile_name
=None):
1616 self
.create_pool(pool_name
, pg_num
, erasure_code_profile_name
)
1618 self
.remove_pool(pool_name
)
1620 def create_pool(self
, pool_name
, pg_num
=16,
1621 erasure_code_profile_name
=None,
1623 erasure_code_use_overwrites
=False):
1625 Create a pool named from the pool_name parameter.
1626 :param pool_name: name of the pool being created.
1627 :param pg_num: initial number of pgs.
1628 :param erasure_code_profile_name: if set and !None create an
1629 erasure coded pool using the profile
1630 :param erasure_code_use_overwrites: if true, allow overwrites
1633 assert isinstance(pool_name
, basestring
)
1634 assert isinstance(pg_num
, int)
1635 assert pool_name
not in self
.pools
1636 self
.log("creating pool_name %s" % (pool_name
,))
1637 if erasure_code_profile_name
:
1638 self
.raw_cluster_cmd('osd', 'pool', 'create',
1639 pool_name
, str(pg_num
), str(pg_num
),
1640 'erasure', erasure_code_profile_name
)
1642 self
.raw_cluster_cmd('osd', 'pool', 'create',
1643 pool_name
, str(pg_num
))
1644 if min_size
is not None:
1645 self
.raw_cluster_cmd(
1646 'osd', 'pool', 'set', pool_name
,
1649 if erasure_code_use_overwrites
:
1650 self
.raw_cluster_cmd(
1651 'osd', 'pool', 'set', pool_name
,
1652 'allow_ec_overwrites',
1654 self
.raw_cluster_cmd(
1655 'osd', 'pool', 'application', 'enable',
1656 pool_name
, 'rados', '--yes-i-really-mean-it',
1657 run
.Raw('||'), 'true')
1658 self
.pools
[pool_name
] = pg_num
1661 def add_pool_snap(self
, pool_name
, snap_name
):
1664 :param pool_name: name of pool to snapshot
1665 :param snap_name: name of snapshot to take
1667 self
.raw_cluster_cmd('osd', 'pool', 'mksnap',
1668 str(pool_name
), str(snap_name
))
1670 def remove_pool_snap(self
, pool_name
, snap_name
):
1672 Remove pool snapshot
1673 :param pool_name: name of pool to snapshot
1674 :param snap_name: name of snapshot to remove
1676 self
.raw_cluster_cmd('osd', 'pool', 'rmsnap',
1677 str(pool_name
), str(snap_name
))
1679 def remove_pool(self
, pool_name
):
1681 Remove the indicated pool
1682 :param pool_name: Pool to be removed
1685 assert isinstance(pool_name
, basestring
)
1686 assert pool_name
in self
.pools
1687 self
.log("removing pool_name %s" % (pool_name
,))
1688 del self
.pools
[pool_name
]
1689 self
.raw_cluster_cmd('osd', 'pool', 'rm', pool_name
, pool_name
,
1690 "--yes-i-really-really-mean-it")
1697 return random
.choice(self
.pools
.keys())
1699 def get_pool_pg_num(self
, pool_name
):
1701 Return the number of pgs in the pool specified.
1704 assert isinstance(pool_name
, basestring
)
1705 if pool_name
in self
.pools
:
1706 return self
.pools
[pool_name
]
1709 def get_pool_property(self
, pool_name
, prop
):
1711 :param pool_name: pool
1712 :param prop: property to be checked.
1713 :returns: property as an int value.
1716 assert isinstance(pool_name
, basestring
)
1717 assert isinstance(prop
, basestring
)
1718 output
= self
.raw_cluster_cmd(
1724 return int(output
.split()[1])
1726 def set_pool_property(self
, pool_name
, prop
, val
):
1728 :param pool_name: pool
1729 :param prop: property to be set.
1730 :param val: value to set.
1732 This routine retries if set operation fails.
1735 assert isinstance(pool_name
, basestring
)
1736 assert isinstance(prop
, basestring
)
1737 assert isinstance(val
, int)
1740 r
= self
.raw_cluster_cmd_result(
1747 if r
!= 11: # EAGAIN
1751 raise Exception('timed out getting EAGAIN '
1752 'when setting pool property %s %s = %s' %
1753 (pool_name
, prop
, val
))
1754 self
.log('got EAGAIN setting pool property, '
1755 'waiting a few seconds...')
1758 def expand_pool(self
, pool_name
, by
, max_pgs
):
1760 Increase the number of pgs in a pool
1763 assert isinstance(pool_name
, basestring
)
1764 assert isinstance(by
, int)
1765 assert pool_name
in self
.pools
1766 if self
.get_num_creating() > 0:
1768 if (self
.pools
[pool_name
] + by
) > max_pgs
:
1770 self
.log("increase pool size by %d" % (by
,))
1771 new_pg_num
= self
.pools
[pool_name
] + by
1772 self
.set_pool_property(pool_name
, "pg_num", new_pg_num
)
1773 self
.pools
[pool_name
] = new_pg_num
1776 def contract_pool(self
, pool_name
, by
, min_pgs
):
1778 Decrease the number of pgs in a pool
1781 self
.log('contract_pool %s by %s min %s' % (
1782 pool_name
, str(by
), str(min_pgs
)))
1783 assert isinstance(pool_name
, basestring
)
1784 assert isinstance(by
, int)
1785 assert pool_name
in self
.pools
1786 if self
.get_num_creating() > 0:
1787 self
.log('too many creating')
1789 proj
= self
.pools
[pool_name
] - by
1791 self
.log('would drop below min_pgs, proj %d, currently %d' % (proj
,self
.pools
[pool_name
],))
1793 self
.log("decrease pool size by %d" % (by
,))
1794 new_pg_num
= self
.pools
[pool_name
] - by
1795 self
.set_pool_property(pool_name
, "pg_num", new_pg_num
)
1796 self
.pools
[pool_name
] = new_pg_num
1799 def stop_pg_num_changes(self
):
1801 Reset all pg_num_targets back to pg_num, canceling splits and merges
1803 self
.log('Canceling any pending splits or merges...')
1804 osd_dump
= self
.get_osd_dump_json()
1805 for pool
in osd_dump
['pools']:
1806 if pool
['pg_num'] != pool
['pg_num_target']:
1807 self
.log('Setting pool %s (%d) pg_num %d -> %d' %
1808 (pool
['pool_name'], pool
['pool'],
1809 pool
['pg_num_target'],
1811 self
.raw_cluster_cmd('osd', 'pool', 'set', pool
['pool_name'],
1812 'pg_num', str(pool
['pg_num']))
1814 def set_pool_pgpnum(self
, pool_name
, force
):
1816 Set pgpnum property of pool_name pool.
1819 assert isinstance(pool_name
, basestring
)
1820 assert pool_name
in self
.pools
1821 if not force
and self
.get_num_creating() > 0:
1823 self
.set_pool_property(pool_name
, 'pgp_num', self
.pools
[pool_name
])
1826 def list_pg_unfound(self
, pgid
):
1828 return list of unfound pgs with the id specified
1833 out
= self
.raw_cluster_cmd('--', 'pg', pgid
, 'list_unfound',
1839 r
['objects'].extend(j
['objects'])
1844 offset
= j
['objects'][-1]['oid']
1849 def get_pg_stats(self
):
1851 Dump the cluster and get pg stats
1853 out
= self
.raw_cluster_cmd('pg', 'dump', '--format=json')
1854 j
= json
.loads('\n'.join(out
.split('\n')[1:]))
1856 return j
['pg_map']['pg_stats']
1858 return j
['pg_stats']
1860 def get_pgids_to_force(self
, backfill
):
1862 Return the randomized list of PGs that can have their recovery/backfill forced
1864 j
= self
.get_pg_stats();
1867 wanted
= ['degraded', 'backfilling', 'backfill_wait']
1869 wanted
= ['recovering', 'degraded', 'recovery_wait']
1871 status
= pg
['state'].split('+')
1873 if random
.random() > 0.5 and not ('forced_backfill' in status
or 'forced_recovery' in status
) and t
in status
:
1874 pgids
.append(pg
['pgid'])
1878 def get_pgids_to_cancel_force(self
, backfill
):
1880 Return the randomized list of PGs whose recovery/backfill priority is forced
1882 j
= self
.get_pg_stats();
1885 wanted
= 'forced_backfill'
1887 wanted
= 'forced_recovery'
1889 status
= pg
['state'].split('+')
1890 if wanted
in status
and random
.random() > 0.5:
1891 pgids
.append(pg
['pgid'])
1894 def compile_pg_status(self
):
1896 Return a histogram of pg state values
1899 j
= self
.get_pg_stats()
1901 for status
in pg
['state'].split('+'):
1902 if status
not in ret
:
1908 def with_pg_state(self
, pool
, pgnum
, check
):
1909 pgstr
= self
.get_pgid(pool
, pgnum
)
1910 stats
= self
.get_single_pg_stats(pgstr
)
1911 assert(check(stats
['state']))
1914 def with_pg(self
, pool
, pgnum
, check
):
1915 pgstr
= self
.get_pgid(pool
, pgnum
)
1916 stats
= self
.get_single_pg_stats(pgstr
)
1919 def get_last_scrub_stamp(self
, pool
, pgnum
):
1921 Get the timestamp of the last scrub.
1923 stats
= self
.get_single_pg_stats(self
.get_pgid(pool
, pgnum
))
1924 return stats
["last_scrub_stamp"]
1926 def do_pg_scrub(self
, pool
, pgnum
, stype
):
1928 Scrub pg and wait for scrubbing to finish
1930 init
= self
.get_last_scrub_stamp(pool
, pgnum
)
1931 RESEND_TIMEOUT
= 120 # Must be a multiple of SLEEP_TIME
1932 FATAL_TIMEOUT
= RESEND_TIMEOUT
* 3
1935 while init
== self
.get_last_scrub_stamp(pool
, pgnum
):
1936 assert timer
< FATAL_TIMEOUT
, "fatal timeout trying to " + stype
1937 self
.log("waiting for scrub type %s" % (stype
,))
1938 if (timer
% RESEND_TIMEOUT
) == 0:
1939 self
.raw_cluster_cmd('pg', stype
, self
.get_pgid(pool
, pgnum
))
1940 # The first time in this loop is the actual request
1941 if timer
!= 0 and stype
== "repair":
1942 self
.log("WARNING: Resubmitted a non-idempotent repair")
1943 time
.sleep(SLEEP_TIME
)
1946 def wait_snap_trimming_complete(self
, pool
):
1948 Wait for snap trimming on pool to end
1953 poolnum
= self
.get_pool_num(pool
)
1954 poolnumstr
= "%s." % (poolnum
,)
1957 if (now
- start
) > FATAL_TIMEOUT
:
1958 assert (now
- start
) < FATAL_TIMEOUT
, \
1959 'failed to complete snap trimming before timeout'
1960 all_stats
= self
.get_pg_stats()
1962 for pg
in all_stats
:
1963 if (poolnumstr
in pg
['pgid']) and ('snaptrim' in pg
['state']):
1964 self
.log("pg {pg} in trimming, state: {state}".format(
1970 self
.log("{pool} still trimming, waiting".format(pool
=pool
))
1971 time
.sleep(POLL_PERIOD
)
1973 def get_single_pg_stats(self
, pgid
):
1975 Return pg for the pgid specified.
1977 all_stats
= self
.get_pg_stats()
1979 for pg
in all_stats
:
1980 if pg
['pgid'] == pgid
:
1985 def get_object_pg_with_shard(self
, pool
, name
, osdid
):
1988 pool_dump
= self
.get_pool_dump(pool
)
1989 object_map
= self
.get_object_map(pool
, name
)
1990 if pool_dump
["type"] == CephManager
.ERASURE_CODED_POOL
:
1991 shard
= object_map
['acting'].index(osdid
)
1992 return "{pgid}s{shard}".format(pgid
=object_map
['pgid'],
1995 return object_map
['pgid']
1997 def get_object_primary(self
, pool
, name
):
2000 object_map
= self
.get_object_map(pool
, name
)
2001 return object_map
['acting_primary']
2003 def get_object_map(self
, pool
, name
):
2005 osd map --format=json converted to a python object
2006 :returns: the python object
2008 out
= self
.raw_cluster_cmd('--format=json', 'osd', 'map', pool
, name
)
2009 return json
.loads('\n'.join(out
.split('\n')[1:]))
2011 def get_osd_dump_json(self
):
2013 osd dump --format=json converted to a python object
2014 :returns: the python object
2016 out
= self
.raw_cluster_cmd('osd', 'dump', '--format=json')
2017 return json
.loads('\n'.join(out
.split('\n')[1:]))
2019 def get_osd_dump(self
):
2024 return self
.get_osd_dump_json()['osds']
2026 def get_osd_metadata(self
):
2028 osd metadata --format=json converted to a python object
2029 :returns: the python object containing osd metadata information
2031 out
= self
.raw_cluster_cmd('osd', 'metadata', '--format=json')
2032 return json
.loads('\n'.join(out
.split('\n')[1:]))
2034 def get_mgr_dump(self
):
2035 out
= self
.raw_cluster_cmd('mgr', 'dump', '--format=json')
2036 return json
.loads(out
)
2038 def get_stuck_pgs(self
, type_
, threshold
):
2040 :returns: stuck pg information from the cluster
2042 out
= self
.raw_cluster_cmd('pg', 'dump_stuck', type_
, str(threshold
),
2044 return json
.loads(out
).get('stuck_pg_stats',[])
2046 def get_num_unfound_objects(self
):
2048 Check cluster status to get the number of unfound objects
2050 status
= self
.raw_cluster_status()
2052 return status
['pgmap'].get('unfound_objects', 0)
2054 def get_num_creating(self
):
2056 Find the number of pgs in creating mode.
2058 pgs
= self
.get_pg_stats()
2061 if 'creating' in pg
['state']:
2065 def get_num_active_clean(self
):
2067 Find the number of active and clean pgs.
2069 pgs
= self
.get_pg_stats()
2072 if (pg
['state'].count('active') and
2073 pg
['state'].count('clean') and
2074 not pg
['state'].count('stale')):
2078 def get_num_active_recovered(self
):
2080 Find the number of active and recovered pgs.
2082 pgs
= self
.get_pg_stats()
2085 if (pg
['state'].count('active') and
2086 not pg
['state'].count('recover') and
2087 not pg
['state'].count('backfilling') and
2088 not pg
['state'].count('stale')):
2092 def get_is_making_recovery_progress(self
):
2094 Return whether there is recovery progress discernable in the
2097 status
= self
.raw_cluster_status()
2098 kps
= status
['pgmap'].get('recovering_keys_per_sec', 0)
2099 bps
= status
['pgmap'].get('recovering_bytes_per_sec', 0)
2100 ops
= status
['pgmap'].get('recovering_objects_per_sec', 0)
2101 return kps
> 0 or bps
> 0 or ops
> 0
    def get_num_active(self):
        """
        Find the number of active pgs.
        """
        pgs = self.get_pg_stats()
        num = 0
        for pg in pgs:
            if pg['state'].count('active') and not pg['state'].count('stale'):
                num += 1
        return num

    def get_num_down(self):
        """
        Find the number of pgs that are down.
        """
        pgs = self.get_pg_stats()
        num = 0
        for pg in pgs:
            if ((pg['state'].count('down') and not
                 pg['state'].count('stale')) or
                (pg['state'].count('incomplete') and not
                 pg['state'].count('stale'))):
                num += 1
        return num

    def get_num_active_down(self):
        """
        Find the number of pgs that are either active or down.
        """
        pgs = self.get_pg_stats()
        num = 0
        for pg in pgs:
            if ((pg['state'].count('active') and not
                 pg['state'].count('stale')) or
                (pg['state'].count('down') and not
                 pg['state'].count('stale')) or
                (pg['state'].count('incomplete') and not
                 pg['state'].count('stale'))):
                num += 1
        return num
    def is_clean(self):
        """
        True if all pgs are clean
        """
        return self.get_num_active_clean() == self.get_num_pgs()

    def is_recovered(self):
        """
        True if all pgs have recovered
        """
        return self.get_num_active_recovered() == self.get_num_pgs()

    def is_active_or_down(self):
        """
        True if all pgs are active or down
        """
        return self.get_num_active_down() == self.get_num_pgs()
    def wait_for_clean(self, timeout=1200):
        """
        Returns true when all pgs are clean.
        """
        self.log("waiting for clean")
        start = time.time()
        num_active_clean = self.get_num_active_clean()
        while not self.is_clean():
            if timeout is not None:
                if self.get_is_making_recovery_progress():
                    self.log("making progress, resetting timeout")
                    start = time.time()
                else:
                    self.log("no progress seen, keeping timeout for now")
                    if time.time() - start >= timeout:
                        self.log('dumping pgs')
                        out = self.raw_cluster_cmd('pg', 'dump')
                        self.log(out)
                        assert time.time() - start < timeout, \
                            'failed to become clean before timeout expired'
            cur_active_clean = self.get_num_active_clean()
            if cur_active_clean != num_active_clean:
                start = time.time()
                num_active_clean = cur_active_clean
            time.sleep(3)
        self.log("clean!")
    def are_all_osds_up(self):
        """
        Returns true if all osds are up.
        """
        x = self.get_osd_dump()
        return (len(x) == sum([(y['up'] > 0) for y in x]))

    def wait_for_all_osds_up(self, timeout=None):
        """
        When this exits, either the timeout has expired, or all
        osds are up.
        """
        self.log("waiting for all up")
        start = time.time()
        while not self.are_all_osds_up():
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'timeout expired in wait_for_all_osds_up'
            time.sleep(3)
        self.log("all up!")
    def pool_exists(self, pool):
        if pool in self.list_pools():
            return True
        return False

    def wait_for_pool(self, pool, timeout=300):
        """
        Wait for a pool to exist
        """
        self.log('waiting for pool %s to exist' % pool)
        start = time.time()
        while not self.pool_exists(pool):
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'timeout expired in wait_for_pool'
            time.sleep(3)

    def wait_for_pools(self, pools):
        for pool in pools:
            self.wait_for_pool(pool)
    def is_mgr_available(self):
        x = self.get_mgr_dump()
        return x.get('available', False)

    def wait_for_mgr_available(self, timeout=None):
        self.log("waiting for mgr available")
        start = time.time()
        while not self.is_mgr_available():
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'timeout expired in wait_for_mgr_available'
            time.sleep(3)
        self.log("mgr available!")
    def wait_for_recovery(self, timeout=None):
        """
        Check peering. When this exists, we have recovered.
        """
        self.log("waiting for recovery to complete")
        start = time.time()
        num_active_recovered = self.get_num_active_recovered()
        while not self.is_recovered():
            now = time.time()
            if timeout is not None:
                if self.get_is_making_recovery_progress():
                    self.log("making progress, resetting timeout")
                    start = time.time()
                else:
                    self.log("no progress seen, keeping timeout for now")
                    if now - start >= timeout:
                        if self.is_recovered():
                            break
                        self.log('dumping pgs')
                        out = self.raw_cluster_cmd('pg', 'dump')
                        self.log(out)
                        assert now - start < timeout, \
                            'failed to recover before timeout expired'
            cur_active_recovered = self.get_num_active_recovered()
            if cur_active_recovered != num_active_recovered:
                start = time.time()
                num_active_recovered = cur_active_recovered
            time.sleep(3)
        self.log("recovered!")
    def wait_for_active(self, timeout=None):
        """
        Check peering. When this exists, we are definitely active
        """
        self.log("waiting for peering to complete")
        start = time.time()
        num_active = self.get_num_active()
        while not self.is_active():
            if timeout is not None:
                if time.time() - start >= timeout:
                    self.log('dumping pgs')
                    out = self.raw_cluster_cmd('pg', 'dump')
                    self.log(out)
                    assert time.time() - start < timeout, \
                        'failed to recover before timeout expired'
            cur_active = self.get_num_active()
            if cur_active != num_active:
                start = time.time()
                num_active = cur_active
            time.sleep(3)
        self.log("active!")
    def wait_for_active_or_down(self, timeout=None):
        """
        Check peering. When this exists, we are definitely either
        active or down
        """
        self.log("waiting for peering to complete or become blocked")
        start = time.time()
        num_active_down = self.get_num_active_down()
        while not self.is_active_or_down():
            if timeout is not None:
                if time.time() - start >= timeout:
                    self.log('dumping pgs')
                    out = self.raw_cluster_cmd('pg', 'dump')
                    self.log(out)
                    assert time.time() - start < timeout, \
                        'failed to recover before timeout expired'
            cur_active_down = self.get_num_active_down()
            if cur_active_down != num_active_down:
                start = time.time()
                num_active_down = cur_active_down
            time.sleep(3)
        self.log("active or down!")
    def osd_is_up(self, osd):
        """
        Wrapper for osd check
        """
        osds = self.get_osd_dump()
        return osds[osd]['up'] > 0

    def wait_till_osd_is_up(self, osd, timeout=None):
        """
        Loop waiting for osd.
        """
        self.log('waiting for osd.%d to be up' % osd)
        start = time.time()
        while not self.osd_is_up(osd):
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'osd.%d failed to come up before timeout expired' % osd
            time.sleep(3)
        self.log('osd.%d is up' % osd)

    def is_active(self):
        """
        Wrapper to check if all pgs are active
        """
        return self.get_num_active() == self.get_num_pgs()
    def wait_till_active(self, timeout=None):
        """
        Wait until all pgs are active.
        """
        self.log("waiting till active")
        start = time.time()
        while not self.is_active():
            if timeout is not None:
                if time.time() - start >= timeout:
                    self.log('dumping pgs')
                    out = self.raw_cluster_cmd('pg', 'dump')
                    self.log(out)
                    assert time.time() - start < timeout, \
                        'failed to become active before timeout expired'
            time.sleep(3)
        self.log("active!")
    def wait_till_pg_convergence(self, timeout=None):
        start = time.time()
        old_stats = None
        active_osds = [osd['osd'] for osd in self.get_osd_dump()
                       if osd['in'] and osd['up']]
        while True:
            # strictly speaking, no need to wait for mon. but due to the
            # "ms inject socket failures" setting, the osdmap could be delayed,
            # so mgr is likely to ignore the pg-stat messages with pgs serving
            # newly created pools which is not yet known by mgr. so, to make sure
            # the mgr is updated with the latest pg-stats, waiting for mon/mgr is
            # necessary.
            self.flush_pg_stats(active_osds)
            new_stats = dict((stat['pgid'], stat['state'])
                             for stat in self.get_pg_stats())
            if old_stats == new_stats:
                return old_stats
            if timeout is not None:
                assert time.time() - start < timeout, \
                    'failed to reach convergence before %d secs' % timeout
            old_stats = new_stats
            # longer than mgr_stats_period
            time.sleep(5 + 1)
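    # The convergence check above relies on flush_pg_stats() pushing each
    # osd's pg stats to the mon/mgr (roughly what `ceph tell osd.<id>
    # flush_pg_stats` does from the command line); convergence is declared
    # once two consecutive {pgid: state} snapshots taken after such a flush
    # come back identical.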
    def mark_out_osd(self, osd):
        """
        Wrapper to mark osd out.
        """
        self.raw_cluster_cmd('osd', 'out', str(osd))
    def kill_osd(self, osd):
        """
        Kill osds by either power cycling (if indicated by the config)
        or by stopping.
        """
        if self.config.get('powercycle'):
            remote = self.find_remote('osd', osd)
            self.log('kill_osd on osd.{o} '
                     'doing powercycle of {s}'.format(o=osd, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_off()
        elif self.config.get('bdev_inject_crash') and self.config.get('bdev_inject_crash_probability'):
            if random.uniform(0, 1) < self.config.get('bdev_inject_crash_probability', .5):
                self.inject_args(
                    'osd', osd,
                    'bdev-inject-crash', self.config.get('bdev_inject_crash'))
                try:
                    self.ctx.daemons.get_daemon('osd', osd, self.cluster).wait()
                except:
                    pass
                else:
                    raise RuntimeError('osd.%s did not fail' % osd)
            else:
                self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
        else:
            self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
    @staticmethod
    def _assert_ipmi(remote):
        assert remote.console.has_ipmi_credentials, (
            "powercycling requested but RemoteConsole is not "
            "initialized. Check ipmi config.")
    def blackhole_kill_osd(self, osd):
        """
        Stop osd if nothing else works.
        """
        self.inject_args('osd', osd,
                         'objectstore-blackhole', True)
        time.sleep(2)
        self.ctx.daemons.get_daemon('osd', osd, self.cluster).stop()
    def revive_osd(self, osd, timeout=360, skip_admin_check=False):
        """
        Revive osds by either power cycling (if indicated by the config)
        or by restarting.
        """
        if self.config.get('powercycle'):
            remote = self.find_remote('osd', osd)
            self.log('kill_osd on osd.{o} doing powercycle of {s}'.
                     format(o=osd, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_on()
            if not remote.console.check_status(300):
                raise Exception('Failed to revive osd.{o} via ipmi'.
                                format(o=osd))
            teuthology.reconnect(self.ctx, 60, [remote])
            mount_osd_data(self.ctx, remote, self.cluster, str(osd))
            self.make_admin_daemon_dir(remote)
            self.ctx.daemons.get_daemon('osd', osd, self.cluster).reset()
        self.ctx.daemons.get_daemon('osd', osd, self.cluster).restart()

        if not skip_admin_check:
            # wait for dump_ops_in_flight; this command doesn't appear
            # until after the signal handler is installed and it is safe
            # to stop the osd again without making valgrind leak checks
            # unhappy. see #5924.
            self.wait_run_admin_socket('osd', osd,
                                       args=['dump_ops_in_flight'],
                                       timeout=timeout, stdout=DEVNULL)
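    # The admin-socket check above is roughly what running
    #   ceph daemon osd.<id> dump_ops_in_flight
    # on the osd's host would do; it only succeeds once the daemon has
    # started far enough to register its admin socket commands, which is
    # what makes it a safe "the osd is really back" signal here.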
    def mark_down_osd(self, osd):
        """
        Cluster command wrapper
        """
        self.raw_cluster_cmd('osd', 'down', str(osd))

    def mark_in_osd(self, osd):
        """
        Cluster command wrapper
        """
        self.raw_cluster_cmd('osd', 'in', str(osd))
    def signal_osd(self, osd, sig, silent=False):
        """
        Wrapper to local get_daemon call which sends the given
        signal to the given osd.
        """
        self.ctx.daemons.get_daemon('osd', osd,
                                    self.cluster).signal(sig, silent=silent)

    def signal_mon(self, mon, sig, silent=False):
        """
        Wrapper to local get_daemon call
        """
        self.ctx.daemons.get_daemon('mon', mon,
                                    self.cluster).signal(sig, silent=silent)
    def kill_mon(self, mon):
        """
        Kill the monitor by either power cycling (if the config says so),
        or by doing a normal stop.
        """
        if self.config.get('powercycle'):
            remote = self.find_remote('mon', mon)
            self.log('kill_mon on mon.{m} doing powercycle of {s}'.
                     format(m=mon, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_off()
        else:
            self.ctx.daemons.get_daemon('mon', mon, self.cluster).stop()
    def revive_mon(self, mon):
        """
        Restart by either power cycling (if the config says so),
        or by doing a normal restart.
        """
        if self.config.get('powercycle'):
            remote = self.find_remote('mon', mon)
            self.log('revive_mon on mon.{m} doing powercycle of {s}'.
                     format(m=mon, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_on()
            self.make_admin_daemon_dir(remote)
        self.ctx.daemons.get_daemon('mon', mon, self.cluster).restart()
    def revive_mgr(self, mgr):
        """
        Restart by either power cycling (if the config says so),
        or by doing a normal restart.
        """
        if self.config.get('powercycle'):
            remote = self.find_remote('mgr', mgr)
            self.log('revive_mgr on mgr.{m} doing powercycle of {s}'.
                     format(m=mgr, s=remote.name))
            self._assert_ipmi(remote)
            remote.console.power_on()
            self.make_admin_daemon_dir(remote)
        self.ctx.daemons.get_daemon('mgr', mgr, self.cluster).restart()
    def get_mon_status(self, mon):
        """
        Extract all the monitor status information from the cluster
        """
        addr = self.ctx.ceph[self.cluster].mons['mon.%s' % mon]
        out = self.raw_cluster_cmd('-m', addr, 'mon_status')
        return json.loads(out)
    def get_mon_quorum(self):
        """
        Extract monitor quorum information from the cluster
        """
        out = self.raw_cluster_cmd('quorum_status')
        j = json.loads(out)
        self.log('quorum_status is %s' % out)
        return j['quorum']
    def wait_for_mon_quorum_size(self, size, timeout=300):
        """
        Loop until quorum size is reached.
        """
        self.log('waiting for quorum size %d' % size)
        start = time.time()
        while not len(self.get_mon_quorum()) == size:
            if timeout is not None:
                assert time.time() - start < timeout, \
                    ('failed to reach quorum size %d '
                     'before timeout expired' % size)
            time.sleep(3)
        self.log("quorum is size %d" % size)
    def get_mon_health(self, debug=False):
        """
        Extract all the monitor health information.
        """
        out = self.raw_cluster_cmd('health', '--format=json')
        if debug:
            self.log('health:\n{h}'.format(h=out))
        return json.loads(out)
    def get_filepath(self):
        """
        Return path to osd data with {id} needing to be replaced
        """
        return '/var/lib/ceph/osd/' + self.cluster + '-{id}'
    def make_admin_daemon_dir(self, remote):
        """
        Create /var/run/ceph directory on remote site.

        :param remote: Remote site
        """
        remote.run(args=['sudo',
                         'install', '-d', '-m0777', '--', '/var/run/ceph', ], )

def utility_task(name):
    """
    Generate ceph_manager subtask corresponding to ceph_manager
    method name
    """
    def task(ctx, config):
        if config is None:
            config = {}
        args = config.get('args', [])
        kwargs = config.get('kwargs', {})
        cluster = config.get('cluster', 'ceph')
        fn = getattr(ctx.managers[cluster], name)
        fn(*args, **kwargs)
    return task

= utility_task("revive_osd")
2607 revive_mon
= utility_task("revive_mon")
2608 kill_osd
= utility_task("kill_osd")
2609 kill_mon
= utility_task("kill_mon")
2610 create_pool
= utility_task("create_pool")
2611 remove_pool
= utility_task("remove_pool")
2612 wait_for_clean
= utility_task("wait_for_clean")
2613 flush_all_pg_stats
= utility_task("flush_all_pg_stats")
2614 set_pool_property
= utility_task("set_pool_property")
2615 do_pg_scrub
= utility_task("do_pg_scrub")
2616 wait_for_pool
= utility_task("wait_for_pool")
2617 wait_for_pools
= utility_task("wait_for_pools")